Skip to content

Commit

Permalink
rebird the option to disable deletes when not required anymore
Browse files Browse the repository at this point in the history
  • Loading branch information
purbon committed May 30, 2020
1 parent 713f5de commit fdb6dae
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 17 deletions.
3 changes: 2 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
latest:
* Add support for platform wide acls for schema registry in the topology description file.
* Add support for platform wide acls for schema registry in the topology description file.
* Rebird the option to disable deletes when not required anymore.

v0.11:
* Add support for storing the current generated acls within a state file. This is useful to delete
Expand Down
26 changes: 24 additions & 2 deletions src/main/java/com/purbon/kafka/topology/AccessControlManager.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.purbon.kafka.topology;

import static com.purbon.kafka.topology.BuilderCLI.ALLOW_DELETE_OPTION;

import com.purbon.kafka.topology.model.DynamicUser;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.model.User;
Expand All @@ -9,6 +11,7 @@
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -18,23 +21,42 @@
public class AccessControlManager {

private static final Logger LOGGER = LogManager.getLogger(AccessControlManager.class);
private final Boolean allowDelete;

private AccessControlProvider controlProvider;
private ClusterState clusterState;
private Map<String, String> cliParams;

public AccessControlManager(AccessControlProvider controlProvider) {
this(controlProvider, new ClusterState());
this(controlProvider, new ClusterState(), new HashMap<>());
}

public AccessControlManager(
AccessControlProvider controlProvider, Map<String, String> cliParams) {
this(controlProvider, new ClusterState(), cliParams);
}

public AccessControlManager(AccessControlProvider controlProvider, ClusterState clusterState) {
this(controlProvider, clusterState, new HashMap<>());
}

public AccessControlManager(
AccessControlProvider controlProvider,
ClusterState clusterState,
Map<String, String> cliParams) {
this.controlProvider = controlProvider;
this.clusterState = clusterState;
this.cliParams = cliParams;

this.allowDelete = Boolean.valueOf(cliParams.getOrDefault(ALLOW_DELETE_OPTION, "true"));
}

public void clearAcls() {
try {
clusterState.load();
controlProvider.clearAcls(clusterState);
if (allowDelete) {
controlProvider.clearAcls(clusterState);
}
} catch (Exception e) {
LOGGER.error(e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ public class KafkaTopologyBuilder {
private final Properties properties;
private final TopologyBuilderAdminClient builderAdminClient;
private final boolean quiteOut;
private final Map<String, String> cliParams;

public KafkaTopologyBuilder(String topologyFile, Map<String, String> cliParams) {
this.topologyFile = topologyFile;
this.parser = new TopologySerdes();

this.cliParams = cliParams;
this.properties = buildProperties(cliParams);
this.builderAdminClient = buildTopologyAdminClient(cliParams);
this.quiteOut = Boolean.valueOf(cliParams.getOrDefault(QUITE_OPTION, "false"));
Expand All @@ -46,9 +47,9 @@ public void run() throws IOException {
Topology topology = parser.deserialise(new File(topologyFile));

AccessControlProvider aclsProvider = buildAccessControlProvider();
AccessControlManager accessControlManager = new AccessControlManager(aclsProvider);
AccessControlManager accessControlManager = new AccessControlManager(aclsProvider, cliParams);

TopicManager topicManager = new TopicManager(builderAdminClient);
TopicManager topicManager = new TopicManager(builderAdminClient, cliParams);

topicManager.sync(topology);
accessControlManager.sync(topology);
Expand Down
36 changes: 25 additions & 11 deletions src/main/java/com/purbon/kafka/topology/TopicManager.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.purbon.kafka.topology;

import static com.purbon.kafka.topology.BuilderCLI.ALLOW_DELETE_OPTION;

import com.purbon.kafka.topology.model.Project;
import com.purbon.kafka.topology.model.Topic;
import com.purbon.kafka.topology.model.Topology;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -19,9 +23,17 @@ public class TopicManager {
public static final String REPLICATION_FACTOR = "replication.factor";

private final TopologyBuilderAdminClient adminClient;
private final Map<String, String> cliParams;
private final Boolean allowDelete;

public TopicManager(TopologyBuilderAdminClient adminClient) {
this(adminClient, new HashMap<>());
}

public TopicManager(TopologyBuilderAdminClient adminClient, Map<String, String> cliParams) {
this.adminClient = adminClient;
this.cliParams = cliParams;
this.allowDelete = Boolean.valueOf(cliParams.getOrDefault(ALLOW_DELETE_OPTION, "true"));
}

public void sync(Topology topology) {
Expand All @@ -44,17 +56,19 @@ public void sync(Topology topology) {
updatedListOfTopics.add(fullTopicName);
}));

// Handle topic delete: Topics in the initial list, but not present anymore after a
// full topic sync should be deleted
List<String> topicsToBeDeleted = new ArrayList<>();
listOfTopics.stream()
.forEach(
originalTopic -> {
if (!updatedListOfTopics.contains(originalTopic)) {
topicsToBeDeleted.add(originalTopic);
}
});
adminClient.deleteTopics(topicsToBeDeleted);
if (allowDelete) {
// Handle topic delete: Topics in the initial list, but not present anymore after a
// full topic sync should be deleted
List<String> topicsToBeDeleted = new ArrayList<>();
listOfTopics.stream()
.forEach(
originalTopic -> {
if (!updatedListOfTopics.contains(originalTopic)) {
topicsToBeDeleted.add(originalTopic);
}
});
adminClient.deleteTopics(topicsToBeDeleted);
}
}

public void syncTopic(Topic topic, String fullTopicName, Set<String> listOfTopics) {
Expand Down

0 comments on commit fdb6dae

Please sign in to comment.