Skip to content

Commit

Permalink
feat: add support for a lazy refresh (#565)
Browse files Browse the repository at this point in the history
When creating a Dialer with the WithLazyRefresh option, the connection
info and ephemeral certificate will be refreshed only when the cached
certificate has expired. No background goroutines run with this option,
making it ideal for use in Cloud Run and other serverless environments
where the CPU may be throttled.

This is a port of
GoogleCloudPlatform/cloud-sql-go-connector#772

Fixes #549
  • Loading branch information
enocom authored May 1, 2024
1 parent 154ab5f commit 75fb63e
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 12 deletions.
31 changes: 24 additions & 7 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ type Dialer struct {
// closed reports if the dialer has been closed.
closed chan struct{}

// lazyRefresh determines what kind of caching is used for ephemeral
// certificates. When lazyRefresh is true, the dialer will use a lazy
// cache, refreshing certificates only when a connection attempt needs a
// fresh certificate. Otherwise, a refresh-ahead cache will be used. The
// refresh-ahead cache assumes a background goroutine may run consistently.
lazyRefresh bool

client *alloydbadmin.AlloyDBAdminClient
logger debug.Logger

Expand Down Expand Up @@ -186,6 +193,7 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
d := &Dialer{
closed: make(chan struct{}),
cache: make(map[alloydb.InstanceURI]monitoredCache),
lazyRefresh: cfg.lazyRefresh,
key: cfg.rsaKey,
refreshTimeout: cfg.refreshTimeout,
client: client,
Expand Down Expand Up @@ -551,18 +559,27 @@ func (d *Dialer) connectionInfoCache(
// Recheck to ensure instance wasn't created between locks
c, ok = d.cache[uri]
if !ok {
c = monitoredCache{
connectionInfoCache: alloydb.NewRefreshAheadCache(
d.logger.Debugf(
"[%v] Connection info added to cache",
uri.String(),
)
var cache connectionInfoCache
if d.lazyRefresh {
cache = alloydb.NewLazyRefreshCache(
uri,
d.logger,
d.client, d.key,
d.refreshTimeout, d.dialerID,
),
)
} else {
cache = alloydb.NewRefreshAheadCache(
uri,
d.logger,
d.client, d.key,
d.refreshTimeout, d.dialerID,
)
}
d.logger.Debugf(
"[%v] Connection info added to cache",
uri.String(),
)
c = monitoredCache{connectionInfoCache: cache}
d.cache[uri] = c
}
}
Expand Down
120 changes: 120 additions & 0 deletions internal/alloydb/lazy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package alloydb

import (
"context"
"crypto/rsa"
"sync"
"time"

alloydbadmin "cloud.google.com/go/alloydb/apiv1alpha"
"cloud.google.com/go/alloydbconn/debug"
)

// LazyRefreshCache caches connection info and refreshes the cache only when
// a caller requests connection info and the current certificate is expired
// (or a refresh has been explicitly requested via ForceRefresh).
type LazyRefreshCache struct {
	// uri identifies the instance whose connection info is cached.
	uri InstanceURI
	// logger receives debug messages about cache hits and refresh operations.
	logger debug.Logger
	// key is the RSA private key handed to the refresher on every refresh.
	key *rsa.PrivateKey
	// r performs the actual refresh operation.
	r refresher
	// mu guards needsRefresh and cached.
	mu sync.Mutex
	// needsRefresh is set by ForceRefresh and cleared after the next
	// successful refresh in ConnectionInfo.
	needsRefresh bool
	// cached holds the result of the most recent successful refresh. Its
	// zero value has a zero Expiration, so the first ConnectionInfo call
	// always refreshes.
	cached ConnectionInfo
}

// NewLazyRefreshCache constructs a LazyRefreshCache for the instance
// identified by uri.
//
// The logger l receives debug output, client and dialerID are used to build
// the underlying refresher, and key is the RSA private key supplied on each
// refresh. The time.Duration argument is accepted but ignored by this cache.
// No refresh is performed here; the first call to ConnectionInfo triggers it.
func NewLazyRefreshCache(
	uri InstanceURI,
	l debug.Logger,
	client *alloydbadmin.AlloyDBAdminClient,
	key *rsa.PrivateKey,
	_ time.Duration,
	dialerID string,
) *LazyRefreshCache {
	lazy := new(LazyRefreshCache)
	lazy.uri = uri
	lazy.logger = l
	lazy.key = key
	lazy.r = newRefresher(client, dialerID)
	return lazy
}

// ConnectionInfo returns the connection info for the cached instance,
// refreshing it first when necessary. A refresh happens when either:
//   - the cached certificate is within refreshBuffer of its expiration (or
//     already past it), or
//   - ForceRefresh has been called since the last successful refresh.
//
// The mutex is held for the whole call, including the refresh itself, so
// concurrent callers wait for a single in-flight refresh. When the refresh
// fails, the error is returned with a zero ConnectionInfo and the cached
// state is left untouched, so the next call tries again.
func (c *LazyRefreshCache) ConnectionInfo(
	ctx context.Context,
) (ConnectionInfo, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	id := c.uri.String()

	// UTC() drops the monotonic clock reading from the current time.
	now := time.Now().UTC()
	// Treat the certificate as expired slightly early so the client still
	// has time to complete a connection with it.
	deadline := c.cached.Expiration.UTC().Add(-refreshBuffer)

	if stale := c.needsRefresh || !now.Before(deadline); !stale {
		c.logger.Debugf(
			"[%v] Connection info is still valid, using cached info",
			id,
		)
		return c.cached, nil
	}

	c.logger.Debugf("[%v] Connection info refresh operation started", id)
	fresh, err := c.r.performRefresh(ctx, c.uri, c.key)
	if err != nil {
		c.logger.Debugf(
			"[%v] Connection info refresh operation failed, err = %v",
			id,
			err,
		)
		return ConnectionInfo{}, err
	}
	c.logger.Debugf("[%v] Connection info refresh operation complete", id)
	c.logger.Debugf(
		"[%v] Current certificate expiration = %v",
		id,
		fresh.Expiration.UTC().Format(time.RFC3339),
	)

	// Only a successful refresh replaces the cache and clears the flag.
	c.cached, c.needsRefresh = fresh, false
	return fresh, nil
}

// ForceRefresh marks the cached connection info as invalid so that the next
// call to ConnectionInfo fetches fresh connection info, even if the cached
// certificate has not expired. It performs no refresh itself.
func (c *LazyRefreshCache) ForceRefresh() {
	c.mu.Lock()
	c.needsRefresh = true
	c.mu.Unlock()
}

// Close does nothing and always returns nil. It exists only so that
// LazyRefreshCache offers the same interface as the other caching types.
func (*LazyRefreshCache) Close() error { return nil }
107 changes: 107 additions & 0 deletions internal/alloydb/lazy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package alloydb

import (
"context"
"testing"
"time"

alloydbadmin "cloud.google.com/go/alloydb/apiv1alpha"
"cloud.google.com/go/alloydbconn/internal/mock"
"google.golang.org/api/option"
)

// TestLazyRefreshCacheConnectionInfo verifies that the first call to
// ConnectionInfo fetches info for the expected instance and that a second
// call succeeds from the cache.
func TestLazyRefreshCacheConnectionInfo(t *testing.T) {
	ctx := context.Background()
	u := testInstanceURI()
	inst := mock.NewFakeInstance(u.project, u.region, u.cluster, u.name)

	// Each mocked endpoint is registered with a count of 1 — presumably the
	// number of requests the mock expects, checked by cleanup.
	httpClient, endpoint, cleanup := mock.HTTPClient(
		mock.InstanceGetSuccess(inst, 1),
		mock.CreateEphemeralSuccess(inst, 1),
	)
	defer func() {
		if cerr := cleanup(); cerr != nil {
			t.Fatalf("%v", cerr)
		}
	}()

	admin, err := alloydbadmin.NewAlloyDBAdminRESTClient(
		ctx,
		option.WithHTTPClient(httpClient),
		option.WithEndpoint(endpoint),
		option.WithTokenSource(stubTokenSource{}),
	)
	if err != nil {
		t.Fatalf("expected NewClient to succeed, but got error: %v", err)
	}

	lazy := NewLazyRefreshCache(u, nullLogger{}, admin, RSAKey, 30*time.Second, "")

	got, err := lazy.ConnectionInfo(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if got.Instance != u {
		t.Fatalf("want = %v, got = %v", u, got.Instance)
	}

	// A second request must be served from the cache without another API
	// call.
	if _, err := lazy.ConnectionInfo(ctx); err != nil {
		t.Fatal(err)
	}
}

// TestLazyRefreshCacheForceRefresh verifies that ConnectionInfo succeeds
// again after ForceRefresh has invalidated the cached connection info.
func TestLazyRefreshCacheForceRefresh(t *testing.T) {
	ctx := context.Background()
	u := testInstanceURI()
	inst := mock.NewFakeInstance(u.project, u.region, u.cluster, u.name)

	// Each mocked endpoint is registered with a count of 2 — presumably one
	// request for the initial fetch and one for the forced refresh.
	httpClient, endpoint, cleanup := mock.HTTPClient(
		mock.InstanceGetSuccess(inst, 2),
		mock.CreateEphemeralSuccess(inst, 2),
	)
	defer func() {
		if cerr := cleanup(); cerr != nil {
			t.Fatalf("%v", cerr)
		}
	}()

	admin, err := alloydbadmin.NewAlloyDBAdminRESTClient(
		ctx,
		option.WithHTTPClient(httpClient),
		option.WithEndpoint(endpoint),
		option.WithTokenSource(stubTokenSource{}),
	)
	if err != nil {
		t.Fatalf("expected NewClient to succeed, but got error: %v", err)
	}

	lazy := NewLazyRefreshCache(u, nullLogger{}, admin, RSAKey, 30*time.Second, "")

	// Populate the cache.
	if _, err := lazy.ConnectionInfo(ctx); err != nil {
		t.Fatal(err)
	}

	lazy.ForceRefresh()

	// The invalidated cache must be refreshed on the next request.
	if _, err := lazy.ConnectionInfo(ctx); err != nil {
		t.Fatal(err)
	}
}
12 changes: 7 additions & 5 deletions internal/alloydb/refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,22 +232,23 @@ type refresher struct {

// ConnectionInfo holds all the data necessary to connect to an instance.
// Values are produced by refresher.performRefresh.
type ConnectionInfo struct {
	// Instance is the URI of the instance this connection info was fetched
	// for.
	Instance InstanceURI
	// IPAddrs holds the instance's IP addresses as reported by the instance
	// info lookup. NOTE(review): keys are presumably IP type names — confirm
	// against fetchInstanceInfo.
	IPAddrs map[string]string
	// ClientCert is the client certificate chain returned by the client
	// certificate request.
	ClientCert tls.Certificate
	// RootCAs is a certificate pool containing the CA certificate returned
	// alongside the client certificate.
	RootCAs *x509.CertPool
	// Expiration is the time the caches treat as the certificate's expiry
	// when deciding whether to refresh.
	Expiration time.Time
}

func (r refresher) performRefresh(
ctx context.Context, cn InstanceURI, k *rsa.PrivateKey,
ctx context.Context, i InstanceURI, k *rsa.PrivateKey,
) (res ConnectionInfo, err error) {
var refreshEnd trace.EndSpanFunc
ctx, refreshEnd = trace.StartSpan(ctx, "cloud.google.com/go/alloydbconn/internal.RefreshConnection",
trace.AddInstanceName(cn.String()),
trace.AddInstanceName(i.String()),
)
defer func() {
go trace.RecordRefreshResult(
context.Background(), cn.String(), r.dialerID, err,
context.Background(), i.String(), r.dialerID, err,
)
refreshEnd(err)
}()
Expand All @@ -259,7 +260,7 @@ func (r refresher) performRefresh(
mdCh := make(chan mdRes, 1)
go func() {
defer close(mdCh)
c, err := fetchInstanceInfo(ctx, r.client, cn)
c, err := fetchInstanceInfo(ctx, r.client, i)
mdCh <- mdRes{info: c, err: err}
}()

Expand All @@ -270,7 +271,7 @@ func (r refresher) performRefresh(
certCh := make(chan certRes, 1)
go func() {
defer close(certCh)
cc, err := fetchClientCertificate(ctx, r.client, cn, k)
cc, err := fetchClientCertificate(ctx, r.client, i, k)
certCh <- certRes{cc: cc, err: err}
}()

Expand Down Expand Up @@ -303,6 +304,7 @@ func (r refresher) performRefresh(
caCerts := x509.NewCertPool()
caCerts.AddCert(cc.caCert)
ci := ConnectionInfo{
Instance: i,
IPAddrs: info.ipAddrs,
ClientCert: cc.certChain,
RootCAs: caCerts,
Expand Down
14 changes: 14 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type dialerConfig struct {
userAgents []string
useIAMAuthN bool
logger debug.Logger
lazyRefresh bool
// err tracks any dialer options that may have failed.
err error
}
Expand Down Expand Up @@ -174,6 +175,19 @@ func WithDebugLogger(l debug.Logger) Option {
}
}

// WithLazyRefresh returns an Option that makes the dialer refresh
// certificates lazily, i.e. only when they are needed. When a connection
// request arrives and the certificate has expired, the Go Connector blocks
// that attempt and refreshes the certificate immediately.
//
// Use this option in environments where the CPU may be throttled and a
// background goroutine therefore cannot run consistently (e.g., Cloud Run
// throttles the CPU outside of a request context, which causes the
// background refresh to fail).
func WithLazyRefresh() Option {
	return func(cfg *dialerConfig) {
		cfg.lazyRefresh = true
	}
}

// A DialOption is an option for configuring how a Dialer's Dial call is
// executed.
type DialOption func(d *dialCfg)
Expand Down

0 comments on commit 75fb63e

Please sign in to comment.