# Support Snippets
- For general Riak/Erlang snippets, see: https://github.com/basho/internal_wiki/wiki/Erlang-Snippets#repl
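Show the cluster manager leader as seen by every member: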
rp(riak_core_util:rpc_every_member_ann(riak_core_cluster_mgr, get_leader, [], 5000)).
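Fetch replication stats over HTTP (the port depends on the node's HTTP config):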
curl -q http://localhost:10018/riak-repl/stats
curl -q http://localhost:8091/riak-repl/stats | python -mjson.tool
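Report the realtime queue size in bytes on every node: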
rp(rpc:multicall(erlang, apply, [fun() -> {node(), proplists:get_value(bytes, riak_repl2_rtq:status())} end, []])).
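Dump the internal status of the replication leader process: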
rp(sys:get_status(riak_repl2_leader_gs)).
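To manually set the cluster manager's leader node: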
riak_core_cluster_mgr:set_leader(riak_repl2_leader:leader_node(), undefined).
*or*
riak_core_cluster_mgr:set_leader('[email protected]', undefined).
To disable updating the list of remote IPs:
rpc:multicall(erlang, apply,
    [fun() ->
         riak_core_cluster_mgr:register_save_cluster_members_fun(
             fun(_ClusterName, _Members) -> ok end)
     end, []]).
To re-enable:
rpc:multicall(erlang, apply,
    [fun() ->
         riak_core_cluster_mgr:register_save_cluster_members_fun(
             fun(ClusterName, Members) ->
                 riak_core_ring_manager:ring_trans(
                     fun riak_repl_ring:set_clusterIpAddrs/2,
                     {ClusterName, Members})
             end)
     end, []]).
To kill the leader helper processes on all nodes (possibly forcing a re-election):
[exit(P, kill) || P <- element(1, rpc:multicall(riak_repl2_leader, helper_pid, []))].
%%% V2 REPLICATION
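Kill the v2 leader process on every member: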
Kill = fun() -> exit(whereis(riak_repl_leader_gs), kill) end,
riak_core_util:rpc_every_member_ann(erlang, apply, [Kill, []]).
%% Note: the registered name is different from the module name
%% ("gs" = gen_server)
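%% Cluster manager status (module riak_core_cluster_mgr registers as riak_core_cluster_manager)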
rp(sys:get_status(riak_core_cluster_manager)).
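%% Connection manager status, on the leader node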
rp(rpc:call(riak_repl2_leader:leader_node(), sys, get_status, [riak_core_connection_manager])).
%% One node
rp(sys:get_status(riak_core_connection_manager)).
%% All nodes
rp(rpc:multicall(sys, get_status, [riak_core_connection_manager])).
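List the realtime sink connection processes on every node: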
rpc:multicall(supervisor, which_children, [riak_repl2_rtsink_conn_sup]).
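Drop malformed entries from the saved sink-cluster IP list (replace "sink_clustername" with the sink cluster's actual name):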
riak_core_ring_manager:ring_trans(
    fun(Ring, Name) ->
        riak_repl_ring:set_clusterIpAddrs(Ring,
            {Name, [{IP, Port} || {IP, Port} <- riak_repl_ring:get_clusterIpAddrs(Ring, Name),
                                  is_list(IP), is_integer(Port)]})
    end, "sink_clustername").
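Disable AAE and cancel any in-flight exchanges: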
riak_kv_entropy_manager:disable().
riak_kv_entropy_manager:cancel_exchanges().
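Pull a field out of the cluster manager's internal state on every node (fragile; depends on the state record layout):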
rp(rpc:multicall(erlang, apply, [fun() -> {node(), element(5, element(2, hd(element(2, lists:nth(3, lists:nth(5, element(4, sys:get_status(riak_core_cluster_manager))))))))} end, []])).
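Print a stack backtrace of the realtime queue process: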
io:format("~s~n", [element(2,process_info(whereis(riak_repl2_rtq), backtrace))]).
rp(fun() ->
       {RTSs, _Failed} = riak_core_util:rpc_every_member_ann(riak_repl2_rt, status, [], 10000),
       Active = lists:flatten([[{Node, list_to_binary(proplists:get_value(source, Sink))}
                                || Sink <- proplists:get_value(sinks, RTS)]
                               || {Node, RTS} <- RTSs]),
       Sources = lists:usort([Source || {_N, Source} <- Active]),
       Acc0 = orddict:from_list([{{N, Source}, 0} || {N, _} <- RTSs, Source <- Sources]),
       lists:foldl(fun(X, Acc) -> orddict:update_counter(X, 1, Acc) end, Acc0, Active)
   end()).
The following snippet, c/o @lordnull, dumps the status of every enabled realtime source connection and its helper process:
rr(riak_repl2_rtsource_conn),
Enabled = riak_repl2_rtsource_conn_sup:enabled(),
lists:map(fun({Remote, Pid}) ->
    io:format("==== ~p @ ~p ====~n", [Remote, Pid]),
    RawStatus = sys:get_status(Pid, 240000),
    rp(RawStatus),
    {status, _Pid, _Module, ProcessData} = RawStatus,
    [_PDict, _, _, _, [_, _, OtherData]] = ProcessData,
    {data, [{"State", State}]} = OtherData,
    HelperPid = State#state.helper_pid,
    case HelperPid of
        undefined ->
            io:format("~p at ~p does not have a helper pid~n", [Remote, Pid]),
            {Remote, Pid, {error, nohelper}};
        _ ->
            try sys:get_status(HelperPid, 240000) of
                HelperStatus ->
                    rp(HelperStatus),
                    {Remote, Pid, HelperStatus}
            catch
                W:Y ->
                    io:format("could not get status for ~p due to ~p:~p~n", [HelperPid, W, Y]),
                    {Remote, Pid, {{W, Y}, HelperPid}}
            end
    end
end, Enabled).
Prior to the fix for #588, a custom bucket property set for a disabled replication system would not be removed by the fixup. This script clears the repl postcommit hooks from any such buckets.
riak_core_ring_manager:ring_trans(
    fun(Ring, _) ->
        Buckets = riak_core_ring:get_buckets(Ring),
        ClearReplPostCommitFun =
            fun(Bucket, AccRing) ->
                BProps = riak_core_bucket:get_bucket(Bucket, AccRing),
                PostCommit =
                    case proplists:get_value(postcommit, BProps, []) of
                        {struct, _}=X ->
                            [X];
                        X ->
                            X
                    end,
                CleanPostCommit = PostCommit --
                    [{struct, [{<<"mod">>, <<"riak_repl_leader">>},
                               {<<"fun">>, <<"postcommit">>}]},
                     {struct, [{<<"mod">>, <<"riak_repl2_rt">>},
                               {<<"fun">>, <<"postcommit">>}]}],
                NewBProps = lists:keystore(postcommit, 1, BProps,
                                           {postcommit, CleanPostCommit}),
                riak_core_ring:update_meta({bucket, Bucket}, NewBProps, AccRing)
            end,
        NewRing0 = lists:foldl(ClearReplPostCommitFun, Ring, Buckets),
        NewRing = riak_core_ring:update_member_meta(node(), NewRing0, node(),
                                                    unused, now()),
        {new_ring, NewRing}
    end, undefined).
At Open X, EE 1.4.2+patches was under-performing and not keeping up with their hourly ETL load (millions of IP addresses). The sinks were only lightly loaded, and the source showed little to no TCP buffering, indicating that neither network latency nor RT sink performance was the problem.
After a few failed attempts by the team to diagnose the problem, Jon Meredith led a live debugging session with the Open X admin and found the root cause in a few minutes, using the steps below:
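First, check the realtime queue process for garbage-collection pressure and dump its status: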
rp(process_info(whereis(riak_repl2_rtq), garbage_collection)).
rp(sys:get_status(riak_repl2_rtq)).
When these didn't reveal anything alarming...
io:format("~s\n", [element(2, process_info(whereis(riak_repl2_rtq), backtrace))]).
This showed us the following stack trace:
0x00007fc3c43f1240 Return addr 0x0000000003dc6658 (riak_repl2_rtq:'-clear_non_deliverables/3-lc$^0/1-0-'/3 + 520)
y(0) {c,"riak-tq.prod.ca",11350544,11350545,0,2773400,1,undefined,true}
0x00007fc3c43f1250 Return addr 0x0000000003dc63c8 (riak_repl2_rtq:'-clear_non_deliverables/3-fun-0-'/3 + 400)
y(0) {c,"riak-tq.prod.lc",11350544,11350545,0,2774077,1,undefined,true}
0x00007fc3c43f1260 Return addr 0x00007fc3d8d4ba18 (lists:foldl/3 + 120)
y(0) []
y(1) []
y(2) 11406509
0x00007fc3c43f1280 Return addr 0x00007fc3d8ffe548 (ets:do_foldl/4 + 264)
y(0) #Fun<riak_repl2_rtq.8.100495617>
y(1) []
0x00007fc3c43f1298 Return addr 0x00007fc3d8ffe338 (ets:foldl/3 + 264)
y(0) 11406509
y(1) 36700404
y(2) #Fun<riak_repl2_rtq.8.100495617>
Load the following beam in basho-patches:
https://dl.dropboxusercontent.com/u/2596739/timeit.beam
then run:
m(timeit).
timeit:timeit(riak_repl2_rtq, clear_non_deliverables, 3).
timer:sleep(5000).
dbg:stop_clear().
Since this function is called for each RT replication ack, it was limiting replication to at most 5 objects per second; with acks serialized through the queue process, that works out to roughly 200 ms spent in clear_non_deliverables per call.
It turned out that clear_non_deliverables was doing a fold over the entire RT queue ETS table on every call. The final resolution was to remove the function from the ack path: it was essentially doing garbage collection and was not necessary in the replication call path.
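As a minimal sketch of the anti-pattern and the fix (the function names and table layout here are invented for illustration, not the actual riak_repl2_rtq code):

%% Anti-pattern: every ack folds over the whole ETS table, so each ack
%% costs O(queue depth); a deep queue slows acks, which keeps the queue deep.
ack_with_inline_gc(Seq, Tab) ->
    ets:delete(Tab, Seq),
    Dead = ets:foldl(fun({S, _Obj, []}, Acc) -> [S | Acc];  % undeliverable entry
                        (_, Acc) -> Acc
                     end, [], Tab),
    [ets:delete(Tab, S) || S <- Dead],
    ok.

%% Fix: keep the ack path O(1) and run the cleanup out of band instead.
ack(Seq, Tab) ->
    ets:delete(Tab, Seq),
    ok.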