From a84d44bf8832c54ebab2223384b75b08e941a7e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Wed, 13 Mar 2024 18:57:30 +0100 Subject: [PATCH 1/6] Merge ered and ered_cluster processes The command handling code is moved from ered to ered_cluster. The ered module becomes just an API module without a process of its own. This eliminates some duplicated structures and copying between processes. Change ered_client_sup into to simple_one_for_one supervisor. No API changes. --- src/ered.erl | 320 ++--------------------------------- src/ered_client_sup.erl | 21 ++- src/ered_cluster.erl | 361 +++++++++++++++++++++++++++++++++------- test/ered_SUITE.erl | 24 +-- 4 files changed, 345 insertions(+), 381 deletions(-) diff --git a/src/ered.erl b/src/ered.erl index bba2e32..bccce50 100644 --- a/src/ered.erl +++ b/src/ered.erl @@ -1,11 +1,6 @@ -module(ered). %% External API for using connecting and sending commands to Redis cluster. -%% -%% This module is responsible for doing the command routing to the correct -%% redis node and handle command redirection replies. - --behaviour(gen_server). %% API -export([start_link/2, @@ -19,10 +14,6 @@ get_addr_to_client_map/1, update_slots/1, update_slots/2]). -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, format_status/2]). - -export_type([opt/0, command/0, reply/0, @@ -34,30 +25,7 @@ %%% Definitions %%%=================================================================== --record(st, {cluster_pid :: pid(), - slots :: binary(), % The byte at offset N is an - % index into the clients tuple - % for slot N. - clients = {} :: tuple(), % Tuple of pid(), or addr() as - % placeholder when the process - % is gone. - slot_map_version = 0 :: non_neg_integer(), - addr_map = #{} :: #{addr() => pid() | Placeholder :: addr()}, - pending = #{} :: #{gen_server:from() => pid()}, - try_again_delay :: non_neg_integer(), - redirect_attempts :: non_neg_integer() - }). - --type opt() :: - %% If there is a TRYAGAIN response from Redis then wait - %% this many milliseconds before re-sending the command - {try_again_delay, non_neg_integer()} | - %% Only do these many retries or re-sends before giving - %% up and returning the result. This affects ASK, MOVED - %% and TRYAGAIN responses - {redirect_attempts, non_neg_integer()} | - ered_cluster:opt(). - +-type opt() :: ered_cluster:opt(). -type addr() :: ered_cluster:addr(). -type server_ref() :: pid(). -type command() :: ered_command:command(). @@ -73,23 +41,23 @@ %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec start_link([addr()], [opt()]) -> {ok, server_ref()} | {error, term()}. %% -%% Start the main process. This will also start the cluster handling +%% Start the cluster handling %% process which will set up clients to the provided addresses and %% fetch the cluster slot map. Once there is a complete slot map and %% all Redis node clients are connected this process is ready to -%% server requests. +%% serve requests. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - start_link(Addrs, Opts) -> - gen_server:start_link(?MODULE, [Addrs, Opts], []). + ered_cluster:start_link(Addrs, Opts). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec stop(server_ref()) -> ok. %% -%% Stop the main process. This will also stop the cluster handling +%% Stop the cluster handling %% process and in turn disconnect and stop all clients. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - stop(ServerRef) -> - gen_server:stop(ServerRef). + ered_cluster:stop(ServerRef). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command(server_ref(), command(), key()) -> reply(). @@ -106,11 +74,10 @@ stop(ServerRef) -> %% Command/3 is the same as setting the timeout to infinity. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command(ServerRef, Command, Key) -> - command(ServerRef, Command, Key, infinity). + ered_cluster:command(ServerRef, Command, Key, infinity). command(ServerRef, Command, Key, Timeout) -> - C = ered_command:convert_to(Command), - gen_server:call(ServerRef, {command, C, Key}, Timeout). + ered_cluster:command(ServerRef, Command, Key, Timeout). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_async(server_ref(), command(), key(), fun((reply()) -> any())) -> ok. @@ -120,8 +87,7 @@ command(ServerRef, Command, Key, Timeout) -> %% runs in an unspecified process. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_async(ServerRef, Command, Key, ReplyFun) when is_function(ReplyFun, 1) -> - C = ered_command:convert_to(Command), - gen_server:cast(ServerRef, {command_async, C, Key, ReplyFun}). + ered_cluster:command_async(ServerRef, Command, Key, ReplyFun). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_all(server_ref(), command()) -> [reply()]. @@ -130,14 +96,10 @@ command_async(ServerRef, Command, Key, ReplyFun) when is_function(ReplyFun, 1) - %% Send the same command to all connected master Redis nodes. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_all(ServerRef, Command) -> - command_all(ServerRef, Command, infinity). + ered_cluster:command_all(ServerRef, Command, infinity). command_all(ServerRef, Command, Timeout) -> - %% Send command in sequence to all instances. - %% This could be done in parallel but but keeping it easy and - %% aligned with eredis_cluster for now - Cmd = ered_command:convert_to(Command), - [ered_client:command(ClientRef, Cmd, Timeout) || ClientRef <- get_clients(ServerRef)]. + ered_cluster:command_all(ServerRef, Command, Timeout). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec command_client(client_ref(), command()) -> reply(). @@ -146,7 +108,7 @@ command_all(ServerRef, Command, Timeout) -> %% Send the command to a specific Redis client without any client routing. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - command_client(ClientRef, Command) -> - command_client(ClientRef, Command, infinity). + ered_client:command(ClientRef, Command, infinity). command_client(ClientRef, Command, Timeout) -> ered_client:command(ClientRef, Command, Timeout). @@ -168,7 +130,7 @@ command_client_async(ClientRef, Command, CallbackFun) -> %% Get all Redis master node clients %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - get_clients(ServerRef) -> - gen_server:call(ServerRef, get_clients). + ered_cluster:get_clients(ServerRef). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec get_addr_to_client_map(server_ref()) -> #{addr() => client_ref()}. @@ -176,7 +138,7 @@ get_clients(ServerRef) -> %% Get the address to client mapping. This includes all clients. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - get_addr_to_client_map(ServerRef) -> - gen_server:call(ServerRef, get_addr_to_client_map). + ered_cluster:get_addr_to_client_map(ServerRef). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec update_slots(server_ref()) -> ok. @@ -184,7 +146,7 @@ get_addr_to_client_map(ServerRef) -> %% Trigger a slot-to-node mapping update using any connected client. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - update_slots(ServerRef) -> - gen_server:cast(ServerRef, {update_slots, none}). + ered_cluster:update_slots(ServerRef, any, any). -spec update_slots(server_ref(), client_ref()) -> ok. %% @@ -193,254 +155,4 @@ update_slots(ServerRef) -> %% used. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - update_slots(ServerRef, ClientRef) -> - gen_server:cast(ServerRef, {update_slots, ClientRef}). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -init([Addrs, Opts1]) -> - %% Register callback to get slot map updates - InfoPids = [self() | proplists:get_value(info_pid, Opts1, [])], - Opts2 = [{info_pid, InfoPids} | proplists:delete(info_pid, Opts1)], - - {TryAgainDelay, Opts3} = take_prop(try_again_delay, Opts2, 200), - {RedirectAttempts, Opts4} = take_prop(redirect_attempts, Opts3, 10), - - {ok, ClusterPid} = ered_cluster:start_link(Addrs, Opts4), - EmptySlots = create_lookup_table(0, [], <<>>), - {ok, #st{cluster_pid = ClusterPid, - slots = EmptySlots, - try_again_delay = TryAgainDelay, - redirect_attempts = RedirectAttempts}}. - -handle_call({command, Command, Key}, From, State) -> - Slot = ered_lib:hash(Key), - State1 = send_command_to_slot(Command, Slot, From, State, State#st.redirect_attempts), - {noreply, State1}; - -handle_call(get_clients, _From, State) -> - {reply, tuple_to_list(State#st.clients), State}; - -handle_call(get_addr_to_client_map, _From, State) -> - {reply, State#st.addr_map, State}. - -handle_cast({command_async, Command, Key, ReplyFun}, State) -> - Slot = ered_lib:hash(Key), - State1 = send_command_to_slot(Command, Slot, ReplyFun, State, State#st.redirect_attempts), - {noreply, State1}; - -handle_cast({replied, To}, State) -> - {noreply, State#st{pending = maps:remove(To, State#st.pending)}}; - -handle_cast({update_slots, ClientRef}, State) -> - ered_cluster:update_slots(State#st.cluster_pid, State#st.slot_map_version, ClientRef), - {noreply, State}; - -handle_cast({forward_command, Command, Slot, From, Addr, AttemptsLeft}, State) -> - {Client, State1} = connect_addr(Addr, State), - Fun = create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft), - ered_client:command_async(Client, Command, Fun), - {noreply, State1}; - -handle_cast({forward_command_asking, Command, Slot, From, Addr, AttemptsLeft, OldReply}, State) -> - {Client, State1} = connect_addr(Addr, State), - Command1 = ered_command:add_asking(OldReply, Command), - HandleReplyFun = create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft), - Fun = fun(Reply) -> HandleReplyFun(ered_command:fix_ask_reply(OldReply, Reply)) end, - ered_client:command_async(Client, Command1, Fun), - {noreply, State1}. - -handle_info({command_try_again, Command, Slot, From, AttemptsLeft}, State) -> - State1 = send_command_to_slot(Command, Slot, From, State, AttemptsLeft), - {noreply, State1}; - -handle_info(#{msg_type := slot_map_updated}, State) -> - {MapVersion, ClusterMap, AddrToPid} = ered_cluster:get_slot_map_info(State#st.cluster_pid), - %% The idea is to store the client pids in a tuple and then - %% have a binary where each byte corresponds to a slot and the - %% value maps to a index in the tuple. - - MasterAddrToPid = maps:with(ered_lib:slotmap_master_nodes(ClusterMap), AddrToPid), - %% Create a list of indices, one for each client pid - Ixs = lists:seq(1, maps:size(MasterAddrToPid)), - %% Combine the indices with the Addresses to create a lookup from Addr -> Ix - AddrToIx = maps:from_list(lists:zip(maps:keys(MasterAddrToPid), Ixs)), - - Slots = create_lookup_table(ClusterMap, AddrToIx), - Clients = create_client_pid_tuple(MasterAddrToPid, AddrToIx), - %% Monitor the client processes - maps:foreach(fun (Addr, Pid) when map_get(Addr, State#st.addr_map) =:= Pid -> - ok; % Process already known - (Addr, Pid) -> - _ = monitor(process, Pid, [{tag, {'DOWN', Addr}}]) - end, - AddrToPid), - {noreply, State#st{slots = Slots, - clients = Clients, - slot_map_version = MapVersion, - addr_map = AddrToPid}}; - -handle_info(#{msg_type := connected, addr := Addr, client_id := Pid}, - State = #st{addr_map = AddrMap, clients = Clients}) - when is_map(State#st.addr_map) -> - case maps:find(Addr, AddrMap) of - {ok, Pid} -> - %% We already have this pid. - {noreply, State}; - {ok, OldPid} -> - %% The pid has changed for this client. It was probably restarted. - _Mon = monitor(process, Pid, [{tag, {'DOWN', Addr}}]), - %% Replace the pid in our lookup tables. - ClientList = [case P of - OldPid -> Pid; - Other -> Other - end || P <- tuple_to_list(Clients)], - {noreply, State#st{addr_map = AddrMap#{Addr := Pid}, - clients = list_to_tuple(ClientList)}}; - error -> - _Mon = monitor(process, Pid, [{tag, {'DOWN', Addr}}]), - {noreply, State#st{addr_map = AddrMap#{Addr => Pid}}} - end; - -handle_info({{'DOWN', Addr}, _Mon, process, Pid, ExitReason}, State) - when map_get(Addr, State#st.addr_map) =:= Pid -> - %% Client process is down. Abort all requests to this client. - Pending = maps:fold(fun (From, To, Acc) when To =:= Pid -> - gen_server:reply(From, {error, ExitReason}), - maps:remove(From, Acc); - (_From, _To, Acc) -> - Acc - end, - State#st.pending, - State#st.pending), - %% Put a placeholder instead of a pid in the lookup structures. - Placeholder = Addr, - ClientList = [case P of - Pid -> Placeholder; - Other -> Other - end || P <- tuple_to_list(State#st.clients)], - {noreply, State#st{addr_map = (State#st.addr_map)#{Addr := Placeholder}, - clients = list_to_tuple(ClientList), - pending = Pending}}; - -handle_info(_Ignore, State) -> - {noreply, State}. - -terminate(_Reason, State) -> - ered_cluster:stop(State#st.cluster_pid), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -format_status(_Opt, Status) -> - Status. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== -send_command_to_slot(Command, Slot, From, State, AttemptsLeft) -> - case binary:at(State#st.slots, Slot) of - 0 -> - reply(From, {error, unmapped_slot}, none), - State; - Ix -> - case element(Ix, State#st.clients) of - Client when is_pid(Client) -> - Fun = create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft), - ered_client:command_async(Client, Command, Fun), - put_pending(From, Client, State); - _Placeholder -> - reply(From, {error, client_down}, none), - State - end - end. - -put_pending(From = {_, _}, Client, State) -> - %% Gen_server call. Store so we can reply if Client crashes. - State#st{pending = maps:put(From, Client, State#st.pending)}; -put_pending(ReplyFun, _Client, State) when is_function(ReplyFun) -> - %% Cast with reply fun. We don't keep track of those. - State. - -create_reply_fun(_Command, _Slot, _Client, From, _State, 0) -> - Pid = self(), - fun(Reply) -> reply(From, Reply, Pid) end; - -create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft) -> - Pid = self(), - %% Avoid binding the #st record inside the fun since the fun will be - %% copied to another process - ClusterPid = State#st.cluster_pid, - SlotMapVersion = State#st.slot_map_version, - TryAgainDelay = State#st.try_again_delay, - fun(Reply) -> - case ered_command:check_result(Reply) of - normal -> - reply(From, Reply, Pid); - {moved, Addr} -> - ered_cluster:update_slots(ClusterPid, SlotMapVersion, Client), - gen_server:cast(Pid, {forward_command, Command, Slot, From, Addr, AttemptsLeft-1}); - {ask, Addr} -> - gen_server:cast(Pid, {forward_command_asking, Command, Slot, From, Addr, AttemptsLeft-1, Reply}); - try_again -> - erlang:send_after(TryAgainDelay, Pid, {command_try_again, Command, Slot, From, AttemptsLeft-1}); - cluster_down -> - ered_cluster:update_slots(ClusterPid, SlotMapVersion, Client), - reply(From, Reply, Pid) - end - end. - -%% Handle a reply, either by sending it back to a gen server caller or by -%% applying a reply function. -reply(To = {_, _}, Reply, EredPid) when is_pid(EredPid) -> - gen_server:reply(To, Reply), - gen_server:cast(EredPid, {replied, To}); -reply(To = {_, _}, Reply, none) -> - gen_server:reply(To, Reply); -reply(ReplyFun, Reply, _EredPid) when is_function(ReplyFun, 1) -> - ReplyFun(Reply). - -create_client_pid_tuple(AddrToPid, AddrToIx) -> - %% Create a list with tuples where the first element is the index and the second is the pid - IxPid = [{maps:get(Addr, AddrToIx), Pid} || {Addr, Pid} <- maps:to_list(AddrToPid)], - %% Sort the list and remove the index to get the pids in the right order - Pids = [Pid || {_Ix, Pid} <- lists:sort(IxPid)], - list_to_tuple(Pids). - -create_lookup_table(ClusterMap, AddrToIx) -> - %% Replace the Addr in the slot map with the index using the lookup - Slots = [{Start, End, maps:get(Addr,AddrToIx)} - || {Start, End, Addr} <- ered_lib:slotmap_master_slots(ClusterMap)], - create_lookup_table(0, Slots, <<>>). - -create_lookup_table(16384, _, Acc) -> - Acc; -create_lookup_table(N, [], Acc) -> - %% no more slots, rest are set to unmapped - create_lookup_table(N+1, [], <>); -create_lookup_table(N, L = [{Start, End, Val} | Rest], Acc) -> - if - N < Start -> % unmapped, use 0 - create_lookup_table(N+1, L, <>); - N =< End -> % in range - create_lookup_table(N+1, L, <>); - true -> - create_lookup_table(N, Rest, Acc) - end. - -connect_addr(Addr, State) -> - case maps:get(Addr, State#st.addr_map, not_found) of - not_found -> - Client = ered_cluster:connect_node(State#st.cluster_pid, Addr), - _Mon = monitor(process, Client, [{tag, {'DOWN', Addr}}]), - {Client, State#st{addr_map = maps:put(Addr, Client, State#st.addr_map)}}; - Client -> - {Client, State} - end. - -take_prop(Key, List, Default) -> - Val = proplists:get_value(Key, List, Default), - NewList = proplists:delete(Key, List), - {Val, NewList}. + ered_cluster:update_slots(ServerRef, any, ClientRef). diff --git a/src/ered_client_sup.erl b/src/ered_client_sup.erl index 8dcb3af..7968169 100644 --- a/src/ered_client_sup.erl +++ b/src/ered_client_sup.erl @@ -8,23 +8,22 @@ -export([init/1]). -type host() :: inet:socket_address() | inet:hostname(). --type addr() :: {host(), inet:port_number()}. start_link() -> supervisor:start_link(?MODULE, []). -spec start_client(supervisor:sup_ref(), host(), inet:port_number(), [ered_client:opt()]) -> any(). start_client(Sup, Host, Port, ClientOpts) -> - ChildSpec = #{id => {Host, Port}, - start => {ered_client, start_link, [Host, Port, ClientOpts]}, - modules => [ered_client]}, - supervisor:start_child(Sup, ChildSpec). + supervisor:start_child(Sup, [Host, Port, ClientOpts]). --spec stop_client(supervisor:sup_ref(), addr()) -> ok. -stop_client(Sup, Addr) -> - ok = supervisor:terminate_child(Sup, Addr), - ok = supervisor:delete_child(Sup, Addr). +-spec stop_client(supervisor:sup_ref(), pid()) -> ok. +stop_client(Sup, Pid) -> + _ = supervisor:terminate_child(Sup, Pid), + ok. init([]) -> - %% Use defaults (one_for_one; tolerate 1 restart per 5 seconds), no children yet. - {ok, {#{}, []}}. + {ok, {#{strategy => simple_one_for_one}, + [#{id => undefined, + start => {ered_client, start_link, []}, + restart => temporary, + modules => [ered_client]}]}}. diff --git a/src/ered_cluster.erl b/src/ered_cluster.erl index 185ed10..63d0bfa 100644 --- a/src/ered_cluster.erl +++ b/src/ered_cluster.erl @@ -10,8 +10,11 @@ %% API -export([start_link/2, stop/1, + command/4, command_async/4, + command_all/2, command_all/3, + get_clients/1, + get_addr_to_client_map/1, update_slots/3, - get_slot_map_info/1, connect_node/2 ]). @@ -31,6 +34,18 @@ cluster_state = nok :: ok | nok, %% Supervisor for our client processes client_sup :: pid(), + + %% Structures for fast slot-to-pid lookup. + slots :: binary(), % The byte at offset N is an + % index into the clients tuple + % for slot N. + clients = {} :: tuple(), % Tuple of pid(), or addr() as + % placeholder when the process + % is gone. + + %% Pending synchronous commands + pending_commands = #{} :: #{gen_server:from() => pid()}, + %% Mapping from address to client for all known clients nodes = #{} :: #{addr() => pid()}, %% Clients in connected state @@ -53,6 +68,8 @@ slot_timer_ref = none, info_pid = [] :: [pid()], + try_again_delay = 200 :: non_neg_integer(), + redirect_attempts = 10 :: non_neg_integer(), update_delay = 1000, % 1s delay between slot map update requests client_opts = [], update_slot_wait = 500, @@ -66,8 +83,18 @@ -type addr_set() :: sets:set(addr()). -type server_ref() :: pid(). -type client_ref() :: ered_client:server_ref(). +-type command() :: ered_command:command(). +-type reply() :: ered_client:reply() | {error, unmapped_slot | client_down}. +-type key() :: binary(). -type opt() :: + %% If there is a TRYAGAIN response from Redis then wait + %% this many milliseconds before re-sending the command + {try_again_delay, non_neg_integer()} | + %% Only do these many retries or re-sends before giving + %% up and returning the result. This affects ASK, MOVED + %% and TRYAGAIN responses + {redirect_attempts, non_neg_integer()} | %% List of pids to receive cluster info messages. See ered_info_msg module. {info_pid, [pid()]} | %% CLUSTER SLOTS command is used to fetch slots from the Redis cluster. @@ -107,30 +134,76 @@ stop(ServerRef) -> gen_server:stop(ServerRef). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec update_slots(server_ref(), non_neg_integer(), client_ref() | none) -> ok. +-spec command(server_ref(), command(), key(), timeout()) -> reply(). +%% +%% Send a command to the Redis cluster. The command will be routed to +%% the correct Redis node client based on the provided key. +%% If the command is a single command then it is represented as a +%% list of binaries where the first binary is the Redis command +%% to execute and the rest of the binaries are the arguments. +%% If the command is a pipeline, e.g. multiple commands to executed +%% then they need to all map to the same slot for things to +%% work as expected. +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +command(ServerRef, Command, Key, Timeout) -> + C = ered_command:convert_to(Command), + gen_server:call(ServerRef, {command, C, Key}, Timeout). + +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +-spec command_async(server_ref(), command(), key(), fun((reply()) -> any())) -> ok. +%% +%% Like command/4 but asynchronous. Instead of returning the reply, the reply +%% function is applied to the reply when it is available. The reply function +%% runs in an unspecified process. +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +command_async(ServerRef, Command, Key, ReplyFun) when is_function(ReplyFun, 1) -> + C = ered_command:convert_to(Command), + gen_server:cast(ServerRef, {command_async, C, Key, ReplyFun}). + +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +-spec command_all(server_ref(), command()) -> [reply()]. +-spec command_all(server_ref(), command(), timeout()) -> [reply()]. +%% +%% Send the same command to all connected master Redis nodes. +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +command_all(ServerRef, Command) -> + command_all(ServerRef, Command, infinity). + +command_all(ServerRef, Command, Timeout) -> + %% Send command in sequence to all instances. This could be done in parallel + %% but but we're keeping it simple and aligned with eredis_cluster for now. + Cmd = ered_command:convert_to(Command), + [ered_client:command(ClientRef, Cmd, Timeout) || ClientRef <- get_clients(ServerRef)]. + +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +-spec get_clients(server_ref()) -> [client_ref()]. +%% +%% Get all Redis master node clients +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +get_clients(ServerRef) -> + gen_server:call(ServerRef, get_clients). + +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +-spec get_addr_to_client_map(server_ref()) -> #{addr() => client_ref()}. +%% +%% Get the address to client mapping. This includes all clients. +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +get_addr_to_client_map(ServerRef) -> + gen_server:call(ServerRef, get_addr_to_client_map). + +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +-spec update_slots(server_ref(), non_neg_integer() | any, client_ref() | any) -> ok. %% %% Trigger a CLUSTER SLOTS command towards the specified Redis node if %% the slot map version provided is the same as the one stored in the %% cluster process state. This is used when a cluster state change is %% detected with a MOVED redirection. It is also used when triggering -%% a slot update manually. In this case the node is 'none', meaning +%% a slot update manually. In this case the node is 'any', meaning %% no specific node is preferred. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - update_slots(ServerRef, SlotMapVersion, Node) -> gen_server:cast(ServerRef, {trigger_map_update, SlotMapVersion, Node}). -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec get_slot_map_info(server_ref()) -> - {SlotMapVersion :: non_neg_integer(), - SlotMap :: ered_lib:slot_map(), - Clients :: #{addr() => pid()}}. -%% -%% Fetch the cluster information. This provides the current slot map -%% and a map with all the clients. -%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -get_slot_map_info(ServerRef) -> - gen_server:call(ServerRef, get_slot_map_info). - %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec connect_node(server_ref(), addr()) -> client_ref(). %% @@ -150,58 +223,80 @@ init([Addrs, Opts]) -> {ok, ClientSup} = ered_client_sup:start_link(), State = lists:foldl( fun ({info_pid, Val}, S) -> S#st{info_pid = Val}; + ({try_again_delay, Val}, S) -> S#st{try_again_delay = Val}; + ({redirect_attempts, Val}, S)-> S#st{redirect_attempts = Val}; ({update_slot_wait, Val}, S) -> S#st{update_slot_wait = Val}; ({client_opts, Val}, S) -> S#st{client_opts = Val}; ({min_replicas, Val}, S) -> S#st{min_replicas = Val}; ({close_wait, Val}, S) -> S#st{close_wait = Val}; (Other, _) -> error({badarg, Other}) end, - #st{client_sup = ClientSup}, + #st{client_sup = ClientSup, + slots = create_lookup_table(0, [], <<>>)}, Opts), {ok, start_clients(Addrs, State)}. +handle_call({command, Command, Key}, From, State) -> + Slot = ered_lib:hash(Key), + State1 = send_command_to_slot(Command, Slot, From, State, State#st.redirect_attempts), + {noreply, State1}; -handle_call(get_slot_map_info, _From, State) -> - Nodes = ered_lib:slotmap_all_nodes(State#st.slot_map), - Clients = maps:with(Nodes, State#st.nodes), - Reply = {State#st.slot_map_version, State#st.slot_map, Clients}, - {reply,Reply,State}; +handle_call(get_clients, _From, State) -> + {reply, tuple_to_list(State#st.clients), State}; + +handle_call(get_addr_to_client_map, _From, State) -> + %% All connected clients, except the ones we're closing. + {reply, maps:without(maps:keys(State#st.closing), State#st.nodes), State}; handle_call({connect_node, Addr}, _From, State) -> State1 = start_clients([Addr], State), ClientPid = maps:get(Addr, State1#st.nodes), {reply, ClientPid, State1}. -handle_cast({trigger_map_update, SlotMapVersion, Node}, State) -> - case (SlotMapVersion == State#st.slot_map_version) and (State#st.slot_timer_ref == none) of - true -> - %% Get the address of the client. The address is needd to look up the node status - %% before sending an update. This could need to go through all the nodes - %% but it should not be done often enough to be a problem - NodeAddr = case lists:keyfind(Node, 2, maps:to_list(State#st.nodes)) of - false -> - []; - {Addr, _Client} -> - [Addr] - end, - {noreply, start_periodic_slot_info_request(NodeAddr, State)}; - false -> - {noreply, State} - end. - -handle_info(Msg = {connection_status, {Pid, Addr, _Id}, Status}, State0) -> - State = case maps:find(Addr, State0#st.nodes) of - {ok, Pid} -> - %% Client pid unchanged. - State0; - {ok, _OldPid} -> - %% New client pid for this address. It may have been - %% restarted by the client supervisor. - State0#st{nodes = (State0#st.nodes)#{Addr => Pid}}; - error -> - %% Node not part of the cluster and was already removed. - State0 - end, +handle_cast({command_async, Command, Key, ReplyFun}, State) -> + Slot = ered_lib:hash(Key), + State1 = send_command_to_slot(Command, Slot, ReplyFun, State, State#st.redirect_attempts), + {noreply, State1}; + +handle_cast({replied, To}, State) -> + {noreply, State#st{pending_commands = maps:remove(To, State#st.pending_commands)}}; + +handle_cast({forward_command, Command, Slot, From, Addr, AttemptsLeft}, State) -> + {Client, State1} = connect_addr(Addr, State), + Fun = create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft), + ered_client:command_async(Client, Command, Fun), + {noreply, State1}; + +handle_cast({forward_command_asking, Command, Slot, From, Addr, AttemptsLeft, OldReply}, State) -> + {Client, State1} = connect_addr(Addr, State), + Command1 = ered_command:add_asking(OldReply, Command), + HandleReplyFun = create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft), + Fun = fun(Reply) -> HandleReplyFun(ered_command:fix_ask_reply(OldReply, Reply)) end, + ered_client:command_async(Client, Command1, Fun), + {noreply, State1}; + +handle_cast({trigger_map_update, SlotMapVersion, Node}, State) + when SlotMapVersion == State#st.slot_map_version orelse SlotMapVersion == any, + State#st.slot_timer_ref == none -> + %% Get the address of the client. The address is needed to look up the node status + %% before sending an update. This could need to go through all the nodes + %% but it should not be done often enough to be a problem + NodeAddr = case lists:keyfind(Node, 2, maps:to_list(State#st.nodes)) of + false -> + []; + {Addr, _Client} -> + [Addr] + end, + {noreply, start_periodic_slot_info_request(NodeAddr, State)}; + +handle_cast({trigger_map_update, _SlotMapVersion, _Node}, State) -> + {noreply, State}. + +handle_info({command_try_again, Command, Slot, From, AttemptsLeft}, State) -> + State1 = send_command_to_slot(Command, Slot, From, State, AttemptsLeft), + {noreply, State1}; + +handle_info(Msg = {connection_status, {_Pid, Addr, _Id}, Status}, State) -> IsMaster = sets:is_element(Addr, State#st.masters), ered_info_msg:connection_status(Msg, IsMaster, State#st.info_pid), State1 = case Status of @@ -271,7 +366,8 @@ handle_info({slot_info, Version, Response, FromAddr}, State) -> {noreply, update_cluster_state(State)}; false -> Nodes = ered_lib:slotmap_all_nodes(NewMap), - MasterNodes = new_set(ered_lib:slotmap_master_nodes(NewMap)), + MasterNodesList = ered_lib:slotmap_master_nodes(NewMap), + MasterNodes = new_set(MasterNodesList), %% Remove nodes if they are not in the new map. Remove = maps:keys(maps:without(Nodes, State#st.nodes)), @@ -293,7 +389,20 @@ handle_info({slot_info, Version, Response, FromAddr}, State) -> %% open new clients State1 = start_clients(Nodes, State), - State2 = State1#st{slot_map_version = Version + 1, + + %% Create the slots (binary slot-to-idx) and clients (tuple idx-to-pid) stuctures + AddrToPid = maps:with(Nodes, State1#st.nodes), + MasterAddrToPid = maps:with(MasterNodesList, AddrToPid), + %% Create a list of indices, one for each client pid + Ixs = lists:seq(1, maps:size(MasterAddrToPid)), + %% Combine the indices with the Addresses to create a lookup from Addr -> Ix + AddrToIx = maps:from_list(lists:zip(maps:keys(MasterAddrToPid), Ixs)), + Slots = create_lookup_table(NewMap, AddrToIx), + Clients = create_client_pid_tuple(MasterAddrToPid, AddrToIx), + + State2 = State1#st{slots = Slots, + clients = Clients, + slot_map_version = Version + 1, slot_map = NewMap, masters = MasterNodes, closing = NewClosing}, @@ -319,15 +428,59 @@ handle_info({timeout, TimerRef, {close_clients, Remove}}, State) -> Tref == TimerRef], Clients = maps:with(ToCloseNow, State#st.nodes), [ered_client_sup:stop_client(State#st.client_sup, Client) - || Client <- maps:keys(Clients)], + || Client <- maps:values(Clients)], %% remove from nodes and closing map {noreply, State#st{nodes = maps:without(ToCloseNow, State#st.nodes), up = sets:subtract(State#st.up, new_set(ToCloseNow)), - closing = maps:without(ToCloseNow, State#st.closing)}}. + closing = maps:without(ToCloseNow, State#st.closing)}}; + +handle_info({{'DOWN', Addr}, _Mon, process, Pid, ExitReason}, State) + when map_get(Addr, State#st.nodes) =:= Pid -> + %% Unexpected client exit. Abort all requests to this client. + PendingCmds = maps:fold(fun (From, To, Acc) when To =:= Pid -> + gen_server:reply(From, {error, ExitReason}), + maps:remove(From, Acc); + (_From, _To, Acc) -> + Acc + end, + State#st.pending_commands, + State#st.pending_commands), + State1 = case maps:is_key(Addr, State#st.closing) of + true -> + %% Don't restart it. + State#st{nodes = maps:remove(Addr, State#st.nodes), + closing = maps:remove(Addr, State#st.closing), + pending = sets:del_element(Addr, State#st.pending)}; + false -> + %% Restart it. + NewPid = start_client(Addr, State), + Clients = case sets:is_element(Addr, State#st.masters) of + true -> + %% Replace pid in slot-to-pid lookup + List = tuple_to_list(State#st.clients), + NewList = [case P of + Pid -> NewPid; + Other -> Other + end || P <- List], + list_to_tuple(NewList); + false -> + State#st.clients + end, + State#st{clients = Clients, + nodes = maps:put(Addr, NewPid, State#st.nodes), + pending = sets:add_element(Addr, State#st.pending)} + end, + {noreply, State1#st{pending_commands = PendingCmds, + up = sets:del_element(Addr, State#st.up), + queue_full = sets:del_element(Addr, State#st.queue_full), + reconnecting = sets:del_element(Addr, State#st.reconnecting)}}; + +handle_info(_Ignore, State) -> + {noreply, State}. terminate(_Reason, State) -> [ered_client_sup:stop_client(State#st.client_sup, Pid) - || Pid <- maps:keys(State#st.nodes)], + || Pid <- maps:values(State#st.nodes)], ok. code_change(_OldVsn, State, _Extra) -> @@ -348,6 +501,101 @@ new_set(List) -> sets:from_list(List). -endif. +%%%------------------------------------------------------------------- +%%% Command handling +%%%------------------------------------------------------------------- + +send_command_to_slot(Command, Slot, From, State, AttemptsLeft) -> + case binary:at(State#st.slots, Slot) of + 0 -> + reply(From, {error, unmapped_slot}, none), + State; + Ix -> + Client = element(Ix, State#st.clients), + Fun = create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft), + ered_client:command_async(Client, Command, Fun), + put_pending_command(From, Client, State) + end. + +put_pending_command(From = {_, _}, Client, State) -> + %% Gen_server call. Store so we can reply if Client crashes. + State#st{pending_commands = maps:put(From, Client, State#st.pending_commands)}; +put_pending_command(ReplyFun, _Client, State) when is_function(ReplyFun) -> + %% Cast with reply fun. We don't keep track of those. + State. + +create_reply_fun(_Command, _Slot, _Client, From, _State, 0) -> + Pid = self(), + fun(Reply) -> reply(From, Reply, Pid) end; +create_reply_fun(Command, Slot, Client, From, State, AttemptsLeft) -> + Pid = self(), + %% Avoid binding the #st record inside the fun since the fun will be + %% copied to another process + SlotMapVersion = State#st.slot_map_version, + TryAgainDelay = State#st.try_again_delay, + fun(Reply) -> + case ered_command:check_result(Reply) of + normal -> + reply(From, Reply, Pid); + {moved, Addr} -> + update_slots(Pid, SlotMapVersion, Client), + gen_server:cast(Pid, {forward_command, Command, Slot, From, Addr, AttemptsLeft-1}); + {ask, Addr} -> + gen_server:cast(Pid, {forward_command_asking, Command, Slot, From, Addr, AttemptsLeft-1, Reply}); + try_again -> + erlang:send_after(TryAgainDelay, Pid, {command_try_again, Command, Slot, From, AttemptsLeft-1}); + cluster_down -> + update_slots(Pid, SlotMapVersion, Client), + reply(From, Reply, Pid) + end + end. + +%% Handle a reply, either by sending it back to a gen server caller or by +%% applying a reply function. +reply(To = {_, _}, Reply, ClusterPid) when is_pid(ClusterPid) -> + gen_server:reply(To, Reply), + gen_server:cast(ClusterPid, {replied, To}); +reply(To = {_, _}, Reply, none) -> + gen_server:reply(To, Reply); +reply(ReplyFun, Reply, _ClusterPid) when is_function(ReplyFun, 1) -> + ReplyFun(Reply). + +create_client_pid_tuple(AddrToPid, AddrToIx) -> + %% Create a list with tuples where the first element is the index and the second is the pid + IxPid = [{maps:get(Addr, AddrToIx), Pid} || {Addr, Pid} <- maps:to_list(AddrToPid)], + %% Sort the list and remove the index to get the pids in the right order + Pids = [Pid || {_Ix, Pid} <- lists:sort(IxPid)], + list_to_tuple(Pids). + +create_lookup_table(ClusterMap, AddrToIx) -> + %% Replace the Addr in the slot map with the index using the lookup + Slots = [{Start, End, maps:get(Addr,AddrToIx)} + || {Start, End, Addr} <- ered_lib:slotmap_master_slots(ClusterMap)], + create_lookup_table(0, Slots, <<>>). + +create_lookup_table(16384, _, Acc) -> + Acc; +create_lookup_table(N, [], Acc) -> + %% no more slots, rest are set to unmapped + create_lookup_table(N+1, [], <>); +create_lookup_table(N, L = [{Start, End, Val} | Rest], Acc) -> + if + N < Start -> % unmapped, use 0 + create_lookup_table(N+1, L, <>); + N =< End -> % in range + create_lookup_table(N+1, L, <>); + true -> + create_lookup_table(N, Rest, Acc) + end. + +connect_addr(Addr, State) -> + State1 = start_clients([Addr], State), + {maps:get(Addr, State1#st.nodes), State1}. + +%%%------------------------------------------------------------------- +%%% Cluster management +%%%------------------------------------------------------------------- + check_cluster_status(State) -> case is_slot_map_ok(State) of ok -> @@ -522,6 +770,7 @@ start_client(Addr, State) -> {Host, Port} = Addr, Opts = [{info_pid, self()}, {use_cluster_id, true}] ++ State#st.client_opts, {ok, Pid} = ered_client_sup:start_client(State#st.client_sup, Host, Port, Opts), + _ = monitor(process, Pid, [{tag, {'DOWN', Addr}}]), Pid. start_clients(Addrs, State) -> diff --git a/test/ered_SUITE.erl b/test/ered_SUITE.erl index abec63e..9016a42 100644 --- a/test/ered_SUITE.erl +++ b/test/ered_SUITE.erl @@ -225,16 +225,18 @@ t_client_crash(_) -> ?MSG(#{addr := {"127.0.0.1", Port}, master := true, msg_type := client_stopped}), ?MSG({'DOWN', _Mon, process, Pid0, crash}), ?MSG(#{msg_type := cluster_not_ok, reason := master_down}), - %% Instant error when client pid is dead. There's a race condition here. The - %% new client process can potentially come up fast and return "OK" to the - %% following commands. - {error, client_down} = ered:command(R, [<<"SET">>, <<"k">>, <<"v">>], <<"k">>), + %% Command immediately when the client process is dead. The cluster process + %% starts a new client synchronously, so the command succeeds. There's a + %% possible race condition here though. The cluster process may receive the + %% command before it receives the 'DOWN' message from the dead client and + %% thus it doesn't know the client is dead. + {ok, <<"OK">>} = ered:command(R, [<<"SET">>, <<"k">>, <<"v">>], <<"k">>), ered:command_async(R, [<<"SET">>, <<"k">>, <<"v">>], <<"k">>, fun (Reply) -> %% This command does get a reply. TestPid ! {async_command_when_down, Reply} end), - ?MSG({async_command_when_down, {error, client_down}}), + ?MSG({async_command_when_down, {ok, <<"OK">>}}), %% End of race condition. ?MSG(#{addr := {"127.0.0.1", Port}, master := true, msg_type := connected}, 10000), AddrToPid1 = ered:get_addr_to_client_map(R), @@ -274,16 +276,18 @@ t_client_killed(_) -> %% We don't get 'cluster_not_ok' here, because ered_cluster relies on a %% message from ered_client. Using a monitor instead would be more reliable. - %% Instant error when client pid is dead. There's a race condition here. The - %% new client process can potentially come up fast and return "OK" to the - %% following commands. - {error, client_down} = ered:command(R, [<<"SET">>, <<"k">>, <<"v">>], <<"k">>), + %% Command immediately when the client process is dead. The cluster process + %% starts a new client synchronously, so the command succeeds. There's a + %% possible race condition here though. The cluster process may receive the + %% command before it receives the 'DOWN' message from the dead client and + %% thus it doesn't know the client is dead. + {ok, <<"OK">>} = ered:command(R, [<<"SET">>, <<"k">>, <<"v">>], <<"k">>), ered:command_async(R, [<<"SET">>, <<"k">>, <<"v">>], <<"k">>, fun (Reply) -> %% This command does get a reply. TestPid ! {async_command_when_down, Reply} end), - ?MSG({async_command_when_down, {error, client_down}}), + ?MSG({async_command_when_down, {ok, <<"OK">>}}), %% End of race condition. ?MSG(#{addr := {"127.0.0.1", Port}, master := true, msg_type := connected}), AddrToPid1 = ered:get_addr_to_client_map(R), From 504db557dcd21decc833294e42225d5dfa5bb450 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 19 Mar 2024 16:16:44 +0100 Subject: [PATCH 2/6] App and supervision tree Ered becomes an application with its own supervision tree. In the API, connect_cluster/2 replaces start_link/2 and close/1 replaces stop/1. The processes are added to ered's own supervision tree. An ered instance monitors the caller and closes down if the owner dies, much like the controlling process concept in gen_tcp and ssl. --- README.md | 28 +++++++++++----------- src/ered.app.src | 2 +- src/ered.erl | 30 ++++++++++++++++++------ src/ered_app.erl | 11 +++++++++ src/ered_client.erl | 45 ++++++++++++++++++++++++++++-------- src/ered_client_sup.erl | 13 +++++++---- src/ered_cluster.erl | 26 ++++++++++++--------- src/ered_cluster_sup.erl | 24 +++++++++++++++++++ src/ered_dyn_cluster_sup.erl | 38 ++++++++++++++++++++++++++++++ src/ered_sup.erl | 39 +++++++++++++++++++++++++++++++ test/ered_SUITE.erl | 23 +++++++++--------- 11 files changed, 223 insertions(+), 56 deletions(-) create mode 100644 src/ered_app.erl create mode 100644 src/ered_cluster_sup.erl create mode 100644 src/ered_dyn_cluster_sup.erl create mode 100644 src/ered_sup.erl diff --git a/README.md b/README.md index 2f0e601..56ab1cb 100644 --- a/README.md +++ b/README.md @@ -18,38 +18,40 @@ Usage by example ---------------- ```Erlang -1> {ok, Pid} = ered:start_link([{"localhost", 6379}], []). +1> {ok, _} = application:ensure_all_started(ered, temporary), +2> {ok, Pid} = ered:connect_cluster([{"localhost", 6379}], []). {ok,<0.164.0>} -2> ered:command(Pid, [<<"SET">>, <<"mykey">>, <<"42">>], <<"mykey">>, 5000). +3> ered:command(Pid, [<<"SET">>, <<"mykey">>, <<"42">>], <<"mykey">>, 5000). {ok,<<"OK">>} -3> ered:command_async(Pid, [<<"GET">>, <<"mykey">>], <<"mykey">>, fun(Reply) -> io:format("Reply: ~p~n", [Reply]) end). +4> ered:command_async(Pid, [<<"GET">>, <<"mykey">>], <<"mykey">>, fun(Reply) -> io:format("Reply: ~p~n", [Reply]) end). ok Reply: {ok,<<"42">>} -4> ered:stop(Pid). +5> ered:stop(Pid). ok ``` Functions --------- -### `start_link/2` +### `connect_cluster/2` ```Erlang -start_link([addr()], [opt()]) -> {ok, server_ref()} | {error, term()}. +connect_cluster([addr()], [opt()]) -> {ok, server_ref()} | {error, term()}. ``` Start the main process. This will also start the cluster handling process which will set up clients to the provided addresses and fetch the cluster slot map. Once there is a complete slot map and all Redis node clients are connected this process is ready to -serve requests. +serve requests. The processes are supervised by the `ered` application, +which needs to be started in advance. One or more addresses, `addr() :: {inet:socket_address() | inet:hostname(), -inet:port_number()}`, is used to discover the rest of the cluster. +inet:port_number()}`, are used to discover the rest of the cluster. For options, see [Options](#options) below. -### `stop/1` +### `close/1` ```Erlang stop(server_ref()) -> ok. @@ -145,7 +147,7 @@ used. Options ------- -The following options can be passed to `start_link/2`: +The following options can be passed to `connect/2`: * `{try_again_delay, non_neg_integer()}` @@ -187,7 +189,7 @@ The following options can be passed to `start_link/2`: ### Client options -Options passed to `start_link/2` as the options `{client_opts, [...]}`. +Options passed to `connect/2` as the options `{client_opts, [...]}`. * `{connection_opts, [ered_connection:opt()]}` @@ -242,7 +244,7 @@ Options passed to `start_link/2` as the options `{client_opts, [...]}`. ### Connection options -Options passed to `start_link/2` as the options `{client_opts, [{connection_opts, [...]}]}`. +Options passed to `connect/2` as the options `{client_opts, [{connection_opts, [...]}]}`. * `{batch_size, non_neg_integer()}` @@ -281,7 +283,7 @@ Info messages ------------- When one or more pids have been provided as the option `{info_pid, [pid()]}` to -`start_link/2`, these are the messages ered sends. All messages are maps with at +`connect/2`, these are the messages ered sends. All messages are maps with at least the key `msg_type`. Messages about the cluster as a whole: diff --git a/src/ered.app.src b/src/ered.app.src index 46d710c..72992b0 100644 --- a/src/ered.app.src +++ b/src/ered.app.src @@ -8,7 +8,7 @@ ssl]}, {env,[]}, {modules, []}, - + {mod, {ered_app, []}}, {licenses, ["MIT"]}, {links, []} ]}. diff --git a/src/ered.erl b/src/ered.erl index bccce50..7f5058d 100644 --- a/src/ered.erl +++ b/src/ered.erl @@ -3,8 +3,8 @@ %% External API for using connecting and sending commands to Redis cluster. %% API --export([start_link/2, - stop/1, +-export([connect_cluster/2, connect_client/3, + close/1, command/3, command/4, command_async/4, command_all/2, command_all/3, @@ -27,6 +27,7 @@ -type opt() :: ered_cluster:opt(). -type addr() :: ered_cluster:addr(). +-type host() :: ered_connection:host(). -type server_ref() :: pid(). -type command() :: ered_command:command(). -type reply() :: ered_client:reply() | {error, unmapped_slot | client_down}. @@ -39,7 +40,7 @@ %%%=================================================================== %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec start_link([addr()], [opt()]) -> {ok, server_ref()} | {error, term()}. +-spec connect_cluster([addr()], [opt()]) -> {ok, server_ref()} | {error, term()}. %% %% Start the cluster handling %% process which will set up clients to the provided addresses and @@ -47,16 +48,31 @@ %% all Redis node clients are connected this process is ready to %% serve requests. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -start_link(Addrs, Opts) -> - ered_cluster:start_link(Addrs, Opts). +connect_cluster(Addrs, Opts) -> + try ered_cluster_sup:start_child() of + {ok, ClusterSup} -> + {ok, ClientSup} = ered_dyn_cluster_sup:start_client_sup(ClusterSup), + {ok, ClusterPid} = ered_dyn_cluster_sup:start_cluster_mgr(ClusterSup, Addrs, Opts, ClientSup, self()), + {ok, ClusterPid} + catch exit:{noproc, _} -> + {error, ered_not_started} + end. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec stop(server_ref()) -> ok. +-spec connect_client(host(), inet:port_number(), [opt()]) -> {ok, client_ref()} | {error, term()}. +%% +%% Open a single client connection to a Redis node. +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +connect_client(Host, Port, Opts) -> + ered_client:connect(Host, Port, Opts). + +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +-spec close(server_ref()) -> ok. %% %% Stop the cluster handling %% process and in turn disconnect and stop all clients. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -stop(ServerRef) -> +close(ServerRef) -> ered_cluster:stop(ServerRef). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/src/ered_app.erl b/src/ered_app.erl new file mode 100644 index 0000000..15cbec8 --- /dev/null +++ b/src/ered_app.erl @@ -0,0 +1,11 @@ +-module(ered_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_Type, _StartArgs) -> + ered_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/ered_client.erl b/src/ered_client.erl index caf4bcb..f956dc2 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -9,8 +9,9 @@ %% API --export([start_link/3, - stop/1, deactivate/1, reactivate/1, +-export([start_link/3, start_link/4, + connect/3, close/1, + deactivate/1, reactivate/1, command/2, command/3, command_async/3]). @@ -52,6 +53,7 @@ { connect_loop_pid = none, connection_pid = none, + controlling_process :: pid(), last_status = none, waiting = q_new() :: command_queue(), @@ -122,20 +124,38 @@ %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec start_link(host(), inet:port_number(), [opt()]) -> {ok, server_ref()} | {error, term()}. +-spec start_link(host(), inet:port_number(), [opt()], pid()) -> + {ok, server_ref()} | {error, term()}. %% %% Start the client process. Create a connection towards the provided -%% address. +%% address. Typically called by a supervisor. Use connect/3 instead. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - start_link(Host, Port, Opts) -> - gen_server:start_link(?MODULE, [Host, Port, Opts], []). + start_link(Host, Port, Opts, self()). +start_link(Host, Port, Opts, User) -> + gen_server:start_link(?MODULE, {Host, Port, Opts, User}, []). + +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +-spec connect(host(), inet:port_number(), [opt()]) -> + {ok, server_ref()} | {error, term()}. +%% +%% Create a standalone connection supervised by the ered application. +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +connect(Host, Port, Opts) -> + try ered_client_sup:start_client(ered_standalone_sup, Host, Port, Opts, self()) of + {ok, ClientPid} -> + {ok, ClientPid} + catch exit:{noproc, _} -> + {error, ered_not_started} + end. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec stop(server_ref()) -> ok. +-spec close(server_ref()) -> ok. %% %% Stop the client process. Cancel all commands in queue. Take down %% connection. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -stop(ServerRef) -> +close(ServerRef) -> gen_server:stop(ServerRef). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -187,7 +207,7 @@ command_async(ServerRef, Command, CallbackFun) -> %%%=================================================================== %%% gen_server callbacks %%%=================================================================== -init([Host, Port, OptsList]) -> +init({Host, Port, OptsList, User}) -> Opts = lists:foldl( fun({connection_opts, Val}, S) -> S#opts{connection_opts = Val}; ({max_waiting, Val}, S) -> S#opts{max_waiting = Val}; @@ -203,11 +223,12 @@ init([Host, Port, OptsList]) -> end, #opts{host = Host, port = Port}, OptsList), - + monitor(process, User), process_flag(trap_exit, true), Pid = self(), ConnectPid = spawn_link(fun() -> connect(Pid, Opts) end), {ok, start_node_down_timer(#st{opts = Opts, + controlling_process = User, connect_loop_pid = ConnectPid})}. handle_call({command, Command}, From, State) -> @@ -277,8 +298,14 @@ handle_info({timeout, TimerRef, node_down}, State) when TimerRef == State#st.nod handle_info({timeout, _TimerRef, _Msg}, State) -> {noreply, State}; +handle_info({'DOWN', _Mon, process, Pid, ExitReason}, State = #st{controlling_process = Pid}) -> + {stop, ExitReason, State}; + handle_info({'EXIT', _From, Reason}, State) -> - {stop, Reason, State}. + {stop, Reason, State}; + +handle_info(_Ignore, State) -> + {noreply, State}. terminate(Reason, State) -> exit(State#st.connect_loop_pid, kill), diff --git a/src/ered_client_sup.erl b/src/ered_client_sup.erl index 7968169..c08dbae 100644 --- a/src/ered_client_sup.erl +++ b/src/ered_client_sup.erl @@ -2,7 +2,7 @@ %% This is the supervisor for the ered_client processes of a custer client instance. --export([start_link/0, start_client/4, stop_client/2]). +-export([start_link/0, start_link/1, start_client/5, stop_client/2]). -behaviour(supervisor). -export([init/1]). @@ -10,11 +10,16 @@ -type host() :: inet:socket_address() | inet:hostname(). start_link() -> + %% Used for the clients owned by a cluster instance. supervisor:start_link(?MODULE, []). --spec start_client(supervisor:sup_ref(), host(), inet:port_number(), [ered_client:opt()]) -> any(). -start_client(Sup, Host, Port, ClientOpts) -> - supervisor:start_child(Sup, [Host, Port, ClientOpts]). +start_link(Name) -> + %% Used for standalone client connections. + supervisor:start_link(Name, ?MODULE, []). + +-spec start_client(supervisor:sup_ref(), host(), inet:port_number(), [ered_client:opt()], pid()) -> any(). +start_client(Sup, Host, Port, ClientOpts, User) -> + supervisor:start_child(Sup, [Host, Port, ClientOpts, User]). -spec stop_client(supervisor:sup_ref(), pid()) -> ok. stop_client(Sup, Pid) -> diff --git a/src/ered_cluster.erl b/src/ered_cluster.erl index 63d0bfa..175ef5a 100644 --- a/src/ered_cluster.erl +++ b/src/ered_cluster.erl @@ -8,7 +8,7 @@ -behaviour(gen_server). %% API --export([start_link/2, +-export([start_link/4, stop/1, command/4, command_async/4, command_all/2, command_all/3, @@ -32,8 +32,6 @@ -record(st, { cluster_state = nok :: ok | nok, - %% Supervisor for our client processes - client_sup :: pid(), %% Structures for fast slot-to-pid lookup. slots :: binary(), % The byte at offset N is an @@ -67,6 +65,8 @@ slot_map_version = 0, slot_timer_ref = none, + client_sup :: pid(), + controlling_process :: pid(), info_pid = [] :: [pid()], try_again_delay = 200 :: non_neg_integer(), redirect_attempts = 10 :: non_neg_integer(), @@ -116,13 +116,13 @@ %%%=================================================================== %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --spec start_link([addr()], [opt()]) -> {ok, server_ref()} | {error, term()}. +-spec start_link([addr()], [opt()], pid(), pid()) -> {ok, server_ref()} | {error, term()}. %% %% Start the cluster process. Clients will be set up to the provided %% addresses and cluster information will be retrieved. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -start_link(Addrs, Opts) -> - gen_server:start_link(?MODULE, [Addrs, Opts], []). +start_link(Addrs, Opts, ClientSup, User) -> + gen_server:start_link(?MODULE, {Addrs, Opts, ClientSup, User}, []). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec stop(server_ref()) -> ok. @@ -219,8 +219,7 @@ connect_node(ServerRef, Addr) -> %%% gen_server callbacks %%%=================================================================== -init([Addrs, Opts]) -> - {ok, ClientSup} = ered_client_sup:start_link(), +init({Addrs, Opts, ClientSup, User}) -> State = lists:foldl( fun ({info_pid, Val}, S) -> S#st{info_pid = Val}; ({try_again_delay, Val}, S) -> S#st{try_again_delay = Val}; @@ -231,9 +230,11 @@ init([Addrs, Opts]) -> ({close_wait, Val}, S) -> S#st{close_wait = Val}; (Other, _) -> error({badarg, Other}) end, - #st{client_sup = ClientSup, - slots = create_lookup_table(0, [], <<>>)}, + #st{controlling_process = User, + client_sup = ClientSup, + slots = create_lookup_table(0, [], <<>>)}, Opts), + monitor(process, User), {ok, start_clients(Addrs, State)}. handle_call({command, Command, Key}, From, State) -> @@ -475,6 +476,9 @@ handle_info({{'DOWN', Addr}, _Mon, process, Pid, ExitReason}, State) queue_full = sets:del_element(Addr, State#st.queue_full), reconnecting = sets:del_element(Addr, State#st.reconnecting)}}; +handle_info({'DOWN', _Mon, process, Pid, ExitReason}, State = #st{controlling_process = Pid}) -> + {stop, ExitReason, State}; + handle_info(_Ignore, State) -> {noreply, State}. @@ -769,7 +773,7 @@ check_replica_count(State) -> start_client(Addr, State) -> {Host, Port} = Addr, Opts = [{info_pid, self()}, {use_cluster_id, true}] ++ State#st.client_opts, - {ok, Pid} = ered_client_sup:start_client(State#st.client_sup, Host, Port, Opts), + {ok, Pid} = ered_client_sup:start_client(State#st.client_sup, Host, Port, Opts, self()), _ = monitor(process, Pid, [{tag, {'DOWN', Addr}}]), Pid. diff --git a/src/ered_cluster_sup.erl b/src/ered_cluster_sup.erl new file mode 100644 index 0000000..4061b56 --- /dev/null +++ b/src/ered_cluster_sup.erl @@ -0,0 +1,24 @@ +-module(ered_cluster_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0, start_child/0]). + +%% Supervisor callback +-export([init/1]). + +%% API +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_child() -> + supervisor:start_child(?MODULE, []). + +init([]) -> + SupFlags = #{strategy => simple_one_for_one}, + ChildSpecs = [#{id => undefined, + restart => temporary, + type => supervisor, + start => {ered_dyn_cluster_sup, start_link, []}}], + {ok, {SupFlags, ChildSpecs}}. diff --git a/src/ered_dyn_cluster_sup.erl b/src/ered_dyn_cluster_sup.erl new file mode 100644 index 0000000..d8d5e97 --- /dev/null +++ b/src/ered_dyn_cluster_sup.erl @@ -0,0 +1,38 @@ +-module(ered_dyn_cluster_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0, start_client_sup/1, start_cluster_mgr/5]). + +%% Supervisor callback +-export([init/1]). + +%% API +start_link() -> + supervisor:start_link(?MODULE, []). + +start_client_sup(Sup) -> + ChildSpec = #{id => ered_client_sup, + restart => temporary, + type => supervisor, + start => {ered_client_sup, start_link, []}, + modules => [ered_client_sup]}, + supervisor:start_child(Sup, ChildSpec). + +start_cluster_mgr(Sup, Addrs, Opts, ClientSup, User) -> + ChildSpec = #{id => ered_cluster, + restart => temporary, + type => worker, + significant => true, + start => {ered_cluster, start_link, [Addrs, Opts, ClientSup, User]}, + modules => [ered_cluster]}, + supervisor:start_child(Sup, ChildSpec). + +%% Supervisor callback +init([]) -> + SupFlags = #{strategy => one_for_all, + intensity => 0, + period => 3600, + auto_shutdown => any_significant}, + {ok, {SupFlags, []}}. diff --git a/src/ered_sup.erl b/src/ered_sup.erl new file mode 100644 index 0000000..b56ddb8 --- /dev/null +++ b/src/ered_sup.erl @@ -0,0 +1,39 @@ +-module(ered_sup). + +%% This is the top-level supervisor of the ered application. The tree has the +%% following structure. Triple lines indicate multiple children. +%% +%% ered_app +%% | +%% ered_sup +%% / \ +%% ered_client_sup ered_cluster_sup +%% /// \\\ +%% ered_client ered_dyn_cluster_sup +%% / \ +%% ered_client_sup ered_cluster +%% /// (significant) +%% ered_client + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callback +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + SupFlags = #{strategy => one_for_one}, + ChildSpecs = [#{id => ered_client_sup, + restart => permanent, + type => supervisor, + start => {ered_client_sup, start_link, [{local, ered_standalone_sup}]}}, + #{id => ered_cluster_sup, + restart => permanent, + type => supervisor, + start => {ered_cluster_sup, start_link, []}}], + {ok, {SupFlags, ChildSpecs}}. diff --git a/test/ered_SUITE.erl b/test/ered_SUITE.erl index 9016a42..2eb8545 100644 --- a/test/ered_SUITE.erl +++ b/test/ered_SUITE.erl @@ -66,10 +66,11 @@ init_per_suite(_Config) -> || P <- ?PORTS]), timer:sleep(2000), + {ok, _} = application:ensure_all_started(ered, temporary), lists:foreach(fun(Port) -> - {ok,Pid} = ered_client:start_link("127.0.0.1", Port, []), + {ok,Pid} = ered_client:connect("127.0.0.1", Port, []), {ok, <<"PONG">>} = ered_client:command(Pid, [<<"ping">>]), - ered_client:stop(Pid) + ered_client:close(Pid) end, ?PORTS), create_cluster(), @@ -97,12 +98,12 @@ create_cluster() -> reset_cluster() -> Pids = [begin - {ok, Pid} = ered_client:start_link("127.0.0.1", Port, []), + {ok, Pid} = ered_client:connect("127.0.0.1", Port, []), Pid end || Port <- ?PORTS], [{ok, <<"OK">>} = ered_client:command(Pid, [<<"CLUSTER">>, <<"RESET">>]) || Pid <- Pids], [{ok, []} = ered_client:command(Pid, [<<"CLUSTER">>, <<"SLOTS">>]) || Pid <- Pids], - lists:foreach(fun ered_client:stop/1, Pids). + lists:foreach(fun ered_client:close/1, Pids). %% Wait until cluster is consistent, i.e all nodes have the same single view %% of the slot map and all cluster nodes are included in the slot map. @@ -124,9 +125,9 @@ wait_for_consistent_cluster(Ports) -> check_consistent_cluster(Ports) -> SlotMaps = [fun(Port) -> - {ok, Pid} = ered_client:start_link("127.0.0.1", Port, []), + {ok, Pid} = ered_client:connect("127.0.0.1", Port, []), {ok, SlotMap} = ered_client:command(Pid, [<<"CLUSTER">>, <<"SLOTS">>]), - ered_client:stop(Pid), + ered_client:close(Pid), SlotMap end(P) || P <- Ports], Consistent = case lists:usort(SlotMaps) of @@ -478,7 +479,7 @@ t_init_timeout(_) -> } ], ct:pal("~p\n", [os:cmd("redis-cli -p 30001 CLIENT PAUSE 10000")]), - {ok, _P} = ered:start_link([{localhost, 30001}], [{info_pid, [self()]}] ++ Opts), + {ok, _P} = ered:connect_cluster([{localhost, 30001}], [{info_pid, [self()]}] ++ Opts), ?MSG(#{msg_type := socket_closed, reason := {recv_exit, timeout}}, 3500), ?MSG(#{msg_type := node_down_timeout, addr := {localhost, 30001}}, 2500), @@ -515,8 +516,8 @@ t_empty_slotmap(_) -> t_empty_initial_slotmap(_) -> reset_cluster(), - {ok, R} = ered:start_link([{"127.0.0.1", 30001}], - [{info_pid, [self()]}, {min_replicas, 1}]), + {ok, R} = ered:connect_cluster([{"127.0.0.1", 30001}], + [{info_pid, [self()]}, {min_replicas, 1}]), ?MSG(#{msg_type := cluster_slots_error_response, response := empty, addr := {"127.0.0.1", 30001}}), @@ -789,7 +790,7 @@ t_new_cluster_master(_) -> %% Verify that the cluster is still ok {ok, Data} = ered:command(R, [<<"GET">>, Key], Key), - ered:stop(R), + ered:close(R), no_more_msgs(). t_ask_redirect(_) -> @@ -926,7 +927,7 @@ start_cluster(Opts) -> InitialNodes = [{"127.0.0.1", Port} || Port <- [Port1, Port2]], wait_for_consistent_cluster(), - {ok, P} = ered:start_link(InitialNodes, [{info_pid, [self()]}] ++ Opts), + {ok, P} = ered:connect_cluster(InitialNodes, [{info_pid, [self()]}] ++ Opts), ConnectedInit = [#{msg_type := connected} = msg(addr, {"127.0.0.1", Port}) || Port <- [Port1, Port2]], From 81858101d0eb726f1cc4a8a00865944da09fbbf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 19 Mar 2024 19:33:23 +0100 Subject: [PATCH 3/6] Add info message 'cluster_stopped' sent in ered_cluster:terminate/1 The process traps exits so terminate/1 will run even if the process crashes. --- README.md | 3 +++ src/ered_cluster.erl | 12 ++++++++---- src/ered_info_msg.erl | 16 ++++++++++++++-- test/ered_SUITE.erl | 9 +++++++++ 4 files changed, 34 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 56ab1cb..3a26e00 100644 --- a/README.md +++ b/README.md @@ -318,6 +318,9 @@ Messages about the cluster as a whole: mapping. The `response` is either an error or the atom `empty` if the CLUSTER SLOTS returned an empty list, which is treated like an error. +* `#{msg_type := cluster_stopped, reason := any()}` when the ered cluster + instance is closing down. + Messages about the connection to a specific node are in the following form: ```Erlang diff --git a/src/ered_cluster.erl b/src/ered_cluster.erl index 175ef5a..199440d 100644 --- a/src/ered_cluster.erl +++ b/src/ered_cluster.erl @@ -235,6 +235,7 @@ init({Addrs, Opts, ClientSup, User}) -> slots = create_lookup_table(0, [], <<>>)}, Opts), monitor(process, User), + process_flag(trap_exit, true), {ok, start_clients(Addrs, State)}. handle_call({command, Command, Key}, From, State) -> @@ -479,13 +480,16 @@ handle_info({{'DOWN', Addr}, _Mon, process, Pid, ExitReason}, State) handle_info({'DOWN', _Mon, process, Pid, ExitReason}, State = #st{controlling_process = Pid}) -> {stop, ExitReason, State}; +handle_info({'EXIT', _From, Reason}, State) -> + {stop, Reason, State}; + handle_info(_Ignore, State) -> {noreply, State}. -terminate(_Reason, State) -> - [ered_client_sup:stop_client(State#st.client_sup, Pid) - || Pid <- maps:values(State#st.nodes)], - ok. +terminate(Reason, State) -> + catch [ered_client_sup:stop_client(State#st.client_sup, Pid) + || Pid <- maps:values(State#st.nodes)], + ered_info_msg:cluster_stopped(State#st.info_pid, Reason). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/ered_info_msg.erl b/src/ered_info_msg.erl index b0cb0fe..f5d4988 100644 --- a/src/ered_info_msg.erl +++ b/src/ered_info_msg.erl @@ -6,7 +6,8 @@ slot_map_updated/4, cluster_slots_error_response/3, cluster_ok/1, - cluster_nok/2 + cluster_nok/2, + cluster_stopped/2 ]). @@ -54,7 +55,10 @@ #{msg_type := cluster_ok} | #{msg_type := cluster_not_ok, - reason := master_down | master_queue_full | pending | not_all_slots_covered | too_few_replicas}. + reason := master_down | master_queue_full | pending | not_all_slots_covered | too_few_replicas} | + + #{msg_type := cluster_stopped, + reason := any()}. -type addr() :: ered_client:addr(). @@ -136,6 +140,14 @@ cluster_nok(Reason, Pids) -> reason => Reason}, Pids). +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +-spec cluster_stopped([pid()], any()) -> ok. +%% +%% The cluster instance is terminating. +%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +cluster_stopped(Pids, Reason) -> + send_info(#{msg_type => cluster_stopped, reason => Reason}, Pids). + %%%=================================================================== %%% Internal functions %%%=================================================================== diff --git a/test/ered_SUITE.erl b/test/ered_SUITE.erl index 2eb8545..2c34bb7 100644 --- a/test/ered_SUITE.erl +++ b/test/ered_SUITE.erl @@ -9,6 +9,7 @@ all() -> t_command_all, t_command_client, t_command_pipeline, + t_cluster_crash, t_client_crash, t_client_killed, t_scan_delete_keys, @@ -202,6 +203,13 @@ t_command_pipeline(_) -> no_more_msgs(). +t_cluster_crash(_) -> + R = start_cluster(), + exit(R, crash), + ?MSG(#{msg_type := cluster_stopped, reason := crash}), + no_more_msgs(). + + t_client_crash(_) -> R = start_cluster(), Port = get_master_from_key(R, <<"k">>), @@ -791,6 +799,7 @@ t_new_cluster_master(_) -> %% Verify that the cluster is still ok {ok, Data} = ered:command(R, [<<"GET">>, Key], Key), ered:close(R), + ?MSG(#{msg_type := cluster_stopped, reason := normal}), no_more_msgs(). t_ask_redirect(_) -> From 7a823a942c882efa038d933e125e3d477df821e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 19 Mar 2024 21:40:20 +0100 Subject: [PATCH 4/6] Unify info messages from ered_client and ered_cluster Make the messages sent by a standalone ered_client match those sent by a cluster client for each client, i.e. a map with the keys msg_type, reason, client_id, addr and the optional fields cluster_id and master. --- src/ered_client.erl | 62 ++++++++++++++++++++++++++++---------- src/ered_cluster.erl | 16 ++++++---- src/ered_info_msg.erl | 44 ++------------------------- test/ered_client_tests.erl | 33 ++++++++++---------- 4 files changed, 76 insertions(+), 79 deletions(-) diff --git a/src/ered_client.erl b/src/ered_client.erl index f956dc2..6129b5d 100644 --- a/src/ered_client.erl +++ b/src/ered_client.erl @@ -79,14 +79,29 @@ -type host() :: ered_connection:host(). -type addr() :: {host(), inet:port_number()}. --type node_id() :: binary() | undefined. --type client_info() :: {pid(), addr(), node_id()}. -type status() :: connection_up | {connection_down, down_reason()} | queue_ok | queue_full. -type reason() :: term(). % ssl reasons are of type any so no point being more specific -type down_reason() :: node_down_timeout | node_deactivated | {client_stopped | connect_error | init_error | socket_closed, reason()}. --type info_msg() :: {connection_status, client_info(), status()}. +-type info_msg(MsgType, Reason) :: + #{msg_type := MsgType, + reason := Reason, + master => boolean(), % Optional. Added by ered_cluster. + addr := addr(), + client_id := pid(), + cluster_id => binary() % Optional. Used by ered_cluster. + }. +-type info_msg() :: + info_msg(connected, none) | + info_msg(socket_closed, any()) | + info_msg(connect_error, any()) | + info_msg(init_error, any()) | + info_msg(node_down_timeout, none) | + info_msg(node_deactivated, none) | + info_msg(queue_ok, none) | + info_msg(queue_full, none) | + info_msg(client_stopped, any()). -type server_ref() :: pid(). -type opt() :: @@ -413,13 +428,11 @@ reply_command({command, _, Fun}, Reply) -> get_command_payload({command, Command, _Fun}) -> Command. +-spec report_connection_status(status(), #st{}) -> #st{}. report_connection_status(Status, State = #st{last_status = Status}) -> State; report_connection_status(Status, State) -> - #opts{host = Host, port = Port} = State#st.opts, - ClusterId = State#st.cluster_id, - Msg = {connection_status, {self(), {Host, Port}, ClusterId}, Status}, - send_info(Msg, State), + send_info(Status, State), case Status of %% Skip saving the last_status in this to avoid an extra connect_error event. %% The usual case is that there is a connect_error and then node_down and then @@ -431,15 +444,32 @@ report_connection_status(Status, State) -> end. --spec send_info(info_msg(), #st{}) -> ok. -send_info(Msg, State) -> - Pid = State#st.opts#opts.info_pid, - case Pid of - none -> - ok; - _ -> - Pid ! Msg - end, +-spec send_info(status(), #st{}) -> ok. +send_info(Status, #st{opts = #opts{info_pid = Pid, + host = Host, + port = Port}, + cluster_id = ClusterId}) when is_pid(Pid) -> + {MsgType, Reason} = + case Status of + connection_up -> {connected, none}; + {connection_down, R} when is_atom(R) -> {R, none}; + {connection_down, R} -> R; + queue_full -> {queue_full, none}; + queue_ok -> {queue_ok, none} + end, + Msg0 = #{msg_type => MsgType, + reason => Reason, + addr => {Host, Port}, + client_id => self()}, + Msg = case ClusterId of + undefined -> + Msg0; + Id when is_binary(Id) -> + Msg0#{cluster_id => ClusterId} + end, + Pid ! Msg, + ok; +send_info(_Msg, _State) -> ok. diff --git a/src/ered_cluster.erl b/src/ered_cluster.erl index 199440d..424a48e 100644 --- a/src/ered_cluster.erl +++ b/src/ered_cluster.erl @@ -298,12 +298,12 @@ handle_info({command_try_again, Command, Slot, From, AttemptsLeft}, State) -> State1 = send_command_to_slot(Command, Slot, From, State, AttemptsLeft), {noreply, State1}; -handle_info(Msg = {connection_status, {_Pid, Addr, _Id}, Status}, State) -> +handle_info(Msg = #{msg_type := MsgType, client_id := _Pid, addr := Addr}, State) -> IsMaster = sets:is_element(Addr, State#st.masters), ered_info_msg:connection_status(Msg, IsMaster, State#st.info_pid), - State1 = case Status of - {connection_down, {Reason, _}} when Reason =:= socket_closed; - Reason =:= connect_error -> + State1 = case MsgType of + _ when MsgType =:= socket_closed; + MsgType =:= connect_error -> %% Avoid triggering the alarm for a socket closed by the %% peer. The cluster will be marked down on the node down %% timeout. @@ -321,11 +321,15 @@ handle_info(Msg = {connection_status, {_Pid, Addr, _Id}, Status}, State) -> false -> NewState end; - {connection_down,_} -> + _ when MsgType =:= node_down_timeout; + MsgType =:= node_deactivated; + MsgType =:= init_error; + MsgType =:= client_stopped -> + %% Client is down. State#st{up = sets:del_element(Addr, State#st.up), pending = sets:del_element(Addr, State#st.pending), reconnecting = sets:del_element(Addr, State#st.reconnecting)}; - connection_up -> + connected -> State#st{up = sets:add_element(Addr, State#st.up), pending = sets:del_element(Addr, State#st.pending), reconnecting = sets:del_element(Addr, State#st.reconnecting)}; diff --git a/src/ered_info_msg.erl b/src/ered_info_msg.erl index f5d4988..8596b88 100644 --- a/src/ered_info_msg.erl +++ b/src/ered_info_msg.erl @@ -17,31 +17,8 @@ %%% Definitions %%%=================================================================== --type node_info(MsgType, Reason) :: - #{msg_type := MsgType, - reason := Reason, - master := boolean(), - addr := addr(), - client_id := pid(), - node_id := string() - }. - -type info_msg() :: - node_info(connected, none) | - - node_info(socket_closed, any()) | - - node_info(connect_error, any()) | - - node_info(init_error, any()) | - - node_info(node_down_timeout, none) | - - node_info(queue_ok, none) | - - node_info(queue_full, none) | - - node_info(client_stopped, any()) | + ered_client:info_msg() | #{msg_type := slot_map_updated, slot_map := ClusterSlotsReply :: any(), @@ -73,23 +50,8 @@ %% Client connection goes up or down. %% Client queue full or queue recovered to OK level. %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -connection_status(ClientInfo, IsMaster, Pids) -> - {connection_status, {Pid, Addr, Id} , Status} = ClientInfo, - {MsgType, Reason} = - case Status of - connection_up -> {connected, none}; - {connection_down, R} when is_atom(R) -> {R, none}; - {connection_down, R} -> R; - queue_full -> {queue_full, none}; - queue_ok -> {queue_ok, none} - end, - send_info(#{msg_type => MsgType, - reason => Reason, - master => IsMaster, - addr => Addr, - client_id => Pid, - cluster_id => Id}, - Pids). +connection_status(Msg, IsMaster, Pids) -> + send_info(Msg#{master => IsMaster}, Pids). %% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -spec slot_map_updated(ered_lib:slot_map(), non_neg_integer(), addr(), [pid()]) -> ok. diff --git a/test/ered_client_tests.erl b/test/ered_client_tests.erl index 6bad993..516c0cd 100644 --- a/test/ered_client_tests.erl +++ b/test/ered_client_tests.erl @@ -66,7 +66,7 @@ fail_parse_t() -> end), expect_connection_up(Client), Reason = {recv_exit, {parse_error,{invalid_data,<<"&pong">>}}}, - receive {connection_status, _ClientInfo, {connection_down, {socket_closed, Reason}}} -> ok end, + receive #{msg_type := socket_closed, reason := Reason} -> ok end, expect_connection_up(Client), {ok, <<"pong">>} = get_msg(). @@ -84,7 +84,7 @@ server_close_socket_t() -> end), Client = start_client(Port), expect_connection_up(Client), - receive {connection_status, _ClientInfo, {connection_down, {socket_closed, {recv_exit, closed}}}} -> ok end, + receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end, expect_connection_up(Client). @@ -131,9 +131,9 @@ server_buffer_full_t() -> Pid = self(), [ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,11)], - receive {connection_status, _, queue_full} -> ok end, + receive #{msg_type := queue_full} -> ok end, {6, {error, queue_overflow}} = get_msg(), - receive {connection_status, _, queue_ok} -> ok end, + receive #{msg_type := queue_ok} -> ok end, [{N, {ok, <<"pong">>}} = get_msg()|| N <- [1,2,3,4,5,7,8,9,10,11]], no_more_msgs(). @@ -168,14 +168,14 @@ server_buffer_full_reconnect_t() -> Pid = self(), %% 5 messages will be pending, 5 messages in queue [ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,11)], - receive {connection_status, _ClientInfo1, queue_full} -> ok end, + receive #{msg_type := queue_full} -> ok end, %% 1 message over the limit, first one in queue gets kicked out {6, {error, queue_overflow}} = get_msg(), - receive {connection_status, _ClientInfo2, {connection_down, {socket_closed, {recv_exit, closed}}}} -> ok end, + receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end, %% when connection goes down the pending messages will be put in the queue and the queue %% will overflow kicking out the oldest first [{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]], - receive {connection_status, _ClientInfo3, queue_ok} -> ok end, + receive #{msg_type := queue_ok} -> ok end, expect_connection_up(Client), [{N, {ok, <<"pong">>}} = get_msg() || N <- [7,8,9,10,11]], no_more_msgs(). @@ -199,13 +199,13 @@ server_buffer_full_node_goes_down_t() -> Pid = self(), [ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {N, Reply} end) || N <- lists:seq(1,11)], - receive {connection_status, _ClientInfo1, queue_full} -> ok end, + receive #{msg_type := queue_full} -> ok end, {6, {error, queue_overflow}} = get_msg(), - receive {connection_status, _ClientInfo2, {connection_down, {socket_closed, {recv_exit, closed}}}} -> ok end, + receive #{msg_type := socket_closed, reason := {recv_exit, closed}} -> ok end, [{N, {error, queue_overflow}} = get_msg() || N <- [1,2,3,4,5]], - receive {connection_status, _ClientInfo3, queue_ok} -> ok end, - receive {connection_status, _ClientInfo4, {connection_down, {connect_error,econnrefused}}} -> ok end, - receive {connection_status, _ClientInfo5, {connection_down, node_down_timeout}} -> ok end, + receive #{msg_type := queue_ok} -> ok end, + receive #{msg_type := connect_error, reason := econnrefused} -> ok end, + receive #{msg_type := node_down_timeout} -> ok end, [{N, {error, node_down}} = get_msg() || N <- [7,8,9,10,11]], %% additional commands should get a node down @@ -241,7 +241,7 @@ send_timeout_t() -> Pid = self(), ered_client:command_async(Client, [<<"ping">>], fun(Reply) -> Pid ! {reply, Reply} end), %% this should come after max 1000ms - receive {connection_status, _ClientInfo, {connection_down, {socket_closed, {recv_exit, timeout}}}} -> ok after 2000 -> timeout_error() end, + receive #{msg_type := socket_closed, reason := {recv_exit, timeout}} -> ok after 2000 -> timeout_error() end, expect_connection_up(Client), {reply, {ok, <<"pong">>}} = get_msg(), no_more_msgs(). @@ -381,16 +381,17 @@ expect_connection_up(Client) -> expect_connection_up(Client, infinity). expect_connection_up(Client, Timeout) -> - {connection_status, {Client, _Addr, _undefined}, connection_up} = + #{msg_type := connected, client_id := Client} = get_msg(Timeout). expect_connection_down(Client) -> expect_connection_down(Client, infinity). expect_connection_down(Client, Timeout) -> - {connection_status, {Client, _Addr, _undefined}, {connection_down, Reason}} = + #{msg_type := MsgType, reason := Reason, client_id := Client} = get_msg(Timeout), - Reason. + if Reason =/= none -> ok end, + {MsgType, Reason}. get_msg() -> get_msg(infinity). From bb5ce7009ce95ed0554d4e053df33bd91bab57cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 28 May 2024 10:30:24 +0200 Subject: [PATCH 5/6] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viktor Söderqvist --- README.md | 4 ++-- test/ered_SUITE.erl | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 3a26e00..e82213b 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ Usage by example 4> ered:command_async(Pid, [<<"GET">>, <<"mykey">>], <<"mykey">>, fun(Reply) -> io:format("Reply: ~p~n", [Reply]) end). ok Reply: {ok,<<"42">>} -5> ered:stop(Pid). +5> ered:close(Pid). ok ``` @@ -54,7 +54,7 @@ For options, see [Options](#options) below. ### `close/1` ```Erlang -stop(server_ref()) -> ok. +close(server_ref()) -> ok. ``` Stop the main process. This will also stop the cluster handling diff --git a/test/ered_SUITE.erl b/test/ered_SUITE.erl index 2c34bb7..c5942dd 100644 --- a/test/ered_SUITE.erl +++ b/test/ered_SUITE.erl @@ -234,7 +234,7 @@ t_client_crash(_) -> ?MSG(#{addr := {"127.0.0.1", Port}, master := true, msg_type := client_stopped}), ?MSG({'DOWN', _Mon, process, Pid0, crash}), ?MSG(#{msg_type := cluster_not_ok, reason := master_down}), - %% Command immediately when the client process is dead. The cluster process + %% Send command when the client process is dead. The cluster process %% starts a new client synchronously, so the command succeeds. There's a %% possible race condition here though. The cluster process may receive the %% command before it receives the 'DOWN' message from the dead client and @@ -285,7 +285,7 @@ t_client_killed(_) -> %% We don't get 'cluster_not_ok' here, because ered_cluster relies on a %% message from ered_client. Using a monitor instead would be more reliable. - %% Command immediately when the client process is dead. The cluster process + %% Send command when the client process is dead. The cluster process %% starts a new client synchronously, so the command succeeds. There's a %% possible race condition here though. The cluster process may receive the %% command before it receives the 'DOWN' message from the dead client and From 4374c5e2684dba9cf462a67b4fae4a8415099d33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 28 May 2024 12:34:55 +0200 Subject: [PATCH 6/6] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viktor Söderqvist Co-authored-by: Björn Svensson --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e82213b..96d0759 100644 --- a/README.md +++ b/README.md @@ -147,7 +147,7 @@ used. Options ------- -The following options can be passed to `connect/2`: +The following options can be passed to `connect_cluster/2`: * `{try_again_delay, non_neg_integer()}` @@ -189,7 +189,7 @@ The following options can be passed to `connect/2`: ### Client options -Options passed to `connect/2` as the options `{client_opts, [...]}`. +Options passed to `connect_cluster/2` as the options `{client_opts, [...]}`. * `{connection_opts, [ered_connection:opt()]}`