Skip to content

Commit

Permalink
Update example, fixed update operation
Browse files Browse the repository at this point in the history
  • Loading branch information
tuteng committed Dec 29, 2023
1 parent 0f10447 commit cdcc3a8
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 66 deletions.
221 changes: 172 additions & 49 deletions cloud/resource_pulsar_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,68 @@ func resourcePulsarCluster() *schema.Resource {
Optional: true,
Description: descriptions["custom"],
},
"config": {
Type: schema.TypeSet,
Optional: true,
MinItems: 0,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"websocket_enabled": {
Type: schema.TypeBool,
Optional: true,
},
"function_enabled": {
Type: schema.TypeBool,
Optional: true,
},
"transaction_enabled": {
Type: schema.TypeBool,
Optional: true,
},
"protocols": {
Type: schema.TypeSet,
Optional: true,
Description: descriptions["protocols"],
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"kafka": {
Type: schema.TypeMap,
Default: map[string]interface{}{},
Optional: true,
},
"mqtt": {
Type: schema.TypeMap,
Optional: true,
},
},
},
},
"audit_log": {
Type: schema.TypeSet,
Optional: true,
Description: descriptions["audit_log"],
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"categories": {
Type: schema.TypeSet,
Optional: true,
MinItems: 1,
Elem: &schema.Schema{
Type: schema.TypeString,
ValidateFunc: validateAuditLog,
},
},
},
},
},
"custom": {
Type: schema.TypeMap,
Optional: true,
Description: descriptions["custom"],
},
},
},
},
"ready": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -376,8 +438,8 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: %w", err))
}
// Delay 5 seconds to wait for api server start reconcile.
time.Sleep(5 * time.Second)
// Delay 10 seconds to wait for api server start reconcile.
time.Sleep(10 * time.Second)
err = retry.RetryContext(ctx, 15*time.Minute, func() *retry.RetryError {
dia := resourcePulsarClusterRead(ctx, d, meta)
if dia.HasError() {
Expand Down Expand Up @@ -415,55 +477,116 @@ func getPulsarClusterChanged(pulsarCluster *cloudv1alpha1.PulsarCluster, d *sche
if pulsarCluster.Spec.Config == nil {
pulsarCluster.Spec.Config = &cloudv1alpha1.Config{}
}
websocketEnabled := d.Get("websocket_enabled").(bool)
if websocketEnabled {
pulsarCluster.Spec.Config.WebsocketEnabled = &websocketEnabled
changed = true
}
functionEnabled := d.Get("function_enabled").(bool)
if functionEnabled {
pulsarCluster.Spec.Config.FunctionEnabled = &functionEnabled
changed = true
}
transactionEnabled := d.Get("transaction_enabled").(bool)
if transactionEnabled {
pulsarCluster.Spec.Config.TransactionEnabled = &transactionEnabled
changed = true
}
auditLog := d.Get("audit_log").(*schema.Set)
if auditLog.Len() > 0 {
categories := make([]string, 0)
for _, category := range auditLog.List() {
categories = append(categories, category.(string))
}
pulsarCluster.Spec.Config.AuditLog = &cloudv1alpha1.AuditLog{
Categories: categories,
}
changed = true
}
if pulsarCluster.Spec.Config.Protocols == nil {
pulsarCluster.Spec.Config.Protocols = &cloudv1alpha1.ProtocolsConfig{}
}
kafka := d.Get("kafka").(map[string]interface{})
if kafka != nil {
pulsarCluster.Spec.Config.Protocols.Kafka = &cloudv1alpha1.KafkaConfig{}
changed = true
}
mqtt := d.Get("mqtt").(map[string]interface{})
if mqtt != nil {
pulsarCluster.Spec.Config.Protocols.Mqtt = &cloudv1alpha1.MqttConfig{}
changed = true
}
custom := d.Get("custom").(map[string]interface{})
if custom != nil && len(custom) > 0 {
result := map[string]string{}
for k := range custom {
if v, ok := custom[k].(string); ok {
result[k] = v
config := d.Get("config").(*schema.Set)
if config.Len() > 0 {
for _, configItem := range config.List() {
configItemMap := configItem.(map[string]interface{})
if configItemMap["websocket_enabled"] != nil {
webSocketEnabled := configItemMap["websocket_enabled"].(bool)
pulsarCluster.Spec.Config.WebsocketEnabled = &webSocketEnabled
changed = true
}
if configItemMap["function_enabled"] != nil {
functionEnabled := configItemMap["function_enabled"].(bool)
pulsarCluster.Spec.Config.FunctionEnabled = &functionEnabled
changed = true
}
if configItemMap["transaction_enabled"] != nil {
transactionEnabled := configItemMap["transaction_enabled"].(bool)
pulsarCluster.Spec.Config.TransactionEnabled = &transactionEnabled
changed = true
}
kafkaEnabled := true
mqttEnabled := true
if configItemMap["protocols"] != nil {
if pulsarCluster.Spec.Config.Protocols == nil {
pulsarCluster.Spec.Config.Protocols = &cloudv1alpha1.ProtocolsConfig{}
}
protocols := configItemMap["protocols"].(*schema.Set)
for _, protocolItem := range protocols.List() {
protocolItemMap := protocolItem.(map[string]interface{})
kafka, ok := protocolItemMap["kafka"]
if ok {
if kafka != nil {
kafkaMap := kafka.(map[string]interface{})
if enabled, ok := kafkaMap["enabled"]; ok {
flag := enabled.(string)
if flag == "false" {
kafkaEnabled = false
}
}
}
}
mqtt, ok := protocolItemMap["mqtt"]
if ok {
if mqtt != nil {
mqttMap := mqtt.(map[string]interface{})
if enabled, ok := mqttMap["enabled"]; ok {
flag := enabled.(string)
if flag == "false" {
mqttEnabled = false
}
}
}
}
}
}
if kafkaEnabled {
pulsarCluster.Spec.Config.Protocols.Kafka = &cloudv1alpha1.KafkaConfig{}
} else {
pulsarCluster.Spec.Config.Protocols.Kafka = nil
}
if mqttEnabled {
pulsarCluster.Spec.Config.Protocols.Mqtt = &cloudv1alpha1.MqttConfig{}
} else {
pulsarCluster.Spec.Config.Protocols.Mqtt = nil
}
if d.HasChanges("protocols") {
changed = true
}
auditLogEnabled := false
categories := make([]string, 0)
if configItemMap["audit_log"] != nil {
auditLog := configItemMap["audit_log"].(*schema.Set)
if auditLog.Len() > 0 {
for _, category := range auditLog.List() {
c := category.(map[string]interface{})
if _, ok := c["categories"]; ok {
categoriesSchema := c["categories"].(*schema.Set)
if categoriesSchema.Len() > 0 {
auditLogEnabled = true
for _, categoryItem := range categoriesSchema.List() {
categories = append(categories, categoryItem.(string))
}
}
}
}
}
}
if auditLogEnabled {
pulsarCluster.Spec.Config.AuditLog = &cloudv1alpha1.AuditLog{
Categories: categories,
}
} else {
pulsarCluster.Spec.Config.AuditLog = nil
}
if d.HasChanges("audit_log") {
changed = true
}
if configItemMap["custom"] != nil {
custom := configItemMap["custom"].(map[string]interface{})
if len(custom) > 0 {
result := map[string]string{}
for k := range custom {
if v, ok := custom[k].(string); ok {
result[k] = v
}
}
pulsarCluster.Spec.Config.Custom = result
changed = true
}
}
}
pulsarCluster.Spec.Config.Custom = result
changed = true
}
return changed
}
25 changes: 16 additions & 9 deletions cloud/resource_pulsar_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,22 @@ resource "streamnative_pulsar_cluster" "test-pulsar-cluster" {
name = "%s"
instance_name = "%s"
location = "%s"
websocket_enabled = true
function_enabled = true
transaction_enabled = true
kafka = {}
mqtt = {}
audit_log = ["Management", "Describe", "Produce", "Consume"]
custom = {
"allowAutoTopicCreation": "true"
}
config {
websocket_enabled = true
function_enabled = false
transaction_enabled = false
protocols {
mqtt = {
enabled = false
}
kafka = {
enabled = true
}
}
custom = {
allowAutoTopicCreation = "true"
}
}
}
`, organization, name, instanceName, location)
}
27 changes: 19 additions & 8 deletions examples/pulsarclusters/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,25 @@ resource "streamnative_pulsar_cluster" "test-cluster-1" {
broker_replicas = 2
compute_unit = 0.3
storage_unit = 0.3
websocket_enabled = true
function_enabled = false
transaction_enabled = true
kafka = {}
mqtt = {}
audit_log = ["Management", "Describe", "Produce", "Consume"]
custom = {
"allowAutoTopicCreation": "true"

config {
websocket_enabled = true
function_enabled = false
transaction_enabled = false
protocols {
mqtt = {
enabled = false
}
kafka = {
enabled = true
}
}
audit_log {
categories = ["Management", "Describe", "Produce", "Consume"]
}
custom = {
allowAutoTopicCreation = "true"
}
}
}

Expand Down

0 comments on commit cdcc3a8

Please sign in to comment.