Skip to content

Commit

Permalink
Add the catalog configuration in the pulsarcluster (#60)
Browse files Browse the repository at this point in the history
---

### Motivation

Add the catalog configuration in the pulsarcluster
  • Loading branch information
zymap authored Aug 8, 2024
1 parent 54e5aed commit a78e8b8
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 7 deletions.
5 changes: 5 additions & 0 deletions cloud/data_source_pulsar_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ func dataSourcePulsarCluster() *schema.Resource {
},
},
},
"lakehouse_storage": {
Type: schema.TypeMap,
Computed: true,
Description: descriptions["lakehouse_storage"],
},
"custom": {
Type: schema.TypeMap,
Computed: true,
Expand Down
5 changes: 5 additions & 0 deletions cloud/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func init() {
"mqtt": "Controls the mqtt protocol config of pulsar cluster",
"categories": "Controls the audit log categories config of pulsar cluster, supported categories: " +
"\"Management\", \"Describe\", \"Produce\", \"Consume\"",
"lakehouse_type": "The type of the lakehouse",
"lakehouse_catalog": "The name of the lakehouse catalog",
"catalog_credentials": "The credentials of the lakehouse catalog",
"catalog_connection_url": "The connection url of the lakehouse catalog",
"catalog_warehouse": "The warehouse of the lakehouse catalog",
"custom": "Controls the custom config of pulsar cluster",
"http_tls_service_url": "The service url of the pulsar cluster, use it to management the pulsar cluster.",
"http_tls_service_urls": "The service url of the pulsar cluster, use it to management the pulsar cluster. There'll be multiple service urls if the cluster attached with multiple gateways",
Expand Down
17 changes: 17 additions & 0 deletions cloud/pulsar_cluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ func flattenPulsarClusterConfig(in *cloudv1alpha1.Config) []interface{} {
if in.AuditLog != nil {
att["audit_log"] = flattenAuditLog(in.AuditLog)
}
if in.LakehouseStorage != nil {
att["lakehouse_storage"] = flattenLakehouseStorage(in.LakehouseStorage)
}
if in.Custom != nil {
att["custom"] = in.Custom
}
Expand Down Expand Up @@ -81,3 +84,17 @@ func flattenCategories(in []string) []interface{} {
}
return att
}

func flattenLakehouseStorage(in *cloudv1alpha1.LakehouseStorageConfig) map[string]interface{} {
att := make(map[string]interface{})
if in.LakehouseType != nil {
att["lakehouse_type"] = *in.LakehouseType
}
if in.CatalogType != nil {
att["catalog_type"] = *in.CatalogType
}
att["catalog_credentials"] = in.CatalogCredentials
att["catalog_connection_url"] = in.CatalogConnectionUrl
att["catalog_warehouse"] = in.CatalogWarehouse
return att
}
49 changes: 46 additions & 3 deletions cloud/resource_pulsar_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cloud
import (
"context"
"fmt"
"github.com/hashicorp/terraform-plugin-log/tflog"
"strings"
"time"

Expand Down Expand Up @@ -189,6 +190,11 @@ func resourcePulsarCluster() *schema.Resource {
},
},
},
"lakehouse_storage": {
Type: schema.TypeMap,
Optional: true,
Description: descriptions["lakehouse_storage"],
},
"custom": {
Type: schema.TypeMap,
Optional: true,
Expand Down Expand Up @@ -397,7 +403,7 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me
}
}
}
getPulsarClusterChanged(pulsarCluster, d)
getPulsarClusterChanged(ctx, pulsarCluster, d)
pc, err := clientSet.CloudV1alpha1().PulsarClusters(namespace).Create(ctx, pulsarCluster, metav1.CreateOptions{
FieldManager: "terraform-create",
})
Expand Down Expand Up @@ -509,6 +515,9 @@ func resourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, meta
_ = d.Set("mqtt_service_url", "")
}
if pulsarCluster.Spec.Config != nil {
tflog.Debug(ctx, "pulsar cluster config: ", map[string]interface{}{
"config": pulsarCluster.Spec.Config,
})
err = d.Set("config", flattenPulsarClusterConfig(pulsarCluster.Spec.Config))
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER_CONFIG: %w", err))
Expand Down Expand Up @@ -547,6 +556,10 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " +
"The pulsar cluster release channel does not support updates"))
}
if d.HasChanges("lakehouse_type", "catalog_type", "catalog_credentials", "catalog_connection_url", "catalog_warehouse") {
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " +
"The pulsar cluster lakehouse storage does not support updates"))
}
namespace := d.Get("organization").(string)
name := d.Get("name").(string)
clientSet, err := getClientSet(getFactoryFromMeta(meta))
Expand Down Expand Up @@ -579,7 +592,7 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me
pulsarCluster.Spec.BookKeeper.Resources.Memory = resource.NewQuantity(
int64(storageUnit*8*1024*1024*1024), resource.DecimalSI)
}
changed := getPulsarClusterChanged(pulsarCluster, d)
changed := getPulsarClusterChanged(ctx, pulsarCluster, d)
if d.HasChange("bookie_replicas") ||
d.HasChange("broker_replicas") ||
d.HasChange("compute_unit") ||
Expand Down Expand Up @@ -624,7 +637,7 @@ func resourcePulsarClusterDelete(ctx context.Context, d *schema.ResourceData, me
return nil
}

