Skip to content

Commit

Permalink
Merge pull request #468 from rabbitmq/snapshot-installed-4-tweaks
Browse files Browse the repository at this point in the history
Amend ra_machine:snapshot_installed/4
  • Loading branch information
kjnilsson authored Sep 4, 2024
2 parents b5f6b38 + 682947c commit 6a80882
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 21 deletions.
30 changes: 14 additions & 16 deletions src/ra_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@
command_meta_data/0]).

-optional_callbacks([tick/2,
snapshot_installed/2,
snapshot_installed/4,
state_enter/2,
init_aux/1,
Expand Down Expand Up @@ -242,14 +241,12 @@

-callback tick(TimeMs :: milliseconds(), state()) -> effects().

-callback snapshot_installed(ra_snapshot:meta(), state()) -> effects().

-callback snapshot_installed(Meta, OldMacVer, OldState, NewState) -> Effects
-callback snapshot_installed(Meta, State, OldMeta, OldState) -> Effects
when
Meta :: ra_snapshot:meta(),
OldMacVer :: version(),
State :: state(),
OldMeta :: ra_snapshot:meta(),
OldState :: state(),
NewState :: state(),
Effects :: effects().

-callback init_aux(Name :: atom()) -> AuxState :: term().
Expand Down Expand Up @@ -311,22 +308,23 @@ apply(Mod, Metadata, Cmd, State) ->
tick(Mod, TimeMs, State) ->
?OPT_CALL(Mod:tick(TimeMs, State), []).

-spec snapshot_installed(Module, Meta, OldMacVer, OldState, NewState) ->
Effects when
-spec snapshot_installed(Module, Meta, State, OldMeta, OldState) ->
effects() when
Module :: module(),
Meta :: ra_snapshot:meta(),
OldMacVer :: version(),
OldState :: state(),
NewState :: state(),
Effects :: effects().

snapshot_installed(Mod, Meta, OldMacVer, OldState, NewState) ->
State :: state(),
OldMeta :: ra_snapshot:meta(),
OldState :: state().
snapshot_installed(Mod, Meta, State, OldMeta, OldState)
when is_atom(Mod) andalso
is_map(Meta) andalso
is_map(OldMeta) ->
try
Mod:snapshot_installed(Meta, OldMacVer, OldState, NewState)
Mod:snapshot_installed(Meta, State, OldMeta, OldState)
catch
error:undef ->
try
Mod:snapshot_installed(Meta, NewState)
Mod:snapshot_installed(Meta, State)
catch
error:undef ->
[]
Expand Down
17 changes: 13 additions & 4 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1342,7 +1342,9 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
machine_versions = MachineVersions,
machine = Machine} = Cfg0,
log := Log0,
cluster := Cluster,
current_term := CurTerm,
last_applied := LastApplied,
machine_state := OldMacState} = State0)
when Term >= CurTerm ->
?DEBUG("~ts: receiving snapshot chunk: ~b / ~w, index ~b, term ~b",
Expand Down Expand Up @@ -1376,12 +1378,19 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,

{#{cluster := ClusterIds}, MacState} = ra_log:recover_snapshot(Log),

OldServerIds = maps:map(fun (_, V) ->
maps:with([voter_status], V)
end, Cluster),
OldMeta = #{machine_version => CurEffMacVer,
term => CurTerm,
index => LastApplied,
cluster => OldServerIds},

SnapInstalledEffs = ra_machine:snapshot_installed(EffMacMod,
SnapMeta,
CurEffMacVer,
OldMacState,
MacState),

MacState,
OldMeta,
OldMacState),
State = update_term(Term,
State0#{cfg => Cfg,
log => Log,
Expand Down
14 changes: 13 additions & 1 deletion test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,19 @@ apply(#{index := _Idx}, {segment_writer_or_wal_crash_follower, _}, State) ->
apply(#{index := Idx}, _Cmd, State) ->
{State, ok, [{release_cursor, Idx, State}]}.

snapshot_installed(Meta, _OldMacVer, _OldState, _NewState) ->
snapshot_installed(#{machine_version := _,
index := Idx,
term := _,
cluster := Cluster} = Meta,
_State,
#{machine_version := _,
index := OldIdx,
term := _,
cluster := OldCluster} = _OldMeta,
_OldState)
when is_map(OldCluster) andalso
is_map(Cluster) andalso
Idx > OldIdx ->
case whereis(snapshot_installed_proc) of
undefined ->
[];
Expand Down

0 comments on commit 6a80882

Please sign in to comment.