Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
"org.scodec" %%% "scodec-bits" % "1.2.4",
"org.scodec" %%% "scodec-core" % (if (tlIsScala3.value) "2.3.3" else "1.11.11"),
"org.scodec" %%% "scodec-cats" % "1.3.0-RC1",
"org.typelevel" %%% "otel4s-core-trace" % otel4sVersion,
"org.typelevel" %%% "otel4s-core" % otel4sVersion,
"org.typelevel" %%% "otel4s-semconv" % otel4sVersion,
"org.typelevel" %%% "otel4s-semconv-metrics" % otel4sVersion,
"org.tpolecat" %%% "sourcepos" % "1.2.0",
"org.typelevel" %%% "twiddles-core" % "1.0.0-RC2",
) ++ Seq(
Expand Down
2 changes: 2 additions & 0 deletions modules/bench/src/main/scala/skunk/bench/SelectBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import skunk.codec.all._
import java.sql.DriverManager
import org.openjdk.jmh.annotations._
import org.typelevel.otel4s.trace.Tracer
import org.typelevel.otel4s.metrics.Meter

@State(Scope.Benchmark)
object SelectBenchScope {
implicit val tracer: Tracer[IO] = Tracer.noop[IO]
implicit val meter: Meter[IO] = Meter.noop[IO]

val defaultChunkSize = 512

Expand Down
27 changes: 16 additions & 11 deletions modules/core/shared/src/main/scala/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import fs2.concurrent.Signal
import fs2.io.net.{ Network, Socket, SocketOption }
import fs2.Pipe
import fs2.Stream
import org.typelevel.otel4s.metrics.Meter
import org.typelevel.otel4s.metrics.Histogram
import org.typelevel.otel4s.trace.Tracer
import skunk.codec.all.bool
import skunk.data._
Expand Down Expand Up @@ -476,7 +478,7 @@ object Session {
* @param queryCacheSize size of the session-level cache for query checking; defaults to 2048
* @param parseCacheSize size of the pool-level cache for parsing statements; defaults to 2048
*/
final class Builder[F[_]: Temporal: Network: Console] private (
final class Builder[F[_]: Temporal: Meter: Network: Console] private (
val connectionType: ConnectionType,
val host: Host,
val port: Port,
Expand Down Expand Up @@ -645,13 +647,15 @@ object Session {
for {
dc <- Resource.eval(Describe.Cache.empty[F](commandCacheSize, queryCacheSize))
sslOp <- ssl.toSSLNegotiationOptions(if (debug) logger.some else none)
pool <- Pool.ofF({implicit T: Tracer[F] => sessions(sslOp, dc)}, max)(Recyclers.full)
opDuration <- Resource.eval(Otel.OpDurationHistogram[F])
pool <- Pool.ofF({implicit T: Tracer[F] => sessions(sslOp, dc, opDuration)}, max)(Recyclers.full)
} yield pool
}

private def sessions(
sslOptions: Option[SSLNegotiation.Options[F]],
describeCache: Describe.Cache[F]
describeCache: Describe.Cache[F],
opDuration: Histogram[F, Double]
)(implicit T: Tracer[F]): Resource[F, Session[F]] = {
val sockets = connectionType match {
case ConnectionType.TCP =>
Expand All @@ -663,18 +667,19 @@ object Session {
val filteredSocketOptions = socketOptions.filter(o => o.key != SocketOption.NoDelay)
Network[F].connect(address, filteredSocketOptions)
}
fromSockets(sockets, sslOptions, describeCache)
fromSockets(sockets, sslOptions, describeCache, opDuration)
}

private def fromSockets(
sockets: Resource[F, Socket[F]],
sslOptions: Option[SSLNegotiation.Options[F]],
describeCache: Describe.Cache[F]
describeCache: Describe.Cache[F],
opDuration: Histogram[F, Double]
)(implicit T: Tracer[F]): Resource[F, Session[F]] =
for {
namer <- Resource.eval(Namer[F])
pc <- Resource.eval(Parse.Cache.empty[F](parseCacheSize))
proto <- Protocol[F](debug, namer, sockets, sslOptions, describeCache, pc, readTimeout, redactionStrategy)
proto <- Protocol[F](debug, namer, sockets, sslOptions, describeCache, pc, readTimeout, redactionStrategy, opDuration)
creds <- Resource.eval(credentials)
_ <- Resource.eval(proto.startup(creds.user, database.getOrElse(creds.user), creds.password, connectionParameters))
sess <- Resource.make(fromProtocol(proto, namer, typingStrategy, redactionStrategy))(_ => proto.cleanup)
Expand All @@ -701,7 +706,7 @@ object Session {
}}}
*/
object Builder {
def apply[F[_]: Temporal: Network: Console]: Builder[F] =
def apply[F[_]: Temporal: Meter: Network: Console]: Builder[F] =
new Builder[F](
connectionType = ConnectionType.TCP,
host = host"localhost",
Expand Down Expand Up @@ -748,7 +753,7 @@ object Session {
* @group Constructors
*/
@deprecated("1.0.0-M11", "Use Session.Builder[F].pooled instead")
def pooled[F[_]: Temporal: Tracer: Network: Console](
def pooled[F[_]: Temporal: Tracer: Meter: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand Down Expand Up @@ -806,7 +811,7 @@ object Session {
* @group Constructors
*/
@deprecated("1.0.0-M11", "Use Session.Builder[F].pooledExplicitTracer instead")
def pooledF[F[_]: Temporal: Network: Console](
def pooledF[F[_]: Temporal: Meter: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand Down Expand Up @@ -848,7 +853,7 @@ object Session {
* @see pooled
*/
@deprecated("1.0.0-M11", "Use Session.Builder[F].single instead")
def single[F[_]: Temporal: Tracer: Network: Console](
def single[F[_]: Temporal: Tracer: Meter: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand Down Expand Up @@ -887,7 +892,7 @@ object Session {
* @see pooledF
*/
@deprecated("1.0.0-M11", "Use Session.Builder[F].singleExplicitTracer instead")
def singleF[F[_]: Temporal: Network: Console](
def singleF[F[_]: Temporal: Meter: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand Down
37 changes: 20 additions & 17 deletions modules/core/shared/src/main/scala/net/Protocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import skunk.net.protocol.Describe
import scala.concurrent.duration.Duration
import skunk.net.protocol.Exchange
import skunk.net.protocol.Parse
import org.typelevel.otel4s.metrics.Histogram

/**
* Interface for a Postgres database, expressed through high-level operations that rely on exchange
Expand Down Expand Up @@ -216,19 +217,21 @@ object Protocol {
describeCache: Describe.Cache[F],
parseCache: Parse.Cache[F],
readTimeout: Duration,
redactionStrategy: RedactionStrategy
redactionStrategy: RedactionStrategy,
opDuration: Histogram[F, Double]
): Resource[F, Protocol[F]] =
for {
bms <- BufferedMessageSocket[F](256, debug, sockets, sslOptions, readTimeout) // TODO: should we expose the queue size?
p <- Resource.eval(fromMessageSocket(bms, nam, describeCache, parseCache, redactionStrategy))
p <- Resource.eval(fromMessageSocket(bms, nam, describeCache, parseCache, redactionStrategy, opDuration))
} yield p

def fromMessageSocket[F[_]: Concurrent: Tracer](
bms: BufferedMessageSocket[F],
nam: Namer[F],
dc: Describe.Cache[F],
pc: Parse.Cache[F],
redactionStrategy: RedactionStrategy
redactionStrategy: RedactionStrategy,
opDuration: Histogram[F, Double]
): F[Protocol[F]] =
Exchange[F].map { ex =>
new Protocol[F] {
Expand All @@ -246,50 +249,50 @@ object Protocol {
bms.parameters

override def prepare[A](command: Command[A], ty: Typer): F[PreparedCommand[F, A]] =
protocol.Prepare[F](describeCache, parseCache, redactionStrategy).apply(command, ty)
protocol.Prepare[F](describeCache, parseCache, redactionStrategy, opDuration).apply(command, ty)

override def prepare[A, B](query: Query[A, B], ty: Typer): F[PreparedQuery[F, A, B]] =
protocol.Prepare[F](describeCache, parseCache, redactionStrategy).apply(query, ty)
protocol.Prepare[F](describeCache, parseCache, redactionStrategy, opDuration).apply(query, ty)

override def prepareR[A](command: Command[A], ty: Typer): Resource[F, Protocol.PreparedCommand[F, A]] = {
val acquire = Parse.Cache.empty[F](1).flatMap { pc =>
protocol.Prepare[F](describeCache, pc, redactionStrategy).apply(command, ty)
protocol.Prepare[F](describeCache, pc, redactionStrategy, opDuration).apply(command, ty)
}
Resource.make(acquire)(pc => protocol.Close[F].apply(pc.id))
Resource.make(acquire)(pc => protocol.Close[F](opDuration).apply(pc.id))
}

override def prepareR[A, B](query: Query[A, B], ty: Typer): Resource[F, Protocol.PreparedQuery[F, A, B]] = {
val acquire = Parse.Cache.empty[F](1).flatMap { pc =>
protocol.Prepare[F](describeCache, pc, redactionStrategy).apply(query, ty)
protocol.Prepare[F](describeCache, pc, redactionStrategy, opDuration).apply(query, ty)
}
Resource.make(acquire)(pq => protocol.Close[F].apply(pq.id))
Resource.make(acquire)(pq => protocol.Close[F](opDuration).apply(pq.id))
}

override def execute(command: Command[Void]): F[Completion] =
protocol.Query[F](redactionStrategy).apply(command)
protocol.Query[F](redactionStrategy, opDuration).apply(command)

override def execute[B](query: Query[Void, B], ty: Typer): F[List[B]] =
protocol.Query[F](redactionStrategy).apply(query, ty)
protocol.Query[F](redactionStrategy, opDuration).apply(query, ty)

override def executeDiscard(statement: Statement[Void]): F[Unit] = protocol.Query[F](redactionStrategy).applyDiscard(statement)
override def executeDiscard(statement: Statement[Void]): F[Unit] = protocol.Query[F](redactionStrategy, opDuration).applyDiscard(statement)

override def startup(user: String, database: String, password: Option[String], parameters: Map[String, String]): F[Unit] =
protocol.Startup[F].apply(user, database, password, parameters)
protocol.Startup[F](opDuration).apply(user, database, password, parameters)

override def cleanup: F[Unit] =
parseCache.value.values.flatMap(_.traverse_(protocol.Close[F].apply))
parseCache.value.values.flatMap(_.traverse_(protocol.Close[F](opDuration).apply))

override def transactionStatus: Signal[F, TransactionStatus] =
bms.transactionStatus

override val describeCache: Describe.Cache[F] =
dc

override val parseCache: Parse.Cache[F] =
pc
pc

override def closeEvictedPreparedStatements: F[Unit] =
pc.value.clearEvicted.flatMap(_.traverse_(protocol.Close[F].apply))
pc.value.clearEvicted.flatMap(_.traverse_(protocol.Close[F](opDuration).apply))
}
}

Expand Down
11 changes: 6 additions & 5 deletions modules/core/shared/src/main/scala/net/protocol/Bind.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package skunk.net.protocol

import cats.effect.MonadCancel
import cats.effect.Resource
import cats.syntax.all._
import cats.MonadError
import skunk.exception.PostgresErrorException
import skunk.net.message.{ Bind => BindMessage, Close => _, _ }
import skunk.net.MessageSocket
Expand All @@ -15,6 +15,7 @@ import skunk.util.{ Origin, Namer }
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.trace.{Span, Tracer}
import skunk.RedactionStrategy
import org.typelevel.otel4s.metrics.Histogram

trait Bind[F[_]] {

Expand All @@ -29,8 +30,8 @@ trait Bind[F[_]] {

object Bind {

def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](
implicit ev: MonadError[F, Throwable]
def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](opDuration: Histogram[F, Double])(
implicit ev: MonadCancel[F, Throwable]
): Bind[F] =
new Bind[F] {

Expand All @@ -41,7 +42,7 @@ object Bind {
redactionStrategy: RedactionStrategy
): Resource[F, PortalId] =
Resource.make {
exchange("bind") { (span: Span[F]) =>
exchange("bind", opDuration) { (span: Span[F]) =>
for {
pn <- nextName("portal").map(PortalId(_))
ea = statement.statement.encoder.encode(args) // encoded args
Expand Down Expand Up @@ -70,7 +71,7 @@ object Bind {
}
} yield pn
}
} { Close[F].apply }
} { Close[F](opDuration).apply }

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import skunk.net.Protocol
import skunk.data.Completion
import skunk.net.protocol.exchange
import cats.effect.kernel.Deferred
import org.typelevel.otel4s.metrics.Histogram

trait BindExecute[F[_]] {

Expand All @@ -41,7 +42,7 @@ trait BindExecute[F[_]] {

object BindExecute {

def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](
def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](opDuration: Histogram[F, Double])(
implicit ev: Concurrent[F]
): BindExecute[F] =
new Unroll[F] with BindExecute[F] {
Expand Down Expand Up @@ -133,7 +134,7 @@ object BindExecute {
}

Resource.make {
exchange("bind+execute"){ (span: Span[F]) =>
exchange("bind+execute", opDuration){ (span: Span[F]) =>
for {
pn <- preBind(span)
_ <- send(ExecuteMessage(pn.value, 0))
Expand All @@ -144,7 +145,7 @@ object BindExecute {
def execute: F[Completion] = c.pure
}
}
} { portal => Close[F].apply(portal.id)}
} { portal => Close[F](opDuration).apply(portal.id)}

}

Expand All @@ -158,7 +159,7 @@ object BindExecute {
val (preBind, postBind) = bindExchange(statement, args, argsOrigin, redactionStrategy)
Resource.eval(Deferred[F, Unit]).flatMap { prefetch =>
Resource.make {
exchange("bind+execute"){ (span: Span[F]) =>
exchange("bind+execute", opDuration){ (span: Span[F]) =>
for {
pn <- preBind(span)
_ <- span.addAttributes(
Expand All @@ -173,11 +174,11 @@ object BindExecute {
def execute(maxRows: Int): F[List[B] ~ Boolean] =
prefetch.tryGet.flatMap {
case None => rs.pure <* prefetch.complete(())
case Some(()) => Execute[F].apply(this, maxRows)
case Some(()) => Execute[F](opDuration).apply(this, maxRows)
}
}
}
} { portal => Close[F].apply(portal.id)}
} { portal => Close[F](opDuration).apply(portal.id)}
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions modules/core/shared/src/main/scala/net/protocol/Close.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
package skunk.net
package protocol

import cats.FlatMap
import cats.effect.MonadCancelThrow
import cats.syntax.all._
import skunk.net.message.{ Close => CloseMessage, Flush, CloseComplete }
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.trace.Span
import org.typelevel.otel4s.trace.Tracer
import org.typelevel.otel4s.metrics.Histogram

trait Close[F[_]] {
def apply(portalId: Protocol.PortalId): F[Unit]
Expand All @@ -19,17 +20,17 @@ trait Close[F[_]] {

object Close {

def apply[F[_]: FlatMap: Exchange: MessageSocket: Tracer]: Close[F] =
def apply[F[_]: MonadCancelThrow: Exchange: MessageSocket: Tracer](opDuration: Histogram[F, Double]): Close[F] =
new Close[F] {

override def apply(portalId: Protocol.PortalId): F[Unit] =
exchange("close-portal") { (span: Span[F]) =>
exchange("close-portal", opDuration) { (span: Span[F]) =>
span.addAttribute(Attribute("portal", portalId.value)) *>
close(CloseMessage.portal(portalId.value))
}

override def apply(statementId: Protocol.StatementId): F[Unit] =
exchange("close-statement") { (span: Span[F]) =>
exchange("close-statement", opDuration) { (span: Span[F]) =>
span.addAttribute(Attribute("statement", statementId.value)) *>
close(CloseMessage.statement(statementId.value))
}
Expand Down
Loading