Skip to content

Commit

Permalink
add OnEvent on Azure providers (#297)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktong authored Apr 22, 2024
1 parent 5f9354e commit e1213e5
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

- Support change notification via SNS topic (#267).
- Support change notification via PubSub topic (#294).
- Add OnEvent on Azure providers (#297).

## [1.0.0] - 2024-03-16

Expand Down
48 changes: 46 additions & 2 deletions provider/azappconfig/appconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,17 @@
// It requires following roles to access Azure App Configuration:
// - App Configuration Data Reader
//
// # Change notification
//
// By default, it periodically polls the configuration only.
// It also listens to change events by register it to notifier with [Cloud Event schema].
//
// Only following events trigger polling the configuration and other type of events are ignored:
// - Microsoft.AppConfiguration.KeyValueModified
// - Microsoft.AppConfiguration.KeyValueDeleted
//
// [App Configuration]: https://docs.microsoft.com/en-us/azure/azure-app-configuration/
// [Cloud Event schema]: https://learn.microsoft.com/en-us/azure/azure-app-configuration/concept-app-configuration-event?tabs=cloud-event-schema
package azappconfig

import (
Expand All @@ -20,6 +30,7 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/messaging"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig"

Expand All @@ -33,8 +44,9 @@ type AppConfig struct {
splitter func(string) []string
pollInterval time.Duration

onStatus func(bool, error)
client clientProxy
onStatus func(bool, error)
changedCh chan struct{}
client clientProxy
}

// New creates an AppConfig with the given endpoint and Option(s).
Expand All @@ -45,6 +57,7 @@ func New(endpoint string, opts ...Option) *AppConfig {
credential: &azidentity.DefaultAzureCredential{},
endpoint: endpoint,
},
changedCh: make(chan struct{}, 1),
}
for _, opt := range opts {
opt(option)
Expand All @@ -70,6 +83,9 @@ func (a *AppConfig) Watch(ctx context.Context, onChange func(map[string]any)) er
if a == nil {
return errNil
}
if a.changedCh == nil {
a.changedCh = make(chan struct{}, 1)
}

pollInterval := time.Minute
if a.pollInterval > 0 {
Expand All @@ -81,6 +97,8 @@ func (a *AppConfig) Watch(ctx context.Context, onChange func(map[string]any)) er
for {
select {
case <-ticker.C:
a.changed()
case <-a.changedCh:
values, changed, err := a.load(ctx)
if a.onStatus != nil {
a.onStatus(changed, err)
Expand All @@ -94,6 +112,14 @@ func (a *AppConfig) Watch(ctx context.Context, onChange func(map[string]any)) er
}
}

func (a *AppConfig) changed() {
select {
case a.changedCh <- struct{}{}:
default:
// Ignore if the channel is full since it's already triggered.
}
}

func (a *AppConfig) load(ctx context.Context) (map[string]any, bool, error) {
resp, changed, err := a.client.load(ctx)
if !changed || err != nil {
Expand All @@ -117,6 +143,24 @@ func (a *AppConfig) load(ctx context.Context) (map[string]any, bool, error) {
return values, true, nil
}

func (a *AppConfig) OnEvent(event messaging.CloudEvent) error {
if a == nil {
return errNil
}

if strings.HasPrefix(*event.Subject, a.client.endpoint) {
switch event.Type {
case "Microsoft.AppConfiguration.KeyValueModified",
"Microsoft.AppConfiguration.KeyValueDeleted":
a.changed()
}

return nil
}

return fmt.Errorf("unsupported app configuration event: %w", errors.ErrUnsupported)
}

func (a *AppConfig) Status(onStatus func(bool, error)) {
a.onStatus = onStatus
}
Expand Down
66 changes: 62 additions & 4 deletions provider/azappconfig/appconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/messaging"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"

"github.com/nil-go/konf/provider/azappconfig"
"github.com/nil-go/konf/provider/azappconfig/internal/assert"
)
Expand All @@ -25,6 +29,8 @@ func TestAppConfig_empty(t *testing.T) {
assert.Equal(t, nil, values)
err = loader.Watch(context.Background(), nil)
assert.EqualError(t, err, "nil AppConfig")
err = loader.OnEvent(messaging.CloudEvent{})
assert.EqualError(t, err, "nil AppConfig")
}

func TestAppConfig(t *testing.T) {
Expand Down Expand Up @@ -133,9 +139,13 @@ func TestAppConfig_Load(t *testing.T) {
func TestAppConfig_Watch(t *testing.T) {
t.Parallel()

server := httpServer()
t.Cleanup(server.Close)

testcases := []struct {
description string
opts []azappconfig.Option
event messaging.CloudEvent
expected map[string]any
err string
}{
Expand Down Expand Up @@ -163,6 +173,46 @@ list settings error
--------------------------------------------------------------------------------
`,
},
{
description: "KeyValueModified",
event: messaging.CloudEvent{
Type: "Microsoft.AppConfiguration.KeyValueModified",
Subject: to.Ptr(server.URL + "/kv/k"),
},
expected: map[string]any{
"p": map[string]any{
"k": "v",
"d": ".",
},
},
},
{
description: "KeyValueDeleted",
event: messaging.CloudEvent{
Type: "Microsoft.AppConfiguration.KeyValueDeleted",
Subject: to.Ptr(server.URL + "/kv/k"),
},
expected: map[string]any{
"p": map[string]any{
"k": "v",
"d": ".",
},
},
},
{
description: "unmatched event",
event: messaging.CloudEvent{
Type: "Microsoft.Storage.BlobCreated",
Subject: to.Ptr("https://another.azconfig.io/kv/"),
},
expected: map[string]any{
"p": map[string]any{
"k": "v",
"d": ".",
},
},
err: "unsupported app configuration event: unsupported operation",
},
}

for _, testcase := range testcases {
Expand All @@ -171,9 +221,6 @@ list settings error
t.Run(testcase.description, func(t *testing.T) {
t.Parallel()

server := httpServer()
defer server.Close()

loader := azappconfig.New(
server.URL,
append(
Expand Down Expand Up @@ -204,12 +251,23 @@ list settings error
}()
<-started

if !reflect.ValueOf(testcase.event).IsZero() {
eerr := loader.OnEvent(testcase.event)
if testcase.err == "" {
assert.NoError(t, eerr)
} else {
assert.EqualError(t, eerr, testcase.err)
}
}

time.Sleep(15 * time.Millisecond) // wait for the first tick, but not the second
select {
case val := <-values:
assert.Equal(t, testcase.expected, val)
default:
assert.EqualError(t, *err.Load(), fmt.Sprintf(testcase.err, server.URL))
if e := err.Load(); e != nil {
assert.EqualError(t, *e, fmt.Sprintf(testcase.err, server.URL))
}
}
})
}
Expand Down
84 changes: 65 additions & 19 deletions provider/azblob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@
// It requires following roles to access blob from Azure Blob Storage:
// - Storage Blob Data Reader
//
// # Change notification
//
// By default, it periodically polls the configuration only.
// It also listens to change events by register it to notifier with [Cloud Event schema].
//
// Only Microsoft.Storage.BlobCreated events trigger polling the configuration and other type of events are ignored.
//
// [Blob Storage]: https://azure.microsoft.com/en-us/products/storage/blobs
// [Cloud Event schema]: https://learn.microsoft.com/en-us/azure/event-grid/event-schema-blob-storage?tabs=cloud-event-schema
package azblob

import (
Expand All @@ -16,10 +24,12 @@ import (
"fmt"
"io"
"reflect"
"strings"
"sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/messaging"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
Expand All @@ -33,8 +43,9 @@ type Blob struct {
pollInterval time.Duration
unmarshal func([]byte, any) error

onStatus func(bool, error)
client clientProxy
onStatus func(bool, error)
changedCh chan struct{}
client clientProxy
}

// New creates an Blob with the given endpoint and Option(s).
Expand All @@ -47,6 +58,7 @@ func New(endpoint, container, blob string, opts ...Option) *Blob {
container: container,
blob: blob,
},
changedCh: make(chan struct{}, 1),
}
for _, opt := range opts {
opt(option)
Expand All @@ -58,34 +70,39 @@ func New(endpoint, container, blob string, opts ...Option) *Blob {

var errNil = errors.New("nil Blob")

func (a *Blob) Load() (map[string]any, error) {
if a == nil {
func (b *Blob) Load() (map[string]any, error) {
if b == nil {
return nil, errNil
}

values, _, err := a.load(context.Background())
values, _, err := b.load(context.Background())

return values, err
}

func (a *Blob) Watch(ctx context.Context, onChange func(map[string]any)) error {
if a == nil {
func (b *Blob) Watch(ctx context.Context, onChange func(map[string]any)) error {
if b == nil {
return errNil
}
if b.changedCh == nil {
b.changedCh = make(chan struct{}, 1)
}

pollInterval := time.Minute
if a.pollInterval > 0 {
pollInterval = a.pollInterval
if b.pollInterval > 0 {
pollInterval = b.pollInterval
}
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
values, changed, err := a.load(ctx)
if a.onStatus != nil {
a.onStatus(changed, err)
b.changed()
case <-b.changedCh:
values, changed, err := b.load(ctx)
if b.onStatus != nil {
b.onStatus(changed, err)
}
if changed {
onChange(values)
Expand All @@ -96,13 +113,21 @@ func (a *Blob) Watch(ctx context.Context, onChange func(map[string]any)) error {
}
}

func (a *Blob) load(ctx context.Context) (map[string]any, bool, error) {
resp, changed, err := a.client.load(ctx)
func (b *Blob) changed() {
select {
case b.changedCh <- struct{}{}:
default:
// Ignore if the channel is full since it's already triggered.
}
}

func (b *Blob) load(ctx context.Context) (map[string]any, bool, error) {
resp, changed, err := b.client.load(ctx)
if !changed || err != nil {
return nil, false, err
}

unmarshal := a.unmarshal
unmarshal := b.unmarshal
if unmarshal == nil {
unmarshal = json.Unmarshal
}
Expand All @@ -114,12 +139,33 @@ func (a *Blob) load(ctx context.Context) (map[string]any, bool, error) {
return values, true, nil
}

func (a *Blob) Status(onStatus func(bool, error)) {
a.onStatus = onStatus
func (b *Blob) OnEvent(event messaging.CloudEvent) error {
if b == nil {
return errNil
}

var account string
if strings.Contains(b.client.endpoint, ".blob.core.windows.net") {
account, _, _ = strings.Cut(strings.TrimPrefix(b.client.endpoint, "https://"), ".blob.core.windows.net")
}
if strings.HasSuffix(event.Source, account) &&
*event.Subject == "/blobServices/default/containers/"+b.client.container+"/blobs/"+b.client.blob {
if event.Type == "Microsoft.Storage.BlobCreated" {
b.changed()
}

return nil
}

return fmt.Errorf("unsupported blob storage event: %w", errors.ErrUnsupported)
}

func (b *Blob) Status(onStatus func(bool, error)) {
b.onStatus = onStatus
}

func (a *Blob) String() string {
return a.client.endpoint + "/" + a.client.container + "/" + a.client.blob
func (b *Blob) String() string {
return b.client.endpoint + "/" + b.client.container + "/" + b.client.blob
}

type clientProxy struct {
Expand Down
Loading

0 comments on commit e1213e5

Please sign in to comment.