Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove group member, subscriber and workers when unsubscribing #96

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion lib/kaffe/consumer_group/group_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ defmodule Kaffe.GroupManager do
GenServer.call(name(), {:subscribe_to_topics, topics})
end

@doc """
Dynamically unsubscribe topics.
"""
def unsubscribe_from_topics(topics) do
GenServer.call(name(), {:unsubscribe_from_topics, topics})
end

@doc """
List of currently subscribed topics.
"""
Expand All @@ -57,6 +64,7 @@ defmodule Kaffe.GroupManager do
Logger.info("event#startup=#{__MODULE__} name=#{name()}")

config = Kaffe.Config.Consumer.configuration()

case kafka().start_client(config.endpoints, config.subscriber_name, config.consumer_config) do
:ok ->
:ok
Expand Down Expand Up @@ -85,7 +93,9 @@ defmodule Kaffe.GroupManager do
def handle_cast({:start_group_members}, state) do
Logger.debug("Starting worker supervisors for group manager: #{inspect(self())}")

{:ok, worker_supervisor_pid} = group_member_supervisor().start_worker_supervisor(state.supervisor_pid, state.subscriber_name)
{:ok, worker_supervisor_pid} =
group_member_supervisor().start_worker_supervisor(state.supervisor_pid, state.subscriber_name)

{:ok, worker_manager_pid} = worker_supervisor().start_worker_manager(worker_supervisor_pid, state.subscriber_name)

state = %State{state | worker_manager_pid: worker_manager_pid}
Expand All @@ -106,6 +116,20 @@ defmodule Kaffe.GroupManager do
{:reply, {:ok, new_topics}, %State{state | topics: state.topics ++ new_topics}}
end

@doc """
Unsubscribe from the given set of topics.
"""
def handle_call({:unsubscribe_from_topics, requested_topics}, _from, %State{topics: topics} = state) do
old_topics =
requested_topics
|> Enum.into(MapSet.new())
|> MapSet.intersection(Enum.into(topics, MapSet.new()))
|> MapSet.to_list()

unsubscribe_from_topics(state, old_topics)
{:reply, {:ok, old_topics}, %State{state | topics: topics -- old_topics}}
end

@doc """
List the currently subscribed topics
"""
Expand Down Expand Up @@ -133,6 +157,21 @@ defmodule Kaffe.GroupManager do
)
end

defp unsubscribe_from_topics(state, topics) do
for topic <- topics do
Logger.debug("Stopping group member for topic: #{topic}")
:ok = unsubscribe_from_topic(state, topic)
end
end

defp unsubscribe_from_topic(state, topic) do
group_member_supervisor().stop_group_member(
state.supervisor_pid,
state.subscriber_name,
topic
)
end

defp kafka do
Application.get_env(:kaffe, :kafka_mod, :brod)
end
Expand Down
12 changes: 12 additions & 0 deletions lib/kaffe/consumer_group/group_member_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ defmodule Kaffe.GroupMemberSupervisor do
)
end

def stop_group_member(
supervisor_pid,
subscriber_name,
topic
) do
group_member_id = :"group_member_#{subscriber_name}_#{topic}"
:ok = Kaffe.GroupMember.stop_subscribers(subscriber_name, topic)
:ok = Supervisor.terminate_child(supervisor_pid, group_member_id)
:ok = Supervisor.delete_child(supervisor_pid, group_member_id)
:ok
end

def init(:ok) do
Logger.info("event#starting=#{__MODULE__}")

Expand Down
25 changes: 20 additions & 5 deletions lib/kaffe/consumer_group/subscriber/group_member.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ defmodule Kaffe.GroupMember do
GenServer.cast(pid, {:assignments_revoked})
end

def stop_subscribers(subscriber_name, topic) do
case Process.whereis(name(subscriber_name, topic)) do
nil ->
{:error, :not_found}

pid when is_pid(pid) ->
GenServer.call(pid, :stop_subscribers)
end
end

## ==========================================================================
## Callbacks
## ==========================================================================
Expand All @@ -89,12 +99,10 @@ defmodule Kaffe.GroupMember do
self()
)

