Skip to content

Commit

Permalink
support both name and arn for sns topic (#296)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktong authored Apr 22, 2024
1 parent f7c8b85 commit 5f9354e
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 5 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ It also supports watching the changes of configuration using corresponding notif
For example, the `sns` notifier notifies the changes of `appconfig` and `s3` provider:

```
notifier := sns.NewNotifier("arn:aws:sns:us-west-1:851725503283:konf-test")
notifier := sns.NewNotifier("konf-test")
notifier.Register(s3Loader)
notifier.Register(appConfigLoader)
go func() {
Expand Down
22 changes: 18 additions & 4 deletions notifier/sns/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// It [Fanout SNS topic to Amazon SQS queues], which requires following permissions:
// - sns:Subscribe
// - sns:Unsubscribe
// - sns:CreateTopic
// - sqs:CreateQueue
//
// [Fanout SNS topic to Amazon SQS queues]: https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html
Expand All @@ -22,6 +23,7 @@ import (
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/arn"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sqs"
Expand All @@ -44,7 +46,7 @@ type Notifier struct {

type loader interface{ OnEvent([]byte) error }

// NewNotifier creates a Notifier with the given SNS topic ARN.
// NewNotifier creates a Notifier with the given SNS topic Name or ARN.
func NewNotifier(topic string, opts ...Option) *Notifier {
option := &options{
topic: topic,
Expand Down Expand Up @@ -88,6 +90,19 @@ func (n *Notifier) Start(ctx context.Context) error { //nolint:cyclop,funlen,goc
}
}

snsClient := sns.NewFromConfig(n.config)
topicArn := n.topic
if !arn.IsARN(topicArn) {
// Here uses CreateTopic to get topic ARN as the topic already exists.
topic, err := snsClient.CreateTopic(ctx, &sns.CreateTopicInput{
Name: aws.String(n.topic),
})
if err != nil {
return fmt.Errorf("get sns topic ARN: %w", err)
}
topicArn = *topic.TopicArn
}

stsClient := sts.NewFromConfig(n.config)
identity, err := stsClient.GetCallerIdentity(ctx, &sts.GetCallerIdentityInput{})
if err != nil {
Expand Down Expand Up @@ -124,7 +139,7 @@ func (n *Notifier) Start(ctx context.Context) error { //nolint:cyclop,funlen,goc
"Resource":"*"
}
]
}`, n.topic, aws.ToString(identity.Arn))
}`, topicArn, aws.ToString(identity.Arn))

sqsClient := sqs.NewFromConfig(n.config)
uuid, err := rand.NewUUID(rand.Reader).GetUUID()
Expand Down Expand Up @@ -160,9 +175,8 @@ func (n *Notifier) Start(ctx context.Context) error { //nolint:cyclop,funlen,goc
}
queueArn := queueAttrs.Attributes["QueueArn"]

snsClient := sns.NewFromConfig(n.config)
Subscription, err := snsClient.Subscribe(ctx, &sns.SubscribeInput{
TopicArn: aws.String(n.topic),
TopicArn: aws.String(topicArn),
Protocol: aws.String("sqs"),
Endpoint: aws.String(queueArn),
Attributes: map[string]string{"RawMessageDelivery": "true"},
Expand Down
89 changes: 89 additions & 0 deletions notifier/sns/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ func TestNotifier(t *testing.T) {
Arn: aws.String("arn:aws:sts::123456789012:assumed-role/role-name/session-name"),
},
}, middleware.Metadata{}, nil
case "CreateTopic":
return middleware.FinalizeOutput{
Result: &sns.CreateTopicOutput{
TopicArn: aws.String("arn:aws:sns:us-west-2:123456789012:MyTopic"),
},
}, middleware.Metadata{}, nil
case "CreateQueue":
return middleware.FinalizeOutput{
Result: &sqs.CreateQueueOutput{
Expand Down Expand Up @@ -130,6 +136,12 @@ func TestNotifier(t *testing.T) {
Arn: aws.String("arn:aws:sts::123456789012:assumed-role/role-name/session-name"),
},
}, middleware.Metadata{}, nil
case "CreateTopic":
return middleware.FinalizeOutput{
Result: &sns.CreateTopicOutput{
TopicArn: aws.String("arn:aws:sns:us-west-2:123456789012:MyTopic"),
},
}, middleware.Metadata{}, nil
case "CreateQueue":
return middleware.FinalizeOutput{
Result: &sqs.CreateQueueOutput{
Expand Down Expand Up @@ -187,6 +199,12 @@ func TestNotifier(t *testing.T) {
Arn: aws.String("arn:aws:sts::123456789012:assumed-role/role-name/session-name"),
},
}, middleware.Metadata{}, nil
case "CreateTopic":
return middleware.FinalizeOutput{
Result: &sns.CreateTopicOutput{
TopicArn: aws.String("arn:aws:sns:us-west-2:123456789012:MyTopic"),
},
}, middleware.Metadata{}, nil
case "CreateQueue":
return middleware.FinalizeOutput{
Result: &sqs.CreateQueueOutput{
Expand Down Expand Up @@ -256,6 +274,12 @@ level=WARN msg="No loader to process message." msg=message
Arn: aws.String("arn:aws:sts::123456789012:assumed-role/role-name/session-name"),
},
}, middleware.Metadata{}, nil
case "CreateTopic":
return middleware.FinalizeOutput{
Result: &sns.CreateTopicOutput{
TopicArn: aws.String("arn:aws:sns:us-west-2:123456789012:MyTopic"),
},
}, middleware.Metadata{}, nil
case "CreateQueue":
return middleware.FinalizeOutput{
Result: &sqs.CreateQueueOutput{
Expand Down Expand Up @@ -320,13 +344,36 @@ level=WARN msg="Fail to process message." msg=message loader=loader error="proce
switch awsMiddleware.GetOperationName(ctx) {
case "GetCallerIdentity":
return middleware.FinalizeOutput{}, middleware.Metadata{}, errors.New("get caller identity error")
case "CreateTopic":
return middleware.FinalizeOutput{
Result: &sns.CreateTopicOutput{
TopicArn: aws.String("arn:aws:sns:us-west-2:123456789012:MyTopic"),
},
}, middleware.Metadata{}, nil
default:
return middleware.FinalizeOutput{}, middleware.Metadata{}, nil
}
},
notified: false,
error: "get caller identity: operation error STS: GetCallerIdentity, get caller identity error",
},
{
description: "CreateTopic error",
middleware: func(
ctx context.Context,
_ middleware.FinalizeInput,
_ middleware.FinalizeHandler,
) (middleware.FinalizeOutput, middleware.Metadata, error) {
switch awsMiddleware.GetOperationName(ctx) {
case "CreateTopic":
return middleware.FinalizeOutput{}, middleware.Metadata{}, errors.New("create topic error")
default:
return middleware.FinalizeOutput{}, middleware.Metadata{}, nil
}
},
notified: false,
error: "get sns topic ARN: operation error SNS: CreateTopic, create topic error",
},
{
description: "CreateQueue error",
middleware: func(
Expand All @@ -341,6 +388,12 @@ level=WARN msg="Fail to process message." msg=message loader=loader error="proce
Arn: aws.String("arn:aws:sts::123456789012:assumed-role/role-name/session-name"),
},
}, middleware.Metadata{}, nil
case "CreateTopic":
return middleware.FinalizeOutput{
Result: &sns.CreateTopicOutput{
TopicArn: aws.String("arn:aws:sns:us-west-2:123456789012:MyTopic"),
},
}, middleware.Metadata{}, nil
case "CreateQueue":
return middleware.FinalizeOutput{}, middleware.Metadata{}, errors.New("create queue error")
default:
Expand All @@ -364,6 +417,12 @@ level=WARN msg="Fail to process message." msg=message loader=loader error="proce
Arn: aws.String("arn:aws:sts::123456789012:assumed-role/role-name/session-name"),
},
}, middleware.Metadata{}, nil
case "CreateTopic":
return middleware.FinalizeOutput{
Result: &sns.CreateTopicOutput{
TopicArn: aws.String("arn:aws:sns:us-west-2:123456789012:MyTopic"),
},
}, middleware.Metadata{}, nil
case "CreateQueue":
return middleware.FinalizeOutput{
Result: &sqs.CreateQueueOutput{
Expand Down Expand Up @@ -397,6 +456,12 @@ level=WARN msg="Fail to process message." msg=message loader=loader error="proce
Arn: aws.String("arn:aws:sts::123456789012:assumed-role/role-name/session-name"),
},
}, middleware.Metadata{}, nil
case "CreateTopic":
return middleware.FinalizeOutput{
Result: &sns.CreateTopicOutput{
TopicArn: aws.String("arn:aws:sns:us-west-2:123456789012:MyTopic"),
},
}, middleware.Metadata{}, nil
case "CreateQueue":
return middleware.FinalizeOutput{
Result: &sqs.CreateQueueOutput{
Expand Down Expand Up @@ -463,6 +528,12 @@ level=WARN msg="Fail to delete sqs queue." queue=https://sqs.us-west-2.amazonaws
Arn: aws.String("arn:aws:sts::123456789012:assumed-role/role-name/session-name"),
},
}, middleware.Metadata{}, nil
case "CreateTopic":
return middleware.FinalizeOutput{
Result: &sns.CreateTopicOutput{
TopicArn: aws.String("arn:aws:sns:us-west-2:123456789012:MyTopic"),
},
}, middleware.Metadata{}, nil
case "CreateQueue":
return middleware.FinalizeOutput{
Result: &sqs.CreateQueueOutput{
Expand Down Expand Up @@ -504,6 +575,12 @@ level=WARN msg="Fail to delete sqs queue." queue=https://sqs.us-west-2.amazonaws
Arn: aws.String("arn:aws:sts::123456789012:assumed-role/role-name/session-name"),
},
}, middleware.Metadata{}, nil
case "CreateTopic":
return middleware.FinalizeOutput{
Result: &sns.CreateTopicOutput{
TopicArn: aws.String("arn:aws:sns:us-west-2:123456789012:MyTopic"),
},
}, middleware.Metadata{}, nil
case "CreateQueue":
return middleware.FinalizeOutput{
Result: &sqs.CreateQueueOutput{
Expand Down Expand Up @@ -570,6 +647,12 @@ level=WARN msg="Fail to unsubscribe sns topic." topic=topic error="operation err
Arn: aws.String("arn:aws:sts::123456789012:assumed-role/role-name/session-name"),
},
}, middleware.Metadata{}, nil
case "CreateTopic":
return middleware.FinalizeOutput{
Result: &sns.CreateTopicOutput{
TopicArn: aws.String("arn:aws:sns:us-west-2:123456789012:MyTopic"),
},
}, middleware.Metadata{}, nil
case "CreateQueue":
return middleware.FinalizeOutput{
Result: &sqs.CreateQueueOutput{
Expand Down Expand Up @@ -623,6 +706,12 @@ level=WARN msg="Fail to receive sqs message." queue=https://sqs.us-west-2.amazon
Arn: aws.String("arn:aws:sts::123456789012:assumed-role/role-name/session-name"),
},
}, middleware.Metadata{}, nil
case "CreateTopic":
return middleware.FinalizeOutput{
Result: &sns.CreateTopicOutput{
TopicArn: aws.String("arn:aws:sns:us-west-2:123456789012:MyTopic"),
},
}, middleware.Metadata{}, nil
case "CreateQueue":
return middleware.FinalizeOutput{
Result: &sqs.CreateQueueOutput{
Expand Down

0 comments on commit 5f9354e

Please sign in to comment.