From e07ebda3d5b64c2ee63210f94b5b6ec2a19495a4 Mon Sep 17 00:00:00 2001
From: Tyler Rockwood
Date: Thu, 23 Jan 2025 04:00:48 +0000
Subject: [PATCH 1/4] snowpipe: add better logging and error handling for
 stage refreshing

---
 .../impl/snowflake/streaming/streaming.go | 42 +++++++++++++------
 1 file changed, 29 insertions(+), 13 deletions(-)

diff --git a/internal/impl/snowflake/streaming/streaming.go b/internal/impl/snowflake/streaming/streaming.go
index 6ef8f76d88..0f504ce4fd 100644
--- a/internal/impl/snowflake/streaming/streaming.go
+++ b/internal/impl/snowflake/streaming/streaming.go
@@ -53,8 +53,9 @@ type ClientOptions struct {
 }
 
 type stageUploaderResult struct {
-	uploader uploader
-	err      error
+	uploader  uploader
+	fetchTime time.Time
+	err       error
 }
 
 // SnowflakeServiceClient is a port from Java :)
@@ -88,14 +89,18 @@ func NewSnowflakeServiceClient(ctx context.Context, opts ClientOptions) (*Snowfl
 		return nil, err
 	}
 	if resp.StatusCode != responseSuccess {
+		if resp.Message == "" {
+			resp.Message = "(no message)"
+		}
 		return nil, fmt.Errorf("unable to initialize client - status: %d, message: %s", resp.StatusCode, resp.Message)
 	}
-	uploader, err := newUploader(resp.StageLocation)
+	u, err := newUploader(resp.StageLocation)
 	if err != nil {
 		return nil, fmt.Errorf("unable to initialize stage uploader: %w", err)
 	}
 	uploaderAtomic := typed.NewAtomicValue(stageUploaderResult{
-		uploader: uploader,
+		uploader:  u,
+		fetchTime: time.Now(),
 	})
 	ssc := &SnowflakeServiceClient{
 		client:       client,
@@ -107,16 +112,27 @@ func NewSnowflakeServiceClient(ctx context.Context, opts ClientOptions) (*Snowfl
 		// Tokens expire every hour, so refresh a bit before that
 		uploadRefreshLoop: asyncroutine.NewPeriodicWithContext(time.Hour-(5*time.Minute), func(ctx context.Context) {
 			client.logger.Info("refreshing snowflake storage credentials")
-			resp, err := client.configureClient(ctx, clientConfigureRequest{Role: opts.Role})
+			u, err := backoff.RetryWithData(func() (uploader, error) {
+				resp, err := client.configureClient(ctx, clientConfigureRequest{Role: opts.Role})
+				if err == nil && resp.StatusCode != responseSuccess {
+					msg := "(no message)"
+					if resp.Message != "" {
+						msg = resp.Message
+					}
+					err = fmt.Errorf("unable to reconfigure client - status: %d, message: %s", resp.StatusCode, msg)
+				}
+				if err != nil {
+					return nil, err
+				}
+				// TODO: Do the other checks here that the Java SDK does (deploymentID, etc)
+				return newUploader(resp.StageLocation)
+			}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3))
 			if err != nil {
 				client.logger.Warnf("refreshing snowflake storage credentials failure: %v", err)
-				uploaderAtomic.Store(stageUploaderResult{err: err})
-				return
+			} else {
+				client.logger.Debug("refreshing snowflake storage credentials success")
 			}
-			client.logger.Debug("refreshing snowflake storage credentials success")
-			// TODO: Do the other checks here that the Java SDK does (deploymentID, etc)
-			uploader, err := newUploader(resp.StageLocation)
-			uploaderAtomic.Store(stageUploaderResult{uploader: uploader, err: err})
+			uploaderAtomic.Store(stageUploaderResult{uploader: u, fetchTime: time.Now(), err: err})
 		}),
 		requestIDCounter: &atomic.Int64{},
 	}
@@ -439,7 +455,7 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic
 	uploadStartTime := time.Now()
 	uploaderResult := c.uploader.Load()
 	if uploaderResult.err != nil {
-		return insertStats, fmt.Errorf("failed to acquire stage uploader: %w", uploaderResult.err)
+		return insertStats, fmt.Errorf("failed to acquire stage uploader (last fetch time=%v): %w", uploaderResult.fetchTime, uploaderResult.err)
 	}
 	uploader := uploaderResult.uploader
 	fullMD5Hash := md5.Sum(part.parquetFile)
@@ -450,7 +466,7 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic
 		})
 	}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3))
 	if err != nil {
-		return insertStats, fmt.Errorf("unable to upload to storage: %w", err)
+		return insertStats, fmt.Errorf("unable to upload to storage (last fetch time=%v): %w", uploaderResult.fetchTime, err)
 	}
 	uploadFinishTime := time.Now()

From cefce586431303f1d2aae6f467219bc5a5ff147b Mon Sep 17 00:00:00 2001
From: Tyler Rockwood
Date: Thu, 23 Jan 2025 04:12:02 +0000
Subject: [PATCH 2/4] snowpipe: deflake integration test

---
 internal/impl/snowflake/streaming/integration_test.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/internal/impl/snowflake/streaming/integration_test.go b/internal/impl/snowflake/streaming/integration_test.go
index 6ff60ab860..8f0b3835b4 100644
--- a/internal/impl/snowflake/streaming/integration_test.go
+++ b/internal/impl/snowflake/streaming/integration_test.go
@@ -606,7 +606,10 @@ func TestChannelOffsetToken(t *testing.T) {
 	}, &streaming.OffsetTokenRange{Start: "0", End: "2"})
 	require.NoError(t, err)
 	require.Equal(t, ptr(streaming.OffsetToken("2")), channelA.LatestOffsetToken())
+	_, err = channelA.WaitUntilCommitted(ctx)
+	require.NoError(t, err)
 	channelB, err := streamClient.OpenChannel(ctx, channelOpts)
+	require.NoError(t, err)
 	require.Equal(t, ptr(streaming.OffsetToken("2")), channelB.LatestOffsetToken())
 	require.EventuallyWithT(t, func(collect *assert.CollectT) {
 		// Always order by A so we get consistent ordering for our test

From ae3e043b1a769f1d81b1362ecad619b191d7c06b Mon Sep 17 00:00:00 2001
From: Tyler Rockwood
Date: Thu, 23 Jan 2025 17:04:59 +0000
Subject: [PATCH 3/4] snowpipe: refresh on upload failures

This makes refreshes a little more similar to the Java SDK in that we
also refresh when an upload fails. This seems to be needed because the
fixed 1 hour refresh interval does not always hold; some tokens expire
sooner.
---
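Reviewer note: the new retry flow in InsertRows is roughly the sketch
below. This is a simplified, standalone rendition with stand-in names
(uploader, manager, uploadWithRefresh, and the stub types are
illustrative, not the real types); the actual implementation in
streaming.go also threads through blob paths, MD5 checksums, and the
ingest client metadata.

package main

import (
	"context"
	"fmt"
	"time"
)

// uploader stands in for the real stage uploader interface.
type uploader interface {
	upload(ctx context.Context) error
}

// manager stands in for uploaderManager: GetUploader returns the cached
// uploader and RefreshUploader re-runs client configure to swap it out.
type manager interface {
	GetUploader() uploader
	RefreshUploader(ctx context.Context)
}

// uploadWithRefresh tries up to three times. On the first failure it
// refreshes credentials and retries immediately, since tokens can expire
// well before the hourly background refresh; later failures back off.
func uploadWithRefresh(ctx context.Context, m manager) error {
	var err error
	for i := 0; i < 3; i++ {
		if err = m.GetUploader().upload(ctx); err == nil {
			return nil
		}
		if i == 0 {
			m.RefreshUploader(ctx)
			continue
		}
		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("unable to upload to storage: %w", err)
}

// Stub harness so the sketch compiles and runs standalone.
type expiredUploader struct{ expired bool }

func (u *expiredUploader) upload(context.Context) error {
	if u.expired {
		return fmt.Errorf("403: token expired")
	}
	return nil
}

type stubManager struct{ u *expiredUploader }

func (s *stubManager) GetUploader() uploader           { return s.u }
func (s *stubManager) RefreshUploader(context.Context) { s.u.expired = false }

func main() {
	m := &stubManager{u: &expiredUploader{expired: true}}
	fmt.Println(uploadWithRefresh(context.Background(), m)) // prints <nil>
}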
 .../impl/snowflake/streaming/streaming.go     | 104 +++++++-----------
 internal/impl/snowflake/streaming/uploader.go |  91 +++++++++++++++
 2 files changed, 132 insertions(+), 63 deletions(-)

diff --git a/internal/impl/snowflake/streaming/streaming.go b/internal/impl/snowflake/streaming/streaming.go
index 0f504ce4fd..8800cad853 100644
--- a/internal/impl/snowflake/streaming/streaming.go
+++ b/internal/impl/snowflake/streaming/streaming.go
@@ -31,7 +31,6 @@ import (
 	"golang.org/x/sync/errgroup"
 
 	"github.com/redpanda-data/connect/v4/internal/asyncroutine"
-	"github.com/redpanda-data/connect/v4/internal/typed"
 )
 
 const debug = false
@@ -52,12 +51,6 @@ type ClientOptions struct {
 	ConnectVersion string
 }
 
-type stageUploaderResult struct {
-	uploader  uploader
-	fetchTime time.Time
-	err       error
-}
-
 // SnowflakeServiceClient is a port from Java :)
 type SnowflakeServiceClient struct {
 	client *SnowflakeRestClient
@@ -66,8 +59,7 @@ type SnowflakeServiceClient struct {
 	options          ClientOptions
 	requestIDCounter *atomic.Int64
 
-	uploader          *typed.AtomicValue[stageUploaderResult]
-	uploadRefreshLoop *asyncroutine.Periodic
+	uploaderManager *uploaderManager
 	flusher         *asyncroutine.Batcher[blobMetadata, blobRegisterStatus]
 }
 
@@ -94,61 +86,32 @@ func NewSnowflakeServiceClient(ctx context.Context, opts ClientOptions) (*Snowfl
 		}
 		return nil, fmt.Errorf("unable to initialize client - status: %d, message: %s", resp.StatusCode, resp.Message)
 	}
-	u, err := newUploader(resp.StageLocation)
-	if err != nil {
-		return nil, fmt.Errorf("unable to initialize stage uploader: %w", err)
+	um := newUploaderManager(client, opts.Role)
+	if err := um.Start(ctx); err != nil {
+		return nil, err
 	}
-	uploaderAtomic := typed.NewAtomicValue(stageUploaderResult{
-		uploader:  u,
-		fetchTime: time.Now(),
-	})
 	ssc := &SnowflakeServiceClient{
 		client:       client,
 		clientPrefix: fmt.Sprintf("%s_%d", resp.Prefix, resp.DeploymentID),
 		deploymentID: resp.DeploymentID,
 		options:      opts,
-		uploader:     uploaderAtomic,
-		// Tokens expire every hour, so refresh a bit before that
-		uploadRefreshLoop: asyncroutine.NewPeriodicWithContext(time.Hour-(5*time.Minute), func(ctx context.Context) {
-			client.logger.Info("refreshing snowflake storage credentials")
-			u, err := backoff.RetryWithData(func() (uploader, error) {
-				resp, err := client.configureClient(ctx, clientConfigureRequest{Role: opts.Role})
-				if err == nil && resp.StatusCode != responseSuccess {
-					msg := "(no message)"
-					if resp.Message != "" {
-						msg = resp.Message
-					}
-					err = fmt.Errorf("unable to reconfigure client - status: %d, message: %s", resp.StatusCode, msg)
-				}
-				if err != nil {
-					return nil, err
-				}
-				// TODO: Do the other checks here that the Java SDK does (deploymentID, etc)
-				return newUploader(resp.StageLocation)
-			}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3))
-			if err != nil {
-				client.logger.Warnf("refreshing snowflake storage credentials failure: %v", err)
-			} else {
-				client.logger.Debug("refreshing snowflake storage credentials success")
-			}
-			uploaderAtomic.Store(stageUploaderResult{uploader: u, fetchTime: time.Now(), err: err})
-		}),
+		uploaderManager:  um,
 		requestIDCounter: &atomic.Int64{},
 	}
 	// Flush up to 100 blobs at once, that seems like a fairly high upper bound
 	ssc.flusher, err = asyncroutine.NewBatcher(100, ssc.registerBlobs)
 	if err != nil {
+		um.Stop() // Don't leak the goroutine on failure
 		return nil, err
 	}
-	ssc.uploadRefreshLoop.Start()
 	return ssc, nil
 }
 
 // Close closes the client and future requests have undefined behavior.
 func (c *SnowflakeServiceClient) Close() {
 	c.options.Logger.Debug("closing snowflake streaming output")
-	c.uploadRefreshLoop.Stop()
+	c.uploaderManager.Stop()
 	c.client.Close()
 	c.flusher.Close()
 }
@@ -233,13 +196,13 @@ func (c *SnowflakeServiceClient) OpenChannel(ctx context.Context, opts ChannelOp
 		return nil, err
 	}
 	ch := &SnowflakeIngestionChannel{
-		ChannelOptions: opts,
-		clientPrefix:   c.clientPrefix,
-		schema:         schema,
-		parquetWriter:  newParquetWriter(c.options.ConnectVersion, schema),
-		client:         c.client,
-		role:           c.options.Role,
-		uploader:       c.uploader,
+		ChannelOptions:  opts,
+		clientPrefix:    c.clientPrefix,
+		schema:          schema,
+		parquetWriter:   newParquetWriter(c.options.ConnectVersion, schema),
+		client:          c.client,
+		role:            c.options.Role,
+		uploaderManager: c.uploaderManager,
 		encryptionInfo: &encryptionInfo{
 			encryptionKeyID: resp.EncryptionKeyID,
 			encryptionKey:   resp.EncryptionKey,
@@ -315,7 +278,7 @@ type SnowflakeIngestionChannel struct {
 	parquetWriter *parquetWriter
 	schema        *parquet.Schema
 	client        *SnowflakeRestClient
-	uploader        *typed.AtomicValue[stageUploaderResult]
+	uploaderManager *uploaderManager
 	flusher         *asyncroutine.Batcher[blobMetadata, blobRegisterStatus]
 	encryptionInfo  *encryptionInfo
 	clientSequencer int64
@@ -451,24 +414,39 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic
 	if err != nil {
 		return insertStats, fmt.Errorf("unable to encrypt output: %w", err)
 	}
+	fullMD5Hash := md5.Sum(part.parquetFile)
 	uploadStartTime := time.Now()
-	uploaderResult := c.uploader.Load()
-	if uploaderResult.err != nil {
-		return insertStats, fmt.Errorf("failed to acquire stage uploader (last fetch time=%v): %w", uploaderResult.fetchTime, uploaderResult.err)
-	}
-	uploader := uploaderResult.uploader
-	fullMD5Hash := md5.Sum(part.parquetFile)
-	err = backoff.Retry(func() error {
-		return uploader.upload(ctx, blobPath, part.parquetFile, fullMD5Hash[:], map[string]string{
+	for i := range 3 {
+		ur := c.uploaderManager.GetUploader()
+		if ur.err != nil {
+			return insertStats, fmt.Errorf("failed to acquire stage uploader (last fetch time=%v): %w", ur.timestamp, ur.err)
+		}
+		err = ur.uploader.upload(ctx, blobPath, part.parquetFile, fullMD5Hash[:], map[string]string{
 			"ingestclientname": partnerID,
 			"ingestclientkey":  c.clientPrefix,
 		})
-	}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3))
+		if err == nil {
+			break
+		}
+		err = fmt.Errorf("unable to upload to storage (last cred refresh time=%v): %w", ur.timestamp, err)
+		// Similar to the Java SDK, on the first failure we retry immediately after attempting to
+		// refresh our uploader. It seems there are some cases where the 1 hour refresh interval is
+		// too slow and tokens are only valid for ~30min. This is a poor man's workaround for
+		// dynamic token refreshing.
+		if i == 0 {
+			c.uploaderManager.RefreshUploader(ctx)
+			continue
+		}
+		select {
+		case <-time.After(time.Second):
+		case <-ctx.Done():
+			return insertStats, ctx.Err()
+		}
+	}
 	if err != nil {
-		return insertStats, fmt.Errorf("unable to upload to storage (last fetch time=%v): %w", uploaderResult.fetchTime, err)
+		return insertStats, err
 	}
-	uploadFinishTime := time.Now()
 
+	uploadFinishTime := time.Now()
 	resp, err := c.flusher.Submit(ctx, blobMetadata{
diff --git a/internal/impl/snowflake/streaming/uploader.go b/internal/impl/snowflake/streaming/uploader.go
index 00173845c0..d82d81ca34 100644
--- a/internal/impl/snowflake/streaming/uploader.go
+++ b/internal/impl/snowflake/streaming/uploader.go
@@ -20,6 +20,8 @@ import (
 	"net/url"
 	"path/filepath"
 	"strings"
+	"sync"
+	"time"
 
 	gcs "cloud.google.com/go/storage"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
@@ -28,8 +30,11 @@ import (
 	"github.com/aws/aws-sdk-go-v2/credentials"
 	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/cenkalti/backoff/v4"
 	"golang.org/x/oauth2"
 	gcsopt "google.golang.org/api/option"
+
+	"github.com/redpanda-data/connect/v4/internal/asyncroutine"
 )
 
 type uploader interface {
@@ -187,3 +192,89 @@ func splitBucketAndPath(stageLocation string) (string, string, error) {
 	}
 	return bucketAndPath[0], bucketAndPath[1], nil
 }
+
+type (
+	uploaderLoadResult struct {
+		uploader uploader
+		// Time of when the uploader was created
+		timestamp time.Time
+		// If there was an error creating the uploader
+		err error
+	}
+
+	uploaderManager struct {
+		state    *uploaderLoadResult
+		client   *SnowflakeRestClient
+		role     string
+		stateMu  sync.RWMutex
+		uploadMu sync.Mutex
+		periodic asyncroutine.Periodic
+	}
+)
+
+func newUploaderManager(client *SnowflakeRestClient, role string) *uploaderManager {
+	m := &uploaderManager{state: nil, client: client, role: role}
+	// According to the Java SDK tokens are refreshed every hour on GCP
+	// and 2 hours on AWS. It seems in practice some customers only have
+	// tokens that live for 30 minutes, so we need to support earlier
+	// refreshes (those are opt-in, however).
+	const refreshTime = time.Hour - time.Minute*5
+	m.periodic = *asyncroutine.NewPeriodicWithContext(refreshTime, m.RefreshUploader)
+	return m
+}
+
+func (m *uploaderManager) Start(ctx context.Context) error {
+	m.RefreshUploader(ctx)
+	s := m.GetUploader()
+	if s.err != nil {
+		return s.err
+	}
+	m.periodic.Start()
+	return nil
+}
+
+func (m *uploaderManager) GetUploader() *uploaderLoadResult {
+	m.stateMu.RLock()
+	defer m.stateMu.RUnlock()
+	return m.state
+}
+
+func (m *uploaderManager) RefreshUploader(ctx context.Context) {
+	m.uploadMu.Lock()
+	defer m.uploadMu.Unlock()
+	r := m.GetUploader()
+	// Don't refresh sooner than every minute.
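+	// Note: concurrent callers serialize on uploadMu above, so when several
+	// uploads fail at once only the first caller performs the refresh; the
+	// rest observe the fresh timestamp below and return early.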
+	if r != nil && time.Now().Before(r.timestamp.Add(time.Minute)) {
+		return
+	}
+	u, err := backoff.RetryWithData(func() (uploader, error) {
+		resp, err := m.client.configureClient(ctx, clientConfigureRequest{Role: m.role})
+		if err == nil && resp.StatusCode != responseSuccess {
+			msg := "(no message)"
+			if resp.Message != "" {
+				msg = resp.Message
+			}
+			err = fmt.Errorf("unable to reconfigure client - status: %d, message: %s", resp.StatusCode, msg)
+		}
+		if err != nil {
+			return nil, err
+		}
+		// TODO: Do the other checks here that the Java SDK does (deploymentID, etc)
+		return newUploader(resp.StageLocation)
+	}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3))
+	if r != nil {
+		// Only log when this is running as a background task (so it's a refresh, not initial setup).
+		if err != nil {
+			m.client.logger.Warnf("refreshing snowflake storage credentials failure: %v", err)
+		} else {
+			m.client.logger.Debug("refreshing snowflake storage credentials success")
+		}
+	}
+	m.stateMu.Lock()
+	defer m.stateMu.Unlock()
+	m.state = &uploaderLoadResult{uploader: u, timestamp: time.Now(), err: err}
+}
+
+func (m *uploaderManager) Stop() {
+	m.periodic.Stop()
+}

From 9460ddf23e15531472dbbcbbf7e3000431173ede Mon Sep 17 00:00:00 2001
From: Tyler Rockwood
Date: Thu, 23 Jan 2025 18:33:07 +0000
Subject: [PATCH 4/4] update changelog

---
 CHANGELOG.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 74deeaf4b8..9ece0d70df 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,10 @@ All notable changes to this project will be documented in this file.
 - Field `instance_id` added to `kafka`, `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, and `redpanda_migrator` inputs. (@rockwotj)
 - Fields `rebalance_timeout`, `session_timeout` and `heartbeat_interval` added to the `kafka_franz`, `redpanda`, `redpanda_common`, `redpanda_migrator` and `ockam_kafka` inputs. (@rockwotj)
 
+### Changed
+
+- The `snowpipe_streaming` output now has better error handling for authentication failures when uploading to cloud storage. (@rockwotj)
+
 ## 4.45.1 - 2025-01-17
 
 ### Fixed