Skip to content

Commit

Permalink
do not use atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
ktong committed Feb 10, 2024
1 parent 3e8945e commit 66099eb
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 13 deletions.
23 changes: 11 additions & 12 deletions provider/appconfig/appconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"log/slog"
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -39,11 +38,11 @@ type AppConfig struct {
pollInterval time.Duration

timeout time.Duration
token atomic.Pointer[string]
token *string
}

// New creates an AppConfig with the given application, environment, profile and Option(s).
func New(application, environment, profile string, opts ...Option) *AppConfig {
func New(application, environment, profile string, opts ...Option) AppConfig {
if application == "" {
panic("cannot create AppConfig with empty application")
}
Expand Down Expand Up @@ -76,14 +75,14 @@ func New(application, environment, profile string, opts ...Option) *AppConfig {
}
option.client = &clientProxy{config: option.awsConfig}

return &option.AppConfig
return option.AppConfig
}

func (a *AppConfig) Load() (map[string]any, error) {
func (a AppConfig) Load() (map[string]any, error) {
ctx, cancel := context.WithTimeout(context.Background(), a.timeout)
defer cancel()

if a.token.Load() == nil {
if a.token == nil {
input := &appconfigdata.StartConfigurationSessionInput{
ApplicationIdentifier: aws.String(a.application),
ConfigurationProfileIdentifier: aws.String(a.profile),
Expand All @@ -94,13 +93,13 @@ func (a *AppConfig) Load() (map[string]any, error) {
if err != nil {
return nil, err
}
a.token.Store(output.InitialConfigurationToken)
a.token = output.InitialConfigurationToken
}

return a.load(ctx)
}

func (a *AppConfig) Watch(ctx context.Context, onChange func(map[string]any)) error {
func (a AppConfig) Watch(ctx context.Context, onChange func(map[string]any)) error {
ticker := time.NewTicker(a.pollInterval)
defer ticker.Stop()

Expand Down Expand Up @@ -129,18 +128,18 @@ func (a *AppConfig) Watch(ctx context.Context, onChange func(map[string]any)) er
}
}

func (a *AppConfig) load(ctx context.Context) (map[string]any, error) {
func (a AppConfig) load(ctx context.Context) (map[string]any, error) {
ctx, cancel := context.WithTimeout(ctx, a.timeout)
defer cancel()

input := &appconfigdata.GetLatestConfigurationInput{
ConfigurationToken: a.token.Load(),
ConfigurationToken: a.token,
}
output, err := a.client.GetLatestConfiguration(ctx, input)
if err != nil {
return nil, err
}
a.token.Store(output.NextPollConfigurationToken)
a.token = output.NextPollConfigurationToken

if len(output.Configuration) == 0 {
// It may return empty configuration data
Expand All @@ -156,7 +155,7 @@ func (a *AppConfig) load(ctx context.Context) (map[string]any, error) {
return out, nil
}

func (a *AppConfig) String() string {
func (a AppConfig) String() string {
return "appConfig:" + a.application + "-" + a.environment + "-" + a.profile
}

Expand Down
2 changes: 1 addition & 1 deletion provider/appconfig/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func BenchmarkNew(b *testing.B) {
var loader *appconfig.AppConfig
var loader appconfig.AppConfig
for i := 0; i < b.N; i++ {
loader = appconfig.New("app", "env", "profile")
}
Expand Down
148 changes: 148 additions & 0 deletions provider/azappconfig/appconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,160 @@
package azappconfig

import (
"context"
"fmt"
"log/slog"
"strings"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig"

"github.com/nil-go/konf/provider/azappconfig/internal/maps"
)

type AppConfig struct {
logger *slog.Logger

prefix string
label string
splitter func(string) []string

client *clientProxy
pollInterval time.Duration

timeout time.Duration
}

func New(endpoint string, opts ...Option) *AppConfig {
if endpoint == "" {
panic("cannot create Azure AppConfig with empty endpoint")
}

option := &options{
AppConfig: AppConfig{
timeout: 30 * time.Second, //nolint:gomnd
},
}
for _, opt := range opts {
opt(option)
}

if option.splitter == nil {
option.splitter = func(s string) []string { return strings.Split(s, "/") }
}
if option.logger == nil {
option.logger = slog.Default()
}
if option.pollInterval <= 0 {
option.pollInterval = time.Minute
}
option.client = &clientProxy{endpoint: endpoint}

return &option.AppConfig
}

func (a *AppConfig) Load() (map[string]any, error) {
pager, err := a.client.NewListSettingsPager(
azappconfig.SettingSelector{
KeyFilter: &a.prefix,
LabelFilter: &a.label,
Fields: []azappconfig.SettingFields{
azappconfig.SettingFieldsKey,
azappconfig.SettingFieldsValue,
},
},
&azappconfig.ListSettingsOptions{},
)
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), a.timeout)
defer cancel()

var (
values = make(map[string]any)
page azappconfig.ListSettingsPageResponse
)
for pager.More() {
if page, err = pager.NextPage(ctx); err != nil {
return nil, fmt.Errorf("next page of list settings: %w", err)
}

for _, setting := range page.Settings {
maps.Insert(values, a.splitter(*setting.Key), *setting.Value)
}
}

return values, nil
}

func (a *AppConfig) Watch(ctx context.Context, onChange func(map[string]any)) error {
ticker := time.NewTicker(a.pollInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
values, err := a.Load()
if err != nil {
a.logger.WarnContext(
ctx, "Error when reloading from Azure App Configuration",
"error", err,
)

continue
}

if values != nil {
onChange(values)
}
case <-ctx.Done():
return nil
}
}
}

type clientProxy struct {
client *azappconfig.Client
clientOnce sync.Once

endpoint string
credential azcore.TokenCredential
}

func (c *clientProxy) NewListSettingsPager(
selector azappconfig.SettingSelector,
options *azappconfig.ListSettingsOptions,
) (*runtime.Pager[azappconfig.ListSettingsPageResponse], error) {
client, err := c.loadClient()
if err != nil {
return nil, err
}

return client.NewListSettingsPager(selector, options), nil
}

func (c *clientProxy) loadClient() (*azappconfig.Client, error) {
var err error

c.clientOnce.Do(func() {
if c.credential, err = azidentity.NewDefaultAzureCredential(nil); err != nil {
err = fmt.Errorf("load default Azure credential: %w", err)

return
}

if c.client, err = azappconfig.NewClient(c.endpoint, c.credential, nil); err != nil {
err = fmt.Errorf("create Azure app configuration client: %w", err)

return
}
})

return c.client, err
}

0 comments on commit 66099eb

Please sign in to comment.