Skip to content

Commit

Permalink
Cluster management pb fix (#372)
Browse files Browse the repository at this point in the history
* downgrade lager because of logging problems
* change test setup

- forking ct_slave because it cannot be configured as I want it to
- now sets the working directory for slaves, so lager logs to the
correct directory
- standard output of slaves is written to the ct-log and to stio.log
file, which should make debugging easier
- protocol buffer process now also reports stacktrace to client
* also testing connect_to_dcs
* use different node names for cluster management test
* Adapted to cleanup of antidote_pc_codec
* Upgraded to lint v0.1.10 which is checking more things.
* Few fixes for the connection descriptors
* Encoding ignore in pb_client
* Updating the dependencies
  • Loading branch information
peterzeller authored and bieniusa committed Jul 5, 2019
1 parent 52d5bc9 commit 09663bb
Show file tree
Hide file tree
Showing 30 changed files with 840 additions and 275 deletions.
22 changes: 15 additions & 7 deletions include/antidote.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,15 @@
commit_time :: dc_and_commit_time(),
snapshot_time :: snapshot_time()
}).
-type commit_log_payload() :: #commit_log_payload{}.

-record(update_log_payload, {
key :: key(),
bucket :: bucket(),
type :: type(),
op :: op()
}).
-type update_log_payload() :: #update_log_payload{}.

-record(abort_log_payload, {}).

Expand All @@ -126,6 +128,7 @@
| noop,
log_payload :: any_log_payload()
}).
-type log_operation() :: #log_operation{}.

-record(op_number, {
%% TODO 19 undefined is required here, because of the use in inter_dc_log_sender_vnode.
Expand All @@ -134,16 +137,17 @@
global :: undefined | non_neg_integer(),
local :: undefined | non_neg_integer()
}).
-type op_number() :: #op_number{}.

%% The way records are stored in the log.
-record(log_record, {
%% The version of the log record, for backwards compatability
version :: non_neg_integer(),
op_number :: #op_number{},
bucket_op_number :: #op_number{},
log_operation :: #log_operation{}
op_number :: op_number(),
bucket_op_number :: op_number(),
log_operation :: log_operation()
}).

-type log_record() :: #log_record{}.
%% Clock SI

%% MIN is Used for generating the timeStamp of a new snapshot
Expand Down Expand Up @@ -174,7 +178,7 @@
last_op_id :: op_num(),
value :: snapshot()
}).

-type materialized_snapshot() :: #materialized_snapshot{}.
%%---------------------------------------------------------------------
-type actor() :: term().
-type key() :: term().
Expand Down Expand Up @@ -245,7 +249,10 @@
bound_object/0,
module_name/0,
function_name/0,
clocksi_payload/0]).
clocksi_payload/0,
materialized_snapshot/0,
snapshot_get_response/0, log_operation/0, log_record/0, op_number/0,
update_log_payload/0, commit_log_payload/0]).


%% The record is using during materialization to keep the
Expand All @@ -256,9 +263,10 @@
%% size of ops_list
number_of_ops :: non_neg_integer(),
%% the previous snapshot to apply the ops to
materialized_snapshot :: #materialized_snapshot{},
materialized_snapshot :: materialized_snapshot(),
%% The version vector time of the snapshot
snapshot_time :: snapshot_time() | ignore,
%% true if this is the most recent snapshot in the cache
is_newest_snapshot :: boolean()
}).
-type snapshot_get_response() :: #snapshot_get_response{}.
15 changes: 12 additions & 3 deletions include/inter_dc_repl.hrl
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
-include("antidote_message_types.hrl").

-export_type([descriptor/0, interdc_txn/0, recvr_state/0, request_cache_entry/0]).

-record(recvr_state,
{lastRecvd :: orddict:orddict(), %TODO: this may not be required
lastCommitted :: orddict:orddict(),
%%Track timestamps from other DC which have been committed by this DC
recQ :: orddict:orddict(), %% Holds recieving updates from each DC separately in causal order.
statestore,
partition}).
-type recvr_state() :: #recvr_state{}.

