diff --git a/build.sbt b/build.sbt
index 7b1a030..f423b56 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,4 +1,5 @@
 import Dependencies.*
+import com.typesafe.sbt.packager.docker.{Cmd, ExecCmd}
 import sbt.*
 
 ThisBuild / organization := "com.evolution.jgrpc.tools"
@@ -26,27 +27,53 @@ ThisBuild / scmInfo := Some(ScmInfo(
 // not sure if bincompat check works for Java code, put it here just in case
 ThisBuild / versionPolicyIntention := Compatibility.BinaryCompatible
 
-// this is a Java project, setting a fixed Scala version just in case
-ThisBuild / scalaVersion := "2.13.16"
-
 // setting pure-Java module build settings
 ThisBuild / crossPaths := false // drop off Scala suffix from artifact names.
 ThisBuild / autoScalaLibrary := false // exclude scala-library from dependencies
 ThisBuild / javacOptions := Seq("-source", "17", "-target", "17", "-Werror", "-Xlint:all")
 ThisBuild / doc / javacOptions := Seq("-source", "17", "-Xdoclint:all", "-Werror")
-
-// common test dependencies:
-ThisBuild / libraryDependencies ++= Seq(
-  // to be able to run JUnit 5+ tests:
-  "com.github.sbt.junit" % "jupiter-interface" % JupiterKeys.jupiterVersion.value,
-  Slf4j.simple,
-).map(_ % Test)
+ThisBuild / scalaVersion := "2.13.18"
+ThisBuild / scalacOptions ++= Seq(
+  "-release:17",
+  "-deprecation",
+  "-Xsource:3",
+)
 
 // common compile dependencies:
 ThisBuild / libraryDependencies ++= Seq(
   jspecify, // JSpecify null-check annotations
 )
 
+def asJavaPublishedModule(p: Project): Project = {
+  p.settings(
+    // common test dependencies for Java modules:
+    libraryDependencies ++= Seq(
+      // to be able to run JUnit 5+ tests:
+      "com.github.sbt.junit" % "jupiter-interface" % JupiterKeys.jupiterVersion.value,
+      Slf4j.simple,
+    ).map(_ % Test),
+  )
+}
+
+def asScalaIntegrationTestModule(p: Project): Project = {
+  p.disablePlugins(JupiterPlugin) // using scalatest instead
+    .settings(
+      publish / skip := true,
+      autoScalaLibrary := true, // int tests are written in Scala, returning scala-library dependency
+      Test / parallelExecution := false, // disable parallel execution between test suites
+      Test / fork := true, // disable parallel execution between modules
+      // tests take a long time to run, better to see the process in real time
+      Test / logBuffered := false,
+      // disable scaladoc generation to avoid dealing with annoying warnings
+      Compile / doc / sources := Seq.empty,
+      // common test dependencies for Scala int test modules:
+      libraryDependencies ++= Seq(
+        scalatest,
+        Slf4j.simple,
+      ).map(_ % Test),
+    )
+}
+
 lazy val root = project.in(file("."))
   .settings(
     name := "grpc-java-tools-root",
@@ -55,9 +82,11 @@ lazy val root = project.in(file("."))
   )
   .aggregate(
     k8sDnsNameResolver,
+    k8sDnsNameResolverIt,
   )
 
 lazy val k8sDnsNameResolver = project.in(file("k8s-dns-name-resolver"))
+  .configure(asJavaPublishedModule)
   .settings(
     name := "k8s-dns-name-resolver",
     description := "Evolution grpc-java tools - DNS-based name resolver for Kubernetes services",
@@ -68,6 +97,43 @@ lazy val k8sDnsNameResolver = project.in(file("k8s-dns-name-resolver"))
     ),
   )
 
+lazy val k8sDnsNameResolverIt = project.in(file("k8s-dns-name-resolver-it"))
+  .configure(asScalaIntegrationTestModule)
+  // the module builds its own test app docker container
+  .enablePlugins(JavaAppPackaging, DockerPlugin)
+  .settings(
+    name := "k8s-dns-name-resolver-it",
+    description := "Evolution grpc-java tools - DNS-based name resolver for Kubernetes services - integration tests",
+    Compile / PB.targets := Seq(
+      scalapb.gen() -> (Compile / sourceManaged).value / "scalapb",
+    ),
+    dockerBaseImage := "amazoncorretto:17-alpine",
+    dockerCommands ++= Seq(
+      // root rights are needed to install additional packages, and also the test client needs them
+      // to manipulate its DNS settings
+      Cmd("USER", "root"),
+      // bash is needed for testcontainers log watching logic
+      // lsof and coredns are needed for the integration test logic
+      ExecCmd("RUN", "apk", "add", "--no-cache", "bash", "lsof", "coredns"),
+    ),
+    dockerExposedPorts := Seq(9000), // Should match the test app GRPC server port.
+    // The int test here needs the test app docker container staged before running the code.
+    // It's then used in docker compose inside testcontainers.
+    test := {
+      (Docker / stage).value
+      (Test / test).value
+    },
+    libraryDependencies ++= Seq(
+      Slf4j.simple,
+      commonsLang3,
+      "io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
+      "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
+      Testcontainers.core % Test,
+    ),
+  ).dependsOn(
+    k8sDnsNameResolver,
+  )
+
 addCommandAlias("fmt", "all scalafmtAll scalafmtSbt javafmtAll")
 addCommandAlias(
   "build",
diff --git a/k8s-dns-name-resolver-it/src/main/protobuf/test_svc.proto b/k8s-dns-name-resolver-it/src/main/protobuf/test_svc.proto
new file mode 100644
index 0000000..135b7f6
--- /dev/null
+++ b/k8s-dns-name-resolver-it/src/main/protobuf/test_svc.proto
@@ -0,0 +1,13 @@
+syntax = "proto2";
+
+package k8sdns.it;
+
+service TestSvc {
+  rpc GetId (GetIdRequest) returns (GetIdReply) {}
+}
+
+message GetIdRequest {}
+
+message GetIdReply {
+  required int32 id = 1;
+}
diff --git a/k8s-dns-name-resolver-it/src/main/resources/simplelogger.properties b/k8s-dns-name-resolver-it/src/main/resources/simplelogger.properties
new file mode 100644
index 0000000..82aec05
--- /dev/null
+++ b/k8s-dns-name-resolver-it/src/main/resources/simplelogger.properties
@@ -0,0 +1,7 @@
+# Configure slf4j-simple to have concise output for tests
+# Supported settings: https://www.slf4j.org/api/org/slf4j/simple/SimpleLogger.html
+org.slf4j.simpleLogger.logFile=System.out
+org.slf4j.simpleLogger.showDateTime=true
+org.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss.SSS
+org.slf4j.simpleLogger.showThreadName=false
+org.slf4j.simpleLogger.showShortLogName=true
diff --git a/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestApp.scala b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestApp.scala
new file mode 100644
index 0000000..bafc078
--- /dev/null
+++ b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestApp.scala
@@ -0,0 +1,36 @@
+package com.evolution.jgrpc.tools.k8sdns.it
+
+/**
+ * `K8sDnsNameResolver` integration test service app entrypoint.
+ *
+ * Depending on the run mode environment variable value, it works either as a
+ * [[TestServer]] or a [[TestClient]].
+ */
+object TestApp extends App {
+  private val runModeEnvVarName = "TEST_SVC_RUN_MODE"
+  private val instanceIdVarName = "TEST_SVC_INSTANCE_ID"
+
+  sys.env.get(runModeEnvVarName) match {
+    case None =>
+      sys.error(s"missing environment variable: $runModeEnvVarName")
+    case Some("server") =>
+      runServer()
+    case Some("client") =>
+      runClient()
+    case Some(unexpectedRunMode) =>
+      sys.error(s"unexpected run mode: $unexpectedRunMode")
+  }
+
+  private def runServer(): Unit = {
+    val instanceId = sys.env.getOrElse(
+      instanceIdVarName,
+      sys.error(s"missing environment variable: $instanceIdVarName"),
+    ).toInt
+
+    new TestServer(instanceId = instanceId).run()
+  }
+
+  private def runClient(): Unit = {
+    new TestClient().run()
+  }
+}
diff --git a/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestAppShared.scala b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestAppShared.scala
new file mode 100644
index 0000000..8cb1244
--- /dev/null
+++ b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestAppShared.scala
@@ -0,0 +1,169 @@
+package com.evolution.jgrpc.tools.k8sdns.it
+
+import java.nio.file.*
+
+/**
+ * Common things shared between the `K8sDnsNameResolver` integration test code and the
+ * test service code
+ *
+ * @see
+ *   [[TestApp]]
+ */
+object TestAppShared {
+
+  /**
+   * [[TestApp]] GRPC server port
+   */
+  val ServerPort: Int = 9000
+
+  /**
+   * Docker compose service names for [[TestApp]] containers.
+   *
+   * The names here should match the ones used in the
+   * `src/test/resources/docker/compose-test.yml` file.
+   */
+  object TestAppSvcNames {
+
+    /**
+     * First [[TestApp]] service in [[TestServer]] mode
+     */
+    val Server1: String = "test-server1"
+
+    /**
+     * Second [[TestApp]] service in [[TestServer]] mode
+     */
+    val Server2: String = "test-server2"
+
+    /**
+     * [[TestApp]] service in [[TestClient]] mode
+     */
+    val Client: String = "test-client"
+  }
+
+  /**
+   * `K8sDnsNameResolver` integration test watches for these [[TestApp]] log messages in
+   * its stdout.
+   */
+  object TestAppSpecialLogMsgs {
+
+    /**
+     * [[TestApp]] docker container has been started and is ready to proceed with the test
+     */
+    val Ready: String = "TEST CONTAINER READY"
+
+    /**
+     * [[TestApp]] in [[TestClient]] mode died prematurely; all the tests should be
+     * aborted
+     */
+    val ClientPrematureDeath: String = "TEST CLIENT PANIC"
+
+    /**
+     * [[TestApp]] in [[TestClient]] mode completed a requested test case successfully
+     *
+     * @see
+     *   [[TestClientControl]] for how to request a test case execution
+     */
+    val ClientTestCaseSuccess: String = "TEST SUCCESS"
+
+    /**
+     * [[TestApp]] in [[TestClient]] mode ran a requested test case and got a failure
+     *
+     * @see
+     *   [[TestClientControl]] for how to request a test case execution
+     */
+    val ClientTestCaseFailed: String = "TEST FAILED"
+  }
+
+  /**
+   * Defines the way to send commands to the [[TestApp]] container in [[TestClient]]
+   * mode:
+   *   - create an empty file in the [[CmdDirPath]] directory on the container - the name
+   *     of the file is the command name
+   *   - the [[TestClient]] code deletes the file and queues the command for execution
+   *   - commands are executed on the [[TestClient]] one-by-one
+   *   - monitor [[TestClient]] container stdout for the command progress - see
+   *     [[TestAppSpecialLogMsgs]]
+   *
+   * Currently supported commands:
+   *   - [[RunTestCaseCmdFileName]] for running [[TestClientTestCase]]
+   */
+  object TestClientControl {
+
+    /**
+     * Directory which [[TestApp]] in [[TestClient]] mode uses for receiving commands
+     *
+     * @see
+     *   [[TestClientControl]]
+     */
+    val CmdDirPath: Path = Paths.get("/tmp/test-client-control")
+
+    /**
+     * [[TestClientControl]] command for running [[TestClientTestCase]].
+     */
+    object RunTestCaseCmdFileName {
+      private val fileNamePrefix = ".run-test-case-"
+
+      /**
+       * Creates a [[TestClientControl]] command file name for running the given
+       * [[TestClientTestCase]]
+       */
+      def apply(testCase: TestClientTestCase): String = {
+        s"$fileNamePrefix${ testCase.name }"
+      }
+
+      /**
+       * Matches a [[TestClientControl]] command file name that runs a
+       * [[TestClientTestCase]]
+       */
+      def unapply(fileName: String): Option[TestClientTestCase] = {
+        if (fileName.startsWith(fileNamePrefix)) {
+          val testCaseName = fileName.drop(fileNamePrefix.length)
+          TestClientTestCase.values.find(_.name == testCaseName)
+        } else {
+          None
+        }
+      }
+    }
+  }
+
+  /**
+   * Test case to run on [[TestClient]].
+   *
+   * @see
+   *   [[TestClientControl.RunTestCaseCmdFileName]]
+   */
+  sealed abstract class TestClientTestCase extends Product {
+    final def name: String = productPrefix
+  }
+  object TestClientTestCase {
+    val values: Vector[TestClientTestCase] = Vector(
+      DiscoverNewPod,
+      DnsFailureRecover,
+    )
+
+    /**
+     * [[TestClient]] test case verifying that `K8sDnsNameResolver` live pod discovery
+     * works.
+     *
+     * Test steps overview:
+     *   - point the service host DNS records to one server container
+     *   - create a GRPC client, check that it sees only the first server
+     *   - add the second server to the DNS records
+     *   - check that after the configured reload TTL, the client sees both servers
+     */
+    case object DiscoverNewPod extends TestClientTestCase
+
+    /**
+     * [[TestClient]] test case verifying that `K8sDnsNameResolver` recovers after a DNS
+     * call failure.
+     *
+     * Test steps overview:
+     *   - point the service host DNS records to one server container
+     *   - create a GRPC client, check that it sees only the first server
+     *   - stop the DNS server, wait until the client gets a DNS error
+     *   - start the DNS server again, with 2 servers in the records
+     *   - check that after the configured reload TTL, the client sees both servers
+     */
+    case object DnsFailureRecover extends TestClientTestCase
+  }
+}
diff --git a/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestClient.scala b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestClient.scala
new file mode 100644
index 0000000..9b853ae
--- /dev/null
+++ b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestClient.scala
@@ -0,0 +1,362 @@
+package com.evolution.jgrpc.tools.k8sdns.it
+
+import com.evolution.jgrpc.tools.k8sdns.K8sDnsNameResolverProvider
+import com.evolution.jgrpc.tools.k8sdns.it.TestAppShared.*
+import io.grpc.netty.NettyChannelBuilder
+import k8sdns.it.test_svc.TestSvcGrpc.TestSvcBlockingStub
+import k8sdns.it.test_svc.{GetIdRequest, TestSvcGrpc}
+import org.apache.commons.lang3.exception.ExceptionUtils
+
+import java.io.IOException
+import java.net.http.HttpResponse.BodyHandlers
+import java.net.http.{HttpClient, HttpRequest}
+import java.net.{InetAddress, URI}
+import java.nio.file.*
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+import scala.annotation.tailrec
+import scala.concurrent.duration.DurationInt
+import scala.jdk.CollectionConverters.*
+import scala.jdk.DurationConverters.*
+import scala.sys.process
+import scala.sys.process.ProcessLogger
+
+/**
+ * Client mode implementation for the `K8sDnsNameResolver` integration test service app.
+ *
+ * It is controlled by the test code using [[TestClientControl]].
+ *
+ * The results are reported back using special log messages: [[TestAppSpecialLogMsgs]].
+ *
+ * Supported test cases: [[TestClientTestCase]].
+ *
+ * DNS record changes are done using a controlled CoreDNS instance running in the test
+ * client app container.
+ *
+ * @see
+ *   [[TestApp]]
+ */
+private[it] final class TestClient {
+  import TestClient.*
+
+  def run(): Unit = {
+    try {
+      val controlCommandWatcher = new ControlCommandWatcher
+      println(TestAppSpecialLogMsgs.Ready)
+      val fixture = new Fixture
+      runTestCaseLoop(controlCommandWatcher, fixture)
+    } catch {
+      case t: Throwable =>
+        t.printStackTrace()
+        println(TestAppSpecialLogMsgs.ClientPrematureDeath)
+        sys.exit(1)
+    }
+  }
+
+  private def runTestCaseLoop(controlCommandWatcher: ControlCommandWatcher, fixture: Fixture): Unit = {
+    while (true) {
+      controlCommandWatcher.waitForCommand() match {
+        case TestClientControl.RunTestCaseCmdFileName(testCase) =>
+          runTestCase {
+            testCase match {
+              case TestClientTestCase.DiscoverNewPod =>
+                testCaseDiscoverNewPod(fixture)
+              case TestClientTestCase.DnsFailureRecover =>
+                testCaseDnsFailureRecover(fixture)
+            }
+          }
+
+        case unknownCmd: String =>
+          sys.error(s"unknown test client control command: $unknownCmd")
+      }
+    }
+  }
+
+  private def runTestCase(body: => Unit): Unit = {
+    try {
+      body
+      println(TestAppSpecialLogMsgs.ClientTestCaseSuccess)
+    } catch {
+      case t: Throwable =>
+        t.printStackTrace()
+        println(TestAppSpecialLogMsgs.ClientTestCaseFailed)
+    }
+  }
+
+  private def testCaseDiscoverNewPod(fixture: Fixture): Unit = {
+    fixture.coreDns.ensureStarted(serviceIps = Set(fixture.srv1Ip))
+
+    withRoundRobinLbClient { client =>
+      callHost2TimesAssertServerIds(client, expectedServerIds = Set(1))
+
+      fixture.coreDns.setServiceIps(Set(fixture.srv1Ip, fixture.srv2Ip))
+
+      sleepUntilClientGetsDnsUpdate()
+
+      callHost2TimesAssertServerIds(client, expectedServerIds = Set(1, 2))
+    }
+  }
+
+  private def testCaseDnsFailureRecover(fixture: Fixture): Unit = {
+    fixture.coreDns.ensureStarted(serviceIps = Set(fixture.srv1Ip))
+
+    withRoundRobinLbClient { client =>
+      callHost2TimesAssertServerIds(client, expectedServerIds = Set(1))
+
+      fixture.coreDns.ensureStopped()
+
+      sleepUntilClientGetsDnsUpdate()
+
+      fixture.coreDns.ensureStarted(serviceIps = Set(fixture.srv1Ip, fixture.srv2Ip))
+
+      sleepUntilClientGetsDnsUpdate()
+
+      callHost2TimesAssertServerIds(client, expectedServerIds = Set(1, 2))
+    }
+  }
+
+  private def sleepUntilClientGetsDnsUpdate(): Unit = {
+    val sleepIntervalSeconds =
+      coreDnsHostsReloadIntervalSeconds +
+        K8sDnsNameResolverProvider.DEFAULT_REFRESH_INTERVAL_SECONDS +
+        2 // adding 2 seconds on top just in case
+
+    println(s"Sleeping until GRPC client gets DNS update: $sleepIntervalSeconds seconds")
+    Thread.sleep(sleepIntervalSeconds.toLong * 1000)
+  }
+
+  private def callHost2TimesAssertServerIds(
+    client: TestSvcBlockingStub,
+    expectedServerIds: Set[Int],
+  ): Unit = {
+    val actualServerIds = 0.until(2).map { _ =>
+      client.getId(GetIdRequest()).id
+    }.toSet
+    if (actualServerIds != expectedServerIds) {
+      sys.error(s"GRPC client observed server IDs $actualServerIds, expected $expectedServerIds")
+    }
+  }
+
+  private def withRoundRobinLbClient[T](body: TestSvcBlockingStub => T): T = {
+    val channel = NettyChannelBuilder
+      .forTarget(s"k8s-dns://$svcHostname:${ TestAppShared.ServerPort }")
+      .usePlaintext()
+      .defaultLoadBalancingPolicy("round_robin")
+      .build()
+
+    try {
+      body(TestSvcGrpc.blockingStub(channel))
+    } finally {
+      channel.shutdownNow()
+      // setting a termination timeout just in case something gets stuck
+      channel.awaitTermination(10L, TimeUnit.SECONDS)
+      ()
+    }
+  }
+}
+
+private object TestClient {
+  private val svcHostname: String = "svc.example.org"
+  private val resolveConfPath = "/etc/resolv.conf"
+
+  private val coreDnsCoreFilePath = "/etc/coredns/CoreFile"
+  private val coreDnsHostsFilePath = "/etc/coredns/hosts"
+  private val coreDnsHealthEndpointPort = 8080
+  private val coreDnsReadyTimeout = 10.seconds
+  private val coreDnsReadyCheckAttemptDelay = 2.seconds
+  private val coreDnsHostsReloadIntervalSeconds = 2
+
+  private final class Fixture {
+    val srv1Ip: String = InetAddress.getByName(TestAppSvcNames.Server1).getHostAddress
+    val srv2Ip: String = InetAddress.getByName(TestAppSvcNames.Server2).getHostAddress
+    val coreDns: CoreDns = new CoreDns
+    setCoreDnsAsPrimaryDns()
+  }
+
+  private def setCoreDnsAsPrimaryDns(): Unit = {
+    Files.write(
+      Paths.get(resolveConfPath),
+      Vector(
+        "nameserver 127.0.0.1",
+      ).asJava,
+      StandardOpenOption.TRUNCATE_EXISTING,
+    )
+    ()
+  }
+
+  private final class ControlCommandWatcher {
+    Files.createDirectories(TestClientControl.CmdDirPath)
+    private val watchService = FileSystems.getDefault.newWatchService
+    TestClientControl.CmdDirPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE)
+
+    private var commandStash: Vector[String] = Vector.empty
+
+    @tailrec
+    def waitForCommand(): String = {
+      if (commandStash.nonEmpty) {
+        val cmd = commandStash.head
+        commandStash = commandStash.drop(1)
+        cmd
+      } else {
+        val watchKey = watchService.take()
+        val createdFilesRelativePaths =
+          watchKey.pollEvents().asScala.map(e => e.context().asInstanceOf[Path]).toVector
+        createdFilesRelativePaths.foreach(relativePath =>
+          Files.delete(TestClientControl.CmdDirPath.resolve(relativePath)),
+        )
+        watchKey.reset()
+        commandStash = createdFilesRelativePaths.map(_.toString)
+        waitForCommand()
+      }
+    }
+  }
+
+  private final class CoreDns {
+    private val healthCheckHttpClient = HttpClient.newHttpClient()
+
+    private var currentIpSet: Set[String] = Set.empty
+
+    writeCoreFile()
+    writeHostsFile(ips = currentIpSet)
+
+    private var processOpt: Option[CoreDns.StartedProcess] = None
+
+    private def writeCoreFile(): Unit = {
+      Files.writeString(
+        Paths.get(coreDnsCoreFilePath),
+        s"""$svcHostname {
+           |  hosts $coreDnsHostsFilePath {
+           |    ttl $coreDnsHostsReloadIntervalSeconds
+           |    reload ${ coreDnsHostsReloadIntervalSeconds }s
+           |  }
+           |  errors # show errors
+           |  log # enable query logs
+           |  health :$coreDnsHealthEndpointPort # enable healthcheck HTTP endpoint
+           |}
+           |""".stripMargin,
+      )
+      ()
+    }
+
+    private def callIsHealthy(): Either[String, Unit] = {
+      val request = HttpRequest.newBuilder()
+        .uri(new URI(s"http://127.0.0.1:$coreDnsHealthEndpointPort/health"))
+        .timeout(coreDnsReadyCheckAttemptDelay.toJava)
+        .GET()
+        .build()
+      try {
+        val response = healthCheckHttpClient.send(request, BodyHandlers.ofString())
+        if (response.statusCode() == 200) {
+          Right(())
+        } else {
+          Left(s"status code not OK but ${ response.statusCode() }")
+        }
+      } catch {
+        case e: IOException =>
+          Left(ExceptionUtils.getMessage(e))
+      }
+    }
+
+    @tailrec
+    private def waitUntilHealthy(
+      startTime: Instant = Instant.now(),
+    ): Unit = {
+      callIsHealthy() match {
+        case Right(_) =>
+          println("CoreDNS process is ready")
+        case Left(err) =>
+          println(s"CoreDNS process is not ready yet: $err")
+          val timePassed = java.time.Duration.between(startTime, Instant.now())
+          if (timePassed.toScala > coreDnsReadyTimeout) {
+            sys.error(s"CoreDNS process startup timed out after $coreDnsReadyTimeout")
+          } else {
+            Thread.sleep(coreDnsReadyCheckAttemptDelay.toMillis)
+            waitUntilHealthy(startTime)
+          }
+      }
+    }
+
+    def ensureStarted(serviceIps: Set[String]): Unit = {
+      processOpt match {
+        case Some(process) if process.sysProcess.isAlive() && serviceIps == currentIpSet =>
+          waitUntilHealthy()
+        case _ =>
+          // stopping first to make sure we get new DNS records without waiting
+          // for the hosts file reload
+          ensureStopped()
+          setServiceIps(serviceIps)
+          startNewProcess()
+      }
+    }
+
+    def ensureStopped(): Unit = {
+      processOpt match {
+        case Some(process) if process.sysProcess.isAlive() =>
+          println("stopping CoreDNS process")
+          process.sysProcess.destroy()
+        case _ =>
+      }
+      processOpt = None
+    }
+
+    private def startNewProcess(): Unit = {
+      processOpt = None
+      println("starting CoreDNS process")
+      val process = sys.process.Process(s"coredns -conf $coreDnsCoreFilePath").run(
+        ProcessLogger(line => println(s"[CoreDNS] $line")),
+      )
+      // CoreDNS usually needs ~100ms to start, so the first health check attempt is almost
+      // guaranteed to fail.
+      // Let's wait a bit to allow it to start.
+      Thread.sleep(500L)
+      waitUntilHealthy()
+      val pid = findDnsProcessPid().getOrElse(
+        sys.error("unable to get CoreDNS process PID - nothing bound to port 53"),
+      )
+      println(s"CoreDNS process PID: $pid")
+      processOpt = Some(new CoreDns.StartedProcess(
+        pid = pid,
+        sysProcess = process,
+      ))
+    }
+
+    private def findDnsProcessPid(): Option[Int] = {
+      val lsOfOutRaw = sys.process.Process("lsof -t -i:53").!!
+      Some(lsOfOutRaw.strip()).filter(_.nonEmpty).map { lsOfOut =>
+        lsOfOut.toIntOption.getOrElse(sys.error(s"malformed lsof output - $lsOfOut"))
+      }
+    }
+
+    private def writeHostsFile(ips: Set[String]): Unit = {
+      // overwriting hosts file atomically so CoreDNS can't observe broken file content
+
+      val tmpHostsFile = Files.createTempFile("hosts", "txt")
+      Files.write(
+        tmpHostsFile,
+        ips.toVector.sorted.map(ip => s"$ip $svcHostname").asJava,
+        StandardOpenOption.CREATE,
+        StandardOpenOption.TRUNCATE_EXISTING,
+      )
+
+      Files.move(tmpHostsFile, Paths.get(coreDnsHostsFilePath), StandardCopyOption.ATOMIC_MOVE)
+      ()
+    }
+
+    def setServiceIps(ips: Set[String]): Unit = {
+      if (ips != currentIpSet) {
+        writeHostsFile(ips)
+        currentIpSet = ips
+      }
+      // it looks like CoreDNS hosts plugin does not reread files on SIGHUP
+      // force CoreDNS to reread configs just to be sure
+//      processOpt.foreach(p => sys.process.Process(s"kill -HUP ${ p.pid }").!!)
+    }
+  }
+
+  private object CoreDns {
+    private final class StartedProcess(
+      val pid: Int,
+      val sysProcess: process.Process,
+    )
+  }
+}
diff --git a/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestServer.scala b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestServer.scala
new file mode 100644
index 0000000..97daf95
--- /dev/null
+++ b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestServer.scala
@@ -0,0 +1,34 @@
+package com.evolution.jgrpc.tools.k8sdns.it
+
+import com.evolution.jgrpc.tools.k8sdns.it.TestAppShared.TestAppSpecialLogMsgs
+import io.grpc.netty.NettyServerBuilder
+import k8sdns.it.test_svc.TestSvcGrpc
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.ExecutionContext
+
+/**
+ * Server mode implementation for the `K8sDnsNameResolver` integration test service app.
+ *
+ * @param instanceId
+ *   server instance ID which is returned by the test GRPC API
+ * @see
+ *   [[TestApp]]
+ */
+private[it] final class TestServer(instanceId: Int) {
+  private val server = NettyServerBuilder
+    .forPort(TestAppShared.ServerPort)
+    // Connection idle timeout should be larger than potential max test case runtime, because
+    // reconnect forces the client to refresh NameResolver results.
+    // If this happens during K8sDnsNameResolver live DNS reload tests, it will spoil the results.
+    .maxConnectionIdle(10, TimeUnit.MINUTES)
+    .addService(TestSvcGrpc.bindService(new TestSvcImpl(instanceId = instanceId), ExecutionContext.global))
+    .build
+
+  def run(): Unit = {
+    server.start()
+    println(s"instance id: $instanceId")
+    println(TestAppSpecialLogMsgs.Ready)
+    server.awaitTermination()
+  }
+}
diff --git a/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestSvcImpl.scala b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestSvcImpl.scala
new file mode 100644
index 0000000..a8386bc
--- /dev/null
+++ b/k8s-dns-name-resolver-it/src/main/scala/com/evolution/jgrpc/tools/k8sdns/it/TestSvcImpl.scala
@@ -0,0 +1,11 @@
+package com.evolution.jgrpc.tools.k8sdns.it
+
+import k8sdns.it.test_svc.*
+
+import scala.concurrent.Future
+
+private[it] final class TestSvcImpl(instanceId: Int) extends TestSvcGrpc.TestSvc {
+  override def getId(request: GetIdRequest): Future[GetIdReply] = {
+    Future.successful(GetIdReply(id = instanceId))
+  }
+}
diff --git a/k8s-dns-name-resolver-it/src/test/resources/docker/compose-test.yml b/k8s-dns-name-resolver-it/src/test/resources/docker/compose-test.yml
new file mode 100644
index 0000000..07b47c4
--- /dev/null
+++ b/k8s-dns-name-resolver-it/src/test/resources/docker/compose-test.yml
@@ -0,0 +1,23 @@
+services:
+  test-server1:
+    &test-app
+    build: ../../../target/docker/stage
+    environment:
+      TEST_SVC_RUN_MODE: server
+      TEST_SVC_INSTANCE_ID: 1
+    deploy:
+      resources:
+        limits:
+          cpus: 1
+          memory: 200M
+
+  test-server2:
+    <<: *test-app
+    environment:
+      TEST_SVC_RUN_MODE: server
+      TEST_SVC_INSTANCE_ID: 2
+
+  test-client:
+    <<: *test-app
+    environment:
+      TEST_SVC_RUN_MODE: client
diff --git a/k8s-dns-name-resolver-it/src/test/scala/com/evolution/jgrpc/tools/k8sdns/it/K8sDnsNameResolverIt.scala b/k8s-dns-name-resolver-it/src/test/scala/com/evolution/jgrpc/tools/k8sdns/it/K8sDnsNameResolverIt.scala
new file mode 100644
index 0000000..3b88830
--- /dev/null
+++ b/k8s-dns-name-resolver-it/src/test/scala/com/evolution/jgrpc/tools/k8sdns/it/K8sDnsNameResolverIt.scala
@@ -0,0 +1,212 @@
+package com.evolution.jgrpc.tools.k8sdns.it
+
+import com.evolution.jgrpc.tools.k8sdns.it.TestAppShared.*
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.freespec.AnyFreeSpec
+import org.slf4j.LoggerFactory
+import org.testcontainers.containers.ComposeContainer
+import org.testcontainers.containers.output.OutputFrame
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy
+
+import java.io.File
+import java.nio.file.Paths
+import java.util.concurrent.locks.ReentrantLock
+import java.util.function.Consumer
+import java.util.regex.Pattern
+import scala.concurrent.*
+import scala.concurrent.duration.*
+import scala.jdk.FunctionConverters.*
+import scala.util.control.NoStackTrace
+
+/**
+ * Integration tests for `K8sDnsNameResolver`.
+ *
+ * Uses testcontainers with docker compose (`src/test/resources/docker/compose-test.yml`)
+ * to run 2 instances of a GRPC server and one instance of a GRPC client with
+ * `K8sDnsNameResolver` and a controlled DNS server.
+ *
+ * Then it verifies how the client reacts to changes in DNS for the service hostname.
+ *
+ * Test cases are run on the client app container.
+ *
+ * The test code communicates with the client container using files in a special command
+ * folder (see [[TestClientControl]]).
+ *
+ * The test results are reported back using log messages (see [[TestAppSpecialLogMsgs]]).
+ */
+class K8sDnsNameResolverIt extends AnyFreeSpec with BeforeAndAfterAll {
+  import K8sDnsNameResolverIt.*
+
+  // should be enough to start/stop CoreDNS and wait for K8sDnsNameResolver reload several
+  // times over
+  private val testCaseTimeout = 2.minutes
+
+  private val logger = LoggerFactory.getLogger(classOf[K8sDnsNameResolverIt])
+
+  private val testClientLogWatcher = new TestClientLogWatcher
+
+  private val composeContainer: ComposeContainer = new ComposeContainer(
+    classpathResourceToFile("/docker/compose-test.yml"),
+  )
+    .waitingFor(TestAppSvcNames.Server1, testAppReadyWaitStrategy)
+    .waitingFor(TestAppSvcNames.Server2, testAppReadyWaitStrategy)
+    .waitingFor(TestAppSvcNames.Client, testAppReadyWaitStrategy)
+    .withLogConsumer1(TestAppSvcNames.Server1, printingLogConsumer)
+    .withLogConsumer1(TestAppSvcNames.Server2, printingLogConsumer)
+    .withLogConsumer1(TestAppSvcNames.Client, printingLogConsumer)
+    .withLogConsumer(TestAppSvcNames.Client, testClientLogWatcher)
+
+  override def beforeAll(): Unit = {
+    composeContainer.start()
+  }
+
+  override def afterAll(): Unit = {
+    composeContainer.stop()
+  }
+
+  "K8sDnsNameResolver" - {
+    "should discover a new pod after a DNS A-record is added" in {
+      runTestCase(TestClientTestCase.DiscoverNewPod)
+    }
+
+    "should recover after DNS query failure" in {
+      runTestCase(TestClientTestCase.DnsFailureRecover)
+    }
+  }
+
+  private def runTestCase(testCase: TestClientTestCase): Unit = {
+    val resultFuture = testClientLogWatcher.subscribeForTestCaseResult()
+
+    runInContainerExpectSuccess(
+      svcName = TestAppSvcNames.Client,
+      cmd = Vector(
+        "touch",
+        s"${ TestClientControl.CmdDirPath }/${ TestClientControl.RunTestCaseCmdFileName(testCase) }",
+      ),
+    )
+
+    Await.result(resultFuture, testCaseTimeout)
+  }
+
+  private def runInContainerExpectSuccess(svcName: String, cmd: Vector[String]): Unit = {
+    val result = composeContainer.getContainerByServiceName(svcName).orElseThrow()
+      .execInContainer(cmd*)
+    if (result.getExitCode != 0) {
+      val stdOut = Option(result.getStdout).filter(_.nonEmpty).getOrElse("EMPTY")
+      val stdErr = Option(result.getStderr).filter(_.nonEmpty).getOrElse("EMPTY")
+      sys.error(
+        s"[$svcName] container command failed: code ${ result.getExitCode }\n\tstdout: $stdOut\n\tstderr: $stdErr",
+      )
+    }
+  }
+
+  private def testAppReadyWaitStrategy: LogMessageWaitStrategy = {
+    new LogMessageWaitStrategy().withRegEx(
+      s"${ Pattern.quote(TestAppSpecialLogMsgs.Ready) }.+",
+    )
+  }
+
+  // when a test case fails, these logs are the only way to find what went wrong
+  private def printingLogConsumer(serviceName: String): LogConsumer = (outputFrame: OutputFrame) => {
+    outputFrame.getType match {
+      case OutputFrame.OutputType.STDOUT | OutputFrame.OutputType.STDERR =>
+        logger.info(s"[$serviceName] ${ outputFrame.getUtf8StringWithoutLineEnding }")
+      case OutputFrame.OutputType.END =>
+        // testcontainers API doesn't provide a way to distinguish close markers for stdout and
+        // stderr, so this line is printed twice
+        logger.info(s"[$serviceName] STDOUT/STDERR END")
+    }
+  }
+
+  private def classpathResourceToFile(resource: String): File = {
+    Paths.get(getClass.getResource(resource).toURI).toFile
+  }
+}
+
+private object K8sDnsNameResolverIt {
+  type LogConsumer = OutputFrame => Unit
+
+  private implicit class RichComposeContainer(val inner: ComposeContainer) extends AnyVal {
+    def withLogConsumer1(serviceName: String, mkConsumer: String => LogConsumer): ComposeContainer = {
+      inner.withLogConsumer(serviceName, mkConsumer(serviceName).asJava)
+    }
+  }
+
+  /**
+   * Stateful, thread-safe component to subscribe to [[TestClient]] special log messages,
+   * which indicate results of test case executions.
+   *
+   * @see
+   *   [[TestAppSpecialLogMsgs]]
+   */
+  private final class TestClientLogWatcher extends Consumer[OutputFrame] {
+
+    private val stateLock: ReentrantLock = new ReentrantLock()
+
+    private var testClientDiedPrematurely: Boolean = false
+    private var watcherOpt: Option[Promise[Unit]] = None
+
+    override def accept(frame: OutputFrame): Unit = {
+      frame.getType match {
+        case OutputFrame.OutputType.STDOUT | OutputFrame.OutputType.STDERR =>
+          val msgStr = frame.getUtf8String
+
+          if (msgStr.contains(TestAppSpecialLogMsgs.ClientTestCaseSuccess)) {
+            underStateLock {
+              watcherOpt.foreach(_.success(()))
+              watcherOpt = None
+            }
+          } else if (msgStr.contains(TestAppSpecialLogMsgs.ClientTestCaseFailed)) {
+            underStateLock {
+              watcherOpt.foreach(_.failure(mkRemoteTestCaseFailed))
+              watcherOpt = None
+            }
+          } else if (msgStr.contains(TestAppSpecialLogMsgs.ClientPrematureDeath)) {
+            underStateLock {
+              testClientDiedPrematurely = true
+              watcherOpt.foreach(_.failure(mkRemoteTestCaseFailedPrematureDeath))
+              watcherOpt = None
+            }
+          }
+
+        case OutputFrame.OutputType.END =>
+          ()
+      }
+    }
+
+    def subscribeForTestCaseResult(): Future[Unit] = underStateLock {
+      if (testClientDiedPrematurely) {
+        Future.failed(mkRemoteTestCaseFailedPrematureDeath)
+      } else if (watcherOpt.nonEmpty) {
+        Future.failed(mkRemoteTestCaseFailed)
+      } else {
+        val promise = Promise[Unit]()
+        watcherOpt = Some(promise)
+        promise.future
+      }
+    }
+
+    private def underStateLock[T](body: => T): T = {
+      stateLock.lock()
+      try {
+        body
+      } finally {
+        stateLock.unlock()
+      }
+    }
+  }
+
+  private def mkRemoteTestCaseFailedPrematureDeath = RemoteTestCaseException(
+    "test client app container died prematurely",
+  )
+
+  private def mkRemoteTestCaseFailed = RemoteTestCaseException(
+    "test case failed",
+  )
+
+  private case class RemoteTestCaseException(
+    details: String,
+  ) extends RuntimeException(
+      s"$details\ncheck printed ${ TestAppSvcNames.Client } logs for more info",
+    ) with NoStackTrace
+}
diff --git a/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/package-info.java b/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/package-info.java
index 53f5158..2c69f6e 100644
--- a/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/package-info.java
+++ b/k8s-dns-name-resolver/src/main/java/com/evolution/jgrpc/tools/k8sdns/package-info.java
@@ -4,5 +4,4 @@
 @org.jspecify.annotations.NullMarked
 package com.evolution.jgrpc.tools.k8sdns;
 
-// TODO: #2 add integration test for K8sDnsNameResolver
 // TODO: #1 add README
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 89361e0..d5009b4 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -3,6 +3,8 @@ import sbt.*
 object Dependencies {
"dnsjava" % "dnsjava" % "3.6.3" val jspecify = "org.jspecify" % "jspecify" % "1.0.0" + val scalatest = "org.scalatest" %% "scalatest" % "3.2.19" + val commonsLang3 = "org.apache.commons" % "commons-lang3" % "3.20.0" object Grpc { // let's keep it in sync with the version used by the last release of scalapb @@ -17,4 +19,10 @@ object Dependencies { val api = "org.slf4j" % "slf4j-api" % version val simple = "org.slf4j" % "slf4j-simple" % version } + + object Testcontainers { + private val version = "2.0.2" + + val core = "org.testcontainers" % "testcontainers" % version + } } diff --git a/project/plugins.sbt b/project/plugins.sbt index 6149d9e..51eed63 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,6 +1,9 @@ +addSbtPlugin("com.evolution" % "sbt-scalac-opts-plugin" % "0.0.9") addSbtPlugin("com.github.sbt" % "sbt-java-formatter" % "0.10.0") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.5") addSbtPlugin("ch.epfl.scala" % "sbt-version-policy" % "3.2.1") addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.11.2") // to be able to run JUnit 5+ tests: addSbtPlugin("com.github.sbt.junit" % "sbt-jupiter-interface" % "0.17.0") +// for docker-compose-based integration tests: +addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.11.4") diff --git a/project/scalapb.sbt b/project/scalapb.sbt new file mode 100644 index 0000000..3a1496c --- /dev/null +++ b/project/scalapb.sbt @@ -0,0 +1,3 @@ +addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.8") + +libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.20"