Skip to content

Commit

Permalink
Support serverless cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
tuteng committed Aug 6, 2024
1 parent 54e5aed commit 2f2d07f
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 22 deletions.
17 changes: 13 additions & 4 deletions cloud/data_source_pulsar_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cloud
import (
"context"
"fmt"
cloudv1alpha1 "github.com/streamnative/cloud-api-server/pkg/apis/cloud/v1alpha1"
"strings"

"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
Expand Down Expand Up @@ -235,6 +236,11 @@ func dataSourcePulsarCluster() *schema.Resource {
Computed: true,
Description: descriptions["bookkeeper_version"],
},
"type": {
Type: schema.TypeString,
Computed: true,
Description: descriptions["instance_type"],
},
},
}
}
Expand Down Expand Up @@ -321,10 +327,13 @@ func dataSourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, me
return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER_CONFIG: %w", err))
}
}
brokerImage := strings.Split(pulsarCluster.Spec.Broker.Image, ":")
_ = d.Set("pulsar_version", brokerImage[1])
bookkeeperImage := strings.Split(pulsarCluster.Spec.BookKeeper.Image, ":")
_ = d.Set("bookkeeper_version", bookkeeperImage[1])
if pulsarInstance.Spec.Type != cloudv1alpha1.PulsarInstanceTypeServerless {
brokerImage := strings.Split(pulsarCluster.Spec.Broker.Image, ":")
_ = d.Set("pulsar_version", brokerImage[1])
bookkeeperImage := strings.Split(pulsarCluster.Spec.BookKeeper.Image, ":")
_ = d.Set("bookkeeper_version", bookkeeperImage[1])
}
_ = d.Set("type", pulsarInstance.Spec.Type)
releaseChannel := pulsarCluster.Spec.ReleaseChannel
if releaseChannel != "" {
_ = d.Set("release_channel", releaseChannel)
Expand Down
2 changes: 2 additions & 0 deletions cloud/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func init() {
"service_account_name": "The service account name",
"service_account_binding_name": "The service account binding name",
"cluster_name": "The pulsar cluster name",
"cluster_display_name": "The pulsar cluster display name",
"admin": "Whether the service account is admin",
"private_key_data": "The private key data",
"availability-mode": "The availability mode, supporting 'zonal' and 'regional'",
Expand All @@ -74,6 +75,7 @@ func init() {
"pool_member_type": "Type of infrastructure pool member, one of aws, gcloud and azure",
"pool_member_location": "The location of the infrastructure pool member",
"instance_name": "The pulsar instance name",
"instance_type": "The streamnative cloud instance type, supporting 'serverless' and 'standard'",
"location": "The location of the pulsar cluster, " +
"supported location https://docs.streamnative.io/docs/cluster#cluster-location",
"release_channel": "The release channel of the pulsar cluster subscribe to, it must to be lts or rapid, default rapid",
Expand Down
88 changes: 74 additions & 14 deletions cloud/resource_pulsar_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ func resourcePulsarCluster() *schema.Resource {
DeleteContext: resourcePulsarClusterDelete,
CustomizeDiff: func(ctx context.Context, diff *schema.ResourceDiff, i interface{}) error {
oldOrg, _ := diff.GetChange("organization")
oldName, _ := diff.GetChange("name")
oldName, newName := diff.GetChange("name")
if oldOrg.(string) == "" && oldName.(string) == "" {
// This is create event, so we don't need to check the diff.
return nil
}
if oldName != "" && newName == "" {
// Auto generate the name, so we don't need to check the diff.
return nil
}
if diff.HasChanges([]string{"organization", "name", "instance_name", "location", "pool_member_name"}...) {
return fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " +
"The pulsar cluster organization, name, instance_name, location, pool_member_name does not support updates, please recreate it")
Expand All @@ -68,10 +72,14 @@ func resourcePulsarCluster() *schema.Resource {
ValidateFunc: validateNotBlank,
},
"name": {
Type: schema.TypeString,
Required: true,
Description: descriptions["cluster_name"],
ValidateFunc: validateNotBlank,
Type: schema.TypeString,
Optional: true,
Description: descriptions["cluster_name"],
},
"display_name": {
Type: schema.TypeString,
Optional: true,
Description: descriptions["cluster_display_name"],
},
"instance_name": {
Type: schema.TypeString,
Expand Down Expand Up @@ -290,13 +298,19 @@ func resourcePulsarCluster() *schema.Resource {
Computed: true,
Description: descriptions["bookkeeper_version"],
},
"type": {
Type: schema.TypeString,
Computed: true,
Description: descriptions["instance_type"],
},
},
}
}

func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
namespace := d.Get("organization").(string)
name := d.Get("name").(string)
displayName := d.Get("display_name").(string)
instanceName := d.Get("instance_name").(string)
pool_member_name := d.Get("pool_member_name").(string)
location := d.Get("location").(string)
Expand Down Expand Up @@ -355,7 +369,6 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me
APIVersion: cloudv1alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: cloudv1alpha1.PulsarClusterSpec{
Expand All @@ -380,6 +393,16 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me
},
},
}
if name != "" {
pulsarCluster.ObjectMeta.Name = name
}
if displayName != "" {
pulsarCluster.Spec.DisplayName = displayName
}
if displayName == "" && name == "" {
return diag.FromErr(fmt.Errorf("ERROR_CREATE_PULSAR_CLUSTER: " +
"either name or display_name must be provided"))
}
if pool_member_name != "" {
pulsarCluster.Spec.PoolMemberRef = cloudv1alpha1.PoolMemberReference{
Name: pool_member_name,
Expand All @@ -397,13 +420,17 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me
}
}
}
getPulsarClusterChanged(pulsarCluster, d)
if pulsarInstance.Spec.Type == cloudv1alpha1.PulsarInstanceTypeStandard {
getPulsarClusterChanged(pulsarCluster, d)
}
pc, err := clientSet.CloudV1alpha1().PulsarClusters(namespace).Create(ctx, pulsarCluster, metav1.CreateOptions{
FieldManager: "terraform-create",
})
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_CREATE_PULSAR_CLUSTER: %w", err))
}
d.Set("name", pc.Name)
d.Set("display_name", pc.Spec.DisplayName)
if pc.Status.Conditions != nil {
ready := false
for _, condition := range pc.Status.Conditions {
Expand Down Expand Up @@ -435,13 +462,32 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me
func resourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
namespace := d.Get("organization").(string)
name := d.Get("name").(string)
displayName := d.Get("display_name").(string)
clientSet, err := getClientSet(getFactoryFromMeta(meta))
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_INIT_CLIENT_ON_READ_PULSAR_CLUSTER: %w", err))
}
pulsarCluster, err := clientSet.CloudV1alpha1().PulsarClusters(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER: %w", err))
var pulsarCluster *cloudv1alpha1.PulsarCluster
if name != "" {
pulsarCluster, err = clientSet.CloudV1alpha1().PulsarClusters(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER: %w", err))
}
} else {
pulsarClusters, err := clientSet.CloudV1alpha1().PulsarClusters(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_LIST_PULSAR_CLUSTER: %w", err))
}
for _, cluster := range pulsarClusters.Items {
if cluster.Spec.DisplayName == displayName {
pulsarCluster = &cluster
break
}
}
if pulsarCluster == nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER: "+
"the pulsar cluster with display_name %s does not exist", displayName))
}
}
_ = d.Set("ready", "False")
if pulsarCluster.Status.Conditions != nil {
Expand Down Expand Up @@ -514,19 +560,28 @@ func resourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, meta
return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_CLUSTER_CONFIG: %w", err))
}
}
brokerImage := strings.Split(pulsarCluster.Spec.Broker.Image, ":")
_ = d.Set("pulsar_version", brokerImage[1])
bookkeeperImage := strings.Split(pulsarCluster.Spec.BookKeeper.Image, ":")
_ = d.Set("bookkeeper_version", bookkeeperImage[1])
if pulsarInstance.Spec.Type != cloudv1alpha1.PulsarInstanceTypeServerless {
brokerImage := strings.Split(pulsarCluster.Spec.Broker.Image, ":")
_ = d.Set("pulsar_version", brokerImage[1])
bookkeeperImage := strings.Split(pulsarCluster.Spec.BookKeeper.Image, ":")
_ = d.Set("bookkeeper_version", bookkeeperImage[1])
}
releaseChannel := pulsarCluster.Spec.ReleaseChannel
if releaseChannel != "" {
_ = d.Set("release_channel", releaseChannel)
}
_ = d.Set("type", pulsarInstance.Spec.Type)
d.SetId(fmt.Sprintf("%s/%s", pulsarCluster.Namespace, pulsarCluster.Name))
return nil
}

