diff --git a/.github/workflows/release-service-stream.yml b/.github/workflows/release-service-stream.yml index 0e297df9ac..3e5ee725c3 100644 --- a/.github/workflows/release-service-stream.yml +++ b/.github/workflows/release-service-stream.yml @@ -27,10 +27,12 @@ env: jobs: test-lint: + if: startsWith(github.ref, 'refs/tags/production-stream-') name: "Lint" secrets: inherit uses: ./.github/workflows/test-lint.yml test-unit: + if: startsWith(github.ref, 'refs/tags/production-stream-') name: "Unit Tests" secrets: inherit uses: ./.github/workflows/test-unit.yml @@ -38,6 +40,7 @@ jobs: without-cache: true package-exception-regex: "./internal/pkg/service/appsproxy|./internal/pkg/service/templates|./internal/pkg/service/cli" test-e2e-service-stream: + if: startsWith(github.ref, 'refs/tags/production-stream-') name: "E2E: Stream" secrets: inherit uses: ./.github/workflows/test-e2e-service-stream.yml diff --git a/go.mod b/go.mod index f35e9a38ac..fd4c1741e5 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ replace github.com/oauth2-proxy/mockoidc => github.com/keboola/go-mockoidc v0.0. require ( ariga.io/atlas v0.29.0 + cloud.google.com/go/kms v1.20.0 entgo.io/ent v0.14.1 github.com/ActiveState/vt10x v1.3.1 github.com/AlecAivazis/survey/v2 v2.3.7 @@ -25,7 +26,6 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 github.com/coder/websocket v1.8.12 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc - github.com/dgraph-io/ristretto/v2 v2.0.0 github.com/fatih/color v1.18.0 github.com/go-playground/locales v0.14.1 github.com/go-playground/universal-translator v0.18.1 @@ -54,6 +54,7 @@ require ( github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/oauth2-proxy/mockoidc v0.0.0-20240214162133-caebfff84d25 github.com/oauth2-proxy/oauth2-proxy/v7 v7.7.1 + github.com/pkg/errors v0.9.1 github.com/pquerna/cachecontrol v0.2.0 github.com/prometheus/client_golang v1.20.5 github.com/qiangxue/fasthttp-routing v0.0.0-20160225050629-6ccdc2a18d87 @@ -154,7 +155,6 @@ require ( cloud.google.com/go/auth/oauth2adapt v0.2.5 // indirect cloud.google.com/go/compute/metadata v0.5.2 // indirect cloud.google.com/go/iam v1.2.1 // indirect - cloud.google.com/go/kms v1.20.0 // indirect cloud.google.com/go/longrunning v0.6.1 // indirect cloud.google.com/go/monitoring v1.21.1 // indirect cloud.google.com/go/storage v1.46.0 // indirect @@ -221,6 +221,7 @@ require ( github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/creack/pty v1.1.18 // indirect + github.com/dgraph-io/ristretto/v2 v2.0.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/disintegration/imaging v1.6.2 // indirect github.com/dlclark/regexp2 v1.7.0 // indirect @@ -285,7 +286,6 @@ require ( github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect diff --git a/internal/pkg/service/common/dependencies/encryption.go b/internal/pkg/service/common/dependencies/encryption.go index ba13eede6d..f1a5acf755 100644 --- a/internal/pkg/service/common/dependencies/encryption.go +++ b/internal/pkg/service/common/dependencies/encryption.go @@ -27,7 +27,7 @@ func newEncryptionScope(ctx context.Context, cfg encryption.Config, d encryption ctx, span := d.Telemetry().Tracer().Start(ctx, "keboola.go.common.dependencies.NewEncryptionScope") defer span.End(&err) - encryptor, err := encryption.NewEncryptor(ctx, cfg) + encryptor, err := encryption.NewEncryptor(ctx, cfg, d.Logger()) if err != nil { return nil, err } diff --git a/internal/pkg/service/stream/encryption/encode.go b/internal/pkg/service/stream/encryption/encode.go new file mode 100644 index 0000000000..0fb7b704fb --- /dev/null +++ b/internal/pkg/service/stream/encryption/encode.go @@ -0,0 +1,59 @@ +package encryption + +import ( + "bytes" + "compress/gzip" + "encoding/base64" + "encoding/gob" + + "github.com/pkg/errors" +) + +func Encode(data any) ([]byte, error) { + var buffer bytes.Buffer + + // Base64 encode + encoder := base64.NewEncoder(base64.StdEncoding, &buffer) + + // Gzip compress + writer := gzip.NewWriter(encoder) + + // gob encode + err := gob.NewEncoder(writer).Encode(data) + if err != nil { + return nil, errors.Wrapf(err, "gob encoder failed: %s", err.Error()) + } + + err = writer.Close() + if err != nil { + return nil, errors.Wrapf(err, "can't close gzip writer: %s", err.Error()) + } + + err = encoder.Close() + if err != nil { + return nil, errors.Wrapf(err, "base64 encoder failed: %s", err.Error()) + } + + return buffer.Bytes(), nil +} + +func Decode[T any](data []byte) (decoded T, err error) { + // Base64 decode + decoder := base64.NewDecoder(base64.StdEncoding, bytes.NewReader(data)) + + // Gzip uncompress + reader, err := gzip.NewReader(decoder) + if err != nil { + return decoded, errors.Wrapf(err, "can't create gzip reader: %s", err.Error()) + } + + defer reader.Close() + + // gob decode + err = gob.NewDecoder(reader).Decode(&decoded) + if err != nil { + return decoded, errors.Wrapf(err, "gob decoder failed: %s", err.Error()) + } + + return decoded, nil +} diff --git a/internal/pkg/service/stream/encryption/encryption.go b/internal/pkg/service/stream/encryption/encryption.go index c738d0e9e7..01d14955e0 100644 --- a/internal/pkg/service/stream/encryption/encryption.go +++ b/internal/pkg/service/stream/encryption/encryption.go @@ -4,6 +4,8 @@ import ( "context" "github.com/keboola/go-cloud-encrypt/pkg/cloudencrypt" + + "github.com/keboola/keboola-as-code/internal/pkg/log" ) const ( @@ -16,7 +18,7 @@ const ( type Provider string -func NewEncryptor(ctx context.Context, config Config) (cloudencrypt.Encryptor, error) { +func NewEncryptor(ctx context.Context, config Config, logger log.Logger) (cloudencrypt.Encryptor, error) { var encryptor cloudencrypt.Encryptor var err error @@ -31,7 +33,7 @@ func NewEncryptor(ctx context.Context, config Config) (cloudencrypt.Encryptor, e return encryptor, nil case ProviderGCP: - encryptor, err = cloudencrypt.NewGCPEncryptor(ctx, config.GCP.KMSKeyID) + encryptor, err = NewGCPEncryptor(ctx, config.GCP.KMSKeyID, logger) if err != nil { return nil, err } @@ -47,6 +49,11 @@ func NewEncryptor(ctx context.Context, config Config) (cloudencrypt.Encryptor, e } } + encryptor, err = NewLoggedEncryptor(ctx, encryptor, logger) + if err != nil { + return nil, err + } + encryptor, err = cloudencrypt.NewDualEncryptor(ctx, encryptor) if err != nil { return nil, err diff --git a/internal/pkg/service/stream/encryption/gcp.go b/internal/pkg/service/stream/encryption/gcp.go new file mode 100644 index 0000000000..967c371231 --- /dev/null +++ b/internal/pkg/service/stream/encryption/gcp.go @@ -0,0 +1,98 @@ +package encryption + +import ( + "context" + "hash/crc32" + + kms "cloud.google.com/go/kms/apiv1" + "cloud.google.com/go/kms/apiv1/kmspb" + "github.com/keboola/go-cloud-encrypt/pkg/cloudencrypt" + "github.com/pkg/errors" + + "github.com/keboola/keboola-as-code/internal/pkg/log" +) + +// GCPEncryptor Implements Encryptor using Google Cloud's Key Management Service. +type GCPEncryptor struct { + client *kms.KeyManagementClient + keyID string + logger log.Logger +} + +func NewGCPEncryptor(ctx context.Context, keyID string, logger log.Logger) (*GCPEncryptor, error) { + client, err := kms.NewKeyManagementClient(ctx) + if err != nil { + return nil, errors.Wrapf(err, "can't create gpc kms client: %s", err.Error()) + } + + return &GCPEncryptor{ + client: client, + keyID: keyID, + logger: logger, + }, nil +} + +func (encryptor *GCPEncryptor) Encrypt(ctx context.Context, plaintext []byte, metadata cloudencrypt.Metadata) ([]byte, error) { + additionalData, err := Encode(metadata) + if err != nil { + return nil, err + } + + table := crc32.MakeTable(crc32.IEEE) + + encryptor.logger.Infof(ctx, "encryption key: %08x, %s", crc32.Checksum([]byte(encryptor.keyID), table), encryptor.keyID) + encryptor.logger.Infof(ctx, "encryption metadata: %08x, %s", crc32.Checksum(additionalData, table), string(additionalData)) + encryptor.logger.Infof(ctx, "encryption plaintext: %08x, %s", crc32.Checksum(plaintext, table), string(plaintext)) + + request := &kmspb.EncryptRequest{ + Name: encryptor.keyID, + Plaintext: plaintext, + AdditionalAuthenticatedData: additionalData, + } + + response, err := encryptor.client.Encrypt(ctx, request) + if err != nil { + return nil, errors.Wrapf(err, "gcp encryption failed: %s", err.Error()) + } + + encryptor.logger.Infof(ctx, "encryption ciphertext: %08x, %s", crc32.Checksum(response.GetCiphertext(), table), string(response.GetCiphertext())) + + return response.GetCiphertext(), nil +} + +func (encryptor *GCPEncryptor) Decrypt(ctx context.Context, ciphertext []byte, metadata cloudencrypt.Metadata) ([]byte, error) { + additionalData, err := Encode(metadata) + if err != nil { + return nil, err + } + + table := crc32.MakeTable(crc32.IEEE) + + encryptor.logger.Infof(ctx, "decryption key: %08x, %s", crc32.Checksum([]byte(encryptor.keyID), table), encryptor.keyID) + encryptor.logger.Infof(ctx, "decryption metadata: %08x, %s", crc32.Checksum(additionalData, table), string(additionalData)) + encryptor.logger.Infof(ctx, "decryption ciphertext: %08x, %s", crc32.Checksum(ciphertext, table), string(ciphertext)) + + request := &kmspb.DecryptRequest{ + Name: encryptor.keyID, + Ciphertext: ciphertext, + AdditionalAuthenticatedData: additionalData, + } + + response, err := encryptor.client.Decrypt(ctx, request) + if err != nil { + return nil, errors.Wrapf(err, "gcp decryption failed: %s", err.Error()) + } + + encryptor.logger.Infof(ctx, "decryption plaintext: %08x, %s", crc32.Checksum(response.GetPlaintext(), table), string(response.GetPlaintext())) + + return response.GetPlaintext(), nil +} + +func (encryptor *GCPEncryptor) Close() error { + err := encryptor.client.Close() + if err != nil { + return errors.Wrapf(err, "can't close gcp client: %s", err.Error()) + } + + return nil +} diff --git a/internal/pkg/service/stream/encryption/log.go b/internal/pkg/service/stream/encryption/log.go new file mode 100644 index 0000000000..232c0f860a --- /dev/null +++ b/internal/pkg/service/stream/encryption/log.go @@ -0,0 +1,77 @@ +package encryption + +import ( + "context" + + "github.com/keboola/go-cloud-encrypt/pkg/cloudencrypt" + + "github.com/keboola/keboola-as-code/internal/pkg/log" + "github.com/keboola/keboola-as-code/internal/pkg/utils/errors" +) + +// LoggedEncryptor wraps another Encryptor and adds logging. +type LoggedEncryptor struct { + encryptor cloudencrypt.Encryptor + logger log.Logger +} + +func NewLoggedEncryptor(ctx context.Context, encryptor cloudencrypt.Encryptor, logger log.Logger) (*LoggedEncryptor, error) { + return &LoggedEncryptor{ + encryptor: encryptor, + logger: logger, + }, nil +} + +func (encryptor *LoggedEncryptor) Encrypt(ctx context.Context, plaintext []byte, metadata cloudencrypt.Metadata) ([]byte, error) { + meta := "" + for k, v := range metadata { + meta += k + ": " + v + ", " + } + + encryptor.logger.Infof(ctx, "encryption metadata: "+meta) + + if len(plaintext) == 0 { + err := errors.New("text should not be empty") + encryptor.logger.Infof(ctx, "encryption error: %s", err.Error()) + return nil, err + } + + encryptedValue, err := encryptor.encryptor.Encrypt(ctx, plaintext, metadata) + if err != nil { + encryptor.logger.Infof(ctx, "encryption error: %s", err.Error()) + return nil, err + } + + encryptor.logger.Info(ctx, "encryption success") + + return encryptedValue, nil +} + +func (encryptor *LoggedEncryptor) Decrypt(ctx context.Context, ciphertext []byte, metadata cloudencrypt.Metadata) ([]byte, error) { + meta := "" + for k, v := range metadata { + meta += k + ": " + v + ", " + } + + encryptor.logger.Infof(ctx, "decryption metadata: "+meta) + + if len(ciphertext) == 0 { + err := errors.New("text should not be empty") + encryptor.logger.Infof(ctx, "decryption error: %s", err.Error()) + return nil, err + } + + plaintext, err := encryptor.encryptor.Decrypt(ctx, ciphertext, metadata) + if err != nil { + encryptor.logger.Infof(ctx, "decryption error: %s", err.Error()) + return nil, err + } + + encryptor.logger.Info(ctx, "decryption success") + + return plaintext, nil +} + +func (encryptor *LoggedEncryptor) Close() error { + return encryptor.encryptor.Close() +} diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go index 07983a5b3b..a575ba52fd 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/bridge.go @@ -3,9 +3,7 @@ package bridge import ( "context" "sync" - "time" - "github.com/dgraph-io/ristretto/v2" "github.com/keboola/go-client/pkg/keboola" "github.com/keboola/go-cloud-encrypt/pkg/cloudencrypt" etcd "go.etcd.io/etcd/client/v3" @@ -83,18 +81,6 @@ func New(d dependencies, apiProvider apiProvider, config keboolasink.Config) (*B encryptor := d.Encryptor() if encryptor != nil { - cache, err := ristretto.NewCache( - &ristretto.Config[[]byte, []byte]{ - NumCounters: 1e6, - MaxCost: 1 << 20, - BufferItems: 64, - }, - ) - if err != nil { - return nil, err - } - - encryptor = cloudencrypt.NewCachedEncryptor(encryptor, time.Hour, cache) tokenEncryptor = cloudencrypt.NewGenericEncryptor[keboola.Token](encryptor) credentialsEncryptor = cloudencrypt.NewGenericEncryptor[keboola.FileUploadCredentials](encryptor) } diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go index 836a44f65d..0cfed49246 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go @@ -170,6 +170,8 @@ func (b *Bridge) importFile(ctx context.Context, file plugin.File, stats statist // Prepare encryption metadata metadata := cloudencrypt.Metadata{"sink": file.SinkKey.String()} + b.logger.Infof(ctx, "decryption token: %s, %s", file.SinkKey.String(), string(existingToken.EncryptedToken)) + // Decrypt token token, err := existingToken.DecryptToken(ctx, b.tokenEncryptor, metadata) if err != nil { diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job.go index 1aa0b073e5..7ca3e963a6 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/job.go @@ -83,6 +83,8 @@ func (b *Bridge) CleanJob(ctx context.Context, job model.Job) (err error, delete // Prepare encryption metadata metadata := cloudencrypt.Metadata{"sink": job.SinkKey.String()} + b.logger.Infof(ctx, "decryption token: %s, %s", job.SinkKey.String(), string(existingToken.EncryptedToken)) + // Decrypt token token, err := existingToken.DecryptToken(ctx, b.tokenEncryptor, metadata) if err != nil { diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/slice.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/slice.go index b47aa5985d..d265d546a2 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/slice.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/slice.go @@ -40,6 +40,8 @@ func (b *Bridge) uploadSlice(ctx context.Context, volume *diskreader.Volume, sli // Prepare encryption metadata metadata := cloudencrypt.Metadata{"sink": slice.SinkKey.String()} + b.logger.Infof(ctx, "decryption token: %s, %s", slice.SinkKey.String(), string(existingToken.EncryptedToken)) + // Decrypt token token, err := existingToken.DecryptToken(ctx, b.tokenEncryptor, metadata) if err != nil { diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/token.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/token.go index b4457ad09e..bc975e8612 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/token.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/token.go @@ -84,6 +84,8 @@ func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definitio return keboola.Token{}, serviceError.NewResourceNotFoundError("sink token", sink.SinkKey.String(), "database") } + b.logger.Infof(ctx, "decryption token: %s, %s", sink.SinkKey.String(), string(existingToken.EncryptedToken)) + // Decrypt token token, err := existingToken.DecryptToken(ctx, b.tokenEncryptor, metadata) if err != nil { @@ -144,6 +146,8 @@ func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definitio return keboola.Token{}, err } newToken.EncryptedToken = ciphertext + + b.logger.Infof(ctx, "encryption token: %s, %s", sink.SinkKey.String(), string(newToken.EncryptedToken)) } else { newToken.Token = result }