diff --git a/src/riak_repl_aae_sink.erl b/src/riak_repl_aae_sink.erl index 2dcdf6a7..0383e89c 100644 --- a/src/riak_repl_aae_sink.erl +++ b/src/riak_repl_aae_sink.erl @@ -138,9 +138,11 @@ process_msg(?MSG_GET_AAE_BUCKET, {Level,BucketNum,IndexN}, State=#state{tree_pid ResponseMsg = riak_kv_index_hashtree:exchange_bucket(IndexN, Level, BucketNum, TreePid), send_reply(ResponseMsg, State); -process_msg(?MSG_GET_AAE_SEGMENT, {SegmentNum,IndexN}, State=#state{tree_pid=TreePid}) -> - ResponseMsg = riak_kv_index_hashtree:exchange_segment(IndexN, SegmentNum, TreePid), - send_reply(ResponseMsg, State); +process_msg(?MSG_GET_AAE_SEGMENT, {SegmentNum,IndexN}, State) -> + riak_repl_stats:aae_segments_requested(), + State#state.sender ! + {?MSG_GET_AAE_SEGMENT, {SegmentNum, IndexN, State#state.tree_pid}}, + {noreply, State}; %% no reply process_msg(?MSG_PUT_OBJ, {fs_diff_obj, BObj}, State) -> @@ -207,7 +209,15 @@ sender_init(Transport, Socket) -> sender_loop({Transport, Socket}). sender_loop(State={Transport, Socket}) -> - receive Msg -> + receive + {?MSG_GET_AAE_SEGMENT, {SegmentNum, IndexN, TreePid}} -> + KeysHashes = + riak_kv_index_hashtree:exchange_segment( + IndexN, SegmentNum, TreePid), + riak_repl_stats:keys_hashes_returned(length(KeysHashes)), + DataBin = term_to_binary(KeysHashes), + ok = Transport:send(Socket, <>); + Msg -> ok = Transport:send(Socket, Msg) end, ?MODULE:sender_loop(State). diff --git a/src/riak_repl_aae_source.erl b/src/riak_repl_aae_source.erl index 8ff3acfb..3596e4a0 100644 --- a/src/riak_repl_aae_source.erl +++ b/src/riak_repl_aae_source.erl @@ -99,8 +99,8 @@ cancel_fullsync(Pid) -> %%%=================================================================== init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) -> - lager:debug("AAE fullsync source worker started for partition ~p", - [Partition]), + lager:info( + "AAE fullsync source worker started for partition ~p", [Partition]), Ver = riak_repl_util:deduce_wire_version_from_proto(Proto), {_, ClientVer, _} = Proto, @@ -285,7 +285,9 @@ update_trees(tree_built, State = #state{indexns=IndexNs}) -> NeededBuilts -> %% Trees built now we can estimate how many keys {ok, EstimatedNrKeys} = riak_kv_index_hashtree:estimate_keys(State#state.tree_pid), - lager:debug("EstimatedNrKeys ~p for partition ~p", [EstimatedNrKeys, State#state.index]), + lager:info( + "EstimatedNrKeys ~p for partition ~p", + [EstimatedNrKeys, State#state.index]), lager:debug("Moving to key exchange state"), key_exchange(init, State#state{built=Built, estimated_nr_keys = EstimatedNrKeys}); @@ -320,7 +322,7 @@ key_exchange(cancel_fullsync, State) -> {stop, normal, State}; key_exchange(finish_fullsync, State=#state{owner=Owner}) -> send_complete(State), - lager:debug("AAE fullsync source completed partition ~p", + lager:info("AAE fullsync source completed partition ~p", [State#state.index]), riak_repl2_fssource:fullsync_complete(Owner), %% TODO: Why stay in key_exchange? Should we stop instead? @@ -341,8 +343,9 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, tree_pid=TreePid, exchange=Exchange, indexns=[IndexN|_IndexNs]}) -> - lager:debug("Starting fullsync key exchange with ~p for ~p/~p", - [Cluster, Partition, IndexN]), + lager:info( + "Starting fullsync key exchange with ~p for ~p/~p", + [Cluster, Partition, IndexN]), SourcePid = self(), @@ -396,14 +399,20 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster, end, %% TODO: Add stats for AAE - lager:debug("Starting compare for partition ~p", [Partition]), - spawn_link(fun() -> - StageStart=os:timestamp(), - Exchange2 = riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, Exchange, TreePid), - lager:debug("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)", - [State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]), - gen_fsm:send_event(SourcePid, {'$aae_src', done, Exchange2}) - end), + lager:info("Starting compare for partition ~p", [Partition]), + spawn_link( + fun() -> + StageStart = os:timestamp(), + Exchange2 = + riak_kv_index_hashtree:compare( + IndexN, Remote, AccFun, Exchange, TreePid), + lager:info( + "Full-sync with site ~p; fullsync difference generator for ~p completion_time=~p secs", + [State#state.cluster, + Partition, + riak_repl_util:elapsed_secs(StageStart)]), + gen_fsm:send_event(SourcePid, {'$aae_src', done, Exchange2}) + end), %% wait for differences from bloom_folder or to be done {next_state, compute_differences, State}. diff --git a/src/riak_repl_stats.erl b/src/riak_repl_stats.erl index daec4874..8df16f7f 100644 --- a/src/riak_repl_stats.erl +++ b/src/riak_repl_stats.erl @@ -41,7 +41,9 @@ clear_rt_dirty/0, touch_rt_dirty_file/0, remove_rt_dirty_file/0, - is_rt_dirty/0]). + is_rt_dirty/0, + aae_segments_requested/0, + keys_hashes_returned/1]). -define(APP, riak_repl). @@ -107,6 +109,12 @@ elections_elected() -> elections_leader_changed() -> increment_counter(elections_leader_changed). +aae_segments_requested() -> + increment_counter(aae_segments_requested). + +keys_hashes_returned(Length) -> + increment_counter(keys_hashes_returned, Length). + %% If any source errors are detected, write a file out to persist this status %% across restarts rt_source_errors() -> @@ -212,7 +220,9 @@ stats() -> {last_server_bytes_recv, gauge}, {rt_source_errors, counter}, {rt_sink_errors, counter}, - {rt_dirty, counter}]. + {rt_dirty, counter}, + {aae_segments_requested, counter}, + {keys_hashes_returned, counter}]. increment_counter(Name) -> increment_counter(Name, 1). @@ -427,11 +437,24 @@ test_check_stats() -> {server_tx_kbps,[]}, {rt_source_errors,1}, {rt_sink_errors, 1}, - {rt_dirty, 2}], + {rt_dirty, 2}, + {aae_segments_requested, 0}, + {keys_hashes_returned, 0}], Result = get_stats(), ?assertEqual(Expected, - [{K1, V} || {K1, V} <- Result, {K2, _} <- Expected, K1 == K2]). + [{K1, V} || {K1, V} <- Result, {K2, _} <- Expected, K1 == K2]), + + riak_repl_stats:aae_segments_requested(), + riak_repl_stats:keys_hashes_returned(100), + + UpdResult = get_stats(), + ?assertMatch( + {aae_segments_requested, 1}, + lists:keyfind(aae_segments_requested, 1, UpdResult)), + ?assertMatch( + {keys_hashes_returned, 100}, + lists:keyfind(keys_hashes_returned, 1, UpdResult)). test_report() ->