diff --git a/cep/cep/src/Network/CEP.hs b/cep/cep/src/Network/CEP.hs index 8a05a7d3..5866f14b 100644 --- a/cep/cep/src/Network/CEP.hs +++ b/cep/cep/src/Network/CEP.hs @@ -38,6 +38,7 @@ module Network.CEP , start , start_ , startFork + , startForks , get , gets , put diff --git a/cep/cep/src/Network/CEP/Types.hs b/cep/cep/src/Network/CEP/Types.hs index 6e2d8e51..aab7761e 100644 --- a/cep/cep/src/Network/CEP/Types.hs +++ b/cep/cep/src/Network/CEP/Types.hs @@ -453,9 +453,14 @@ start h l = singleton $ Start h l start_ :: Jump PhaseHandle -> RuleM g () (Started g ()) start_ h = start h () --- | Start a rule with an implicit fork +-- | Start a rule with an implicit fork. startFork :: Jump PhaseHandle -> l -> RuleM g l (Started g l) -startFork rule state = do +startFork rule = startForks [rule] + +-- | Start a rule with an implicit fork. Rule may started from +-- different phases. +startForks :: [Jump PhaseHandle] -> l -> RuleM g l (Started g l) +startForks rules state = do winit <- initWrapper start winit state where @@ -463,10 +468,10 @@ startFork rule state = do wrapper_init <- phaseHandle "wrapper_init" wrapper_clear <- phaseHandle "wrapper_clear" wrapper_end <- phaseHandle "wrapper_end" - directly wrapper_init $ Network.CEP.Types.switch [rule, wrapper_clear] + directly wrapper_init $ Network.CEP.Types.switch (rules ++ [wrapper_clear]) directly wrapper_clear $ do - fork NoBuffer $ continue rule + fork NoBuffer $ Network.CEP.Types.switch rules continue wrapper_end directly wrapper_end stop diff --git a/mero-halon/mero-halon.cabal b/mero-halon/mero-halon.cabal index 6c64bb35..dd154257 100644 --- a/mero-halon/mero-halon.cabal +++ b/mero-halon/mero-halon.cabal @@ -97,7 +97,8 @@ Library HA.RecoveryCoordinator.Actions.Hardware HA.RecoveryCoordinator.Actions.Service HA.RecoveryCoordinator.Actions.Test - HA.RecoveryCoordinator.Actions.Job + HA.RecoveryCoordinator.Job.Actions + HA.RecoveryCoordinator.Job.Events HA.RecoveryCoordinator.Rules.Castor HA.RecoveryCoordinator.Rules.Debug HA.RecoveryCoordinator.Rules.Service @@ -124,6 +125,7 @@ Library HA.Services.SSPL.HL.StatusHandler HA.Services.SSPL.HL.CEP HA.Services.SSPL.LL.RC.Actions + HA.RecoveryCoordinator.Job.Internal Build-Depends: base, aeson, diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Actions/Core.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Actions/Core.hs index fc316f71..dc8c9a50 100644 --- a/mero-halon/src/lib/HA/RecoveryCoordinator/Actions/Core.hs +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/Actions/Core.hs @@ -25,9 +25,15 @@ module HA.RecoveryCoordinator.Actions.Core , putStorageRC , getStorageRC , deleteStorageRC + -- ** Helpers for Set , insertStorageSetRC , memberStorageSetRC , deleteStorageSetRC + -- ** Helpers for Map + , insertWithStorageMapRC + , memberStorageMapRC + , deleteStorageMapRC + , lookupStorageMapRC -- * Communication with the EQ , messageProcessed , mkMessageProcessed @@ -86,7 +92,7 @@ import Control.Distributed.Process , spawnLocal , link ) -import Control.Monad (when, unless) +import Control.Monad (when, unless, (<=<)) import Control.Distributed.Process.Serializable import Data.Typeable (Typeable) @@ -94,6 +100,7 @@ import Data.Functor (void) import Data.Proxy import qualified Data.Map.Strict as Map import qualified Data.Set as Set +import Data.Maybe (fromMaybe) import Network.CEP @@ -135,6 +142,32 @@ deleteStorageSetRC x = modify Global $ \g -> do Nothing -> g Just z -> g{lsStorage = Storage.put (Set.delete x z) $ lsStorage g} +-- | Helper that wraps. 'Data.Map.member' for map kept in Storage. +memberStorageMapRC :: forall proxy k v l . (Typeable k, Typeable v, Ord k) + => proxy v -> k -> PhaseM LoopState l Bool +memberStorageMapRC _ x = + maybe False (\m -> Map.member x (m :: Map.Map k v)) + . Storage.get . lsStorage <$> get Global + +insertWithStorageMapRC :: (Typeable k, Typeable v, Ord k) + => (v -> v -> v) -> k -> v -> PhaseM LoopState l () +insertWithStorageMapRC f k v = modify Global $ \g -> do + let z = fromMaybe Map.empty $ Storage.get (lsStorage g) + g{lsStorage = Storage.put (Map.insertWith f k v z) $ lsStorage g} + +deleteStorageMapRC :: forall proxy k v l . (Typeable k, Typeable v, Ord k) + => proxy v -> k -> PhaseM LoopState l () +deleteStorageMapRC _ x = modify Global $ \g -> do + case Storage.get (lsStorage g) of + Nothing -> g + Just (z::Map.Map k v) -> g{lsStorage = Storage.put (Map.delete x z) $ lsStorage g} + +lookupStorageMapRC :: forall proxy k v l . (Typeable k, Typeable v, Ord k) + => k -> PhaseM LoopState l (Maybe v) +lookupStorageMapRC x = + ((\m -> Map.lookup x (m :: Map.Map k v)) + <=< Storage.get . lsStorage) <$> get Global + -- | Is a given resource existent in the RG? knownResource :: G.Resource a => a -> PhaseM LoopState l Bool knownResource res = fmap (G.memberResource res) getLocalGraph diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Actions/Job.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Actions/Job.hs deleted file mode 100644 index defadba1..00000000 --- a/mero-halon/src/lib/HA/RecoveryCoordinator/Actions/Job.hs +++ /dev/null @@ -1,103 +0,0 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE TypeOperators #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE FlexibleContexts #-} --- | --- Copyright: (C) 2015 Seagate Technology Limited. --- --- Helpers that simplifies creation of the long running processes -module HA.RecoveryCoordinator.Actions.Job - ( -- * Process - Job(..) - , mkJobRule - ) where - -import HA.EventQueue.Types -import HA.RecoveryCoordinator.Actions.Core - -import Control.Distributed.Process.Serializable -import Control.Lens - -import Data.Foldable (for_) -import Data.Proxy -import Data.Vinyl - -import Network.CEP - --- | Process handle. Process is a long running rule --- that is triggered when some @input@ event is received --- and emits @output@ event as a result of it's run. -newtype Job input output = Job String - --- | Create rule for a given process. This is a helper --- method that removes some boilerplate that is needed --- in order to define such rule. --- --- Job identity is completely determined by its input --- event; for each event, only one instance of any job --- may run. Note that this means that if we have two --- distinct jobs that take the same input, only one of --- them may run at any given time. --- --- Note that also no other rules should fire on the --- input, unless they do not mind that the event could --- be deleted. --- --- It's not legitimate to call 'Network.CEP.stop' inside --- this @body@. -mkJobRule :: forall input output l s . - ( FldUUID ∈ l, '("request", Maybe input) ∈ l, '("reply", Maybe output) ∈ l - , Serializable input, Serializable output, Ord input,Show input, s ~ Rec ElField l, Show output) - => Job input output -- ^ Process name. - -> s - -> (Jump PhaseHandle -> RuleM LoopState s (input -> PhaseM LoopState s (Maybe [Jump PhaseHandle]))) - -- ^ Rule body, takes final handle as paramter, returns an action used to - -- decide how to process rule - -> Specification LoopState () -mkJobRule (Job name) - args - body = define name $ do - request <- phaseHandle $ name ++ " -> request" - finish <- phaseHandle $ name ++ " -> finish" - end <- phaseHandle $ name ++ " -> end" - - check_input <- body finish - - setPhase request $ \(HAEvent eid input _) -> do - isRunning <- memberStorageSetRC input - if isRunning - then do - phaseLog "info" $ "Job " ++ name ++ " is already running for " ++ show input - messageProcessed eid - else do - modify Local $ rlens fldRequest .~ (Field $ Just input) - modify Local $ rlens fldUUID .~ (Field $ Just eid) - check_input input >>= \case - Nothing -> messageProcessed eid - Just next -> do - insertStorageSetRC input - fork CopyNewerBuffer $ do - phaseLog "info" $ " request: " ++ show input - switch next - - directly finish $ do -- XXX: use rule finalier, when implemented - state <- get Local - let uuid = state ^. rlens fldUUID - req = state ^. rlens fldRequest - rep = state ^. rlens fldReply - phaseLog "info" $ " request: " ++ maybe "N/A" show (getField req) - phaseLog "info" $ " reply: " ++ show (getField rep) - for_ (getField rep) notify - for_ (getField req) deleteStorageSetRC - for_ (getField uuid) messageProcessed - continue end - - directly end stop - - startFork request args - where - fldRequest :: Proxy '("request", Maybe input) - fldRequest = Proxy - fldReply :: Proxy '("reply", Maybe output) - fldReply = Proxy diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/CEP.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/CEP.hs index e547c661..fa4294e1 100644 --- a/mero-halon/src/lib/HA/RecoveryCoordinator/CEP.hs +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/CEP.hs @@ -43,7 +43,7 @@ import HA.RecoveryCoordinator.Events.Cluster import HA.RecoveryCoordinator.Rules.Castor import qualified HA.RecoveryCoordinator.Rules.Service import qualified HA.RecoveryCoordinator.Rules.Debug as Debug (rules) -import HA.RecoveryCoordinator.Actions.Job +import HA.RecoveryCoordinator.Job.Actions import qualified HA.ResourceGraph as G import HA.Resources import HA.Resources.Castor diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules.hs index ed0dc362..29e24340 100644 --- a/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules.hs +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules.hs @@ -22,6 +22,7 @@ -- See "HA.RecoveryCoordinator.Rules.Castor.Disk.Repair" for details. {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE Rank2Types #-} {-# LANGUAGE ViewPatterns #-} {-# LANGUAGE GADTs #-} module HA.RecoveryCoordinator.Castor.Drive.Rules @@ -48,6 +49,8 @@ import HA.RecoveryCoordinator.Actions.Core import HA.RecoveryCoordinator.Actions.Hardware import HA.RecoveryCoordinator.Castor.Drive.Actions import HA.RecoveryCoordinator.Events.Castor.Cluster (PoolRebalanceRequest(..)) +import HA.RecoveryCoordinator.Job.Actions +import HA.RecoveryCoordinator.Job.Events import HA.Resources import HA.Resources.Castor import qualified HA.ResourceGraph as G @@ -61,7 +64,7 @@ import HA.RecoveryCoordinator.Events.Mero import Mero.Notification hiding (notifyMero) import Control.Distributed.Process hiding (catch) -import Control.Lens ((<&>)) +import Control.Lens import Control.Monad import Control.Monad.Trans.Maybe @@ -120,9 +123,10 @@ driveRemovalTimeout = 60 -- - Have a successful SMART test run. This will be checked by this rule. mkCheckAndHandleDriveReady :: (l -> Maybe StorageDevice) -- ^ accessor to curent storage device in a local state. + -> Lens' l (Maybe ListenerId) -- ^ Simple lens to listener ID for SMART test -> (M0.SDev -> PhaseM LoopState l ()) -- ^ Action to run when drive is handled. -> RuleM LoopState l (Jump PhaseHandle, Node -> StorageDevice -> PhaseM LoopState l ()) -mkCheckAndHandleDriveReady getter next = do +mkCheckAndHandleDriveReady getter smartLens next = do smart_result <- phaseHandle "smart_result" @@ -151,11 +155,13 @@ mkCheckAndHandleDriveReady getter next = do SDSRebalancing -> return () next m0sdev - onSameSdev (SMARTResponse sdev' status) _ (getter -> (Just sdev)) = - if sdev' == sdev - then return $ Just status - else return Nothing - onSameSdev _ _ _ = return Nothing + onSameSdev (JobFinished listenerIds (SMARTResponse sdev' status)) _ l = + return $ case (,) <$> ( getter l ) + <*> ( l ^. smartLens ) of + Just (sdev, smartId) + | smartId `elem` listenerIds + && sdev == sdev' -> Just status + _ -> Nothing (device_attached, deviceAttach) <- mkAttachDisk (fmap join . traverse (lookupStorageDeviceSDev) . getter) @@ -196,7 +202,8 @@ mkCheckAndHandleDriveReady getter next = do if not reset && powered && not removed && status == "OK" then do phaseLog "info" "Device ready. Running SMART test." - promulgateRC $ SMARTRequest node disk + smartId <- startJob $ SMARTRequest node disk + modify Local $ smartLens .~ Just smartId continue smart_result else phaseLog "info" $ unwords [ @@ -291,7 +298,7 @@ ruleDriveInserted = define "drive-inserted" $ do finish <- phaseHandle "finish" setPhase handler $ \di -> do - put Local $ Just (UUID.nil, di) + put Local $ (Just (UUID.nil, di), Nothing) fork CopyNewerBuffer $ switch [ removed , inserted @@ -301,7 +308,7 @@ ruleDriveInserted = define "drive-inserted" $ do (\(DriveRemoved _ _ enc _ loc _) _ (Just (_,DriveInserted{diUUID=uuid ,diEnclosure=enc' - ,diDiskNum=loc'})) -> do + ,diDiskNum=loc'}), _) -> do if enc == enc' && loc == loc' then return (Just uuid) else return Nothing) @@ -319,7 +326,7 @@ ruleDriveInserted = define "drive-inserted" $ do (\(DriveInserted{diEnclosure=enc, diDiskNum=loc}) _ (Just (_, DriveInserted{ diUUID=uuid , diEnclosure=enc' - , diDiskNum=loc'})) -> do + , diDiskNum=loc'}), _) -> do if enc == enc' && loc == loc' then return (Just uuid) else return Nothing) @@ -329,16 +336,18 @@ ruleDriveInserted = define "drive-inserted" $ do continue finish (checked, checkAndHandleDriveReady) <- - mkCheckAndHandleDriveReady (fmap (\(_,DriveInserted{diDevice=disk})->disk)) - $ \_ -> continue finish + mkCheckAndHandleDriveReady + (fmap (\(_,DriveInserted{diDevice=disk})->disk) . fst) + _2 -- TODO make this a better lens! + (\_ -> continue finish) directly main $ do - Just (_, di@DriveInserted{ diUUID = uuid + (Just (_, di@DriveInserted{ diUUID = uuid , diNode = node , diDevice = disk , diSerial = sn , diPowered = powered - }) <- get Local + }), _) <- get Local -- Check if we already have device that was inserted. -- In case it this is the same device, then we do not need to update confd. hasStorageDeviceIdentifier disk sn >>= \case @@ -355,7 +364,7 @@ ruleDriveInserted = define "drive-inserted" $ do unmarkStorageDeviceRemoved disk when powered $ markDiskPowerOn disk markIfNotMeroFailure - put Local $ Just (UUID.nil, di) + put Local $ (Just (UUID.nil, di), Nothing) checkAndHandleDriveReady node disk continue checked False -> do @@ -375,25 +384,25 @@ ruleDriveInserted = define "drive-inserted" $ do when powered $ markDiskPowerOn disk markStorageDeviceReplaced disk request <- liftIO $ nextRandom - put Local $ Just (request, di) + put Local $ (Just (request, di), Nothing) registerSyncGraphProcess $ \self -> usend self (request, SyncToConfdServersInRG) continue sync_complete setPhaseIf sync_complete - (\(SyncComplete request) _ (Just (req, _)) -> return $ + (\(SyncComplete request) _ (Just (req, _), _) -> return $ if (req == request) then (Just ()) else Nothing ) $ \() -> do - Just (_, DriveInserted{diDevice=disk, diNode = node}) <- get Local + (Just (_, DriveInserted{diDevice=disk, diNode = node}), _) <- get Local checkAndHandleDriveReady node disk continue finish directly finish $ do - Just (_, DriveInserted{diUUID=uuid}) <- get Local + (Just (_, DriveInserted{diUUID=uuid}), _) <- get Local registerSyncGraphProcessMsg uuid stop - startFork handler Nothing + startFork handler (Nothing, Nothing) -- | Mark drive as failed when SMART fails. -- @@ -426,9 +435,9 @@ ruleDrivePoweredOff = define "drive-powered-off" $ do if dpcPowered then return Nothing else return (Just evt) power_on evt@(DrivePowerChange{..}) _ _ = if dpcPowered then return (Just evt) else return Nothing - matching_device _ _ Nothing = return Nothing - matching_device evt@(DrivePowerChange{..}) _ (Just (_,dev, _, _)) = + matching_device evt@(DrivePowerChange{..}) _ (Just (_,dev, _, _), Nothing) = if dev == dpcDevice then return (Just evt) else return Nothing + matching_device _ _ _ = return Nothing x `gAnd` y = \a g l -> x a g l >>= \case Nothing -> return Nothing Just b -> y b g l @@ -438,7 +447,7 @@ ruleDrivePoweredOff = define "drive-powered-off" $ do applyStateChanges [stateSet m0sdev $ sdsFailTransient old_state] continue post_power_removed (device_detached, detachDisk) <- mkDetachDisk - (fmap join . traverse (\(_,d,_,_) -> lookupStorageDeviceSDev d)) + (fmap join . traverse (\(_,d,_,_) -> lookupStorageDeviceSDev d) . fst) (\sdev e -> do phaseLog "warning" e post_process sdev) post_process @@ -447,7 +456,7 @@ ruleDrivePoweredOff = define "drive-powered-off" $ do (Node nid) = dpcNode in do todo dpcUUID - put Local $ Just (dpcUUID, dpcDevice, nid, dpcSerial) + put Local $ (Just (dpcUUID, dpcDevice, nid, dpcSerial), Nothing) markDiskPowerOff dpcDevice -- Mark Mero device as transient @@ -458,7 +467,7 @@ ruleDrivePoweredOff = define "drive-powered-off" $ do continue post_power_removed directly post_power_removed $ do - Just (_, _, nid, serial) <- get Local + (Just (_, _, nid, serial), _) <- get Local -- Attempt to power the disk back on sent <- sendNodeCmd nid Nothing (DrivePoweron serial) if sent @@ -476,9 +485,11 @@ ruleDrivePoweredOff = define "drive-powered-off" $ do continue power_removed_duration (checked, checkAndHandleDriveReady) <- - mkCheckAndHandleDriveReady (fmap (\(_,d,_,_) -> d)) $ \_ -> do - Just (uuid, _, _, _) <- get Local - done uuid + mkCheckAndHandleDriveReady (fmap (\(_,d,_,_) -> d) . fst) + _2 -- TODO better lens! + $ \_ -> do + (Just (uuid, _, _, _), _) <- get Local + done uuid setPhaseIf power_returned (power_on `gAnd` matching_device) $ \(DrivePowerChange{..}) -> do @@ -489,7 +500,7 @@ ruleDrivePoweredOff = define "drive-powered-off" $ do continue checked directly power_removed_duration $ do - Just (uuid, dpcDevice, _, _) <- get Local + (Just (uuid, dpcDevice, _, _), _) <- get Local -- Mark Mero device as permanently failed mm0sdev <- lookupStorageDeviceSDev dpcDevice @@ -498,7 +509,7 @@ ruleDrivePoweredOff = define "drive-powered-off" $ do applyStateChanges [stateSet m0sdev $ sdsFailFailed old_state] done uuid - startFork power_removed Nothing + startFork power_removed (Nothing, Nothing) -- | If a drive is powered on, and it wasn't failed due to Mero issues -- or SMART test failures (e.g. it had just been depowered), then mark @@ -509,13 +520,15 @@ ruleDrivePoweredOn = define "drive-powered-on" $ do handle <- phaseHandle "Drive power change event received." (checked, checkAndHandleDriveReady) <- - mkCheckAndHandleDriveReady (fmap snd) $ \_ -> do - Just (uuid,_) <- get Local - done uuid + mkCheckAndHandleDriveReady (fmap snd . fst) + _2 + $ \_ -> do + (Just (uuid,_), _) <- get Local + done uuid setPhase handle $ \(DrivePowerChange{..}) -> do when dpcPowered $ do - put Local $ Just (dpcUUID, dpcDevice) + put Local (Just (dpcUUID, dpcDevice), Nothing) todo dpcUUID markDiskPowerOn dpcDevice realFailure <- maybe False isRealFailure <$> driveStatus dpcDevice @@ -531,7 +544,7 @@ ruleDrivePoweredOn = define "drive-powered-on" $ do checkAndHandleDriveReady dpcNode dpcDevice continue checked - start handle Nothing + start handle (Nothing, Nothing) where filterMaybeM _ Nothing = return Nothing filterMaybeM f j@(Just x) = f x >>= \q -> return $ if q then j else Nothing @@ -571,16 +584,18 @@ ruleDriveOK = define "castor::disk::ready" $ do handle <- phaseHandle "Drive ready event received" (checked, checkAndHandleDriveReady) <- - mkCheckAndHandleDriveReady (fmap snd) $ \_ -> do - Just (eid,_) <- get Local - messageProcessed eid + mkCheckAndHandleDriveReady (fmap snd . fst) + _2 + $ \_ -> do + (Just (eid,_), _) <- get Local + messageProcessed eid setPhase handle $ \(DriveOK eid node _ disk) -> do - put Local $ Just (eid, disk) + put Local $ (Just (eid, disk), Nothing) checkAndHandleDriveReady node disk continue checked - start handle Nothing + start handle (Nothing, Nothing) -- | When a drive is marked as failed, power it down. rulePowerDownDriveOnFailure :: Definitions LoopState () diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules/Repair.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules/Repair.hs index adb141ab..7e3145a2 100644 --- a/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules/Repair.hs +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules/Repair.hs @@ -67,8 +67,8 @@ import HA.EventQueue.Types import qualified HA.ResourceGraph as G import HA.RecoveryCoordinator.Actions.Castor.Cluster (barrierPass) import HA.RecoveryCoordinator.Actions.Core -import HA.RecoveryCoordinator.Actions.Job import HA.RecoveryCoordinator.Actions.Mero +import HA.RecoveryCoordinator.Job.Actions import HA.RecoveryCoordinator.Mero import HA.RecoveryCoordinator.Events.Castor.Cluster import HA.RecoveryCoordinator.Events.Mero @@ -187,7 +187,7 @@ processSnsStatusReply getUUIDs preProcess onNotRunning onNonComplete onComplete -- rule. -- -- TODO Can we remove these? -querySpiel :: Specification LoopState () +querySpiel :: Definitions LoopState () querySpiel = define "spiel::sns:query-status" $ do query_status <- phaseHandle "run status request" dispatch_hourly <- phaseHandle "dispatch hourly event" @@ -260,7 +260,7 @@ jobHourlyStatus = Job "castor::sns::hourly-status" -- -- * it runs hourly -- * it runs until repairs complete -querySpielHourly :: Specification LoopState () +querySpielHourly :: Definitions LoopState () querySpielHourly = mkJobRule jobHourlyStatus args $ \finish -> do run_query <- phaseHandle "run status query" abort_on_quiesce <- phaseHandle "abort due to SNS operation pause" @@ -321,7 +321,7 @@ jobRebalanceStart = Job "castor::sns::rebalance::start" -- Emits 'PoolRebalanceStarted' if successful. -- -- See 'ruleRepairStart' for some caveats. -ruleRebalanceStart :: Specification LoopState () +ruleRebalanceStart :: Definitions LoopState () ruleRebalanceStart = mkJobRule jobRebalanceStart args $ \finish -> do pool_disks_notified <- phaseHandle "pool_disks_notified" notify_failed <- phaseHandle "notify_failed" @@ -468,7 +468,7 @@ jobRepairStart = Job "castor-repair-start" -- IOS becoming unavailable. For now we just check IOS status right -- before repairing but there is no guarantee we won't try to start -- repair on IOS that's down. HALON-403 should help. -ruleRepairStart :: Specification LoopState () +ruleRepairStart :: Definitions LoopState () ruleRepairStart = mkJobRule jobRepairStart args $ \finish -> do pool_disks_notified <- phaseHandle "pool_disks_notified" notify_failed <- phaseHandle "notify_failed" @@ -963,6 +963,7 @@ handleRepairExternal noteSet = do -- * Handle messages that only include information about devices and -- not pools. -- +ruleHandleRepair :: Definitions LoopState () ruleHandleRepair = defineSimpleTask "castor::sns::handle-repair" $ \msg -> getClusterStatus <$> getLocalGraph >>= \case Just (M0.MeroClusterState M0.ONLINE n _) | n >= (M0.BootLevel 1) -> do diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules/Reset.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules/Reset.hs index efcfbaf1..7fa593e8 100644 --- a/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules/Reset.hs +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules/Reset.hs @@ -1,4 +1,8 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE TypeOperators #-} +{-# LANGUAGE ViewPatterns #-} -- | -- Copyright : (C) 2016 Seagate Technology Limited. -- License : All rights reserved. @@ -16,6 +20,7 @@ import HA.EventQueue.Types ) import HA.RecoveryCoordinator.Actions.Core ( LoopState + , fldUUID , getLocalGraph , messageProcessed , promulgateRC @@ -25,6 +30,8 @@ import HA.RecoveryCoordinator.Actions.Hardware import HA.RecoveryCoordinator.Actions.Mero import HA.RecoveryCoordinator.Castor.Drive.Events import HA.RecoveryCoordinator.Castor.Drive.Actions +import HA.RecoveryCoordinator.Job.Actions +import HA.RecoveryCoordinator.Job.Events import HA.RecoveryCoordinator.Rules.Mero.Conf import HA.Resources (Node(..)) import HA.Resources.Castor @@ -46,7 +53,7 @@ import Mero.Notification.HAState (Note(..)) import Control.Distributed.Process ( Process ) -import Control.Lens ((<&>)) +import Control.Lens import Control.Monad ( forM_ , when @@ -56,11 +63,26 @@ import Control.Monad import Control.Monad.IO.Class import Data.Foldable (for_) +import Data.Proxy (Proxy(..)) import Data.Text (Text, pack) +import Data.Vinyl import Debug.Trace (traceEventIO) import Network.CEP +data DeviceInfo = DeviceInfo { + _diSDev :: StorageDevice + , _diSerial :: Text +} + +fldNode :: Proxy '("node", Maybe Node) +fldNode = Proxy + +type FldDeviceInfo = '("deviceInfo", Maybe DeviceInfo) +-- | Device info used in SMART rule +fldDeviceInfo :: Proxy FldDeviceInfo +fldDeviceInfo = Proxy + -------------------------------------------------------------------------------- -- Reset bit -- -------------------------------------------------------------------------------- @@ -168,7 +190,11 @@ ruleResetAttempt = define "reset-attempt" $ do paths <- lookupStorageDeviceSerial sdev case paths of serial:_ -> do - put Local (Just (sdev, pack serial, node, uid)) + modify Local $ rlens fldUUID . rfield .~ Just uid + modify Local $ rlens fldNode . rfield .~ Just node + modify Local $ rlens fldDeviceInfo . rfield .~ + (Just $ DeviceInfo sdev (pack serial)) + whenM (isStorageDriveRemoved sdev) $ do phaseLog "info" $ "Cancelling drive reset as drive is removed." phaseLog "sdev" $ show sdev @@ -184,12 +210,15 @@ ruleResetAttempt = define "reset-attempt" $ do stop (disk_detached, detachDisk) <- mkDetachDisk - (fmap join . traverse (\(sdev,_,_,_) -> lookupStorageDeviceSDev sdev)) - (\_ _ -> switch [drive_removed, resetComplete, timeout driveResetTimeout failure]) - (\_ -> switch [drive_removed, resetComplete, timeout driveResetTimeout failure]) + (\l -> fmap join $ traverse + (lookupStorageDeviceSDev . _diSDev) + (l ^. rlens fldDeviceInfo . rfield)) + (\_ _ -> switch [drive_removed, resetComplete, timeout driveResetTimeout failure]) + (\_ -> switch [drive_removed, resetComplete, timeout driveResetTimeout failure]) directly reset $ do - Just (sdev, serial, Node nid, _) <- get Local + Just (DeviceInfo sdev serial) <- gets Local (^. rlens fldDeviceInfo . rfield) + Just (Node nid) <- gets Local (^. rlens fldNode . rfield) i <- getDiskResetAttempts sdev phaseLog "debug" $ "Current reset attempts: " ++ show i if i <= resetAttemptThreshold @@ -209,7 +238,7 @@ ruleResetAttempt = define "reset-attempt" $ do else continue failure setPhaseIf resetComplete (onCommandAck DriveReset) $ \(result, eid) -> do - Just (sdev, _, _, _) <- get Local + Just (DeviceInfo sdev _) <- gets Local (^. rlens fldDeviceInfo . rfield) markResetComplete sdev if result then do @@ -223,13 +252,17 @@ ruleResetAttempt = define "reset-attempt" $ do continue failure directly smart $ do - Just (sdev, serial, node, _) <- get Local - promulgateRC $ SMARTRequest node sdev + Just (DeviceInfo sdev _) <- gets Local (^. rlens fldDeviceInfo . rfield) + Just node <- gets Local (^. rlens fldNode . rfield) + smartId <- startJob $ SMARTRequest node sdev + modify Local $ rlens fldListenerId . rfield .~ Just smartId switch [ drive_removed, smartResponse , timeout smartTestTimeout failure ] (disk_attached, attachDisk) <- mkAttachDisk - (fmap join . traverse (\(sdev, _, _, _) -> lookupStorageDeviceSDev sdev) ) + (\l -> fmap join $ traverse + (lookupStorageDeviceSDev . _diSDev) + (l ^. rlens fldDeviceInfo . rfield)) (\_ _ -> do phaseLog "error" "failed to attach disk" continue end) (\m0sdev -> do @@ -242,7 +275,7 @@ ruleResetAttempt = define "reset-attempt" $ do continue end) setPhaseIf smartResponse onSameSdev $ \status -> do - Just (sdev, _, _, _) <- get Local + Just (DeviceInfo sdev _) <- gets Local (^. rlens fldDeviceInfo . rfield) phaseLog "sdev" $ show sdev phaseLog "smart.response" $ show status case status of @@ -258,7 +291,7 @@ ruleResetAttempt = define "reset-attempt" $ do continue failure directly failure $ do - Just (sdev, _, _, _) <- get Local + Just (DeviceInfo sdev _) <- gets Local (^. rlens fldDeviceInfo . rfield) phaseLog "info" $ "Drive reset failure for " ++ show sdev promulgateRC $ ResetFailure sdev sd <- lookupStorageDeviceSDev sdev @@ -274,7 +307,7 @@ ruleResetAttempt = define "reset-attempt" $ do continue end directly end $ do - Just (_, _, _, uid) <- get Local + Just uid <- gets Local (^. rlens fldUUID . rfield) messageProcessed uid stop @@ -284,42 +317,54 @@ ruleResetAttempt = define "reset-attempt" $ do markResetComplete sdev continue end - startFork home Nothing - + startFork home args + where + args = fldUUID =: Nothing + <+> fldNode =: Nothing + <+> fldDeviceInfo =: Nothing + <+> fldListenerId =: Nothing -------------------------------------------------------------------------------- -- Helpers -------------------------------------------------------------------------------- -onCommandAck :: (Text -> NodeCmd) - -> HAEvent CommandAck - -> g - -> Maybe (StorageDevice, Text, Node, UUID) - -> Process (Maybe (Bool, UUID)) -onCommandAck _ _ _ Nothing = return Nothing -onCommandAck k (HAEvent eid cmd _) _ (Just (_, serial, _, _)) = +onCommandAck :: forall g l. (FldDeviceInfo ∈ l) + => (Text -> NodeCmd) + -> HAEvent CommandAck + -> g + -> FieldRec l + -> Process (Maybe (Bool, UUID)) +onCommandAck k (HAEvent eid cmd _) _ + ((view $ rlens fldDeviceInfo . rfield) + -> Just (DeviceInfo _ serial)) = case commandAckType cmd of Just x | (k serial) == x -> return $ Just (commandAck cmd == AckReplyPassed, eid) | otherwise -> return Nothing _ -> return Nothing +onCommandAck _ _ _ _ = return Nothing -onDriveRemoved :: DriveRemoved +onDriveRemoved :: forall g l. (FldDeviceInfo ∈ l) + => DriveRemoved -> g - -> Maybe (StorageDevice, Text, Node, UUID) + -> FieldRec l -> Process (Maybe StorageDevice) -onDriveRemoved dr _ (Just (sdev, _, _, _)) = +onDriveRemoved dr _ ((view $ rlens fldDeviceInfo . rfield) + -> Just (DeviceInfo sdev _)) = if drDevice dr == sdev then return $ Just sdev else return Nothing onDriveRemoved _ _ _ = return Nothing -onSameSdev :: SMARTResponse +onSameSdev :: forall g l. (FldDeviceInfo ∈ l, FldListenerId ∈ l) + => JobFinished SMARTResponse -> g - -> Maybe (StorageDevice, Text, Node, UUID) + -> FieldRec l -> Process (Maybe SMARTResponseStatus) -onSameSdev (SMARTResponse sdev' status) _ (Just (sdev, _, _, _)) = - if sdev' == sdev - then return $ Just status - else return Nothing -onSameSdev _ _ _ = return Nothing +onSameSdev (JobFinished listenerIds (SMARTResponse sdev' status)) _ l = + return $ case (,) <$> ( l ^. rlens fldDeviceInfo . rfield ) + <*> ( l ^. rlens fldListenerId . rfield ) of + Just (DeviceInfo sdev _, smartId) + | smartId `elem` listenerIds + && sdev == sdev' -> Just status + _ -> Nothing diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules/Smart.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules/Smart.hs index 78daaae5..927f7063 100644 --- a/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules/Smart.hs +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/Castor/Drive/Rules/Smart.hs @@ -34,7 +34,7 @@ import HA.RecoveryCoordinator.Actions.Hardware , isStorageDevicePowered , lookupStorageDeviceSerial ) -import HA.RecoveryCoordinator.Actions.Job +import HA.RecoveryCoordinator.Job.Actions import HA.RecoveryCoordinator.Castor.Drive.Events ( SMARTRequest(..) , SMARTResponse(..) diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Job/Actions.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Job/Actions.hs new file mode 100644 index 00000000..77bfc7b1 --- /dev/null +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/Job/Actions.hs @@ -0,0 +1,146 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE TypeOperators #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE FlexibleContexts #-} +-- | +-- Copyright: (C) 2015 Seagate Technology Limited. +-- +-- Helpers that simplifies creation of the long running processes +module HA.RecoveryCoordinator.Job.Actions + ( -- * Process + Job(..) + , mkJobRule + , startJob + , ListenerId + , FldListenerId + , fldListenerId + ) where + +import HA.RecoveryCoordinator.Job.Events +import HA.RecoveryCoordinator.Job.Internal + +import HA.EventQueue.Types +import HA.RecoveryCoordinator.Actions.Core + +import Control.Distributed.Process.Serializable +import Control.Lens +import Control.Monad (unless, join) +import Control.Monad.IO.Class (liftIO) + +import Data.Binary (Binary) +import Data.Foldable (for_) +import Data.Traversable (for) +import Data.Typeable (Typeable) +import Data.Proxy +import Data.Vinyl +import qualified Data.UUID.V4 as UUID + +import Network.CEP + +type FldListenerId = '("listenerId", Maybe ListenerId) + +fldListenerId :: Proxy FldListenerId +fldListenerId = Proxy + +-- | Process handle. Process is a long running rule +-- that is triggered when some @input@ event is received +-- and emits @output@ event as a result of it's run. +newtype Job input output = Job String + +-- | Create rule for a given process. This is a helper +-- method that removes some boilerplate that is needed +-- in order to define such rule. +-- +-- Job identity is completely determined by its input +-- event; for each event, only one instance of any job +-- may run. Note that this means that if we have two +-- distinct jobs that take the same input, only one of +-- them may run at any given time. +-- +-- Note that also no other rules should fire on the +-- input, unless they do not mind that the event could +-- be deleted. +-- +-- It's not legitimate to call 'Network.CEP.stop' inside +-- this @body@. +mkJobRule :: forall input output l s . + ( '("request", Maybe input) ∈ l, '("reply", Maybe output) ∈ l + , Serializable input, Serializable output, Ord input,Show input, s ~ Rec ElField l, Show output) + => Job input output -- ^ Process name. + -> s + -> (Jump PhaseHandle -> RuleM LoopState s (input -> PhaseM LoopState s (Maybe [Jump PhaseHandle]))) + -- ^ Rule body, takes final handle as paramter, returns an action used to + -- decide how to process rule + -> Definitions LoopState () +mkJobRule (Job name) + args + body = define name $ do + request <- phaseHandle $ name ++ " -> request" + indexed_request <- phaseHandle $ name ++ " -> indexed request" + finish <- phaseHandle $ name ++ " -> finish" + end <- phaseHandle $ name ++ " -> end" + + check_input <- body finish + + let processRequest eid input listeners = do + isRunning <- memberStorageMapRC pJD input + if isRunning + then do + phaseLog "info" $ "Job is already running" + phaseLog "input" $ show input + if null listeners + then do phaseLog "action" "Mark message processed" + messageProcessed eid + else do phaseLog "action" "Adding listeners" + insertWithStorageMapRC (mappend) input (JobDescription [eid] listeners) + else do + phaseLog "request" $ show input + modify Local $ rlens fldRequest .~ (Field $ Just input) + check_input input >>= \case + Nothing -> do + phaseLog "action" "Ignoring message due to rule filter." + messageProcessed eid + Just next -> do + phaseLog "action" "Starting execution." + insertWithStorageMapRC (mappend) input (JobDescription [eid] listeners) + fork CopyNewerBuffer $ switch next + + + setPhase request $ \(HAEvent eid input _) -> processRequest eid input [] + setPhase indexed_request $ \(HAEvent eid (JobStartRequest uuid input) _) -> + processRequest eid input [uuid] + + directly finish $ do -- XXX: use rule finalier, when implemented + state <- get Local + let req = state ^. rlens fldRequest + rep = state ^. rlens fldReply + phaseLog "request" $ maybe "N/A" show (getField req) + phaseLog "reply" $ show (getField rep) + mdescription <- fmap join $ for (getField req) $ \input -> do + x <- lookupStorageMapRC input + deleteStorageMapRC pJD input + return x + for_ (getField rep) notify + for_ mdescription $ \(JobDescription uuids listeners) -> do + for_ uuids messageProcessed + unless (null listeners) $ do + for_ (getField rep) $ notify . JobFinished listeners + continue end + + directly end stop + + startForks [request, indexed_request] args + where + pJD :: Proxy JobDescription + pJD = Proxy + fldRequest :: Proxy '("request", Maybe input) + fldRequest = Proxy + fldReply :: Proxy '("reply", Maybe output) + fldReply = Proxy + +startJob :: (Typeable r, Binary r) => r -> PhaseM LoopState l ListenerId +startJob request = do + l <- ListenerId <$> liftIO UUID.nextRandom + promulgateRC $ JobStartRequest l request + return l diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Job/Events.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Job/Events.hs new file mode 100644 index 00000000..d62d35fe --- /dev/null +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/Job/Events.hs @@ -0,0 +1,34 @@ +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE DeriveGeneric #-} +-- | +-- Copyright: (C) 2016 Seagate Technology Limited. +-- +-- Helpers that simplifies creation of the long running processes +module HA.RecoveryCoordinator.Job.Events + ( JobStartRequest(..) + , JobFinished(..) + ) where + +import HA.RecoveryCoordinator.Job.Internal +import Data.UUID (UUID) + +import Data.Binary (Binary) +import Data.Typeable (Typeable) +import GHC.Generics (Generic) + +-- | Request to start a new job. +-- This event starts job with input @a@ but also adds a listener +-- to the rule. This way rule can match the job it's interested +-- in. +data JobStartRequest a = JobStartRequest ListenerId a + deriving (Typeable, Generic, Show) + +instance Binary a => Binary (JobStartRequest a) + +-- | Event that is sent when job with listeners finished it's +-- execution. +data JobFinished a = JobFinished [ListenerId] a + deriving (Typeable, Generic, Show) + +instance Binary a => Binary (JobFinished a) + diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Job/Internal.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Job/Internal.hs new file mode 100644 index 00000000..ca94e66a --- /dev/null +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/Job/Internal.hs @@ -0,0 +1,27 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +-- | +-- Copyright: (C) 2015 Seagate Technology Limited. +-- +module HA.RecoveryCoordinator.Job.Internal + ( ListenerId(..) + , JobDescription(..) + ) where + +import Data.UUID +import Data.Binary (Binary) +import Data.Typeable (Typeable) +import GHC.Generics + +-- | Wrapper for listener id. +newtype ListenerId = ListenerId UUID + deriving (Typeable, Generic, Binary, Ord, Eq, Show) + +data JobDescription = JobDescription + { requestUUIDS :: [UUID] + , listenersUUIDS :: [ListenerId] + } + +instance Monoid JobDescription where + mempty = JobDescription [] [] + (JobDescription a b) `mappend` (JobDescription c d) = + JobDescription (a `mappend` c) (b `mappend` d) diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Castor/Cluster.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Castor/Cluster.hs index 8f083cbc..a9346e7f 100644 --- a/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Castor/Cluster.hs +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Castor/Cluster.hs @@ -67,7 +67,7 @@ import HA.RecoveryCoordinator.Rules.Mero.Conf ( applyStateChanges , setPhaseNotified ) -import HA.RecoveryCoordinator.Actions.Job +import HA.RecoveryCoordinator.Job.Actions import HA.Services.Mero.RC (meroChannel) import Mero.ConfC (ServiceType(..)) import Network.CEP diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Castor/Node.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Castor/Node.hs index 1633cdfa..d22155a8 100644 --- a/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Castor/Node.hs +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Castor/Node.hs @@ -121,8 +121,8 @@ import HA.EventQueue.Types import HA.RecoveryCoordinator.Actions.Core import HA.RecoveryCoordinator.Actions.Hardware import HA.RecoveryCoordinator.Actions.Mero -import HA.RecoveryCoordinator.Actions.Job import HA.RecoveryCoordinator.Actions.Service (lookupInfoMsg) +import HA.RecoveryCoordinator.Job.Actions import HA.RecoveryCoordinator.Events.Castor.Cluster import HA.RecoveryCoordinator.Events.Castor.Process import HA.RecoveryCoordinator.Events.Mero diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Castor/Process.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Castor/Process.hs index e9db1ad8..67eae326 100644 --- a/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Castor/Process.hs +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Castor/Process.hs @@ -13,7 +13,7 @@ module HA.RecoveryCoordinator.Rules.Castor.Process import HA.EventQueue.Types import HA.RecoveryCoordinator.Actions.Core -import HA.RecoveryCoordinator.Actions.Job +import HA.RecoveryCoordinator.Job.Actions import HA.RecoveryCoordinator.Actions.Mero import HA.RecoveryCoordinator.Events.Castor.Process import HA.RecoveryCoordinator.Events.Castor.Cluster diff --git a/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Service.hs b/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Service.hs index 185523b8..d20cce42 100644 --- a/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Service.hs +++ b/mero-halon/src/lib/HA/RecoveryCoordinator/Rules/Service.hs @@ -35,7 +35,7 @@ import HA.Service ) import HA.RecoveryCoordinator.Events.Service -import HA.RecoveryCoordinator.Actions.Job +import HA.RecoveryCoordinator.Job.Actions import qualified HA.RecoveryCoordinator.Actions.Service as Service import Control.Monad (when, unless)