diff --git a/stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala b/stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala index 3c48b01..ad3b256 100644 --- a/stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala +++ b/stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala @@ -73,7 +73,7 @@ class RecordBatchingSinker[B <: RecordBatch]( log.info(s"Committing batch $batch to storage") Metrics.commitDuration.recordCallable(() => - retryOnFailureIf(retryPolicy)(!batchCommittedAfterFailure(batch)) { + retryOnFailureIf(retryPolicy)(isRunning.get() && !batchCommittedAfterFailure(batch)) { batchStorage.commitBatch(batch) } ) @@ -86,7 +86,7 @@ class RecordBatchingSinker[B <: RecordBatch]( log.warn("Failed discarding batch") } } catch { - case e if isInterruptionException(e) => + case e if isInterruptionException(e) && !isRunning.get() => log.debug("Batch commit thread interrupted") } }, diff --git a/stream-loader-core/src/main/scala/com/adform/streamloader/util/Retry.scala b/stream-loader-core/src/main/scala/com/adform/streamloader/util/Retry.scala index 5c2fa6c..26260fd 100644 --- a/stream-loader-core/src/main/scala/com/adform/streamloader/util/Retry.scala +++ b/stream-loader-core/src/main/scala/com/adform/streamloader/util/Retry.scala @@ -46,7 +46,6 @@ object Retry extends Logging { def isInterruptionException(e: Throwable): Boolean = e match { case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException | _: InterruptException => true - case e: Throwable if e.getCause != null && isInterruptionException(e.getCause) => true case _ => false } diff --git a/stream-loader-tests/src/test/scala/com/adform/streamloader/VerticaIntegrationTests.scala b/stream-loader-tests/src/test/scala/com/adform/streamloader/VerticaIntegrationTests.scala index fc830c7..c21be98 100644 --- a/stream-loader-tests/src/test/scala/com/adform/streamloader/VerticaIntegrationTests.scala +++ b/stream-loader-tests/src/test/scala/com/adform/streamloader/VerticaIntegrationTests.scala @@ -12,6 +12,7 @@ import com.adform.streamloader.behaviors.BasicLoaderBehaviors import com.adform.streamloader.fixtures._ import com.adform.streamloader.storage._ import com.zaxxer.hikari.{HikariConfig, HikariDataSource} +import org.scalatest.Ignore import org.scalatest.concurrent.Eventually import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers @@ -21,6 +22,9 @@ import org.scalatestplus.scalacheck.Checkers import scala.concurrent.ExecutionContext @Slow +// Temporarily ignore Vertica tests, vertica-ce image is not available on DockerHub +// https://github.com/vertica/vertica-containers/issues/64 +@Ignore class VerticaIntegrationTests extends AnyFunSpec with Matchers