func getPulsarClusterChanged(pulsarCluster *cloudv1alpha1.PulsarCluster, d *schema.ResourceData) bool {
func getPulsarClusterChanged(ctx context.Context, pulsarCluster *cloudv1alpha1.PulsarCluster, d *schema.ResourceData) bool {
changed := false
if pulsarCluster.Spec.Config == nil {
pulsarCluster.Spec.Config = &cloudv1alpha1.Config{}
Expand All @@ -633,6 +646,7 @@ func getPulsarClusterChanged(pulsarCluster *cloudv1alpha1.PulsarCluster, d *sche
if len(config) > 0 {
for _, configItem := range config {
configItemMap := configItem.(map[string]interface{})
tflog.Debug(ctx, "configItemMap: %v", configItemMap)
if configItemMap["websocket_enabled"] != nil {
webSocketEnabled := configItemMap["websocket_enabled"].(bool)
pulsarCluster.Spec.Config.WebsocketEnabled = &webSocketEnabled
Expand Down Expand Up @@ -727,6 +741,32 @@ func getPulsarClusterChanged(pulsarCluster *cloudv1alpha1.PulsarCluster, d *sche
if d.HasChanges("audit_log") {
changed = true
}
if configItemMap["lakehouse_storage"] != nil {
lakehouseStorageItemMap := configItemMap["lakehouse_storage"].(map[string]interface{})
tflog.Debug(ctx, "lakehouseStorageItemMap:", lakehouseStorageItemMap)
pulsarClusterLakehouseStorage := &cloudv1alpha1.LakehouseStorageConfig{}
if len(lakehouseStorageItemMap) > 0 {
if _, ok := lakehouseStorageItemMap["lakehouse_type"]; ok {
lakehouseType := lakehouseStorageItemMap["lakehouse_type"].(string)
pulsarClusterLakehouseStorage.LakehouseType = &lakehouseType
}
if _, ok := lakehouseStorageItemMap["catalog_type"]; ok {
catalogType := lakehouseStorageItemMap["catalog_type"].(string)
pulsarClusterLakehouseStorage.CatalogType = &catalogType
}
if _, ok := lakehouseStorageItemMap["catalog_credentials"]; ok {
pulsarClusterLakehouseStorage.CatalogCredentials = lakehouseStorageItemMap["catalog_credentials"].(string)
}
if _, ok := lakehouseStorageItemMap["catalog_connection_url"]; ok {
pulsarClusterLakehouseStorage.CatalogConnectionUrl = lakehouseStorageItemMap["catalog_connection_url"].(string)
}
if _, ok := lakehouseStorageItemMap["catalog_warehouse"]; ok {
pulsarClusterLakehouseStorage.CatalogWarehouse = lakehouseStorageItemMap["catalog_warehouse"].(string)
}
pulsarCluster.Spec.Config.LakehouseStorage = pulsarClusterLakehouseStorage
changed = true
}
}
if configItemMap["custom"] != nil {
custom := configItemMap["custom"].(map[string]interface{})
if len(custom) > 0 {
Expand All @@ -742,5 +782,8 @@ func getPulsarClusterChanged(pulsarCluster *cloudv1alpha1.PulsarCluster, d *sche
}
}
}
tflog.Debug(ctx, "get pulsarcluster changed: %v", map[string]interface{}{
"pulsarcluster": *pulsarCluster.Spec.Config,
})
return changed
}
1 change: 1 addition & 0 deletions docs/data-sources/pulsar_cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Read-Only:
- `audit_log` (List of Object) (see [below for nested schema](#nestedobjatt--config--audit_log))
- `custom` (Map of String)
- `function_enabled` (Boolean)
- `lakehouse_storage` (Map of String)
- `protocols` (List of Object) (see [below for nested schema](#nestedobjatt--config--protocols))
- `transaction_enabled` (Boolean)
- `websocket_enabled` (Boolean)
Expand Down
1 change: 1 addition & 0 deletions docs/resources/pulsar_cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Optional:
- `audit_log` (Block List) (see [below for nested schema](#nestedblock--config--audit_log))
- `custom` (Map of String) Controls the custom config of pulsar cluster
- `function_enabled` (Boolean) Whether the function is enabled
- `lakehouse_storage` (Map of String)
- `protocols` (Block List) (see [below for nested schema](#nestedblock--config--protocols))
- `transaction_enabled` (Boolean) Whether the transaction is enabled
- `websocket_enabled` (Boolean)
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ go 1.19
require (
github.com/99designs/keyring v1.2.0
github.com/hashicorp/terraform-plugin-docs v0.16.0
github.com/hashicorp/terraform-plugin-log v0.9.0
github.com/hashicorp/terraform-plugin-sdk/v2 v2.28.0
github.com/lestrrat-go/jwx/v2 v2.0.21
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.9.1
github.com/streamnative/cloud-api-server v1.25.2-0.20240607131732-e2211877910c
github.com/streamnative/cloud-api-server v1.26.1
github.com/streamnative/cloud-cli v0.14.3-0.20240202094224-5eec608e4680
github.com/xhit/go-str2duration/v2 v2.1.0
k8s.io/apimachinery v0.28.4
Expand Down Expand Up @@ -86,7 +87,6 @@ require (
github.com/hashicorp/terraform-exec v0.18.1 // indirect
github.com/hashicorp/terraform-json v0.17.1 // indirect
github.com/hashicorp/terraform-plugin-go v0.18.0 // indirect
github.com/hashicorp/terraform-plugin-log v0.9.0 // indirect
github.com/hashicorp/terraform-registry-address v0.2.1 // indirect
github.com/hashicorp/terraform-svchost v0.1.1 // indirect
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d // indirect
Expand Down Expand Up @@ -246,6 +246,7 @@ replace (
github.com/streamnative/pulsar-operators/pulsar-operator/api => github.com/streamnative/pulsar-operators/pulsar-operator/api v0.17.3
github.com/streamnative/pulsar-operators/zookeeper-operator => github.com/streamnative/pulsar-operators/zookeeper-operator v0.17.3
github.com/streamnative/pulsar-operators/zookeeper-operator/api => github.com/streamnative/pulsar-operators/zookeeper-operator/api v0.17.3
github.com/streamnative/sn-operator/api => github.com/streamnative/sn-operator/api v0.5.0
google.golang.org/api => google.golang.org/api v0.32.0
google.golang.org/genproto => google.golang.org/genproto v0.0.0-20200825200019-8632dd797987
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,8 @@ github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/streamnative/apiserver-builder-alpha v0.0.0-20230717175906-0f9240887463 h1:ukAAbYtzonTxP/zNwAZhFVvsqc6A8fVWSKwH9Yfu9w0=
github.com/streamnative/apiserver-builder-alpha v0.0.0-20230717175906-0f9240887463/go.mod h1:W1q2VCPvT9GjMUsFX4JYmdXfSAYsrmLVLALfutt5LsE=
github.com/streamnative/cloud-api-server v1.25.2-0.20240607131732-e2211877910c h1:AaV+hCJ0qEoQhBSc+vzG5zJRRNWRjt6b5l3PwX1LRSs=
github.com/streamnative/cloud-api-server v1.25.2-0.20240607131732-e2211877910c/go.mod h1:7Zv0BDZYJA7XTCE+S324I40ee5aT0CpnaZcXPUk9r50=
github.com/streamnative/cloud-api-server v1.26.1 h1:ZQ2eCtG2Dpfs6gn7vXgYit4zZCxznWCr+w+nOHexiTI=
github.com/streamnative/cloud-api-server v1.26.1/go.mod h1:7Zv0BDZYJA7XTCE+S324I40ee5aT0CpnaZcXPUk9r50=
github.com/streamnative/cloud-cli v0.14.3-0.20240202094224-5eec608e4680 h1:PRqjyTsfwTCAd3SGDjgPT4Nmbczeegdm/ApyDGr14PA=
github.com/streamnative/cloud-cli v0.14.3-0.20240202094224-5eec608e4680/go.mod h1:48XuDAuu3upsbndxqN+grxz9yPSklZk9ycxKB235dGs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down

0 comments on commit a78e8b8

Please sign in to comment.