Skip to content

Commit

Permalink
Fix errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng committed Jan 29, 2024
1 parent 71e45ae commit 6dabcd9
Showing 1 changed file with 37 additions and 16 deletions.
53 changes: 37 additions & 16 deletions pulsar/resource_pulsar_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,13 @@ func resourcePulsarNamespace() *schema.Resource {
},
"type": {
Type: schema.TypeString,
Required: false,
Optional: true,
ValidateFunc: validatePartitionedTopicType,
Default: "non-partitioned",
},
"partitions": {
Type: schema.TypeInt,
Required: false,
Optional: true,
},
},
},
Expand Down Expand Up @@ -440,13 +441,15 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetTopicAutoCreation: %w", err))
}

_ = d.Set("topic_auto_creation", schema.NewSet(topicAutoCreationPoliciesToHash, []interface{}{
map[string]interface{}{
"enable": autoCreation.Allow,
"type": autoCreation.Type,
"partitions": autoCreation.Partitions,
},
}))
data := map[string]interface{}{
"enable": autoCreation.Allow,
"type": autoCreation.Type.String(),
}
if autoCreation.Partitions != nil {
data["partitions"] = *autoCreation.Partitions
}

_ = d.Set("topic_auto_creation", schema.NewSet(topicAutoCreationPoliciesToHash, []interface{}{data}))
}

return nil
Expand Down Expand Up @@ -602,9 +605,13 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData,
}

if topicAutoCreation.Len() > 0 {
topicAutoCreationPolicy := unmarshalTopicAutoCreation(topicAutoCreation)
if err = client.SetTopicAutoCreation(*nsName, *topicAutoCreationPolicy); err != nil {
topicAutoCreationPolicy, err := unmarshalTopicAutoCreation(topicAutoCreation)
if err != nil {
errs = multierror.Append(errs, fmt.Errorf("SetTopicAutoCreation: %w", err))
} else {
if err = client.SetTopicAutoCreation(*nsName, *topicAutoCreationPolicy); err != nil {
errs = multierror.Append(errs, fmt.Errorf("SetTopicAutoCreation: %w", err))
}
}
} else { // remove the topicAutoCreation
if err = client.RemoveTopicAutoCreation(*nsName); err != nil {
Expand Down Expand Up @@ -701,7 +708,9 @@ func topicAutoCreationPoliciesToHash(v interface{}) int {

buf.WriteString(fmt.Sprintf("%t-", m["enable"].(bool)))
buf.WriteString(fmt.Sprintf("%s-", m["type"].(string)))
buf.WriteString(fmt.Sprintf("%d-", m["partitions"].(int)))
if m["partitions"] != nil {
buf.WriteString(fmt.Sprintf("%d-", m["partitions"].(int)))
}

return hashcode.String(buf.String())
}
Expand Down Expand Up @@ -773,16 +782,28 @@ func unmarshalPersistencePolicies(v *schema.Set) *utils.PersistencePolicies {
return &persPolicies
}

func unmarshalTopicAutoCreation(v *schema.Set) *utils.TopicAutoCreationConfig {
func unmarshalTopicAutoCreation(v *schema.Set) (*utils.TopicAutoCreationConfig, error) {
var topicAutoCreation utils.TopicAutoCreationConfig

for _, policy := range v.List() {
data := policy.(map[string]interface{})

topicAutoCreation.Allow = data["allow"].(bool)
topicAutoCreation.Allow = data["enable"].(bool)
topicAutoCreation.Type = utils.TopicType(data["type"].(string))
topicAutoCreation.Partitions = data["partitions"].(*int)
if topicAutoCreation.Type == utils.Partitioned {
if data["partitions"] == nil {
return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: partitions is required for partitioned topic")
}
partitions := data["partitions"].(int)
topicAutoCreation.Partitions = &partitions
} else if topicAutoCreation.Type == utils.NonPartitioned {
if data["partitions"] != nil {
return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: partitions is not allowed for non-partitioned topic")
}
} else {
return nil, fmt.Errorf("ERROR_PARSE_TOPIC_AUTO_CREATION: unknown topic type %s", topicAutoCreation.Type)
}
}

return &topicAutoCreation
return &topicAutoCreation, nil
}

0 comments on commit 6dabcd9

Please sign in to comment.