Skip to content

Commit

Permalink
Improve DC and InterDC Functionality (#408)
Browse files Browse the repository at this point in the history
* Test case for issue #400 
* Removed stable riak metadata and improved create dc.
* Changed level of pending changes log messages
* Added a better named function to add nodes to a current dc
* Changed description and test case for dc descriptors. Fixed #400
* Reverted handle call for periodic updates, not needed
* Bumped to version 0.2.2
* Reduced severity of log message for missing external_descriptors
* Update include/antidote.hrl
* Added leave_dc functionality

We maintain our own dc id based on the riak_core_ring:cluster_name/1 command.
Riak maintains the cluster name itself, and contrary to the comment in dc_utilities,
it is restored on fail and restart correctly, even if all nodes fail at the same time.
This caused errors in maintaining the ring members after simplifying the create_dc function
and errors in shard count meta data. Switching back to the riak internal meta data
(which is managed and updated by riak_core anyway) fixes these issues.
The create_dc simplification also improved joining an empty cluster significantly (10 seconds for 8 shards).
This is also work towards being able to add/remove nodes dynamically to scale after some time elapsed.

Co-Authored-By: Peter Zeller <p_zeller@cs.uni-kl.de>
  • Loading branch information
albsch and peterzeller committed Nov 28, 2019
1 parent a597f73 commit f6bf21b
Show file tree
Hide file tree
Showing 21 changed files with 218 additions and 353 deletions.
9 changes: 8 additions & 1 deletion config/sys.config.src
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@
{riak_core, [
%% riak directories
{ring_state_dir, "${ROOT_DIR_PREFIX}${DATA_DIR_PREFIX}data_riak_core"},
{platform_data_dir, "${ROOT_DIR_PREFIX}${DATA_DIR_PREFIX}data_riak_core"}
{platform_data_dir, "${ROOT_DIR_PREFIX}${DATA_DIR_PREFIX}data_riak_core"},

%% determines how many vnodes will be used
%% also determines the number of files the log is sliced into
%% has to be an exponent of 2
%% low number will decrease file accesses (good for testing) and boot time
%% high number enables scaling and generates smaller log files
{ring_creation_size, 64}
]},


Expand Down
4 changes: 3 additions & 1 deletion include/antidote.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@
-type op() :: {op_name(), op_param()}.
-type effect() :: term().

-type dcid() :: 'undefined' | {atom(),tuple()}. %% TODO, is this the only structure that is returned by riak_core_ring:cluster_name(Ring)?

%% DC Id is the riak_core ring cluster name
-type dcid() :: undefined | riak_core_ring:riak_core_ring().
-type snapshot_time() :: 'undefined' | vectorclock:vectorclock().
-type clock_time() :: non_neg_integer().
-type dc_and_commit_time() :: {dcid(), clock_time()}.
Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
]}
]}.

