From 025f7fe937563d816bb4288fc9e26e0bb8d3290d Mon Sep 17 00:00:00 2001
From: Nelson Vides
Date: Tue, 17 Sep 2024 11:50:19 +0200
Subject: [PATCH 1/2] Implement limiting the number of entries

---
 rebar.config                    |   2 +
 src/segmented_cache.erl         |   5 +-
 src/segmented_cache_helpers.erl | 134 ++++++++++++++++++++------------
 test/segmented_cache_SUITE.erl  |  37 +++++++--
 4 files changed, 121 insertions(+), 57 deletions(-)

diff --git a/rebar.config b/rebar.config
index a994b02..c13f494 100644
--- a/rebar.config
+++ b/rebar.config
@@ -1,3 +1,5 @@
+{minimum_otp_vsn, "21"}.
+
 {erl_opts, [warn_missing_doc, warn_missing_spec,
             warn_unused_import, warn_export_vars,
             verbose, report, debug_info
diff --git a/src/segmented_cache.erl b/src/segmented_cache.erl
index 5c2cf7c..cc885cf 100644
--- a/src/segmented_cache.erl
+++ b/src/segmented_cache.erl
@@ -39,17 +39,20 @@ For more information, see the README, and the function documentation.
 -type key() :: term().
 ?DOC("Dynamic type of _values_ from cache clients.").
 -type value() :: term().
+?DOC("Maximum number of entries per segment. When full, a rotation is triggered.").
+-type entries_limit() :: infinity | non_neg_integer().
 ?DOC("Merging function to use for resolving conflicts").
 -type merger_fun(Value) :: fun((Value, Value) -> Value).
 ?DOC("Configuration values for the cache.").
 -type opts() :: #{scope => scope(),
                   strategy => strategy(),
+                  entries_limit => entries_limit(),
                   segment_num => non_neg_integer(),
                   ttl => timeout() | {erlang:time_unit(), non_neg_integer()},
                   merger_fun => merger_fun(term())}.

 -export_type([scope/0, name/0, key/0, value/0, hit/0, delete_error/1,
-              strategy/0, merger_fun/1, opts/0]).
+              entries_limit/0, strategy/0, merger_fun/1, opts/0]).

 %%====================================================================
 %% API
diff --git a/src/segmented_cache_helpers.erl b/src/segmented_cache_helpers.erl
index 2e65df7..82bd8a3 100644
--- a/src/segmented_cache_helpers.erl
+++ b/src/segmented_cache_helpers.erl
@@ -9,7 +9,9 @@
 -export([purge_last_segment_and_rotate/1]).

 -record(segmented_cache, {scope :: segmented_cache:scope(),
+                          name :: segmented_cache:name(),
                           strategy = fifo :: segmented_cache:strategy(),
+                          entries_limit = infinity :: segmented_cache:entries_limit(),
                           index :: atomics:atomics_ref(),
                           segments :: tuple(),
                           merger_fun :: merger_fun(term())}).
@@ -26,16 +28,22 @@
 -spec init_cache_config(segmented_cache:name(), segmented_cache:opts()) ->
     #{scope := segmented_cache:scope(), ttl := timeout()}.
-init_cache_config(Name, Opts) ->
-    {Scope, N, TTL, Strategy, MergerFun} = assert_parameters(Opts),
-    SegmentOpts = ets_settings(),
+init_cache_config(Name, Opts0) ->
+    #{scope := Scope,
+      strategy := Strategy,
+      entries_limit := EntriesLimit,
+      segment_num := N,
+      ttl := TTL,
+      merger_fun := MergerFun} = Opts = assert_parameters(Opts0),
+    SegmentOpts = ets_settings(Opts),
     SegmentsList = lists:map(fun(_) -> ets:new(undefined, SegmentOpts) end, lists:seq(1, N)),
     Segments = list_to_tuple(SegmentsList),
     Index = atomics:new(1, [{signed, false}]),
     atomics:put(Index, 1, 1),
-    Config = #segmented_cache{scope = Scope, strategy = Strategy, index = Index,
-                              segments = Segments, merger_fun = MergerFun},
-    set_cache_config(Name, Config),
+    Config = #segmented_cache{scope = Scope, name = Name, strategy = Strategy,
+                              index = Index, entries_limit = EntriesLimit,
+                              segments = Segments, merger_fun = MergerFun},
+    persist_cache_config(Name, Config),
     #{scope => Scope, ttl => TTL}.

 -spec get_cache_scope(segmented_cache:name()) -> segmented_cache:scope().
@@ -51,8 +59,8 @@ erase_cache_config(Name) ->
 get_cache_config(Name) ->
     persistent_term:get({?APP_KEY, Name}).

--spec set_cache_config(segmented_cache:name(), config()) -> ok.
-set_cache_config(Name, Config) ->
+-spec persist_cache_config(segmented_cache:name(), config()) -> ok.
+persist_cache_config(Name, Config) ->
     persistent_term:put({?APP_KEY, Name}, Config).

 %%====================================================================
@@ -77,7 +85,7 @@ get_entry_span(Name, Key) when is_atom(Name) ->
 -spec put_entry_front(segmented_cache:name(), segmented_cache:key(), segmented_cache:value()) -> boolean().
 put_entry_front(Name, Key, Value) ->
     SegmentRecord = get_cache_config(Name),
-    do_put_entry_front(SegmentRecord, Key, Value).
+    do_put_entry_front(SegmentRecord, Key, Value, 3).

 -spec merge_entry(segmented_cache:name(), segmented_cache:key(), segmented_cache:value()) -> boolean().
 merge_entry(Name, Key, Value) when is_atom(Name) ->
@@ -91,7 +99,7 @@ merge_entry(Name, Key, Value) when is_atom(Name) ->
         end,
     case iterate_fun_in_tables(Name, Key, F) of
         true -> true;
-        false -> do_put_entry_front(SegmentRecord, Key, Value)
+        false -> do_put_entry_front(SegmentRecord, Key, Value, 3)
     end.

 -spec delete_entry(segmented_cache:name(), segmented_cache:key()) -> true.
@@ -187,27 +195,49 @@ apply_strategy(lru, _CurrentIndex, FoundIndex, Key, SegmentRecord) ->
     Segments = SegmentRecord#segmented_cache.segments,
     FoundInSegment = element(FoundIndex, Segments),
     try [{_, Value}] = ets:lookup(FoundInSegment, Key),
-        do_put_entry_front(SegmentRecord, Key, Value)
+        do_put_entry_front(SegmentRecord, Key, Value, 3)
     catch _:_ -> false
     end.

--spec do_put_entry_front(#segmented_cache{}, segmented_cache:key(), segmented_cache:value()) ->
+-spec do_put_entry_front(#segmented_cache{}, segmented_cache:key(), segmented_cache:value(), 0..3) ->
     boolean().
-do_put_entry_front(SegmentRecord, Key, Value) ->
-    Atomic = SegmentRecord#segmented_cache.index,
+do_put_entry_front(_, _, _, 0) -> false;
+do_put_entry_front(#segmented_cache{
+                      name = Name,
+                      entries_limit = EntriesLimit,
+                      index = Atomic,
+                      segments = Segments,
+                      merger_fun = MergerFun
+                     } = SegmentRecord, Key, Value, Retry) ->
     Index = atomics:get(Atomic, 1),
-    Segments = SegmentRecord#segmented_cache.segments,
     FrontSegment = element(Index, Segments),
-    Inserted = case ets:insert_new(FrontSegment, {Key, Value}) of
-        true -> true;
-        false ->
-            MergerFun = SegmentRecord#segmented_cache.merger_fun,
-            compare_and_swap(3, FrontSegment, Key, Value, MergerFun)
-        end,
-    MaybeMovedIndex = atomics:get(Atomic, 1),
-    case post_insert_check_should_retry(Inserted, Index, MaybeMovedIndex) of
-        false -> Inserted;
-        true -> do_put_entry_front(SegmentRecord, Key, Value)
+    case insert_new(FrontSegment, Key, Value, EntriesLimit, Name) of
+        retry ->
+            do_put_entry_front(SegmentRecord, Key, Value, Retry - 1);
+        true ->
+            MaybeMovedIndex = atomics:get(Atomic, 1),
+            case post_insert_check_should_retry(true, Index, MaybeMovedIndex) of
+                false -> true;
+                true -> do_put_entry_front(SegmentRecord, Key, Value, Retry - 1)
+            end;
+        false ->
+            Inserted = compare_and_swap(3, FrontSegment, Key, Value, MergerFun),
+            MaybeMovedIndex = atomics:get(Atomic, 1),
+            case post_insert_check_should_retry(Inserted, Index, MaybeMovedIndex) of
+                false -> Inserted;
+                true -> do_put_entry_front(SegmentRecord, Key, Value, Retry - 1)
+            end
     end.
+
+insert_new(Table, Key, Value, infinity, _) ->
+    ets:insert_new(Table, {Key, Value});
+insert_new(Table, Key, Value, EntriesLimit, Name) ->
+    case EntriesLimit =< ets:info(Table, size) of
+        false ->
+            ets:insert_new(Table, {Key, Value});
+        true ->
+            purge_last_segment_and_rotate(Name),
+            retry
+    end.

 -spec post_insert_check_should_retry(boolean(), integer(), integer()) -> boolean().
@@ -254,12 +284,14 @@ purge_last_segment_and_rotate(Name) ->
     atomics:put(SegmentRecord#segmented_cache.index, 1, NewIndex),
     NewIndex.

--spec assert_parameters(segmented_cache:opts()) ->
-    {segmented_cache:name(), pos_integer(), timeout(), segmented_cache:strategy(), merger_fun(term())}.
-assert_parameters(Opts) when is_map(Opts) ->
-    N = maps:get(segment_num, Opts, 3),
-    true = is_integer(N) andalso N > 0,
-    TTL0 = maps:get(ttl, Opts, {hours, 8}),
+-spec assert_parameters(segmented_cache:opts()) -> segmented_cache:opts().
+assert_parameters(Opts0) when is_map(Opts0) ->
+    #{scope := Scope,
+      strategy := Strategy,
+      entries_limit := EntriesLimit,
+      segment_num := N,
+      ttl := TTL0,
+      merger_fun := MergerFun} = Opts = maps:merge(defaults(), Opts0),
     TTL = case TTL0 of
               infinity -> infinity;
               {milliseconds, S} -> S;
@@ -268,31 +300,33 @@ assert_parameters(Opts) when is_map(Opts) ->
               {hours, H} -> timer:hours(H);
               T when is_integer(T) -> timer:minutes(T)
           end,
+    true = is_integer(N) andalso N > 0,
+    true = (EntriesLimit =:= infinity) orelse (is_integer(EntriesLimit) andalso EntriesLimit > 0),
     true = (TTL =:= infinity) orelse (is_integer(TTL) andalso N > 0),
-    Strategy = maps:get(strategy, Opts, fifo),
     true = (Strategy =:= fifo) orelse (Strategy =:= lru),
-    MergerFun = maps:get(merger_fun, Opts, fun segmented_cache_callbacks:default_merger_fun/2),
     true = is_function(MergerFun, 2),
-    Scope = maps:get(scope, Opts, pg),
     true = (undefined =/= whereis(Scope)),
-    {Scope, N, TTL, Strategy, MergerFun}.
+    Opts#{ttl := TTL}.
+
+defaults() ->
+    #{scope => pg,
+      strategy => fifo,
+      entries_limit => infinity,
+      segment_num => 3,
+      ttl => {hours, 8},
+      merger_fun => fun segmented_cache_callbacks:default_merger_fun/2}.

--ifdef(OTP_RELEASE).
-    -if(?OTP_RELEASE >= 25).
-    ets_settings() ->
-        [set, public,
-         {read_concurrency, true},
-         {write_concurrency, auto},
-         {decentralized_counters, true}].
-    -elif(?OTP_RELEASE >= 21).
-    ets_settings() ->
-        [set, public,
-         {read_concurrency, true},
-         {write_concurrency, true},
-         {decentralized_counters, true}].
-    -endif.
+-if(?OTP_RELEASE >= 25).
+ets_settings(#{entries_limit := infinity}) ->
+    [set, public,
+     {read_concurrency, true},
+     {write_concurrency, auto}];
+ets_settings(#{entries_limit := _}) ->
+    [set, public,
+     {read_concurrency, true},
+     {write_concurrency, true}].
 -else.
-ets_settings() ->
+ets_settings(_Opts) ->
     [set, public,
      {read_concurrency, true},
      {write_concurrency, true},
diff --git a/test/segmented_cache_SUITE.erl b/test/segmented_cache_SUITE.erl
index a249a44..b4c8528 100644
--- a/test/segmented_cache_SUITE.erl
+++ b/test/segmented_cache_SUITE.erl
@@ -13,6 +13,7 @@
          init_per_testcase/2,
          end_per_testcase/2]).

+-include_lib("stdlib/include/assert.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("proper/include/proper.hrl").

@@ -22,6 +23,7 @@ all() ->
     [
      {group, basic_api},
+     {group, cache_limits},
      {group, short_fifo},
      {group, lru}
     ].

@@ -36,6 +38,10 @@ groups() ->
        put_entry_then_delete_it_then_not_member,
        stateful_property
       ]},
+     {cache_limits, [sequence],
+      [
+       ensure_configured_size_is_respected
+      ]},
      {short_fifo, [sequence],
       [
        put_entry_wait_and_check_false
@@ -72,15 +78,25 @@ end_per_suite(_Config) ->
 %%%===================================================================
 init_per_group(lru, Config) ->
     print_and_restart_counters(),
-    {ok, Cleaner} = segmented_cache:start(?CACHE_NAME, #{strategy => lru,
-                                                         segment_num => 2,
-                                                         ttl => {milliseconds, 100}}),
+    Opts = #{strategy => lru,
+             segment_num => 2,
+             ttl => {milliseconds, 100}},
+    {ok, Cleaner} = segmented_cache:start(?CACHE_NAME, Opts),
     [{cleaner, Cleaner} | Config];
 init_per_group(short_fifo, Config) ->
     print_and_restart_counters(),
-    {ok, Cleaner} = segmented_cache:start(?CACHE_NAME, #{strategy => fifo,
-                                                         segment_num => 2,
-                                                         ttl => {milliseconds, 5}}),
+    Opts = #{strategy => fifo,
+             segment_num => 2,
+             ttl => {milliseconds, 5}},
+    {ok, Cleaner} = segmented_cache:start(?CACHE_NAME, Opts),
+    [{cleaner, Cleaner} | Config];
+init_per_group(cache_limits, Config) ->
+    print_and_restart_counters(),
+    Opts = #{entries_limit => 1,
+             strategy => fifo,
+             segment_num => 2,
+             ttl => {seconds, 60}},
+    {ok, Cleaner} = segmented_cache:start(?CACHE_NAME, Opts),
     [{cleaner, Cleaner} | Config];
 init_per_group(_Groupname, Config) ->
     print_and_restart_counters(),
@@ -109,6 +125,15 @@ end_per_testcase(_TestCase, _Config) ->
 %%% Stateful property Test Case
 %%%===================================================================

+ensure_configured_size_is_respected(_Config) ->
+    %% We have 2 tables with 1 element each
+    ?assert(segmented_cache:put_entry(?CACHE_NAME, one, make_ref())),
+    ?assert(segmented_cache:put_entry(?CACHE_NAME, two, make_ref())),
+    ?assert(segmented_cache:put_entry(?CACHE_NAME, three, make_ref())),
+    ?assert(segmented_cache:is_member(?CACHE_NAME, three)),
+    ?assert(segmented_cache:is_member(?CACHE_NAME, two)),
+    ?assertNot(segmented_cache:is_member(?CACHE_NAME, one)).
+
 stateful_property(_Config) ->
     Prop = ?FORALL(Cmds, commands(?CMD_MODULE),

From e03dbc57ec9cd3e2533fb4f7255640a4cf5604cb Mon Sep 17 00:00:00 2001
From: Nelson Vides
Date: Tue, 17 Sep 2024 13:33:40 +0200
Subject: [PATCH 2/2] Update codecov github actions

---
 .github/workflows/ci.yml | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 590aaf6..59b181b 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -38,7 +38,10 @@ jobs:
       - run: rebar3 as test do cover, covertool generate
         if: ${{ matrix.otp == '27' }}
       - name: Upload code coverage
-        uses: codecov/codecov-action@v2
+        uses: codecov/codecov-action@v4
        if: ${{ matrix.otp == '27' }}
        with:
-          file: "_build/test/covertool/segmented_cache.covertool.xml"
+          files: _build/test/covertool/segmented_cache.covertool.xml
+          token: ${{ secrets.CODECOV_TOKEN }}
+          fail_ci_if_error: true
+          verbose: true
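
Usage sketch (illustrative only, not part of the patch series): with the new
entries_limit option every segment is capped, and an insert into a full front
segment purges the oldest segment and rotates the index before retrying, so the
cache holds roughly segment_num * entries_limit entries at any point. The cache
name my_cache and the key/value atoms below are placeholders.

    %% Start a cache whose two segments hold at most 1000 entries each
    {ok, _Cleaner} = segmented_cache:start(my_cache,
                                           #{strategy => fifo,
                                             segment_num => 2,
                                             entries_limit => 1000,
                                             ttl => {hours, 1}}),
    %% Inserts and lookups behave as before; the limit only affects rotation
    true = segmented_cache:put_entry(my_cache, some_key, some_value),
    true = segmented_cache:is_member(my_cache, some_key).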