Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Oct 11, 2024
1 parent 611b7d1 commit 7650dc4
Show file tree
Hide file tree
Showing 11 changed files with 602 additions and 385 deletions.
346 changes: 170 additions & 176 deletions src/ra_log.erl

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions src/ra_log_ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ delete_mem_table(#{log_ets := Name}, UId) ->
ra_log_memtbl:delete_spec(),
ra_log_memtbl:state()) ->
ok.
execute_delete(#{}, undefined, _Mt) ->
ok;
execute_delete(#{log_ets := Name}, Spec, Mt) ->
gen_server:cast(Name, {exec_delete, Spec, Mt}).

Expand Down Expand Up @@ -150,9 +152,9 @@ handle_call(_Request, _From, State) ->
% end;
handle_cast({exec_delete, Spec, Mt}, State) ->
try timer:tc(fun () -> ra_log_memtbl:delete(Spec, Mt) end) of
{Time, _} ->
?DEBUG("ra_log_ets: ets:delete/1 took ~bms to delete ~w",
[Time div 1000, Spec]),
{Time, Num} ->
ct:pal("ra_log_ets: ets:delete/1 took ~bms to delete ~w ~b entries",
[Time div 1000, Spec, Num]),
ok
catch
_:Err ->
Expand Down
136 changes: 79 additions & 57 deletions src/ra_log_memtbl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
get_items/2,
record_flushed/3,
set_first/2,
set_last/2,
delete/2,
tid/1,
is_active/2,
prev/1,
info/1,
range/1,
Expand All @@ -36,9 +36,9 @@
(is_tuple(Range) andalso
Idx < element(1, Range))).

-define(IS_AFTER_RANGE(Idx, Range),
(is_tuple(Range) andalso
Idx > element(2, Range))).
% -define(IS_AFTER_RANGE(Idx, Range),
% (is_tuple(Range) andalso
% Idx > element(2, Range))).

% -define(RANGE_OUT(Idx, Range),
% (Idx < element(1, Range) orelse
Expand All @@ -57,7 +57,8 @@

-opaque state() :: #?MODULE{}.

-type delete_spec() :: ra:index() |
-type delete_spec() :: undefined |
ra:index() |
{'<', ra:index()} |
{all, ets:tid()} |
{delete, ets:tid()} |
Expand Down Expand Up @@ -115,25 +116,26 @@ insert({Idx, _, _} = _Entry,
?IS_BEFORE_RANGE(Idx, Range) ->
{error, overwriting}.

-spec stage(log_entry(), state()) -> state().
-spec stage(log_entry(), state()) ->
{ok, state()} | {error, overwriting}.
stage({Idx, _, _} = Entry,
#?MODULE{ staged = {FstIdx, Staged},
range = Range} = State)
#?MODULE{staged = {FstIdx, Staged},
range = Range} = State)
when ?IS_NEXT_IDX(Idx, Range) ->
State#?MODULE{staged = {FstIdx, [Entry | Staged]},
range = update_range_end(Idx, Range)};
{ok, State#?MODULE{staged = {FstIdx, [Entry | Staged]},
range = update_range_end(Idx, Range)}};
stage({Idx, _, _} = Entry,
#?MODULE{tid = _Tid,
staged = undefined,
range = Range} = State)
#?MODULE{tid = _Tid,
staged = undefined,
range = Range} = State)
when ?IS_NEXT_IDX(Idx, Range) ->
State#?MODULE{staged = {Idx, [Entry]},
range = update_range_end(Idx, Range)};
{ok, State#?MODULE{staged = {Idx, [Entry]},
range = update_range_end(Idx, Range)}};
stage({Idx, _, _} = _Entry,
#?MODULE{range = Range} = _State0)
#?MODULE{range = Range} = _State0)
when ?IN_RANGE(Idx, Range) orelse
?IS_BEFORE_RANGE(Idx, Range) ->
exit({error, overwriting}).
{error, overwriting}.

-spec commit(state()) -> state().
commit(#?MODULE{staged = undefined} = State) ->
Expand Down Expand Up @@ -178,12 +180,12 @@ lookup_term(Idx, #?MODULE{staged = {FstStagedIdx, Staged}})
lookup_term(Idx, #?MODULE{tid = Tid,
range = Range})
when ?IN_RANGE(Idx, Range) ->
ets:lookup_element(Tid, Idx, 2);
ets:lookup_element(Tid, Idx, 2, undefined);
lookup_term(_Idx, _State) ->
undefined.

-spec tid_for(ra:index(), ra_term(), state()) ->
undefine | ets:tid().
undefined | ets:tid().
tid_for(_Idx, _Term, undefined) ->
undefined;
tid_for(Idx, Term, State) ->
Expand Down Expand Up @@ -225,15 +227,18 @@ delete({range, Tid, {Start, End}}, #?MODULE{tid = Tid} = State) ->
Limit = ets:info(Tid, size) div 2,
case NumToDelete > Limit of
true ->
ct:pal("delete < ~b ~b", [Start, End]),
%% more than half the table is to be deleted
delete({'<', Tid, End + 1}, State);
false ->
ct:pal("delete ~b ~b", [Start, End]),
delete(Start, End, Tid),
End - Start + 1
end;
delete({Op, Tid, Idx}, #?MODULE{tid = Tid})
when is_integer(Idx) and is_atom(Op) ->
DelSpec = [{{'$1', '_', '_'}, [{'<', '$1', Idx}], [true]}],
ct:pal("DelSpec ~p", [DelSpec]),
ets:select_delete(Tid, DelSpec);
delete({all, Tid}, #?MODULE{tid = Tid}) ->
Size = ets:info(Tid, size),
Expand All @@ -259,6 +264,10 @@ delete(#?MODULE{tid = Tid}) ->
tid(#?MODULE{tid = Tid}) ->
Tid.

-spec is_active(ets:tid(), state()) -> boolean().
is_active(Tid, State) ->
Tid =:= tid(State).

-spec prev(state()) -> undefined | state().
prev(#?MODULE{prev = Prev}) ->
Prev.
Expand All @@ -273,50 +282,70 @@ info(#?MODULE{tid = Tid,
has_previous => Prev =/= undefined
}.

-spec record_flushed(ets:tid(), ra:range(), state()) -> {delete_spec(), state()}.
record_flushed(Tid, {_, End}, #?MODULE{tid = Tid} = State0) ->
case set_first(End+1, State0) of
{[], State} ->
{undefined, State};
{[Spec], State} ->
{Spec, State}
end;
-spec record_flushed(ets:tid(), ra:range(), state()) ->
{delete_spec(), state()}.
record_flushed(TID = Tid, {Start, End},
#?MODULE{tid = TID,
range = Range} = State) ->
case ?IN_RANGE(End, Range) of
true ->
{{range, Tid, {Start, End}},
State#?MODULE{range = ra_range:truncate(End, Range)}};
false ->
{undefined, State}
end;
record_flushed(_Tid, _Range, #?MODULE{prev = undefined} = State) ->
{undefined, State};
record_flushed(Tid, Range, #?MODULE{prev = Prev0} = State) ->
case record_flushed(Tid, Range, Prev0) of
{{all, Tid}, _Prev} ->
%% upgrade all spec to delete
{Spec, Prev} = record_flushed(Tid, Range, Prev0),
case range(Prev) of
undefined ->
%% the prev table is now empty and can be deleted,
{{delete, Tid}, State#?MODULE{prev = undefined}};
{Spec, Prev} ->
_ ->
{Spec, State#?MODULE{prev = Prev}}
end.

-spec set_first(ra:index(), state()) ->
{[delete_spec()], state()}.
set_first(Idx, #?MODULE{tid = Tid,
range = {Start, _} = Range} = State)
when ?IN_RANGE(Idx, Range) ->
{[{range, Tid, {Start, Idx - 1}}],
State#?MODULE{range = update_range_start(Idx, Range)}};
set_first(Idx, #?MODULE{tid = Tid,
range = Range} = State)
when ?IS_AFTER_RANGE(Idx, Range) ->
{[{all, Tid}], State#?MODULE{range = undefined}};
range = Range,
prev = Prev0} = State)
when (is_tuple(Range) andalso Idx > element(1, Range)) orelse
Range == undefined ->
{PrevSpecs, Prev} = case Prev0 of
undefined ->
{[], undefined};
_ ->
case set_first(Idx, Prev0) of
{[{range, PTID, _} | Rem],
#?MODULE{tid = PTID} = P} = Res ->
%% set_first/2 returned a range spec for
%% prev and prev is now empty,
%% upgrade to delete spec of whole tid
case range(P) of
undefined ->
{[{delete, tid(P)} | Rem],
prev(P)};
_ ->
Res
end;
Res ->
Res
end
end,
Specs = case Range of
{Start, End} ->
[{range, Tid, {Start, min(Idx - 1, End)}} | PrevSpecs];
undefined ->
PrevSpecs
end,
{Specs,
State#?MODULE{range = ra_range:truncate(Idx - 1, Range),
prev = Prev}};
set_first(_Idx, State) ->
%% TODO fall back to prev
{[], State}.

-spec set_last(ra:index(), state()) ->
{undefined | delete_spec(), state()}.
set_last(Idx, #?MODULE{range = Range} = State)
when ?IN_RANGE(Idx, Range) ->
{{'>', Idx}, State#?MODULE{range = update_range_end(Idx, Range)}};
set_last(Idx, #?MODULE{range = {Start, _End}} = State)
when Idx < Start ->
{all, State#?MODULE{range = undefined}};
set_last(_Idx, State) ->
{undefined, State}.

%% internal

Expand All @@ -327,13 +356,6 @@ update_range_end(Idx, {Start, End})
update_range_end(Idx, undefined) ->
{Idx, Idx}.

update_range_start(Idx, {Start, End})
when Idx >= Start andalso
Idx =< End ->
{Idx, End};
update_range_start(_Idx, Range) ->
Range.

delete(End, End, Tid) ->
ets:delete(Tid, End);
delete(Start, End, Tid) ->
Expand Down
Loading

0 comments on commit 7650dc4

Please sign in to comment.