Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove group member, subscriber and workers when unsubscribing #96

Closed
wants to merge 2 commits into from

Conversation

pedro-gutierrez
Copy link

This PR iterates on the work done a while ago by @dams in #59.
It is still a bit rough, and it probably deserves some more testing, however at this stage, I was wondering whether the approach is correct or not?
Thanks!

@objectuser
Copy link
Contributor

@pedro-gutierrez Thanks very much! We'll check this out. I need to remember what I was thinking with #59. 😄

@objectuser
Copy link
Contributor

@pedro-gutierrez One thing I was thinking about after I read my comments in #59 was the results from :observer when the unsubscribe happened. Did you verify that the processes were removed as expected?

@pedro-gutierrez
Copy link
Author

Hey @objectuser thanks for having a look at that! Actually, I did check with the observer, and I had the impression processes where properly removed, but please let me come back with some screenshots, so we can both confirm it.

@pedro-gutierrez
Copy link
Author

pedro-gutierrez commented Feb 20, 2020

Hey, so I did a simple test, as follows. I start with 4 topics, named "topic.a", "topic.b", "topic.c" and "topic.d". Here is the state of my supervision tree:

image

Then I call:

iex(32)> Kaffe.GroupManager.unsubscribe_from_topics(["topic.a"])
[debug] Stopping group member for topic: topic.a
[debug] Stopping worker: topic.a / 0
[debug] Stopping worker: worker_0
[debug] Stopping worker: worker_0
[info] event#stopping=#PID<0.10057.0>
[info] Group member (rmb,coor=#PID<0.10058.0>,cb=#PID<0.10057.0>,generation=13):
Leaving group, reason: :shutdown

{:ok, ["topic.a"]}

As a result, the new state of my supervision tree becomes:

image

That seems to remove the subscriber process ok.

However if I try to unsubscribe from multiple topics in a single call, it crashes.

iex(34)> Kaffe.GroupManager.unsubscribe_from_topics(["topic.d", "topic.c"])
[debug] Stopping group member for topic: topic.c
[debug] Stopping worker: topic.c / 0
[debug] Stopping worker: worker_0
[debug] Stopping worker: worker_0
[info] event#stopping=#PID<0.10062.0>
[info] Group member (rmb,coor=#PID<0.10063.0>,cb=#PID<0.10062.0>,generation=14):
Leaving group, reason: :shutdown

[debug] Stopping group member for topic: topic.d
[debug] Stopping worker: topic.d / 0
[debug] Stopping worker: worker_0
[debug] Stopping worker: worker_0
[error] GenServer :"Elixir.Kaffe.WorkerManager.rmb" terminating
** (MatchError) no match of right hand side value: {:error, :not_found}
    (kaffe 1.15.0) lib/kaffe/consumer_group/worker/worker_supervisor.ex:31: Kaffe.WorkerSupervisor.stop_worker/3
    (kaffe 1.15.0) lib/kaffe/consumer_group/worker/worker_manager.ex:101: Kaffe.WorkerManager.stop_worker/2
    (kaffe 1.15.0) lib/kaffe/consumer_group/worker/worker_manager.ex:68: Kaffe.WorkerManager.handle_call/3
    (stdlib 3.11.2) gen_server.erl:661: :gen_server.try_handle_call/4
    (stdlib 3.11.2) gen_server.erl:690: :gen_server.handle_msg/6
    (stdlib 3.11.2) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message (from :"Elixir.Kaffe.GroupMember.rmb.topic.d"): {:stop_worker_for, "topic.d", 0}
State: %{subscriber_name: :rmb, supervisor_pid: #PID<0.7167.0>, worker_table: #Reference<0.318440085.2632318978.146415>}
Client :"Elixir.Kaffe.GroupMember.rmb.topic.d" is alive

    (stdlib 3.11.2) gen.erl:167: :gen.do_call/4
    (elixir 1.10.1) lib/gen_server.ex:1020: GenServer.call/3
    (kaffe 1.15.0) lib/kaffe/consumer_group/subscriber/group_member.ex:181: anonymous fn/4 in Kaffe.GroupMember.handle_call/3
    (elixir 1.10.1) lib/enum.ex:783: Enum."-each/2-lists^foreach/1-0-"/2
    (elixir 1.10.1) lib/enum.ex:783: Enum.each/2
    (kaffe 1.15.0) lib/kaffe/consumer_group/subscriber/group_member.ex:179: Kaffe.GroupMember.handle_call/3
    (stdlib 3.11.2) gen_server.erl:661: :gen_server.try_handle_call/4
    (stdlib 3.11.2) gen_server.erl:690: :gen_server.handle_msg/6
    (stdlib 3.11.2) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
** (exit) exited in: GenServer.call(:"Elixir.Kaffe.GroupManager.rmb", {:unsubscribe_from_topics, ["topic.d", "topic.c"]}, 5000)
    ** (EXIT) exited in: GenServer.call(#PID<0.10064.0>, :stop_subscribers_and_workers, 5000)
        ** (EXIT) shutdown
    (elixir 1.10.1) lib/gen_server.ex:1023: GenServer.call/3
[info] Group member (rmb,coor=#PID<0.10065.0>,cb=#PID<0.10064.0>,generation=14):
Leaving group, reason: :shutdown

I think the way I am stopping workers is not correct, or I am stopping too many workers, or both :)

@pedro-gutierrez
Copy link
Author

pedro-gutierrez commented Feb 20, 2020

I removed the code that attempts to stop the consumer group worker, and now it seems to behave better.

Here is my supervision tree after I unsubscribe from topic.a:

image

Then if I unsubscribe from the rest of the topics:

iex(26)> Kaffe.GroupManager.unsubscribe_from_topics(["topic.b", "topic.c", "topic.d"])
[debug] Stopping group member for topic: topic.b
[info] event#stopping=#PID<0.2675.0>
[info] Group member (rmb,coor=#PID<0.2676.0>,cb=#PID<0.2675.0>,generation=16):
Leaving group, reason: :shutdown

[debug] Stopping group member for topic: topic.c
[info] event#stopping=#PID<0.2677.0>
[info] Group member (rmb,coor=#PID<0.2678.0>,cb=#PID<0.2677.0>,generation=16):
Leaving group, reason: :shutdown

[debug] Stopping group member for topic: topic.d
[info] event#stopping=#PID<0.2679.0>
[info] Group member (rmb,coor=#PID<0.2680.0>,cb=#PID<0.2679.0>,generation=16):
Leaving group, reason: :shutdown

{:ok, ["topic.b", "topic.c", "topic.d"]}

I end up with this tree:

image

I guess I would still need to delete worker_0, but only when there are
no more subscriptions in the consumer group, correct?

@objectuser
Copy link
Contributor

@pedro-gutierrez I wonder if this is really trying to do more work than necessary.

I called unsubscribe_from_topics/1 and it triggered a rebalance, which, in turn, caused all the subscribers and workers to be killed and new processes to take their place.

Would it be enough to just kill the subscriber and update the state?

However, if the intent is that the other subscribers should not be impacted, I don't think this has the desired effect, since they're all killed anyway. For example, I didn't see this statement ever logged:

Logger.debug("Stopping worker: #{topic} / #{partition}")

Are you seeing something different?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants