diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f56b91086..f23e4ee3d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,10 @@ All notable changes to this project will be documented in this file. - Field `avro_raw_json` was deprecated in favor of `avro.raw_unions` for processor `schema_registry_decode`. (@rockwotj) +### Changed + +- The `snowpipe_streaming` output now has better error handling for authentication failures when uploading to cloud storage. (@rockwotj) + ## 4.45.1 - 2025-01-17 ### Fixed diff --git a/internal/impl/snowflake/streaming/integration_test.go b/internal/impl/snowflake/streaming/integration_test.go index 6ff60ab860..8f0b3835b4 100644 --- a/internal/impl/snowflake/streaming/integration_test.go +++ b/internal/impl/snowflake/streaming/integration_test.go @@ -606,7 +606,10 @@ func TestChannelOffsetToken(t *testing.T) { }, &streaming.OffsetTokenRange{Start: "0", End: "2"}) require.NoError(t, err) require.Equal(t, ptr(streaming.OffsetToken("2")), channelA.LatestOffsetToken()) + _, err = channelA.WaitUntilCommitted(ctx) + require.NoError(t, err) channelB, err := streamClient.OpenChannel(ctx, channelOpts) + require.NoError(t, err) require.Equal(t, ptr(streaming.OffsetToken("2")), channelB.LatestOffsetToken()) require.EventuallyWithT(t, func(collect *assert.CollectT) { // Always order by A so we get consistent ordering for our test diff --git a/internal/impl/snowflake/streaming/streaming.go b/internal/impl/snowflake/streaming/streaming.go index 6ef8f76d88..8800cad853 100644 --- a/internal/impl/snowflake/streaming/streaming.go +++ b/internal/impl/snowflake/streaming/streaming.go @@ -31,7 +31,6 @@ import ( "golang.org/x/sync/errgroup" "github.com/redpanda-data/connect/v4/internal/asyncroutine" - "github.com/redpanda-data/connect/v4/internal/typed" ) const debug = false @@ -52,11 +51,6 @@ type ClientOptions struct { ConnectVersion string } -type stageUploaderResult struct { - uploader uploader - err error -} - // SnowflakeServiceClient is a port from Java :) type SnowflakeServiceClient struct { client *SnowflakeRestClient @@ -65,8 +59,7 @@ type SnowflakeServiceClient struct { options ClientOptions requestIDCounter *atomic.Int64 - uploader *typed.AtomicValue[stageUploaderResult] - uploadRefreshLoop *asyncroutine.Periodic + uploaderManager *uploaderManager flusher *asyncroutine.Batcher[blobMetadata, blobRegisterStatus] } @@ -88,51 +81,37 @@ func NewSnowflakeServiceClient(ctx context.Context, opts ClientOptions) (*Snowfl return nil, err } if resp.StatusCode != responseSuccess { + if resp.Message == "" { + resp.Message = "(no message)" + } return nil, fmt.Errorf("unable to initialize client - status: %d, message: %s", resp.StatusCode, resp.Message) } - uploader, err := newUploader(resp.StageLocation) - if err != nil { - return nil, fmt.Errorf("unable to initialize stage uploader: %w", err) + um := newUploaderManager(client, opts.Role) + if err := um.Start(ctx); err != nil { + return nil, err } - uploaderAtomic := typed.NewAtomicValue(stageUploaderResult{ - uploader: uploader, - }) ssc := &SnowflakeServiceClient{ client: client, clientPrefix: fmt.Sprintf("%s_%d", resp.Prefix, resp.DeploymentID), deploymentID: resp.DeploymentID, options: opts, - uploader: uploaderAtomic, - // Tokens expire every hour, so refresh a bit before that - uploadRefreshLoop: asyncroutine.NewPeriodicWithContext(time.Hour-(5*time.Minute), func(ctx context.Context) { - client.logger.Info("refreshing snowflake storage credentials") - resp, err := client.configureClient(ctx, clientConfigureRequest{Role: 
opts.Role})
-			if err != nil {
-				client.logger.Warnf("refreshing snowflake storage credentials failure: %v", err)
-				uploaderAtomic.Store(stageUploaderResult{err: err})
-				return
-			}
-			client.logger.Debug("refreshing snowflake storage credentials success")
-			// TODO: Do the other checks here that the Java SDK does (deploymentID, etc)
-			uploader, err := newUploader(resp.StageLocation)
-			uploaderAtomic.Store(stageUploaderResult{uploader: uploader, err: err})
-		}),
+		uploaderManager:  um,
 		requestIDCounter: &atomic.Int64{},
 	}
 	// Flush up to 100 blobs at once, that seems like a fairly high upper bound
 	ssc.flusher, err = asyncroutine.NewBatcher(100, ssc.registerBlobs)
 	if err != nil {
+		um.Stop() // Don't leak the goroutine on failure
 		return nil, err
 	}
-	ssc.uploadRefreshLoop.Start()
 	return ssc, nil
 }
 
 // Close closes the client and future requests have undefined behavior.
 func (c *SnowflakeServiceClient) Close() {
 	c.options.Logger.Debug("closing snowflake streaming output")
-	c.uploadRefreshLoop.Stop()
+	c.uploaderManager.Stop()
 	c.client.Close()
 	c.flusher.Close()
 }
@@ -217,13 +196,13 @@ func (c *SnowflakeServiceClient) OpenChannel(ctx context.Context, opts ChannelOp
 		return nil, err
 	}
 	ch := &SnowflakeIngestionChannel{
-		ChannelOptions: opts,
-		clientPrefix:   c.clientPrefix,
-		schema:         schema,
-		parquetWriter:  newParquetWriter(c.options.ConnectVersion, schema),
-		client:         c.client,
-		role:           c.options.Role,
-		uploader:       c.uploader,
+		ChannelOptions:  opts,
+		clientPrefix:    c.clientPrefix,
+		schema:          schema,
+		parquetWriter:   newParquetWriter(c.options.ConnectVersion, schema),
+		client:          c.client,
+		role:            c.options.Role,
+		uploaderManager: c.uploaderManager,
 		encryptionInfo: &encryptionInfo{
 			encryptionKeyID: resp.EncryptionKeyID,
 			encryptionKey:   resp.EncryptionKey,
@@ -299,7 +278,7 @@ type SnowflakeIngestionChannel struct {
 	parquetWriter   *parquetWriter
 	schema          *parquet.Schema
 	client          *SnowflakeRestClient
-	uploader        *typed.AtomicValue[stageUploaderResult]
+	uploaderManager *uploaderManager
 	flusher         *asyncroutine.Batcher[blobMetadata, blobRegisterStatus]
 	encryptionInfo  *encryptionInfo
 	clientSequencer int64
@@ -435,24 +414,39 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic
 	if err != nil {
 		return insertStats, fmt.Errorf("unable to encrypt output: %w", err)
 	}
+	fullMD5Hash := md5.Sum(part.parquetFile)
 	uploadStartTime := time.Now()
-	uploaderResult := c.uploader.Load()
-	if uploaderResult.err != nil {
-		return insertStats, fmt.Errorf("failed to acquire stage uploader: %w", uploaderResult.err)
-	}
-	uploader := uploaderResult.uploader
-	fullMD5Hash := md5.Sum(part.parquetFile)
-	err = backoff.Retry(func() error {
-		return uploader.upload(ctx, blobPath, part.parquetFile, fullMD5Hash[:], map[string]string{
+	for i := range 3 {
+		ur := c.uploaderManager.GetUploader()
+		if ur.err != nil {
+			return insertStats, fmt.Errorf("failed to acquire stage uploader (last fetch time=%v): %w", ur.timestamp, ur.err)
+		}
+		err = ur.uploader.upload(ctx, blobPath, part.parquetFile, fullMD5Hash[:], map[string]string{
 			"ingestclientname": partnerID,
 			"ingestclientkey":  c.clientPrefix,
 		})
-	}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3))
+		if err == nil {
+			break
+		}
+		err = fmt.Errorf("unable to upload to storage (last cred refresh time=%v): %w", ur.timestamp, err)
+		// Similar to the Java SDK, on the first failure we refresh our uploader
+		// and then retry immediately. It seems there are some cases where the
+		// 1-hour refresh interval is too slow and tokens are only valid for
+		// ~30 minutes. This is a poor man's workaround for dynamic token
+		// refreshing.
+		if i == 0 {
+			c.uploaderManager.RefreshUploader(ctx)
+			continue
+		}
+		select {
+		case <-time.After(time.Second):
+		case <-ctx.Done():
+			return insertStats, ctx.Err()
+		}
+	}
 	if err != nil {
-		return insertStats, fmt.Errorf("unable to upload to storage: %w", err)
+		return insertStats, err
 	}
-	uploadFinishTime := time.Now()
 	resp, err := c.flusher.Submit(ctx, blobMetadata{
diff --git a/internal/impl/snowflake/streaming/uploader.go b/internal/impl/snowflake/streaming/uploader.go
index 00173845c0..d82d81ca34 100644
--- a/internal/impl/snowflake/streaming/uploader.go
+++ b/internal/impl/snowflake/streaming/uploader.go
@@ -20,6 +20,8 @@ import (
 	"net/url"
 	"path/filepath"
 	"strings"
+	"sync"
+	"time"
 
 	gcs "cloud.google.com/go/storage"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
@@ -28,8 +30,11 @@ import (
 	"github.com/aws/aws-sdk-go-v2/credentials"
 	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/cenkalti/backoff/v4"
 	"golang.org/x/oauth2"
 	gcsopt "google.golang.org/api/option"
+
+	"github.com/redpanda-data/connect/v4/internal/asyncroutine"
 )
 
 type uploader interface {
@@ -187,3 +192,89 @@ func splitBucketAndPath(stageLocation string) (string, string, error) {
 	}
 	return bucketAndPath[0], bucketAndPath[1], nil
 }
+
+type (
+	uploaderLoadResult struct {
+		uploader uploader
+		// Time when the uploader was created
+		timestamp time.Time
+		// If there was an error creating the uploader
+		err error
+	}
+
+	uploaderManager struct {
+		state    *uploaderLoadResult
+		client   *SnowflakeRestClient
+		role     string
+		stateMu  sync.RWMutex
+		uploadMu sync.Mutex
+		periodic asyncroutine.Periodic
+	}
+)
+
+func newUploaderManager(client *SnowflakeRestClient, role string) *uploaderManager {
+	m := &uploaderManager{state: nil, client: client, role: role}
+	// According to the Java SDK, tokens are refreshed every hour on GCP
+	// and every 2 hours on AWS. It seems in practice some customers only
+	// have tokens that live for 30 minutes, so we need to support earlier
+	// refreshes (those are opt-in, however).
+	const refreshTime = time.Hour - time.Minute*5
+	m.periodic = *asyncroutine.NewPeriodicWithContext(refreshTime, m.RefreshUploader)
+	return m
+}
+
+func (m *uploaderManager) Start(ctx context.Context) error {
+	m.RefreshUploader(ctx)
+	s := m.GetUploader()
+	if s.err != nil {
+		return s.err
+	}
+	m.periodic.Start()
+	return nil
+}
+
+func (m *uploaderManager) GetUploader() *uploaderLoadResult {
+	m.stateMu.RLock()
+	defer m.stateMu.RUnlock()
+	return m.state
+}
+
+func (m *uploaderManager) RefreshUploader(ctx context.Context) {
+	m.uploadMu.Lock()
+	defer m.uploadMu.Unlock()
+	r := m.GetUploader()
+	// Don't refresh more often than once a minute.
+	if r != nil && time.Now().Before(r.timestamp.Add(time.Minute)) {
+		return
+	}
+	u, err := backoff.RetryWithData(func() (uploader, error) {
+		resp, err := m.client.configureClient(ctx, clientConfigureRequest{Role: m.role})
+		if err == nil && resp.StatusCode != responseSuccess {
+			msg := "(no message)"
+			if resp.Message != "" {
+				msg = resp.Message
+			}
+			err = fmt.Errorf("unable to reconfigure client - status: %d, message: %s", resp.StatusCode, msg)
+		}
+		if err != nil {
+			return nil, err
+		}
+		// TODO: Do the other checks here that the Java SDK does (deploymentID, etc)
+		return newUploader(resp.StageLocation)
+	}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3))
+	if r != nil {
+		// Only log when this is running as a background task (so it's a refresh, not the initial setup).
+		if err != nil {
+			m.client.logger.Warnf("refreshing snowflake storage credentials failure: %v", err)
+		} else {
+			m.client.logger.Debug("refreshing snowflake storage credentials success")
+		}
+	}
+	m.stateMu.Lock()
+	defer m.stateMu.Unlock()
+	m.state = &uploaderLoadResult{uploader: u, timestamp: time.Now(), err: err}
+}
+
+func (m *uploaderManager) Stop() {
+	m.periodic.Stop()
+}
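
To see the shape of the new retry/refresh flow outside the diff, here is a minimal, self-contained Go sketch of the pattern that `InsertRows` and `uploaderManager` implement: a cached uploader guarded by an RWMutex, refreshed on a timer (omitted here) and eagerly after the first upload failure. The names (`manager`, `cachedUploader`, `fetchCredentials`, `doUpload`) are hypothetical stand-ins, not the actual package API.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cachedUploader mirrors uploaderLoadResult: a value, the time it was
// fetched, and any error from fetching it.
type cachedUploader struct {
	creds     string
	timestamp time.Time
	err       error
}

// manager is a cut-down uploaderManager: one RWMutex guards the cached
// state, and refresh is rate-limited to once per minute.
type manager struct {
	mu    sync.RWMutex
	state *cachedUploader
}

func (m *manager) get() *cachedUploader {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.state
}

func (m *manager) refresh(ctx context.Context) {
	prev := m.get()
	if prev != nil && time.Now().Before(prev.timestamp.Add(time.Minute)) {
		return // refreshed too recently, keep the cached credentials
	}
	creds, err := fetchCredentials(ctx) // stand-in for configureClient + newUploader
	m.mu.Lock()
	defer m.mu.Unlock()
	m.state = &cachedUploader{creds: creds, timestamp: time.Now(), err: err}
}

// fetchCredentials simulates acquiring fresh stage credentials.
func fetchCredentials(ctx context.Context) (string, error) {
	return fmt.Sprintf("token-%d", time.Now().UnixNano()), nil
}

var attempts int

// doUpload simulates a stage upload whose first attempt hits an expired token.
func doUpload(ctx context.Context, creds string, blob []byte) error {
	attempts++
	if attempts == 1 {
		return fmt.Errorf("403: token expired (creds=%s)", creds)
	}
	return nil
}

// upload mirrors the InsertRows loop: up to 3 attempts, an eager refresh
// after the first failure (which the rate limit may turn into a no-op),
// and a 1s pause before later attempts.
func upload(ctx context.Context, m *manager, blob []byte) error {
	var err error
	for i := range 3 {
		s := m.get()
		if s.err != nil {
			return fmt.Errorf("failed to acquire uploader (last fetch time=%v): %w", s.timestamp, s.err)
		}
		if err = doUpload(ctx, s.creds, blob); err == nil {
			return nil
		}
		if i == 0 {
			m.refresh(ctx) // tokens can expire early; refresh eagerly once
			continue
		}
		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}

func main() {
	ctx := context.Background()
	m := &manager{}
	m.refresh(ctx)                                       // initial fetch, like uploaderManager.Start
	fmt.Println(upload(ctx, m, []byte("parquet bytes"))) // <nil> after one retry
}
```

One deliberate simplification: the real `uploaderManager` serializes concurrent `RefreshUploader` calls with a dedicated `uploadMu` so readers never block behind a credential fetch, while the sketch collapses that into the single RWMutex plus the one-minute rate limit.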