Skip to content

Commit

Permalink
fix pulsar cluster config drift and cusu refresh (#95)
Browse files Browse the repository at this point in the history
1. pulsar cluster config drift
2. pulsar cluster CUSU in state not update if change externally
  • Loading branch information
freeznet authored Jan 17, 2025
1 parent fdcf35a commit 79b420b
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 1 deletion.
120 changes: 120 additions & 0 deletions cloud/pulsar_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,30 @@ func TestPulsarCluster(t *testing.T) {
})
}

func TestPulsarClusterNoConfig(t *testing.T) {
var clusterGeneratedName = fmt.Sprintf("t-%d-%d", rand.Intn(1000), rand.Intn(100))
resource.Test(t, resource.TestCase{
PreCheck: func() {
testAccPreCheck(t)
},
ProviderFactories: testAccProviderFactories,
CheckDestroy: testCheckPulsarClusterDestroy,
Steps: []resource.TestStep{
{
Config: testResourceDataSourcePulsarClusterWithoutConfig(
"sndev",
clusterGeneratedName,
"shared-gcp",
"streamnative",
"us-central1", "rapid"),
Check: resource.ComposeTestCheckFunc(
testCheckPulsarClusterExists("streamnative_pulsar_cluster.test-pulsar-cluster"),
),
},
},
})
}

