From f6bf21bb51a673d787ceb1e5a795e30597ccb44a Mon Sep 17 00:00:00 2001 From: Albert Schimpf <38429047+albsch@users.noreply.github.com> Date: Thu, 28 Nov 2019 14:13:48 +0100 Subject: [PATCH] Improve DC and InterDC Functionality (#408) * Test case for issue #400 * Removed stable riak metadata and improved create dc. * Changed level of pending changes log messages * Added a better named function to add nodes to a current dc * Changed description and test case for dc descriptors. Fixed #400 * Reverted handle call for periodic updates, not needed * Bumped to version 0.2.2 * Reduced severity of log message for missing external_descriptors * Update include/antidote.hrl * Added leave_dc functionality The DC id is now based on the riak_core_ring:cluster_name/1 command. Riak maintains the cluster name itself and, contrary to the comment in dc_utilities, restores it correctly on fail and restart, even if all nodes fail at the same time. Maintaining our own copy of this metadata caused errors in the ring membership after simplifying the create_dc function, as well as errors in the shard count meta data. Switching back to the riak-internal meta data (which is managed and updated by riak_core anyway) fixes these issues. The create_dc simplification also improved joining an empty cluster significantly (10 seconds for 8 shards). This is also work towards being able to add and remove nodes dynamically to scale a DC after it has been running for some time. Co-Authored-By: Peter Zeller --- config/sys.config.src | 9 +- include/antidote.hrl | 4 +- rebar.config | 2 +- src/antidote.app.src | 2 +- src/antidote_dc_manager.erl | 289 +++++++++++----------------- src/antidote_ring_event_handler.erl | 2 + src/antidote_stats.erl | 5 +- src/bcounter_mgr.erl | 14 +- src/clocksi_interactive_coord.erl | 2 +- src/clocksi_vnode.erl | 4 +- src/cure.erl | 2 +- src/dc_meta_data_utilities.erl | 135 +------------ src/dc_utilities.erl | 15 +- src/inter_dc_dep_vnode.erl | 2 +- src/inter_dc_manager.erl | 12 +- src/inter_dc_query_response.erl | 2 +- src/inter_dc_txn.erl | 4 +- src/log_utilities.erl | 14 +- src/logging_vnode.erl | 6 +- test/multidc/antidote_SUITE.erl | 44 ++++- test/utils/test_utils.erl | 2 +- 21 files changed, 218 insertions(+), 353 deletions(-) diff --git a/config/sys.config.src b/config/sys.config.src index 69e6317c0..f7a3e7f0c 100644 --- a/config/sys.config.src +++ b/config/sys.config.src @@ -28,7 +28,14 @@ {riak_core, [ %% riak directories {ring_state_dir, "${ROOT_DIR_PREFIX}${DATA_DIR_PREFIX}data_riak_core"}, - {platform_data_dir, "${ROOT_DIR_PREFIX}${DATA_DIR_PREFIX}data_riak_core"} + {platform_data_dir, "${ROOT_DIR_PREFIX}${DATA_DIR_PREFIX}data_riak_core"}, + + %% determines how many vnodes will be used + %% also determines the number of files the log is sliced into + %% has to be a power of 2 + %% a low number decreases file accesses (good for testing) and boot time + %% a high number enables scaling and generates smaller log files + {ring_creation_size, 64} ]}, diff --git a/include/antidote.hrl b/include/antidote.hrl index 4ba5f1ac3..46d33a716 100644 --- a/include/antidote.hrl +++ b/include/antidote.hrl @@ -185,7 +185,9 @@ -type op() :: {op_name(), op_param()}. -type effect() :: term(). --type dcid() :: 'undefined' | {atom(),tuple()}. %% TODO, is this the only structure that is returned by riak_core_ring:cluster_name(Ring)? + +%% DC Id is the riak_core ring cluster name +-type dcid() :: undefined | riak_core_ring:riak_core_ring(). -type snapshot_time() :: 'undefined' | vectorclock:vectorclock(). -type clock_time() :: non_neg_integer().
-type dc_and_commit_time() :: {dcid(), clock_time()}. diff --git a/rebar.config b/rebar.config index 60f0adf29..94e757740 100644 --- a/rebar.config +++ b/rebar.config @@ -100,7 +100,7 @@ ]} ]}. -{relx, [{release, {antidote, "0.2.1"}, [antidote]}, +{relx, [{release, {antidote, "0.2.2"}, [antidote]}, {dev_mode, false}, % do not expect Erlang runtime at deployment site {include_erts, true}, diff --git a/src/antidote.app.src b/src/antidote.app.src index 170489063..085929ceb 100644 --- a/src/antidote.app.src +++ b/src/antidote.app.src @@ -1,7 +1,7 @@ %% -*- erlang -*- {application, antidote, [ {description, "A transactional CRDT database"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {applications, [ kernel, stdlib, diff --git a/src/antidote_dc_manager.erl b/src/antidote_dc_manager.erl index 6e003fb16..161a9bc0d 100644 --- a/src/antidote_dc_manager.erl +++ b/src/antidote_dc_manager.erl @@ -28,11 +28,22 @@ %% This module exports methods to build a data center of multiple nodes and -%% connect data centers to start replcation among them. -%% Usage Example: To create 3 DCs of 2 nodes each: -%% create_dc(['antidote@node1', 'antidote@node2']), -%% create_dc(['antidote@node3', 'antidote@node4']), -%% create_dc(['antidote@node5', 'antidote@node6']), +%% connect data centers to start replication among them. +%% +%% Usage Example: +%% +%% To create 3 DCs of 2 nodes each, execute +%% +%% add_nodes_to_dc(['antidote@node1', 'antidote@node2']) +%% add_nodes_to_dc(['antidote@node3', 'antidote@node4']) +%% add_nodes_to_dc(['antidote@node5', 'antidote@node6']) +%% +%% on one node of the respective pair. +%% The given nodes (even a single node) join the data center of the node on which `add_nodes_to_dc` is executed. +%% The `add_nodes_to_dc` function is idempotent. +%% +%% To connect these data centers together, execute +%% %% {ok, Descriptor1} = get_connection_descriptor() % on antidote@node1 %% {ok, Descriptor2} = get_connection_descriptor() %% on antidote@node3 %% {ok, Descriptor3} = get_connection_descriptor() %% on antidote@node5 @@ -45,41 +56,28 @@ -include("inter_dc_repl.hrl"). -include_lib("kernel/include/logger.hrl"). --export([create_dc/1, - get_connection_descriptor/0, - subscribe_updates_from/1] - ). +-export([ + leave_dc/0, + create_dc/1, + add_nodes_to_dc/1, + get_connection_descriptor/0, + subscribe_updates_from/1 +]). + + +%% Command this node to leave the current data center +-spec leave_dc() -> ok | {error, term()}. +leave_dc() -> riak_core:leave(). %% Build a ring of Nodes forming a data center --spec create_dc([node()]) -> ok. -create_dc(Nodes) -> - ?LOG_INFO("Creating DC ring with nodes ~p", [Nodes]), - %% Ensure each node owns 100% of it's own ring - _ = [[Node] = owners_according_to(Node) || Node <- Nodes], - %% Join nodes - [Node1 | OtherNodes] = Nodes, - case OtherNodes of - [] -> - %% no other nodes, nothing to join/plan/commit - ok; - _ -> - %% ok do a staged join and then commit it, this eliminates the - %% large amount of redundant handoff done in a sequential join - [staged_join(Node, Node1) || Node <- OtherNodes], - plan_and_commit(Node1), - try_nodes_ready(Nodes, 3, 500) - end, - - ok = wait_until_nodes_ready(Nodes), - - %% Ensure each node owns a portion of the ring - wait_until_nodes_agree_about_ownership(Nodes), - ok = wait_until_no_pending_changes(Nodes), - wait_until_ring_converged(Nodes), - ok = wait_until(hd(Nodes), fun wait_init:check_ready/1), - %% starts metadata services needed for intra-dc communication - ok = inter_dc_manager:start_bg_processes(stable_time_functions), - ok.
+-spec add_nodes_to_dc([node()]) -> ok | {error, ring_not_ready}. +add_nodes_to_dc(Nodes) -> + %% check if ring is ready first + case riak_core_ring:ring_ready() of + true -> join_new_nodes(Nodes); + _ -> {error, ring_not_ready} + end. + %% Start receiving updates from other DCs -spec subscribe_updates_from([descriptor()]) -> ok. @@ -89,163 +87,96 @@ subscribe_updates_from(DCDescriptors) -> ok = inter_dc_manager:dc_successfully_started(), ok. + %% Get the DC connection descriptor to be given to other DCs -spec get_connection_descriptor() -> {ok, descriptor()}. get_connection_descriptor() -> inter_dc_manager:get_descriptor(). + %% ---------- Internal Functions -------------- -%% @doc Return a list of nodes that own partitions according to the ring -%% retrieved from the specified node. -owners_according_to(Node) -> - case rpc:call(Node, riak_core_ring_manager, get_raw_ring, []) of - {ok, Ring} -> - ?LOG_INFO("Ring ~p", [Ring]), - Owners = [Owner || {_Idx, Owner} <- riak_core_ring:all_owners(Ring)], - ?LOG_INFO("Owners ~p", [lists:usort(Owners)]), - lists:usort(Owners); - {badrpc, Reason} -> - ?LOG_INFO("Could not connect to Node ~p", [Node]), - {badrpc, Reason} - end. +-spec join_new_nodes([node()]) -> ok. +join_new_nodes(Nodes) -> + %% get the current ring + {ok, CurrentRing} = riak_core_ring_manager:get_my_ring(), + + %% keep only the nodes that are not already members of this node's ring + CurrentNodeMembers = riak_core_ring:all_members(CurrentRing), + NewNodeMembers = [NewNode || NewNode <- Nodes, not lists:member(NewNode, CurrentNodeMembers)], + plan_and_commit(NewNodeMembers). + -%% @doc Have `Node' send a join request to `PNode' -staged_join(Node, PNode) -> - ?LOG_INFO("[join] ~p to (~p)", [Node, PNode]), - ok = rpc:call(Node, riak_core, staged_join, [PNode]), +-spec plan_and_commit([node()]) -> ok. +plan_and_commit([]) -> ?LOG_WARNING("No new nodes added to the ring of ~p", [node()]); +plan_and_commit(NewNodeMembers) -> + lists:foreach(fun(Node) -> + ?LOG_INFO("Checking if Node ~p is reachable (from ~p)", [Node, node()]), + pong = net_adm:ping(Node) + end, NewNodeMembers), + + lists:foreach(fun(Node) -> + ?LOG_INFO("Node ~p is joining my ring (~p)", [Node, node()]), + ok = rpc:call(Node, riak_core, staged_join, [node()]) + end, NewNodeMembers), + + lists:foreach(fun(Node) -> + ?LOG_INFO("Checking if node ring is ready (~p)", [Node]), + wait_until_ring_ready(Node) + end, NewNodeMembers), + + {ok, Actions, Transitions} = riak_core_claimant:plan(), + ?LOG_DEBUG("Actions planned: ~p", [Actions]), + ?LOG_DEBUG("Ring transitions planned: ~p", [Transitions]), + + %% the ring structure changes only after commit returns ok + %% if nothing was planned, commit returns {error, nothing_planned}, which indicates a serious error here + ok = riak_core_claimant:commit(), + ?LOG_NOTICE("Ring committed and ring structure is changing. New ring members: ~p", [NewNodeMembers]), + + %% wait until ring is ready + wait_until_ring_ready(node()), + + %% wait until ring has no pending changes + %% this prevents writing to a ring that has not finished balancing yet, which would + %% otherwise trigger handoffs + %% FIXME this can be removed when #401 and #203 are fixed + wait_until_ring_no_pending_changes(), + + + %% start periodic heartbeat + ok = inter_dc_manager:start_bg_processes(stable_time_functions), + ok.
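For illustration, a minimal sketch of driving the new API from an Erlang shell, mirroring the module documentation above and the calls made in the multidc test suite (node names are placeholders; each rpc:call targets a member of the respective data center):

    %% Sketch: form one DC from two nodes, then replicate from a second DC.
    %% add_nodes_to_dc/1 is idempotent, so re-running it on any member is safe.
    ok = rpc:call('antidote@node1', antidote_dc_manager, add_nodes_to_dc,
                  [['antidote@node1', 'antidote@node2']]).
    %% fetch the descriptor of the remote DC and start replicating from it
    {ok, Descriptor2} = rpc:call('antidote@node3', antidote_dc_manager,
                                 get_connection_descriptor, []).
    ok = rpc:call('antidote@node1', antidote_dc_manager, subscribe_updates_from,
                  [[Descriptor2]]).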
-plan_and_commit(Node) -> - ?LOG_INFO("Planning and committing cluster join"), - case rpc:call(Node, riak_core_claimant, plan, []) of - {error, ring_not_ready} -> - ?LOG_INFO("plan: ring not ready"), - maybe_wait_for_changes(Node), - plan_and_commit(Node); - {ok, _, _} -> - do_commit(Node) - end. -do_commit(Node) -> - ?LOG_INFO("Committing"), - case rpc:call(Node, riak_core_claimant, commit, []) of - {error, plan_changed} -> - ?LOG_INFO("commit: plan changed"), - maybe_wait_for_changes(Node), - plan_and_commit(Node); - {error, ring_not_ready} -> - ?LOG_INFO("commit: ring not ready"), - maybe_wait_for_changes(Node), - do_commit(Node); - {error, nothing_planned} -> - %% Assume plan actually committed somehow - ok; - ok -> - ok - end. -try_nodes_ready([Node1 | _Nodes], 0, _SleepMs) -> - ?LOG_INFO("Nodes not ready after initial plan/commit, retrying"), - plan_and_commit(Node1); - try_nodes_ready(Nodes, N, SleepMs) -> - ReadyNodes = [Node || Node <- Nodes, is_ready(Node) =:= true], - case ReadyNodes of - Nodes -> - ok; - _ -> - timer:sleep(SleepMs), - try_nodes_ready(Nodes, N-1, SleepMs) - end. - -maybe_wait_for_changes(Node) -> - wait_until_no_pending_changes([Node]). - -%% @doc Given a list of nodes, wait until all nodes believe there are no +%% @doc Wait until all nodes in this ring believe there are no %% on-going or pending ownership transfers. --spec wait_until_no_pending_changes([node()]) -> ok. -wait_until_no_pending_changes(Nodes) -> - ?LOG_INFO("Wait until no pending changes on ~p", [Nodes]), +-spec wait_until_ring_no_pending_changes() -> ok. +wait_until_ring_no_pending_changes() -> + {ok, CurrentRing} = riak_core_ring_manager:get_my_ring(), + Nodes = riak_core_ring:all_members(CurrentRing), + + ?LOG_DEBUG("Wait until no pending changes on ~p", [Nodes]), F = fun() -> - _ = rpc:multicall(Nodes, riak_core_vnode_manager, force_handoffs, []), - {Rings, BadNodes} = rpc:multicall(Nodes, riak_core_ring_manager, get_raw_ring, []), - Changes = [ riak_core_ring:pending_changes(Ring) =:= [] || {ok, Ring} <- Rings ], - BadNodes =:= [] andalso length(Changes) =:= length(Nodes) andalso lists:all(fun(T) -> T end, Changes) + _ = rpc:multicall(Nodes, riak_core_vnode_manager, force_handoffs, []), + {Rings, BadNodes} = rpc:multicall(Nodes, riak_core_ring_manager, get_raw_ring, []), + Changes = [ riak_core_ring:pending_changes(Ring) =:= [] || {ok, Ring} <- Rings ], + BadNodes =:= [] andalso length(Changes) =:= length(Nodes) andalso lists:all(fun(T) -> T end, Changes) end, - wait_until(F). - -%% @doc Utility function used to construct test predicates. Retries the -%% function `Fun' until it returns `true', or until the maximum -%% number of retries is reached. -wait_until(Fun) when is_function(Fun) -> - MaxTime = 600000, %% @TODO use config, - Delay = 1000, %% @TODO use config, - Retry = MaxTime div Delay, - wait_until(Fun, Retry, Delay). - -%% @doc Given a list of nodes, wait until all nodes are considered ready. -%% See {@link wait_until_ready/1} for definition of ready. -wait_until_nodes_ready(Nodes) -> - ?LOG_INFO("Wait until nodes are ready : ~p", [Nodes]), - [ok = wait_until(Node, fun is_ready/1) || Node <- Nodes], - ok. - -%% @private -is_ready(Node) -> - case rpc:call(Node, riak_core_ring_manager, get_raw_ring, []) of - {ok, Ring} -> - case lists:member(Node, riak_core_ring:ready_members(Ring)) of - true -> true; - false -> {not_ready, Node} - end; - Other -> - Other + case F() of + true -> ok; + _ -> timer:sleep(500), wait_until_ring_no_pending_changes() end. 
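The leave_dc/0 function introduced above is the counterpart for shrinking a data center; it simply wraps riak_core:leave/0. A hedged sketch of its use (the node name is a placeholder, and the pattern match assumes riak_core accepts the leave request):

    %% Sketch: ask 'antidote@node2' to leave its data center.
    %% riak_core marks the node as leaving and hands off its partitions
    %% asynchronously, so the node should stay up until handoff completes.
    ok = rpc:call('antidote@node2', antidote_dc_manager, leave_dc, []).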
-wait_until_nodes_agree_about_ownership(Nodes) -> - ?LOG_INFO("Wait until nodes agree about ownership: ~p", [Nodes]), - Results = [ wait_until_owners_according_to(Node, Nodes) || Node <- Nodes ], - lists:all(fun(X) -> ok =:= X end, Results). - -%% @doc Convenience wrapper for wait_until for the myriad functions that -%% take a node as single argument. -wait_until(Node, Fun) when is_atom(Node), is_function(Fun) -> - wait_until(fun() -> Fun(Node) end). - -wait_until(Fun, Retry, Delay) when Retry > 0 -> - wait_until_result(Fun, true, Retry, Delay). - -wait_until_result(Fun, Result, Retry, Delay) when Retry > 0 -> - Res = Fun(), - case Res of - Result -> - ok; - _ when Retry == 1 -> - {fail, Res}; - _ -> - timer:sleep(Delay), - wait_until_result(Fun, Result, Retry-1, Delay) - end. -wait_until_owners_according_to(Node, Nodes) -> - SortedNodes = lists:usort(Nodes), - F = fun(N) -> - owners_according_to(N) =:= SortedNodes - end, - ok = wait_until(Node, F), - ok. - -%% @private -is_ring_ready(Node) -> - case rpc:call(Node, riak_core_ring_manager, get_raw_ring, []) of - {ok, Ring} -> - riak_core_ring:ring_ready(Ring); - _ -> - false +-spec wait_until_ring_ready(node()) -> ok. +wait_until_ring_ready(Node) -> + Status = rpc:call(Node, riak_core_ring, ring_ready, []), + case Status of + true -> ok; + false -> timer:sleep(100), wait_until_ring_ready(Node) end. -%% @doc Given a list of nodes, wait until all nodes believe the ring has -%% converged (ie. `riak_core_ring:is_ready' returns `true'). -wait_until_ring_converged(Nodes) -> - ?LOG_INFO("Wait until ring converged on ~p", [Nodes]), - [ok = wait_until(Node, fun is_ring_ready/1)|| Node <- Nodes], - ok. +%% backwards compatible function for add_nodes_to_dc +-spec create_dc([node()]) -> ok | {error, ring_not_ready}. +create_dc(Nodes) -> add_nodes_to_dc(Nodes). diff --git a/src/antidote_ring_event_handler.erl b/src/antidote_ring_event_handler.erl index 88cb03e14..5319ebaee 100644 --- a/src/antidote_ring_event_handler.erl +++ b/src/antidote_ring_event_handler.erl @@ -32,6 +32,8 @@ -include("antidote.hrl"). -include_lib("kernel/include/logger.hrl"). +-export([update_status/0]). + %% gen_event callbacks -export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]). diff --git a/src/antidote_stats.erl b/src/antidote_stats.erl index a97838f8d..d9521726d 100644 --- a/src/antidote_stats.erl +++ b/src/antidote_stats.erl @@ -89,6 +89,9 @@ handle_info(periodic_update, State = #state{timer = CheapTimer}) -> update_dc_count(), + %% update ring state + antidote_ring_event_handler:update_status(), + %% schedule tick if continue Timer = erlang:send_after(?INTERVAL, self(), periodic_update), {noreply, State#state{timer = Timer}}; @@ -144,7 +147,7 @@ to_microsec({MegaSecs, Secs, MicroSecs}) -> update_dc_count() -> - DCs = dc_meta_data_utilities:get_dc_ids(true), + DCs = dc_meta_data_utilities:get_dc_descriptors(), ?STATS({dc_count, length(DCs)}). diff --git a/src/bcounter_mgr.erl b/src/bcounter_mgr.erl index 02e12c177..a61656993 100644 --- a/src/bcounter_mgr.erl +++ b/src/bcounter_mgr.erl @@ -78,13 +78,13 @@ init([]) -> %% below 0), operation fails, otherwhise a downstream for the decrement %% is generated. generate_downstream(Key, {decrement, {V, _}}, BCounter) -> - MyDCId = dc_meta_data_utilities:get_my_dc_id(), + MyDCId = dc_utilities:get_my_dc_id(), gen_server:call(?MODULE, {consume, Key, {decrement, {V, MyDCId}}, BCounter}); %% @doc Processes an increment operation for the bounded counter. %% Operation is always safe. 
generate_downstream(_Key, {increment, {Amount, _}}, BCounter) -> - MyDCId = dc_meta_data_utilities:get_my_dc_id(), + MyDCId = dc_utilities:get_my_dc_id(), ?DATA_TYPE:downstream({increment, {Amount, MyDCId}}, BCounter); %% @doc Processes a trasfer operation between two owners of the @@ -102,7 +102,7 @@ process_transfer({transfer, TransferOp}) -> handle_cast({transfer, {Key, Amount, Requester}}, #state{last_transfers=LT}=State) -> NewLT = cancel_consecutive_req(LT, ?GRACE_PERIOD), - MyDCId = dc_meta_data_utilities:get_my_dc_id(), + MyDCId = dc_utilities:get_my_dc_id(), case can_process(Key, Requester, NewLT) of true -> {SKey, Bucket} = Key, @@ -115,7 +115,7 @@ handle_cast({transfer, {Key, Amount, Requester}}, #state{last_transfers=LT}=Stat end. handle_call({consume, Key, {Op, {Amount, _}}, BCounter}, _From, #state{req_queue=RQ}=State) -> - MyDCId = dc_meta_data_utilities:get_my_dc_id(), + MyDCId = dc_utilities:get_my_dc_id(), case ?DATA_TYPE:generate_downstream_check({Op, Amount}, MyDCId, BCounter, Amount) of {error, no_permissions} = FailedResult -> Available = ?DATA_TYPE:localPermissions(MyDCId, BCounter), @@ -166,7 +166,7 @@ queue_request(Key, Amount, RequestsQueue) -> request_remote(0, _Key) -> 0; request_remote(RequiredSum, Key) -> - MyDCId = dc_meta_data_utilities:get_my_dc_id(), + MyDCId = dc_utilities:get_my_dc_id(), {SKey, Bucket} = Key, BObj = {SKey, ?DATA_TYPE, Bucket}, {ok, [Obj], _} = antidote:read_objects(ignore, [], [BObj]), @@ -194,7 +194,7 @@ do_request(MyDCId, RemoteId, Key, Amount) -> %% Orders the reservation of each DC, from high to low. pref_list(Obj) -> - MyDCId = dc_meta_data_utilities:get_my_dc_id(), + MyDCId = dc_utilities:get_my_dc_id(), OtherDCDescriptors = dc_meta_data_utilities:get_dc_descriptors(), OtherDCIds = lists:foldl(fun(#descriptor{dcid=Id}, IdsList) -> case Id == MyDCId of @@ -227,7 +227,7 @@ clear_pending_req(LastRequests, Period) -> end , LastRequests). can_process(Key, Requester, LastTransfers) -> - MyDCId = dc_meta_data_utilities:get_my_dc_id(), + MyDCId = dc_utilities:get_my_dc_id(), case Requester == MyDCId of false -> case orddict:find({Key, Requester}, LastTransfers) of diff --git a/src/clocksi_interactive_coord.erl b/src/clocksi_interactive_coord.erl index 51d847548..7b28b7230 100644 --- a/src/clocksi_interactive_coord.erl +++ b/src/clocksi_interactive_coord.erl @@ -50,7 +50,7 @@ -define(LOGGING_VNODE, mock_partition). -else. --define(DC_META_UTIL, dc_meta_data_utilities). +-define(DC_META_UTIL, dc_utilities). -define(DC_UTIL, dc_utilities). -define(VECTORCLOCK, vectorclock). -define(LOG_UTIL, log_utilities). diff --git a/src/clocksi_vnode.erl b/src/clocksi_vnode.erl index 219564a30..d4cd48eca 100644 --- a/src/clocksi_vnode.erl +++ b/src/clocksi_vnode.erl @@ -499,7 +499,7 @@ reset_prepared(PreparedTx, [{Key, _Type, _Update} | Rest], TxId, Time, ActiveTxs commit(Transaction, TxCommitTime, Updates, CommittedTx, State) -> TxId = Transaction#transaction.txn_id, - DcId = dc_meta_data_utilities:get_my_dc_id(), + DcId = dc_utilities:get_my_dc_id(), LogRecord = #log_operation{tx_id = TxId, op_type = commit, log_payload = #commit_log_payload{commit_time = {DcId, TxCommitTime}, @@ -635,7 +635,7 @@ check_prepared(_TxId, PreparedTx, Key) -> -spec update_materializer([{key(), type(), effect()}], tx(), non_neg_integer()) -> ok | error. 
update_materializer(DownstreamOps, Transaction, TxCommitTime) -> - DcId = dc_meta_data_utilities:get_my_dc_id(), + DcId = dc_utilities:get_my_dc_id(), ReversedDownstreamOps = lists:reverse(DownstreamOps), UpdateFunction = fun({Key, Type, Op}, AccIn) -> CommittedDownstreamOp = diff --git a/src/cure.erl b/src/cure.erl index 647d839ad..211638a75 100644 --- a/src/cure.erl +++ b/src/cure.erl @@ -210,7 +210,7 @@ gr_snapshot_obtain(ClientClock, Objects, StateOrValue) -> %% GST = scalar stable time %% VST = vector stable time with entries for each dc {ok, GST, VST} = dc_utilities:get_scalar_stable_time(), - DcId = dc_meta_data_utilities:get_my_dc_id(), + DcId = dc_utilities:get_my_dc_id(), Dt = vectorclock:get(DcId, ClientClock), case Dt =< GST of true -> diff --git a/src/dc_meta_data_utilities.erl b/src/dc_meta_data_utilities.erl index b11811eba..5c34a1cb5 100644 --- a/src/dc_meta_data_utilities.erl +++ b/src/dc_meta_data_utilities.erl @@ -40,19 +40,11 @@ store_env_meta_data/2, store_meta_data_name/1, get_meta_data_name/0, - get_dc_partitions_detailed/1, - get_dc_partitions_dict/1, - get_my_dc_id/0, - reset_my_dc_id/0, - set_dc_partitions/2, - get_dc_ids/1, get_key/1, key_as_integer/1, store_dc_descriptors/1, - get_dc_descriptors/0, - load_partition_meta_data/0, - get_num_partitions/0, - get_partition_at_index/1]). + get_dc_descriptors/0 + ]). %% Should be called once a DC has successfully started @@ -108,86 +100,8 @@ store_env_meta_data(Name, Value) -> get_meta_data_name() -> stable_meta_data_server:read_meta_data(meta_data_name). -%% Returns a tuple of three elements -%% The first is a dict with all partitions for DCID, with key and value being the partition id -%% The second is a tuple with all partitions for DCID -%% The third is an integer telling the number of partitions --spec get_dc_partitions_detailed(dcid()) -> {dict:dict(), tuple(), non_neg_integer()}. -get_dc_partitions_detailed(DCID) -> - case stable_meta_data_server:read_meta_data({partition_meta_data, DCID}) of - {ok, Info} -> - Info; - error -> - ?LOG_ERROR("Error no partitions for dc ~w", [DCID]), - {dict:new(), {}, 0} - end. - -%% Returns a dict with all partitions for DCID, with key and value being the partition id --spec get_dc_partitions_dict(dcid()) -> dict:dict(). -get_dc_partitions_dict(DCID) -> - case stable_meta_data_server:read_meta_data({partition_dict, DCID}) of - {ok, Dict} -> - Dict; - error -> - ?LOG_ERROR("Error no partitions for dc ~w", [DCID]), - dict:new() - end. - -%% Returns the id of the local dc --spec get_my_dc_id() -> dcid(). -get_my_dc_id() -> - case stable_meta_data_server:read_meta_data(my_dc) of - {ok, DcId} -> - DcId; - error -> - %% Add my DC to the list of DCs since none have been added yet - reset_my_dc_id() - end. - -% Sets the id of the local dc --spec reset_my_dc_id() -> dcid(). -reset_my_dc_id() -> - MyDC = dc_utilities:get_my_dc_id(), - ok = stable_meta_data_server:broadcast_meta_data(my_dc, MyDC), - ok = stable_meta_data_server:broadcast_meta_data_merge(dc_list_w_me, MyDC, fun ordsets:add_element/2, fun ordsets:new/0), - MyDC. -%% Loads all the partitions ids into an ets table stored by -%% their index --spec load_partition_meta_data() -> ok. 
-load_partition_meta_data() -> - {ok, CHBin} = riak_core_ring_manager:get_chash_bin(), - PartitionList = chashbin:to_list(CHBin), - Length = length(PartitionList), - ok = stable_meta_data_server:broadcast_meta_data({part, length}, Length), - {_Len, IdPartitionList} = lists:foldl(fun(Partition, {PrevId, Acc}) -> - {PrevId + 1, Acc ++ [{{part, PrevId}, Partition}]} - end, {1, []}, PartitionList), - ok = stable_meta_data_server:broadcast_meta_data_list(IdPartitionList). - -%% Gets the number of partitions at this DC --spec get_num_partitions() -> non_neg_integer(). -get_num_partitions() -> - case stable_meta_data_server:read_meta_data({part, length}) of - {ok, Num} -> - Num; - error -> - ok = load_partition_meta_data(), - get_num_partitions() - end. - -%% Get information about a partition based on it index --spec get_partition_at_index(non_neg_integer()) -> term(). -get_partition_at_index(Index) -> - case stable_meta_data_server:read_meta_data({part, Index}) of - {ok, Partition} -> - Partition; - error -> - ok = load_partition_meta_data(), - get_partition_at_index(Index) - end. - -%% Store an external dc descriptor +%% Store an dc descriptor -spec store_dc_descriptors([descriptor()]) -> ok. store_dc_descriptors(Descriptors) -> MergeFunc = fun(DescList, PrevDict) -> @@ -197,7 +111,7 @@ store_dc_descriptors(Descriptors) -> end, stable_meta_data_server:broadcast_meta_data_merge(external_descriptors, Descriptors, MergeFunc, fun dict:new/0). -%% Gets the list of external dc descriptors +%% Gets the list of all known dc descriptors -spec get_dc_descriptors() -> [descriptor()]. get_dc_descriptors() -> case stable_meta_data_server:read_meta_data(external_descriptors) of @@ -206,46 +120,11 @@ get_dc_descriptors() -> [Desc | Acc] end, [], Dict); error -> - [] + ?LOG_DEBUG("Could not read shared meta data for external_descriptors"), + %% return self descriptor only + [inter_dc_manager:get_descriptor()] end. -%% Add information about a DC to the meta_data --spec set_dc_partitions([partition_id()], dcid()) -> ok. -set_dc_partitions(PartitionList, DCID) -> - NumPartitions = length(PartitionList), - PartitionTuple = list_to_tuple(PartitionList), - PartitionDict = - lists:foldl(fun(Part, Acc) -> - dict:store(Part, Part, Acc) - end, dict:new(), PartitionList), - ok = stable_meta_data_server:broadcast_meta_data({partition_meta_data, DCID}, {PartitionDict, PartitionTuple, NumPartitions}), - ok = stable_meta_data_server:broadcast_meta_data({partition_dict, DCID}, PartitionDict), - %% Add the new one to the list that doesnt include you - ok = stable_meta_data_server:broadcast_meta_data_merge(dc_list, DCID, fun ordsets:add_element/2, fun ordsets:new/0), - %% Be sure your dc is in the list before adding the new one to the list that includes you - _MyDCID = get_my_dc_id(), - %% Add the new one to the list that includes you - ok = stable_meta_data_server:broadcast_meta_data_merge(dc_list_w_me, DCID, fun ordsets:add_element/2, fun ordsets:new/0). - -%% Get an ordered list of all the dc ids --spec get_dc_ids(boolean()) -> [dcid()]. -get_dc_ids(IncludeSelf) -> - case IncludeSelf of - true -> - case stable_meta_data_server:read_meta_data(dc_list_w_me) of - {ok, List} -> - List; - error -> - [get_my_dc_id()] - end; - false -> - case stable_meta_data_server:read_meta_data(dc_list) of - {ok, List} -> - List; - error -> - [] - end - end. -spec get_key(term()) -> term(). 
get_key(Key) when is_binary(Key) -> diff --git a/src/dc_utilities.erl b/src/dc_utilities.erl index d5aae468a..469489c56 100644 --- a/src/dc_utilities.erl +++ b/src/dc_utilities.erl @@ -58,13 +58,8 @@ now_microsec/0, now_millisec/0]). + %% Returns the ID of the current DC. -%% This should not be called manually (it is only used the very -%% first time the DC is started), instead if you need to know -%% the id of the DC use the following: -%% dc_meta_data_utilites:get_my_dc_id -%% The reason is that the dcid can change on fail and restart, but -%% the original name is stored on disk in the meta_data_utilities -spec get_my_dc_id() -> dcid(). get_my_dc_id() -> {ok, Ring} = riak_core_ring_manager:get_my_ring(), @@ -269,8 +264,8 @@ get_stable_snapshot() -> 0 -> {ok, StableSnapshot}; _ -> - DCs = dc_meta_data_utilities:get_dc_ids(true), - GST = vectorclock:min_clock(StableSnapshot, DCs), + MembersInDc = dc_utilities:get_my_dc_nodes(), + GST = vectorclock:min_clock(StableSnapshot, MembersInDc), {ok, vectorclock:set_all(GST, StableSnapshot)} end end @@ -301,8 +296,8 @@ get_scalar_stable_time() -> Now = dc_utilities:now_microsec() - ?OLD_SS_MICROSEC, {ok, Now, StableSnapshot}; _ -> - DCs = dc_meta_data_utilities:get_dc_ids(true), - GST = vectorclock:min_clock(StableSnapshot, DCs), + MembersInDc = dc_utilities:get_my_dc_nodes(), + GST = vectorclock:min_clock(StableSnapshot, MembersInDc), {ok, GST, vectorclock:set_all(GST, StableSnapshot)} end. diff --git a/src/inter_dc_dep_vnode.erl b/src/inter_dc_dep_vnode.erl index deefc42c1..54ea2281e 100644 --- a/src/inter_dc_dep_vnode.erl +++ b/src/inter_dc_dep_vnode.erl @@ -239,7 +239,7 @@ update_clock(State = #state{last_updated = LastUpdated}, DCID, Timestamp) -> -spec get_partition_clock(state()) -> vectorclock(). get_partition_clock(State) -> %% Return the vectorclock associated with the current state, but update the local entry with the current timestamp - vectorclock:set(dc_meta_data_utilities:get_my_dc_id(), dc_utilities:now_microsec(), State#state.vectorclock). + vectorclock:set(dc_utilities:get_my_dc_id(), dc_utilities:now_microsec(), State#state.vectorclock). %% Utility function: converts the transaction to a list of clocksi_payload ops. -spec updates_to_clocksi_payloads(interdc_txn()) -> list(clocksi_payload()). 
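The changes above and below route every DC id lookup through dc_utilities:get_my_dc_id/0. Its body is not part of this diff; judging from the context lines in dc_utilities.erl, the new dcid() comment in antidote.hrl, and the commit message, it presumably reads roughly as follows:

    %% src/dc_utilities.erl (pre-existing, shown for context): the DC id is the
    %% riak_core ring cluster name, which riak_core itself persists and restores.
    -spec get_my_dc_id() -> dcid().
    get_my_dc_id() ->
        {ok, Ring} = riak_core_ring_manager:get_my_ring(),
        riak_core_ring:cluster_name(Ring).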
diff --git a/src/inter_dc_manager.erl b/src/inter_dc_manager.erl index 486288719..cc52b97e0 100644 --- a/src/inter_dc_manager.erl +++ b/src/inter_dc_manager.erl @@ -56,7 +56,7 @@ get_descriptor() -> Publishers = lists:map(fun(Node) -> rpc:call(Node, inter_dc_pub, get_address_list, []) end, Nodes), LogReaders = lists:map(fun(Node) -> rpc:call(Node, inter_dc_query_receive_socket, get_address_list, []) end, Nodes), {ok, #descriptor{ - dcid = dc_meta_data_utilities:get_my_dc_id(), + dcid = dc_utilities:get_my_dc_id(), partition_num = dc_utilities:get_partitions_num(), publishers = Publishers, logreaders = LogReaders @@ -127,9 +127,8 @@ start_bg_processes(MetaDataName) -> ok = rpc:call(Node, dc_utilities, check_registered_global, [stable_meta_data_server:generate_server_name(Node)]), ok = rpc:call(Node, meta_data_sender, start, [MetaDataName]) end, Nodes), + %% Load the internal meta-data - _MyDCId = dc_meta_data_utilities:reset_my_dc_id(), - ok = dc_meta_data_utilities:load_partition_meta_data(), ok = dc_meta_data_utilities:store_meta_data_name(MetaDataName), %% Start the timers sending the heartbeats ?LOG_INFO("Starting heartbeat sender timers"), @@ -219,6 +218,7 @@ observe_dcs_sync(Descriptors, Nodes) -> DCs = lists:map(fun(DC) -> {observe_dc(DC, Nodes), DC} end, Descriptors), + lists:foreach(fun({Res, Desc = #descriptor{dcid = DCID}}) -> case Res of ok -> @@ -233,10 +233,10 @@ observe_dcs_sync(Descriptors, Nodes) -> -spec forget_dc(descriptor(), [node()]) -> ok. forget_dc(#descriptor{dcid = DCID}, Nodes) -> - case DCID == dc_meta_data_utilities:get_my_dc_id() of + case DCID == dc_utilities:get_my_dc_id() of true -> ok; false -> - ?LOG_INFO("Forgetting DC ~p", [DCID]), + ?LOG_NOTICE("Forgetting DC ~p", [DCID]), lists:foreach(fun(Node) -> ok = rpc:call(Node, inter_dc_query, del_dc, [DCID]) end, Nodes), lists:foreach(fun(Node) -> ok = rpc:call(Node, inter_dc_sub, del_dc, [DCID]) end, Nodes) end. @@ -265,7 +265,7 @@ drop_ping(DropPing) -> %% Utils wait_for_stable_snapshot(DCID, MinValue) -> - case DCID == dc_meta_data_utilities:get_my_dc_id() of + case DCID == dc_utilities:get_my_dc_id() of true -> ok; false -> {ok, SS} = dc_utilities:get_stable_snapshot(), diff --git a/src/inter_dc_query_response.erl b/src/inter_dc_query_response.erl index 97cacdd60..bcd410cd8 100644 --- a/src/inter_dc_query_response.erl +++ b/src/inter_dc_query_response.erl @@ -72,7 +72,7 @@ init([Num]) -> handle_cast({get_entries, BinaryQuery, QueryState}, State) -> {read_log, Partition, From, To} = binary_to_term(BinaryQuery), Entries = get_entries_internal(Partition, From, To), - BinaryResp = term_to_binary({{dc_meta_data_utilities:get_my_dc_id(), Partition}, Entries}), + BinaryResp = term_to_binary({{dc_utilities:get_my_dc_id(), Partition}, Entries}), BinaryPartition = inter_dc_txn:partition_to_bin(Partition), FullResponse = <>, ok = inter_dc_query_receive_socket:send_response(FullResponse, QueryState), diff --git a/src/inter_dc_txn.erl b/src/inter_dc_txn.erl index 1bc4ab7c8..cf5a0745e 100644 --- a/src/inter_dc_txn.erl +++ b/src/inter_dc_txn.erl @@ -62,7 +62,7 @@ from_ops(Ops, Partition, PrevLogOpId) -> -spec ping(partition_id(), op_number() | none, non_neg_integer()) -> interdc_txn(). ping(Partition, PrevLogOpId, Timestamp) -> #interdc_txn{ - dcid = dc_meta_data_utilities:get_my_dc_id(), + dcid = dc_utilities:get_my_dc_id(), partition = Partition, prev_log_opid = PrevLogOpId, log_records = [], @@ -82,7 +82,7 @@ last_log_opid(Txn = #interdc_txn{log_records = Ops, prev_log_opid = LogOpId}) -> end. 
-spec is_local(interdc_txn()) -> boolean(). -is_local(#interdc_txn{dcid = DCID}) -> DCID == dc_meta_data_utilities:get_my_dc_id(). +is_local(#interdc_txn{dcid = DCID}) -> DCID == dc_utilities:get_my_dc_id(). -spec is_ping(interdc_txn()) -> boolean(). is_ping(#interdc_txn{log_records = Ops}) -> Ops == []. diff --git a/src/log_utilities.erl b/src/log_utilities.erl index 48a57874a..f7d0464d1 100644 --- a/src/log_utilities.erl +++ b/src/log_utilities.erl @@ -30,6 +30,8 @@ -include("antidote.hrl"). +-include_lib("kernel/include/logger.hrl"). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -58,7 +60,8 @@ get_logid_from_key(Key) -> %% key's logfile will be located. -spec get_key_partition(key()) -> index_node(). get_key_partition(Key) -> - hd(get_preflist_from_key(Key)). + IndexNode = hd(get_preflist_from_key(Key)), + IndexNode. %% @doc get_preflist_from_key returns a preference list where a given %% key's logfile will be located. @@ -74,9 +77,11 @@ get_preflist_from_key(Key) -> %% -spec get_primaries_preflist(non_neg_integer()) -> preflist(). get_primaries_preflist(Key)-> - NumPartitions = dc_meta_data_utilities:get_num_partitions(), + {ok, Ring} = riak_core_ring_manager:get_my_ring(), + {NumPartitions, ListOfPartitions} = riak_core_ring:chash(Ring), Pos = Key rem NumPartitions + 1, - [dc_meta_data_utilities:get_partition_at_index(Pos)]. + {Index, Node} = lists:nth(Pos, ListOfPartitions), + [{Index, Node}]. -spec get_my_node(partition_id()) -> node(). get_my_node(Partition) -> @@ -131,9 +136,8 @@ check_log_record_version(LogRecord) -> ?LOG_RECORD_VERSION = LogRecord#log_record.version, LogRecord. --ifdef(TEST). - +-ifdef(TEST). %% Testing remove_node_from_preflist remove_node_from_preflist_test()-> diff --git a/src/logging_vnode.erl b/src/logging_vnode.erl index 37d64cb7f..ad4fa6980 100644 --- a/src/logging_vnode.erl +++ b/src/logging_vnode.erl @@ -305,7 +305,7 @@ handle_command({get_op_id, DCID, Bucket, Partition}, _Sender, State=#state{op_id handle_command({start_timer, undefined}, Sender, State) -> handle_command({start_timer, Sender}, Sender, State); handle_command({start_timer, Sender}, _, State = #state{partition=Partition, op_id_table=OpIdTable, recovered_vector=MaxVector}) -> - MyDCID = dc_meta_data_utilities:get_my_dc_id(), + MyDCID = dc_utilities:get_my_dc_id(), OpId = get_op_id(OpIdTable, {[Partition], MyDCID}), IsReady = try ok = inter_dc_dep_vnode:set_dependency_clock(Partition, MaxVector), @@ -398,7 +398,7 @@ handle_command({append, LogId, LogOperation, Sync}, _Sender, ?STATS(operation_update_internal), case get_log_from_map(Map, Partition, LogId) of {ok, Log} -> - MyDCID = dc_meta_data_utilities:get_my_dc_id(), + MyDCID = dc_utilities:get_my_dc_id(), %% all operations update the per log, operation id OpId = get_op_id(OpIdTable, {LogId, MyDCID}), #op_number{local = Local, global = Global} = OpId, @@ -456,7 +456,7 @@ handle_command({append_group, LogId, LogRecordList, _IsLocal = false, Sync}, _Se op_id_table=OpIdTable, partition=Partition, enable_log_to_disk=EnableLog}=State) -> - MyDCID = dc_meta_data_utilities:get_my_dc_id(), + MyDCID = dc_utilities:get_my_dc_id(), {ErrorList, SuccList, UpdatedLogs} = lists:foldl(fun(LogRecordOrg, {AccErr, AccSucc, UpdatedLogs}) -> LogRecord = log_utilities:check_log_record_version(LogRecordOrg), diff --git a/test/multidc/antidote_SUITE.erl b/test/multidc/antidote_SUITE.erl index 8044a997d..8752d2b93 100644 --- a/test/multidc/antidote_SUITE.erl +++ b/test/multidc/antidote_SUITE.erl @@ -43,8 +43,11 @@ %% tests -export([ + 
recreate_dc/1, dummy_test/1, random_test/1, + shard_count/1, + dc_count/1, meta_data_env_test/1 ]). @@ -68,15 +71,54 @@ end_per_testcase(Name, _) -> all() -> [ + recreate_dc, + shard_count, + dc_count, dummy_test, random_test, meta_data_env_test ]. +%% Tests that add_nodes_to_dc is idempotent +%% calling it again on each node of a dc should have no effect +recreate_dc(Config) -> + [Node1, Node2 | _Nodes] = proplists:get_value(nodes, Config), + + ok = rpc:call(Node1, antidote_dc_manager, add_nodes_to_dc, [[Node1, Node2]]), + ok = rpc:call(Node1, antidote_dc_manager, add_nodes_to_dc, [[Node1, Node2]]), + ok = rpc:call(Node2, antidote_dc_manager, add_nodes_to_dc, [[Node1, Node2]]), + + ok. + +dc_count(Config) -> + [[Node1, Node2], [Node3], [Node4]] = proplists:get_value(clusters, Config), + + %% Check external DC count + DCs1 = rpc:call(Node1, dc_meta_data_utilities, get_dc_descriptors, []), + DCs2 = rpc:call(Node2, dc_meta_data_utilities, get_dc_descriptors, []), + DCs3 = rpc:call(Node3, dc_meta_data_utilities, get_dc_descriptors, []), + DCs4 = rpc:call(Node4, dc_meta_data_utilities, get_dc_descriptors, []), + + ?assertEqual({3,3,3,3}, {length(DCs1), length(DCs2), length(DCs3), length(DCs4)}), + ok. + + +shard_count(Config) -> + [[Node1, Node2], [Node3], [Node4]] = proplists:get_value(clusters, Config), + + %% Check sharding count + Shards1 = rpc:call(Node1, dc_utilities, get_my_dc_nodes, []), + Shards2 = rpc:call(Node2, dc_utilities, get_my_dc_nodes, []), + Shards3 = rpc:call(Node3, dc_utilities, get_my_dc_nodes, []), + Shards4 = rpc:call(Node4, dc_utilities, get_my_dc_nodes, []), + + ?assertEqual({2,2,1,1}, {length(Shards1), length(Shards2), length(Shards3), length(Shards4)}), + ok. dummy_test(Config) -> Bucket = ?BUCKET, - [Node1, Node2 | _Nodes] = proplists:get_value(nodes, Config), + [[Node1, Node2] | _] = proplists:get_value(clusters, Config), + [Node1, Node2] = proplists:get_value(nodes, Config), Key = antidote_key, Type = antidote_crdt_counter_pn, Object = {Key, Type, Bucket}, diff --git a/test/utils/test_utils.erl b/test/utils/test_utils.erl index e3167861c..06d226f44 100644 --- a/test/utils/test_utils.erl +++ b/test/utils/test_utils.erl @@ -311,7 +311,7 @@ set_up_clusters_common(Config) -> ready -> ok; connect -> ct:pal("Creating a ring for claimant ~p and other nodes ~p", [Claimant, unpack(OtherNodes)]), - ok = rpc:call(Claimant, antidote_dc_manager, create_dc, [unpack(Cl)]) + ok = rpc:call(Claimant, antidote_dc_manager, add_nodes_to_dc, [unpack(Cl)]) end, Cl end,