Skip to content

Commit

Permalink
Consumers: Unsubscribe topics from consumer group (#549)
Browse files Browse the repository at this point in the history
Co-authored-by: Roman Zabaluev <[email protected]>
  • Loading branch information
p-eye and Haarolean authored Oct 9, 2024
1 parent fbef485 commit f36c18d
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName,
.thenReturn(ResponseEntity.ok().build());
}

@Override
public Mono<ResponseEntity<Void>> deleteConsumerGroupOffsets(String clusterName,
String groupId,
String topicName,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.consumerGroupActions(groupId, RESET_OFFSETS)
.topicActions(topicName, TopicAction.VIEW)
.operationName("deleteConsumerGroupOffsets")
.build();

return validateAccess(context)
.then(consumerGroupService.deleteConsumerGroupOffset(getCluster(clusterName), groupId, topicName))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@Override
public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clusterName,
String consumerGroupId,
Expand Down
20 changes: 14 additions & 6 deletions api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,13 @@ private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(ReactiveAdmi
}


private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(ReactiveAdminClient ac,
List<ConsumerGroupListing> groups,
Comparator<GroupWithDescr> comparator,
int pageNum,
int perPage,
SortOrderDTO sortOrderDto) {
private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(
ReactiveAdminClient ac,
List<ConsumerGroupListing> groups,
Comparator<GroupWithDescr> comparator,
int pageNum,
int perPage,
SortOrderDTO sortOrderDto) {
var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();

return ac.describeConsumerGroups(groupNames)
Expand Down Expand Up @@ -247,6 +248,13 @@ public Mono<Void> deleteConsumerGroupById(KafkaCluster cluster,
.flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)));
}

public Mono<Void> deleteConsumerGroupOffset(KafkaCluster cluster,
String groupId,
String topicName) {
return adminClientService.get(cluster)
.flatMap(adminClient -> adminClient.deleteConsumerGroupOffsets(groupId, topicName));
}

public EnhancedConsumer createConsumer(KafkaCluster cluster) {
return createConsumer(cluster, Map.of());
}
Expand Down
22 changes: 22 additions & 0 deletions api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
Expand Down Expand Up @@ -436,6 +437,27 @@ public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
}

public Mono<Void> deleteConsumerGroupOffsets(String groupId, String topicName) {
return listConsumerGroupOffsets(List.of(groupId), null)
.flatMap(table -> {
// filter TopicPartitions by topicName
Set<TopicPartition> partitions = table.row(groupId).keySet().stream()
.filter(tp -> tp.topic().equals(topicName))
.collect(Collectors.toSet());
// check if partitions have no committed offsets
return partitions.isEmpty()
? Mono.error(new NotFoundException("The topic or partition is unknown"))
// call deleteConsumerGroupOffsets
: toMono(client.deleteConsumerGroupOffsets(groupId, partitions).all());
})
.onErrorResume(GroupIdNotFoundException.class,
th -> Mono.error(new NotFoundException("The group id does not exist")))
.onErrorResume(UnknownTopicOrPartitionException.class,
th -> Mono.error(new NotFoundException("The topic or partition is unknown")))
.onErrorResume(GroupSubscribedToTopicException.class,
th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
}

public Mono<Void> createTopic(String name,
int numPartitions,
@Nullable Integer replicationFactor,
Expand Down
67 changes: 67 additions & 0 deletions api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import io.kafbat.ui.model.ConsumerGroupDTO;
import io.kafbat.ui.model.ConsumerGroupsPageResponseDTO;
import io.kafbat.ui.producer.KafkaTestProducer;
import java.io.Closeable;
import java.time.Duration;
import java.util.Comparator;
Expand All @@ -22,6 +23,8 @@
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
Expand All @@ -31,12 +34,76 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
@Test
void shouldNotFoundWhenNoSuchConsumerGroupId() {
String groupId = "groupA";
String topicName = "topicX";

webTestClient
.delete()
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
.exchange()
.expectStatus()
.isNotFound();

webTestClient
.delete()
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId, topicName)
.exchange()
.expectStatus()
.isNotFound();
}

@Test
void shouldNotFoundWhenNoSuchTopic() {
String topicName = createTopicWithRandomName();
String topicNameUnSubscribed = "topicX";

//Create a consumer and subscribe to the topic
String groupId = UUID.randomUUID().toString();
try (val consumer = createTestConsumerWithGroupId(groupId)) {
consumer.subscribe(List.of(topicName));
consumer.poll(Duration.ofMillis(100));

webTestClient
.delete()
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId,
topicNameUnSubscribed)
.exchange()
.expectStatus()
.isNotFound();
}
}

