From 2f2d07f0bee983ec8f6898789c44e0694ba3012c Mon Sep 17 00:00:00 2001 From: Guangning E Date: Tue, 6 Aug 2024 21:02:38 +0800 Subject: [PATCH] Support serverless cluster --- cloud/data_source_pulsar_cluster.go | 17 ++++-- cloud/provider.go | 2 + cloud/resource_pulsar_cluster.go | 88 ++++++++++++++++++++++++----- cloud/resource_pulsar_instance.go | 15 ++++- cloud/validate_helpers.go | 8 +++ examples/serverless/main.tf | 68 ++++++++++++++++++++++ go.mod | 3 +- go.sum | 4 +- 8 files changed, 183 insertions(+), 22 deletions(-) create mode 100644 examples/serverless/main.tf diff --git a/cloud/data_source_pulsar_cluster.go b/cloud/data_source_pulsar_cluster.go index afba55b..5082927 100644 --- a/cloud/data_source_pulsar_cluster.go +++ b/cloud/data_source_pulsar_cluster.go @@ -17,6 +17,7 @@ package cloud import ( "context" "fmt" + cloudv1alpha1 "github.com/streamnative/cloud-api-server/pkg/apis/cloud/v1alpha1" "strings" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" @@ -235,6 +236,11 @@ func dataSourcePulsarCluster() *schema.Resource { Computed: true, Description: descriptions["bookkeeper_version"], }, + "type": { + Type: schema.TypeString, + Computed: true, + Description: descriptions["instance_type"], + }, }, } } @@ -321,10 +327,13 @@ func dataSourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, me return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER_CONFIG: %w", err)) } } - brokerImage := strings.Split(pulsarCluster.Spec.Broker.Image, ":") - _ = d.Set("pulsar_version", brokerImage[1]) - bookkeeperImage := strings.Split(pulsarCluster.Spec.BookKeeper.Image, ":") - _ = d.Set("bookkeeper_version", bookkeeperImage[1]) + if pulsarInstance.Spec.Type != cloudv1alpha1.PulsarInstanceTypeServerless { + brokerImage := strings.Split(pulsarCluster.Spec.Broker.Image, ":") + _ = d.Set("pulsar_version", brokerImage[1]) + bookkeeperImage := strings.Split(pulsarCluster.Spec.BookKeeper.Image, ":") + _ = d.Set("bookkeeper_version", bookkeeperImage[1]) + } + _ = d.Set("type", pulsarInstance.Spec.Type) releaseChannel := pulsarCluster.Spec.ReleaseChannel if releaseChannel != "" { _ = d.Set("release_channel", releaseChannel) diff --git a/cloud/provider.go b/cloud/provider.go index 7fbca52..aefb7ea 100644 --- a/cloud/provider.go +++ b/cloud/provider.go @@ -63,6 +63,7 @@ func init() { "service_account_name": "The service account name", "service_account_binding_name": "The service account binding name", "cluster_name": "The pulsar cluster name", + "cluster_display_name": "The pulsar cluster display name", "admin": "Whether the service account is admin", "private_key_data": "The private key data", "availability-mode": "The availability mode, supporting 'zonal' and 'regional'", @@ -74,6 +75,7 @@ func init() { "pool_member_type": "Type of infrastructure pool member, one of aws, gcloud and azure", "pool_member_location": "The location of the infrastructure pool member", "instance_name": "The pulsar instance name", + "instance_type": "The streamnative cloud instance type, supporting 'serverless' and 'standard'", "location": "The location of the pulsar cluster, " + "supported location https://docs.streamnative.io/docs/cluster#cluster-location", "release_channel": "The release channel of the pulsar cluster subscribe to, it must to be lts or rapid, default rapid", diff --git a/cloud/resource_pulsar_cluster.go b/cloud/resource_pulsar_cluster.go index dbe5184..41fdc87 100644 --- a/cloud/resource_pulsar_cluster.go +++ b/cloud/resource_pulsar_cluster.go @@ -37,11 +37,15 @@ func resourcePulsarCluster() *schema.Resource { DeleteContext: resourcePulsarClusterDelete, CustomizeDiff: func(ctx context.Context, diff *schema.ResourceDiff, i interface{}) error { oldOrg, _ := diff.GetChange("organization") - oldName, _ := diff.GetChange("name") + oldName, newName := diff.GetChange("name") if oldOrg.(string) == "" && oldName.(string) == "" { // This is create event, so we don't need to check the diff. return nil } + if oldName != "" && newName == "" { + // Auto generate the name, so we don't need to check the diff. + return nil + } if diff.HasChanges([]string{"organization", "name", "instance_name", "location", "pool_member_name"}...) { return fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " + "The pulsar cluster organization, name, instance_name, location, pool_member_name does not support updates, please recreate it") @@ -68,10 +72,14 @@ func resourcePulsarCluster() *schema.Resource { ValidateFunc: validateNotBlank, }, "name": { - Type: schema.TypeString, - Required: true, - Description: descriptions["cluster_name"], - ValidateFunc: validateNotBlank, + Type: schema.TypeString, + Optional: true, + Description: descriptions["cluster_name"], + }, + "display_name": { + Type: schema.TypeString, + Optional: true, + Description: descriptions["cluster_display_name"], }, "instance_name": { Type: schema.TypeString, @@ -290,6 +298,11 @@ func resourcePulsarCluster() *schema.Resource { Computed: true, Description: descriptions["bookkeeper_version"], }, + "type": { + Type: schema.TypeString, + Computed: true, + Description: descriptions["instance_type"], + }, }, } } @@ -297,6 +310,7 @@ func resourcePulsarCluster() *schema.Resource { func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { namespace := d.Get("organization").(string) name := d.Get("name").(string) + displayName := d.Get("display_name").(string) instanceName := d.Get("instance_name").(string) pool_member_name := d.Get("pool_member_name").(string) location := d.Get("location").(string) @@ -355,7 +369,6 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me APIVersion: cloudv1alpha1.SchemeGroupVersion.String(), }, ObjectMeta: metav1.ObjectMeta{ - Name: name, Namespace: namespace, }, Spec: cloudv1alpha1.PulsarClusterSpec{ @@ -380,6 +393,16 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me }, }, } + if name != "" { + pulsarCluster.ObjectMeta.Name = name + } + if displayName != "" { + pulsarCluster.Spec.DisplayName = displayName + } + if displayName == "" && name == "" { + return diag.FromErr(fmt.Errorf("ERROR_CREATE_PULSAR_CLUSTER: " + + "either name or display_name must be provided")) + } if pool_member_name != "" { pulsarCluster.Spec.PoolMemberRef = cloudv1alpha1.PoolMemberReference{ Name: pool_member_name, @@ -397,13 +420,17 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me } } } - getPulsarClusterChanged(pulsarCluster, d) + if pulsarInstance.Spec.Type == cloudv1alpha1.PulsarInstanceTypeStandard { + getPulsarClusterChanged(pulsarCluster, d) + } pc, err := clientSet.CloudV1alpha1().PulsarClusters(namespace).Create(ctx, pulsarCluster, metav1.CreateOptions{ FieldManager: "terraform-create", }) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_CREATE_PULSAR_CLUSTER: %w", err)) } + d.Set("name", pc.Name) + d.Set("display_name", pc.Spec.DisplayName) if pc.Status.Conditions != nil { ready := false for _, condition := range pc.Status.Conditions { @@ -435,13 +462,32 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me func resourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { namespace := d.Get("organization").(string) name := d.Get("name").(string) + displayName := d.Get("display_name").(string) clientSet, err := getClientSet(getFactoryFromMeta(meta)) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_INIT_CLIENT_ON_READ_PULSAR_CLUSTER: %w", err)) } - pulsarCluster, err := clientSet.CloudV1alpha1().PulsarClusters(namespace).Get(ctx, name, metav1.GetOptions{}) - if err != nil { - return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER: %w", err)) + var pulsarCluster *cloudv1alpha1.PulsarCluster + if name != "" { + pulsarCluster, err = clientSet.CloudV1alpha1().PulsarClusters(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER: %w", err)) + } + } else { + pulsarClusters, err := clientSet.CloudV1alpha1().PulsarClusters(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return diag.FromErr(fmt.Errorf("ERROR_LIST_PULSAR_CLUSTER: %w", err)) + } + for _, cluster := range pulsarClusters.Items { + if cluster.Spec.DisplayName == displayName { + pulsarCluster = &cluster + break + } + } + if pulsarCluster == nil { + return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER: "+ + "the pulsar cluster with display_name %s does not exist", displayName)) + } } _ = d.Set("ready", "False") if pulsarCluster.Status.Conditions != nil { @@ -514,19 +560,28 @@ func resourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, meta return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER_CONFIG: %w", err)) } } - brokerImage := strings.Split(pulsarCluster.Spec.Broker.Image, ":") - _ = d.Set("pulsar_version", brokerImage[1]) - bookkeeperImage := strings.Split(pulsarCluster.Spec.BookKeeper.Image, ":") - _ = d.Set("bookkeeper_version", bookkeeperImage[1]) + if pulsarInstance.Spec.Type != cloudv1alpha1.PulsarInstanceTypeServerless { + brokerImage := strings.Split(pulsarCluster.Spec.Broker.Image, ":") + _ = d.Set("pulsar_version", brokerImage[1]) + bookkeeperImage := strings.Split(pulsarCluster.Spec.BookKeeper.Image, ":") + _ = d.Set("bookkeeper_version", bookkeeperImage[1]) + } releaseChannel := pulsarCluster.Spec.ReleaseChannel if releaseChannel != "" { _ = d.Set("release_channel", releaseChannel) } + _ = d.Set("type", pulsarInstance.Spec.Type) d.SetId(fmt.Sprintf("%s/%s", pulsarCluster.Namespace, pulsarCluster.Name)) return nil } func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + serverless := d.Get("type") + if serverless == cloudv1alpha1.PulsarInstanceTypeServerless { + return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: "+ + "updating a cluster under instance of type '%s' is no longer allowed", + cloudv1alpha1.PulsarInstanceTypeServerless)) + } if d.HasChange("organization") { return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " + "The pulsar cluster organization does not support updates")) @@ -616,7 +671,12 @@ func resourcePulsarClusterDelete(ctx context.Context, d *schema.ResourceData, me return diag.FromErr(fmt.Errorf("ERROR_INIT_CLIENT_ON_DELETE_PULSAR_CLUSTER: %w", err)) } namespace := d.Get("organization").(string) + t := d.Get("type") name := d.Get("name").(string) + if t == cloudv1alpha1.PulsarInstanceTypeServerless { + id := strings.Split(d.Id(), "/") + name = id[1] + } err = clientSet.CloudV1alpha1().PulsarClusters(namespace).Delete(ctx, name, metav1.DeleteOptions{}) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_DELETE_PULSAR_CLUSTER: %w", err)) diff --git a/cloud/resource_pulsar_instance.go b/cloud/resource_pulsar_instance.go index ddf59c0..489c701 100644 --- a/cloud/resource_pulsar_instance.go +++ b/cloud/resource_pulsar_instance.go @@ -95,6 +95,12 @@ func resourcePulsarInstance() *schema.Resource { Description: descriptions["pool_namespace"], ValidateFunc: validateNotBlank, }, + "type": { + Type: schema.TypeString, + Optional: true, + Description: descriptions["instance_type"], + ValidateFunc: validateInstanceType, + }, "ready": { Type: schema.TypeString, Computed: true, @@ -110,6 +116,7 @@ func resourcePulsarInstanceCreate(ctx context.Context, d *schema.ResourceData, m availabilityMode := d.Get("availability_mode").(string) poolName := d.Get("pool_name").(string) poolNamespace := d.Get("pool_namespace").(string) + instanceType := d.Get("type").(string) clientSet, err := getClientSet(getFactoryFromMeta(meta)) if err != nil { return diag.FromErr(fmt.Errorf("ERROR_INIT_CLIENT_ON_PULSAR_INSTANCE: %w", err)) @@ -118,6 +125,12 @@ func resourcePulsarInstanceCreate(ctx context.Context, d *schema.ResourceData, m Namespace: poolNamespace, Name: poolName, } + var t cloudv1alpha1.PulsarInstanceType + if instanceType == "" { + t = cloudv1alpha1.PulsarInstanceTypeStandard + } else { + t = cloudv1alpha1.PulsarInstanceType(instanceType) + } pulsarInstance := &cloudv1alpha1.PulsarInstance{ TypeMeta: metav1.TypeMeta{ Kind: "PulsarInstance", @@ -129,7 +142,7 @@ func resourcePulsarInstanceCreate(ctx context.Context, d *schema.ResourceData, m }, Spec: cloudv1alpha1.PulsarInstanceSpec{ AvailabilityMode: cloudv1alpha1.InstanceAvailabilityMode(availabilityMode), - Type: cloudv1alpha1.PulsarInstanceTypeStandard, + Type: t, PoolRef: poolRef, }, } diff --git a/cloud/validate_helpers.go b/cloud/validate_helpers.go index 243defb..3125ad0 100644 --- a/cloud/validate_helpers.go +++ b/cloud/validate_helpers.go @@ -28,6 +28,14 @@ func validateNotBlank(val interface{}, key string) (warns []string, errs []error return } +func validateInstanceType(val interface{}, key string) (warns []string, errs []error) { + v := val.(string) + if v != "serverless" && v != "standard" { + errs = append(errs, fmt.Errorf("%q must be serverless or standard", key)) + } + return +} + func validateReleaseChannel(val interface{}, key string) (warns []string, errs []error) { v := val.(string) if v != "lts" && v != "rapid" { diff --git a/examples/serverless/main.tf b/examples/serverless/main.tf new file mode 100644 index 0000000..56ff1c8 --- /dev/null +++ b/examples/serverless/main.tf @@ -0,0 +1,68 @@ +# // Licensed to the Apache Software Foundation (ASF) under one +# // or more contributor license agreements. See the NOTICE file +# // distributed with this work for additional information +# // regarding copyright ownership. The ASF licenses this file +# // to you under the Apache License, Version 2.0 (the +# // "License"); you may not use this file except in compliance +# // with the License. You may obtain a copy of the License at +# // +# // http://www.apache.org/licenses/LICENSE-2.0 +# // +# // Unless required by applicable law or agreed to in writing, +# // software distributed under the License is distributed on an +# // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# // KIND, either express or implied. See the License for the +# // specific language governing permissions and limitations +# // under the License. + +terraform { + required_providers { + streamnative = { + version = "0.1.0" + source = "streamnative/streamnative" + } + } +} + +provider "streamnative" { + # Please replace path use your own key file path + key_file_path = "/Users/tuteng/Downloads/o-uvzjb-testadmin.json" +} + +resource "streamnative_pulsar_instance" "test-serverless" { + organization = "o-uvzjb" + name = "test-serverless" + availability_mode = "zonal" + pool_name = "functions-aws" + pool_namespace = "streamnative" + type = "serverless" +} + +data "streamnative_pulsar_instance" "test-serverless" { + depends_on = [streamnative_pulsar_instance.test-serverless] + name = streamnative_pulsar_instance.test-serverless.name + organization = streamnative_pulsar_instance.test-serverless.organization +} + +output "pulsar_instance" { + value = data.streamnative_pulsar_instance.test-serverless +} + +resource "streamnative_pulsar_cluster" "test-serverless" { + depends_on = [streamnative_pulsar_instance.test-serverless] + organization = streamnative_pulsar_instance.test-serverless.organization + display_name = "test-serverless" + instance_name = streamnative_pulsar_instance.test-serverless.name + location = "us-east-2" + release_channel = "rapid" +} + +data "streamnative_pulsar_cluster" "test-serverless" { + depends_on = [streamnative_pulsar_cluster.test-serverless] + organization = streamnative_pulsar_cluster.test-serverless.organization + name = streamnative_pulsar_cluster.test-serverless.name +} + +output "pulsar_cluster_serverless" { + value = data.streamnative_pulsar_cluster.test-serverless +} \ No newline at end of file diff --git a/go.mod b/go.mod index f637209..018bdfb 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/lestrrat-go/jwx/v2 v2.0.21 github.com/mitchellh/go-homedir v1.1.0 github.com/pkg/errors v0.9.1 - github.com/streamnative/cloud-api-server v1.25.2-0.20240607131732-e2211877910c + github.com/streamnative/cloud-api-server v1.25.2-0.20240803044529-9e543679949e github.com/streamnative/cloud-cli v0.14.3-0.20240202094224-5eec608e4680 github.com/xhit/go-str2duration/v2 v2.1.0 k8s.io/apimachinery v0.28.4 @@ -246,6 +246,7 @@ replace ( github.com/streamnative/pulsar-operators/pulsar-operator/api => github.com/streamnative/pulsar-operators/pulsar-operator/api v0.17.3 github.com/streamnative/pulsar-operators/zookeeper-operator => github.com/streamnative/pulsar-operators/zookeeper-operator v0.17.3 github.com/streamnative/pulsar-operators/zookeeper-operator/api => github.com/streamnative/pulsar-operators/zookeeper-operator/api v0.17.3 + github.com/streamnative/sn-operator/api => github.com/streamnative/sn-operator/api v0.4.10 google.golang.org/api => google.golang.org/api v0.32.0 google.golang.org/genproto => google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 ) diff --git a/go.sum b/go.sum index 00d61bd..9b385f6 100644 --- a/go.sum +++ b/go.sum @@ -553,8 +553,8 @@ github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/streamnative/apiserver-builder-alpha v0.0.0-20230717175906-0f9240887463 h1:ukAAbYtzonTxP/zNwAZhFVvsqc6A8fVWSKwH9Yfu9w0= github.com/streamnative/apiserver-builder-alpha v0.0.0-20230717175906-0f9240887463/go.mod h1:W1q2VCPvT9GjMUsFX4JYmdXfSAYsrmLVLALfutt5LsE= -github.com/streamnative/cloud-api-server v1.25.2-0.20240607131732-e2211877910c h1:AaV+hCJ0qEoQhBSc+vzG5zJRRNWRjt6b5l3PwX1LRSs= -github.com/streamnative/cloud-api-server v1.25.2-0.20240607131732-e2211877910c/go.mod h1:7Zv0BDZYJA7XTCE+S324I40ee5aT0CpnaZcXPUk9r50= +github.com/streamnative/cloud-api-server v1.25.2-0.20240803044529-9e543679949e h1:xF3QG0fIs6HoTiwp7Rke8nWPu5DldHy+NcBWG6qnqOY= +github.com/streamnative/cloud-api-server v1.25.2-0.20240803044529-9e543679949e/go.mod h1:QVY82royWwFivIFXWESh5nt+g388hsxP7sqz/f6ZMpM= github.com/streamnative/cloud-cli v0.14.3-0.20240202094224-5eec608e4680 h1:PRqjyTsfwTCAd3SGDjgPT4Nmbczeegdm/ApyDGr14PA= github.com/streamnative/cloud-cli v0.14.3-0.20240202094224-5eec608e4680/go.mod h1:48XuDAuu3upsbndxqN+grxz9yPSklZk9ycxKB235dGs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=