Skip to content

Commit

Permalink
Support topic_auto_creation
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng committed Jan 29, 2024
1 parent 60223d9 commit 71e45ae
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/streamnative/terraform-provider-pulsar
go 1.18

require (
github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf
github.com/apache/pulsar-client-go v0.12.0
github.com/cenkalti/backoff/v4 v4.1.2
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/terraform-plugin-log v0.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki
github.com/agext/levenshtein v1.2.2 h1:0S/Yg6LYmFJ5stwQeRp6EeOcCbj7xiqQSdNelsXvaqE=
github.com/agext/levenshtein v1.2.2/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf h1:k9hqsKPh5ncKf0e3CkzvBTYXLwCYNYFb1Vtk3qnYAvk=
github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf/go.mod h1:Ea/yiZA7plgiaWRyOuO1B0k5/hjpl1thmiKig+D9PBQ=
github.com/apache/pulsar-client-go v0.12.0 h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I=
github.com/apache/pulsar-client-go v0.12.0/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/apparentlymart/go-cidr v1.1.0 h1:2mAhrMoF+nhXqxTzSZMUzDHkLjmIHC+Zzn4tdgBZjnU=
github.com/apparentlymart/go-cidr v1.1.0/go.mod h1:EBcsNrHc3zQeuaeCeCtQruQm+n9/YjEn/vI25Lg7Gwc=
github.com/apparentlymart/go-dump v0.0.0-20180507223929-23540a00eaa3/go.mod h1:oL81AME2rN47vu18xqj1S1jPIPuN7afo62yKTNn3XMM=
Expand Down
76 changes: 76 additions & 0 deletions pulsar/resource_pulsar_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,29 @@ func resourcePulsarNamespace() *schema.Resource {
},
},
},
"topic_auto_creation": {
Type: schema.TypeSet,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"enable": {
Type: schema.TypeBool,
Required: true,
},
"type": {
Type: schema.TypeString,
Required: false,
ValidateFunc: validatePartitionedTopicType,
},
"partitions": {
Type: schema.TypeInt,
Required: false,
},
},
},
Set: topicAutoCreationPoliciesToHash,
},
},
}
}
Expand Down Expand Up @@ -411,6 +434,21 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me
setPermissionGrant(d, grants)
}

if topicAutoCreation, ok := d.GetOk("topic_auto_creation"); ok && topicAutoCreation.(*schema.Set).Len() > 0 {
autoCreation, err := client.GetTopicAutoCreation(*ns)
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetTopicAutoCreation: %w", err))
}

_ = d.Set("topic_auto_creation", schema.NewSet(topicAutoCreationPoliciesToHash, []interface{}{
map[string]interface{}{
"enable": autoCreation.Allow,
"type": autoCreation.Type,
"partitions": autoCreation.Partitions,
},
}))
}

return nil
}

Expand All @@ -426,6 +464,7 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData,
dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set)
persistencePoliciesConfig := d.Get("persistence_policies").(*schema.Set)
permissionGrantConfig := d.Get("permission_grant").(*schema.Set)
topicAutoCreation := d.Get("topic_auto_creation").(*schema.Set)

nsName, err := utils.GetNameSpaceName(tenant, namespace)
if err != nil {
Expand Down Expand Up @@ -562,6 +601,17 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData,
}
}

if topicAutoCreation.Len() > 0 {
topicAutoCreationPolicy := unmarshalTopicAutoCreation(topicAutoCreation)
if err = client.SetTopicAutoCreation(*nsName, *topicAutoCreationPolicy); err != nil {
errs = multierror.Append(errs, fmt.Errorf("SetTopicAutoCreation: %w", err))
}
} else { // remove the topicAutoCreation
if err = client.RemoveTopicAutoCreation(*nsName); err != nil {
errs = multierror.Append(errs, fmt.Errorf("RemoveTopicAutoCreation: %w", err))
}
}

if errs != nil {
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_NAMESPACE_CONFIG: %w", errs))
}
Expand Down Expand Up @@ -591,6 +641,7 @@ func resourcePulsarNamespaceDelete(ctx context.Context, d *schema.ResourceData,
_ = d.Set("dispatch_rate", nil)
_ = d.Set("persistence_policies", nil)
_ = d.Set("permission_grant", nil)
_ = d.Set("topic_auto_creation", nil)

return nil
}
Expand Down Expand Up @@ -644,6 +695,17 @@ func persistencePoliciesToHash(v interface{}) int {
return hashcode.String(buf.String())
}

func topicAutoCreationPoliciesToHash(v interface{}) int {
var buf bytes.Buffer
m := v.(map[string]interface{})

buf.WriteString(fmt.Sprintf("%t-", m["enable"].(bool)))
buf.WriteString(fmt.Sprintf("%s-", m["type"].(string)))
buf.WriteString(fmt.Sprintf("%d-", m["partitions"].(int)))

return hashcode.String(buf.String())
}

func unmarshalDispatchRate(v *schema.Set) *utils.DispatchRate {
var dispatchRate utils.DispatchRate

Expand Down Expand Up @@ -710,3 +772,17 @@ func unmarshalPersistencePolicies(v *schema.Set) *utils.PersistencePolicies {

return &persPolicies
}

func unmarshalTopicAutoCreation(v *schema.Set) *utils.TopicAutoCreationConfig {
var topicAutoCreation utils.TopicAutoCreationConfig

for _, policy := range v.List() {
data := policy.(map[string]interface{})

topicAutoCreation.Allow = data["allow"].(bool)
topicAutoCreation.Type = utils.TopicType(data["type"].(string))
topicAutoCreation.Partitions = data["partitions"].(*int)
}

return &topicAutoCreation
}
9 changes: 9 additions & 0 deletions pulsar/validate_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,12 @@ func validateAuthAction(val interface{}, key string) (warns []string, errs []err
}
return
}

func validatePartitionedTopicType(val interface{}, key string) (warns []string, errs []error) {
v := val.(string)
_, err := utils.ParseTopicType(v)
if err != nil {
errs = append(errs, fmt.Errorf("%q must be a valid topic type (got: %s): %w", key, v, err))
}
return
}

0 comments on commit 71e45ae

Please sign in to comment.