diff --git a/changelog.md b/changelog.md index 20118a93b..4f6ed9edb 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,6 @@ latest: -* Add support for platform wide acls for schema registry in the topology description file. +* Add support for platform wide acls for schema registry in the topology description file. +* Rebird the option to disable deletes when not required anymore. v0.11: * Add support for storing the current generated acls within a state file. This is useful to delete diff --git a/src/main/java/com/purbon/kafka/topology/AccessControlManager.java b/src/main/java/com/purbon/kafka/topology/AccessControlManager.java index dbd5fba7e..8829737ac 100644 --- a/src/main/java/com/purbon/kafka/topology/AccessControlManager.java +++ b/src/main/java/com/purbon/kafka/topology/AccessControlManager.java @@ -1,5 +1,7 @@ package com.purbon.kafka.topology; +import static com.purbon.kafka.topology.BuilderCLI.ALLOW_DELETE_OPTION; + import com.purbon.kafka.topology.model.DynamicUser; import com.purbon.kafka.topology.model.Topology; import com.purbon.kafka.topology.model.User; @@ -9,6 +11,7 @@ import java.io.PrintStream; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -18,23 +21,42 @@ public class AccessControlManager { private static final Logger LOGGER = LogManager.getLogger(AccessControlManager.class); + private final Boolean allowDelete; private AccessControlProvider controlProvider; private ClusterState clusterState; + private Map cliParams; public AccessControlManager(AccessControlProvider controlProvider) { - this(controlProvider, new ClusterState()); + this(controlProvider, new ClusterState(), new HashMap<>()); + } + + public AccessControlManager( + AccessControlProvider controlProvider, Map cliParams) { + this(controlProvider, new ClusterState(), cliParams); } public AccessControlManager(AccessControlProvider controlProvider, ClusterState clusterState) { + this(controlProvider, clusterState, new HashMap<>()); + } + + public AccessControlManager( + AccessControlProvider controlProvider, + ClusterState clusterState, + Map cliParams) { this.controlProvider = controlProvider; this.clusterState = clusterState; + this.cliParams = cliParams; + + this.allowDelete = Boolean.valueOf(cliParams.getOrDefault(ALLOW_DELETE_OPTION, "true")); } public void clearAcls() { try { clusterState.load(); - controlProvider.clearAcls(clusterState); + if (allowDelete) { + controlProvider.clearAcls(clusterState); + } } catch (Exception e) { LOGGER.error(e); } finally { diff --git a/src/main/java/com/purbon/kafka/topology/KafkaTopologyBuilder.java b/src/main/java/com/purbon/kafka/topology/KafkaTopologyBuilder.java index 2b4403cbf..5ef39f96e 100644 --- a/src/main/java/com/purbon/kafka/topology/KafkaTopologyBuilder.java +++ b/src/main/java/com/purbon/kafka/topology/KafkaTopologyBuilder.java @@ -31,11 +31,12 @@ public class KafkaTopologyBuilder { private final Properties properties; private final TopologyBuilderAdminClient builderAdminClient; private final boolean quiteOut; + private final Map cliParams; public KafkaTopologyBuilder(String topologyFile, Map cliParams) { this.topologyFile = topologyFile; this.parser = new TopologySerdes(); - + this.cliParams = cliParams; this.properties = buildProperties(cliParams); this.builderAdminClient = buildTopologyAdminClient(cliParams); this.quiteOut = Boolean.valueOf(cliParams.getOrDefault(QUITE_OPTION, "false")); @@ -46,9 +47,9 @@ public void run() throws IOException { Topology topology = parser.deserialise(new File(topologyFile)); AccessControlProvider aclsProvider = buildAccessControlProvider(); - AccessControlManager accessControlManager = new AccessControlManager(aclsProvider); + AccessControlManager accessControlManager = new AccessControlManager(aclsProvider, cliParams); - TopicManager topicManager = new TopicManager(builderAdminClient); + TopicManager topicManager = new TopicManager(builderAdminClient, cliParams); topicManager.sync(topology); accessControlManager.sync(topology); diff --git a/src/main/java/com/purbon/kafka/topology/TopicManager.java b/src/main/java/com/purbon/kafka/topology/TopicManager.java index 90f077012..b45a3c22b 100644 --- a/src/main/java/com/purbon/kafka/topology/TopicManager.java +++ b/src/main/java/com/purbon/kafka/topology/TopicManager.java @@ -1,12 +1,16 @@ package com.purbon.kafka.topology; +import static com.purbon.kafka.topology.BuilderCLI.ALLOW_DELETE_OPTION; + import com.purbon.kafka.topology.model.Project; import com.purbon.kafka.topology.model.Topic; import com.purbon.kafka.topology.model.Topology; import java.io.PrintStream; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -19,9 +23,17 @@ public class TopicManager { public static final String REPLICATION_FACTOR = "replication.factor"; private final TopologyBuilderAdminClient adminClient; + private final Map cliParams; + private final Boolean allowDelete; public TopicManager(TopologyBuilderAdminClient adminClient) { + this(adminClient, new HashMap<>()); + } + + public TopicManager(TopologyBuilderAdminClient adminClient, Map cliParams) { this.adminClient = adminClient; + this.cliParams = cliParams; + this.allowDelete = Boolean.valueOf(cliParams.getOrDefault(ALLOW_DELETE_OPTION, "true")); } public void sync(Topology topology) { @@ -44,17 +56,19 @@ public void sync(Topology topology) { updatedListOfTopics.add(fullTopicName); })); - // Handle topic delete: Topics in the initial list, but not present anymore after a - // full topic sync should be deleted - List topicsToBeDeleted = new ArrayList<>(); - listOfTopics.stream() - .forEach( - originalTopic -> { - if (!updatedListOfTopics.contains(originalTopic)) { - topicsToBeDeleted.add(originalTopic); - } - }); - adminClient.deleteTopics(topicsToBeDeleted); + if (allowDelete) { + // Handle topic delete: Topics in the initial list, but not present anymore after a + // full topic sync should be deleted + List topicsToBeDeleted = new ArrayList<>(); + listOfTopics.stream() + .forEach( + originalTopic -> { + if (!updatedListOfTopics.contains(originalTopic)) { + topicsToBeDeleted.add(originalTopic); + } + }); + adminClient.deleteTopics(topicsToBeDeleted); + } } public void syncTopic(Topic topic, String fullTopicName, Set listOfTopics) {