func testCheckPulsarClusterDestroy(s *terraform.State) error {
time.Sleep(30 * time.Second)
for _, rs := range s.RootModule().Resources {
Expand Down Expand Up @@ -115,6 +139,74 @@ func testCheckPulsarClusterExists(name string) resource.TestCheckFunc {
}
}

func TestPulsarClusterConfigDrift(t *testing.T) {
var clusterGeneratedName = fmt.Sprintf("t-%d-%d", rand.Intn(1000), rand.Intn(100))
resource.Test(t, resource.TestCase{
PreCheck: func() {
testAccPreCheck(t)
},
ProviderFactories: testAccProviderFactories,
CheckDestroy: testCheckPulsarClusterDestroy,
Steps: []resource.TestStep{
{
Config: testResourceDataSourcePulsarCluster(
"sndev",
clusterGeneratedName,
"shared-gcp",
"streamnative",
"us-central1", "rapid"),
Check: resource.ComposeTestCheckFunc(
testCheckPulsarClusterExists("streamnative_pulsar_cluster.test-pulsar-cluster"),
),
},
{
Config: testResourceDataSourcePulsarCluster(
"sndev",
clusterGeneratedName,
"shared-gcp",
"streamnative",
"us-central1", "rapid"),
PlanOnly: true,
ExpectNonEmptyPlan: false,
},
},
})
}

func TestPulsarClusterNoConfigConfigDrift(t *testing.T) {
var clusterGeneratedName = fmt.Sprintf("t-%d-%d", rand.Intn(1000), rand.Intn(100))
resource.Test(t, resource.TestCase{
PreCheck: func() {
testAccPreCheck(t)
},
ProviderFactories: testAccProviderFactories,
CheckDestroy: testCheckPulsarClusterDestroy,
Steps: []resource.TestStep{
{
Config: testResourceDataSourcePulsarClusterWithoutConfig(
"sndev",
clusterGeneratedName,
"shared-gcp",
"streamnative",
"us-central1", "rapid"),
Check: resource.ComposeTestCheckFunc(
testCheckPulsarClusterExists("streamnative_pulsar_cluster.test-pulsar-cluster"),
),
},
{
Config: testResourceDataSourcePulsarClusterWithoutConfig(
"sndev",
clusterGeneratedName,
"shared-gcp",
"streamnative",
"us-central1", "rapid"),
PlanOnly: true,
ExpectNonEmptyPlan: false,
},
},
})
}

func testResourceDataSourcePulsarCluster(organization, name, poolName, poolNamespace, location, releaseChannel string) string {
return fmt.Sprintf(`
provider "streamnative" {
Expand Down Expand Up @@ -159,3 +251,31 @@ data "streamnative_pulsar_cluster" "test-pulsar-cluster" {
}
`, organization, name, poolName, poolNamespace, organization, name, name, location, releaseChannel)
}

func testResourceDataSourcePulsarClusterWithoutConfig(organization, name, poolName, poolNamespace, location, releaseChannel string) string {
return fmt.Sprintf(`
provider "streamnative" {
}
resource "streamnative_pulsar_instance" "test-pulsar-instance" {
organization = "%s"
name = "%s"
availability_mode = "zonal"
pool_name = "%s"
pool_namespace = "%s"
type = "dedicated"
}
resource "streamnative_pulsar_cluster" "test-pulsar-cluster" {
organization = "%s"
name = "%s"
instance_name = "%s"
location = "%s"
release_channel = "%s"
depends_on = [streamnative_pulsar_instance.test-pulsar-instance]
}
data "streamnative_pulsar_cluster" "test-pulsar-cluster" {
depends_on = [streamnative_pulsar_cluster.test-pulsar-cluster]
organization = streamnative_pulsar_cluster.test-pulsar-cluster.organization
name = streamnative_pulsar_cluster.test-pulsar-cluster.name
}
`, organization, name, poolName, poolNamespace, organization, name, name, location, releaseChannel)
}
26 changes: 25 additions & 1 deletion cloud/resource_pulsar_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cloud
import (
"context"
"fmt"
"math"
"strings"
"time"

Expand Down Expand Up @@ -177,6 +178,7 @@ func resourcePulsarCluster() *schema.Resource {
return d.Get("type") == string(cloudv1alpha1.PulsarInstanceTypeServerless)
},
MinItems: 0,
Computed: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"websocket_enabled": {
Expand Down Expand Up @@ -629,6 +631,10 @@ func resourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, meta
_ = d.Set("release_channel", releaseChannel)
}
_ = d.Set("type", pulsarInstance.Spec.Type)
computeUnit := convertCpuAndMemoryToComputeUnit(pulsarCluster)
storageUnit := convertCpuAndMemoryToStorageUnit(pulsarCluster)
_ = d.Set("compute_unit_per_broker", computeUnit)
_ = d.Set("storage_unit_per_bookie", storageUnit)
d.SetId(fmt.Sprintf("%s/%s", pulsarCluster.Namespace, pulsarCluster.Name))
return nil
}
Expand Down Expand Up @@ -940,8 +946,26 @@ func getComputeUnit(d *schema.ResourceData) float64 {

func getStorageUnit(d *schema.ResourceData) float64 {
storageUnit := d.Get("storage_unit").(float64)
if newStorageUnit, exist := d.GetOk("storage_unit"); exist {
if newStorageUnit, exist := d.GetOk("storage_unit_per_bookie"); exist {
storageUnit = newStorageUnit.(float64)
}
return storageUnit
}

func convertCpuAndMemoryToComputeUnit(pc *cloudv1alpha1.PulsarCluster) float64 {
if pc != nil && pc.Spec.Broker.Resources != nil {
cpu := pc.Spec.Broker.Resources.Cpu.MilliValue()
memory := pc.Spec.Broker.Resources.Memory.Value()
return math.Max(float64(cpu)/2/1000, float64(memory)/(8*1024*1024*1024))
}
return 0.5 // default value
}

func convertCpuAndMemoryToStorageUnit(pc *cloudv1alpha1.PulsarCluster) float64 {
if pc != nil && pc.Spec.BookKeeper.Resources != nil {
cpu := pc.Spec.BookKeeper.Resources.Cpu.MilliValue()
memory := pc.Spec.BookKeeper.Resources.Memory.Value()
return math.Max(float64(cpu)/2/1000, float64(memory)/(8*1024*1024*1024))
}
return 0.5 // default value
}
6 changes: 6 additions & 0 deletions cloud/resource_pulsar_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func resourcePulsarInstance() *schema.Resource {
Required: true,
Description: descriptions["instance_name"],
ValidateFunc: validateNotBlank,
ForceNew: true,
},
"availability_mode": {
Type: schema.TypeString,
Expand All @@ -84,30 +85,35 @@ func resourcePulsarInstance() *schema.Resource {
Type: schema.TypeString,
},
Description: descriptions["availability-mode"],
ForceNew: true,
},
"pool_name": {
Type: schema.TypeString,
Required: true,
Description: descriptions["pool_name"],
ValidateFunc: validateNotBlank,
ForceNew: true,
},
"pool_namespace": {
Type: schema.TypeString,
Required: true,
Description: descriptions["pool_namespace"],
ValidateFunc: validateNotBlank,
ForceNew: true,
},
"type": {
Type: schema.TypeString,
Optional: true,
Description: descriptions["instance_type"],
ValidateFunc: validateInstanceType,
ForceNew: true,
},
"engine": {
Type: schema.TypeString,
Optional: true,
Description: descriptions["instance_engine"],
ValidateFunc: validateEngine,
ForceNew: true,
},
"ready": {
Type: schema.TypeString,
Expand Down

0 comments on commit 79b420b

Please sign in to comment.