diff --git a/IHP/DBEvent.hs b/IHP/DBEvent.hs index 185777d7c..8f4fe8518 100644 --- a/IHP/DBEvent.hs +++ b/IHP/DBEvent.hs @@ -28,7 +28,7 @@ import Database.PostgreSQL.Simple.Notification (notificationPid) initDbEvents :: (?context :: ControllerContext, ?applicationContext :: ApplicationContext) => IO () initDbEvents = do putContext ?applicationContext.pgListener - + respondDbEvent :: (?modelContext :: ModelContext, ?context :: ControllerContext, ?touchedTables::IORef (Set ByteString)) => ByteString -> IO () @@ -37,34 +37,41 @@ respondDbEvent eventName = do putStrLn $ "Registering notification trigger for tables: " <> show touchedTables let streamBody sendChunk flush = do - sendChunk (ByteString.stringUtf8 "data: Connection established!\n\n") >> flush - - - pgListener <- fromContext @PGListener.PGListener - touchedTables |> mapM (\table -> do - let createTriggerSql = notificationTrigger table + sendChunk (ByteString.stringUtf8 "data: Connection established!\n\n") >> flush - withRowLevelSecurityDisabled do - sqlExec createTriggerSql () - pure () + pgListener <- fromContext @PGListener.PGListener + touchedTables |> mapM (\table -> do + let createTriggerSql = notificationTrigger table - pgListener |> PGListener.subscribe (channelName table) \notification -> do - let pid = notification.notificationPid |> show |> cs - (sendChunk (ByteString.stringUtf8 $ "id:" <> pid <> "\nevent:" <> cs eventName <> "\ndata: " <> cs table <> " change event triggered\n\n") >> flush) `Exception.catch ` (\e -> putStrLn $ "Error sending chunk: " ++ show (e :: Exception.SomeException))) - - forever do - threadDelay (30 * 1000000) - sendChunk (ByteString.stringUtf8 ": heartbeat\n\n") >> flush + withRowLevelSecurityDisabled do + sqlExec createTriggerSql () pure () - - respondAndExit $ responseStream - status200 - [ ("Cache-Control", "no-store") - , ("Connection", "keep-alive") - , (hContentType, "text/event-stream") - ] - streamBody + pgListener |> PGListener.subscribe (channelName table) \notification -> do + let pid = notification.notificationPid |> show |> cs + sendChunk (ByteString.stringUtf8 $ + "id:" <> pid <> "\n" <> + "event:" <> cs eventName <> "\n" <> + "data: " <> cs table <> " change event triggered\n\n") + >> flush + `Exception.catch` (\e -> putStrLn $ "Error sending chunk: " ++ show (e :: Exception.SomeException) + ) + ) + + forever do + threadDelay (30 * 1000000) + sendChunk (ByteString.stringUtf8 ": heartbeat\n\n") >> flush + pure () + + + respondAndExit $ + responseStream + status200 + [ ("Cache-Control", "no-store") + , ("Connection", "keep-alive") + , (hContentType, "text/event-stream") + ] + streamBody