func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
serverless := d.Get("type")
if serverless == cloudv1alpha1.PulsarInstanceTypeServerless {
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: "+
"updating a cluster under instance of type '%s' is no longer allowed",
cloudv1alpha1.PulsarInstanceTypeServerless))
}
if d.HasChange("organization") {
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " +
"The pulsar cluster organization does not support updates"))
Expand Down Expand Up @@ -616,7 +671,12 @@ func resourcePulsarClusterDelete(ctx context.Context, d *schema.ResourceData, me
return diag.FromErr(fmt.Errorf("ERROR_INIT_CLIENT_ON_DELETE_PULSAR_CLUSTER: %w", err))
}
namespace := d.Get("organization").(string)
t := d.Get("type")
name := d.Get("name").(string)
if t == cloudv1alpha1.PulsarInstanceTypeServerless {
id := strings.Split(d.Id(), "/")
name = id[1]
}
err = clientSet.CloudV1alpha1().PulsarClusters(namespace).Delete(ctx, name, metav1.DeleteOptions{})
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_DELETE_PULSAR_CLUSTER: %w", err))
Expand Down
15 changes: 14 additions & 1 deletion cloud/resource_pulsar_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ func resourcePulsarInstance() *schema.Resource {
Description: descriptions["pool_namespace"],
ValidateFunc: validateNotBlank,
},
"type": {
Type: schema.TypeString,
Optional: true,
Description: descriptions["instance_type"],
ValidateFunc: validateInstanceType,
},
"ready": {
Type: schema.TypeString,
Computed: true,
Expand All @@ -110,6 +116,7 @@ func resourcePulsarInstanceCreate(ctx context.Context, d *schema.ResourceData, m
availabilityMode := d.Get("availability_mode").(string)
poolName := d.Get("pool_name").(string)
poolNamespace := d.Get("pool_namespace").(string)
instanceType := d.Get("type").(string)
clientSet, err := getClientSet(getFactoryFromMeta(meta))
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_INIT_CLIENT_ON_PULSAR_INSTANCE: %w", err))
Expand All @@ -118,6 +125,12 @@ func resourcePulsarInstanceCreate(ctx context.Context, d *schema.ResourceData, m
Namespace: poolNamespace,
Name: poolName,
}
var t cloudv1alpha1.PulsarInstanceType
if instanceType == "" {
t = cloudv1alpha1.PulsarInstanceTypeStandard
} else {
t = cloudv1alpha1.PulsarInstanceType(instanceType)
}
pulsarInstance := &cloudv1alpha1.PulsarInstance{
TypeMeta: metav1.TypeMeta{
Kind: "PulsarInstance",
Expand All @@ -129,7 +142,7 @@ func resourcePulsarInstanceCreate(ctx context.Context, d *schema.ResourceData, m
},
Spec: cloudv1alpha1.PulsarInstanceSpec{
AvailabilityMode: cloudv1alpha1.InstanceAvailabilityMode(availabilityMode),
Type: cloudv1alpha1.PulsarInstanceTypeStandard,
Type: t,
PoolRef: poolRef,
},
}
Expand Down
8 changes: 8 additions & 0 deletions cloud/validate_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ func validateNotBlank(val interface{}, key string) (warns []string, errs []error
return
}

func validateInstanceType(val interface{}, key string) (warns []string, errs []error) {
v := val.(string)
if v != "serverless" && v != "standard" {
errs = append(errs, fmt.Errorf("%q must be serverless or standard", key))
}
return
}

func validateReleaseChannel(val interface{}, key string) (warns []string, errs []error) {
v := val.(string)
if v != "lts" && v != "rapid" {
Expand Down
68 changes: 68 additions & 0 deletions examples/serverless/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# // Licensed to the Apache Software Foundation (ASF) under one
# // or more contributor license agreements. See the NOTICE file
# // distributed with this work for additional information
# // regarding copyright ownership. The ASF licenses this file
# // to you under the Apache License, Version 2.0 (the
# // "License"); you may not use this file except in compliance
# // with the License. You may obtain a copy of the License at
# //
# // http://www.apache.org/licenses/LICENSE-2.0
# //
# // Unless required by applicable law or agreed to in writing,
# // software distributed under the License is distributed on an
# // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# // KIND, either express or implied. See the License for the
# // specific language governing permissions and limitations
# // under the License.

terraform {
required_providers {
streamnative = {
version = "0.1.0"
source = "streamnative/streamnative"
}
}
}

provider "streamnative" {
# Please replace path use your own key file path
key_file_path = "/Users/tuteng/Downloads/o-uvzjb-testadmin.json"
}

resource "streamnative_pulsar_instance" "test-serverless" {
organization = "o-uvzjb"
name = "test-serverless"
availability_mode = "zonal"
pool_name = "functions-aws"
pool_namespace = "streamnative"
type = "serverless"
}

data "streamnative_pulsar_instance" "test-serverless" {
depends_on = [streamnative_pulsar_instance.test-serverless]
name = streamnative_pulsar_instance.test-serverless.name
organization = streamnative_pulsar_instance.test-serverless.organization
}

output "pulsar_instance" {
value = data.streamnative_pulsar_instance.test-serverless
}

resource "streamnative_pulsar_cluster" "test-serverless" {
depends_on = [streamnative_pulsar_instance.test-serverless]
organization = streamnative_pulsar_instance.test-serverless.organization
display_name = "test-serverless"
instance_name = streamnative_pulsar_instance.test-serverless.name
location = "us-east-2"
release_channel = "rapid"
}

data "streamnative_pulsar_cluster" "test-serverless" {
depends_on = [streamnative_pulsar_cluster.test-serverless]
organization = streamnative_pulsar_cluster.test-serverless.organization
name = streamnative_pulsar_cluster.test-serverless.name
}

output "pulsar_cluster_serverless" {
value = data.streamnative_pulsar_cluster.test-serverless
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/lestrrat-go/jwx/v2 v2.0.21
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.9.1
github.com/streamnative/cloud-api-server v1.25.2-0.20240607131732-e2211877910c
github.com/streamnative/cloud-api-server v1.25.2-0.20240803044529-9e543679949e
github.com/streamnative/cloud-cli v0.14.3-0.20240202094224-5eec608e4680
github.com/xhit/go-str2duration/v2 v2.1.0
k8s.io/apimachinery v0.28.4
Expand Down Expand Up @@ -246,6 +246,7 @@ replace (
github.com/streamnative/pulsar-operators/pulsar-operator/api => github.com/streamnative/pulsar-operators/pulsar-operator/api v0.17.3
github.com/streamnative/pulsar-operators/zookeeper-operator => github.com/streamnative/pulsar-operators/zookeeper-operator v0.17.3
github.com/streamnative/pulsar-operators/zookeeper-operator/api => github.com/streamnative/pulsar-operators/zookeeper-operator/api v0.17.3
github.com/streamnative/sn-operator/api => github.com/streamnative/sn-operator/api v0.4.10
google.golang.org/api => google.golang.org/api v0.32.0
google.golang.org/genproto => google.golang.org/genproto v0.0.0-20200825200019-8632dd797987
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,8 @@ github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/streamnative/apiserver-builder-alpha v0.0.0-20230717175906-0f9240887463 h1:ukAAbYtzonTxP/zNwAZhFVvsqc6A8fVWSKwH9Yfu9w0=
github.com/streamnative/apiserver-builder-alpha v0.0.0-20230717175906-0f9240887463/go.mod h1:W1q2VCPvT9GjMUsFX4JYmdXfSAYsrmLVLALfutt5LsE=
github.com/streamnative/cloud-api-server v1.25.2-0.20240607131732-e2211877910c h1:AaV+hCJ0qEoQhBSc+vzG5zJRRNWRjt6b5l3PwX1LRSs=
github.com/streamnative/cloud-api-server v1.25.2-0.20240607131732-e2211877910c/go.mod h1:7Zv0BDZYJA7XTCE+S324I40ee5aT0CpnaZcXPUk9r50=
github.com/streamnative/cloud-api-server v1.25.2-0.20240803044529-9e543679949e h1:xF3QG0fIs6HoTiwp7Rke8nWPu5DldHy+NcBWG6qnqOY=
github.com/streamnative/cloud-api-server v1.25.2-0.20240803044529-9e543679949e/go.mod h1:QVY82royWwFivIFXWESh5nt+g388hsxP7sqz/f6ZMpM=
github.com/streamnative/cloud-cli v0.14.3-0.20240202094224-5eec608e4680 h1:PRqjyTsfwTCAd3SGDjgPT4Nmbczeegdm/ApyDGr14PA=
github.com/streamnative/cloud-cli v0.14.3-0.20240202094224-5eec608e4680/go.mod h1:48XuDAuu3upsbndxqN+grxz9yPSklZk9ycxKB235dGs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down

0 comments on commit 2f2d07f

Please sign in to comment.