Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 58 additions & 32 deletions beam-postgres/Database/Beam/Postgres/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import qualified Control.Monad.Fail as Fail
import Data.ByteString (ByteString)
import Data.ByteString.Builder (toLazyByteString, byteString)
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Char8 as BLC
import Data.Maybe (listToMaybe, fromMaybe)
import Data.Proxy
import Data.String
Expand All @@ -77,6 +78,39 @@ import System.Clock

import Network.URI (uriToString)

import GHC.Stats (getRTSStats, RTSStats(..))
import Data.Time.Clock.System (getSystemTime, systemToTAITime)
import Data.Time (diffTimeToPicoseconds)
import Data.Time.Clock.TAI (diffAbsoluteTime)

-- | Track execution time and GC stats, then call callback
withTickTock :: (Text -> Text -> Text -> ByteString -> IO ()) -> Text -> ByteString -> IO a -> IO a
withTickTock callback tag query action = do
t1 <- getSystemTime
rtsTick <- getRTSStats
result <- action
rtsTock <- getRTSStats
t2 <- getSystemTime
let execTime = diffTimeToPicoseconds $ diffAbsoluteTime (systemToTAITime t2) (systemToTAITime t1)
latency = execTime `div` 10 ^ (9 :: Int)
gcTime = (gc_elapsed_ns rtsTock) - (gc_elapsed_ns rtsTick)
callback (T.pack $ show latency <> " ms") (T.pack $ show gcTime <> " ns") tag query
pure result

-- | Track execution time and GC stats, then call callback
withTickTock' :: (Text -> Text -> Text -> ByteString -> IO ()) -> Text -> IO ByteString -> IO ByteString
withTickTock' callback tag action = do
t1 <- getSystemTime
rtsTick <- getRTSStats
result <- action
rtsTock <- getRTSStats
t2 <- getSystemTime
let execTime = diffTimeToPicoseconds $ diffAbsoluteTime (systemToTAITime t2) (systemToTAITime t1)
latency = execTime `div` 10 ^ (9 :: Int)
gcTime = (gc_elapsed_ns rtsTock) - (gc_elapsed_ns rtsTick)
callback (T.pack $ show latency) (T.pack $ show gcTime) tag result
pure result

data PgStream a = PgStreamDone (Either BeamRowReadError a)
| PgStreamContinue (Maybe PgI.Row -> IO (PgStream a))

Expand Down Expand Up @@ -182,57 +216,49 @@ runPgRowReader conn rowIdx res fields (FromBackendRowM readRow) =

finish x _ _ _ = pure (Right x)

