From 71e45aef879beaf3ae3806c6fcf3b898f19c3ec1 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Mon, 29 Jan 2024 10:17:27 +0800 Subject: [PATCH] Support topic_auto_creation --- go.mod | 2 +- go.sum | 4 +- pulsar/resource_pulsar_namespace.go | 76 +++++++++++++++++++++++++++++ pulsar/validate_helpers.go | 9 ++++ 4 files changed, 88 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index bed91e0..b9ad84e 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/streamnative/terraform-provider-pulsar go 1.18 require ( - github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf + github.com/apache/pulsar-client-go v0.12.0 github.com/cenkalti/backoff/v4 v4.1.2 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/terraform-plugin-log v0.4.0 diff --git a/go.sum b/go.sum index ac1b044..4878009 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki github.com/agext/levenshtein v1.2.2 h1:0S/Yg6LYmFJ5stwQeRp6EeOcCbj7xiqQSdNelsXvaqE= github.com/agext/levenshtein v1.2.2/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= -github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf h1:k9hqsKPh5ncKf0e3CkzvBTYXLwCYNYFb1Vtk3qnYAvk= -github.com/apache/pulsar-client-go v0.9.1-0.20230816081803-fbee610ddcbf/go.mod h1:Ea/yiZA7plgiaWRyOuO1B0k5/hjpl1thmiKig+D9PBQ= +github.com/apache/pulsar-client-go v0.12.0 h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I= +github.com/apache/pulsar-client-go v0.12.0/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/apparentlymart/go-cidr v1.1.0 h1:2mAhrMoF+nhXqxTzSZMUzDHkLjmIHC+Zzn4tdgBZjnU= github.com/apparentlymart/go-cidr v1.1.0/go.mod h1:EBcsNrHc3zQeuaeCeCtQruQm+n9/YjEn/vI25Lg7Gwc= github.com/apparentlymart/go-dump v0.0.0-20180507223929-23540a00eaa3/go.mod h1:oL81AME2rN47vu18xqj1S1jPIPuN7afo62yKTNn3XMM= diff --git a/pulsar/resource_pulsar_namespace.go b/pulsar/resource_pulsar_namespace.go index 8c391bd..0656e02 100644 --- a/pulsar/resource_pulsar_namespace.go +++ b/pulsar/resource_pulsar_namespace.go @@ -230,6 +230,29 @@ func resourcePulsarNamespace() *schema.Resource { }, }, }, + "topic_auto_creation": { + Type: schema.TypeSet, + Optional: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "enable": { + Type: schema.TypeBool, + Required: true, + }, + "type": { + Type: schema.TypeString, + Required: false, + ValidateFunc: validatePartitionedTopicType, + }, + "partitions": { + Type: schema.TypeInt, + Required: false, + }, + }, + }, + Set: topicAutoCreationPoliciesToHash, + }, }, } } @@ -411,6 +434,21 @@ func resourcePulsarNamespaceRead(ctx context.Context, d *schema.ResourceData, me setPermissionGrant(d, grants) } + if topicAutoCreation, ok := d.GetOk("topic_auto_creation"); ok && topicAutoCreation.(*schema.Set).Len() > 0 { + autoCreation, err := client.GetTopicAutoCreation(*ns) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_NAMESPACE: GetTopicAutoCreation: %w", err)) + } + + _ = d.Set("topic_auto_creation", schema.NewSet(topicAutoCreationPoliciesToHash, []interface{}{ + map[string]interface{}{ + "enable": autoCreation.Allow, + "type": autoCreation.Type, + "partitions": autoCreation.Partitions, + }, + })) + } + return nil } @@ -426,6 +464,7 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, dispatchRateConfig := d.Get("dispatch_rate").(*schema.Set) persistencePoliciesConfig := d.Get("persistence_policies").(*schema.Set) permissionGrantConfig := d.Get("permission_grant").(*schema.Set) + topicAutoCreation := d.Get("topic_auto_creation").(*schema.Set) nsName, err := utils.GetNameSpaceName(tenant, namespace) if err != nil { @@ -562,6 +601,17 @@ func resourcePulsarNamespaceUpdate(ctx context.Context, d *schema.ResourceData, } } + if topicAutoCreation.Len() > 0 { + topicAutoCreationPolicy := unmarshalTopicAutoCreation(topicAutoCreation) + if err = client.SetTopicAutoCreation(*nsName, *topicAutoCreationPolicy); err != nil { + errs = multierror.Append(errs, fmt.Errorf("SetTopicAutoCreation: %w", err)) + } + } else { // remove the topicAutoCreation + if err = client.RemoveTopicAutoCreation(*nsName); err != nil { + errs = multierror.Append(errs, fmt.Errorf("RemoveTopicAutoCreation: %w", err)) + } + } + if errs != nil { return diag.FromErr(fmt.Errorf("ERROR_UPDATE_NAMESPACE_CONFIG: %w", errs)) } @@ -591,6 +641,7 @@ func resourcePulsarNamespaceDelete(ctx context.Context, d *schema.ResourceData, _ = d.Set("dispatch_rate", nil) _ = d.Set("persistence_policies", nil) _ = d.Set("permission_grant", nil) + _ = d.Set("topic_auto_creation", nil) return nil } @@ -644,6 +695,17 @@ func persistencePoliciesToHash(v interface{}) int { return hashcode.String(buf.String()) } +func topicAutoCreationPoliciesToHash(v interface{}) int { + var buf bytes.Buffer + m := v.(map[string]interface{}) + + buf.WriteString(fmt.Sprintf("%t-", m["enable"].(bool))) + buf.WriteString(fmt.Sprintf("%s-", m["type"].(string))) + buf.WriteString(fmt.Sprintf("%d-", m["partitions"].(int))) + + return hashcode.String(buf.String()) +} + func unmarshalDispatchRate(v *schema.Set) *utils.DispatchRate { var dispatchRate utils.DispatchRate @@ -710,3 +772,17 @@ func unmarshalPersistencePolicies(v *schema.Set) *utils.PersistencePolicies { return &persPolicies } + +func unmarshalTopicAutoCreation(v *schema.Set) *utils.TopicAutoCreationConfig { + var topicAutoCreation utils.TopicAutoCreationConfig + + for _, policy := range v.List() { + data := policy.(map[string]interface{}) + + topicAutoCreation.Allow = data["allow"].(bool) + topicAutoCreation.Type = utils.TopicType(data["type"].(string)) + topicAutoCreation.Partitions = data["partitions"].(*int) + } + + return &topicAutoCreation +} diff --git a/pulsar/validate_helpers.go b/pulsar/validate_helpers.go index a08f53f..3904eb0 100644 --- a/pulsar/validate_helpers.go +++ b/pulsar/validate_helpers.go @@ -50,3 +50,12 @@ func validateAuthAction(val interface{}, key string) (warns []string, errs []err } return } + +func validatePartitionedTopicType(val interface{}, key string) (warns []string, errs []error) { + v := val.(string) + _, err := utils.ParseTopicType(v) + if err != nil { + errs = append(errs, fmt.Errorf("%q must be a valid topic type (got: %s): %w", key, v, err)) + } + return +}