Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support management pulsar instance #5

Merged
merged 13 commits into from
Dec 28, 2023
22 changes: 17 additions & 5 deletions cloud/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,22 @@ var descriptions map[string]string

func init() {
descriptions = map[string]string{
"key_file_path": "The path of the private key file",
"organization": "The organization name",
"name": "The service account name",
"admin": "Whether the service account is admin",
"private_key_data": "The private key data",
"key_file_path": "The path of the private key file",
"organization": "The organization name",
"name": "The service account name",
"admin": "Whether the service account is admin",
"private_key_data": "The private key data",
"availability-mode": "The availability mode, supporting 'zonal' and 'regional'",
"pool_name": "The infrastructure pool name to use.",
"pool_namespace": "The infrastructure pool namespace to use",
"instance_name": "The pulsar instance name",
"location": "The location of the pulsar cluster",
"bookie_replicas": "The number of bookie replicas",
"broker_replicas": "The number of broker replicas",
"compute_unit": "compute unit, 1 compute unit is 2 cpu and 8gb memory",
"storage_unit": "storage unit, 1 storage unit is 2 cpu and 8gb memory",
"cluster_ready": "Pulsar cluster is ready, it will be set to 'True' after the cluster is ready",
"instance_ready": "Pulsar instance is ready, it will be set to 'True' after the instance is ready",
}
}

Expand All @@ -50,6 +61,7 @@ func Provider() *schema.Provider {
},
ResourcesMap: map[string]*schema.Resource{
"streamnative_service_account": resourceServiceAccount(),
"streamnative_pulsar_instance": resourcePulsarInstance(),
},
}
provider.ConfigureContextFunc = func(_ context.Context, d *schema.ResourceData) (interface{}, diag.Diagnostics) {
Expand Down
195 changes: 195 additions & 0 deletions cloud/resource_pulsar_instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package cloud

import (
"context"
"fmt"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
cloudv1alpha1 "github.com/streamnative/cloud-api-server/pkg/apis/cloud/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strings"
"time"
)

func resourcePulsarInstance() *schema.Resource {
return &schema.Resource{
CreateContext: resourcePulsarInstanceCreate,
ReadContext: resourcePulsarInstanceRead,
UpdateContext: resourcePulsarInstanceUpdate,
DeleteContext: resourcePulsarInstanceDelete,
CustomizeDiff: func(ctx context.Context, diff *schema.ResourceDiff, i interface{}) error {
oldOrg, _ := diff.GetChange("organization")
oldName, _ := diff.GetChange("name")
if oldOrg.(string) == "" && oldName.(string) == "" {
// This is create event, so we don't need to check the diff.
return nil
}
if diff.HasChange("name") ||
diff.HasChanges("organization") ||
diff.HasChanges("availability_mode") ||
diff.HasChanges("pool_name") ||
diff.HasChanges("pool_namespace") {
return fmt.Errorf("ERROR_UPDATE_PULSAR_INSTANCE: " +
"The pulsar instance does not support updates, please recreate it")
}
return nil
},
Importer: &schema.ResourceImporter{
StateContext: func(ctx context.Context, d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) {
organizationInstance := strings.Split(d.Id(), "/")
_ = d.Set("organization", organizationInstance[0])
_ = d.Set("name", organizationInstance[1])
err := resourcePulsarInstanceRead(ctx, d, meta)
if err.HasError() {
return nil, fmt.Errorf("import %q: %s", d.Id(), err[0].Summary)
}
return []*schema.ResourceData{d}, nil
},
},
Schema: map[string]*schema.Schema{
"organization": {
Type: schema.TypeString,
Required: true,
Description: descriptions["organization"],
ValidateFunc: validateNotBlank,
},
"name": {
Type: schema.TypeString,
Required: true,
Description: descriptions["name"],
ValidateFunc: validateNotBlank,
},
"availability_mode": {
Type: schema.TypeString,
Required: true,
Elem: &schema.Schema{
Type: schema.TypeString,
},
Description: descriptions["availability-mode"],
},
"pool_name": {
Type: schema.TypeString,
Required: true,
Description: descriptions["pool_name"],
ValidateFunc: validateNotBlank,
},
"pool_namespace": {
Type: schema.TypeString,
Required: true,
Description: descriptions["pool_namespace"],
ValidateFunc: validateNotBlank,
},
"ready": {
Type: schema.TypeString,
Computed: true,
Description: descriptions["instance_ready"],
},
},
}
}

func resourcePulsarInstanceCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
namespace := d.Get("organization").(string)
name := d.Get("name").(string)
availabilityMode := d.Get("availability_mode").(string)
poolName := d.Get("pool_name").(string)
poolNamespace := d.Get("pool_namespace").(string)
clientSet, err := getClientSet(getFactoryFromMeta(meta))
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_INIT_CLIENT_ON_PULSAR_INSTANCE: %w", err))
}
poolRef := &cloudv1alpha1.PoolRef{
Namespace: poolNamespace,
Name: poolName,
}
pulsarInstance := &cloudv1alpha1.PulsarInstance{
TypeMeta: metav1.TypeMeta{
Kind: "PulsarInstance",
APIVersion: cloudv1alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: cloudv1alpha1.PulsarInstanceSpec{
AvailabilityMode: cloudv1alpha1.InstanceAvailabilityMode(availabilityMode),
Type: cloudv1alpha1.PulsarInstanceTypeStandard,
PoolRef: poolRef,
},
}
pi, err := clientSet.CloudV1alpha1().PulsarInstances(namespace).Create(ctx, pulsarInstance, metav1.CreateOptions{
FieldManager: "terraform-create",
})
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_CREATE_PULSAR_INSTANCE: %w", err))
}
if pi.Status.Conditions != nil {
ready := false
for _, condition := range pi.Status.Conditions {
if condition.Type == "Ready" && condition.Status == "True" {
ready = true
}
}
if ready {
_ = d.Set("organization", namespace)
_ = d.Set("name", name)
return resourcePulsarInstanceRead(ctx, d, meta)
}
}
err = retry.RetryContext(ctx, 3*time.Minute, func() *retry.RetryError {
dia := resourcePulsarInstanceRead(ctx, d, meta)
if dia.HasError() {
return retry.NonRetryableError(fmt.Errorf("ERROR_RETRY_READ_PULSAR_INSTANCE: %s", dia[0].Summary))
}
ready := d.Get("ready")
if ready == "False" {
return retry.RetryableError(fmt.Errorf("CONTINUE_RETRY_READ_PULSAR_INSTANCE"))
}
return nil
})
return nil
}

func resourcePulsarInstanceRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
namespace := d.Get("organization").(string)
name := d.Get("name").(string)
clientSet, err := getClientSet(getFactoryFromMeta(meta))
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_INIT_CLIENT_ON_READ_SERVICE_ACCOUNT: %w", err))
}
pulsarInstance, err := clientSet.CloudV1alpha1().PulsarInstances(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_READ_PULSAR_INSTANCE: %w", err))
}
_ = d.Set("ready", "False")
if pulsarInstance.Status.Conditions != nil {
for _, condition := range pulsarInstance.Status.Conditions {
if condition.Type == "Ready" && condition.Status == "True" {
_ = d.Set("ready", "True")
}
}
}
d.SetId(fmt.Sprintf("%s/%s", pulsarInstance.Namespace, pulsarInstance.Name))
return nil
}

func resourcePulsarInstanceUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_INSTANCE: " +
"The pulsar instance does not support updates, please recreate it"))
}

func resourcePulsarInstanceDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
clientSet, err := getClientSet(getFactoryFromMeta(meta))
if err != nil {
return diag.FromErr(fmt.Errorf("ERROR_INIT_CLIENT_ON_DELETE_PULSAR_INSTANCE: %w", err))
}
namespace := d.Get("organization").(string)
name := d.Get("name").(string)
err = clientSet.CloudV1alpha1().PulsarInstances(namespace).Delete(ctx, name, metav1.DeleteOptions{})
if err != nil {
return diag.FromErr(fmt.Errorf("DELETE_PULSAR_INSTANCE: %w", err))
}
_ = d.Set("name", "")
return nil
}
116 changes: 116 additions & 0 deletions cloud/resource_pulsar_instance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package cloud

import (
"context"
"fmt"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strings"
"testing"
"time"
)

func TestResourcePulsarInstance(t *testing.T) {
resource.Test(t, resource.TestCase{
PreCheck: func() {
testAccPreCheck(t)
},
ProviderFactories: testAccProviderFactories,
CheckDestroy: testCheckResourcePulsarInstanceDestroy,
Steps: []resource.TestStep{
{
Config: testResourcePulsarInstance(
"sndev",
"terraform-test-pulsar-instance-b",
"zonal",
"shared-gcp",
"streamnative"),
Check: resource.ComposeTestCheckFunc(
testCheckResourcePulsarInstanceExists("streamnative_pulsar_instance.test-pulsar-instance"),
),
},
},
})
}