{relx, [{release, {antidote, "0.2.1"}, [antidote]},
{relx, [{release, {antidote, "0.2.2"}, [antidote]},
{dev_mode, false},
% do not expect Erlang runtime at deployment site
{include_erts, true},
Expand Down
2 changes: 1 addition & 1 deletion src/antidote.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- erlang -*-
{application, antidote, [
{description, "A transactional CRDT database"},
{vsn, "0.2.1"},
{vsn, "0.2.2"},
{applications, [
kernel,
stdlib,
Expand Down
289 changes: 110 additions & 179 deletions src/antidote_dc_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,22 @@


%% This module exports methods to build a data center of multiple nodes and
%% connect data centers to start replcation among them.
%% Usage Example: To create 3 DCs of 2 nodes each:
%% create_dc(['antidote@node1', 'antidote@node2']),
%% create_dc(['antidote@node3', 'antidote@node4']),
%% create_dc(['antidote@node5', 'antidote@node6']),
%% connect data centers to start replication among them.
%%
%% Usage Example:
%%
%% To create 3 DCs of 2 nodes each execute
%%
%% add_nodes_to_dc(['antidote@node1', 'antidote@node2'])
%% add_nodes_to_dc(['antidote@node3', 'antidote@node4'])
%% add_nodes_to_dc(['antidote@node5', 'antidote@node6'])
%%
%% on one node of the pair of nodes.
%% (Single) Nodes will join the data center of the node the 'add_nodes_to_dc' function is executed on.
%% The `add_nodes_to_dc` function is idempotent.
%%
%% To connect these data centers together execute
%%
%% {ok, Descriptor1} = get_connection_descriptor() % on antidote@node1
%% {ok, Descriptor2} = get_connection_descriptor() %% on antidote@node3
%% {ok, Descriptor3} = get_connection_descriptor() %% on antidote@node5
Expand All @@ -45,41 +56,28 @@
-include("inter_dc_repl.hrl").
-include_lib("kernel/include/logger.hrl").

-export([create_dc/1,
get_connection_descriptor/0,
subscribe_updates_from/1]
).
-export([
leave_dc/0,
create_dc/1,
add_nodes_to_dc/1,
get_connection_descriptor/0,
subscribe_updates_from/1
]).


%% Command this node to leave the current data center
-spec leave_dc() -> ok | {error, term()}.
leave_dc() -> riak_core:leave().

%% Build a ring of Nodes forming a data center
-spec create_dc([node()]) -> ok.
create_dc(Nodes) ->
?LOG_INFO("Creating DC ring with nodes ~p", [Nodes]),
%% Ensure each node owns 100% of it's own ring
_ = [[Node] = owners_according_to(Node) || Node <- Nodes],
%% Join nodes
[Node1 | OtherNodes] = Nodes,
case OtherNodes of
[] ->
%% no other nodes, nothing to join/plan/commit
ok;
_ ->
%% ok do a staged join and then commit it, this eliminates the
%% large amount of redundant handoff done in a sequential join
[staged_join(Node, Node1) || Node <- OtherNodes],
plan_and_commit(Node1),
try_nodes_ready(Nodes, 3, 500)
end,

ok = wait_until_nodes_ready(Nodes),

%% Ensure each node owns a portion of the ring
wait_until_nodes_agree_about_ownership(Nodes),
ok = wait_until_no_pending_changes(Nodes),
wait_until_ring_converged(Nodes),
ok = wait_until(hd(Nodes), fun wait_init:check_ready/1),
%% starts metadata services needed for intra-dc communication
ok = inter_dc_manager:start_bg_processes(stable_time_functions),
ok.
-spec add_nodes_to_dc([node()]) -> ok | {error, ring_not_ready}.
add_nodes_to_dc(Nodes) ->
%% check if ring is ready first
case riak_core_ring:ring_ready() of
true -> join_new_nodes(Nodes);
_ -> {error, ring_not_ready}
end.


%% Start receiving updates from other DCs
-spec subscribe_updates_from([descriptor()]) -> ok.
Expand All @@ -89,163 +87,96 @@ subscribe_updates_from(DCDescriptors) ->
ok = inter_dc_manager:dc_successfully_started(),
ok.


%% Get the DC connection descriptor to be given to other DCs
-spec get_connection_descriptor() -> {ok, descriptor()}.
get_connection_descriptor() ->
inter_dc_manager:get_descriptor().


%% ---------- Internal Functions --------------

%% @doc Return a list of nodes that own partitions according to the ring
%% retrieved from the specified node.
owners_according_to(Node) ->
case rpc:call(Node, riak_core_ring_manager, get_raw_ring, []) of
{ok, Ring} ->
?LOG_INFO("Ring ~p", [Ring]),
Owners = [Owner || {_Idx, Owner} <- riak_core_ring:all_owners(Ring)],
?LOG_INFO("Owners ~p", [lists:usort(Owners)]),
lists:usort(Owners);
{badrpc, Reason} ->
?LOG_INFO("Could not connect to Node ~p", [Node]),
{badrpc, Reason}
end.
-spec join_new_nodes([node()]) -> ok.
join_new_nodes(Nodes) ->
%% get the current ring
{ok, CurrentRing} = riak_core_ring_manager:get_my_ring(),

%% filter nodes that are not already in this nodes ring
CurrentNodeMembers = riak_core_ring:all_members(CurrentRing),
NewNodeMembers = [NewNode || NewNode <- Nodes, not lists:member(NewNode, CurrentNodeMembers)],
plan_and_commit(NewNodeMembers).


%% @doc Have `Node' send a join request to `PNode'
staged_join(Node, PNode) ->
?LOG_INFO("[join] ~p to (~p)", [Node, PNode]),
ok = rpc:call(Node, riak_core, staged_join, [PNode]),
-spec plan_and_commit([node()]) -> ok.
plan_and_commit([]) -> ?LOG_WARNING("No new nodes added to the ring of ~p", [node()]);
plan_and_commit(NewNodeMembers) ->
lists:foreach(fun(Node) ->
?LOG_INFO("Checking if Node ~p is reachable (from ~p)", [Node, node()]),
pong = net_adm:ping(Node)
end, NewNodeMembers),

lists:foreach(fun(Node) ->
?LOG_INFO("Node ~p is joining my ring (~p)", [Node, node()]),
ok = rpc:call(Node, riak_core, staged_join, [node()])
end, NewNodeMembers),

lists:foreach(fun(Node) ->
?LOG_INFO("Checking if node ring is ready (~p)", [Node]),
wait_until_ring_ready(Node)
end, NewNodeMembers),

{ok, Actions, Transitions} = riak_core_claimant:plan(),
?LOG_DEBUG("Actions planned: ~p", [Actions]),
?LOG_DEBUG("Ring transitions planned: ~p", [Transitions]),

%% only after commit returns ok the ring structure will change
%% even if nothing changes, it returns {error, nothing_planned} indicating some serious error
ok = riak_core_claimant:commit(),
?LOG_NOTICE("Ring committed and ring structure is changing. New ring members: ~p", [NewNodeMembers]),

%% wait until ring is ready
wait_until_ring_ready(node()),

%% wait until ring has no pending changes
%% this prevents writing to a ring which has not finished its balancing yet and therefore causes
%% handoffs to be triggered
%% FIXME this can be removed when #401 and #203 is fixed
wait_until_ring_no_pending_changes(),


%% start periodic heart beat
ok = inter_dc_manager:start_bg_processes(stable_time_functions),
ok.

plan_and_commit(Node) ->
?LOG_INFO("Planning and committing cluster join"),
case rpc:call(Node, riak_core_claimant, plan, []) of
{error, ring_not_ready} ->
?LOG_INFO("plan: ring not ready"),
maybe_wait_for_changes(Node),
plan_and_commit(Node);
{ok, _, _} ->
do_commit(Node)
end.
do_commit(Node) ->
?LOG_INFO("Committing"),
case rpc:call(Node, riak_core_claimant, commit, []) of
{error, plan_changed} ->
?LOG_INFO("commit: plan changed"),
maybe_wait_for_changes(Node),
plan_and_commit(Node);
{error, ring_not_ready} ->
?LOG_INFO("commit: ring not ready"),
maybe_wait_for_changes(Node),
do_commit(Node);
{error, nothing_planned} ->
%% Assume plan actually committed somehow
ok;
ok ->
ok
end.

try_nodes_ready([Node1 | _Nodes], 0, _SleepMs) ->
?LOG_INFO("Nodes not ready after initial plan/commit, retrying"),
plan_and_commit(Node1);
try_nodes_ready(Nodes, N, SleepMs) ->
ReadyNodes = [Node || Node <- Nodes, is_ready(Node) =:= true],
case ReadyNodes of
Nodes ->
ok;
_ ->
timer:sleep(SleepMs),
try_nodes_ready(Nodes, N-1, SleepMs)
end.

maybe_wait_for_changes(Node) ->
wait_until_no_pending_changes([Node]).

%% @doc Given a list of nodes, wait until all nodes believe there are no
%% @doc Wait until all nodes in this ring believe there are no
%% on-going or pending ownership transfers.
-spec wait_until_no_pending_changes([node()]) -> ok.
wait_until_no_pending_changes(Nodes) ->
?LOG_INFO("Wait until no pending changes on ~p", [Nodes]),
-spec wait_until_ring_no_pending_changes() -> ok.
wait_until_ring_no_pending_changes() ->
{ok, CurrentRing} = riak_core_ring_manager:get_my_ring(),
Nodes = riak_core_ring:all_members(CurrentRing),

?LOG_DEBUG("Wait until no pending changes on ~p", [Nodes]),
F = fun() ->
_ = rpc:multicall(Nodes, riak_core_vnode_manager, force_handoffs, []),
{Rings, BadNodes} = rpc:multicall(Nodes, riak_core_ring_manager, get_raw_ring, []),
Changes = [ riak_core_ring:pending_changes(Ring) =:= [] || {ok, Ring} <- Rings ],
BadNodes =:= [] andalso length(Changes) =:= length(Nodes) andalso lists:all(fun(T) -> T end, Changes)
_ = rpc:multicall(Nodes, riak_core_vnode_manager, force_handoffs, []),
{Rings, BadNodes} = rpc:multicall(Nodes, riak_core_ring_manager, get_raw_ring, []),
Changes = [ riak_core_ring:pending_changes(Ring) =:= [] || {ok, Ring} <- Rings ],
BadNodes =:= [] andalso length(Changes) =:= length(Nodes) andalso lists:all(fun(T) -> T end, Changes)
end,
wait_until(F).

%% @doc Utility function used to construct test predicates. Retries the
%% function `Fun' until it returns `true', or until the maximum
%% number of retries is reached.
wait_until(Fun) when is_function(Fun) ->
MaxTime = 600000, %% @TODO use config,
Delay = 1000, %% @TODO use config,
Retry = MaxTime div Delay,
wait_until(Fun, Retry, Delay).

%% @doc Given a list of nodes, wait until all nodes are considered ready.
%% See {@link wait_until_ready/1} for definition of ready.
wait_until_nodes_ready(Nodes) ->
?LOG_INFO("Wait until nodes are ready : ~p", [Nodes]),
[ok = wait_until(Node, fun is_ready/1) || Node <- Nodes],
ok.

%% @private
is_ready(Node) ->
case rpc:call(Node, riak_core_ring_manager, get_raw_ring, []) of
{ok, Ring} ->
case lists:member(Node, riak_core_ring:ready_members(Ring)) of
true -> true;
false -> {not_ready, Node}
end;
Other ->
Other
case F() of
true -> ok;
_ -> timer:sleep(500), wait_until_ring_no_pending_changes()
end.

wait_until_nodes_agree_about_ownership(Nodes) ->
?LOG_INFO("Wait until nodes agree about ownership: ~p", [Nodes]),
Results = [ wait_until_owners_according_to(Node, Nodes) || Node <- Nodes ],
lists:all(fun(X) -> ok =:= X end, Results).

%% @doc Convenience wrapper for wait_until for the myriad functions that
%% take a node as single argument.
wait_until(Node, Fun) when is_atom(Node), is_function(Fun) ->
wait_until(fun() -> Fun(Node) end).

wait_until(Fun, Retry, Delay) when Retry > 0 ->
wait_until_result(Fun, true, Retry, Delay).

wait_until_result(Fun, Result, Retry, Delay) when Retry > 0 ->
Res = Fun(),
case Res of
Result ->
ok;
_ when Retry == 1 ->
{fail, Res};
_ ->
timer:sleep(Delay),
wait_until_result(Fun, Result, Retry-1, Delay)
end.

wait_until_owners_according_to(Node, Nodes) ->
SortedNodes = lists:usort(Nodes),
F = fun(N) ->
owners_according_to(N) =:= SortedNodes
end,
ok = wait_until(Node, F),
ok.

%% @private
is_ring_ready(Node) ->
case rpc:call(Node, riak_core_ring_manager, get_raw_ring, []) of
{ok, Ring} ->
riak_core_ring:ring_ready(Ring);
_ ->
false
-spec wait_until_ring_ready(node()) -> ok.
wait_until_ring_ready(Node) ->
Status = rpc:call(Node, riak_core_ring, ring_ready, []),
case Status of
true -> ok;
false -> timer:sleep(100), wait_until_ring_ready(Node)
end.

%% @doc Given a list of nodes, wait until all nodes believe the ring has
%% converged (ie. `riak_core_ring:is_ready' returns `true').
wait_until_ring_converged(Nodes) ->
?LOG_INFO("Wait until ring converged on ~p", [Nodes]),
[ok = wait_until(Node, fun is_ring_ready/1)|| Node <- Nodes],
ok.
%% backwards compatible function for add_nodes_to_dc
-spec create_dc([node()]) -> ok | {error, ring_not_ready}.
create_dc(Nodes) -> add_nodes_to_dc(Nodes).
2 changes: 2 additions & 0 deletions src/antidote_ring_event_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
-include("antidote.hrl").
-include_lib("kernel/include/logger.hrl").

-export([update_status/0]).

%% gen_event callbacks
-export([init/1, handle_event/2, handle_call/2,
handle_info/2, terminate/2, code_change/3]).
Expand Down
Loading

0 comments on commit f6bf21b

Please sign in to comment.