Logger.info(
"event#init=#{__MODULE__}
Logger.info("event#init=#{__MODULE__}
group_coordinator=#{inspect(pid)}
subscriber_name=#{subscriber_name}
consumer_group=#{consumer_group}"
)
consumer_group=#{consumer_group}")

{:ok,
%State{
Expand Down Expand Up @@ -124,7 +132,8 @@ defmodule Kaffe.GroupMember do
end

# If we're not at the latest generation, discard the assignment for whatever is next.
def handle_info({:allocate_subscribers, gen_id, _assignments}, %{current_gen_id: current_gen_id} = state) when gen_id < current_gen_id do
def handle_info({:allocate_subscribers, gen_id, _assignments}, %{current_gen_id: current_gen_id} = state)
when gen_id < current_gen_id do
Logger.debug("Discarding old generation #{gen_id} for current generation: #{current_gen_id}")
{:noreply, state}
end
Expand Down Expand Up @@ -163,6 +172,11 @@ defmodule Kaffe.GroupMember do
{:noreply, %{state | :subscribers => subscribers}}
end

def handle_call(:stop_subscribers, _from, state) do
stop_subscribers(state.subscribers)
{:reply, :ok, %{state | subscribers: []}}
end

## ==========================================================================
## Helpers
## ==========================================================================
Expand All @@ -175,6 +189,7 @@ defmodule Kaffe.GroupMember do
defp compute_offset(:undefined, configured_offset) do
[begin_offset: configured_offset]
end

defp compute_offset(offset, _configured_offset) do
[begin_offset: offset]
end
Expand Down
14 changes: 13 additions & 1 deletion lib/kaffe/consumer_group/subscriber/subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ defmodule Kaffe.Subscriber do
GenServer.stop(subscriber_pid)
end

def status(subscriber_pid) do
GenServer.call(subscriber_pid, :status)
end

def commit_offsets(subscriber_pid, topic, partition, generation_id, offset) do
GenServer.cast(subscriber_pid, {:commit_offsets, topic, partition, generation_id, offset})
end
Expand Down Expand Up @@ -154,7 +158,11 @@ defmodule Kaffe.Subscriber do
end

def handle_cast({:commit_offsets, topic, partition, generation_id, offset}, state) do
Logger.debug("event#commit_offsets topic=#{state.topic} partition=#{state.partition} offset=#{offset} generation=#{generation_id}")
Logger.debug(
"event#commit_offsets topic=#{state.topic} partition=#{state.partition} offset=#{offset} generation=#{
generation_id
}"
)

# Is this the ack we're looking for?
^topic = state.topic
Expand Down Expand Up @@ -182,6 +190,10 @@ defmodule Kaffe.Subscriber do
{:noreply, state}
end

def handle_call(:status, _, state) do
{:reply, Map.take(state, [:subscriber_name, :topic, :partition]), state}
end

defp handle_subscribe({:ok, subscriber_pid}, state) do
Logger.debug("Subscribe success: #{inspect(subscriber_pid)}")
Process.monitor(subscriber_pid)
Expand Down
26 changes: 26 additions & 0 deletions lib/kaffe/consumer_group/worker/worker_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ defmodule Kaffe.WorkerManager do
{:reply, worker_pid, state}
end

def handle_call({:stop_worker_for, topic, partition}, _from, state) do
Logger.debug("Stopping worker: #{topic} / #{partition}")

reply =
worker_name(topic, partition)
|> stop_worker(state)

{:reply, reply, state}
end

## ==========================================================================
## Helpers
## ==========================================================================
Expand All @@ -81,11 +91,27 @@ defmodule Kaffe.WorkerManager do
|> capture_worker(worker_name, state)
end

defp stop_worker(worker_name, state) do
Logger.debug("Stopping worker: #{worker_name}")

worker_supervisor().stop_worker(
state.supervisor_pid,
state.subscriber_name,
worker_name
)
|> forget_worker(worker_name, state)
end

defp capture_worker({:ok, pid}, worker_name, %{worker_table: worker_table}) do
true = :ets.insert(worker_table, {worker_name, pid})
pid
end

defp forget_worker(:ok, worker_name, %{worker_table: worker_table}) do
true = :ets.delete(worker_table, worker_name)
:ok
end

def worker_name(topic, partition) do
case worker_allocation_strategy() do
:worker_per_partition -> :"worker_#{partition}"
Expand Down
7 changes: 7 additions & 0 deletions lib/kaffe/consumer_group/worker/worker_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ defmodule Kaffe.WorkerSupervisor do
)
end

def stop_worker(pid, subscriber_name, worker_name) do
Logger.debug("Stopping worker: #{worker_name}")
worker_id = :"worker_#{subscriber_name}_#{worker_name}"
:ok = Supervisor.terminate_child(pid, worker_id)
:ok = Supervisor.delete_child(pid, worker_id)
end

def init(subscriber_name) do
Logger.info("event#startup=#{__MODULE__} subscriber_name=#{subscriber_name}")

Expand Down