From be6258f91151a1c04f3d89f470069e714c44f450 Mon Sep 17 00:00:00 2001
From: Can Hanhan
Date: Thu, 9 May 2024 11:27:52 -0500
Subject: [PATCH] Implements "managed_replication_factor" attribute

---
 kafka/resource_kafka_topic.go      |  51 ++++++++++----
 kafka/resource_kafka_topic_test.go | 109 +++++++++++++++++++++++++++++
 kafka/topic.go                     |   5 ++
 3 files changed, 153 insertions(+), 12 deletions(-)

diff --git a/kafka/resource_kafka_topic.go b/kafka/resource_kafka_topic.go
index 1794246a..36206f2a 100644
--- a/kafka/resource_kafka_topic.go
+++ b/kafka/resource_kafka_topic.go
@@ -36,11 +36,20 @@ func kafkaTopicResource() *schema.Resource {
 			ValidateFunc: validation.IntAtLeast(1),
 		},
 		"replication_factor": {
-			Type:         schema.TypeInt,
-			Required:     true,
-			ForceNew:     false,
-			Description:  "Number of replicas.",
-			ValidateFunc: validation.IntAtLeast(1),
+			Type:             schema.TypeInt,
+			Required:         true,
+			ForceNew:         false,
+			Description:      "Number of replicas.",
+			ValidateFunc:     validation.IntAtLeast(1),
+			DiffSuppressFunc: checkManagedReplicationFactor,
+		},
+		"managed_replication_factor": {
+			Type:        schema.TypeBool,
+			Required:    false,
+			Optional:    true,
+			ForceNew:    false,
+			Default:     false,
+			Description: "Replication factor is managed server-side.",
 		},
 		"config": {
 			Type:        schema.TypeMap,
@@ -101,8 +110,11 @@ func topicUpdate(ctx context.Context, d *schema.ResourceData, meta interface{})
 		return diag.FromErr(err)
 	}
 
-	// update replica count of existing partitions before adding new ones
-	if d.HasChange("replication_factor") {
+	managedReplicationFactor := d.Get("managed_replication_factor").(bool)
+	if managedReplicationFactor {
+		log.Printf("[INFO] Ignoring replication factor")
+	} else if d.HasChange("replication_factor") {
+		// update replica count of existing partitions before adding new ones
 		oi, ni := d.GetChange("replication_factor")
 		oldRF := oi.(int)
 		newRF := ni.(int)
@@ -131,7 +143,7 @@ func topicUpdate(ctx context.Context, d *schema.ResourceData, meta interface{})
 		}
 	}
 
-	if err := waitForTopicRefresh(ctx, c, d.Id(), t); err != nil {
+	if err := waitForTopicRefresh(ctx, c, d.Id(), t, managedReplicationFactor); err != nil {
 		return diag.FromErr(err)
 	}
 
@@ -170,12 +182,12 @@ func waitForRFUpdate(ctx context.Context, client *LazyClient, topic string) erro
 	return nil
 }
 
-func waitForTopicRefresh(ctx context.Context, client *LazyClient, topic string, expected Topic) error {
+func waitForTopicRefresh(ctx context.Context, client *LazyClient, topic string, expected Topic, managedReplicationFactor bool) error {
 	timeout := time.Duration(client.Config.Timeout) * time.Second
 	stateConf := &retry.StateChangeConf{
 		Pending:      []string{"Updating"},
 		Target:       []string{"Ready"},
-		Refresh:      topicRefreshFunc(client, topic, expected),
+		Refresh:      topicRefreshFunc(client, topic, expected, managedReplicationFactor),
 		Timeout:      timeout,
 		Delay:        1 * time.Second,
 		PollInterval: 1 * time.Second,
@@ -191,7 +203,7 @@ func waitForTopicRefresh(ctx context.Context, client *LazyClient, topic string,
 	return nil
 }
 
-func topicRefreshFunc(client *LazyClient, topic string, expected Topic) retry.StateRefreshFunc {
+func topicRefreshFunc(client *LazyClient, topic string, expected Topic, managedReplicationFactor bool) retry.StateRefreshFunc {
 	return func() (result interface{}, s string, err error) {
 		log.Printf("[DEBUG] waiting for topic to update %s", topic)
 		actual, err := client.ReadTopic(topic, true)
@@ -200,6 +212,12 @@ func topicRefreshFunc(client *LazyClient, topic string, expected Topic) retry.St
 			return actual, "Error", err
 		}
 
+		// If the replication factor is managed server-side,
+		// we should ignore it in the comparison
+		if managedReplicationFactor {
+			actual.ReplicationFactor = -1
+		}
+
 		if expected.Equal(actual) {
 			return actual, "Ready", nil
 		}
@@ -274,6 +292,7 @@ func topicRead(ctx context.Context, d *schema.ResourceData, meta interface{}) di
 	errSet.Set("name", topic.Name)
 	errSet.Set("partitions", topic.Partitions)
 	errSet.Set("replication_factor", topic.ReplicationFactor)
+	errSet.Set("managed_replication_factor", d.Get("managed_replication_factor"))
 	errSet.Set("config", topic.Config)
 
 	if errSet.err != nil {
@@ -302,7 +321,11 @@ func customDiff(ctx context.Context, diff *schema.ResourceDiff, v interface{}) e
 		}
 	}
 
-	if diff.HasChange("replication_factor") {
+	managedReplicationFactor := diff.Get("managed_replication_factor").(bool)
+	if managedReplicationFactor {
+		log.Printf("[INFO] Ignoring replication factor")
+
+	} else if diff.HasChange("replication_factor") {
 		log.Printf("[INFO] Checking the diff!")
 		client := v.(*LazyClient)
 
@@ -321,3 +344,7 @@ func customDiff(ctx context.Context, diff *schema.ResourceDiff, v interface{}) e
 
 	return nil
 }
+
+func checkManagedReplicationFactor(k, old, new string, d *schema.ResourceData) bool {
+	return d.Get("managed_replication_factor").(bool)
+}
diff --git a/kafka/resource_kafka_topic_test.go b/kafka/resource_kafka_topic_test.go
index 4bdf1e84..a82a5d64 100644
--- a/kafka/resource_kafka_topic_test.go
+++ b/kafka/resource_kafka_topic_test.go
@@ -175,6 +175,77 @@ func TestAcc_TopicAlterReplicationFactor(t *testing.T) {
 	})
 }
 
+func TestAcc_TopicAlterBetweenManagedReplicationFactorAndUnmanaged(t *testing.T) {
+	t.Parallel()
+	u, err := uuid.GenerateUUID()
+	if err != nil {
+		t.Fatal(err)
+	}
+	topicName := fmt.Sprintf("syslog-%s", u)
+	bs := testBootstrapServers[0]
+
+	keyEncoder := sarama.StringEncoder("same key -> same partition -> same ordering")
+	messages := []*sarama.ProducerMessage{
+		{
+			Topic: topicName,
+			Key:   keyEncoder,
+			Value: sarama.StringEncoder("Krusty"),
+		},
+		{
+			Topic: topicName,
+			Key:   keyEncoder,
+			Value: sarama.StringEncoder("Krab"),
+		},
+		{
+			Topic: topicName,
+			Key:   keyEncoder,
+			Value: sarama.StringEncoder("Pizza"),
+		},
+	}
+
+	r.Test(t, r.TestCase{
+		ProviderFactories: overrideProviderFactory(),
+		PreCheck:          func() { testAccPreCheck(t) },
+		CheckDestroy:      testAccCheckTopicDestroy,
+		Steps: []r.TestStep{
+			{
+				Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_updateRF, topicName, 1, 3)),
+				Check: r.ComposeTestCheckFunc(
+					testResourceTopic_produceMessages(messages),
+					testResourceTopic_initialCheck),
+			},
+			{
+				// Test altering from an unmanaged to a managed replication factor with the same values
+				Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_managedReplicationFactor, topicName, 1, 3)),
+				Check: r.ComposeTestCheckFunc(
+					testResourceTopic_updateManagedRFCheck,
+					testResourceTopic_checkSameMessages(messages)),
+			},
+			{
+				// Test updating partitions while the replication factor is managed
+				Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_managedReplicationFactor, topicName, 1, 4)),
+				Check: r.ComposeTestCheckFunc(
+					testResourceTopic_updateManagedRFCheck,
+					testResourceTopic_checkSameMessages(messages)),
+			},
+			{
+				// Test updating the replication factor while it is managed
+				Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_managedReplicationFactor, topicName, 2, 4)),
+				Check: r.ComposeTestCheckFunc(
+					testResourceTopic_updateManagedRFCheck,
+					testResourceTopic_checkSameMessages(messages)),
+			},
+			{
+				// Test switching back to an unmanaged replication factor
+				Config: cfg(t, bs, fmt.Sprintf(testResourceTopic_updateRF, topicName, 1, 5)),
+				Check: r.ComposeTestCheckFunc(
+					testResourceTopic_updateRFCheck,
+					testResourceTopic_checkSameMessages(messages)),
+			},
+		},
+	})
+}
+
 func testResourceTopic_noConfigCheck(s *terraform.State) error {
 	resourceState := s.Modules[0].Resources["kafka_topic.test"]
 	if resourceState == nil {
@@ -354,6 +425,30 @@ func testResourceTopic_updateRFCheck(s *terraform.State) error {
 	return nil
 }
 
+func testResourceTopic_updateManagedRFCheck(s *terraform.State) error {
+	resourceState := s.Modules[0].Resources["kafka_topic.test"]
+	instanceState := resourceState.Primary
+	client := testProvider.Meta().(*LazyClient)
+	topicName := instanceState.Attributes["name"]
+
+	parsed, err := strconv.ParseInt(instanceState.Attributes["partitions"], 10, 32)
+	if err != nil {
+		return err
+	}
+	expectedPartitions := int32(parsed)
+
+	topic, err := client.ReadTopic(topicName, true)
+	if err != nil {
+		return err
+	}
+
+	if actual := topic.Partitions; actual != expectedPartitions {
+		return fmt.Errorf("expected %d partitions, but got %d", expectedPartitions, actual)
+	}
+
+	return nil
+}
+
 func testResourceTopic_checkSameMessages(producedMessages []*sarama.ProducerMessage) r.TestCheckFunc {
 	return func(s *terraform.State) error {
 		resourceState := s.Modules[0].Resources["kafka_topic.test"]
@@ -509,3 +604,17 @@ resource "kafka_topic" "test" {
   }
 }
 `
+
+const testResourceTopic_managedReplicationFactor = `
+resource "kafka_topic" "test" {
+  name               = "%s"
+  replication_factor = %d
+  partitions         = %d
+  managed_replication_factor = true
+
+  config = {
+    "retention.ms" = "11111"
+    "segment.ms"   = "22222"
+  }
+}
+`
diff --git a/kafka/topic.go b/kafka/topic.go
index d4712008..d893c681 100644
--- a/kafka/topic.go
+++ b/kafka/topic.go
@@ -72,6 +72,11 @@ func metaToTopic(d *schema.ResourceData, meta interface{}) Topic {
 	convertedPartitions := int32(partitions)
 	convertedRF := int16(replicationFactor)
 	config := d.Get("config").(map[string]interface{})
+	managedReplicationFactor := d.Get("managed_replication_factor").(bool)
+
+	if managedReplicationFactor {
+		convertedRF = -1
+	}
 
 	m2 := make(map[string]*string)
 	for key, value := range config {
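
For illustration, a topic using the new attribute might be declared roughly like the sketch below, which follows the shape of the testResourceTopic_managedReplicationFactor fixture above; the topic name, partition count, and config values here are hypothetical, not taken from the patch. The schema keeps replication_factor required, but with managed_replication_factor = true the provider suppresses diffs on it via checkManagedReplicationFactor and substitutes the sentinel value -1 in metaToTopic and topicRefreshFunc, so the broker-reported replication factor does not trigger an update.

resource "kafka_topic" "example" {
  # Hypothetical topic name and sizing, for illustration only.
  name               = "example-topic"
  partitions         = 8
  # Still required by the schema, but ignored for drift detection
  # because managed_replication_factor is set.
  replication_factor = 3

  # Replication is managed server-side, so changes to the actual
  # replication factor do not produce a plan diff.
  managed_replication_factor = true

  config = {
    "retention.ms" = "604800000"
  }
}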