Skip to content

Commit

Permalink
fix: JOIN-50367 add labels do dead letter topic subscription (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
eugene-taran authored Jan 30, 2025
1 parent 8c9b126 commit b7718a6
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 38 deletions.
22 changes: 15 additions & 7 deletions packages/pubsub/src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,11 @@ export class Subscriber<T = unknown> {

private async initializeDeadLetterSubscription() {
if (this.deadLetterSubscriptionName && this.deadLetterSubscription) {
await this.initializeSubscription(this.deadLetterSubscriptionName, this.deadLetterSubscription)
const options = this.createEmptyOptions()
if (this.subscriberOptions.subscriptionOptions?.labels) {
options.labels = this.subscriberOptions.subscriptionOptions.labels
}
await this.initializeSubscription(this.deadLetterSubscriptionName, this.deadLetterSubscription, options)
await this.addPubsubServiceAccountRole(this.subscription.iam, 'roles/pubsub.subscriber')
}
}
Expand Down Expand Up @@ -296,12 +300,7 @@ export class Subscriber<T = unknown> {
}

private getInitializationOptions(): ISubscriptionInitializationOptions {
const options: ISubscriptionInitializationOptions = {
deadLetterPolicy: null,
retryPolicy: {},
labels: {},
filter: ''
}
const options: ISubscriptionInitializationOptions = this.createEmptyOptions()

const { subscriptionOptions } = this.subscriberOptions
if (!subscriptionOptions) {
Expand Down Expand Up @@ -398,4 +397,13 @@ export class Subscriber<T = unknown> {
}
return false;
}

private createEmptyOptions(): ISubscriptionInitializationOptions {
return {
deadLetterPolicy: null,
retryPolicy: {},
labels: {},
filter: '',
}
}
}
86 changes: 55 additions & 31 deletions packages/pubsub/src/__tests__/Subscriber.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,21 +224,7 @@ describe('Subscriber', () => {
},
])

const subscriberWithFilter = new Subscriber(
{
topicName,
subscriptionName,
subscriptionOptions: {
...subscriptionOptions,
},
},
clientMock as unknown as PubSub,
schemaClientMock as unknown as SchemaServiceClient,
undefined as unknown as SubscriberClient,
loggerMock,
)

await subscriberWithFilter.initialize()
await subscriber.initialize()

expect(loggerMock.error).not.toHaveBeenCalled()
})
Expand Down Expand Up @@ -391,29 +377,33 @@ describe('Subscriber', () => {
it('does not update metadata if dead letter policy did not change', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
const deadLetterPolicy = {
const returnedFromGcloudDeadLetterPolicy = {
maxDeliveryAttempts: 123,
deadLetterTopic: 'projects/gcloudProjectName/topics/subscription-name-unack',
}
subscriptionMock.getMetadata.mockResolvedValue([
{
retryPolicy: {
minimumBackoff: { seconds: String(subscriptionOptions.minBackoffSeconds) },
maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) },
},
labels: { testKey: 'testValue' },
deadLetterPolicy
const topicSubscriptionMetadata = {
retryPolicy: {
minimumBackoff: { seconds: String(subscriptionOptions.minBackoffSeconds) },
maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) },
},
])
const optionsWithCopiedDeadLetterPolicy: ISubscriptionOptions = { ...subscriptionOptions,
labels: { testKey: 'testValue' },
deadLetterPolicy: returnedFromGcloudDeadLetterPolicy,
}
subscriptionMock.getMetadata.mockResolvedValueOnce([topicSubscriptionMetadata])
const deadLetterTopicSubscriptionMetadata = {
labels: { testKey: 'testValue' },
}
subscriptionMock.getMetadata.mockResolvedValueOnce([deadLetterTopicSubscriptionMetadata])
const optionsWithSameDeadLetterPolicy: ISubscriptionOptions = {
...subscriptionOptions,
maxDeliveryAttempts: 123,
gcloudProject: {
name: 'gcloudProjectName',
id: 123456789,
},
}
subscriber = new Subscriber(
{ topicName, subscriptionName, subscriptionOptions: optionsWithCopiedDeadLetterPolicy },
{ topicName, subscriptionName, subscriptionOptions: optionsWithSameDeadLetterPolicy },
clientMock as unknown as PubSub,
schemaClientMock as unknown as SchemaServiceClient,
undefined as unknown as SubscriberClient,
Expand All @@ -429,7 +419,7 @@ describe('Subscriber', () => {
it('updates metadata if dead letter policy changed', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
const deadLetterPolicy = {
const returnedFromGcloudDeadLetterPolicy = {
maxDeliveryAttempts: 123,
deadLetterTopic: 'projects/gcloudProjectName/topics/subscription-name-unack',
}
Expand All @@ -440,18 +430,18 @@ describe('Subscriber', () => {
maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) },
},
labels: { testKey: 'testValue' },
deadLetterPolicy
deadLetterPolicy: returnedFromGcloudDeadLetterPolicy
},
])
const optionsWithCopiedDeadLetterPolicy: ISubscriptionOptions = { ...subscriptionOptions,
const optionsWithChangedDeadLetterPolicy: ISubscriptionOptions = { ...subscriptionOptions,
maxDeliveryAttempts: 10,
gcloudProject: {
name: 'gcloudProjectName',
id: 123456789,
},
}
subscriber = new Subscriber(
{ topicName, subscriptionName, subscriptionOptions: optionsWithCopiedDeadLetterPolicy },
{ topicName, subscriptionName, subscriptionOptions: optionsWithChangedDeadLetterPolicy },
clientMock as unknown as PubSub,
schemaClientMock as unknown as SchemaServiceClient,
undefined as unknown as SubscriberClient,
Expand Down Expand Up @@ -603,11 +593,45 @@ describe('Subscriber', () => {

expect(subscriptionMock.create).toHaveBeenCalledTimes(2)
expect(subscriptionMock.create).toHaveBeenLastCalledWith({
deadLetterPolicy: null,
retryPolicy: {},
labels: { testKey: 'testValue' },
filter: '',
gaxOpts: createCallOptions,
})
expect(topicMock.subscription).toHaveBeenLastCalledWith(deadLetterSubscriptionName)
})

it('updates labels on dead letter subscription if changed', async () => {
subscriptionMock.exists.mockResolvedValue([true])
const deadLetterPolicy = {
maxDeliveryAttempts: 123,
deadLetterTopic: 'projects/gcloudProjectName/topics/subscription-name-unack',
}
const topicSubscriptionMetadata = {
retryPolicy: {
minimumBackoff: { seconds: String(subscriptionOptions.minBackoffSeconds) },
maximumBackoff: { seconds: String(subscriptionOptions.maxBackoffSeconds) },
},
labels: { testKey: 'testValue' },
deadLetterPolicy
}
subscriptionMock.getMetadata.mockResolvedValueOnce([topicSubscriptionMetadata])
const deadLetterTopicSubscriptionMetadata = {
labels: { },
}
subscriptionMock.getMetadata.mockResolvedValueOnce([deadLetterTopicSubscriptionMetadata])

await subscriber.initialize()

expect(subscriptionMock.setMetadata).toHaveBeenLastCalledWith({
deadLetterPolicy: null,
retryPolicy: {},
labels: { testKey: 'testValue' },
})
expect(topicMock.subscription).toHaveBeenLastCalledWith(deadLetterSubscriptionName)
})

it('adds subscriber role to pubsub service account', async () => {
subscriptionMock.exists.mockResolvedValue([false])

Expand Down

0 comments on commit b7718a6

Please sign in to comment.