Skip to content

Commit

Permalink
fix ci
Browse files Browse the repository at this point in the history
  • Loading branch information
freeznet committed Jan 8, 2025
1 parent ec143af commit 6713f4f
Showing 1 changed file with 68 additions and 57 deletions.
125 changes: 68 additions & 57 deletions pulsar/resource_pulsar_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,31 +153,26 @@ func resourcePulsarNamespace() *schema.Resource {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validateNotBlank,
Computed: true,
},
"max_consumers_per_subscription": {
Type: schema.TypeInt,
Optional: true,
ValidateFunc: validateGtEq0,
Computed: true,
},
"max_consumers_per_topic": {
Type: schema.TypeInt,
Optional: true,
ValidateFunc: validateGtEq0,
Computed: true,
},
"max_producers_per_topic": {
Type: schema.TypeInt,
Optional: true,
ValidateFunc: validateGtEq0,
Computed: true,
},
"message_ttl_seconds": {
Type: schema.TypeInt,
Optional: true,
ValidateFunc: validateGtEq0,
Computed: true,
},
"replication_clusters": {
Type: schema.TypeList,
Expand All @@ -186,12 +181,10 @@ func resourcePulsarNamespace() *schema.Resource {
Elem: &schema.Schema{
Type: schema.TypeString,
},
Computed: true,
},
"schema_validation_enforce": {
Type: schema.TypeBool,
Optional: true,
Computed: true,
},
"schema_compatibility_strategy": {
Type: schema.TypeString,
Expand All @@ -202,13 +195,11 @@ func resourcePulsarNamespace() *schema.Resource {
"is_allow_auto_update_schema": {
Type: schema.TypeBool,
Optional: true,
Computed: true,
},
"offload_threshold_size_in_mb": {
Type: schema.TypeInt,
Optional: true,
ValidateFunc: validateGtEq0,
Computed: true,
},
},
},
Expand Down Expand Up @@ -336,75 +327,95 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me
_ = d.Set("tenant", tenant)

if namespaceConfig, ok := d.GetOk("namespace_config"); ok && namespaceConfig.(*schema.Set).Len() > 0 {
afgrp, err := client.GetNamespaceAntiAffinityGroup(ns.String())
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceAntiAffinityGroup: %w", err))
}
configData := namespaceConfig.(*schema.Set).List()[0].(map[string]interface{})
data := make(map[string]interface{})

maxConsPerSub, err := client.GetMaxConsumersPerSubscription(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerSubscription: %w", err))
if _, ok := configData["anti_affinity"]; ok {
afgrp, err := client.GetNamespaceAntiAffinityGroup(ns.String())
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceAntiAffinityGroup: %w", err))
}
data["anti_affinity"] = strings.Trim(strings.TrimSpace(afgrp), "\"")
}

maxConsPerTopic, err := client.GetMaxConsumersPerTopic(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerTopic: %w", err))
if _, ok := configData["max_consumers_per_subscription"]; ok {
maxConsPerSub, err := client.GetMaxConsumersPerSubscription(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerSubscription: %w", err))
}
data["max_consumers_per_subscription"] = maxConsPerSub
}

maxProdPerTopic, err := client.GetMaxProducersPerTopic(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err))
if _, ok := configData["max_consumers_per_topic"]; ok {
maxConsPerTopic, err := client.GetMaxConsumersPerTopic(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxConsumersPerTopic: %w", err))
}
data["max_consumers_per_topic"] = maxConsPerTopic
}

messageTTL, err := client.GetNamespaceMessageTTL(ns.String())
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceMessageTTL: %w", err))
if _, ok := configData["max_producers_per_topic"]; ok {
maxProdPerTopic, err := client.GetMaxProducersPerTopic(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err))
}
data["max_producers_per_topic"] = maxProdPerTopic
}

schemaValidationEnforce, err := client.GetSchemaValidationEnforced(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaValidationEnforced: %w", err))
if _, ok := configData["message_ttl_seconds"]; ok {
messageTTL, err := client.GetNamespaceMessageTTL(ns.String())
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetNamespaceMessageTTL: %w", err))
}
data["message_ttl_seconds"] = messageTTL
}

schemaCompatibilityStrategy, err := client.GetSchemaAutoUpdateCompatibilityStrategy(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err))
if _, ok := configData["schema_validation_enforce"]; ok {
schemaValidationEnforce, err := client.GetSchemaValidationEnforced(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaValidationEnforced: %w", err))
}
data["schema_validation_enforce"] = schemaValidationEnforce
}

replClustersRaw, err := client.GetNamespaceReplicationClusters(ns.String())
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err))
if _, ok := configData["schema_compatibility_strategy"]; ok {
schemaCompatibilityStrategy, err := client.GetSchemaAutoUpdateCompatibilityStrategy(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetSchemaAutoUpdateCompatibilityStrategy: %w", err))
}
data["schema_compatibility_strategy"] = schemaCompatibilityStrategy.String()
}

replClusters := make([]interface{}, len(replClustersRaw))
for i, cl := range replClustersRaw {
replClusters[i] = cl
if _, ok := configData["replication_clusters"]; ok {
replClustersRaw, err := client.GetNamespaceReplicationClusters(ns.String())
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetMaxProducersPerTopic: %w", err))
}

replClusters := make([]interface{}, len(replClustersRaw))
for i, cl := range replClustersRaw {
replClusters[i] = cl
}
data["replication_clusters"] = replClusters
}

isAllowAutoUpdateSchema, err := client.GetIsAllowAutoUpdateSchema(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetIsAllowAutoUpdateSchema: %w", err))
if _, ok := configData["is_allow_auto_update_schema"]; ok {
isAllowAutoUpdateSchema, err := client.GetIsAllowAutoUpdateSchema(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetIsAllowAutoUpdateSchema: %w", err))
}
data["is_allow_auto_update_schema"] = isAllowAutoUpdateSchema
}

offloadTresholdSizeInMb, err := client.GetOffloadThreshold(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetOffloadThreshold: %w", err))
if _, ok := configData["offload_threshold_size_in_mb"]; ok {
offloadTresholdSizeInMb, err := client.GetOffloadThreshold(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetOffloadThreshold: %w", err))
}
data["offload_threshold_size_in_mb"] = int(offloadTresholdSizeInMb)
}

_ = d.Set("namespace_config", schema.NewSet(namespaceConfigToHash, []interface{}{
map[string]interface{}{
"anti_affinity": strings.Trim(strings.TrimSpace(afgrp), "\""),
"max_consumers_per_subscription": maxConsPerSub,
"max_consumers_per_topic": maxConsPerTopic,
"max_producers_per_topic": maxProdPerTopic,
"message_ttl_seconds": messageTTL,
"replication_clusters": replClusters,
"schema_validation_enforce": schemaValidationEnforce,
"schema_compatibility_strategy": schemaCompatibilityStrategy.String(),
"is_allow_auto_update_schema": isAllowAutoUpdateSchema,
"offload_threshold_size_in_mb": int(offloadTresholdSizeInMb),
},
}))
_ = d.Set("namespace_config", schema.NewSet(namespaceConfigToHash, []interface{}{data}))
}

if persPoliciesCfg, ok := d.GetOk("persistence_policies"); ok && persPoliciesCfg.(*schema.Set).Len() > 0 {
Expand Down

0 comments on commit 6713f4f

Please sign in to comment.