withPgDebug :: (Text -> IO ()) -> Pg.Connection -> Pg a -> IO (Either BeamRowReadError a)
withPgDebug dbg conn (Pg action) =
withPgDebug :: (Text -> IO ()) -> (Text -> Text -> Text -> ByteString -> IO ()) -> Pg.Connection -> Pg a -> IO (Either BeamRowReadError a)
withPgDebug dbg tickTock conn (Pg action) =
let finish x = pure (Right x)
step (PgLiftIO io next) = io >>= next
step (PgLiftWithHandle withConn next) = withConn conn >>= next
step (PgFetchNext next) = next Nothing
step (PgRunReturning (PgCommandSyntax PgCommandTypeQuery syntax)
(mkProcess :: Pg (Maybe x) -> Pg a')
next) =
do query <- pgRenderSyntax conn syntax
do query <- withTickTock' tickTock "ORM_QUERY" $ pgRenderSyntax conn syntax
let Pg process = mkProcess (Pg (liftF (PgFetchNext id)))
action' <- runF process finishProcess stepProcess Nothing
(res, extime) <-
res <-
case action' of
PgStreamDone (Right x) -> do
start <- getTime Monotonic
Pg.execute_ conn (Pg.Query query)
end <- getTime Monotonic
(, Just (end - start)) <$> next x
PgStreamDone (Left err) -> pure (Left err, Nothing)
PgStreamDone (Right x) ->
withTickTock tickTock "EXECUTE" query $ do
Pg.execute_ conn (Pg.Query query)
next x
PgStreamDone (Left err) -> pure (Left err)
PgStreamContinue nextStream ->
let finishUp (PgStreamDone (Right x)) = (, Nothing) <$> next x
finishUp (PgStreamDone (Left err)) = pure (Left err, Nothing)
let finishUp (PgStreamDone (Right x)) = next x
finishUp (PgStreamDone (Left err)) = pure (Left err)
finishUp (PgStreamContinue next') = next' Nothing >>= finishUp

columnCount = fromIntegral $ valuesNeeded (Proxy @Postgres) (Proxy @x)
in do resp <- Pg.queryWith_ (Pg.RP (put columnCount >> ask)) conn (Pg.Query query)
foldM runConsumer (PgStreamContinue nextStream) resp >>= finishUp
dbg (decodeUtf8 query <> " Executed in: " <> T.pack (show extime) <> " seconds ") >> return res
in do
resp <- withTickTock tickTock "EXECUTE" query $ Pg.queryWith_ (Pg.RP (put columnCount >> ask)) conn (Pg.Query query)
withTickTock tickTock "DECODE" (query <> " rows returned " <> (BLC.pack $ show $ length resp)) $ foldM runConsumer (PgStreamContinue nextStream) resp >>= finishUp
pure res
step (PgRunReturning (PgCommandSyntax PgCommandTypeDataUpdateReturning syntax) mkProcess next) =
do query <- pgRenderSyntax conn syntax
do query <- withTickTock' tickTock "ORM_QUERY" $ pgRenderSyntax conn syntax

start <- getTime Monotonic
res <- Pg.exec conn query
end <- getTime Monotonic
let extime = end - start
dbg (decodeUtf8 query <> " Executed in: " <> T.pack (show extime) <> " seconds ")
res <- withTickTock tickTock "EXECUTE" query $ Pg.exec conn query
sts <- Pg.resultStatus res
case sts of
Pg.TuplesOk -> do
let Pg process = mkProcess (Pg (liftF (PgFetchNext id)))
runF process (\x _ -> Pg.unsafeFreeResult res >> next x) (stepReturningList res) 0
withTickTock tickTock "DECODE" query $ runF process (\x _ -> Pg.unsafeFreeResult res >> next x) (stepReturningList res) 0
_ -> Pg.throwResultError "No tuples returned to Postgres update/insert returning"
res sts
step (PgRunReturning (PgCommandSyntax _ syntax) mkProcess next) =
do query <- pgRenderSyntax conn syntax
start <- getTime Monotonic
_ <- Pg.execute_ conn (Pg.Query query)
end <- getTime Monotonic
let extime = end - start
dbg (decodeUtf8 query <> " Executed in: " <> T.pack (show extime) <> " seconds ")
do query <- withTickTock' tickTock "ORM_QUERY" $ pgRenderSyntax conn syntax
withTickTock tickTock "EXECUTE" query $ Pg.execute_ conn (Pg.Query query)
let Pg process = mkProcess (Pg (liftF (PgFetchNext id)))
runF process next stepReturningNone

Expand Down Expand Up @@ -319,12 +345,12 @@ instance MonadIO Pg where
liftIOWithHandle :: (Pg.Connection -> IO a) -> Pg a
liftIOWithHandle f = liftF (PgLiftWithHandle f id)

runBeamPostgresDebug :: (Text -> IO ()) -> Pg.Connection -> Pg a -> IO a
runBeamPostgresDebug dbg conn action =
withPgDebug dbg conn action >>= either throwIO pure
runBeamPostgresDebug :: (Text -> IO ()) -> (Text -> Text -> Text -> ByteString -> IO ()) -> Pg.Connection -> Pg a -> IO a
runBeamPostgresDebug dbg tickTock conn action =
withPgDebug dbg tickTock conn action >>= either throwIO pure

runBeamPostgres :: Pg.Connection -> Pg a -> IO a
runBeamPostgres = runBeamPostgresDebug (\_ -> pure ())
runBeamPostgres = runBeamPostgresDebug (\_ -> pure ()) (\_ _ _ _ -> pure ())

instance MonadBeam Postgres Pg where
runReturningMany cmd consume =
Expand Down
2 changes: 1 addition & 1 deletion beam-postgres/Database/Beam/Postgres/Migrate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ migrationBackend = Tool.BeamMigrationBackend
pgCustomEnumActionProvider)
(\options action ->
bracket (Pg.connectPostgreSQL (fromString options)) Pg.close $ \conn ->
left show <$> withPgDebug (\_ -> pure ()) conn action)
left show <$> withPgDebug (\_ -> pure ()) (\_ _ _ _ -> pure ()) conn action)

-- | 'BeamDeserializers' for postgres-specific types:
--
Expand Down
Loading