Merge pull request #3139 from redpanda-data/snow_log
snowpipe: improve token refresh behavior
rockwotj authored Jan 23, 2025
2 parents c36dcb6 + 9460ddf commit b20ec06
Showing 4 changed files with 142 additions and 50 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -16,6 +16,10 @@ All notable changes to this project will be documented in this file.

- Field `avro_raw_json` was deprecated in favor of `avro.raw_unions` for processor `schema_registry_decode`. (@rockwotj)

### Changed

- The `snowpipe_streaming` output now has better error handling for authentication failures when uploading to cloud storage. (@rockwotj)

## 4.45.1 - 2025-01-17

### Fixed
3 changes: 3 additions & 0 deletions internal/impl/snowflake/streaming/integration_test.go
@@ -606,7 +606,10 @@ func TestChannelOffsetToken(t *testing.T) {
}, &streaming.OffsetTokenRange{Start: "0", End: "2"})
require.NoError(t, err)
require.Equal(t, ptr(streaming.OffsetToken("2")), channelA.LatestOffsetToken())
_, err = channelA.WaitUntilCommitted(ctx)
require.NoError(t, err)
channelB, err := streamClient.OpenChannel(ctx, channelOpts)
require.NoError(t, err)
require.Equal(t, ptr(streaming.OffsetToken("2")), channelB.LatestOffsetToken())
require.EventuallyWithT(t, func(collect *assert.CollectT) {
// Always order by A so we get consistent ordering for our test
94 changes: 44 additions & 50 deletions internal/impl/snowflake/streaming/streaming.go
@@ -31,7 +31,6 @@ import (
"golang.org/x/sync/errgroup"

"github.com/redpanda-data/connect/v4/internal/asyncroutine"
"github.com/redpanda-data/connect/v4/internal/typed"
)

const debug = false
@@ -52,11 +51,6 @@ type ClientOptions struct {
ConnectVersion string
}

type stageUploaderResult struct {
uploader uploader
err error
}

// SnowflakeServiceClient is a port from Java :)
type SnowflakeServiceClient struct {
client *SnowflakeRestClient
@@ -65,8 +59,7 @@ type SnowflakeServiceClient struct {
options ClientOptions
requestIDCounter *atomic.Int64

uploader *typed.AtomicValue[stageUploaderResult]
uploadRefreshLoop *asyncroutine.Periodic
uploaderManager *uploaderManager

flusher *asyncroutine.Batcher[blobMetadata, blobRegisterStatus]
}
@@ -88,51 +81,37 @@ func NewSnowflakeServiceClient(ctx context.Context, opts ClientOptions) (*Snowfl
return nil, err
}
if resp.StatusCode != responseSuccess {
if resp.Message == "" {
resp.Message = "(no message)"
}
return nil, fmt.Errorf("unable to initialize client - status: %d, message: %s", resp.StatusCode, resp.Message)
}
uploader, err := newUploader(resp.StageLocation)
if err != nil {
return nil, fmt.Errorf("unable to initialize stage uploader: %w", err)
um := newUploaderManager(client, opts.Role)
if err := um.Start(ctx); err != nil {
return nil, err
}
uploaderAtomic := typed.NewAtomicValue(stageUploaderResult{
uploader: uploader,
})
ssc := &SnowflakeServiceClient{
client: client,
clientPrefix: fmt.Sprintf("%s_%d", resp.Prefix, resp.DeploymentID),
deploymentID: resp.DeploymentID,
options: opts,

uploader: uploaderAtomic,
// Tokens expire every hour, so refresh a bit before that
uploadRefreshLoop: asyncroutine.NewPeriodicWithContext(time.Hour-(5*time.Minute), func(ctx context.Context) {
client.logger.Info("refreshing snowflake storage credentials")
resp, err := client.configureClient(ctx, clientConfigureRequest{Role: opts.Role})
if err != nil {
client.logger.Warnf("refreshing snowflake storage credentials failure: %v", err)
uploaderAtomic.Store(stageUploaderResult{err: err})
return
}
client.logger.Debug("refreshing snowflake storage credentials success")
// TODO: Do the other checks here that the Java SDK does (deploymentID, etc)
uploader, err := newUploader(resp.StageLocation)
uploaderAtomic.Store(stageUploaderResult{uploader: uploader, err: err})
}),
uploaderManager: um,
requestIDCounter: &atomic.Int64{},
}
// Flush up to 100 blobs at once; that seems like a fairly high upper bound
ssc.flusher, err = asyncroutine.NewBatcher(100, ssc.registerBlobs)
if err != nil {
um.Stop() // Don't leak the goroutine on failure
return nil, err
}
ssc.uploadRefreshLoop.Start()
return ssc, nil
}

// Close closes the client; future requests have undefined behavior.
func (c *SnowflakeServiceClient) Close() {
c.options.Logger.Debug("closing snowflake streaming output")
c.uploadRefreshLoop.Stop()
c.uploaderManager.Stop()
c.client.Close()
c.flusher.Close()
}
@@ -217,13 +196,13 @@ func (c *SnowflakeServiceClient) OpenChannel(ctx context.Context, opts ChannelOp
return nil, err
}
ch := &SnowflakeIngestionChannel{
ChannelOptions: opts,
clientPrefix: c.clientPrefix,
schema: schema,
parquetWriter: newParquetWriter(c.options.ConnectVersion, schema),
client: c.client,
role: c.options.Role,
uploader: c.uploader,
ChannelOptions: opts,
clientPrefix: c.clientPrefix,
schema: schema,
parquetWriter: newParquetWriter(c.options.ConnectVersion, schema),
client: c.client,
role: c.options.Role,
uploaderManager: c.uploaderManager,
encryptionInfo: &encryptionInfo{
encryptionKeyID: resp.EncryptionKeyID,
encryptionKey: resp.EncryptionKey,
@@ -299,7 +278,7 @@ type SnowflakeIngestionChannel struct {
parquetWriter *parquetWriter
schema *parquet.Schema
client *SnowflakeRestClient
uploader *typed.AtomicValue[stageUploaderResult]
uploaderManager *uploaderManager
flusher *asyncroutine.Batcher[blobMetadata, blobRegisterStatus]
encryptionInfo *encryptionInfo
clientSequencer int64
@@ -435,24 +414,39 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic
if err != nil {
return insertStats, fmt.Errorf("unable to encrypt output: %w", err)
}
fullMD5Hash := md5.Sum(part.parquetFile)

uploadStartTime := time.Now()
uploaderResult := c.uploader.Load()
if uploaderResult.err != nil {
return insertStats, fmt.Errorf("failed to acquire stage uploader: %w", uploaderResult.err)
}
uploader := uploaderResult.uploader
fullMD5Hash := md5.Sum(part.parquetFile)
err = backoff.Retry(func() error {
return uploader.upload(ctx, blobPath, part.parquetFile, fullMD5Hash[:], map[string]string{
for i := range 3 {
ur := c.uploaderManager.GetUploader()
if ur.err != nil {
return insertStats, fmt.Errorf("failed to acquire stage uploader (last fetch time=%v): %w", ur.timestamp, ur.err)
}
err = ur.uploader.upload(ctx, blobPath, part.parquetFile, fullMD5Hash[:], map[string]string{
"ingestclientname": partnerID,
"ingestclientkey": c.clientPrefix,
})
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3))
if err == nil {
break
}
err = fmt.Errorf("unable to upload to storage (last cred refresh time=%v): %w", ur.timestamp, err)
// Similar to the Java SDK, on the first failure we refresh our uploader and retry
// immediately. There are cases where the 1 hour refresh interval is too slow
// because tokens are only valid for ~30 minutes. This is a poor man's workaround
// for dynamic token refreshing.
if i == 0 {
c.uploaderManager.RefreshUploader(ctx)
continue
}
select {
case <-time.After(time.Second):
case <-ctx.Done():
return insertStats, ctx.Err()
}
}
if err != nil {
return insertStats, fmt.Errorf("unable to upload to storage: %w", err)
return insertStats, err
}

uploadFinishTime := time.Now()

resp, err := c.flusher.Submit(ctx, blobMetadata{
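The retry loop above replaces the old fixed `backoff.Retry` call: the first failed upload forces an immediate credential refresh and retry, and only later attempts wait out a one-second pause. Below is a minimal, self-contained sketch of that shape; `putObject` and `fetchCreds` are hypothetical stand-ins for the uploader and manager calls, not names from this repository.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errExpired = errors.New("credentials expired")

// putObject is a hypothetical stand-in for uploader.upload: pretend the
// storage provider rejects stale credentials.
func putObject(ctx context.Context, creds string, blob []byte) error {
	if creds == "stale" {
		return errExpired
	}
	return nil
}

// fetchCreds is a hypothetical stand-in for RefreshUploader + GetUploader.
func fetchCreds(ctx context.Context) string { return "fresh" }

// uploadWithRefresh mirrors the loop in InsertRows: three attempts, an
// immediate refresh-and-retry on the first failure, and a one-second pause
// before later attempts that still respects context cancellation.
func uploadWithRefresh(ctx context.Context, creds string, blob []byte) error {
	var err error
	for i := range 3 {
		if err = putObject(ctx, creds, blob); err == nil {
			return nil
		}
		if i == 0 {
			// First failure: assume the token expired early and refresh
			// immediately instead of backing off.
			creds = fetchCreds(ctx)
			continue
		}
		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("unable to upload to storage: %w", err)
}

func main() {
	err := uploadWithRefresh(context.Background(), "stale", []byte("parquet"))
	fmt.Println(err) // <nil>: succeeds on the second attempt after a refresh
}
```

The point of the `i == 0` branch is latency: when a token dies early (~30 minutes instead of the expected hour), one refresh fixes the common case, so there is no reason to sleep first.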
91 changes: 91 additions & 0 deletions internal/impl/snowflake/streaming/uploader.go
@@ -20,6 +20,8 @@ import (
"net/url"
"path/filepath"
"strings"
"sync"
"time"

gcs "cloud.google.com/go/storage"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
@@ -28,8 +30,11 @@ import (
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/cenkalti/backoff/v4"
"golang.org/x/oauth2"
gcsopt "google.golang.org/api/option"

"github.com/redpanda-data/connect/v4/internal/asyncroutine"
)

type uploader interface {
@@ -187,3 +192,89 @@ func splitBucketAndPath(stageLocation string) (string, string, error) {
}
return bucketAndPath[0], bucketAndPath[1], nil
}

type (
uploaderLoadResult struct {
uploader uploader
// Time of when the uploader was created
timestamp time.Time
// If there was an error creating the uploader
err error
}

uploaderManager struct {
state *uploaderLoadResult
client *SnowflakeRestClient
role string
stateMu sync.RWMutex
uploadMu sync.Mutex
periodic asyncroutine.Periodic
}
)

func newUploaderManager(client *SnowflakeRestClient, role string) *uploaderManager {
m := &uploaderManager{state: nil, client: client, role: role}
// According to the Java SDK, tokens are refreshed every hour on GCP
// and every 2 hours on AWS. In practice some customers have tokens
// that live for only 30 minutes, so we need to support earlier
// refreshes (those are opt-in, however).
const refreshTime = time.Hour - time.Minute*5
m.periodic = *asyncroutine.NewPeriodicWithContext(refreshTime, m.RefreshUploader)
return m
}

func (m *uploaderManager) Start(ctx context.Context) error {
m.RefreshUploader(ctx)
s := m.GetUploader()
if s.err != nil {
return s.err
}
m.periodic.Start()
return nil
}

func (m *uploaderManager) GetUploader() *uploaderLoadResult {
m.stateMu.RLock()
defer m.stateMu.RUnlock()
return m.state
}

func (m *uploaderManager) RefreshUploader(ctx context.Context) {
m.uploadMu.Lock()
defer m.uploadMu.Unlock()
r := m.GetUploader()
// Don't refresh sooner than every minute.
if r != nil && time.Now().Before(r.timestamp.Add(time.Minute)) {
return
}
u, err := backoff.RetryWithData(func() (uploader, error) {
resp, err := m.client.configureClient(ctx, clientConfigureRequest{Role: m.role})
if err == nil && resp.StatusCode != responseSuccess {
msg := "(no message)"
if resp.Message != "" {
msg = resp.Message
}
err = fmt.Errorf("unable to reconfigure client - status: %d, message: %s", resp.StatusCode, msg)
}
if err != nil {
return nil, err
}
// TODO: Do the other checks here that the Java SDK does (deploymentID, etc)
return newUploader(resp.StageLocation)
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3))
if r != nil {
// Only log when this is running as a background task (i.e. a refresh, not initial setup).
if err != nil {
m.client.logger.Warnf("refreshing snowflake storage credentials failure: %v", err)
} else {
m.client.logger.Debug("refreshing snowflake storage credentials success")
}
}
m.stateMu.Lock()
defer m.stateMu.Unlock()
m.state = &uploaderLoadResult{uploader: u, timestamp: time.Now(), err: err}
}

func (m *uploaderManager) Stop() {
m.periodic.Stop()
}

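Taken together, `uploaderManager` is a read-mostly cache guarded by two locks: `stateMu` (a `sync.RWMutex`) keeps `GetUploader` cheap on the hot upload path, while `uploadMu` serializes refreshes and enforces the one-minute floor, so a burst of failing uploads triggers at most one reconfigure per minute. Here is a stripped-down sketch of just that locking scheme, with a plain `string` standing in for the uploader and a caller-supplied `fetch` function in place of `configureClient` plus `newUploader`:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cached is the analogue of uploaderLoadResult: a value, when it was
// fetched, and any error from fetching it.
type cached struct {
	value     string
	timestamp time.Time
	err       error
}

// manager mirrors uploaderManager's locking: stateMu guards the published
// state for readers, refreshMu serializes writers (refreshes).
type manager struct {
	stateMu   sync.RWMutex
	state     *cached
	refreshMu sync.Mutex
	fetch     func() (string, error) // stand-in for configureClient + newUploader
}

// Get is the hot path: a shared read lock, never blocked by other readers.
func (m *manager) Get() *cached {
	m.stateMu.RLock()
	defer m.stateMu.RUnlock()
	return m.state
}

// Refresh serializes concurrent refresh requests and rate-limits them to
// one per minute, mirroring RefreshUploader above.
func (m *manager) Refresh() {
	m.refreshMu.Lock()
	defer m.refreshMu.Unlock()
	if s := m.Get(); s != nil && time.Now().Before(s.timestamp.Add(time.Minute)) {
		return // refreshed under a minute ago; keep the cached result
	}
	v, err := m.fetch()
	m.stateMu.Lock()
	defer m.stateMu.Unlock()
	m.state = &cached{value: v, timestamp: time.Now(), err: err}
}

func main() {
	m := &manager{fetch: func() (string, error) { return "token-1", nil }}
	m.Refresh()
	fmt.Println(m.Get().value) // token-1
	m.Refresh()                // no-op: within the one-minute floor
}
```

Callers that hit an authentication failure call `RefreshUploader` and then re-read `GetUploader`, which is exactly what the first-failure branch of the `InsertRows` retry loop does.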