Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Oct 14, 2024
1 parent c3083a1 commit 205bc0d
Show file tree
Hide file tree
Showing 12 changed files with 32 additions and 407 deletions.
6 changes: 6 additions & 0 deletions src/ra_ets_queue.erl
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
%% @hidden
-module(ra_ets_queue).

-export([
Expand Down
31 changes: 7 additions & 24 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,9 @@ init(#{uid := UId,
end,
%% TODO dont thing this is necessary given the range is calculated from this
%% but can't hurt as it will do some cleanup
{DeleteSpec, Mt} = ra_log_memtbl:set_first(FirstIdx, Mt0),
{DeleteSpecs, Mt} = ra_log_memtbl:set_first(FirstIdx, Mt0),

exec_mem_table_delete(Names, DeleteSpec, Mt),
% _NumDeleted = ra_log_memtbl:delete(DeleteSpec, Mt),
exec_mem_table_delete(Names, DeleteSpecs, Mt),
%% TODO: can there be obsolete segments returned here?
Reader0 = ra_log_reader:init(UId, Dir, FirstIdx, MaxOpen, AccessPattern, SegRefs,
Names, Counter),
Expand Down Expand Up @@ -358,21 +357,6 @@ write([{FstIdx, _, _} = _First | _Rest] = Entries,
when FstIdx =< LastIdx + 1 andalso
FstIdx >= 0 ->
write_entries(Entries, State00);
% case ra_snapshot:current(SnapState) of
% {SnapIdx, _} when FstIdx =:= SnapIdx + 1 ->
% % it is the next entry after a snapshot
% % we need to tell the wal to truncate as we
% % are not going to receive any entries prior to the snapshot
% try wal_truncate_write(State00, First) of
% State0 ->
% % write the rest normally
% write_entries(Rest, State0)
% catch error:wal_down ->
% {error, wal_down}
% end;
% _ ->
% write_entries(Entries, State00)
% end;
write([], State) ->
{ok, State};
write([{Idx, _, _} | _], #?MODULE{cfg = #cfg{uid = UId},
Expand Down Expand Up @@ -1225,8 +1209,6 @@ resend_from0(Idx, #?MODULE{last_resend_time = {LastResend, WalPid},
% %% TODO mt: ra_log_memtbl:abort/1
% {error, {integrity_error, lists:flatten(Msg)}}.

write_entries([], State) ->
{ok, State};
write_entries(Entries, #?MODULE{cfg = Cfg, mem_table = Mt0} = State0) ->
case stage_entries(Cfg, Entries, Mt0) of
{ok, Mt} ->
Expand Down Expand Up @@ -1325,7 +1307,7 @@ await_written_idx(Idx, Term, Log0) ->
{ra_log_event, {written, Term, {_, IDX}} = Evt} ->
{Log, _} = handle_event(Evt, Log0),
Log;
{ra_log_event, {written, _} = Evt} ->
{ra_log_event, {written, _, _} = Evt} ->
{Log, _} = handle_event(Evt, Log0),
await_written_idx(Idx, Term, Log)
after ?LOG_APPEND_TIMEOUT ->
Expand Down Expand Up @@ -1384,9 +1366,10 @@ maps_with_values(Keys, Map) ->
now_ms() ->
erlang:system_time(millisecond).

exec_mem_table_delete(#{} = _Names, Spec, Mt) ->
% ra_log_memtbl:delete(Spec, Mt),
ra_log_ets:execute_delete(_Names, Spec, Mt),
exec_mem_table_delete(#{} = Names, Specs, Mt)
when is_list(Specs) ->
[ra_log_ets:execute_delete(Names, Spec, Mt)
|| Spec <- Specs],
ok.

%%%% TESTS
Expand Down
222 changes: 0 additions & 222 deletions src/ra_log_cache.erl

This file was deleted.

43 changes: 7 additions & 36 deletions src/ra_log_ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
-module(ra_log_ets).
-behaviour(gen_server).

-export([start_link/1,
give_away/2,
delete_tables/2]).
-export([start_link/1]).

-export([
mem_table_please/2,
Expand All @@ -28,11 +26,9 @@

-include("ra.hrl").

-record(state, {names :: ra_system:names(),
% memtbls = #{} :: #{ra:uid() => ets:tid()},
deletes = []}).
-record(state, {names :: ra_system:names()}).

%%% ra_log_ets - owns mem_table ETS tables
%%% ra_log_ets - owns and creates mem_table ETS tables

%%%===================================================================
%%% API functions
Expand All @@ -41,14 +37,6 @@
start_link(#{names := #{log_ets := Name}} = Cfg) ->
gen_server:start_link({local, Name}, ?MODULE, [Cfg], []).

-spec give_away(ra_system:names(), ets:tid()) -> true.
give_away(#{log_ets := Name}, Tid) ->
ets:give_away(Tid, whereis(Name), undefined).

-spec delete_tables(ra_system:names(), [ets:tid()]) -> ok.
delete_tables(#{log_ets := Name}, Tids) ->
gen_server:cast(Name, {delete_tables, Tids}).

-spec mem_table_please(ra_system:names(), ra:uid()) ->
{ok, ra_log_membtbl:state()} | {error, term()}.
mem_table_please(Names, UId) ->
Expand Down Expand Up @@ -88,8 +76,6 @@ new_mem_table_please(#{log_ets := Name}, UId, Prev) ->
delete_mem_table(#{log_ets := Name}, UId) ->
gen_server:cast(Name, {delete_mem_table, UId}).



-spec execute_delete(ra_system:names(),
ra_log_memtbl:delete_spec(),
ra_log_memtbl:state()) ->
Expand Down Expand Up @@ -154,10 +140,12 @@ handle_cast({exec_delete, Spec, Mt}, State) ->
ok
end,
{noreply, State};
handle_cast({delete_tables, Tids}, State) ->
handle_cast({delete_mem_table, UId},
#state{names = #{open_mem_tbls := OpenMemTbls}} = State) ->
%% delete ets tables,
%% we need to be defensive here.
%% it is better to leak a table than to crash them all

[begin
try timer:tc(fun () -> ets_delete(Tid) end) of
{Time, true} ->
Expand All @@ -171,29 +159,12 @@ handle_cast({delete_tables, Tids}, State) ->
[Tid, Err]),
ok
end
end || Tid <- Tids],
end || {ok, Tid} <- ets:lookup(OpenMemTbls, UId)],
{noreply, State};

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(do_deletes, #state{deletes = Deletes} = State) ->
_ = lists:foldr(
fun ({exec_delete, Spec, Mt}, S0) ->
try timer:tc(fun () -> ra_log_memtbl:delete(Spec, Mt) end) of
{Time, _} ->
?DEBUG("ra_log_ets: ets:delete/1 took ~bms to delete ~w",
[Time div 1000, Spec]),
ok
catch
_:Err ->
?WARN("ra_log_ets: failed to delete ~w ~w ",
[Spec, Err]),
ok
end,
S0
end, undefined, Deletes),
{noreply, State#state{deletes = []}};
handle_info(_Info, State) ->
{noreply, State}.

Expand Down
Loading

0 comments on commit 205bc0d

Please sign in to comment.