Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add a consumer group role configuration #529

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ private List<AclBindingsResult> buildProjectAclBindings(Topology topology) {
connector
.getConnectors()
.ifPresent(
(list) -> aclBindingsResults.add(
new ConnectorAuthorizationAclBindingsBuilder(bindingsBuilder, connector)
.getAclBindings()));
(list) ->
aclBindingsResults.add(
new ConnectorAuthorizationAclBindingsBuilder(bindingsBuilder, connector)
.getAclBindings()));
}

for (Schemas schemaAuthorization : project.getSchemas()) {
Expand Down Expand Up @@ -307,7 +308,6 @@ private List<Action> buildUpdateBindingsActions(
return updateActions;
}


// Sync platform relevant Access Control List.
private List<AclBindingsResult> buildPlatformLevelActions(final Topology topology) {
List<AclBindingsResult> aclBindingsResults = new ArrayList<>();
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/purbon/kafka/topology/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;

public class Configuration {

Expand Down Expand Up @@ -153,15 +152,16 @@ private void validateGeneralConfiguration(Topology topology) throws Configuratio
}

Arrays.asList(TOPIC_MANAGED_PREFIXES, GROUP_MANAGED_PREFIXES, SERVICE_ACCOUNT_MANAGED_PREFIXES)
.forEach(this::validateManagedPrefixes);
.forEach(this::validateManagedPrefixes);
}

private void validateManagedPrefixes(String key) {
List<String> managedTopicPrefixes = config.getStringList(key);
if (managedTopicPrefixes.contains("")) {
throw new ConfigurationException(
String.format("The config key %s, contains empty strings, this is not possible, please review", key)
);
String.format(
"The config key %s, contains empty strings, this is not possible, please review",
key));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.purbon.kafka.topology.model.users;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.purbon.kafka.topology.model.User;
import com.purbon.kafka.topology.roles.rbac.RBACPredefinedRoles;
import java.util.Objects;
import java.util.Optional;

public class Consumer extends User {

private Optional<String> group;

@JsonProperty(value = "group-role")
private String groupRole = RBACPredefinedRoles.RESOURCE_OWNER;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This model is used for both RBAC and ACLs permissions and might be even used later on for other integrations (I know some people who tried to do it with Ranger, for example).

I think we should not be using here:

  • The variable name as group-role, as with ACLs there are no roles per se.
  • The default value as RESOURCE_OWNER as it is Confluent RBAC dependant.

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, I will try to see how to deal with this point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we add a level called "rbac" in the configuration of a consumer, would it be okay for you ?

consumers:
     - principal: "Group:CONSUMER_READ"
       rbac:
           role: DeveloperRead
     - principal: "Group:CONSUMER_VIEW"
       rbac:
            role: DeveloperRead

We should do the same in the schema configuration I guess, even if currently, ACLs are only available with RBAC for schemas.
WDYT ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@purbon kind reminder on the proposal :)


public Consumer() {
super();
group = Optional.empty();
Expand All @@ -34,6 +39,14 @@ public void setGroup(Optional<String> group) {
this.group = group;
}

public String getGroupRole() {
return groupRole;
}

public void setGroupRole(String groupRole) {
this.groupRole = groupRole;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -44,11 +57,12 @@ public boolean equals(Object o) {
}
Consumer consumer = (Consumer) o;
return getPrincipal().equals(consumer.getPrincipal())
&& groupRole.equals(consumer.getGroupRole())
&& groupString().equals(consumer.groupString());
}

@Override
public int hashCode() {
return Objects.hash(groupString(), getPrincipal());
return Objects.hash(groupString(), getPrincipal(), groupRole);
}
}
136 changes: 67 additions & 69 deletions src/main/java/com/purbon/kafka/topology/roles/ResourceFilter.java
Original file line number Diff line number Diff line change
@@ -1,82 +1,80 @@
package com.purbon.kafka.topology.roles;

import com.purbon.kafka.topology.AccessControlManager;
import com.purbon.kafka.topology.Configuration;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

public class ResourceFilter {

private static final Logger LOGGER = LogManager.getLogger(ResourceFilter.class);

private final List<String> managedServiceAccountPrefixes;
private final List<String> managedTopicPrefixes;
private final List<String> managedGroupPrefixes;

public ResourceFilter(Configuration config) {
this.managedServiceAccountPrefixes = config.getServiceAccountManagedPrefixes();
this.managedTopicPrefixes = config.getTopicManagedPrefixes();
this.managedGroupPrefixes = config.getGroupManagedPrefixes();
private static final Logger LOGGER = LogManager.getLogger(ResourceFilter.class);

private final List<String> managedServiceAccountPrefixes;
private final List<String> managedTopicPrefixes;
private final List<String> managedGroupPrefixes;

public ResourceFilter(Configuration config) {
this.managedServiceAccountPrefixes = config.getServiceAccountManagedPrefixes();
this.managedTopicPrefixes = config.getTopicManagedPrefixes();
this.managedGroupPrefixes = config.getGroupManagedPrefixes();
}

public boolean matchesManagedPrefixList(TopologyAclBinding topologyAclBinding) {
String resourceName = topologyAclBinding.getResourceName();
String principle = topologyAclBinding.getPrincipal();
// For global wild cards ACL's we manage only if we manage the service account/principle,
// regardless. Filtering by service account will always take precedence if defined
if (haveServiceAccountPrefixFilters() || resourceName.equals("*")) {
if (resourceName.equals("*")) {
return matchesServiceAccountPrefixList(principle);
} else {
return matchesServiceAccountPrefixList(principle)
&& matchesTopicOrGroupPrefix(topologyAclBinding, resourceName);
}
} else if (haveTopicNamePrefixFilter() || haveGroupNamePrefixFilter()) {
return matchesTopicOrGroupPrefix(topologyAclBinding, resourceName);
}

public boolean matchesManagedPrefixList(TopologyAclBinding topologyAclBinding) {
String resourceName = topologyAclBinding.getResourceName();
String principle = topologyAclBinding.getPrincipal();
// For global wild cards ACL's we manage only if we manage the service account/principle,
// regardless. Filtering by service account will always take precedence if defined
if (haveServiceAccountPrefixFilters() || resourceName.equals("*")) {
if (resourceName.equals("*")) {
return matchesServiceAccountPrefixList(principle);
} else {
return matchesServiceAccountPrefixList(principle)
&& matchesTopicOrGroupPrefix(topologyAclBinding, resourceName);
}
} else if (haveTopicNamePrefixFilter() || haveGroupNamePrefixFilter()) {
return matchesTopicOrGroupPrefix(topologyAclBinding, resourceName);
}

return true; // should include everything if not properly excluded earlier.
}

private boolean matchesTopicOrGroupPrefix(
TopologyAclBinding topologyAclBinding, String resourceName) {
if ("TOPIC".equalsIgnoreCase(topologyAclBinding.getResourceType())) {
return matchesTopicPrefixList(resourceName);
} else if ("GROUP".equalsIgnoreCase(topologyAclBinding.getResourceType())) {
return matchesGroupPrefixList(resourceName);
}
return false;
}

private boolean matchesTopicPrefixList(String topic) {
return matchesPrefix(managedTopicPrefixes, topic, "Topic");
}

private boolean matchesGroupPrefixList(String group) {
return matchesPrefix(managedGroupPrefixes, group, "Group");
}

private boolean matchesServiceAccountPrefixList(String principal) {
return matchesPrefix(managedServiceAccountPrefixes, principal, "Principal");
}

private boolean haveServiceAccountPrefixFilters() {
return managedServiceAccountPrefixes.size() != 0;
}

private boolean haveTopicNamePrefixFilter() {
return managedTopicPrefixes.size() != 0;
}

private boolean haveGroupNamePrefixFilter() {
return managedGroupPrefixes.size() != 0;
}
return true; // should include everything if not properly excluded earlier.
}

private boolean matchesPrefix(List<String> prefixes, String item, String type) {
boolean matches = prefixes.size() == 0 || prefixes.stream().anyMatch(item::startsWith);
LOGGER.debug(String.format("%s %s matches %s with $s", type, item, matches, prefixes));
return matches;
private boolean matchesTopicOrGroupPrefix(
TopologyAclBinding topologyAclBinding, String resourceName) {
if ("TOPIC".equalsIgnoreCase(topologyAclBinding.getResourceType())) {
return matchesTopicPrefixList(resourceName);
} else if ("GROUP".equalsIgnoreCase(topologyAclBinding.getResourceType())) {
return matchesGroupPrefixList(resourceName);
}
return false;
}

private boolean matchesTopicPrefixList(String topic) {
return matchesPrefix(managedTopicPrefixes, topic, "Topic");
}

private boolean matchesGroupPrefixList(String group) {
return matchesPrefix(managedGroupPrefixes, group, "Group");
}

private boolean matchesServiceAccountPrefixList(String principal) {
return matchesPrefix(managedServiceAccountPrefixes, principal, "Principal");
}

private boolean haveServiceAccountPrefixFilters() {
return managedServiceAccountPrefixes.size() != 0;
}

private boolean haveTopicNamePrefixFilter() {
return managedTopicPrefixes.size() != 0;
}

private boolean haveGroupNamePrefixFilter() {
return managedGroupPrefixes.size() != 0;
}

private boolean matchesPrefix(List<String> prefixes, String item, String type) {
boolean matches = prefixes.size() == 0 || prefixes.stream().anyMatch(item::startsWith);
LOGGER.debug(String.format("%s %s matches %s with $s", type, item, matches, prefixes));
return matches;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public List<TopologyAclBinding> buildBindingsForConsumers(
binding =
apiClient.bind(
consumer.getPrincipal(),
RESOURCE_OWNER,
consumer.getGroupRole(),
evaluateResourcePattern(consumer.groupString()),
"Group",
evaluateResourcePatternType(consumer.groupString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ public void shouldFetchAConfigSubsetSuccessfully() {
}

@Test
public void nonEmptyTopicManagedPrefixConfigsShouldValidateSuccessfully() throws ConfigurationException {
public void nonEmptyTopicManagedPrefixConfigsShouldValidateSuccessfully()
throws ConfigurationException {
var topology = TestTopologyBuilder.createProject().buildTopology();
props.put(TOPIC_MANAGED_PREFIXES + ".0", "foo");
Configuration config = new Configuration(cliOps, props);
Expand Down
Loading