-type socket_address() :: {inet:ip_address(), inet:port_number()}.
-type zmq_socket() :: any().
Expand All @@ -16,13 +19,14 @@
-record(interdc_txn, {
dcid :: dcid(),
partition :: partition_id(),
prev_log_opid :: #op_number{} | none, %% the value is *none* if the transaction is read directly from the log
prev_log_opid :: op_number() | none, %% the value is *none* if the transaction is read directly from the log
snapshot :: snapshot_time(),
timestamp :: clock_time(),
last_update_opid :: undefined | #op_number{}, %% last opid of the txn that was an update operations (i.e. not a commit/abort)
last_update_opid :: undefined | op_number(), %% last opid of the txn that was an update operations (i.e. not a commit/abort)
bucket :: bucket(),
log_records :: [#log_record{}] %% if the OP list is empty, the message is a HEARTBEAT
log_records :: [log_record()] %% if the OP list is empty, the message is a HEARTBEAT
}).
-type interdc_txn() :: #interdc_txn{}.

-record(descriptor, {
dcid :: dcid(),
Expand All @@ -31,6 +35,8 @@
logreaders :: [socket_address()]
}).

-type descriptor() :: #descriptor{}.

%% This keeps information about an inter-dc request that
%% is waiting for a reply
-record(request_cache_entry, {
Expand All @@ -40,6 +46,7 @@
pdcid :: pdcid(),
binary_req :: binary()
}).
-type request_cache_entry() :: #request_cache_entry{}.

%% This keeps information about an inter-dc request
%% on the site that is performing the query
Expand All @@ -49,6 +56,7 @@
request_id_num_binary :: binary(),
local_pid :: pid()
}).
-type inter_dc_query_state() :: #inter_dc_query_state{}.

