Skip to content

Commit

Permalink
partitions listeners without pid
Browse files Browse the repository at this point in the history
  • Loading branch information
gelivisg committed Feb 11, 2021
1 parent fccc9ae commit 189ed1b
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions lib/kaffe/consumer_group/subscriber/group_member.ex
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ defmodule Kaffe.GroupMember do
# configuration.
def handle_cast({:assignments_received, gen_id, assignments}, state) do
Logger.info("event#assignments_received=#{name(state.subscriber_name, state.topic)} generation_id=#{gen_id}")

partitions_listener().assigned(assignments)
Process.send_after(self(), {:allocate_subscribers, gen_id, assignments}, rebalance_delay())
{:noreply, %{state | current_gen_id: gen_id}}
end

def handle_cast({:assignments_revoked}, state) do
Logger.info("event#assignments_revoked=#{name(state.subscriber_name, state.topic)}")

partitions_listener().revoked()
stop_subscribers(state.subscribers)
{:noreply, %{state | :subscribers => []}}
end
Expand Down Expand Up @@ -207,6 +207,10 @@ defmodule Kaffe.GroupMember do
Application.get_env(:kaffe, :subscriber_mod, Kaffe.Subscriber)
end

defp partitions_listener do
Application.get_env(:kaffe, :partitions_listener)
end

defp name(subscriber_name, topic) do
:"#{__MODULE__}.#{subscriber_name}.#{topic}"
end
Expand Down

0 comments on commit 189ed1b

Please sign in to comment.