From d90f9923d639a6b7958df61ba9d581258e28a3d8 Mon Sep 17 00:00:00 2001 From: accountas Date: Tue, 2 Sep 2025 11:34:01 +0300 Subject: [PATCH 1/2] Fix dataloss, do not ignore interrupt exceptions while commit thread is running --- .../adform/streamloader/sink/batch/RecordBatchingSinker.scala | 4 ++-- .../src/main/scala/com/adform/streamloader/util/Retry.scala | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala b/stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala index 3c48b017..ad3b2561 100644 --- a/stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala +++ b/stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala @@ -73,7 +73,7 @@ class RecordBatchingSinker[B <: RecordBatch]( log.info(s"Committing batch $batch to storage") Metrics.commitDuration.recordCallable(() => - retryOnFailureIf(retryPolicy)(!batchCommittedAfterFailure(batch)) { + retryOnFailureIf(retryPolicy)(isRunning.get() && !batchCommittedAfterFailure(batch)) { batchStorage.commitBatch(batch) } ) @@ -86,7 +86,7 @@ class RecordBatchingSinker[B <: RecordBatch]( log.warn("Failed discarding batch") } } catch { - case e if isInterruptionException(e) => + case e if isInterruptionException(e) && !isRunning.get() => log.debug("Batch commit thread interrupted") } }, diff --git a/stream-loader-core/src/main/scala/com/adform/streamloader/util/Retry.scala b/stream-loader-core/src/main/scala/com/adform/streamloader/util/Retry.scala index 5c2fa6c1..26260fda 100644 --- a/stream-loader-core/src/main/scala/com/adform/streamloader/util/Retry.scala +++ b/stream-loader-core/src/main/scala/com/adform/streamloader/util/Retry.scala @@ -46,7 +46,6 @@ object Retry extends Logging { def isInterruptionException(e: Throwable): Boolean = e match { case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException | _: InterruptException => true - case e: Throwable if e.getCause != null && isInterruptionException(e.getCause) => true case _ => false } From 912c7b3a9ef4b50a3354cfcd950784481a8bb1eb Mon Sep 17 00:00:00 2001 From: accountas Date: Wed, 3 Sep 2025 14:00:34 +0300 Subject: [PATCH 2/2] Disable vertica integration tests due to missing image --- .../com/adform/streamloader/VerticaIntegrationTests.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/VerticaIntegrationTests.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/VerticaIntegrationTests.scala index fc830c7b..c21be981 100644 --- a/stream-loader-tests/src/test/scala/com/adform/streamloader/VerticaIntegrationTests.scala +++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/VerticaIntegrationTests.scala @@ -12,6 +12,7 @@ import com.adform.streamloader.behaviors.BasicLoaderBehaviors import com.adform.streamloader.fixtures._ import com.adform.streamloader.storage._ import com.zaxxer.hikari.{HikariConfig, HikariDataSource} +import org.scalatest.Ignore import org.scalatest.concurrent.Eventually import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers @@ -21,6 +22,9 @@ import org.scalatestplus.scalacheck.Checkers import scala.concurrent.ExecutionContext @Slow +// Temporarily ignore Vertica tests, vertica-ce image is not available on DockerHub +// https://github.com/vertica/vertica-containers/issues/64 +@Ignore class VerticaIntegrationTests extends AnyFunSpec with Matchers