Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Oct 15, 2024
1 parent 205bc0d commit a4daa6d
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 55 deletions.
4 changes: 3 additions & 1 deletion src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
-type server_id() :: ra_server_id().
-type cluster_name() :: ra_cluster_name().
-type range() :: ra_range:range().
-type uid() :: ra_uid().

-type query_condition() :: {applied, idxterm()}.
%% A condition that a query will wait for it to become true before it is
Expand All @@ -132,7 +133,8 @@
range/0,
query_fun/0,
query_condition/0,
from/0]).
from/0,
uid/0]).

%% @doc Starts the ra application.
%% @end
Expand Down
8 changes: 4 additions & 4 deletions src/ra_log_ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ start_link(#{names := #{log_ets := Name}} = Cfg) ->
gen_server:start_link({local, Name}, ?MODULE, [Cfg], []).

-spec mem_table_please(ra_system:names(), ra:uid()) ->
{ok, ra_log_membtbl:state()} | {error, term()}.
{ok, ra_log_memtbl:state()} | {error, term()}.
mem_table_please(Names, UId) ->
mem_table_please(Names, UId, read_write).

-spec mem_table_please(ra_system:names(), ra:uid(), read | read_write) ->
{ok, ra_log_membtbl:state()} | {error, term()}.
{ok, ra_log_memtbl:state()} | {error, term()}.
mem_table_please(#{log_ets := Name,
open_mem_tbls := OpnMemTbls}, UId, Mode) ->
case ets:lookup(OpnMemTbls, UId) of
Expand All @@ -63,8 +63,8 @@ mem_table_please(#{log_ets := Name,
{ok, Mt}
end.

-spec new_mem_table_please(ra_system:names(), ra:uid(), ra_log_membtbl:state()) ->
{ok, ra_log_membtbl:state()} | {error, term()}.
-spec new_mem_table_please(ra_system:names(), ra:uid(), ra_log_memtbl:state()) ->
{ok, ra_log_memtbl:state()} | {error, term()}.
new_mem_table_please(#{log_ets := Name}, UId, Prev) ->
case gen_server:call(Name, {new_mem_table_please, UId}, infinity) of
{ok, Tid} ->
Expand Down
13 changes: 3 additions & 10 deletions src/ra_log_memtbl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,13 @@ init(Tid, Mode) ->
read ->
undefined;
read_write ->
%% TODO: this is potentially horrendously slow
case ets:tab2list(Tid) of
[] ->
undefined;
Entries ->
lists:foldl(
fun ({I, _, _}, undefined) ->
%% TODO: mt: can this be optimised further?
ets:foldl(fun ({I, _, _}, undefined) ->
{I, I};
({I, _, _}, {S, E}) ->
{min(I, S), max(I, E)}
end, undefined, Entries)
end
end, undefined, Tid)
end,

#?MODULE{tid = Tid,
range = Range
}.
Expand Down
2 changes: 1 addition & 1 deletion src/ra_log_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
%%
%%
-module(ra_log_reader).

-compile(inline_list_funcs).
Expand Down
16 changes: 10 additions & 6 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,9 @@ get_overview(#state{data_dir = Dir,

flush_mem_table_ranges({ServerUId, TidRanges0},
#state{system = System} = State) ->
SnapIdx = start_index(ServerUId, -1),
SnapIdx = snap_idx(ServerUId),

%% truncate and limit all ranges to create a contiguous non-oeverlapping
%% truncate and limit all ranges to create a contiguous non-overlapping
%% list of tid ranges to flush to disk
TidRanges =
lists:foldl(fun ({T, Range}, []) ->
Expand All @@ -265,7 +265,8 @@ flush_mem_table_ranges({ServerUId, TidRanges0},

SegRefs0 = lists:append(
[flush_mem_table_range(ServerUId, TidRange, State)
|| TidRange <- TidRanges]),
|| {_Tid, Range} = TidRange <- TidRanges,
Range =/= undefined]),

%% compact cases where a segment was appended in a subsequent call to
%% flush_mem_table_range
Expand Down Expand Up @@ -329,11 +330,14 @@ flush_mem_table_range(ServerUId, {Tid, {StartIdx0, EndIdx}},
end.

start_index(ServerUId, StartIdx0) ->
max(snap_idx(ServerUId) + 1, StartIdx0).

snap_idx(ServerUId) ->
case ets:lookup(ra_log_snapshot_state, ServerUId) of
[{_, SnapIdx}] ->
max(SnapIdx + 1, StartIdx0);
[] ->
StartIdx0
SnapIdx;
_ ->
-1
end.

send_segments(System, ServerUId, TidRanges, SegRefs) ->
Expand Down
36 changes: 18 additions & 18 deletions test/ra_log_ets_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,24 @@ end_per_testcase(_TestCase, _Config) ->
%%% Test cases
%%%===================================================================

deletes_tables(Config) ->
Conf0 = ra_system:default_config(),
Conf = Conf0#{data_dir => ?config(priv_dir, Config)},
Names = maps:get(names, Conf),
_ = ra_log_ets:start_link(Conf),
T1 = ets:new(t1, []),
T2 = ets:new(t2, []),
T3 = ets:new(t2, [public]),
ra_log_ets:give_away(Names, T1),
ra_log_ets:give_away(Names, T2),
ra_log_ets:give_away(Names, T3),
ets:delete(T3),
ra_log_ets:delete_tables(Names, [T1, T2, T3]),
%% ensure prior messages have been processed
gen_server:call(ra_log_ets, noop),
undefined = ets:info(T1),
undefined = ets:info(T2),
proc_lib:stop(ra_log_ets),
deletes_tables(_Config) ->
% Conf0 = ra_system:default_config(),
% Conf = Conf0#{data_dir => ?config(priv_dir, Config)},
% Names = maps:get(names, Conf),
% _ = ra_log_ets:start_link(Conf),
% T1 = ets:new(t1, []),
% T2 = ets:new(t2, []),
% T3 = ets:new(t2, [public]),
% ra_log_ets:give_away(Names, T1),
% ra_log_ets:give_away(Names, T2),
% ra_log_ets:give_away(Names, T3),
% ets:delete(T3),
% ra_log_ets:delete_tables(Names, [T1, T2, T3]),
% %% ensure prior messages have been processed
% gen_server:call(ra_log_ets, noop),
% undefined = ets:info(T1),
% undefined = ets:info(T2),
% proc_lib:stop(ra_log_ets),
ok.

%% Utility
13 changes: 0 additions & 13 deletions test/ra_log_memtbl_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -428,14 +428,6 @@ perf(_Config) ->
% end || {Name, Tid} <- Tables
% ],

% [begin
% {Taken, MtOut} = timer:tc(?MODULE, delete_n, [0, From, Mt]),
% #{name := Name, size := Size} = ra_log_memtbl:info(MtOut),
% ct:pal("~s delete_n size left ~b took ~bms",
% [Name, Size, Taken div 1000]),
% ok
% end || Mt <- Tables
% ],

DelTo = (trunc(Num * 0.9)),
% DelSpec = [{{'$1', '_'}, [], [{'<', '$1', DelTo}]}],
Expand Down Expand Up @@ -472,11 +464,6 @@ insert_n(K, N, Data, Mt) ->
insert_n(K+1, N, Data,
element(2, ra_log_memtbl:insert({K, 42, Data}, Mt))).

delete_n(N, N, Mt) ->
Mt;
delete_n(K, N, Mt) ->
delete_n(K+1, N, ra_log_memtbl:delete(K, Mt)).

do_n(N, N, _Fun) ->
ok;
do_n(N, To, Fun) ->
Expand Down
4 changes: 2 additions & 2 deletions test/ra_log_props_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ deliver_log_events(Log0, Timeout) ->

consume_events(Log0, Last) ->
receive
{ra_log_event, {written, {_, To, Term}} = Evt} ->
{ra_log_event, {written, Term, {_, To}} = Evt} ->
{Log, _} = ra_log:handle_event(Evt, Log0),
consume_events(Log, {To, Term})
after 0 ->
Expand All @@ -800,7 +800,7 @@ consume_events(Log0, Last) ->

consume_all_events(Log0, Last) ->
receive
{ra_log_event, {written, {_, To, Term}} = Evt} ->
{ra_log_event, {written, Term, {_, To}} = Evt} ->
{Log, _} = ra_log:handle_event(Evt, Log0),
consume_events(Log, {To, Term})
after 15000 ->
Expand Down

0 comments on commit a4daa6d

Please sign in to comment.