func testCheckResourcePulsarInstanceDestroy(s *terraform.State) error {
// Add a sleep for wait the service account to be deleted
// It seems that azure connection to gcp is slow, so add a delay to wait
// for the resource to be cleaned up and check it again
time.Sleep(5 * time.Second)
for _, rs := range s.RootModule().Resources {
if rs.Type != "streamnative_pulsar_instance" {
continue
}
meta := testAccProvider.Meta()
clientSet, err := getClientSet(getFactoryFromMeta(meta))
if err != nil {
return err
}
organizationInstance := strings.Split(rs.Primary.ID, "/")
_, err = clientSet.CloudV1alpha1().
PulsarInstances(organizationInstance[0]).
Get(context.Background(), organizationInstance[1], metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
return fmt.Errorf(`ERROR_RESOURCE_PULSAR_INSTANCE_STILL_EXISTS: "%s"`, rs.Primary.ID)
}
return nil
}

func testCheckResourcePulsarInstanceExists(name string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[name]
if !ok {
return fmt.Errorf(`ERROR_RESOURCE_PULSAR_INSTANCE_NOT_FOUND: "%s"`, name)
}
if rs.Primary.ID == "" {
return fmt.Errorf("ERROR_RESOURCE_PULSAR_INSTANCE_ID_NOT_SET")
}
meta := testAccProvider.Meta()
clientSet, err := getClientSet(getFactoryFromMeta(meta))
if err != nil {
return err
}
organizationInstance := strings.Split(rs.Primary.ID, "/")
pulsarInstance, err := clientSet.CloudV1alpha1().
PulsarInstances(organizationInstance[0]).
Get(context.Background(), organizationInstance[1], metav1.GetOptions{})
if err != nil {
return err
}
if pulsarInstance.Status.Conditions != nil {
ready := false
for _, condition := range pulsarInstance.Status.Conditions {
if condition.Type == "Ready" && condition.Status == "True" {
ready = true
}
}
if !ready {
return fmt.Errorf(`ERROR_RESOURCE_PULSAR_INSTANCE_NOT_READY: "%s"`, rs.Primary.ID)
}
}
return nil
}
}

func testResourcePulsarInstance(
organization string, name string, availabilityMode string, poolName string, poolNamespace string) string {
return fmt.Sprintf(`
provider "streamnative" {
}
resource "streamnative_pulsar_instance" "test-pulsar-instance" {
organization = "%s"
name = "%s"
availability_mode = "%s"
pool_name = "%s"
pool_namespace = "%s"
}
`, organization, name, availabilityMode, poolName, poolNamespace)
}
3 changes: 2 additions & 1 deletion cloud/resource_service_account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ func TestResourceServiceAccount(t *testing.T) {
CheckDestroy: testCheckResourceServiceAccountDestroy,
Steps: []resource.TestStep{
{
Config: testResourceServiceAccount("sndev", "test-service-account-b", true),
Config: testResourceServiceAccount(
"sndev", "terraform-test-service-account-b", true),
Check: resource.ComposeTestCheckFunc(
testCheckResourceServiceAccountExists("streamnative_service_account.test-service-account"),
),
Expand Down
27 changes: 27 additions & 0 deletions cloud/validate_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,30 @@ func validateNotBlank(val interface{}, key string) (warns []string, errs []error
}
return
}

func validateBookieReplicas(val interface{}, key string) (warns []string, errs []error) {
v := val.(int)
if v < 3 || v > 15 {
errs = append(errs, fmt.Errorf(
"%q should be greater than or equal to 3 and less than or equal to 15, got: %d", key, v))
}
return
}

func validateBrokerReplicas(val interface{}, key string) (warns []string, errs []error) {
v := val.(int)
if v < 1 || v > 15 {
errs = append(errs, fmt.Errorf(
"%q should be greater than or equal to 1 and less than or equal to 15, got: %d", key, v))
}
return
}

func validateCUSU(val interface{}, key string) (warns []string, errs []error) {
v := val.(float64)
if v < 0.2 || v > 8 {
errs = append(errs, fmt.Errorf(
"%q should be greater than or equal to 0.2 and less than or equal to 8, got: %f", key, v))
}
return
}
Loading