%% State for sub buff
-record(inter_dc_sub_buf, {
Expand All @@ -58,3 +66,4 @@
queue :: queue:queue(),
logging_enabled :: boolean()
}).
-type inter_dc_sub_buf() :: #inter_dc_sub_buf{}.
17 changes: 9 additions & 8 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{deps, [
%% overwrite lager dependency since current version disables logger messages (see https://github.com/erlang-lager/lager/issues/492)
{lager, {git, "https://github.com/erlang-lager/lager", {ref, "411edc71ee9823f6ab9c6f617daccce3c6798a29"}}},
%% riak framework
{riak_core, "3.1.1", {pkg,riak_core_ng}},
% ranch socket acceptor pool for managing protocol buffer sockets
Expand All @@ -12,7 +14,6 @@
antidote_pb_codec,
antidotec_pb,
vectorclock,
lager,

% expose metrics for prometheus as HTTP-API
elli,
Expand Down Expand Up @@ -130,7 +131,7 @@

{profiles,[
{lint, [
{plugins, [{rebar3_lint, {git, "https://github.com/project-fifo/rebar3_lint.git", {tag, "0.1.2"}}}]}
{plugins, [{rebar3_lint, {git, "https://github.com/project-fifo/rebar3_lint.git", {tag, "v0.1.10"}}}]}
]},
{test, [
{erl_opts, [warnings_as_errors, debug_info, no_inline_list_funcs]},
Expand All @@ -151,7 +152,7 @@
{right, "++"},
{left, "++"}]}},
{elvis_style, god_modules,
#{limit => 25,
#{limit => 30,
ignore => []}},
{elvis_style, used_ignored_variable},
{elvis_style, no_behavior_info},
Expand All @@ -161,11 +162,11 @@
#{regex => "^[a-z]([a-z0-9]*_?)*(_SUITE)?$",
ignore => []}
},
{
elvis_style,
function_naming_convention,
#{regex => "^([a-z][a-z0-9]*_?)*$"}
},
% {
% elvis_style,
% function_naming_convention,
% #{regex => "^([a-z][a-z0-9]*_?)*$"}
% },
{elvis_style, state_record_and_type},
{elvis_style, no_spec_with_records}
]
Expand Down
14 changes: 8 additions & 6 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{"1.1.0",
[{<<"accept">>,{pkg,<<"accept">>,<<"0.3.0">>},1},
{<<"antidote_crdt">>,{pkg,<<"antidote_crdt">>,<<"0.1.2">>},0},
{<<"antidote_pb_codec">>,{pkg,<<"antidote_pb_codec">>,<<"0.0.5">>},0},
{<<"antidotec_pb">>,{pkg,<<"antidotec_pb">>,<<"0.2.4">>},0},
{<<"antidote_pb_codec">>,{pkg,<<"antidote_pb_codec">>,<<"0.1.0">>},0},
{<<"antidotec_pb">>,{pkg,<<"antidotec_pb">>,<<"0.2.7">>},0},
{<<"basho_stats">>,{pkg,<<"basho_stats">>,<<"1.0.3">>},1},
{<<"bear">>,{pkg,<<"bear">>,<<"0.8.7">>},2},
{<<"blume">>,{pkg,<<"blume">>,<<"0.1.1">>},1},
Expand All @@ -21,7 +21,10 @@
{<<"gen_fsm_compat">>,{pkg,<<"gen_fsm_compat">>,<<"0.3.0">>},1},
{<<"getopt">>,{pkg,<<"getopt">>,<<"1.0.1">>},2},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1},
{<<"lager">>,{pkg,<<"lager">>,<<"3.7.0">>},0},
{<<"lager">>,
{git,"https://github.com/erlang-lager/lager",
{ref,"411edc71ee9823f6ab9c6f617daccce3c6798a29"}},
0},
{<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.0">>},2},
{<<"pbkdf2">>,{pkg,<<"pbkdf2">>,<<"2.0.0">>},1},
{<<"poolboy">>,{pkg,<<"basho_poolboy">>,<<"0.8.4">>},1},
Expand All @@ -40,8 +43,8 @@
{pkg_hash,[
{<<"accept">>, <<"2505B60BCB992CA79BD03AB7B8FEC8A520A47D9730F286DF1A479CC98B03F94B">>},
{<<"antidote_crdt">>, <<"A92A5ED8918D87AD22557825743C6EAC69DD6089D536E1BF5F9AC80992FA97F8">>},
{<<"antidote_pb_codec">>, <<"139F291D7E4971DE3920E51D09BD10931AF957656ED6F7D9935B3EA059D88167">>},
{<<"antidotec_pb">>, <<"9B6A760D75AFF0BCFC6B136DE6E4065B4FB4AED5D6AF18246FDC5B8F7F64C12F">>},
{<<"antidote_pb_codec">>, <<"D8CC2D69BD25B3961ADECE20954FCB66719AA1DF1D027CF58389C8B7B6EFC739">>},
{<<"antidotec_pb">>, <<"4CB55E6EB47806B07BBC81BF4727F9CBF6DF2B001FB43B31BFAB01A8047E6CF7">>},
{<<"basho_stats">>, <<"7E1174151509C64FCC1934120ED32295E14F84DAAE7F84926BA2C8D3700D146C">>},
{<<"bear">>, <<"16264309AE5D005D03718A5C82641FCC259C9E8F09ADEB6FD79CA4271168656F">>},
{<<"blume">>, <<"CFB4F43688690BA81C6A79F54E4678CFD5FDEDAB692F277AE740AE4A3897360D">>},
Expand All @@ -56,7 +59,6 @@
{<<"gen_fsm_compat">>, <<"5903549F67D595F58A7101154CBE0FDD46955FBFBE40813F1E53C23A970FF5F4">>},
{<<"getopt">>, <<"C73A9FA687B217F2FF79F68A3B637711BB1936E712B521D8CE466B29CBF7808A">>},
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>},
{<<"lager">>, <<"563AB17CD32134A3DD17EC3B3622E6D8F827506AA4F8C489158879BED87D980B">>},
{<<"parse_trans">>, <<"09765507A3C7590A784615CFD421D101AEC25098D50B89D7AA1D66646BC571C1">>},
{<<"pbkdf2">>, <<"11C23279FDED5C0027AB3996CFAE77805521D7EF4BABDE2BD7EC04A9086CF499">>},
{<<"poolboy">>, <<"45C306FF1C9F6451730DD21642EDF55FA72EBD5E2FE4A38D8D8A56B8EA21A256">>},
Expand Down
6 changes: 3 additions & 3 deletions src/antidote_dc_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,19 @@ create_dc(Nodes) ->
wait_until_ring_converged(Nodes),
ok = wait_until(hd(Nodes), fun wait_init:check_ready/1),
%% starts metadata services needed for intra-dc communication
ok = inter_dc_manager:start_bg_processes(stable),
ok = inter_dc_manager:start_bg_processes(stable_time_functions),
ok.

%% Start receiving updates from other DCs
-spec subscribe_updates_from([#descriptor{}]) -> ok.
-spec subscribe_updates_from([descriptor()]) -> ok.
subscribe_updates_from(DCDescriptors) ->
_Connected = inter_dc_manager:observe_dcs_sync(DCDescriptors),
%%TODO Check return for errors
ok = inter_dc_manager:dc_successfully_started(),
ok.

%% Get the DC connection descriptor to be given to other DCs
-spec get_connection_descriptor() -> {ok, #descriptor{}}.
-spec get_connection_descriptor() -> {ok, descriptor()}.
get_connection_descriptor() ->
inter_dc_manager:get_descriptor().

Expand Down
68 changes: 37 additions & 31 deletions src/antidote_pb_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,27 @@

-export([process/1]).

-spec decode_clock(binary()) -> snapshot_time() | ignore.
decode_clock(Clock) ->
-spec from_bin(binary()) -> snapshot_time() | ignore | txid().
from_bin(Clock) ->
case Clock of
undefined -> ignore;
_ -> binary_to_term(Clock)
end.

-spec encode_clock(snapshot_time() | txid()) -> binary().
encode_clock(TxId) ->
term_to_binary(TxId).

-spec process(antidote_pb_codec:request()) -> antidote_pb_codec:response().
process({start_transaction, {Clock, Properties}}) ->
Response = antidote:start_transaction(decode_clock(Clock), Properties),
-spec process(antidote_pb_codec:request()) -> antidote_pb_codec:response_in().
process({start_transaction, Clock, Properties}) ->
Response = antidote:start_transaction(from_bin(Clock), Properties),
case Response of
{ok, TxId} -> {start_transaction_response, {ok, TxId}};
{ok, TxId} -> {start_transaction_response, {ok, encode_clock(TxId)}};
{error, Reason} -> {start_transaction_response, {error, Reason}}
end;

process({abort_transaction, TxId}) ->
Response = antidote:abort_transaction(TxId),
Response = antidote:abort_transaction(from_bin(TxId)),
case Response of
ok -> {operation_response, ok};
{error, Reason} -> {operation_response, {error, Reason}}
Expand All @@ -64,39 +67,39 @@ process({abort_transaction, TxId}) ->
end;

process({commit_transaction, TxId}) ->
Response = antidote:commit_transaction(TxId),
Response = antidote:commit_transaction(from_bin(TxId)),
case Response of
{error, Reason} -> {commit_response, {error, Reason}};
{ok, CommitTime} -> {commit_response, {ok, CommitTime}}
{ok, CommitTime} -> {commit_response, {ok, encode_clock(CommitTime)}}
end;

process({update_objects, {Updates, TxId}}) ->
Response = antidote:update_objects(Updates, TxId),
process({update_objects, Updates, TxId}) ->
Response = antidote:update_objects(Updates, from_bin(TxId)),
case Response of
{error, Reason} -> {operation_response, {error, Reason}};
ok -> {operation_response, ok}
end;

process({static_update_objects, {Clock, Properties, Updates}}) ->
Response = antidote:update_objects(decode_clock(Clock), Properties, Updates),
process({static_update_objects, Clock, Properties, Updates}) ->
Response = antidote:update_objects(from_bin(Clock), Properties, Updates),
case Response of
{error, Reason} -> {commit_response, {error, Reason}};
{ok, CommitTime} -> {commit_response, {ok, CommitTime}}
{ok, CommitTime} -> {commit_response, {ok, encode_clock(CommitTime)}}
end;

process({read_objects, {Objects, TxId}}) ->
Response = antidote:read_objects(Objects, TxId),
process({read_objects, Objects, TxId}) ->
Response = antidote:read_objects(Objects, from_bin(TxId)),
case Response of
{error, Reason} -> {read_objects_response, {error, Reason}};
{ok, Results} -> {read_objects_response, {ok, lists:zip(Objects, Results)}}
end;


process({static_read_objects, {Clock, Properties, Objects}}) ->
Response = antidote:read_objects(decode_clock(Clock), Properties, Objects),
process({static_read_objects, Clock, Properties, Objects}) ->
Response = antidote:read_objects(from_bin(Clock), Properties, Objects),
case Response of
{error, Reason} ->{commit_response, {error, Reason}};
{ok, Results, CommitTime} -> {static_read_objects_response, {ok, lists:zip(Objects, Results), CommitTime}}
{error, Reason} -> {error_response, {error, Reason}};
{ok, Results, CommitTime} -> {static_read_objects_response, {lists:zip(Objects, Results), encode_clock(CommitTime)}}
end;

process({create_dc, NodeNames}) ->
Expand All @@ -106,30 +109,33 @@ process({create_dc, NodeNames}) ->
catch
Error:Reason -> %% Some error, return unsuccess. TODO: correct error response
logger:info("Create DC Failed ~p : ~p", [Error, Reason]),
{operation_response, {error, create_dc_failed}}
{operation_response, {error, unknown}}
end;

process({get_connection_descriptor}) ->
process(get_connection_descriptor) ->
try
{ok, Descriptor} = antidote_dc_manager:get_connection_descriptor(),
logger:info("Conection Descriptor: ~p", [Descriptor]),
{get_connection_descriptor_resp, {ok, term_to_binary(Descriptor)}}
catch
Error:Reason -> %% Some error, return unsuccess. TODO: correct error response
logger:info("Get Conection Descriptor ~p : ~p", [Error, Reason]),
{get_connection_descriptor_resp, {error, no_clue}}
logger:info("Failed Conection Descriptor ~p : ~p", [Error, Reason]),
{get_connection_descriptor_resp, {error, unknown}}
end;

process({connect_to_dcs, Descriptors}) ->
process({connect_to_dcs, BinDescriptors}) ->
try
Descriptors = [binary_to_term(D) || D <- BinDescriptors],
logger:info("Conection Descriptor: ~p", [Descriptors]),
ok = antidote_dc_manager:subscribe_updates_from(Descriptors),
{operation_response, ok}
catch
Error:Reason -> %% Some error, return unsuccess. TODO: correct error response
logger:info("Connect to DCs Failed ~p : ~p", [Error, Reason]),
{operation_response, {error, connect_to_dcs_failed}}
end;
{operation_response, {error, unknown}}
end.

process(Message) ->
logger:error("Received unhandled message ~p~n", [Message]),
MessageStr = erlang:iolist_to_binary(io_lib:format("~p", [Message])),
{error_response, {unknown, <<"Unhandled message ", MessageStr/binary>>}}.
% process(Message) ->
% logger:error("Received unhandled message ~p~n", [Message]),
% MessageStr = erlang:iolist_to_binary(io_lib:format("~p", [Message])),
% {error_response, {unknown, <<"Unhandled message ", MessageStr/binary>>}}.
Loading

0 comments on commit 09663bb

Please sign in to comment.