Skip to content

Commit

Permalink
Merge commit 'aa0a60a122fd8cd13aefb664ac4282fee7518e40' into jt-psgo-…
Browse files Browse the repository at this point in the history
…911-stream-token-encryption
  • Loading branch information
jachym-tousek-keboola committed Dec 20, 2024
2 parents b62ae72 + aa0a60a commit f2b4cfd
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 20 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/release-service-stream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ env:

jobs:
test-lint:
if: startsWith(github.ref, 'refs/tags/production-stream-')
name: "Lint"
secrets: inherit
uses: ./.github/workflows/test-lint.yml
test-unit:
if: startsWith(github.ref, 'refs/tags/production-stream-')
name: "Unit Tests"
secrets: inherit
uses: ./.github/workflows/test-unit.yml
with:
without-cache: true
package-exception-regex: "./internal/pkg/service/appsproxy|./internal/pkg/service/templates|./internal/pkg/service/cli"
test-e2e-service-stream:
if: startsWith(github.ref, 'refs/tags/production-stream-')
name: "E2E: Stream"
secrets: inherit
uses: ./.github/workflows/test-e2e-service-stream.yml
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ replace github.com/oauth2-proxy/mockoidc => github.com/keboola/go-mockoidc v0.0.

require (
ariga.io/atlas v0.29.0
cloud.google.com/go/kms v1.20.0
entgo.io/ent v0.14.1
github.com/ActiveState/vt10x v1.3.1
github.com/AlecAivazis/survey/v2 v2.3.7
Expand All @@ -25,7 +26,6 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/coder/websocket v1.8.12
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/dgraph-io/ristretto/v2 v2.0.0
github.com/fatih/color v1.18.0
github.com/go-playground/locales v0.14.1
github.com/go-playground/universal-translator v0.18.1
Expand Down Expand Up @@ -54,6 +54,7 @@ require (
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/oauth2-proxy/mockoidc v0.0.0-20240214162133-caebfff84d25
github.com/oauth2-proxy/oauth2-proxy/v7 v7.7.1
github.com/pkg/errors v0.9.1
github.com/pquerna/cachecontrol v0.2.0
github.com/prometheus/client_golang v1.20.5
github.com/qiangxue/fasthttp-routing v0.0.0-20160225050629-6ccdc2a18d87
Expand Down Expand Up @@ -154,7 +155,6 @@ require (
cloud.google.com/go/auth/oauth2adapt v0.2.5 // indirect
cloud.google.com/go/compute/metadata v0.5.2 // indirect
cloud.google.com/go/iam v1.2.1 // indirect
cloud.google.com/go/kms v1.20.0 // indirect
cloud.google.com/go/longrunning v0.6.1 // indirect
cloud.google.com/go/monitoring v1.21.1 // indirect
cloud.google.com/go/storage v1.46.0 // indirect
Expand Down Expand Up @@ -221,6 +221,7 @@ require (
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/creack/pty v1.1.18 // indirect
github.com/dgraph-io/ristretto/v2 v2.0.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/disintegration/imaging v1.6.2 // indirect
github.com/dlclark/regexp2 v1.7.0 // indirect
Expand Down Expand Up @@ -285,7 +286,6 @@ require (
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/service/common/dependencies/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func newEncryptionScope(ctx context.Context, cfg encryption.Config, d encryption
ctx, span := d.Telemetry().Tracer().Start(ctx, "keboola.go.common.dependencies.NewEncryptionScope")
defer span.End(&err)

encryptor, err := encryption.NewEncryptor(ctx, cfg)
encryptor, err := encryption.NewEncryptor(ctx, cfg, d.Logger())
if err != nil {
return nil, err
}
Expand Down
59 changes: 59 additions & 0 deletions internal/pkg/service/stream/encryption/encode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package encryption

import (
"bytes"
"compress/gzip"
"encoding/base64"
"encoding/gob"

"github.com/pkg/errors"
)

func Encode(data any) ([]byte, error) {
var buffer bytes.Buffer

// Base64 encode
encoder := base64.NewEncoder(base64.StdEncoding, &buffer)

// Gzip compress
writer := gzip.NewWriter(encoder)

// gob encode
err := gob.NewEncoder(writer).Encode(data)
if err != nil {
return nil, errors.Wrapf(err, "gob encoder failed: %s", err.Error())
}

err = writer.Close()
if err != nil {
return nil, errors.Wrapf(err, "can't close gzip writer: %s", err.Error())
}

err = encoder.Close()
if err != nil {
return nil, errors.Wrapf(err, "base64 encoder failed: %s", err.Error())
}

return buffer.Bytes(), nil
}

func Decode[T any](data []byte) (decoded T, err error) {
// Base64 decode
decoder := base64.NewDecoder(base64.StdEncoding, bytes.NewReader(data))

// Gzip uncompress
reader, err := gzip.NewReader(decoder)
if err != nil {
return decoded, errors.Wrapf(err, "can't create gzip reader: %s", err.Error())
}

defer reader.Close()

// gob decode
err = gob.NewDecoder(reader).Decode(&decoded)
if err != nil {
return decoded, errors.Wrapf(err, "gob decoder failed: %s", err.Error())
}

return decoded, nil
}
11 changes: 9 additions & 2 deletions internal/pkg/service/stream/encryption/encryption.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"

"github.com/keboola/go-cloud-encrypt/pkg/cloudencrypt"

"github.com/keboola/keboola-as-code/internal/pkg/log"
)

const (
Expand All @@ -16,7 +18,7 @@ const (

type Provider string

func NewEncryptor(ctx context.Context, config Config) (cloudencrypt.Encryptor, error) {
func NewEncryptor(ctx context.Context, config Config, logger log.Logger) (cloudencrypt.Encryptor, error) {
var encryptor cloudencrypt.Encryptor
var err error

Expand All @@ -31,7 +33,7 @@ func NewEncryptor(ctx context.Context, config Config) (cloudencrypt.Encryptor, e

return encryptor, nil
case ProviderGCP:
encryptor, err = cloudencrypt.NewGCPEncryptor(ctx, config.GCP.KMSKeyID)
encryptor, err = NewGCPEncryptor(ctx, config.GCP.KMSKeyID, logger)
if err != nil {
return nil, err
}
Expand All @@ -47,6 +49,11 @@ func NewEncryptor(ctx context.Context, config Config) (cloudencrypt.Encryptor, e
}
}

encryptor, err = NewLoggedEncryptor(ctx, encryptor, logger)
if err != nil {
return nil, err
}

encryptor, err = cloudencrypt.NewDualEncryptor(ctx, encryptor)
if err != nil {
return nil, err
Expand Down
98 changes: 98 additions & 0 deletions internal/pkg/service/stream/encryption/gcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package encryption

import (
"context"
"hash/crc32"

kms "cloud.google.com/go/kms/apiv1"
"cloud.google.com/go/kms/apiv1/kmspb"
"github.com/keboola/go-cloud-encrypt/pkg/cloudencrypt"
"github.com/pkg/errors"

"github.com/keboola/keboola-as-code/internal/pkg/log"
)

// GCPEncryptor Implements Encryptor using Google Cloud's Key Management Service.
type GCPEncryptor struct {
client *kms.KeyManagementClient
keyID string
logger log.Logger
}

func NewGCPEncryptor(ctx context.Context, keyID string, logger log.Logger) (*GCPEncryptor, error) {
client, err := kms.NewKeyManagementClient(ctx)
if err != nil {
return nil, errors.Wrapf(err, "can't create gpc kms client: %s", err.Error())
}

return &GCPEncryptor{
client: client,
keyID: keyID,
logger: logger,
}, nil
}

func (encryptor *GCPEncryptor) Encrypt(ctx context.Context, plaintext []byte, metadata cloudencrypt.Metadata) ([]byte, error) {
additionalData, err := Encode(metadata)
if err != nil {
return nil, err
}

table := crc32.MakeTable(crc32.IEEE)

encryptor.logger.Infof(ctx, "encryption key: %08x, %s", crc32.Checksum([]byte(encryptor.keyID), table), encryptor.keyID)
encryptor.logger.Infof(ctx, "encryption metadata: %08x, %s", crc32.Checksum(additionalData, table), string(additionalData))
encryptor.logger.Infof(ctx, "encryption plaintext: %08x, %s", crc32.Checksum(plaintext, table), string(plaintext))

request := &kmspb.EncryptRequest{
Name: encryptor.keyID,
Plaintext: plaintext,
AdditionalAuthenticatedData: additionalData,
}

response, err := encryptor.client.Encrypt(ctx, request)
if err != nil {
return nil, errors.Wrapf(err, "gcp encryption failed: %s", err.Error())
}

encryptor.logger.Infof(ctx, "encryption ciphertext: %08x, %s", crc32.Checksum(response.GetCiphertext(), table), string(response.GetCiphertext()))

return response.GetCiphertext(), nil
}

func (encryptor *GCPEncryptor) Decrypt(ctx context.Context, ciphertext []byte, metadata cloudencrypt.Metadata) ([]byte, error) {
additionalData, err := Encode(metadata)
if err != nil {
return nil, err
}

table := crc32.MakeTable(crc32.IEEE)

encryptor.logger.Infof(ctx, "decryption key: %08x, %s", crc32.Checksum([]byte(encryptor.keyID), table), encryptor.keyID)
encryptor.logger.Infof(ctx, "decryption metadata: %08x, %s", crc32.Checksum(additionalData, table), string(additionalData))
encryptor.logger.Infof(ctx, "decryption ciphertext: %08x, %s", crc32.Checksum(ciphertext, table), string(ciphertext))

request := &kmspb.DecryptRequest{
Name: encryptor.keyID,
Ciphertext: ciphertext,
AdditionalAuthenticatedData: additionalData,
}

response, err := encryptor.client.Decrypt(ctx, request)
if err != nil {
return nil, errors.Wrapf(err, "gcp decryption failed: %s", err.Error())
}

encryptor.logger.Infof(ctx, "decryption plaintext: %08x, %s", crc32.Checksum(response.GetPlaintext(), table), string(response.GetPlaintext()))

return response.GetPlaintext(), nil
}

func (encryptor *GCPEncryptor) Close() error {
err := encryptor.client.Close()
if err != nil {
return errors.Wrapf(err, "can't close gcp client: %s", err.Error())
}

return nil
}
77 changes: 77 additions & 0 deletions internal/pkg/service/stream/encryption/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package encryption

import (
"context"

"github.com/keboola/go-cloud-encrypt/pkg/cloudencrypt"

"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

// LoggedEncryptor wraps another Encryptor and adds logging.
type LoggedEncryptor struct {
encryptor cloudencrypt.Encryptor
logger log.Logger
}

func NewLoggedEncryptor(ctx context.Context, encryptor cloudencrypt.Encryptor, logger log.Logger) (*LoggedEncryptor, error) {
return &LoggedEncryptor{
encryptor: encryptor,
logger: logger,
}, nil
}

func (encryptor *LoggedEncryptor) Encrypt(ctx context.Context, plaintext []byte, metadata cloudencrypt.Metadata) ([]byte, error) {
meta := ""
for k, v := range metadata {
meta += k + ": " + v + ", "
}

encryptor.logger.Infof(ctx, "encryption metadata: "+meta)

if len(plaintext) == 0 {
err := errors.New("text should not be empty")
encryptor.logger.Infof(ctx, "encryption error: %s", err.Error())
return nil, err
}

encryptedValue, err := encryptor.encryptor.Encrypt(ctx, plaintext, metadata)
if err != nil {
encryptor.logger.Infof(ctx, "encryption error: %s", err.Error())
return nil, err
}

encryptor.logger.Info(ctx, "encryption success")

return encryptedValue, nil
}

func (encryptor *LoggedEncryptor) Decrypt(ctx context.Context, ciphertext []byte, metadata cloudencrypt.Metadata) ([]byte, error) {
meta := ""
for k, v := range metadata {
meta += k + ": " + v + ", "
}

encryptor.logger.Infof(ctx, "decryption metadata: "+meta)

if len(ciphertext) == 0 {
err := errors.New("text should not be empty")
encryptor.logger.Infof(ctx, "decryption error: %s", err.Error())
return nil, err
}

plaintext, err := encryptor.encryptor.Decrypt(ctx, ciphertext, metadata)
if err != nil {
encryptor.logger.Infof(ctx, "decryption error: %s", err.Error())
return nil, err
}

encryptor.logger.Info(ctx, "decryption success")

return plaintext, nil
}

func (encryptor *LoggedEncryptor) Close() error {
return encryptor.encryptor.Close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package bridge
import (
"context"
"sync"
"time"

"github.com/dgraph-io/ristretto/v2"
"github.com/keboola/go-client/pkg/keboola"
"github.com/keboola/go-cloud-encrypt/pkg/cloudencrypt"
etcd "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -83,18 +81,6 @@ func New(d dependencies, apiProvider apiProvider, config keboolasink.Config) (*B

encryptor := d.Encryptor()
if encryptor != nil {
cache, err := ristretto.NewCache(
&ristretto.Config[[]byte, []byte]{
NumCounters: 1e6,
MaxCost: 1 << 20,
BufferItems: 64,
},
)
if err != nil {
return nil, err
}

encryptor = cloudencrypt.NewCachedEncryptor(encryptor, time.Hour, cache)
tokenEncryptor = cloudencrypt.NewGenericEncryptor[keboola.Token](encryptor)
credentialsEncryptor = cloudencrypt.NewGenericEncryptor[keboola.FileUploadCredentials](encryptor)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ func (b *Bridge) importFile(ctx context.Context, file plugin.File, stats statist
// Prepare encryption metadata
metadata := cloudencrypt.Metadata{"sink": file.SinkKey.String()}

b.logger.Infof(ctx, "decryption token: %s, %s", file.SinkKey.String(), string(existingToken.EncryptedToken))

// Decrypt token
token, err := existingToken.DecryptToken(ctx, b.tokenEncryptor, metadata)
if err != nil {
Expand Down
Loading

0 comments on commit f2b4cfd

Please sign in to comment.