diff --git a/README.md b/README.md
index 7be5fa9..0b93704 100644
--- a/README.md
+++ b/README.md
@@ -221,6 +221,12 @@ resource "pulsar_namespace" "test" {
dispatch_byte_throttling_rate = 2048
}
+ subscription_dispatch_rate {
+ dispatch_msg_throttling_rate = 50
+ rate_period_seconds = 50
+ dispatch_byte_throttling_rate = 2048
+ }
+
retention_policies {
retention_minutes = "1600"
retention_size_in_mb = "10000"
@@ -249,17 +255,18 @@ resource "pulsar_namespace" "test" {
#### Properties
-| Property | Description | Required |
-| ---------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- | -------- |
-| `tenant` | Name of the Tenant managing this namespace | Yes |
-| `namespace` | name of the namespace | Yes |
-| `enable_deduplication` | Message deduplication state on a namespace | No |
-| `namespace_config` | Configuration for your namespaces like max allowed producers to produce messages | No |
-| `dispatch_rate` | Apache Pulsar throttling config | No |
-| `retention_policies` | Data retention policies | No |
-| `backlog_quota` | [Backlog Quota](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-backlog-quota-policies) for all topics | No |
-| `persistence_policies` | [Persistence policies](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-persistence-policies) for all topics under a given namespace | No |
-| `permission_grant` | [Permission grants](https://pulsar.apache.org/docs/en/admin-api-permissions/) on a namespace. This block can be repeated for each grant you'd like to add | No |
+| Property | Description | Required |
+| ---------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- | -------- |
+| `tenant` | Name of the Tenant managing this namespace | Yes |
+| `namespace` | name of the namespace | Yes |
+| `enable_deduplication` | Message deduplication state on a namespace | No |
+| `namespace_config` | Configuration for your namespaces like max allowed producers to produce messages | No |
+| `dispatch_rate` | [Apache Pulsar throttling config for topics](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-dispatch-throttling-for-topics) | No |
+| `subscription_dispatch_rate` | [Apache Pulsar throttling config for subscriptions](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-dispatch-throttling-for-subscription) | No |
+| `retention_policies` | Data retention policies | No |
+| `backlog_quota` | [Backlog Quota](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-backlog-quota-policies) for all topics | No |
+| `persistence_policies` | [Persistence policies](https://pulsar.apache.org/docs/en/admin-api-namespaces/#set-persistence-policies) for all topics under a given namespace | No |
+| `permission_grant` | [Permission grants](https://pulsar.apache.org/docs/en/admin-api-permissions/) on a namespace. This block can be repeated for each grant you'd like to add | No |
namespace_config nested schema
diff --git a/docs/resources/namespace.md b/docs/resources/namespace.md
index d19f28b..20778a8 100644
--- a/docs/resources/namespace.md
+++ b/docs/resources/namespace.md
@@ -20,8 +20,10 @@ description: |-
### Optional
- `backlog_quota` (Block Set) (see [below for nested schema](#nestedblock--backlog_quota))
-- `dispatch_rate` (Block Set, Max: 1) Data transfer rate, in and out of the Pulsar Broker (
+- `dispatch_rate` (Block Set, Max: 1) Data transfer rate for all the topics under the given namespace (
see [below for nested schema](#nestedblock--dispatch_rate))
+- `subscription_dispatch_rate` (Block Set, Max: 1) Data transfer rate for all the subscriptions under the given
+ namespace (see [below for nested schema](#nestedblock--subscription_dispatch_rate))
- `enable_deduplication` (Boolean)
- `namespace_config` (Block Set, Max: 1) (see [below for nested schema](#nestedblock--namespace_config))
- `permission_grant` (Block Set) (see [below for nested schema](#nestedblock--permission_grant))
@@ -54,6 +56,16 @@ Required:
- `dispatch_msg_throttling_rate` (Number)
- `rate_period_seconds` (Number)
+
+
+### Nested Schema for `subscription_dispatch_rate`
+
+Required:
+
+- `dispatch_byte_throttling_rate` (Number)
+- `dispatch_msg_throttling_rate` (Number)
+- `rate_period_seconds` (Number)
+
### Nested Schema for `namespace_config`
diff --git a/pulsar/provider.go b/pulsar/provider.go
index 154aaa5..47aa5cd 100644
--- a/pulsar/provider.go
+++ b/pulsar/provider.go
@@ -57,7 +57,8 @@ func init() {
"max_consumers_per_subscription": "Max number of consumers per subscription",
"max_consumers_per_topic": "Max number of consumers per topic",
"message_ttl_seconds": "Sets the message time to live",
- "dispatch_rate": "Data transfer rate, in and out of the Pulsar Broker",
+ "dispatch_rate": "Data transfer rate for all the topics under the given namespace",
+ "subscription_dispatch_rate": "Data transfer rate for all the subscriptions under the given namespace",
"persistence_policy": "Policy for the namespace for data persistence",
"backlog_quota": "",
"issuer_url": "The OAuth 2.0 URL of the authentication provider which allows the Pulsar client to obtain an access token",
diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go
index 54a6e83..183a68c 100644
--- a/pulsar/resource_pulsar_namespace.go
+++ b/pulsar/resource_pulsar_namespace.go
@@ -94,6 +94,29 @@ func resourcePulsarNamespace() *schema.Resource {
},
Set: dispatchRateToHash,
},
+ "subscription_dispatch_rate": {
+ Type: schema.TypeSet,
+ Optional: true,
+ Description: descriptions["subscription_dispatch_rate"],
+ MaxItems: 1,
+ Elem: &schema.Resource{
+ Schema: map[string]*schema.Schema{
+ "dispatch_msg_throttling_rate": {
+ Type: schema.TypeInt,
+ Required: true,
+ },
+ "rate_period_seconds": {
+ Type: schema.TypeInt,
+ Required: true,
+ },
+ "dispatch_byte_throttling_rate": {
+ Type: schema.TypeInt,
+ Required: true,
+ },
+ },
+ },
+ Set: dispatchRateToHash,
+ },
"retention_policies": {
Type: schema.TypeSet,
Optional: true,
@@ -426,6 +449,21 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me
}))
}
+ if subscriptionDispatchRateCfg, ok := d.GetOk("subscription_dispatch_rate"); ok && subscriptionDispatchRateCfg.(*schema.Set).Len() > 0 { //nolint:lll
+ sdr, err := client.GetSubscriptionDispatchRate(*ns)
+ if err != nil {
+ return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSubscriptionDispatchRate: %w", err))
+ }
+
+ _ = d.Set("subscription_dispatch_rate", schema.NewSet(dispatchRateToHash, []interface{}{
+ map[string]interface{}{
+ "dispatch_msg_throttling_rate": sdr.DispatchThrottlingRateInMsg,
+ "rate_period_seconds": sdr.RatePeriodInSecond,
+ "dispatch_byte_throttling_rate": int(sdr.DispatchThrottlingRateInByte),
+ },
+ }))
+ }
+
if permissionGrantCfg, ok := d.GetOk("permission_grant"); ok && len(permissionGrantCfg.(*schema.Set).List()) > 0 {
grants, err := client.GetNamespacePermissions(*ns)
if err != nil {
@@ -465,6 +503,7 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData,
retentionPoliciesConfig := d.Get("retention_policies").(*schema.Set)
backlogQuotaConfig := d.Get("backlog_quota").(*schema.Set)
dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set)
+ subscriptionDispatchRateConfig := d.Get("subscription_dispatch_rate").(*schema.Set)
persistencePoliciesConfig := d.Get("persistence_policies").(*schema.Set)
permissionGrantConfig := d.Get("permission_grant").(*schema.Set)
topicAutoCreation := d.Get("topic_auto_creation").(*schema.Set)
@@ -560,6 +599,13 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData,
}
}
+ if subscriptionDispatchRateConfig.Len() > 0 {
+ subscriptionDispatchRate := unmarshalDispatchRate(subscriptionDispatchRateConfig)
+ if err = client.SetSubscriptionDispatchRate(*nsName, *subscriptionDispatchRate); err != nil {
+ errs = multierror.Append(errs, fmt.Errorf("SetSubscriptionDispatchRate: %w", err))
+ }
+ }
+
if persistencePoliciesConfig.Len() > 0 {
persistencePolicies := unmarshalPersistencePolicies(persistencePoliciesConfig)
if err = client.SetPersistence(nsName.String(), *persistencePolicies); err != nil {
@@ -646,6 +692,7 @@ func resourcePulsarNamespaceDelete(ctx context.Context, d *schema.ResourceData,
_ = d.Set("retention_policies", nil)
_ = d.Set("backlog_quota", nil)
_ = d.Set("dispatch_rate", nil)
+ _ = d.Set("subscription_dispatch_rate", nil)
_ = d.Set("persistence_policies", nil)
_ = d.Set("permission_grant", nil)
_ = d.Set("topic_auto_creation", nil)
diff --git a/pulsar/resource_pulsar_namespace_test.go b/pulsar/resource_pulsar_namespace_test.go
index 9260157..a0eb450 100644
--- a/pulsar/resource_pulsar_namespace_test.go
+++ b/pulsar/resource_pulsar_namespace_test.go
@@ -110,6 +110,7 @@ func TestNamespaceWithUpdate(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "dispatch_rate.#", "0"),
+ resource.TestCheckResourceAttr(resourceName, "subscription_dispatch_rate.#", "0"),
resource.TestCheckResourceAttr(resourceName, "retention_policies.#", "0"),
resource.TestCheckResourceAttr(resourceName, "namespace_config.#", "0"),
resource.TestCheckNoResourceAttr(resourceName, "enable_deduplication"),
@@ -121,6 +122,7 @@ func TestNamespaceWithUpdate(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "dispatch_rate.#", "1"),
+ resource.TestCheckResourceAttr(resourceName, "subscription_dispatch_rate.#", "1"),
resource.TestCheckResourceAttr(resourceName, "retention_policies.#", "1"),
resource.TestCheckResourceAttr(resourceName, "namespace_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "enable_deduplication", "true"),
@@ -162,6 +164,7 @@ func TestNamespaceWithUndefinedOptionalsUpdate(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "dispatch_rate.#", "0"),
+ resource.TestCheckResourceAttr(resourceName, "subscription_dispatch_rate.#", "0"),
resource.TestCheckResourceAttr(resourceName, "retention_policies.#", "0"),
resource.TestCheckResourceAttr(resourceName, "backlog_quota.#", "0"),
resource.TestCheckResourceAttr(resourceName, "namespace_config.#", "0"),
@@ -174,6 +177,7 @@ func TestNamespaceWithUndefinedOptionalsUpdate(t *testing.T) {
Check: resource.ComposeTestCheckFunc(
testPulsarNamespaceExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "dispatch_rate.#", "0"),
+ resource.TestCheckResourceAttr(resourceName, "subscription_dispatch_rate.#", "0"),
resource.TestCheckResourceAttr(resourceName, "retention_policies.#", "0"),
resource.TestCheckResourceAttr(resourceName, "backlog_quota.#", "0"),
resource.TestCheckResourceAttr(resourceName, "namespace_config.#", "1"),
@@ -395,8 +399,8 @@ func testNamespaceImported() resource.ImportStateCheckFunc {
return fmt.Errorf("expected %d states, got %d: %#v", 1, len(s), s)
}
- if len(s[0].Attributes) != 11 {
- return fmt.Errorf("expected %d attrs, got %d: %#v", 11, len(s[0].Attributes), s[0].Attributes)
+ if len(s[0].Attributes) != 12 {
+ return fmt.Errorf("expected %d attrs, got %d: %#v", 12, len(s[0].Attributes), s[0].Attributes)
}
return nil
@@ -507,6 +511,12 @@ resource "pulsar_namespace" "test" {
dispatch_byte_throttling_rate = 2048
}
+ subscription_dispatch_rate {
+ dispatch_msg_throttling_rate = 50
+ rate_period_seconds = 50
+ dispatch_byte_throttling_rate = 2048
+ }
+
retention_policies {
retention_minutes = "1600"
retention_size_in_mb = "10000"