diff --git a/internal/alloydb/instance.go b/internal/alloydb/instance.go index 55d8e9f3..af9318e0 100644 --- a/internal/alloydb/instance.go +++ b/internal/alloydb/instance.go @@ -48,7 +48,7 @@ const ( var ( // Instance URI is in the format: - // '/projects//locations//clusters//instances/' + // 'projects//locations//clusters//instances/' // Additionally, we have to support legacy "domain-scoped" projects // (e.g. "google.com:PROJECT") instURIRegex = regexp.MustCompile("projects/([^:]+(:[^:]+)?)/locations/([^:]+)/clusters/([^:]+)/instances/([^:]+)") @@ -138,13 +138,12 @@ func (r *refreshOperation) isValid() bool { type RefreshAheadCache struct { instanceURI InstanceURI logger debug.ContextLogger - key *rsa.PrivateKey // refreshTimeout sets the maximum duration a refresh cycle can run // for. refreshTimeout time.Duration // l controls the rate at which refresh cycles are run. l *rate.Limiter - r refresher + r adminAPIClient resultGuard sync.RWMutex // cur represents the current refreshOperation that will be used to @@ -175,9 +174,8 @@ func NewRefreshAheadCache( i := &RefreshAheadCache{ instanceURI: instance, logger: l, - key: key, l: rate.NewLimiter(rate.Every(refreshInterval), refreshBurst), - r: newRefresher(client, dialerID), + r: newAdminAPIClient(client, key, dialerID), refreshTimeout: refreshTimeout, ctx: ctx, cancel: cancel, @@ -296,7 +294,7 @@ func (i *RefreshAheadCache) scheduleRefresh(d time.Duration) *refreshOperation { r.err, ) } else { - r.result, r.err = i.r.performRefresh(i.ctx, i.instanceURI, i.key) + r.result, r.err = i.r.connectionInfo(i.ctx, i.instanceURI) i.logger.Debugf( ctx, "[%v] Connection info refresh operation complete", diff --git a/internal/alloydb/instance_test.go b/internal/alloydb/instance_test.go index 4eeadc83..5dad9e68 100644 --- a/internal/alloydb/instance_test.go +++ b/internal/alloydb/instance_test.go @@ -42,8 +42,8 @@ func genRSAKey() *rsa.PrivateKey { return key } -// RSAKey is used for test only. -var RSAKey = genRSAKey() +// rsaKey is used for test only. +var rsaKey = genRSAKey() func TestParseInstURI(t *testing.T) { tcs := []struct { @@ -53,7 +53,7 @@ func TestParseInstURI(t *testing.T) { }{ { desc: "vanilla instance URI", - in: "/projects/proj/locations/reg/clusters/clust/instances/name", + in: "projects/proj/locations/reg/clusters/clust/instances/name", want: InstanceURI{ project: "proj", region: "reg", @@ -63,7 +63,7 @@ func TestParseInstURI(t *testing.T) { }, { desc: "with legacy domain-scoped project", - in: "/projects/google.com:proj/locations/reg/clusters/clust/instances/name", + in: "projects/google.com:proj/locations/reg/clusters/clust/instances/name", want: InstanceURI{ project: "google.com:proj", region: "reg", @@ -157,7 +157,7 @@ func TestConnectionInfo(t *testing.T) { i := NewRefreshAheadCache( testInstanceURI(), nullLogger{}, - c, RSAKey, 30*time.Second, "dialer-id", + c, rsaKey, 30*time.Second, "dialer-id", ) if err != nil { t.Fatalf("failed to create mock instance: %v", err) @@ -192,7 +192,7 @@ func TestConnectionInfo(t *testing.T) { } func testInstanceURI() InstanceURI { - i, _ := ParseInstURI("/projects/my-project/locations/my-region/clusters/my-cluster/instances/my-instance") + i, _ := ParseInstURI("projects/my-project/locations/my-region/clusters/my-cluster/instances/my-instance") return i } @@ -209,7 +209,7 @@ func TestConnectInfoErrors(t *testing.T) { i := NewRefreshAheadCache( testInstanceURI(), nullLogger{}, - c, RSAKey, 0, "dialer-id", + c, rsaKey, 0, "dialer-id", ) if err != nil { t.Fatalf("failed to initialize Instance: %v", err) @@ -242,7 +242,7 @@ func TestClose(t *testing.T) { i := NewRefreshAheadCache( testInstanceURI(), nullLogger{}, - c, RSAKey, 30, "dialer-ider", + c, rsaKey, 30, "dialer-ider", ) if err != nil { t.Fatalf("failed to initialize Instance: %v", err) diff --git a/internal/alloydb/lazy.go b/internal/alloydb/lazy.go index 0f0602cf..0f2e1985 100644 --- a/internal/alloydb/lazy.go +++ b/internal/alloydb/lazy.go @@ -29,8 +29,7 @@ import ( type LazyRefreshCache struct { uri InstanceURI logger debug.ContextLogger - key *rsa.PrivateKey - r refresher + r adminAPIClient mu sync.Mutex needsRefresh bool cached ConnectionInfo @@ -48,9 +47,9 @@ func NewLazyRefreshCache( return &LazyRefreshCache{ uri: uri, logger: l, - key: key, - r: newRefresher( + r: newAdminAPIClient( client, + key, dialerID, ), } @@ -84,7 +83,7 @@ func (c *LazyRefreshCache) ConnectionInfo( "[%v] Connection info refresh operation started", c.uri.String(), ) - ci, err := c.r.performRefresh(ctx, c.uri, c.key) + ci, err := c.r.connectionInfo(ctx, c.uri) if err != nil { c.logger.Debugf( ctx, diff --git a/internal/alloydb/lazy_test.go b/internal/alloydb/lazy_test.go index b924abb3..b108e051 100644 --- a/internal/alloydb/lazy_test.go +++ b/internal/alloydb/lazy_test.go @@ -48,7 +48,7 @@ func TestLazyRefreshCacheConnectionInfo(t *testing.T) { } cache := NewLazyRefreshCache( testInstanceURI(), nullLogger{}, c, - RSAKey, 30*time.Second, "", + rsaKey, 30*time.Second, "", ) ci, err := cache.ConnectionInfo(context.Background()) @@ -90,7 +90,7 @@ func TestLazyRefreshCacheForceRefresh(t *testing.T) { } cache := NewLazyRefreshCache( testInstanceURI(), nullLogger{}, c, - RSAKey, 30*time.Second, "", + rsaKey, 30*time.Second, "", ) _, err = cache.ConnectionInfo(context.Background()) diff --git a/internal/alloydb/refresh.go b/internal/alloydb/refresh.go index 5a6b0707..2bd922d1 100644 --- a/internal/alloydb/refresh.go +++ b/internal/alloydb/refresh.go @@ -221,23 +221,25 @@ func newClientCertificate( }, nil } -// newRefresher creates a Refresher. -func newRefresher( +func newAdminAPIClient( client *alloydbadmin.AlloyDBAdminClient, + key *rsa.PrivateKey, dialerID string, -) refresher { - return refresher{ +) adminAPIClient { + return adminAPIClient{ client: client, + key: key, dialerID: dialerID, } } -// refresher manages the AlloyDB Admin API access to instance metadata and to -// ephemeral certificates. -type refresher struct { +// adminAPIClient manages the AlloyDB Admin API access to instance metadata and +// to ephemeral certificates. +type adminAPIClient struct { // client provides access to the AlloyDB Admin API client *alloydbadmin.AlloyDBAdminClient - + // key is used to request client certificates + key *rsa.PrivateKey // dialerID is the unique ID of the associated dialer. dialerID string } @@ -251,16 +253,17 @@ type ConnectionInfo struct { Expiration time.Time } -func (r refresher) performRefresh( - ctx context.Context, i InstanceURI, k *rsa.PrivateKey, +func (c adminAPIClient) connectionInfo( + ctx context.Context, i InstanceURI, ) (res ConnectionInfo, err error) { + var refreshEnd trace.EndSpanFunc ctx, refreshEnd = trace.StartSpan(ctx, "cloud.google.com/go/alloydbconn/internal.RefreshConnection", trace.AddInstanceName(i.String()), ) defer func() { go trace.RecordRefreshResult( - context.Background(), i.String(), r.dialerID, err, + context.Background(), i.String(), c.dialerID, err, ) refreshEnd(err) }() @@ -272,7 +275,7 @@ func (r refresher) performRefresh( mdCh := make(chan mdRes, 1) go func() { defer close(mdCh) - c, err := fetchInstanceInfo(ctx, r.client, i) + c, err := fetchInstanceInfo(ctx, c.client, i) mdCh <- mdRes{info: c, err: err} }() @@ -283,7 +286,7 @@ func (r refresher) performRefresh( certCh := make(chan certRes, 1) go func() { defer close(certCh) - cc, err := fetchClientCertificate(ctx, r.client, i, k) + cc, err := fetchClientCertificate(ctx, c.client, i, c.key) certCh <- certRes{cc: cc, err: err} }() diff --git a/internal/alloydb/refresh_test.go b/internal/alloydb/refresh_test.go index 7cc9a1ff..9be270ae 100644 --- a/internal/alloydb/refresh_test.go +++ b/internal/alloydb/refresh_test.go @@ -32,7 +32,7 @@ func TestRefresh(t *testing.T) { wantPublicIP := "127.0.0.1" wantPSC := "x.y.alloydb.goog" wantExpiry := time.Now().Add(time.Hour).UTC().Round(time.Second) - wantInstURI := "/projects/my-project/locations/my-region/clusters/my-cluster/instances/my-instance" + wantInstURI := "projects/my-project/locations/my-region/clusters/my-cluster/instances/my-instance" cn, err := ParseInstURI(wantInstURI) if err != nil { t.Fatalf("parseConnName(%s)failed : %v", cn, err) @@ -62,8 +62,8 @@ func TestRefresh(t *testing.T) { if err != nil { t.Fatalf("admin API client error: %v", err) } - r := newRefresher(cl, testDialerID) - res, err := r.performRefresh(context.Background(), cn, RSAKey) + r := newAdminAPIClient(cl, rsaKey, testDialerID) + res, err := r.connectionInfo(context.Background(), cn) if err != nil { t.Fatalf("performRefresh unexpectedly failed with error: %v", err) } @@ -98,7 +98,7 @@ func TestRefresh(t *testing.T) { } func TestRefreshFailsFast(t *testing.T) { - wantInstURI := "/projects/my-project/locations/my-region/clusters/my-cluster/instances/my-instance" + wantInstURI := "projects/my-project/locations/my-region/clusters/my-cluster/instances/my-instance" cn, err := ParseInstURI(wantInstURI) if err != nil { t.Fatalf("parseConnName(%s)failed : %v", cn, err) @@ -124,9 +124,9 @@ func TestRefreshFailsFast(t *testing.T) { if err != nil { t.Fatalf("admin API client error: %v", err) } - r := newRefresher(cl, testDialerID) + r := newAdminAPIClient(cl, rsaKey, testDialerID) - _, err = r.performRefresh(context.Background(), cn, RSAKey) + _, err = r.connectionInfo(context.Background(), cn) if err != nil { t.Fatalf("expected no error, got = %v", err) } @@ -134,7 +134,7 @@ func TestRefreshFailsFast(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() // context is canceled - _, err = r.performRefresh(ctx, cn, RSAKey) + _, err = r.connectionInfo(ctx, cn) if !errors.Is(err, context.Canceled) { t.Fatalf("expected context.Canceled error, got = %v", err) }