From 2a7aa527b8ab40f4b18e75cca3e557c1a22414d3 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Tue, 20 Jan 2026 19:04:41 +0100 Subject: [PATCH 1/7] Use semantic convention span attributes --- build.sbt | 1 + .../src/main/scala/net/protocol/Query.scala | 8 ++-- .../src/main/scala/net/protocol/package.scala | 10 ++++- .../shared/src/main/scala/util/Otel.scala | 40 +++++++++++++++++++ 4 files changed, 54 insertions(+), 5 deletions(-) create mode 100644 modules/core/shared/src/main/scala/util/Otel.scala diff --git a/build.sbt b/build.sbt index 319d1b09a..266ac1d20 100644 --- a/build.sbt +++ b/build.sbt @@ -124,6 +124,7 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) "org.scodec" %%% "scodec-core" % (if (tlIsScala3.value) "2.3.3" else "1.11.11"), "org.scodec" %%% "scodec-cats" % "1.3.0-RC1", "org.typelevel" %%% "otel4s-core-trace" % otel4sVersion, + "org.typelevel" %%% "otel4s-semconv" % otel4sVersion, "org.tpolecat" %%% "sourcepos" % "1.2.0", "org.typelevel" %%% "twiddles-core" % "1.0.0-RC2", ) ++ Seq( diff --git a/modules/core/shared/src/main/scala/net/protocol/Query.scala b/modules/core/shared/src/main/scala/net/protocol/Query.scala index 3e86ac9b3..329019502 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Query.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Query.scala @@ -12,7 +12,7 @@ import skunk.exception._ import skunk.net.message.{ Query => QueryMessage, _ } import skunk.net.MessageSocket import skunk.util.Typer -import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.semconv.attributes.DbAttributes import org.typelevel.otel4s.trace.Span import org.typelevel.otel4s.trace.Tracer import skunk.Statement @@ -72,7 +72,7 @@ object Query { override def apply[B](query: skunk.Query[Void, B], ty: Typer): F[List[B]] = exchange("query") { (span: Span[F]) => span.addAttribute( - Attribute("query.sql", query.sql) + DbAttributes.DbQueryText(query.sql) ) *> send(QueryMessage(query.sql)) *> flatExpect { // If we get a RowDescription back it means we have a valid query as far as Postgres is @@ -164,7 +164,7 @@ object Query { override def apply(command: Command[Void]): F[Completion] = exchange("query") { (span: Span[F]) => span.addAttribute( - Attribute("command.sql", command.sql) + DbAttributes.DbQueryText(command.sql) ) *> send(QueryMessage(command.sql)) *> flatExpect { case CommandComplete(c) => @@ -247,7 +247,7 @@ object Query { override def applyDiscard(statement: Statement[Void]): F[Unit] = exchange("query") { (span: Span[F]) => span.addAttribute( - Attribute("command.sql", statement.sql) + DbAttributes.DbQueryText(statement.sql) ) *> send(QueryMessage(statement.sql)) *> finishUpDiscard(statement, None) } } diff --git a/modules/core/shared/src/main/scala/net/protocol/package.scala b/modules/core/shared/src/main/scala/net/protocol/package.scala index 2d7fa813e..2445bbc66 100644 --- a/modules/core/shared/src/main/scala/net/protocol/package.scala +++ b/modules/core/shared/src/main/scala/net/protocol/package.scala @@ -7,14 +7,22 @@ package skunk.net import skunk.net.message._ import skunk.util.Namer import skunk.util.Origin +import skunk.util.Otel import org.typelevel.otel4s.trace.Span import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.trace.SpanKind package object protocol { def exchange[F[_]: Tracer, A](label: String)(f: Span[F] => F[A])( implicit exchange: Exchange[F] - ): F[A] = Tracer[F].span(label).use(span => exchange(f(span))) + ): F[A] = + Tracer[F].spanBuilder(label) + .withSpanKind(SpanKind.Client) + .addAttribute(Otel.DbSystemName) + .withFinalizationStrategy(Otel.PostgresStrategy) + .build + .use(span => exchange(f(span))) def receive[F[_]](implicit ev: MessageSocket[F]): F[BackendMessage] = ev.receive diff --git a/modules/core/shared/src/main/scala/util/Otel.scala b/modules/core/shared/src/main/scala/util/Otel.scala new file mode 100644 index 000000000..abb7ca782 --- /dev/null +++ b/modules/core/shared/src/main/scala/util/Otel.scala @@ -0,0 +1,40 @@ +// Copyright (c) 2018-2024 by Rob Norris and Contributors +// This software is licensed under the MIT License (MIT). +// For more information see LICENSE or https://opensource.org/licenses/MIT + +package skunk.util + +import cats.effect.Resource +import cats.syntax.semigroup._ +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.semconv.attributes.DbAttributes +import org.typelevel.otel4s.trace.SpanFinalizer +import org.typelevel.otel4s.trace.StatusCode +import skunk.exception.PostgresErrorException + +object Otel { + + val DbSystemName = DbAttributes.DbSystemName("postgresql") + + // Similar to the default reportAbnormal strategy but records some + // postgresql specific attributes in case it is a postgres error + val PostgresStrategy: SpanFinalizer.Strategy = { + case Resource.ExitCase.Errored(e: PostgresErrorException) => + val builder = Attributes.newBuilder + + builder += DbAttributes.DbResponseStatusCode(e.code) + builder ++= DbAttributes.DbCollectionName.maybe(e.tableName) + builder ++= DbAttributes.DbNamespace.maybe(e.schemaName) + + SpanFinalizer.recordException(e) |+| + SpanFinalizer.setStatus(StatusCode.Error) |+| + SpanFinalizer.addAttributes(builder.result()) + + case Resource.ExitCase.Errored(e) => + SpanFinalizer.recordException(e) |+| SpanFinalizer.setStatus(StatusCode.Error) + + case Resource.ExitCase.Canceled => + SpanFinalizer.setStatus(StatusCode.Error, "canceled") + } + +} From caa70a4355bf26815af4a1df5f7b06bf8097ff47 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Thu, 22 Jan 2026 19:52:07 +0100 Subject: [PATCH 2/7] Add basic metrics --- build.sbt | 3 +- .../main/scala/skunk/bench/SelectBench.scala | 2 + .../core/shared/src/main/scala/Session.scala | 13 ++- .../shared/src/main/scala/net/Protocol.scala | 103 ++++++++++-------- .../src/main/scala/net/protocol/Bind.scala | 11 +- .../main/scala/net/protocol/BindExecute.scala | 13 ++- .../src/main/scala/net/protocol/Close.scala | 9 +- .../src/main/scala/net/protocol/Execute.scala | 9 +- .../scala/net/protocol/ParseDescribe.scala | 10 +- .../src/main/scala/net/protocol/Prepare.scala | 15 +-- .../src/main/scala/net/protocol/Query.scala | 13 ++- .../src/main/scala/net/protocol/Startup.scala | 8 +- .../src/main/scala/net/protocol/package.scala | 13 ++- .../shared/src/main/scala/util/Otel.scala | 40 ++++++- .../src/main/scala/AppliedFragments.scala | 2 + modules/example/src/main/scala/Channel.scala | 2 + modules/example/src/main/scala/Error.scala | 2 + modules/example/src/main/scala/Join.scala | 2 + modules/example/src/main/scala/Main.scala | 2 + modules/example/src/main/scala/Math1.scala | 2 + modules/example/src/main/scala/Math2.scala | 2 + modules/example/src/main/scala/Minimal1.scala | 2 + modules/example/src/main/scala/Minimal2.scala | 15 ++- modules/example/src/main/scala/Minimal3.scala | 2 + .../example/src/main/scala/Transaction.scala | 2 + modules/example/src/main/scala/Values.scala | 2 + .../src/main/scala/ffstest/FFramework.scala | 3 + 27 files changed, 202 insertions(+), 100 deletions(-) diff --git a/build.sbt b/build.sbt index 266ac1d20..5e3ed6f38 100644 --- a/build.sbt +++ b/build.sbt @@ -123,8 +123,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) "org.scodec" %%% "scodec-bits" % "1.2.4", "org.scodec" %%% "scodec-core" % (if (tlIsScala3.value) "2.3.3" else "1.11.11"), "org.scodec" %%% "scodec-cats" % "1.3.0-RC1", - "org.typelevel" %%% "otel4s-core-trace" % otel4sVersion, + "org.typelevel" %%% "otel4s-core" % otel4sVersion, "org.typelevel" %%% "otel4s-semconv" % otel4sVersion, + "org.typelevel" %%% "otel4s-semconv-metrics" % otel4sVersion, "org.tpolecat" %%% "sourcepos" % "1.2.0", "org.typelevel" %%% "twiddles-core" % "1.0.0-RC2", ) ++ Seq( diff --git a/modules/bench/src/main/scala/skunk/bench/SelectBench.scala b/modules/bench/src/main/scala/skunk/bench/SelectBench.scala index b1568b4b8..3b6808965 100644 --- a/modules/bench/src/main/scala/skunk/bench/SelectBench.scala +++ b/modules/bench/src/main/scala/skunk/bench/SelectBench.scala @@ -12,10 +12,12 @@ import skunk.codec.all._ import java.sql.DriverManager import org.openjdk.jmh.annotations._ import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter @State(Scope.Benchmark) object SelectBenchScope { implicit val tracer: Tracer[IO] = Tracer.noop[IO] + implicit val meter: Meter[IO] = Meter.noop[IO] val defaultChunkSize = 512 diff --git a/modules/core/shared/src/main/scala/Session.scala b/modules/core/shared/src/main/scala/Session.scala index 9bc04bce9..e6ffbf6f3 100644 --- a/modules/core/shared/src/main/scala/Session.scala +++ b/modules/core/shared/src/main/scala/Session.scala @@ -23,6 +23,7 @@ import skunk.net.SSLNegotiation import skunk.net.protocol.Describe import scala.concurrent.duration.Duration import skunk.net.protocol.Parse +import org.typelevel.otel4s.metrics.Meter /** * Represents a live connection to a Postgres database. Operations provided here are safe to use @@ -476,7 +477,7 @@ object Session { * @param queryCacheSize size of the session-level cache for query checking; defaults to 2048 * @param parseCacheSize size of the pool-level cache for parsing statements; defaults to 2048 */ - final class Builder[F[_]: Temporal: Network: Console] private ( + final class Builder[F[_]: Temporal: Meter: Network: Console] private ( val connectionType: ConnectionType, val host: Host, val port: Port, @@ -701,7 +702,7 @@ object Session { }}} */ object Builder { - def apply[F[_]: Temporal: Network: Console]: Builder[F] = + def apply[F[_]: Temporal: Meter: Network: Console]: Builder[F] = new Builder[F]( connectionType = ConnectionType.TCP, host = host"localhost", @@ -748,7 +749,7 @@ object Session { * @group Constructors */ @deprecated("1.0.0-M11", "Use Session.Builder[F].pooled instead") - def pooled[F[_]: Temporal: Tracer: Network: Console]( + def pooled[F[_]: Temporal: Tracer: Meter: Network: Console]( host: String, port: Int = 5432, user: String, @@ -806,7 +807,7 @@ object Session { * @group Constructors */ @deprecated("1.0.0-M11", "Use Session.Builder[F].pooledExplicitTracer instead") - def pooledF[F[_]: Temporal: Network: Console]( + def pooledF[F[_]: Temporal: Meter: Network: Console]( host: String, port: Int = 5432, user: String, @@ -848,7 +849,7 @@ object Session { * @see pooled */ @deprecated("1.0.0-M11", "Use Session.Builder[F].single instead") - def single[F[_]: Temporal: Tracer: Network: Console]( + def single[F[_]: Temporal: Tracer: Meter: Network: Console]( host: String, port: Int = 5432, user: String, @@ -887,7 +888,7 @@ object Session { * @see pooledF */ @deprecated("1.0.0-M11", "Use Session.Builder[F].singleExplicitTracer instead") - def singleF[F[_]: Temporal: Network: Console]( + def singleF[F[_]: Temporal: Meter: Network: Console]( host: String, port: Int = 5432, user: String, diff --git a/modules/core/shared/src/main/scala/net/Protocol.scala b/modules/core/shared/src/main/scala/net/Protocol.scala index 4b57c7bfb..06ada804a 100644 --- a/modules/core/shared/src/main/scala/net/Protocol.scala +++ b/modules/core/shared/src/main/scala/net/Protocol.scala @@ -11,8 +11,10 @@ import fs2.concurrent.Signal import fs2.Stream import skunk.{ Command, Query, Statement, ~, Void, RedactionStrategy } import skunk.data._ -import skunk.util.{ Namer, Origin } +import skunk.util.{ Namer, Origin, Otel } import skunk.util.Typer +import org.typelevel.otel4s.metrics.Histogram +import org.typelevel.otel4s.metrics.Meter import org.typelevel.otel4s.trace.Tracer import fs2.io.net.Socket import skunk.net.protocol.Describe @@ -28,6 +30,13 @@ import skunk.net.protocol.Parse */ trait Protocol[F[_]] { + /** + * Histogram tracking the duration of database operations. + * + * @see [[https://opentelemetry.io/docs/specs/semconv/db/database-metrics/#metric-dbclientoperationduration `db.client.operation.duration`]] + */ + def opDuration: Histogram[F, Double] + /** * Unfiltered stream of all asynchronous channel notifications sent to this session. In general * this stream is consumed asynchronously and the associated fiber is canceled before the @@ -208,7 +217,7 @@ object Protocol { def execute(maxRows: Int): F[List[B] ~ Boolean] } - def apply[F[_]: Temporal: Tracer: Console]( + def apply[F[_]: Temporal: Tracer: Meter: Console]( debug: Boolean, nam: Namer[F], sockets: Resource[F, Socket[F]], @@ -223,73 +232,77 @@ object Protocol { p <- Resource.eval(fromMessageSocket(bms, nam, describeCache, parseCache, redactionStrategy)) } yield p - def fromMessageSocket[F[_]: Concurrent: Tracer]( + def fromMessageSocket[F[_]: Concurrent: Tracer: Meter]( bms: BufferedMessageSocket[F], nam: Namer[F], dc: Describe.Cache[F], pc: Parse.Cache[F], redactionStrategy: RedactionStrategy ): F[Protocol[F]] = - Exchange[F].map { ex => - new Protocol[F] { + Otel.OpDurationHistogram[F].flatMap { opDurationHistogram => + Exchange[F].map { ex => + new Protocol[F] { + + def opDuration: Histogram[F,Double] = opDurationHistogram - // Not super sure about this but it does make the sub-protocol implementations cleaner. - // We'll see how well it works out. - implicit val ms: MessageSocket[F] = bms - implicit val na: Namer[F] = nam - implicit val ExchangeF: protocol.Exchange[F] = ex + // Not super sure about this but it does make the sub-protocol implementations cleaner. + // We'll see how well it works out. + implicit val ms: MessageSocket[F] = bms + implicit val na: Namer[F] = nam + implicit val ExchangeF: protocol.Exchange[F] = ex - override def notifications(maxQueued: Int): Resource[F, Stream[F, Notification[String]]] = - bms.notifications(maxQueued) + override def notifications(maxQueued: Int): Resource[F, Stream[F, Notification[String]]] = + bms.notifications(maxQueued) - override def parameters: Signal[F, Map[String, String]] = - bms.parameters + override def parameters: Signal[F, Map[String, String]] = + bms.parameters - override def prepare[A](command: Command[A], ty: Typer): F[PreparedCommand[F, A]] = - protocol.Prepare[F](describeCache, parseCache, redactionStrategy).apply(command, ty) + override def prepare[A](command: Command[A], ty: Typer): F[PreparedCommand[F, A]] = + protocol.Prepare[F](describeCache, parseCache, redactionStrategy, opDuration).apply(command, ty) - override def prepare[A, B](query: Query[A, B], ty: Typer): F[PreparedQuery[F, A, B]] = - protocol.Prepare[F](describeCache, parseCache, redactionStrategy).apply(query, ty) + override def prepare[A, B](query: Query[A, B], ty: Typer): F[PreparedQuery[F, A, B]] = + protocol.Prepare[F](describeCache, parseCache, redactionStrategy, opDuration).apply(query, ty) - override def prepareR[A](command: Command[A], ty: Typer): Resource[F, Protocol.PreparedCommand[F, A]] = { - val acquire = Parse.Cache.empty[F](1).flatMap { pc => - protocol.Prepare[F](describeCache, pc, redactionStrategy).apply(command, ty) + override def prepareR[A](command: Command[A], ty: Typer): Resource[F, Protocol.PreparedCommand[F, A]] = { + val acquire = Parse.Cache.empty[F](1).flatMap { pc => + protocol.Prepare[F](describeCache, pc, redactionStrategy, opDuration).apply(command, ty) + } + Resource.make(acquire)(pc => protocol.Close[F](opDuration).apply(pc.id)) } - Resource.make(acquire)(pc => protocol.Close[F].apply(pc.id)) - } - override def prepareR[A, B](query: Query[A, B], ty: Typer): Resource[F, Protocol.PreparedQuery[F, A, B]] = { - val acquire = Parse.Cache.empty[F](1).flatMap { pc => - protocol.Prepare[F](describeCache, pc, redactionStrategy).apply(query, ty) + override def prepareR[A, B](query: Query[A, B], ty: Typer): Resource[F, Protocol.PreparedQuery[F, A, B]] = { + val acquire = Parse.Cache.empty[F](1).flatMap { pc => + protocol.Prepare[F](describeCache, pc, redactionStrategy, opDuration).apply(query, ty) + } + Resource.make(acquire)(pq => protocol.Close[F](opDuration).apply(pq.id)) } - Resource.make(acquire)(pq => protocol.Close[F].apply(pq.id)) - } - override def execute(command: Command[Void]): F[Completion] = - protocol.Query[F](redactionStrategy).apply(command) + override def execute(command: Command[Void]): F[Completion] = + protocol.Query[F](redactionStrategy, opDuration).apply(command) - override def execute[B](query: Query[Void, B], ty: Typer): F[List[B]] = - protocol.Query[F](redactionStrategy).apply(query, ty) + override def execute[B](query: Query[Void, B], ty: Typer): F[List[B]] = + protocol.Query[F](redactionStrategy, opDuration).apply(query, ty) - override def executeDiscard(statement: Statement[Void]): F[Unit] = protocol.Query[F](redactionStrategy).applyDiscard(statement) + override def executeDiscard(statement: Statement[Void]): F[Unit] = protocol.Query[F](redactionStrategy, opDuration).applyDiscard(statement) - override def startup(user: String, database: String, password: Option[String], parameters: Map[String, String]): F[Unit] = - protocol.Startup[F].apply(user, database, password, parameters) + override def startup(user: String, database: String, password: Option[String], parameters: Map[String, String]): F[Unit] = + protocol.Startup[F](opDuration).apply(user, database, password, parameters) - override def cleanup: F[Unit] = - parseCache.value.values.flatMap(_.traverse_(protocol.Close[F].apply)) + override def cleanup: F[Unit] = + parseCache.value.values.flatMap(_.traverse_(protocol.Close[F](opDuration).apply)) - override def transactionStatus: Signal[F, TransactionStatus] = - bms.transactionStatus + override def transactionStatus: Signal[F, TransactionStatus] = + bms.transactionStatus - override val describeCache: Describe.Cache[F] = - dc + override val describeCache: Describe.Cache[F] = + dc - override val parseCache: Parse.Cache[F] = - pc + override val parseCache: Parse.Cache[F] = + pc - override def closeEvictedPreparedStatements: F[Unit] = - pc.value.clearEvicted.flatMap(_.traverse_(protocol.Close[F].apply)) + override def closeEvictedPreparedStatements: F[Unit] = + pc.value.clearEvicted.flatMap(_.traverse_(protocol.Close[F](opDuration).apply)) + } } } diff --git a/modules/core/shared/src/main/scala/net/protocol/Bind.scala b/modules/core/shared/src/main/scala/net/protocol/Bind.scala index 5659ae307..33e905b4b 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Bind.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Bind.scala @@ -4,9 +4,9 @@ package skunk.net.protocol +import cats.effect.MonadCancel import cats.effect.Resource import cats.syntax.all._ -import cats.MonadError import skunk.exception.PostgresErrorException import skunk.net.message.{ Bind => BindMessage, Close => _, _ } import skunk.net.MessageSocket @@ -15,6 +15,7 @@ import skunk.util.{ Origin, Namer } import org.typelevel.otel4s.Attribute import org.typelevel.otel4s.trace.{Span, Tracer} import skunk.RedactionStrategy +import org.typelevel.otel4s.metrics.Histogram trait Bind[F[_]] { @@ -29,8 +30,8 @@ trait Bind[F[_]] { object Bind { - def apply[F[_]: Exchange: MessageSocket: Namer: Tracer]( - implicit ev: MonadError[F, Throwable] + def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](opDuration: Histogram[F, Double])( + implicit ev: MonadCancel[F, Throwable] ): Bind[F] = new Bind[F] { @@ -41,7 +42,7 @@ object Bind { redactionStrategy: RedactionStrategy ): Resource[F, PortalId] = Resource.make { - exchange("bind") { (span: Span[F]) => + exchange("bind", opDuration) { (span: Span[F]) => for { pn <- nextName("portal").map(PortalId(_)) ea = statement.statement.encoder.encode(args) // encoded args @@ -70,7 +71,7 @@ object Bind { } } yield pn } - } { Close[F].apply } + } { Close[F](opDuration).apply } } diff --git a/modules/core/shared/src/main/scala/net/protocol/BindExecute.scala b/modules/core/shared/src/main/scala/net/protocol/BindExecute.scala index 8fe43324f..63456ecd1 100644 --- a/modules/core/shared/src/main/scala/net/protocol/BindExecute.scala +++ b/modules/core/shared/src/main/scala/net/protocol/BindExecute.scala @@ -20,6 +20,7 @@ import skunk.net.Protocol import skunk.data.Completion import skunk.net.protocol.exchange import cats.effect.kernel.Deferred +import org.typelevel.otel4s.metrics.Histogram trait BindExecute[F[_]] { @@ -41,7 +42,7 @@ trait BindExecute[F[_]] { object BindExecute { - def apply[F[_]: Exchange: MessageSocket: Namer: Tracer]( + def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](opDuration: Histogram[F, Double])( implicit ev: Concurrent[F] ): BindExecute[F] = new Unroll[F] with BindExecute[F] { @@ -133,7 +134,7 @@ object BindExecute { } Resource.make { - exchange("bind+execute"){ (span: Span[F]) => + exchange("bind+execute", opDuration){ (span: Span[F]) => for { pn <- preBind(span) _ <- send(ExecuteMessage(pn.value, 0)) @@ -144,7 +145,7 @@ object BindExecute { def execute: F[Completion] = c.pure } } - } { portal => Close[F].apply(portal.id)} + } { portal => Close[F](opDuration).apply(portal.id)} } @@ -158,7 +159,7 @@ object BindExecute { val (preBind, postBind) = bindExchange(statement, args, argsOrigin, redactionStrategy) Resource.eval(Deferred[F, Unit]).flatMap { prefetch => Resource.make { - exchange("bind+execute"){ (span: Span[F]) => + exchange("bind+execute", opDuration){ (span: Span[F]) => for { pn <- preBind(span) _ <- span.addAttributes( @@ -173,11 +174,11 @@ object BindExecute { def execute(maxRows: Int): F[List[B] ~ Boolean] = prefetch.tryGet.flatMap { case None => rs.pure <* prefetch.complete(()) - case Some(()) => Execute[F].apply(this, maxRows) + case Some(()) => Execute[F](opDuration).apply(this, maxRows) } } } - } { portal => Close[F].apply(portal.id)} + } { portal => Close[F](opDuration).apply(portal.id)} } } } diff --git a/modules/core/shared/src/main/scala/net/protocol/Close.scala b/modules/core/shared/src/main/scala/net/protocol/Close.scala index aa134185b..b7ae019c4 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Close.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Close.scala @@ -5,12 +5,13 @@ package skunk.net package protocol -import cats.FlatMap +import cats.effect.MonadCancelThrow import cats.syntax.all._ import skunk.net.message.{ Close => CloseMessage, Flush, CloseComplete } import org.typelevel.otel4s.Attribute import org.typelevel.otel4s.trace.Span import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Histogram trait Close[F[_]] { def apply(portalId: Protocol.PortalId): F[Unit] @@ -19,17 +20,17 @@ trait Close[F[_]] { object Close { - def apply[F[_]: FlatMap: Exchange: MessageSocket: Tracer]: Close[F] = + def apply[F[_]: MonadCancelThrow: Exchange: MessageSocket: Tracer](opDuration: Histogram[F, Double]): Close[F] = new Close[F] { override def apply(portalId: Protocol.PortalId): F[Unit] = - exchange("close-portal") { (span: Span[F]) => + exchange("close-portal", opDuration) { (span: Span[F]) => span.addAttribute(Attribute("portal", portalId.value)) *> close(CloseMessage.portal(portalId.value)) } override def apply(statementId: Protocol.StatementId): F[Unit] = - exchange("close-statement") { (span: Span[F]) => + exchange("close-statement", opDuration) { (span: Span[F]) => span.addAttribute(Attribute("statement", statementId.value)) *> close(CloseMessage.statement(statementId.value)) } diff --git a/modules/core/shared/src/main/scala/net/protocol/Execute.scala b/modules/core/shared/src/main/scala/net/protocol/Execute.scala index c4c67c765..51fa3a38d 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Execute.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Execute.scala @@ -5,13 +5,14 @@ package skunk.net.protocol import cats.syntax.all._ -import cats.MonadError +import cats.effect.MonadCancel import skunk.~ import skunk.net.{ Protocol, MessageSocket } import skunk.net.message.{ Execute => ExecuteMessage, _ } import org.typelevel.otel4s.Attribute import org.typelevel.otel4s.trace.Span import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Histogram trait Execute[F[_]] { def apply[A, B](portal: Protocol.QueryPortal[F, A, B], maxRows: Int): F[List[B] ~ Boolean] @@ -19,13 +20,13 @@ trait Execute[F[_]] { object Execute { - def apply[F[_]: Exchange: MessageSocket: Tracer]( - implicit ev: MonadError[F, Throwable] + def apply[F[_]: Exchange: MessageSocket: Tracer](opDuration: Histogram[F, Double])( + implicit ev: MonadCancel[F, Throwable] ): Execute[F] = new Unroll[F] with Execute[F] { override def apply[A, B](portal: Protocol.QueryPortal[F, A, B], maxRows: Int): F[List[B] ~ Boolean] = - exchange("execute") { (span: Span[F]) => + exchange("execute", opDuration) { (span: Span[F]) => for { _ <- span.addAttributes( Attribute("max-rows", maxRows.toLong), diff --git a/modules/core/shared/src/main/scala/net/protocol/ParseDescribe.scala b/modules/core/shared/src/main/scala/net/protocol/ParseDescribe.scala index 704c0a700..5a4462107 100644 --- a/modules/core/shared/src/main/scala/net/protocol/ParseDescribe.scala +++ b/modules/core/shared/src/main/scala/net/protocol/ParseDescribe.scala @@ -19,6 +19,8 @@ import skunk.Statement import skunk.exception.* import skunk.util.Namer import skunk.net.protocol.exchange +import org.typelevel.otel4s.metrics.Histogram +import cats.effect.MonadCancel trait ParseDescribe[F[_]] { def command[A](cmd: skunk.Command[A], ty: Typer): F[StatementId] @@ -27,8 +29,8 @@ trait ParseDescribe[F[_]] { object ParseDescribe { - def apply[F[_]: Exchange: MessageSocket: Tracer: Namer](cache: Describe.Cache[F], parseCache: Parse.Cache[F])( - implicit ev: MonadError[F, Throwable] + def apply[F[_]: Exchange: MessageSocket: Tracer: Namer](cache: Describe.Cache[F], parseCache: Parse.Cache[F], opDuration: Histogram[F, Double])( + implicit ev: MonadCancel[F, Throwable] ): ParseDescribe[F] = new ParseDescribe[F] { def syncAndFail(statement: Statement[_], info: Map[Char, String]): F[Unit] = @@ -109,7 +111,7 @@ object ParseDescribe { (pre, post) } - exchange("parse+describe") { (span: Span[F]) => + exchange("parse+describe", opDuration) { (span: Span[F]) => parseExchange(cmd, ty)(span).flatMap { case (preParse, postParse) => describeExchange(span).flatMap { case (preDesc, postDesc) => for { @@ -155,7 +157,7 @@ object ParseDescribe { } - exchange("parse+describe") { (span: Span[F]) => + exchange("parse+describe", opDuration) { (span: Span[F]) => parseExchange(query, ty)(span).flatMap { case (preParse, postParse) => describeExchange(span).flatMap { case (preDesc, postDesc) => for { diff --git a/modules/core/shared/src/main/scala/net/protocol/Prepare.scala b/modules/core/shared/src/main/scala/net/protocol/Prepare.scala index 69b29add8..39edde7f3 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Prepare.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Prepare.scala @@ -14,6 +14,7 @@ import skunk.net.Protocol.{ PreparedCommand, PreparedQuery, CommandPortal, Query import skunk.util.{ Origin, Namer } import skunk.util.Typer import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Histogram trait Prepare[F[_]] { def apply[A](command: skunk.Command[A], ty: Typer): F[PreparedCommand[F, A]] @@ -22,31 +23,31 @@ trait Prepare[F[_]] { object Prepare { - def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](describeCache: Describe.Cache[F], parseCache: Parse.Cache[F], redactionStrategy: RedactionStrategy)( + def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](describeCache: Describe.Cache[F], parseCache: Parse.Cache[F], redactionStrategy: RedactionStrategy, opDuration: Histogram[F, Double])( implicit ev: Concurrent[F] ): Prepare[F] = new Prepare[F] { override def apply[A](command: skunk.Command[A], ty: Typer): F[PreparedCommand[F, A]] = - ParseDescribe[F](describeCache, parseCache).command(command, ty).map { id => + ParseDescribe[F](describeCache, parseCache, opDuration).command(command, ty).map { id => new PreparedCommand[F, A](id, command) { pc => def bind(args: A, origin: Origin): Resource[F, CommandPortal[F, A]] = - BindExecute[F].command(this, args, origin, redactionStrategy) + BindExecute[F](opDuration).command(this, args, origin, redactionStrategy) } } override def apply[A, B](query: skunk.Query[A, B], ty: Typer): F[PreparedQuery[F, A, B]] = - ParseDescribe[F](describeCache, parseCache).apply(query, ty).map { case (id, rd) => + ParseDescribe[F](describeCache, parseCache, opDuration).apply(query, ty).map { case (id, rd) => new PreparedQuery[F, A, B](id, query, rd) { pq => def bind(args: A, origin: Origin): Resource[F, QueryPortal[F, A, B]] = - Bind[F].apply(this, args, origin, redactionStrategy).map { + Bind[F](opDuration).apply(this, args, origin, redactionStrategy).map { new QueryPortal[F, A, B](_, pq, args, origin, redactionStrategy) { def execute(maxRows: Int): F[List[B] ~ Boolean] = - Execute[F].apply(this, maxRows) + Execute[F](opDuration).apply(this, maxRows) } } def bindSized(args: A, origin: Origin, maxRows: Int): Resource[F, QueryPortal[F, A, B]] = - BindExecute[F].query(this, args, origin, redactionStrategy, maxRows) + BindExecute[F](opDuration).query(this, args, origin, redactionStrategy, maxRows) } } diff --git a/modules/core/shared/src/main/scala/net/protocol/Query.scala b/modules/core/shared/src/main/scala/net/protocol/Query.scala index 329019502..2cd42fea6 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Query.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Query.scala @@ -4,7 +4,7 @@ package skunk.net.protocol -import cats.MonadError +import cats.effect.MonadCancel import cats.syntax.all._ import skunk.{ Command, Void, RedactionStrategy } import skunk.data.Completion @@ -16,6 +16,7 @@ import org.typelevel.otel4s.semconv.attributes.DbAttributes import org.typelevel.otel4s.trace.Span import org.typelevel.otel4s.trace.Tracer import skunk.Statement +import org.typelevel.otel4s.metrics.Histogram trait Query[F[_]] { def apply(command: Command[Void]): F[Completion] @@ -25,8 +26,8 @@ trait Query[F[_]] { object Query { - def apply[F[_]: Exchange: MessageSocket: Tracer](redactionStrategy: RedactionStrategy)( - implicit ev: MonadError[F, Throwable] + def apply[F[_]: Exchange: MessageSocket: Tracer](redactionStrategy: RedactionStrategy, opDuration: Histogram[F, Double])( + implicit ev: MonadCancel[F, Throwable] ): Query[F] = new Unroll[F] with Query[F] { @@ -70,7 +71,7 @@ object Query { } override def apply[B](query: skunk.Query[Void, B], ty: Typer): F[List[B]] = - exchange("query") { (span: Span[F]) => + exchange("query", opDuration) { (span: Span[F]) => span.addAttribute( DbAttributes.DbQueryText(query.sql) ) *> send(QueryMessage(query.sql)) *> flatExpect { @@ -162,7 +163,7 @@ object Query { } override def apply(command: Command[Void]): F[Completion] = - exchange("query") { (span: Span[F]) => + exchange("query", opDuration) { (span: Span[F]) => span.addAttribute( DbAttributes.DbQueryText(command.sql) ) *> send(QueryMessage(command.sql)) *> flatExpect { @@ -245,7 +246,7 @@ object Query { } override def applyDiscard(statement: Statement[Void]): F[Unit] = - exchange("query") { (span: Span[F]) => + exchange("query", opDuration) { (span: Span[F]) => span.addAttribute( DbAttributes.DbQueryText(statement.sql) ) *> send(QueryMessage(statement.sql)) *> finishUpDiscard(statement, None) diff --git a/modules/core/shared/src/main/scala/net/protocol/Startup.scala b/modules/core/shared/src/main/scala/net/protocol/Startup.scala index 598464f72..aab57729c 100644 --- a/modules/core/shared/src/main/scala/net/protocol/Startup.scala +++ b/modules/core/shared/src/main/scala/net/protocol/Startup.scala @@ -20,6 +20,8 @@ import skunk.exception.{ SkunkException, UnsupportedAuthenticationSchemeException } +import org.typelevel.otel4s.metrics.Histogram +import cats.effect.MonadCancel trait Startup[F[_]] { def apply(user: String, database: String, password: Option[String], parameters: Map[String, String]): F[Unit] @@ -27,12 +29,12 @@ trait Startup[F[_]] { object Startup extends StartupCompanionPlatform { - def apply[F[_]: Exchange: MessageSocket: Tracer]( - implicit ev: MonadError[F, Throwable] + def apply[F[_]: Exchange: MessageSocket: Tracer](opDuration: Histogram[F, Double])( + implicit ev: MonadCancel[F, Throwable] ): Startup[F] = new Startup[F] { override def apply(user: String, database: String, password: Option[String], parameters: Map[String, String]): F[Unit] = - exchange("startup") { (span: Span[F]) => + exchange("startup", opDuration) { (span: Span[F]) => val sm = StartupMessage(user, database, parameters) for { _ <- span.addAttributes( diff --git a/modules/core/shared/src/main/scala/net/protocol/package.scala b/modules/core/shared/src/main/scala/net/protocol/package.scala index 2445bbc66..e34b9dba1 100644 --- a/modules/core/shared/src/main/scala/net/protocol/package.scala +++ b/modules/core/shared/src/main/scala/net/protocol/package.scala @@ -11,18 +11,25 @@ import skunk.util.Otel import org.typelevel.otel4s.trace.Span import org.typelevel.otel4s.trace.Tracer import org.typelevel.otel4s.trace.SpanKind +import org.typelevel.otel4s.metrics.Histogram +import java.util.concurrent.TimeUnit +import cats.effect.MonadCancel package object protocol { - def exchange[F[_]: Tracer, A](label: String)(f: Span[F] => F[A])( - implicit exchange: Exchange[F] + def exchange[F[_]: Tracer, A](label: String, opDuration: Histogram[F, Double])(f: Span[F] => F[A])( + implicit exchange: Exchange[F], ev: MonadCancel[F, Throwable] ): F[A] = Tracer[F].spanBuilder(label) .withSpanKind(SpanKind.Client) .addAttribute(Otel.DbSystemName) .withFinalizationStrategy(Otel.PostgresStrategy) .build - .use(span => exchange(f(span))) + .use{span => + opDuration.recordDuration(TimeUnit.SECONDS, Otel.opDurationAttributes(_)).surround { + exchange(f(span)) + } + } def receive[F[_]](implicit ev: MessageSocket[F]): F[BackendMessage] = ev.receive diff --git a/modules/core/shared/src/main/scala/util/Otel.scala b/modules/core/shared/src/main/scala/util/Otel.scala index abb7ca782..a49e53416 100644 --- a/modules/core/shared/src/main/scala/util/Otel.scala +++ b/modules/core/shared/src/main/scala/util/Otel.scala @@ -11,10 +11,18 @@ import org.typelevel.otel4s.semconv.attributes.DbAttributes import org.typelevel.otel4s.trace.SpanFinalizer import org.typelevel.otel4s.trace.StatusCode import skunk.exception.PostgresErrorException +import org.typelevel.otel4s.semconv.attributes.ErrorAttributes +import org.typelevel.otel4s.metrics.Meter +import org.typelevel.otel4s.semconv.metrics.DbMetrics +import org.typelevel.otel4s.metrics.BucketBoundaries +import org.typelevel.otel4s.metrics.Histogram object Otel { - val DbSystemName = DbAttributes.DbSystemName("postgresql") + // TODO the current snapshot used does not have the new `From` instance, + // and I cannot update the otel4s dependency since skunk depends on SN 0.5 + // only available in the snapshot + val DbSystemName = DbAttributes.DbSystemName(DbAttributes.DbSystemNameValue.Postgresql.value) // Similar to the default reportAbnormal strategy but records some // postgresql specific attributes in case it is a postgres error @@ -35,6 +43,36 @@ object Otel { case Resource.ExitCase.Canceled => SpanFinalizer.setStatus(StatusCode.Error, "canceled") + + } + + private val opDurationBoundaries = BucketBoundaries(0.001d, 0.005d, 0.01d, 0.05d, 0.1d, 0.5d, 1d, 5d, 10d) + + def OpDurationHistogram[F[_]: Meter]: F[Histogram[F, Double]] = + DbMetrics.ClientOperationDuration.create[F, Double](opDurationBoundaries) + + def opDurationAttributes(exitCase: Resource.ExitCase): Attributes = { + val builder = Attributes.newBuilder + + builder += DbSystemName + + exitCase match { + case Resource.ExitCase.Succeeded => + + case Resource.ExitCase.Errored(e: PostgresErrorException) => + builder += ErrorAttributes.ErrorType(e.getClass().getName()) + builder += DbAttributes.DbResponseStatusCode(e.code) + builder ++= DbAttributes.DbCollectionName.maybe(e.tableName) + builder ++= DbAttributes.DbNamespace.maybe(e.schemaName) + + case Resource.ExitCase.Errored(e) => + builder += ErrorAttributes.ErrorType(e.getClass().getName()) + + case Resource.ExitCase.Canceled => + + } + + builder.result() } } diff --git a/modules/example/src/main/scala/AppliedFragments.scala b/modules/example/src/main/scala/AppliedFragments.scala index 4dd0efb8c..243c96108 100644 --- a/modules/example/src/main/scala/AppliedFragments.scala +++ b/modules/example/src/main/scala/AppliedFragments.scala @@ -7,6 +7,7 @@ package example import cats.effect._ import cats.implicits._ import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter import skunk._ import skunk.implicits._ import skunk.codec.all._ @@ -14,6 +15,7 @@ import skunk.codec.all._ object AppliedFragments extends IOApp { implicit val tracer: Tracer[IO] = Tracer.noop + implicit val meter: Meter[IO] = Meter.noop val session: Resource[IO, Session[IO]] = Session.Builder[IO] diff --git a/modules/example/src/main/scala/Channel.scala b/modules/example/src/main/scala/Channel.scala index 1f1ad9640..039bea2be 100644 --- a/modules/example/src/main/scala/Channel.scala +++ b/modules/example/src/main/scala/Channel.scala @@ -8,10 +8,12 @@ import cats.effect._ import skunk._ import skunk.implicits._ import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter object Channel extends IOApp { implicit val tracer: Tracer[IO] = Tracer.noop + implicit val meter: Meter[IO] = Meter.noop val session: Resource[IO, Session[IO]] = Session.Builder[IO] diff --git a/modules/example/src/main/scala/Error.scala b/modules/example/src/main/scala/Error.scala index 8c76249bf..c6b7547af 100644 --- a/modules/example/src/main/scala/Error.scala +++ b/modules/example/src/main/scala/Error.scala @@ -10,10 +10,12 @@ import skunk._ import skunk.implicits._ import skunk.codec.all._ import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter object Error extends IOApp { implicit val trace: Tracer[IO] = Tracer.noop + implicit val meter: Meter[IO] = Meter.noop val session: Resource[IO, Session[IO]] = Session.Builder[IO] diff --git a/modules/example/src/main/scala/Join.scala b/modules/example/src/main/scala/Join.scala index fa8507de3..18619dd5b 100644 --- a/modules/example/src/main/scala/Join.scala +++ b/modules/example/src/main/scala/Join.scala @@ -11,10 +11,12 @@ import skunk._ import skunk.implicits._ import skunk.codec.all._ import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter object Join extends IOApp with StreamOps { implicit val tracer: Tracer[IO] = Tracer.noop + implicit val meter: Meter[IO] = Meter.noop val session: Resource[IO, Session[IO]] = Session.Builder[IO] diff --git a/modules/example/src/main/scala/Main.scala b/modules/example/src/main/scala/Main.scala index d67fdaa5d..8b61b6b34 100644 --- a/modules/example/src/main/scala/Main.scala +++ b/modules/example/src/main/scala/Main.scala @@ -11,12 +11,14 @@ import cats.effect._ import cats.syntax.all._ import fs2._ import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter // This does a lot of stuff and is mostly just to test features as they're being added. This class // will probably go away. object Main extends IOApp { implicit val trace: Tracer[IO] = Tracer.noop + implicit val meter: Meter[IO] = Meter.noop case class Country(name: String, code: String, indepyear: Option[Short], population: Int) diff --git a/modules/example/src/main/scala/Math1.scala b/modules/example/src/main/scala/Math1.scala index b1eaf14b9..ee07be0ad 100644 --- a/modules/example/src/main/scala/Math1.scala +++ b/modules/example/src/main/scala/Math1.scala @@ -10,10 +10,12 @@ import skunk._ import skunk.implicits._ import skunk.codec.numeric.{ int4, float8 } import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter object Math1 extends IOApp { implicit val tracer: Tracer[IO] = Tracer.noop + implicit val meter: Meter[IO] = Meter.noop val session: Resource[IO, Session[IO]] = Session.Builder[IO] diff --git a/modules/example/src/main/scala/Math2.scala b/modules/example/src/main/scala/Math2.scala index 30ba7383d..4d0218956 100644 --- a/modules/example/src/main/scala/Math2.scala +++ b/modules/example/src/main/scala/Math2.scala @@ -11,10 +11,12 @@ import skunk._ import skunk.implicits._ import skunk.codec.numeric.{ int4, float8 } import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter object Math2 extends IOApp { implicit val tracer: Tracer[IO] = Tracer.noop + implicit val meter: Meter[IO] = Meter.noop val session: Resource[IO, Session[IO]] = Session.Builder[IO] diff --git a/modules/example/src/main/scala/Minimal1.scala b/modules/example/src/main/scala/Minimal1.scala index db966cd4b..c687b5938 100644 --- a/modules/example/src/main/scala/Minimal1.scala +++ b/modules/example/src/main/scala/Minimal1.scala @@ -9,10 +9,12 @@ import skunk._ import skunk.implicits._ import skunk.codec.all._ import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter object Minimal1 extends IOApp { implicit val tracer: Tracer[IO] = Tracer.noop[IO] + implicit val mterj: Meter[IO] = Meter.noop[IO] val session: Resource[IO, Session[IO]] = Session.Builder[IO] diff --git a/modules/example/src/main/scala/Minimal2.scala b/modules/example/src/main/scala/Minimal2.scala index d5497948b..528cf7554 100644 --- a/modules/example/src/main/scala/Minimal2.scala +++ b/modules/example/src/main/scala/Minimal2.scala @@ -12,6 +12,7 @@ import skunk.implicits._ import skunk.codec.all._ import org.typelevel.otel4s.Attribute import org.typelevel.otel4s.oteljava.OtelJava +import org.typelevel.otel4s.metrics.Meter import org.typelevel.otel4s.trace.Tracer import fs2.io.net.Network import cats.effect.std.Console @@ -19,7 +20,7 @@ import cats.effect.std.Console object Minimal2 extends IOApp { - def session[F[_]: Temporal: Tracer: Console: Network]: Resource[F, Session[F]] = + def session[F[_]: Temporal: Tracer: Meter: Console: Network]: Resource[F, Session[F]] = Session.Builder[F] .withUserAndPassword("jimmy", "banana") .withDatabase("world") @@ -46,17 +47,21 @@ object Minimal2 extends IOApp { } } - def runF[F[_]: Temporal: Tracer: Console: Network]: F[ExitCode] = + def runF[F[_]: Temporal: Tracer: Meter: Console: Network]: F[ExitCode] = session.use { s => List("A%", "B%").parTraverse(p => lookup(p, s)) } as ExitCode.Success - def getTracer[F[_]: Async: LiftIO]: Resource[F, Tracer[F]] = + def getTelemetry[F[_]: Async: LiftIO]: Resource[F, (Tracer[F], Meter[F])] = OtelJava.autoConfigured[F]() - .evalMap(_.tracerProvider.tracer("skunk-http4s-example").get) + .evalMap{ otel => + (otel.tracerProvider.tracer("skunk-http4s-example").get, otel.meterProvider.meter("skunk-http4s-example").get).tupled + } def run(args: List[String]): IO[ExitCode] = - getTracer[IO].use { implicit T => + getTelemetry[IO].use { case (tracer, meter) => + implicit val M = meter + implicit val T = tracer T.span("root").surround { runF[IO] *> runF[IO] } diff --git a/modules/example/src/main/scala/Minimal3.scala b/modules/example/src/main/scala/Minimal3.scala index fa29af66a..3d5eb309f 100644 --- a/modules/example/src/main/scala/Minimal3.scala +++ b/modules/example/src/main/scala/Minimal3.scala @@ -11,10 +11,12 @@ import skunk._ import skunk.implicits._ import skunk.codec.all._ import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter object Minimal3 extends IOApp { implicit val trace: Tracer[IO] = Tracer.noop + implicit val meter: Meter[IO] = Meter.noop val session: Resource[IO, Session[IO]] = Session.Builder[IO] diff --git a/modules/example/src/main/scala/Transaction.scala b/modules/example/src/main/scala/Transaction.scala index 4958cb2b0..74dd10ab0 100644 --- a/modules/example/src/main/scala/Transaction.scala +++ b/modules/example/src/main/scala/Transaction.scala @@ -10,10 +10,12 @@ import skunk._, skunk.implicits._, skunk.codec.all.int4 import org.typelevel.otel4s.trace.Tracer import cats.effect.std.Console import fs2.io.net.Network +import org.typelevel.otel4s.metrics.Meter object Transaction extends IOApp { implicit def tracer[F[_]: MonadCancelThrow]: Tracer[F] = Tracer.noop + implicit def meter[F[_]: MonadCancelThrow]: Meter[F] = Meter.noop def session[F[_]: Temporal: Console: Network]: Resource[F, Session[F]] = Session.Builder[F] diff --git a/modules/example/src/main/scala/Values.scala b/modules/example/src/main/scala/Values.scala index 63617aa81..f5124743b 100644 --- a/modules/example/src/main/scala/Values.scala +++ b/modules/example/src/main/scala/Values.scala @@ -10,11 +10,13 @@ import org.typelevel.otel4s.trace.Tracer import skunk._ import skunk.implicits._ import skunk.codec.all._ +import org.typelevel.otel4s.metrics.Meter /** Round-trip a list of values. You can use this pattern to do bulk-inserts. */ object Values extends IOApp { implicit val tracer: Tracer[IO] = Tracer.noop + implicit val mter: Meter[IO] = Meter.noop val session: Resource[IO, Session[IO]] = Session.Builder[IO] diff --git a/modules/tests/shared/src/main/scala/ffstest/FFramework.scala b/modules/tests/shared/src/main/scala/ffstest/FFramework.scala index f0e4d80c1..6776956bd 100644 --- a/modules/tests/shared/src/main/scala/ffstest/FFramework.scala +++ b/modules/tests/shared/src/main/scala/ffstest/FFramework.scala @@ -15,9 +15,12 @@ import skunk.exception._ import org.typelevel.twiddles._ import org.typelevel.otel4s.sdk.trace.SdkTraces import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter trait FTest extends CatsEffectSuite with FTestPlatform { + implicit val meter: Meter[IO] = Meter.noop + private def withinSpan[A](name: String)(body: Tracer[IO] => IO[A]): IO[A] = if (PlatformCompat.isNative) body(Tracer.Implicits.noop) // FIXME: With auto-configured traces, PoolTest fails on Native From 8533a0ce8c6b4d80638f133edfc79148dcaacfd8 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 23 Jan 2026 18:25:44 +0100 Subject: [PATCH 3/7] Remove unused import --- .../shared/src/main/scala/net/protocol/ParseDescribe.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/core/shared/src/main/scala/net/protocol/ParseDescribe.scala b/modules/core/shared/src/main/scala/net/protocol/ParseDescribe.scala index 5a4462107..380a9fc5f 100644 --- a/modules/core/shared/src/main/scala/net/protocol/ParseDescribe.scala +++ b/modules/core/shared/src/main/scala/net/protocol/ParseDescribe.scala @@ -14,13 +14,12 @@ import org.typelevel.otel4s.Attribute import org.typelevel.otel4s.trace.Span import org.typelevel.otel4s.trace.Tracer import cats.data.OptionT -import cats.MonadError +import cats.effect.MonadCancel import skunk.Statement import skunk.exception.* import skunk.util.Namer import skunk.net.protocol.exchange import org.typelevel.otel4s.metrics.Histogram -import cats.effect.MonadCancel trait ParseDescribe[F[_]] { def command[A](cmd: skunk.Command[A], ty: Typer): F[StatementId] From 42c1695eeee9e2b52c42cb5fe8f16d7a23441e49 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 23 Jan 2026 18:25:53 +0100 Subject: [PATCH 4/7] Cleanup unnecessary public histogram field --- .../core/shared/src/main/scala/net/Protocol.scala | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/modules/core/shared/src/main/scala/net/Protocol.scala b/modules/core/shared/src/main/scala/net/Protocol.scala index 06ada804a..b703d6583 100644 --- a/modules/core/shared/src/main/scala/net/Protocol.scala +++ b/modules/core/shared/src/main/scala/net/Protocol.scala @@ -13,7 +13,6 @@ import skunk.{ Command, Query, Statement, ~, Void, RedactionStrategy } import skunk.data._ import skunk.util.{ Namer, Origin, Otel } import skunk.util.Typer -import org.typelevel.otel4s.metrics.Histogram import org.typelevel.otel4s.metrics.Meter import org.typelevel.otel4s.trace.Tracer import fs2.io.net.Socket @@ -30,13 +29,6 @@ import skunk.net.protocol.Parse */ trait Protocol[F[_]] { - /** - * Histogram tracking the duration of database operations. - * - * @see [[https://opentelemetry.io/docs/specs/semconv/db/database-metrics/#metric-dbclientoperationduration `db.client.operation.duration`]] - */ - def opDuration: Histogram[F, Double] - /** * Unfiltered stream of all asynchronous channel notifications sent to this session. In general * this stream is consumed asynchronously and the associated fiber is canceled before the @@ -239,12 +231,10 @@ object Protocol { pc: Parse.Cache[F], redactionStrategy: RedactionStrategy ): F[Protocol[F]] = - Otel.OpDurationHistogram[F].flatMap { opDurationHistogram => + Otel.OpDurationHistogram[F].flatMap { opDuration => Exchange[F].map { ex => new Protocol[F] { - def opDuration: Histogram[F,Double] = opDurationHistogram - // Not super sure about this but it does make the sub-protocol implementations cleaner. // We'll see how well it works out. implicit val ms: MessageSocket[F] = bms From b1e5558a5ae0258c90e830de5a129c792889dc4b Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 23 Jan 2026 18:33:47 +0100 Subject: [PATCH 5/7] Update documentation with noop meters --- modules/docs/src/main/laika/reference/Sessions.md | 1 + modules/docs/src/main/laika/tutorial/Command.md | 4 ++++ modules/docs/src/main/laika/tutorial/Query.md | 3 +++ modules/docs/src/main/laika/tutorial/Setup.md | 4 +++- modules/docs/src/main/laika/tutorial/Transactions.md | 2 ++ 5 files changed, 13 insertions(+), 1 deletion(-) diff --git a/modules/docs/src/main/laika/reference/Sessions.md b/modules/docs/src/main/laika/reference/Sessions.md index c6518833b..e5efb3f88 100644 --- a/modules/docs/src/main/laika/reference/Sessions.md +++ b/modules/docs/src/main/laika/reference/Sessions.md @@ -1,6 +1,7 @@ ```scala mdoc:invisible import cats.effect._, skunk._ implicit def dummyTrace: org.typelevel.otel4s.trace.Tracer[IO] = ??? +implicit def dummyMeter: org.typelevel.otel4s.metrics.Meter[IO] = ??? ``` # Sessions diff --git a/modules/docs/src/main/laika/tutorial/Command.md b/modules/docs/src/main/laika/tutorial/Command.md index c3a1b4552..9239bff0a 100644 --- a/modules/docs/src/main/laika/tutorial/Command.md +++ b/modules/docs/src/main/laika/tutorial/Command.md @@ -5,9 +5,11 @@ import skunk._ import skunk.implicits._ import skunk.codec.all._ import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter import fs2.Stream val s: Session[IO] = null implicit val tracer: Tracer[IO] = Tracer.noop +implicit val meter: Meter[IO] = Meter.noop ``` # Commands @@ -198,6 +200,7 @@ import cats.Monad import cats.effect._ import cats.syntax.all._ import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter import skunk._ import skunk.codec.all._ import skunk.implicits._ @@ -246,6 +249,7 @@ object PetService { object CommandExample extends IOApp { implicit val tracer: Tracer[IO] = Tracer.noop + implicit val meter: Meter[IO] = Meter.noop // a source of sessions val session: Resource[IO, Session[IO]] = diff --git a/modules/docs/src/main/laika/tutorial/Query.md b/modules/docs/src/main/laika/tutorial/Query.md index 2d2f9a6fb..422798e8a 100644 --- a/modules/docs/src/main/laika/tutorial/Query.md +++ b/modules/docs/src/main/laika/tutorial/Query.md @@ -253,6 +253,7 @@ import skunk.implicits._ import skunk.codec.all._ import java.time.OffsetDateTime implicit def dummyTrace: org.typelevel.otel4s.trace.Tracer[IO] = org.typelevel.otel4s.trace.Tracer.noop +implicit def dummyMeter: org.typelevel.otel4s.metrics.Meter[IO] = org.typelevel.otel4s.metrics.Meter.noop object QueryExample extends IOApp { @@ -328,6 +329,7 @@ import skunk.implicits._ import skunk.codec.all._ import java.time.OffsetDateTime import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter import fs2.Stream import cats.Applicative @@ -371,6 +373,7 @@ object Service { object QueryExample2 extends IOApp { implicit val tracer: Tracer[IO] = Tracer.noop + implicit val meter: Meter[IO] = Meter.noop // a source of sessions val session: Resource[IO, Session[IO]] = diff --git a/modules/docs/src/main/laika/tutorial/Setup.md b/modules/docs/src/main/laika/tutorial/Setup.md index 31d29b204..8446e3802 100644 --- a/modules/docs/src/main/laika/tutorial/Setup.md +++ b/modules/docs/src/main/laika/tutorial/Setup.md @@ -36,10 +36,12 @@ import skunk._ import skunk.implicits._ import skunk.codec.all._ import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter object Hello extends IOApp { implicit val tracer: Tracer[IO] = Tracer.noop // (1) + implicit val meter: Meter[IO] = Meter.noop val session: Resource[IO, Session[IO]] = Session.Builder[IO] // (2) @@ -62,7 +64,7 @@ object Hello extends IOApp { Let's examine the code above. -- At ① we define the no-op `Tracer`, which allows us to run Skunk programs with execution tracing disabled. We will revisit [Tracing](Tracing.md) in a later section. +- At ① we define the no-op `Tracer` and `Meter`, which allows us to run Skunk programs with execution tracing and metrics disabled. We will revisit [Tracing](Tracing.md) in a later section. - At ② we define a [Resource](https://typelevel.org/cats-effect/datatypes/resource.html) that yields un-pooled [Session](../reference/Sessions.md) values and ensures that they are closed after use. We specify the host, port, user, database, and password (note: we didn't actually need to specify the host and port as the default host is localhost and the default port is 5432 -- see [Session](../reference/Sessions.md) for information on ther connection options). - At ③ we `use` the resource, specifying a block to execute during the `Session`'s lifetime. No matter how the block terminates (success, failure, cancellation) the `Session` will be closed properly. - At ④ we use the [sql interpolator](../reference/Fragments.md) to construct a `Query` that selects a single column of schema type `date` (yielding `d`, a value of type `java.time.LocalDate`), then we ask the session to execute it, expecting a *unique* value back; i.e., exactly one row. diff --git a/modules/docs/src/main/laika/tutorial/Transactions.md b/modules/docs/src/main/laika/tutorial/Transactions.md index 2e8ed4684..5dc991d70 100644 --- a/modules/docs/src/main/laika/tutorial/Transactions.md +++ b/modules/docs/src/main/laika/tutorial/Transactions.md @@ -96,6 +96,7 @@ Here is a complete program listing that demonstrates our knowledge thus far. import cats.effect._ import cats.implicits._ import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Meter import skunk._ import skunk.codec.all._ import skunk.implicits._ @@ -156,6 +157,7 @@ object PetService { object TransactionExample extends IOApp { implicit val tracer: Tracer[IO] = Tracer.noop + implicit val meter: Meter[IO] = Meter.noop // a source of sessions val session: Resource[IO, Session[IO]] = From c52e9fb7deeee623681e8df0fbd2ca44324f5903 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 23 Jan 2026 18:55:11 +0100 Subject: [PATCH 6/7] Rename the `Tracing` section into a more general `Telemetry` one --- modules/docs/src/main/laika/tutorial/Setup.md | 2 +- .../docs/src/main/laika/tutorial/Telemetry.md | 29 +++++++++++++++++++ .../docs/src/main/laika/tutorial/Tracing.md | 19 ------------ .../src/main/laika/tutorial/directory.conf | 2 +- 4 files changed, 31 insertions(+), 21 deletions(-) create mode 100644 modules/docs/src/main/laika/tutorial/Telemetry.md delete mode 100644 modules/docs/src/main/laika/tutorial/Tracing.md diff --git a/modules/docs/src/main/laika/tutorial/Setup.md b/modules/docs/src/main/laika/tutorial/Setup.md index 8446e3802..6663c5a00 100644 --- a/modules/docs/src/main/laika/tutorial/Setup.md +++ b/modules/docs/src/main/laika/tutorial/Setup.md @@ -64,7 +64,7 @@ object Hello extends IOApp { Let's examine the code above. -- At ① we define the no-op `Tracer` and `Meter`, which allows us to run Skunk programs with execution tracing and metrics disabled. We will revisit [Tracing](Tracing.md) in a later section. +- At ① we define the no-op `Tracer` and `Meter`, which allows us to run Skunk programs with execution tracing and metrics disabled. We will revisit [Telemetry](Telemetry.md) in a later section. - At ② we define a [Resource](https://typelevel.org/cats-effect/datatypes/resource.html) that yields un-pooled [Session](../reference/Sessions.md) values and ensures that they are closed after use. We specify the host, port, user, database, and password (note: we didn't actually need to specify the host and port as the default host is localhost and the default port is 5432 -- see [Session](../reference/Sessions.md) for information on ther connection options). - At ③ we `use` the resource, specifying a block to execute during the `Session`'s lifetime. No matter how the block terminates (success, failure, cancellation) the `Session` will be closed properly. - At ④ we use the [sql interpolator](../reference/Fragments.md) to construct a `Query` that selects a single column of schema type `date` (yielding `d`, a value of type `java.time.LocalDate`), then we ask the session to execute it, expecting a *unique* value back; i.e., exactly one row. diff --git a/modules/docs/src/main/laika/tutorial/Telemetry.md b/modules/docs/src/main/laika/tutorial/Telemetry.md new file mode 100644 index 000000000..7c3c71986 --- /dev/null +++ b/modules/docs/src/main/laika/tutorial/Telemetry.md @@ -0,0 +1,29 @@ +# Telemetry + +Skunk uses [OpenTelemtry](https://opentelemetry.io/), using the [otel4s](https://github.com/typelevel/otel4s) implementation. + +## Metrics + +Skunk provides the [`db.client.operation.duration`](https://opentelemetry.io/docs/specs/semconv/db/database-metrics/#metric-dbclientoperationduration) histogram, that records the duration of all operations interacting with the postgresql server. + +### I Don't Care + +If you don't care about metrics you can use the **no-op meter** to disable metrics entirely (`org.typelevel.otel4s.metrics.Meter`). + +## Tracing + +### I Don't Care + +If you don't care about tracing you have two choices: + +- Use the **no-op tracer** to disable tracing entirely (`org.typelevel.otel4s.Tracer.noop`). +- Use the **log tracer** to log completed traces to a [log4cats](https://typelevel.org/log4cats/) logger. + +### Tracing with Jaeger + +Easy because there's a docker container. + +### Tracing Example + +... program that adds its own events to the trace + diff --git a/modules/docs/src/main/laika/tutorial/Tracing.md b/modules/docs/src/main/laika/tutorial/Tracing.md deleted file mode 100644 index 942e97240..000000000 --- a/modules/docs/src/main/laika/tutorial/Tracing.md +++ /dev/null @@ -1,19 +0,0 @@ -# Tracing - -Skunk uses structured tracing instead of logging, using the [otel4s](https://github.com/typelevel/otel4s) tracing library. - -## I Don't Care - -If you don't care about tracing you have two choices: - -- Use the **no-op tracer** to disable tracing entirely (`org.typelevel.otel4s.Tracer.noop`). -- Use the **log tracer** to log completed traces to a [log4cats](https://typelevel.org/log4cats/) logger. - -## Tracing with Jaeger - -Easy because there's a docker container. - -## Tracing Example - -... program that adds its own events to the trace - diff --git a/modules/docs/src/main/laika/tutorial/directory.conf b/modules/docs/src/main/laika/tutorial/directory.conf index e99bbf201..7eefce147 100644 --- a/modules/docs/src/main/laika/tutorial/directory.conf +++ b/modules/docs/src/main/laika/tutorial/directory.conf @@ -6,5 +6,5 @@ laika.navigationOrder = [ Command.md Transactions.md Channels.md - Tracing.md + Telemetry.md ] From 248b3a5d2847aee8c5b47d2f98cb8a52d11b3391 Mon Sep 17 00:00:00 2001 From: Lucas Satabin Date: Fri, 23 Jan 2026 19:08:20 +0100 Subject: [PATCH 7/7] Instantiate meter only once --- .../core/shared/src/main/scala/Session.scala | 16 ++- .../shared/src/main/scala/net/Protocol.scala | 104 +++++++++--------- .../src/test/scala/simulation/SimTest.scala | 3 +- 3 files changed, 64 insertions(+), 59 deletions(-) diff --git a/modules/core/shared/src/main/scala/Session.scala b/modules/core/shared/src/main/scala/Session.scala index e6ffbf6f3..15d0dcdba 100644 --- a/modules/core/shared/src/main/scala/Session.scala +++ b/modules/core/shared/src/main/scala/Session.scala @@ -13,6 +13,8 @@ import fs2.concurrent.Signal import fs2.io.net.{ Network, Socket, SocketOption } import fs2.Pipe import fs2.Stream +import org.typelevel.otel4s.metrics.Meter +import org.typelevel.otel4s.metrics.Histogram import org.typelevel.otel4s.trace.Tracer import skunk.codec.all.bool import skunk.data._ @@ -23,7 +25,6 @@ import skunk.net.SSLNegotiation import skunk.net.protocol.Describe import scala.concurrent.duration.Duration import skunk.net.protocol.Parse -import org.typelevel.otel4s.metrics.Meter /** * Represents a live connection to a Postgres database. Operations provided here are safe to use @@ -646,13 +647,15 @@ object Session { for { dc <- Resource.eval(Describe.Cache.empty[F](commandCacheSize, queryCacheSize)) sslOp <- ssl.toSSLNegotiationOptions(if (debug) logger.some else none) - pool <- Pool.ofF({implicit T: Tracer[F] => sessions(sslOp, dc)}, max)(Recyclers.full) + opDuration <- Resource.eval(Otel.OpDurationHistogram[F]) + pool <- Pool.ofF({implicit T: Tracer[F] => sessions(sslOp, dc, opDuration)}, max)(Recyclers.full) } yield pool } private def sessions( sslOptions: Option[SSLNegotiation.Options[F]], - describeCache: Describe.Cache[F] + describeCache: Describe.Cache[F], + opDuration: Histogram[F, Double] )(implicit T: Tracer[F]): Resource[F, Session[F]] = { val sockets = connectionType match { case ConnectionType.TCP => @@ -664,18 +667,19 @@ object Session { val filteredSocketOptions = socketOptions.filter(o => o.key != SocketOption.NoDelay) Network[F].connect(address, filteredSocketOptions) } - fromSockets(sockets, sslOptions, describeCache) + fromSockets(sockets, sslOptions, describeCache, opDuration) } private def fromSockets( sockets: Resource[F, Socket[F]], sslOptions: Option[SSLNegotiation.Options[F]], - describeCache: Describe.Cache[F] + describeCache: Describe.Cache[F], + opDuration: Histogram[F, Double] )(implicit T: Tracer[F]): Resource[F, Session[F]] = for { namer <- Resource.eval(Namer[F]) pc <- Resource.eval(Parse.Cache.empty[F](parseCacheSize)) - proto <- Protocol[F](debug, namer, sockets, sslOptions, describeCache, pc, readTimeout, redactionStrategy) + proto <- Protocol[F](debug, namer, sockets, sslOptions, describeCache, pc, readTimeout, redactionStrategy, opDuration) creds <- Resource.eval(credentials) _ <- Resource.eval(proto.startup(creds.user, database.getOrElse(creds.user), creds.password, connectionParameters)) sess <- Resource.make(fromProtocol(proto, namer, typingStrategy, redactionStrategy))(_ => proto.cleanup) diff --git a/modules/core/shared/src/main/scala/net/Protocol.scala b/modules/core/shared/src/main/scala/net/Protocol.scala index b703d6583..b880e43ae 100644 --- a/modules/core/shared/src/main/scala/net/Protocol.scala +++ b/modules/core/shared/src/main/scala/net/Protocol.scala @@ -11,15 +11,15 @@ import fs2.concurrent.Signal import fs2.Stream import skunk.{ Command, Query, Statement, ~, Void, RedactionStrategy } import skunk.data._ -import skunk.util.{ Namer, Origin, Otel } +import skunk.util.{ Namer, Origin } import skunk.util.Typer -import org.typelevel.otel4s.metrics.Meter import org.typelevel.otel4s.trace.Tracer import fs2.io.net.Socket import skunk.net.protocol.Describe import scala.concurrent.duration.Duration import skunk.net.protocol.Exchange import skunk.net.protocol.Parse +import org.typelevel.otel4s.metrics.Histogram /** * Interface for a Postgres database, expressed through high-level operations that rely on exchange @@ -209,7 +209,7 @@ object Protocol { def execute(maxRows: Int): F[List[B] ~ Boolean] } - def apply[F[_]: Temporal: Tracer: Meter: Console]( + def apply[F[_]: Temporal: Tracer: Console]( debug: Boolean, nam: Namer[F], sockets: Resource[F, Socket[F]], @@ -217,82 +217,82 @@ object Protocol { describeCache: Describe.Cache[F], parseCache: Parse.Cache[F], readTimeout: Duration, - redactionStrategy: RedactionStrategy + redactionStrategy: RedactionStrategy, + opDuration: Histogram[F, Double] ): Resource[F, Protocol[F]] = for { bms <- BufferedMessageSocket[F](256, debug, sockets, sslOptions, readTimeout) // TODO: should we expose the queue size? - p <- Resource.eval(fromMessageSocket(bms, nam, describeCache, parseCache, redactionStrategy)) + p <- Resource.eval(fromMessageSocket(bms, nam, describeCache, parseCache, redactionStrategy, opDuration)) } yield p - def fromMessageSocket[F[_]: Concurrent: Tracer: Meter]( + def fromMessageSocket[F[_]: Concurrent: Tracer]( bms: BufferedMessageSocket[F], nam: Namer[F], dc: Describe.Cache[F], pc: Parse.Cache[F], - redactionStrategy: RedactionStrategy + redactionStrategy: RedactionStrategy, + opDuration: Histogram[F, Double] ): F[Protocol[F]] = - Otel.OpDurationHistogram[F].flatMap { opDuration => - Exchange[F].map { ex => - new Protocol[F] { + Exchange[F].map { ex => + new Protocol[F] { - // Not super sure about this but it does make the sub-protocol implementations cleaner. - // We'll see how well it works out. - implicit val ms: MessageSocket[F] = bms - implicit val na: Namer[F] = nam - implicit val ExchangeF: protocol.Exchange[F] = ex + // Not super sure about this but it does make the sub-protocol implementations cleaner. + // We'll see how well it works out. + implicit val ms: MessageSocket[F] = bms + implicit val na: Namer[F] = nam + implicit val ExchangeF: protocol.Exchange[F] = ex - override def notifications(maxQueued: Int): Resource[F, Stream[F, Notification[String]]] = - bms.notifications(maxQueued) + override def notifications(maxQueued: Int): Resource[F, Stream[F, Notification[String]]] = + bms.notifications(maxQueued) - override def parameters: Signal[F, Map[String, String]] = - bms.parameters + override def parameters: Signal[F, Map[String, String]] = + bms.parameters - override def prepare[A](command: Command[A], ty: Typer): F[PreparedCommand[F, A]] = - protocol.Prepare[F](describeCache, parseCache, redactionStrategy, opDuration).apply(command, ty) + override def prepare[A](command: Command[A], ty: Typer): F[PreparedCommand[F, A]] = + protocol.Prepare[F](describeCache, parseCache, redactionStrategy, opDuration).apply(command, ty) - override def prepare[A, B](query: Query[A, B], ty: Typer): F[PreparedQuery[F, A, B]] = - protocol.Prepare[F](describeCache, parseCache, redactionStrategy, opDuration).apply(query, ty) + override def prepare[A, B](query: Query[A, B], ty: Typer): F[PreparedQuery[F, A, B]] = + protocol.Prepare[F](describeCache, parseCache, redactionStrategy, opDuration).apply(query, ty) - override def prepareR[A](command: Command[A], ty: Typer): Resource[F, Protocol.PreparedCommand[F, A]] = { - val acquire = Parse.Cache.empty[F](1).flatMap { pc => - protocol.Prepare[F](describeCache, pc, redactionStrategy, opDuration).apply(command, ty) - } - Resource.make(acquire)(pc => protocol.Close[F](opDuration).apply(pc.id)) + override def prepareR[A](command: Command[A], ty: Typer): Resource[F, Protocol.PreparedCommand[F, A]] = { + val acquire = Parse.Cache.empty[F](1).flatMap { pc => + protocol.Prepare[F](describeCache, pc, redactionStrategy, opDuration).apply(command, ty) } + Resource.make(acquire)(pc => protocol.Close[F](opDuration).apply(pc.id)) + } - override def prepareR[A, B](query: Query[A, B], ty: Typer): Resource[F, Protocol.PreparedQuery[F, A, B]] = { - val acquire = Parse.Cache.empty[F](1).flatMap { pc => - protocol.Prepare[F](describeCache, pc, redactionStrategy, opDuration).apply(query, ty) - } - Resource.make(acquire)(pq => protocol.Close[F](opDuration).apply(pq.id)) + override def prepareR[A, B](query: Query[A, B], ty: Typer): Resource[F, Protocol.PreparedQuery[F, A, B]] = { + val acquire = Parse.Cache.empty[F](1).flatMap { pc => + protocol.Prepare[F](describeCache, pc, redactionStrategy, opDuration).apply(query, ty) } + Resource.make(acquire)(pq => protocol.Close[F](opDuration).apply(pq.id)) + } - override def execute(command: Command[Void]): F[Completion] = - protocol.Query[F](redactionStrategy, opDuration).apply(command) + override def execute(command: Command[Void]): F[Completion] = + protocol.Query[F](redactionStrategy, opDuration).apply(command) - override def execute[B](query: Query[Void, B], ty: Typer): F[List[B]] = - protocol.Query[F](redactionStrategy, opDuration).apply(query, ty) + override def execute[B](query: Query[Void, B], ty: Typer): F[List[B]] = + protocol.Query[F](redactionStrategy, opDuration).apply(query, ty) - override def executeDiscard(statement: Statement[Void]): F[Unit] = protocol.Query[F](redactionStrategy, opDuration).applyDiscard(statement) + override def executeDiscard(statement: Statement[Void]): F[Unit] = protocol.Query[F](redactionStrategy, opDuration).applyDiscard(statement) - override def startup(user: String, database: String, password: Option[String], parameters: Map[String, String]): F[Unit] = - protocol.Startup[F](opDuration).apply(user, database, password, parameters) + override def startup(user: String, database: String, password: Option[String], parameters: Map[String, String]): F[Unit] = + protocol.Startup[F](opDuration).apply(user, database, password, parameters) - override def cleanup: F[Unit] = - parseCache.value.values.flatMap(_.traverse_(protocol.Close[F](opDuration).apply)) - - override def transactionStatus: Signal[F, TransactionStatus] = - bms.transactionStatus + override def cleanup: F[Unit] = + parseCache.value.values.flatMap(_.traverse_(protocol.Close[F](opDuration).apply)) + + override def transactionStatus: Signal[F, TransactionStatus] = + bms.transactionStatus - override val describeCache: Describe.Cache[F] = - dc + override val describeCache: Describe.Cache[F] = + dc - override val parseCache: Parse.Cache[F] = - pc + override val parseCache: Parse.Cache[F] = + pc - override def closeEvictedPreparedStatements: F[Unit] = - pc.value.clearEvicted.flatMap(_.traverse_(protocol.Close[F](opDuration).apply)) - } + override def closeEvictedPreparedStatements: F[Unit] = + pc.value.clearEvicted.flatMap(_.traverse_(protocol.Close[F](opDuration).apply)) } } diff --git a/modules/tests/shared/src/test/scala/simulation/SimTest.scala b/modules/tests/shared/src/test/scala/simulation/SimTest.scala index 62fc904fa..108edd904 100644 --- a/modules/tests/shared/src/test/scala/simulation/SimTest.scala +++ b/modules/tests/shared/src/test/scala/simulation/SimTest.scala @@ -9,6 +9,7 @@ import cats.effect._ import ffstest.FTest import fs2.concurrent.Signal import org.typelevel.otel4s.trace.Tracer +import org.typelevel.otel4s.metrics.Histogram import skunk.{Session, RedactionStrategy, TypingStrategy} import skunk.data.Notification import skunk.data.TransactionStatus @@ -44,7 +45,7 @@ trait SimTest extends FTest with SimMessageSocket.DSL { nam <- Namer[IO] dc <- Describe.Cache.empty[IO](1024, 1024) pc <- Parse.Cache.empty[IO](1024) - pro <- Protocol.fromMessageSocket(bms, nam, dc, pc, RedactionStrategy.None) + pro <- Protocol.fromMessageSocket(bms, nam, dc, pc, RedactionStrategy.None, Histogram.noop[IO, Double]) _ <- pro.startup(user, database, password, Session.DefaultConnectionParameters) ses <- Session.fromProtocol(pro, nam, TypingStrategy.BuiltinsOnly, RedactionStrategy.None) } yield ses