From cdcc3a8be00980c1fd851560c0612874e57e6e44 Mon Sep 17 00:00:00 2001 From: Guangning E Date: Fri, 29 Dec 2023 15:11:04 +0800 Subject: [PATCH] Update example, fixed update operation --- cloud/resource_pulsar_cluster.go | 221 ++++++++++++++++++++------ cloud/resource_pulsar_cluster_test.go | 25 +-- examples/pulsarclusters/main.tf | 27 +++- 3 files changed, 207 insertions(+), 66 deletions(-) diff --git a/cloud/resource_pulsar_cluster.go b/cloud/resource_pulsar_cluster.go index ce628a6..ac1f996 100644 --- a/cloud/resource_pulsar_cluster.go +++ b/cloud/resource_pulsar_cluster.go @@ -142,6 +142,68 @@ func resourcePulsarCluster() *schema.Resource { Optional: true, Description: descriptions["custom"], }, + "config": { + Type: schema.TypeSet, + Optional: true, + MinItems: 0, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "websocket_enabled": { + Type: schema.TypeBool, + Optional: true, + }, + "function_enabled": { + Type: schema.TypeBool, + Optional: true, + }, + "transaction_enabled": { + Type: schema.TypeBool, + Optional: true, + }, + "protocols": { + Type: schema.TypeSet, + Optional: true, + Description: descriptions["protocols"], + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "kafka": { + Type: schema.TypeMap, + Default: map[string]interface{}{}, + Optional: true, + }, + "mqtt": { + Type: schema.TypeMap, + Optional: true, + }, + }, + }, + }, + "audit_log": { + Type: schema.TypeSet, + Optional: true, + Description: descriptions["audit_log"], + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "categories": { + Type: schema.TypeSet, + Optional: true, + MinItems: 1, + Elem: &schema.Schema{ + Type: schema.TypeString, + ValidateFunc: validateAuditLog, + }, + }, + }, + }, + }, + "custom": { + Type: schema.TypeMap, + Optional: true, + Description: descriptions["custom"], + }, + }, + }, + }, "ready": { Type: schema.TypeString, Computed: true, @@ -376,8 +438,8 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me if err != nil { return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: %w", err)) } - // Delay 5 seconds to wait for api server start reconcile. - time.Sleep(5 * time.Second) + // Delay 10 seconds to wait for api server start reconcile. + time.Sleep(10 * time.Second) err = retry.RetryContext(ctx, 15*time.Minute, func() *retry.RetryError { dia := resourcePulsarClusterRead(ctx, d, meta) if dia.HasError() { @@ -415,55 +477,116 @@ func getPulsarClusterChanged(pulsarCluster *cloudv1alpha1.PulsarCluster, d *sche if pulsarCluster.Spec.Config == nil { pulsarCluster.Spec.Config = &cloudv1alpha1.Config{} } - websocketEnabled := d.Get("websocket_enabled").(bool) - if websocketEnabled { - pulsarCluster.Spec.Config.WebsocketEnabled = &websocketEnabled - changed = true - } - functionEnabled := d.Get("function_enabled").(bool) - if functionEnabled { - pulsarCluster.Spec.Config.FunctionEnabled = &functionEnabled - changed = true - } - transactionEnabled := d.Get("transaction_enabled").(bool) - if transactionEnabled { - pulsarCluster.Spec.Config.TransactionEnabled = &transactionEnabled - changed = true - } - auditLog := d.Get("audit_log").(*schema.Set) - if auditLog.Len() > 0 { - categories := make([]string, 0) - for _, category := range auditLog.List() { - categories = append(categories, category.(string)) - } - pulsarCluster.Spec.Config.AuditLog = &cloudv1alpha1.AuditLog{ - Categories: categories, - } - changed = true - } - if pulsarCluster.Spec.Config.Protocols == nil { - pulsarCluster.Spec.Config.Protocols = &cloudv1alpha1.ProtocolsConfig{} - } - kafka := d.Get("kafka").(map[string]interface{}) - if kafka != nil { - pulsarCluster.Spec.Config.Protocols.Kafka = &cloudv1alpha1.KafkaConfig{} - changed = true - } - mqtt := d.Get("mqtt").(map[string]interface{}) - if mqtt != nil { - pulsarCluster.Spec.Config.Protocols.Mqtt = &cloudv1alpha1.MqttConfig{} - changed = true - } - custom := d.Get("custom").(map[string]interface{}) - if custom != nil && len(custom) > 0 { - result := map[string]string{} - for k := range custom { - if v, ok := custom[k].(string); ok { - result[k] = v + config := d.Get("config").(*schema.Set) + if config.Len() > 0 { + for _, configItem := range config.List() { + configItemMap := configItem.(map[string]interface{}) + if configItemMap["websocket_enabled"] != nil { + webSocketEnabled := configItemMap["websocket_enabled"].(bool) + pulsarCluster.Spec.Config.WebsocketEnabled = &webSocketEnabled + changed = true + } + if configItemMap["function_enabled"] != nil { + functionEnabled := configItemMap["function_enabled"].(bool) + pulsarCluster.Spec.Config.FunctionEnabled = &functionEnabled + changed = true + } + if configItemMap["transaction_enabled"] != nil { + transactionEnabled := configItemMap["transaction_enabled"].(bool) + pulsarCluster.Spec.Config.TransactionEnabled = &transactionEnabled + changed = true + } + kafkaEnabled := true + mqttEnabled := true + if configItemMap["protocols"] != nil { + if pulsarCluster.Spec.Config.Protocols == nil { + pulsarCluster.Spec.Config.Protocols = &cloudv1alpha1.ProtocolsConfig{} + } + protocols := configItemMap["protocols"].(*schema.Set) + for _, protocolItem := range protocols.List() { + protocolItemMap := protocolItem.(map[string]interface{}) + kafka, ok := protocolItemMap["kafka"] + if ok { + if kafka != nil { + kafkaMap := kafka.(map[string]interface{}) + if enabled, ok := kafkaMap["enabled"]; ok { + flag := enabled.(string) + if flag == "false" { + kafkaEnabled = false + } + } + } + } + mqtt, ok := protocolItemMap["mqtt"] + if ok { + if mqtt != nil { + mqttMap := mqtt.(map[string]interface{}) + if enabled, ok := mqttMap["enabled"]; ok { + flag := enabled.(string) + if flag == "false" { + mqttEnabled = false + } + } + } + } + } + } + if kafkaEnabled { + pulsarCluster.Spec.Config.Protocols.Kafka = &cloudv1alpha1.KafkaConfig{} + } else { + pulsarCluster.Spec.Config.Protocols.Kafka = nil + } + if mqttEnabled { + pulsarCluster.Spec.Config.Protocols.Mqtt = &cloudv1alpha1.MqttConfig{} + } else { + pulsarCluster.Spec.Config.Protocols.Mqtt = nil + } + if d.HasChanges("protocols") { + changed = true + } + auditLogEnabled := false + categories := make([]string, 0) + if configItemMap["audit_log"] != nil { + auditLog := configItemMap["audit_log"].(*schema.Set) + if auditLog.Len() > 0 { + for _, category := range auditLog.List() { + c := category.(map[string]interface{}) + if _, ok := c["categories"]; ok { + categoriesSchema := c["categories"].(*schema.Set) + if categoriesSchema.Len() > 0 { + auditLogEnabled = true + for _, categoryItem := range categoriesSchema.List() { + categories = append(categories, categoryItem.(string)) + } + } + } + } + } + } + if auditLogEnabled { + pulsarCluster.Spec.Config.AuditLog = &cloudv1alpha1.AuditLog{ + Categories: categories, + } + } else { + pulsarCluster.Spec.Config.AuditLog = nil + } + if d.HasChanges("audit_log") { + changed = true + } + if configItemMap["custom"] != nil { + custom := configItemMap["custom"].(map[string]interface{}) + if len(custom) > 0 { + result := map[string]string{} + for k := range custom { + if v, ok := custom[k].(string); ok { + result[k] = v + } + } + pulsarCluster.Spec.Config.Custom = result + changed = true + } } } - pulsarCluster.Spec.Config.Custom = result - changed = true } return changed } diff --git a/cloud/resource_pulsar_cluster_test.go b/cloud/resource_pulsar_cluster_test.go index 182a4cf..722d0dd 100644 --- a/cloud/resource_pulsar_cluster_test.go +++ b/cloud/resource_pulsar_cluster_test.go @@ -105,15 +105,22 @@ resource "streamnative_pulsar_cluster" "test-pulsar-cluster" { name = "%s" instance_name = "%s" location = "%s" - websocket_enabled = true - function_enabled = true - transaction_enabled = true - kafka = {} - mqtt = {} - audit_log = ["Management", "Describe", "Produce", "Consume"] - custom = { - "allowAutoTopicCreation": "true" - } + config { + websocket_enabled = true + function_enabled = false + transaction_enabled = false + protocols { + mqtt = { + enabled = false + } + kafka = { + enabled = true + } + } + custom = { + allowAutoTopicCreation = "true" + } + } } `, organization, name, instanceName, location) } diff --git a/examples/pulsarclusters/main.tf b/examples/pulsarclusters/main.tf index dd29029..d4b041b 100644 --- a/examples/pulsarclusters/main.tf +++ b/examples/pulsarclusters/main.tf @@ -21,14 +21,25 @@ resource "streamnative_pulsar_cluster" "test-cluster-1" { broker_replicas = 2 compute_unit = 0.3 storage_unit = 0.3 - websocket_enabled = true - function_enabled = false - transaction_enabled = true - kafka = {} - mqtt = {} - audit_log = ["Management", "Describe", "Produce", "Consume"] - custom = { - "allowAutoTopicCreation": "true" + + config { + websocket_enabled = true + function_enabled = false + transaction_enabled = false + protocols { + mqtt = { + enabled = false + } + kafka = { + enabled = true + } + } + audit_log { + categories = ["Management", "Describe", "Produce", "Consume"] + } + custom = { + allowAutoTopicCreation = "true" + } } }