From ef8750f097f9e13ccc0f875e3416632292f96bfa Mon Sep 17 00:00:00 2001 From: Frank McGeough Date: Fri, 22 Nov 2024 22:29:38 -0500 Subject: [PATCH 1/3] Fixes #607. Terminate Group Coordinator if MemberPid is no longer alive when assignments_revoked, assign_partitions or assignments_received must be called. --- src/brod_group_coordinator.erl | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/brod_group_coordinator.erl b/src/brod_group_coordinator.erl index 50884d1a..aa2fdbac 100644 --- a/src/brod_group_coordinator.erl +++ b/src/brod_group_coordinator.erl @@ -462,6 +462,13 @@ terminate(Reason, #state{ connection = Connection %%%_* Internal Functions ======================================================= +-spec ensure_member_pid_alive(pid()) -> ok + ensure_member_pid_alive(MemberPid) -> + case brod_utils:is_pid_alive(MemberPid) of + true -> ok + false -> exit(member_pid_shutdown) + end. + -spec discover_coordinator(state()) -> {ok, state()}. discover_coordinator(#state{ client = Client , connection = Connection0 @@ -499,6 +506,7 @@ stabilize(#state{ rejoin_delay_seconds = RejoinDelaySeconds log(State0, info, "re-joining group, reason:~p", [Reason]), %% 1. unsubscribe all currently assigned partitions + ok = ensure_member_pid_alive(MemberPid), ok = MemberModule:assignments_revoked(MemberPid), %% 2. some brod_group_member implementations may wait for messages @@ -674,6 +682,7 @@ sync_group(#state{ groupId = GroupId %% get my partition assignments Assignment = kpro:find(assignment, RspBody), TopicAssignments = get_topic_assignments(State, Assignment), + ok = ensure_member_pid_alive(MemberPid), ok = MemberModule:assignments_received(MemberPid, MemberId, GenerationId, TopicAssignments), NewState = State#state{is_in_group = true}, @@ -834,6 +843,7 @@ assign_partitions(State) when ?IS_LEADER(State) -> Assignments = case Strategy =:= callback_implemented of true -> + ok = ensure_member_pid_alive(MemberPid), MemberModule:assign_partitions(MemberPid, Members, AllPartitions); false -> do_assign_partitions(Strategy, Members, AllPartitions) From 90e9acaa765c4cfec849248d35b2413633972c4c Mon Sep 17 00:00:00 2001 From: Frank McGeough Date: Mon, 25 Nov 2024 13:13:50 -0500 Subject: [PATCH 2/3] Fix link, dialyzer issues --- src/brod_group_coordinator.erl | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/brod_group_coordinator.erl b/src/brod_group_coordinator.erl index aa2fdbac..e419f6c9 100644 --- a/src/brod_group_coordinator.erl +++ b/src/brod_group_coordinator.erl @@ -462,12 +462,14 @@ terminate(Reason, #state{ connection = Connection %%%_* Internal Functions ======================================================= --spec ensure_member_pid_alive(pid()) -> ok - ensure_member_pid_alive(MemberPid) -> - case brod_utils:is_pid_alive(MemberPid) of - true -> ok - false -> exit(member_pid_shutdown) - end. +% Exit the process if MemberPid is set to pid but it's not +% currently alive +-spec ensure_member_pid_alive(pid()) -> ok. +ensure_member_pid_alive(MemberPid) -> + case brod_utils:is_pid_alive(MemberPid) of + true -> ok; + false -> exit(member_pid_shutdown) +end. -spec discover_coordinator(state()) -> {ok, state()}. discover_coordinator(#state{ client = Client @@ -506,7 +508,7 @@ stabilize(#state{ rejoin_delay_seconds = RejoinDelaySeconds log(State0, info, "re-joining group, reason:~p", [Reason]), %% 1. unsubscribe all currently assigned partitions - ok = ensure_member_pid_alive(MemberPid), + ok = ensure_member_pid_alive(MemberPid), ok = MemberModule:assignments_revoked(MemberPid), %% 2. some brod_group_member implementations may wait for messages From 18c0dab4118c182a354414ea0bc1d5f3654a5893 Mon Sep 17 00:00:00 2001 From: Frank McGeough Date: Sun, 8 Dec 2024 22:29:49 -0500 Subject: [PATCH 3/3] catch noproc instead of doing alive check the alive check does not work because a race condition still exists. --- src/brod_group_coordinator.erl | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/src/brod_group_coordinator.erl b/src/brod_group_coordinator.erl index e419f6c9..cd7d7e86 100644 --- a/src/brod_group_coordinator.erl +++ b/src/brod_group_coordinator.erl @@ -75,6 +75,13 @@ {lo_cmd_stabilize, AttemptCount, Reason}). -define(INITIAL_MEMBER_ID, <<>>). +-define(CALL_MEMBER(MemberPid, EXPR), + try + EXPR + catch + exit:{noproc, {gen_server, call, [MemberPid | _]}} -> + exit({shutdown, member_down}) + end). -type config() :: brod:group_config(). -type ts() :: erlang:timestamp(). @@ -462,15 +469,6 @@ terminate(Reason, #state{ connection = Connection %%%_* Internal Functions ======================================================= -% Exit the process if MemberPid is set to pid but it's not -% currently alive --spec ensure_member_pid_alive(pid()) -> ok. -ensure_member_pid_alive(MemberPid) -> - case brod_utils:is_pid_alive(MemberPid) of - true -> ok; - false -> exit(member_pid_shutdown) -end. - -spec discover_coordinator(state()) -> {ok, state()}. discover_coordinator(#state{ client = Client , connection = Connection0 @@ -508,8 +506,7 @@ stabilize(#state{ rejoin_delay_seconds = RejoinDelaySeconds log(State0, info, "re-joining group, reason:~p", [Reason]), %% 1. unsubscribe all currently assigned partitions - ok = ensure_member_pid_alive(MemberPid), - ok = MemberModule:assignments_revoked(MemberPid), + ?CALL_MEMBER(MemberPid, MemberModule:assignments_revoked(MemberPid)), %% 2. some brod_group_member implementations may wait for messages %% to finish processing when assignments_revoked is called. @@ -684,9 +681,8 @@ sync_group(#state{ groupId = GroupId %% get my partition assignments Assignment = kpro:find(assignment, RspBody), TopicAssignments = get_topic_assignments(State, Assignment), - ok = ensure_member_pid_alive(MemberPid), - ok = MemberModule:assignments_received(MemberPid, MemberId, - GenerationId, TopicAssignments), + ?CALL_MEMBER(MemberPid, + MemberModule:assignments_received(MemberPid, MemberId, GenerationId, TopicAssignments)), NewState = State#state{is_in_group = true}, log(NewState, info, "assignments received:~s", [format_assignments(TopicAssignments)]), @@ -845,8 +841,7 @@ assign_partitions(State) when ?IS_LEADER(State) -> Assignments = case Strategy =:= callback_implemented of true -> - ok = ensure_member_pid_alive(MemberPid), - MemberModule:assign_partitions(MemberPid, Members, AllPartitions); + ?CALL_MEMBER(MemberPid, MemberModule:assign_partitions(MemberPid, Members, AllPartitions)); false -> do_assign_partitions(Strategy, Members, AllPartitions) end,