From a78e8b867de446c14721f99579326d1233288ba7 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Thu, 8 Aug 2024 09:52:55 +0800 Subject: [PATCH] Add the catalog configuration in the pulsarcluster (#60) --- ### Motivation Add the catalog configuration in the pulsarcluster --- cloud/data_source_pulsar_cluster.go | 5 +++ cloud/provider.go | 5 +++ cloud/pulsar_cluster_config.go | 17 ++++++++++ cloud/resource_pulsar_cluster.go | 49 +++++++++++++++++++++++++++-- docs/data-sources/pulsar_cluster.md | 1 + docs/resources/pulsar_cluster.md | 1 + go.mod | 5 +-- go.sum | 4 +-- 8 files changed, 80 insertions(+), 7 deletions(-) diff --git a/cloud/data_source_pulsar_cluster.go b/cloud/data_source_pulsar_cluster.go index afba55b..2a387a3 100644 --- a/cloud/data_source_pulsar_cluster.go +++ b/cloud/data_source_pulsar_cluster.go @@ -147,6 +147,11 @@ func dataSourcePulsarCluster() *schema.Resource { }, }, }, + "lakehouse_storage": { + Type: schema.TypeMap, + Computed: true, + Description: descriptions["lakehouse_storage"], + }, "custom": { Type: schema.TypeMap, Computed: true, diff --git a/cloud/provider.go b/cloud/provider.go index 7fbca52..7859355 100644 --- a/cloud/provider.go +++ b/cloud/provider.go @@ -90,6 +90,11 @@ func init() { "mqtt": "Controls the mqtt protocol config of pulsar cluster", "categories": "Controls the audit log categories config of pulsar cluster, supported categories: " + "\"Management\", \"Describe\", \"Produce\", \"Consume\"", + "lakehouse_type": "The type of the lakehouse", + "lakehouse_catalog": "The name of the lakehouse catalog", + "catalog_credentials": "The credentials of the lakehouse catalog", + "catalog_connection_url": "The connection url of the lakehouse catalog", + "catalog_warehouse": "The warehouse of the lakehouse catalog", "custom": "Controls the custom config of pulsar cluster", "http_tls_service_url": "The service url of the pulsar cluster, use it to management the pulsar cluster.", "http_tls_service_urls": "The service url of the pulsar cluster, use it to management the pulsar cluster. There'll be multiple service urls if the cluster attached with multiple gateways", diff --git a/cloud/pulsar_cluster_config.go b/cloud/pulsar_cluster_config.go index b6662ce..4a9ca69 100644 --- a/cloud/pulsar_cluster_config.go +++ b/cloud/pulsar_cluster_config.go @@ -36,6 +36,9 @@ func flattenPulsarClusterConfig(in *cloudv1alpha1.Config) []interface{} { if in.AuditLog != nil { att["audit_log"] = flattenAuditLog(in.AuditLog) } + if in.LakehouseStorage != nil { + att["lakehouse_storage"] = flattenLakehouseStorage(in.LakehouseStorage) + } if in.Custom != nil { att["custom"] = in.Custom } @@ -81,3 +84,17 @@ func flattenCategories(in []string) []interface{} { } return att } + +func flattenLakehouseStorage(in *cloudv1alpha1.LakehouseStorageConfig) map[string]interface{} { + att := make(map[string]interface{}) + if in.LakehouseType != nil { + att["lakehouse_type"] = *in.LakehouseType + } + if in.CatalogType != nil { + att["catalog_type"] = *in.CatalogType + } + att["catalog_credentials"] = in.CatalogCredentials + att["catalog_connection_url"] = in.CatalogConnectionUrl + att["catalog_warehouse"] = in.CatalogWarehouse + return att +} diff --git a/cloud/resource_pulsar_cluster.go b/cloud/resource_pulsar_cluster.go index dbe5184..06f907a 100644 --- a/cloud/resource_pulsar_cluster.go +++ b/cloud/resource_pulsar_cluster.go @@ -17,6 +17,7 @@ package cloud import ( "context" "fmt" + "github.com/hashicorp/terraform-plugin-log/tflog" "strings" "time" @@ -189,6 +190,11 @@ func resourcePulsarCluster() *schema.Resource { }, }, }, + "lakehouse_storage": { + Type: schema.TypeMap, + Optional: true, + Description: descriptions["lakehouse_storage"], + }, "custom": { Type: schema.TypeMap, Optional: true, @@ -397,7 +403,7 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me } } } - getPulsarClusterChanged(pulsarCluster, d) + getPulsarClusterChanged(ctx, pulsarCluster, d) pc, err := clientSet.CloudV1alpha1().PulsarClusters(namespace).Create(ctx, pulsarCluster, metav1.CreateOptions{ FieldManager: "terraform-create", }) @@ -509,6 +515,9 @@ func resourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, meta _ = d.Set("mqtt_service_url", "") } if pulsarCluster.Spec.Config != nil { + tflog.Debug(ctx, "pulsar cluster config: ", map[string]interface{}{ + "config": pulsarCluster.Spec.Config, + }) err = d.Set("config", flattenPulsarClusterConfig(pulsarCluster.Spec.Config)) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER_CONFIG: %w", err)) @@ -547,6 +556,10 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " + "The pulsar cluster release channel does not support updates")) } + if d.HasChanges("lakehouse_type", "catalog_type", "catalog_credentials", "catalog_connection_url", "catalog_warehouse") { + return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " + + "The pulsar cluster lakehouse storage does not support updates")) + } namespace := d.Get("organization").(string) name := d.Get("name").(string) clientSet, err := getClientSet(getFactoryFromMeta(meta)) @@ -579,7 +592,7 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me pulsarCluster.Spec.BookKeeper.Resources.Memory = resource.NewQuantity( int64(storageUnit*8*1024*1024*1024), resource.DecimalSI) } - changed := getPulsarClusterChanged(pulsarCluster, d) + changed := getPulsarClusterChanged(ctx, pulsarCluster, d) if d.HasChange("bookie_replicas") || d.HasChange("broker_replicas") || d.HasChange("compute_unit") || @@ -624,7 +637,7 @@ func resourcePulsarClusterDelete(ctx context.Context, d *schema.ResourceData, me return nil } -func getPulsarClusterChanged(pulsarCluster *cloudv1alpha1.PulsarCluster, d *schema.ResourceData) bool { +func getPulsarClusterChanged(ctx context.Context, pulsarCluster *cloudv1alpha1.PulsarCluster, d *schema.ResourceData) bool { changed := false if pulsarCluster.Spec.Config == nil { pulsarCluster.Spec.Config = &cloudv1alpha1.Config{} @@ -633,6 +646,7 @@ func getPulsarClusterChanged(pulsarCluster *cloudv1alpha1.PulsarCluster, d *sche if len(config) > 0 { for _, configItem := range config { configItemMap := configItem.(map[string]interface{}) + tflog.Debug(ctx, "configItemMap: %v", configItemMap) if configItemMap["websocket_enabled"] != nil { webSocketEnabled := configItemMap["websocket_enabled"].(bool) pulsarCluster.Spec.Config.WebsocketEnabled = &webSocketEnabled @@ -727,6 +741,32 @@ func getPulsarClusterChanged(pulsarCluster *cloudv1alpha1.PulsarCluster, d *sche if d.HasChanges("audit_log") { changed = true } + if configItemMap["lakehouse_storage"] != nil { + lakehouseStorageItemMap := configItemMap["lakehouse_storage"].(map[string]interface{}) + tflog.Debug(ctx, "lakehouseStorageItemMap:", lakehouseStorageItemMap) + pulsarClusterLakehouseStorage := &cloudv1alpha1.LakehouseStorageConfig{} + if len(lakehouseStorageItemMap) > 0 { + if _, ok := lakehouseStorageItemMap["lakehouse_type"]; ok { + lakehouseType := lakehouseStorageItemMap["lakehouse_type"].(string) + pulsarClusterLakehouseStorage.LakehouseType = &lakehouseType + } + if _, ok := lakehouseStorageItemMap["catalog_type"]; ok { + catalogType := lakehouseStorageItemMap["catalog_type"].(string) + pulsarClusterLakehouseStorage.CatalogType = &catalogType + } + if _, ok := lakehouseStorageItemMap["catalog_credentials"]; ok { + pulsarClusterLakehouseStorage.CatalogCredentials = lakehouseStorageItemMap["catalog_credentials"].(string) + } + if _, ok := lakehouseStorageItemMap["catalog_connection_url"]; ok { + pulsarClusterLakehouseStorage.CatalogConnectionUrl = lakehouseStorageItemMap["catalog_connection_url"].(string) + } + if _, ok := lakehouseStorageItemMap["catalog_warehouse"]; ok { + pulsarClusterLakehouseStorage.CatalogWarehouse = lakehouseStorageItemMap["catalog_warehouse"].(string) + } + pulsarCluster.Spec.Config.LakehouseStorage = pulsarClusterLakehouseStorage + changed = true + } + } if configItemMap["custom"] != nil { custom := configItemMap["custom"].(map[string]interface{}) if len(custom) > 0 { @@ -742,5 +782,8 @@ func getPulsarClusterChanged(pulsarCluster *cloudv1alpha1.PulsarCluster, d *sche } } } + tflog.Debug(ctx, "get pulsarcluster changed: %v", map[string]interface{}{ + "pulsarcluster": *pulsarCluster.Spec.Config, + }) return changed } diff --git a/docs/data-sources/pulsar_cluster.md b/docs/data-sources/pulsar_cluster.md index 7e85ff3..a9382c7 100644 --- a/docs/data-sources/pulsar_cluster.md +++ b/docs/data-sources/pulsar_cluster.md @@ -53,6 +53,7 @@ Read-Only: - `audit_log` (List of Object) (see [below for nested schema](#nestedobjatt--config--audit_log)) - `custom` (Map of String) - `function_enabled` (Boolean) +- `lakehouse_storage` (Map of String) - `protocols` (List of Object) (see [below for nested schema](#nestedobjatt--config--protocols)) - `transaction_enabled` (Boolean) - `websocket_enabled` (Boolean) diff --git a/docs/resources/pulsar_cluster.md b/docs/resources/pulsar_cluster.md index 57be21e..0e70861 100644 --- a/docs/resources/pulsar_cluster.md +++ b/docs/resources/pulsar_cluster.md @@ -58,6 +58,7 @@ Optional: - `audit_log` (Block List) (see [below for nested schema](#nestedblock--config--audit_log)) - `custom` (Map of String) Controls the custom config of pulsar cluster - `function_enabled` (Boolean) Whether the function is enabled +- `lakehouse_storage` (Map of String) - `protocols` (Block List) (see [below for nested schema](#nestedblock--config--protocols)) - `transaction_enabled` (Boolean) Whether the transaction is enabled - `websocket_enabled` (Boolean) diff --git a/go.mod b/go.mod index f637209..0cb5bbb 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,12 @@ go 1.19 require ( github.com/99designs/keyring v1.2.0 github.com/hashicorp/terraform-plugin-docs v0.16.0 + github.com/hashicorp/terraform-plugin-log v0.9.0 github.com/hashicorp/terraform-plugin-sdk/v2 v2.28.0 github.com/lestrrat-go/jwx/v2 v2.0.21 github.com/mitchellh/go-homedir v1.1.0 github.com/pkg/errors v0.9.1 - github.com/streamnative/cloud-api-server v1.25.2-0.20240607131732-e2211877910c + github.com/streamnative/cloud-api-server v1.26.1 github.com/streamnative/cloud-cli v0.14.3-0.20240202094224-5eec608e4680 github.com/xhit/go-str2duration/v2 v2.1.0 k8s.io/apimachinery v0.28.4 @@ -86,7 +87,6 @@ require ( github.com/hashicorp/terraform-exec v0.18.1 // indirect github.com/hashicorp/terraform-json v0.17.1 // indirect github.com/hashicorp/terraform-plugin-go v0.18.0 // indirect - github.com/hashicorp/terraform-plugin-log v0.9.0 // indirect github.com/hashicorp/terraform-registry-address v0.2.1 // indirect github.com/hashicorp/terraform-svchost v0.1.1 // indirect github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d // indirect @@ -246,6 +246,7 @@ replace ( github.com/streamnative/pulsar-operators/pulsar-operator/api => github.com/streamnative/pulsar-operators/pulsar-operator/api v0.17.3 github.com/streamnative/pulsar-operators/zookeeper-operator => github.com/streamnative/pulsar-operators/zookeeper-operator v0.17.3 github.com/streamnative/pulsar-operators/zookeeper-operator/api => github.com/streamnative/pulsar-operators/zookeeper-operator/api v0.17.3 + github.com/streamnative/sn-operator/api => github.com/streamnative/sn-operator/api v0.5.0 google.golang.org/api => google.golang.org/api v0.32.0 google.golang.org/genproto => google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 ) diff --git a/go.sum b/go.sum index 00d61bd..962a292 100644 --- a/go.sum +++ b/go.sum @@ -553,8 +553,8 @@ github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/streamnative/apiserver-builder-alpha v0.0.0-20230717175906-0f9240887463 h1:ukAAbYtzonTxP/zNwAZhFVvsqc6A8fVWSKwH9Yfu9w0= github.com/streamnative/apiserver-builder-alpha v0.0.0-20230717175906-0f9240887463/go.mod h1:W1q2VCPvT9GjMUsFX4JYmdXfSAYsrmLVLALfutt5LsE= -github.com/streamnative/cloud-api-server v1.25.2-0.20240607131732-e2211877910c h1:AaV+hCJ0qEoQhBSc+vzG5zJRRNWRjt6b5l3PwX1LRSs= -github.com/streamnative/cloud-api-server v1.25.2-0.20240607131732-e2211877910c/go.mod h1:7Zv0BDZYJA7XTCE+S324I40ee5aT0CpnaZcXPUk9r50= +github.com/streamnative/cloud-api-server v1.26.1 h1:ZQ2eCtG2Dpfs6gn7vXgYit4zZCxznWCr+w+nOHexiTI= +github.com/streamnative/cloud-api-server v1.26.1/go.mod h1:7Zv0BDZYJA7XTCE+S324I40ee5aT0CpnaZcXPUk9r50= github.com/streamnative/cloud-cli v0.14.3-0.20240202094224-5eec608e4680 h1:PRqjyTsfwTCAd3SGDjgPT4Nmbczeegdm/ApyDGr14PA= github.com/streamnative/cloud-cli v0.14.3-0.20240202094224-5eec608e4680/go.mod h1:48XuDAuu3upsbndxqN+grxz9yPSklZk9ycxKB235dGs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=