@Test
void shouldOkWhenConsumerGroupIsNotActiveAndPartitionOffsetExists() {
String topicName = createTopicWithRandomName();

//Create a consumer and subscribe to the topic
String groupId = UUID.randomUUID().toString();

try (KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
Flux.fromStream(
Stream.of("one", "two", "three", "four")
.map(value -> Mono.fromFuture(producer.send(topicName, value)))
).blockLast();
} catch (Throwable e) {
log.error("Error on sending", e);
throw new RuntimeException(e);
}

try (val consumer = createTestConsumerWithGroupId(groupId)) {
consumer.subscribe(List.of(topicName));
consumer.poll(Duration.ofMillis(100));

//Stop consumers to delete consumer offset from the topic
consumer.pause(consumer.assignment());
}

//Delete the consumer offset when it's INACTIVE and check
webTestClient
.delete()
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId, topicName)
.exchange()
.expectStatus()
.isOk();
}

@Test
Expand Down
26 changes: 26 additions & 0 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,32 @@ paths:
200:
description: OK

/api/clusters/{clusterName}/consumer-groups/{id}/topics/{topicName}:
delete:
tags:
- Consumer Groups
summary: delete consumer group offsets
operationId: deleteConsumerGroupOffsets
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: id
in: path
required: true
schema:
type: string
- name: topicName
in: path
required: true
schema:
type: string
responses:
200:
description: OK

/api/clusters/{clusterName}/schemas:
post:
tags:
Expand Down
36 changes: 34 additions & 2 deletions frontend/src/components/ConsumerGroups/Details/ListItem.tsx
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import React from 'react';
import { ConsumerGroupTopicPartition } from 'generated-sources';
import {
Action,
ConsumerGroupTopicPartition,
ResourceType,
} from 'generated-sources';
import { Link } from 'react-router-dom';
import { ClusterName } from 'lib/interfaces/cluster';
import { clusterTopicPath } from 'lib/paths';
import { ClusterGroupParam, clusterTopicPath } from 'lib/paths';
import { useDeleteConsumerGroupOffsetsMutation } from 'lib/hooks/api/consumers';
import useAppParams from 'lib/hooks/useAppParams';
import { Dropdown } from 'components/common/Dropdown';
import { ActionDropdownItem } from 'components/common/ActionComponent';
import MessageToggleIcon from 'components/common/Icons/MessageToggleIcon';
import IconButtonWrapper from 'components/common/Icons/IconButtonWrapper';
import { TableKeyLink } from 'components/common/table/Table/TableKeyLink.styled';
Expand All @@ -18,6 +26,9 @@ interface Props {

const ListItem: React.FC<Props> = ({ clusterName, name, consumers }) => {
const [isOpen, setIsOpen] = React.useState(false);
const consumerProps = useAppParams<ClusterGroupParam>();
const deleteOffsetMutation =
useDeleteConsumerGroupOffsetsMutation(consumerProps);

const getTotalconsumerLag = () => {
let count = 0;
Expand All @@ -27,6 +38,11 @@ const ListItem: React.FC<Props> = ({ clusterName, name, consumers }) => {
return count;
};

const deleteOffsetHandler = (topicName?: string) => {
if (topicName === undefined) return;
deleteOffsetMutation.mutateAsync(topicName);
};

return (
<>
<tr>
Expand All @@ -41,6 +57,22 @@ const ListItem: React.FC<Props> = ({ clusterName, name, consumers }) => {
</FlexWrapper>
</td>
<td>{getTotalconsumerLag()}</td>
<td>
<Dropdown>
<ActionDropdownItem
onClick={() => deleteOffsetHandler(name)}
danger
confirm="Are you sure you want to delete offsets from the topic?"
permission={{
resource: ResourceType.CONSUMER,
action: Action.RESET_OFFSETS,
value: consumerProps.consumerGroupID,
}}
>
<span>Delete offsets</span>
</ActionDropdownItem>
</Dropdown>
</td>
</tr>
{isOpen && <TopicContents consumers={consumers} />}
</>
Expand Down
27 changes: 27 additions & 0 deletions frontend/src/lib/hooks/api/consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,30 @@ export const useResetConsumerGroupOffsetsMutation = ({
}
);
};

export const useDeleteConsumerGroupOffsetsMutation = ({
clusterName,
consumerGroupID,
}: UseConsumerGroupDetailsProps) => {
const queryClient = useQueryClient();
return useMutation(
(topicName: string) =>
api.deleteConsumerGroupOffsets({
clusterName,
id: consumerGroupID,
topicName,
}),
{
onSuccess: (_, topicName) => {
showSuccessAlert({
message: `Consumer ${consumerGroupID} group offsets in topic ${topicName} deleted`,
});
queryClient.invalidateQueries([
'clusters',
clusterName,
'consumerGroups',
]);
},
}
);
};

0 comments on commit f36c18d

Please sign in to comment.