Implement limiting the number of entries
NelsonVides committed Sep 17, 2024
1 parent ddb3f34 commit 025f7fe
Showing 4 changed files with 121 additions and 57 deletions.
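
The new entries_limit option caps how many entries a single segment may hold: once the front segment is full, the cache purges the last segment and rotates before retrying the insert. A minimal usage sketch of the new option (the cache name, limit, and keys below are illustrative, not part of this commit):

%% Segments hold at most 1000 entries each; a full front segment
%% triggers a purge-and-rotate instead of unbounded growth.
{ok, _Cleaner} = segmented_cache:start(my_cache, #{strategy => fifo,
                                                   segment_num => 3,
                                                   entries_limit => 1000,
                                                   ttl => {hours, 8}}),
true = segmented_cache:put_entry(my_cache, some_key, some_value).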
2 changes: 2 additions & 0 deletions rebar.config
@@ -1,3 +1,5 @@
{minimum_otp_vsn, "21"}.

{erl_opts,
[warn_missing_doc, warn_missing_spec, warn_unused_import,
warn_export_vars, verbose, report, debug_info
5 changes: 4 additions & 1 deletion src/segmented_cache.erl
@@ -39,17 +39,20 @@ For more information, see the README, and the function documentation.
-type key() :: term().
?DOC("Dynamic type of _values_ from cache clients.").
-type value() :: term().
?DOC("Maximum number of entries per segment. When full, a rotation ensues.").
-type entries_limit() :: infinity | non_neg_integer().
?DOC("Merging function used to resolve conflicts.").
-type merger_fun(Value) :: fun((Value, Value) -> Value).
?DOC("Configuration values for the cache.").
-type opts() :: #{scope => scope(),
strategy => strategy(),
entries_limit => entries_limit(),
segment_num => non_neg_integer(),
ttl => timeout() | {erlang:time_unit(), non_neg_integer()},
merger_fun => merger_fun(term())}.

-export_type([scope/0, name/0, key/0, value/0, hit/0, delete_error/1,
strategy/0, merger_fun/1, opts/0]).
entries_limit/0, strategy/0, merger_fun/1, opts/0]).

%%====================================================================
%% API
134 changes: 84 additions & 50 deletions src/segmented_cache_helpers.erl
@@ -9,7 +9,9 @@
-export([purge_last_segment_and_rotate/1]).

-record(segmented_cache, {scope :: segmented_cache:scope(),
name :: segmented_cache:name(),
strategy = fifo :: segmented_cache:strategy(),
entries_limit = infinity :: segmented_cache:entries_limit(),
index :: atomics:atomics_ref(),
segments :: tuple(),
merger_fun :: merger_fun(term())}).
@@ -26,16 +28,22 @@

-spec init_cache_config(segmented_cache:name(), segmented_cache:opts()) ->
#{scope := segmented_cache:scope(), ttl := timeout()}.
init_cache_config(Name, Opts) ->
{Scope, N, TTL, Strategy, MergerFun} = assert_parameters(Opts),
SegmentOpts = ets_settings(),
init_cache_config(Name, Opts0) ->
#{scope := Scope,
strategy := Strategy,
entries_limit := EntriesLimit,
segment_num := N,
ttl := TTL,
merger_fun := MergerFun} = Opts = assert_parameters(Opts0),
SegmentOpts = ets_settings(Opts),
SegmentsList = lists:map(fun(_) -> ets:new(undefined, SegmentOpts) end, lists:seq(1, N)),
Segments = list_to_tuple(SegmentsList),
Index = atomics:new(1, [{signed, false}]),
atomics:put(Index, 1, 1),
Config = #segmented_cache{scope = Scope, strategy = Strategy, index = Index,
segments = Segments, merger_fun = MergerFun},
set_cache_config(Name, Config),
Config = #segmented_cache{scope = Scope, name = Name, strategy = Strategy,
index = Index, entries_limit = EntriesLimit,
segments = Segments, merger_fun = MergerFun},
persist_cache_config(Name, Config),
#{scope => Scope, ttl => TTL}.

-spec get_cache_scope(segmented_cache:name()) -> segmented_cache:scope().
@@ -51,8 +59,8 @@ erase_cache_config(Name) ->
get_cache_config(Name) ->
persistent_term:get({?APP_KEY, Name}).

-spec set_cache_config(segmented_cache:name(), config()) -> ok.
set_cache_config(Name, Config) ->
-spec persist_cache_config(segmented_cache:name(), config()) -> ok.
persist_cache_config(Name, Config) ->
persistent_term:put({?APP_KEY, Name}, Config).

%%====================================================================
@@ -77,7 +85,7 @@ get_entry_span(Name, Key) when is_atom(Name) ->
-spec put_entry_front(segmented_cache:name(), segmented_cache:key(), segmented_cache:value()) -> boolean().
put_entry_front(Name, Key, Value) ->
SegmentRecord = get_cache_config(Name),
do_put_entry_front(SegmentRecord, Key, Value).
do_put_entry_front(SegmentRecord, Key, Value, 3).

-spec merge_entry(segmented_cache:name(), segmented_cache:key(), segmented_cache:value()) -> boolean().
merge_entry(Name, Key, Value) when is_atom(Name) ->
@@ -91,7 +99,7 @@
end,
case iterate_fun_in_tables(Name, Key, F) of
true -> true;
false -> do_put_entry_front(SegmentRecord, Key, Value)
false -> do_put_entry_front(SegmentRecord, Key, Value, 3)
end.

-spec delete_entry(segmented_cache:name(), segmented_cache:key()) -> true.
@@ -187,27 +195,49 @@ apply_strategy(lru, _CurrentIndex, FoundIndex, Key, SegmentRecord) ->
Segments = SegmentRecord#segmented_cache.segments,
FoundInSegment = element(FoundIndex, Segments),
try [{_, Value}] = ets:lookup(FoundInSegment, Key),
do_put_entry_front(SegmentRecord, Key, Value)
do_put_entry_front(SegmentRecord, Key, Value, 3)
catch _:_ -> false
end.

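%% Note: insertion races with concurrent segment rotation, so the put below
%% is retried a bounded number of times (three) before giving up and
%% returning false.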
-spec do_put_entry_front(#segmented_cache{}, segmented_cache:key(), segmented_cache:value()) ->
-spec do_put_entry_front(#segmented_cache{}, segmented_cache:key(), segmented_cache:value(), 0..3) ->
boolean().
do_put_entry_front(SegmentRecord, Key, Value) ->
Atomic = SegmentRecord#segmented_cache.index,
do_put_entry_front(_, _, _, 0) -> false;
do_put_entry_front(#segmented_cache{
name = Name,
entries_limit = EntriesLimit,
index = Atomic,
segments = Segments,
merger_fun = MergerFun
} = SegmentRecord, Key, Value, Retry) ->
Index = atomics:get(Atomic, 1),
Segments = SegmentRecord#segmented_cache.segments,
FrontSegment = element(Index, Segments),
Inserted = case ets:insert_new(FrontSegment, {Key, Value}) of
true -> true;
false ->
MergerFun = SegmentRecord#segmented_cache.merger_fun,
compare_and_swap(3, FrontSegment, Key, Value, MergerFun)
end,
MaybeMovedIndex = atomics:get(Atomic, 1),
case post_insert_check_should_retry(Inserted, Index, MaybeMovedIndex) of
false -> Inserted;
true -> do_put_entry_front(SegmentRecord, Key, Value)
case insert_new(FrontSegment, Key, Value, EntriesLimit, Name) of
retry ->
do_put_entry_front(SegmentRecord, Key, Value, Retry - 1);
true ->
MaybeMovedIndex = atomics:get(Atomic, 1),
case post_insert_check_should_retry(true, Index, MaybeMovedIndex) of
false -> true;
true -> do_put_entry_front(SegmentRecord, Key, Value, Retry - 1)
end;
false ->
Inserted = compare_and_swap(3, FrontSegment, Key, Value, MergerFun),
MaybeMovedIndex = atomics:get(Atomic, 1),
case post_insert_check_should_retry(Inserted, Index, MaybeMovedIndex) of
false -> Inserted;
true -> do_put_entry_front(SegmentRecord, Key, Value, Retry - 1)
end
end.

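%% insert_new/5 enforces the configured entries_limit: with infinity it is a
%% plain ets:insert_new/2; otherwise, once the front segment has reached the
%% limit, the last segment is purged, the segments rotate, and the caller is
%% told to retry against the new front segment.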
insert_new(Table, Key, Value, infinity, _) ->
ets:insert_new(Table, {Key, Value});
insert_new(Table, Key, Value, EntriesLimit, Name) ->
case EntriesLimit =< ets:info(Table, size) of
false ->
ets:insert_new(Table, {Key, Value});
true ->
purge_last_segment_and_rotate(Name),
retry
end.

-spec post_insert_check_should_retry(boolean(), integer(), integer()) -> boolean().
@@ -254,12 +284,14 @@ purge_last_segment_and_rotate(Name) ->
atomics:put(SegmentRecord#segmented_cache.index, 1, NewIndex),
NewIndex.

-spec assert_parameters(segmented_cache:opts()) ->
{segmented_cache:name(), pos_integer(), timeout(), segmented_cache:strategy(), merger_fun(term())}.
assert_parameters(Opts) when is_map(Opts) ->
N = maps:get(segment_num, Opts, 3),
true = is_integer(N) andalso N > 0,
TTL0 = maps:get(ttl, Opts, {hours, 8}),
-spec assert_parameters(segmented_cache:opts()) -> segmented_cache:opts().
assert_parameters(Opts0) when is_map(Opts0) ->
#{scope := Scope,
strategy := Strategy,
entries_limit := EntriesLimit,
segment_num := N,
ttl := TTL0,
merger_fun := MergerFun} = Opts = maps:merge(defaults(), Opts0),
TTL = case TTL0 of
infinity -> infinity;
{milliseconds, S} -> S;
@@ -268,31 +300,33 @@ assert_parameters(Opts) when is_map(Opts) ->
{hours, H} -> timer:hours(H);
T when is_integer(T) -> timer:minutes(T)
end,
true = is_integer(N) andalso N > 0,
true = (EntriesLimit =:= infinity) orelse (is_integer(EntriesLimit) andalso EntriesLimit > 0),
true = (TTL =:= infinity) orelse (is_integer(TTL) andalso TTL > 0),
Strategy = maps:get(strategy, Opts, fifo),
true = (Strategy =:= fifo) orelse (Strategy =:= lru),
MergerFun = maps:get(merger_fun, Opts, fun segmented_cache_callbacks:default_merger_fun/2),
true = is_function(MergerFun, 2),
Scope = maps:get(scope, Opts, pg),
true = (undefined =/= whereis(Scope)),
{Scope, N, TTL, Strategy, MergerFun}.
Opts#{ttl := TTL}.

defaults() ->
#{scope => pg,
strategy => fifo,
entries_limit => infinity,
segment_num => 3,
ttl => {hours, 8},
merger_fun => fun segmented_cache_callbacks:default_merger_fun/2}.

-ifdef(OTP_RELEASE).
-if(?OTP_RELEASE >= 25).
ets_settings() ->
[set, public,
{read_concurrency, true},
{write_concurrency, auto},
{decentralized_counters, true}].
-elif(?OTP_RELEASE >= 21).
ets_settings() ->
[set, public,
{read_concurrency, true},
{write_concurrency, true},
{decentralized_counters, true}].
-endif.
-if(?OTP_RELEASE >= 25).
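%% Rationale (assumed, not stated in the commit): {write_concurrency, auto}
%% enables decentralized counters, which make ets:info(Table, size) much more
%% expensive, so size-limited tables stick to plain write_concurrency.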
ets_settings(#{entries_limit := infinity}) ->
[set, public,
{read_concurrency, true},
{write_concurrency, auto}];
ets_settings(#{entries_limit := _}) ->
[set, public,
{read_concurrency, true},
{write_concurrency, true}].
-else.
ets_settings() ->
ets_settings(_Opts) ->
[set, public,
{read_concurrency, true},
{write_concurrency, true},
37 changes: 31 additions & 6 deletions test/segmented_cache_SUITE.erl
@@ -13,6 +13,7 @@
init_per_testcase/2,
end_per_testcase/2]).

-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("proper/include/proper.hrl").

@@ -22,6 +23,7 @@
all() ->
[
{group, basic_api},
{group, cache_limits},
{group, short_fifo},
{group, lru}
].
@@ -36,6 +38,10 @@ groups() ->
put_entry_then_delete_it_then_not_member,
stateful_property
]},
{cache_limits, [sequence],
[
ensure_configured_size_is_respected
]},
{short_fifo, [sequence],
[
put_entry_wait_and_check_false
@@ -72,15 +78,25 @@
%%%===================================================================
init_per_group(lru, Config) ->
print_and_restart_counters(),
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, #{strategy => lru,
segment_num => 2,
ttl => {milliseconds, 100}}),
Opts = #{strategy => lru,
segment_num => 2,
ttl => {milliseconds, 100}},
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, Opts),
[{cleaner, Cleaner} | Config];
init_per_group(short_fifo, Config) ->
print_and_restart_counters(),
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, #{strategy => fifo,
segment_num => 2,
ttl => {milliseconds, 5}}),
Opts = #{strategy => fifo,
segment_num => 2,
ttl => {milliseconds, 5}},
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, Opts),
[{cleaner, Cleaner} | Config];
init_per_group(cache_limits, Config) ->
print_and_restart_counters(),
Opts = #{entries_limit => 1,
strategy => fifo,
segment_num => 2,
ttl => {seconds, 60}},
{ok, Cleaner} = segmented_cache:start(?CACHE_NAME, Opts),
[{cleaner, Cleaner} | Config];
init_per_group(_Groupname, Config) ->
print_and_restart_counters(),
@@ -109,6 +125,15 @@
%%% Stateful property Test Case
%%%===================================================================

ensure_configured_size_is_respected(_Config) ->
%% We have 2 tables with 1 element each
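%% Each insert into a full front segment triggers a purge-and-rotate, so by
%% the time `three' lands, the segment holding `one' has been dropped.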
?assert(segmented_cache:put_entry(?CACHE_NAME, one, make_ref())),
?assert(segmented_cache:put_entry(?CACHE_NAME, two, make_ref())),
?assert(segmented_cache:put_entry(?CACHE_NAME, three, make_ref())),
?assert(segmented_cache:is_member(?CACHE_NAME, three)),
?assert(segmented_cache:is_member(?CACHE_NAME, two)),
?assertNot(segmented_cache:is_member(?CACHE_NAME, one)).

stateful_property(_Config) ->
Prop =
?FORALL(Cmds, commands(?CMD_MODULE),
