diff --git a/src/main/scala/com/github/timgent/dataflare/FlareError.scala b/src/main/scala/com/github/timgent/dataflare/FlareError.scala index b380150..0834a59 100644 --- a/src/main/scala/com/github/timgent/dataflare/FlareError.scala +++ b/src/main/scala/com/github/timgent/dataflare/FlareError.scala @@ -38,4 +38,23 @@ object FlareError { |err: ${err.map(_.getMessage)} |msg: $msg""".stripMargin } + + sealed trait MetricLookupError extends FlareError + + case object MetricMissing extends MetricLookupError { + override def datasourceDescription: Option[DatasourceDescription] = None + + override def msg: String = "Unexpected failure - Metric lookup failed - no metric found - please raise an Issue on Github" + + override def err: Option[Throwable] = None + } + + case object LookedUpMetricOfWrongType extends MetricLookupError { + override def datasourceDescription: Option[DatasourceDescription] = None + + override def msg: String = + "Unexpected failure - Metric lookup failed - metric found was of wrong type - please raise an Issue on Github" + + override def err: Option[Throwable] = None + } } diff --git a/src/main/scala/com/github/timgent/dataflare/checks/QCCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/QCCheck.scala index 5771099..0770559 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/QCCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/QCCheck.scala @@ -73,5 +73,6 @@ private[dataflare] object QcType extends Enum[QcType] { case object ArbDualDsCheck extends QcType case object ArbitraryCheck extends QcType case object SingleMetricCheck extends QcType + case object SingleMetricAnomalyCheck extends QcType case object DualMetricCheck extends QcType } diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/MetricsBasedCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/MetricsBasedCheck.scala index 84ba24e..03adce1 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/metrics/MetricsBasedCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/MetricsBasedCheck.scala @@ -1,11 +1,29 @@ package com.github.timgent.dataflare.checks.metrics -import com.github.timgent.dataflare.FlareError.MetricCalculationError +import com.github.timgent.dataflare.FlareError +import com.github.timgent.dataflare.FlareError.{LookedUpMetricOfWrongType, MetricCalculationError, MetricLookupError, MetricMissing} import com.github.timgent.dataflare.checks.{CheckResult, CheckStatus, DatasourceDescription, QCCheck, QcType} +import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue} + +import scala.reflect.ClassTag private[dataflare] trait MetricsBasedCheck extends QCCheck { - override def qcType: QcType = QcType.SingleMetricCheck + // typeTag required here to enable match of metric on type MV. Without class tag this type check would be fruitless + protected def getMetric[MV <: MetricValue](metricDescriptor: MetricDescriptor.Aux[MV], metrics: Map[MetricDescriptor, MetricValue])( + implicit classTag: ClassTag[MV] + ): Either[MetricLookupError, MV] = { + val metricOfInterestOpt: Option[MetricValue] = + metrics.get(metricDescriptor).map(metricValue => metricValue) + metricOfInterestOpt match { + case Some(metric) => + metric match { // TODO: Look into heterogenous maps to avoid this type test - https://github.com/milessabin/shapeless/wiki/Feature-overview:-shapeless-1.2.4#heterogenous-maps + case metric: MV => Right(metric) + case _ => Left(LookedUpMetricOfWrongType) + } + case None => Left(MetricMissing) + } + } private[dataflare] def getMetricErrorCheckResult(datasourceDescription: DatasourceDescription, err: MetricCalculationError*) = CheckResult( diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala new file mode 100644 index 0000000..4002b77 --- /dev/null +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala @@ -0,0 +1,109 @@ +package com.github.timgent.dataflare.checks.metrics + +import java.time.Instant + +import com.github.timgent.dataflare.FlareError.{LookedUpMetricOfWrongType, MetricMissing} +import com.github.timgent.dataflare.checks.CheckDescription.SingleMetricCheckDescription +import com.github.timgent.dataflare.checks.QCCheck.SingleDsCheck +import com.github.timgent.dataflare.checks._ +import com.github.timgent.dataflare.metrics.MetricValue.LongMetric +import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue} + +import scala.reflect.ClassTag + +/** + * A check based on a single metric and the history of that metric, in order to detect anomalies + * + * @param metric - describes the metric the check will be done on + * @param checkDescription - the user friendly description for this check + * @param check - the check to be done + * @tparam MV - the type of the MetricValue that will be calculated in order to complete this check + */ +case class SingleMetricAnomalyCheck[MV <: MetricValue](metric: MetricDescriptor { type MetricType = MV }, checkDescription: String)( + check: (MV#T, Map[Instant, MV#T]) => RawCheckResult +) extends MetricsBasedCheck + with SingleDsCheck { + + override def qcType: QcType = QcType.SingleMetricAnomalyCheck + + override def description: CheckDescription = + SingleMetricCheckDescription(checkDescription, metric.toSimpleMetricDescriptor) // TODO: Should have more info in the description + + def applyCheck(metric: MV, historicMetrics: Map[Instant, MV#T]): CheckResult = { + check(metric.value, historicMetrics).withDescription(QcType.SingleMetricAnomalyCheck, description) + } + + // typeTag required here to enable match of metric on type MV. Without class tag this type check would be fruitless + private[dataflare] final def applyCheckOnMetrics( + metrics: Map[MetricDescriptor, MetricValue], + historicMetrics: Map[Instant, MetricValue] + )(implicit classTag: ClassTag[MV]): CheckResult = { + val maybeMetricOfInterest = getMetric(metric, metrics) + val relevantHistoricMetrics: Map[Instant, MV#T] = historicMetrics.collect { + case (instant, metricValue: MV) => (instant, metricValue.value) + } + maybeMetricOfInterest match { + case Left(MetricMissing) => metricNotPresentErrorResult + case Left(LookedUpMetricOfWrongType) => metricTypeErrorResult + case Right(metric) => applyCheck(metric, relevantHistoricMetrics) + } + } + +} + +object SingleMetricAnomalyCheck { + def absoluteChangeAnomalyCheck( + maxReduction: Long, + maxIncrease: Long, + metricDescriptor: MetricDescriptor.Aux[LongMetric] // SizeMetric + ): SingleMetricAnomalyCheck[LongMetric] = + SingleMetricAnomalyCheck[LongMetric](metricDescriptor, "AbsoluteChangeAnomalyCheck") { (currentMetricValue, historicMetricValues) => + val (_, lastMetricValue) = historicMetricValues.maxBy(_._1) + val isWithinAcceptableRange = + (lastMetricValue + maxIncrease) <= currentMetricValue && (lastMetricValue - maxReduction) >= currentMetricValue + if (isWithinAcceptableRange) + RawCheckResult( + CheckStatus.Success, + s"MetricValue of $currentMetricValue was not anomalous compared to previous result of $lastMetricValue" + ) + else + RawCheckResult( + CheckStatus.Error, + s"MetricValue of $currentMetricValue was anomalous compared to previous result of $lastMetricValue" + ) + } + + def stdChangeAnomalyCheck( + lowerDeviationFactor: Int = 2, + higherDeviationFactor: Int = 2, + metricDescriptor: MetricDescriptor.Aux[LongMetric] + ): SingleMetricAnomalyCheck[LongMetric] = { + + SingleMetricAnomalyCheck[LongMetric](metricDescriptor, "STDChangeAnomalyCheck") { + (currentMetricValue: Long, historicMetricValues: Map[Instant, Long]) => + import com.github.timgent.dataflare.utils.stats.{mean, stdDev} + + val historicValues = historicMetricValues.values + + val historicMean: Double = mean(historicValues) + val historicSTD: Double = stdDev(historicValues) + + val isValidRange = + currentMetricValue >= historicMean - (lowerDeviationFactor * historicSTD) && + currentMetricValue <= (higherDeviationFactor * historicSTD) + historicMean + + if (isValidRange) + RawCheckResult( + CheckStatus.Success, + s"MetricValue of $currentMetricValue was not anomalous compared to previous results. Mean: $historicMean; STD: $historicSTD" + ) + else + RawCheckResult( + CheckStatus.Error, + s"MetricValue of $currentMetricValue was anomalous compared to previous results. Mean: $historicMean; STD: $historicSTD" + ) + + } + } + +} diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricCheck.scala index f55b365..9241db2 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricCheck.scala @@ -1,5 +1,7 @@ package com.github.timgent.dataflare.checks.metrics +import com.github.timgent.dataflare.FlareError +import com.github.timgent.dataflare.FlareError.{LookedUpMetricOfWrongType, MetricMissing} import com.github.timgent.dataflare.checks.CheckDescription.SingleMetricCheckDescription import com.github.timgent.dataflare.checks.QCCheck.SingleDsCheck import com.github.timgent.dataflare.checks.{CheckDescription, CheckResult, CheckStatus, QcType, RawCheckResult} @@ -22,6 +24,8 @@ case class SingleMetricCheck[MV <: MetricValue](metric: MetricDescriptor { type ) extends MetricsBasedCheck with SingleDsCheck { + override def qcType: QcType = QcType.SingleMetricCheck + override def description: CheckDescription = SingleMetricCheckDescription(checkDescription, metric.toSimpleMetricDescriptor) def applyCheck(metric: MV): CheckResult = { @@ -32,15 +36,11 @@ case class SingleMetricCheck[MV <: MetricValue](metric: MetricDescriptor { type private[dataflare] final def applyCheckOnMetrics( metrics: Map[MetricDescriptor, MetricValue] )(implicit classTag: ClassTag[MV]): CheckResult = { - val metricOfInterestOpt: Option[MetricValue] = - metrics.get(metric).map(metricValue => metricValue) - metricOfInterestOpt match { - case Some(metric) => - metric match { // TODO: Look into heterogenous maps to avoid this type test - https://github.com/milessabin/shapeless/wiki/Feature-overview:-shapeless-1.2.4#heterogenous-maps - case metric: MV => applyCheck(metric) - case _ => metricTypeErrorResult - } - case None => metricNotPresentErrorResult + val maybeMetric: Either[FlareError, MV] = getMetric(metric, metrics) + maybeMetric match { + case Left(MetricMissing) => metricNotPresentErrorResult + case Left(LookedUpMetricOfWrongType) => metricTypeErrorResult + case Right(metric) => applyCheck(metric) } } } diff --git a/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala b/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala index 2ca1ae3..d06afad 100644 --- a/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala +++ b/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala @@ -8,8 +8,8 @@ import com.github.timgent.dataflare.checks.ArbDualDsCheck.DatasetPair import com.github.timgent.dataflare.checks.DatasourceDescription.{DualDsDescription, SingleDsDescription} import com.github.timgent.dataflare.checks.QCCheck.{DualDsQCCheck, SingleDsCheck} import com.github.timgent.dataflare.checks._ -import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetricCheck} -import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue, MetricsCalculator} +import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetricAnomalyCheck, SingleMetricCheck} +import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue, MetricsCalculator, SimpleMetricDescriptor} import com.github.timgent.dataflare.repository.{MetricsPersister, NullMetricsPersister, NullQcResultsRepository, QcResultsRepository} import org.apache.spark.sql.Dataset @@ -72,6 +72,12 @@ case class ChecksSuite( (dds, relevantChecks) } + private val singleMetricAnomalyChecks: Map[DescribedDs, Seq[SingleMetricAnomalyCheck[_]]] = singleDsChecks.map { + case (dds, checks) => + val relevantChecks = checks.collect { case check: SingleMetricAnomalyCheck[_] => check } + (dds, relevantChecks) + } + private val arbDualDsChecks: Map[DescribedDsPair, Seq[ArbDualDsCheck]] = dualDsChecks.map { case (ddsPair, checks) => val relevantChecks = checks.collect { case check: ArbDualDsCheck => check } @@ -138,6 +144,7 @@ case class ChecksSuite( */ private def getMinimumRequiredMetrics( seqSingleDatasetMetricsChecks: Map[DescribedDs, Seq[SingleMetricCheck[_]]], + seqSingleDatasetMetricAnomalyChecks: Map[DescribedDs, Seq[SingleMetricAnomalyCheck[_]]], seqDualDatasetMetricChecks: Map[DescribedDsPair, Seq[DualMetricCheck[_]]], trackMetrics: Map[DescribedDs, Seq[MetricDescriptor]] ): Map[DescribedDs, List[MetricDescriptor]] = { @@ -146,6 +153,11 @@ case class ChecksSuite( metricDescriptors = checks.map(_.metric).toList } yield (dds, metricDescriptors)).groupBy(_._1).mapValues(_.flatMap(_._2).toList) + val singleDatasetAnomalyMetricDescriptors: Map[DescribedDs, List[MetricDescriptor]] = (for { + (dds, checks) <- seqSingleDatasetMetricAnomalyChecks + metricDescriptors = checks.map(_.metric).toList + } yield (dds, metricDescriptors)).groupBy(_._1).mapValues(_.flatMap(_._2).toList) + val dualDatasetAMetricDescriptors: Map[DescribedDs, List[MetricDescriptor]] = (for { (ddsPair, checks) <- seqDualDatasetMetricChecks describedDatasetA: DescribedDs = ddsPair.ds @@ -159,7 +171,7 @@ case class ChecksSuite( } yield (describedDatasetB, metricDescriptors)).groupBy(_._1).mapValues(_.flatMap(_._2).toList) val allMetricDescriptors: Map[DescribedDs, List[MetricDescriptor]] = - (singleDatasetMetricDescriptors |+| dualDatasetAMetricDescriptors |+| dualDatasetBMetricDescriptors + (singleDatasetMetricDescriptors |+| singleDatasetAnomalyMetricDescriptors |+| dualDatasetAMetricDescriptors |+| dualDatasetBMetricDescriptors |+| trackMetrics.mapValues(_.toList)) .mapValues(_.distinct) @@ -170,7 +182,12 @@ case class ChecksSuite( timestamp: Instant )(implicit ec: ExecutionContext): Future[Seq[CheckResult]] = { val allMetricDescriptors: Map[DescribedDs, List[MetricDescriptor]] = - getMinimumRequiredMetrics(singleMetricChecks, dualMetricChecks, metricsToTrack) + getMinimumRequiredMetrics( + singleMetricChecks, + singleMetricAnomalyChecks, + dualMetricChecks, + metricsToTrack + ) val calculatedMetrics: Map[DescribedDs, Either[MetricCalculationError, Map[MetricDescriptor, MetricValue]]] = allMetricDescriptors.map { case (describedDataset, metricDescriptors) => @@ -178,6 +195,8 @@ case class ChecksSuite( MetricsCalculator.calculateMetrics(describedDataset, metricDescriptors) (describedDataset, metricValues) } + val allPreviousMetricsFut: Future[Map[Instant, Map[SingleDsDescription, Map[SimpleMetricDescriptor, MetricValue]]]] = + metricsPersister.loadAll val metricsToSave = calculatedMetrics.collect { case (describedDataset, Right(metrics)) => @@ -192,6 +211,7 @@ case class ChecksSuite( for { _ <- storedMetricsFut + allPreviousMetrics: Map[Instant, Map[SingleDsDescription, Map[SimpleMetricDescriptor, MetricValue]]] <- allPreviousMetricsFut } yield { val singleDatasetCheckResults: Seq[CheckResult] = singleMetricChecks.toSeq.flatMap { case (dds, checks) => @@ -207,6 +227,28 @@ case class ChecksSuite( checkResults } + val singleDatasetAnomalyCheckResults: Seq[CheckResult] = singleMetricAnomalyChecks.toSeq.flatMap { + case (dds, checks) => + val datasetDescription = SingleDsDescription(dds.description) + val maybeMetricsForDs: Either[MetricCalculationError, Map[MetricDescriptor, MetricValue]] = calculatedMetrics(dds) + val checkResults: Seq[CheckResult] = checks.map { check => + maybeMetricsForDs match { + case Left(err) => check.getMetricErrorCheckResult(dds.datasourceDescription, err) + case Right(metricsForDs: Map[MetricDescriptor, MetricValue]) => + val historicMetricsForOurMetric = allPreviousMetrics + .mapValues { ddsToMetricsMap => + for { + metricsMap <- ddsToMetricsMap.get(datasetDescription) + metricValue <- metricsMap.get(check.metric.toSimpleMetricDescriptor) + } yield metricValue + } + .collect { case (instant, Some(metricValue)) => (instant, metricValue) } + check.applyCheckOnMetrics(metricsForDs, historicMetricsForOurMetric).withDatasourceDescription(datasetDescription) + } + } + checkResults + } + val dualDatasetCheckResults: Seq[CheckResult] = dualMetricChecks.toSeq.flatMap { case (ddsPair, checks) => val dds = ddsPair.ds @@ -229,7 +271,7 @@ case class ChecksSuite( checkResults } - singleDatasetCheckResults ++ dualDatasetCheckResults + singleDatasetCheckResults ++ dualDatasetCheckResults ++ singleDatasetAnomalyCheckResults } } diff --git a/src/main/scala/com/github/timgent/dataflare/metrics/MetricDescriptor.scala b/src/main/scala/com/github/timgent/dataflare/metrics/MetricDescriptor.scala index 1d34900..893aa5f 100644 --- a/src/main/scala/com/github/timgent/dataflare/metrics/MetricDescriptor.scala +++ b/src/main/scala/com/github/timgent/dataflare/metrics/MetricDescriptor.scala @@ -38,6 +38,7 @@ private[dataflare] trait MetricDescriptor { } object MetricDescriptor { + type Aux[A] = MetricDescriptor { type MetricType = A } /** * A MetricDescriptor which can have the dataset filtered before the metric is calculated diff --git a/src/main/scala/com/github/timgent/dataflare/utils/stats.scala b/src/main/scala/com/github/timgent/dataflare/utils/stats.scala new file mode 100644 index 0000000..17a36ea --- /dev/null +++ b/src/main/scala/com/github/timgent/dataflare/utils/stats.scala @@ -0,0 +1,16 @@ +package com.github.timgent.dataflare.utils + +object stats { + + import Numeric.Implicits._ + + def mean[T: Numeric](xs: Iterable[T]): Double = xs.sum[T].toDouble() / xs.size + + def variance[T: Numeric](xs: Iterable[T]): Double = { + val avg = mean(xs) + xs.map(_.toDouble).map(a => math.pow(a - avg, 2)).sum / xs.size + } + + def stdDev[T: Numeric](xs: Iterable[T]): Double = math.sqrt(variance(xs)) + +} diff --git a/src/test/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheckTest.scala b/src/test/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheckTest.scala new file mode 100644 index 0000000..7f79667 --- /dev/null +++ b/src/test/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheckTest.scala @@ -0,0 +1,61 @@ +package com.github.timgent.dataflare.checks.metrics + +import java.time.Instant + +import com.github.timgent.dataflare.checks.CheckDescription.SingleMetricCheckDescription +import com.github.timgent.dataflare.checks.{CheckResult, CheckStatus, QcType} +import com.github.timgent.dataflare.metrics.MetricDescriptor.SizeMetric +import com.github.timgent.dataflare.metrics.MetricValue.LongMetric +import com.github.timgent.dataflare.metrics.SimpleMetricDescriptor +import com.github.timgent.dataflare.utils.CommonFixtures.now +import com.github.timgent.dataflare.utils.DateTimeUtils.InstantExtension +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class SingleMetricAnomalyCheckTest extends AnyWordSpec with Matchers { + "stdChangeAnomalyCheck" should { + "anomaly STD check" in { + val check: SingleMetricAnomalyCheck[LongMetric] = + SingleMetricAnomalyCheck.stdChangeAnomalyCheck(2, 2, SizeMetric()) + val metric: LongMetric = LongMetric(60) + val historicMetrics: Map[Instant, Long] = + Map( + now -> 50, + now.plusDays(1) -> 60, + now.plusDays(2) -> 70 + ) + + val result: CheckResult = check.applyCheck(metric, historicMetrics) + + result shouldBe CheckResult( + QcType.SingleMetricAnomalyCheck, + CheckStatus.Success, + "MetricValue of 60 was not anomalous compared to previous results. Mean: 60.0; STD: 8.16496580927726", + SingleMetricCheckDescription("STDChangeAnomalyCheck", SimpleMetricDescriptor("Size", Some("no filter"))), + None + ) + } + + "outlier should be treated as an anomaly" in { + val check: SingleMetricAnomalyCheck[LongMetric] = + SingleMetricAnomalyCheck.stdChangeAnomalyCheck(2, 2, SizeMetric()) + val metric: LongMetric = LongMetric(35) + val historicMetrics: Map[Instant, Long] = + Map( + now -> 50, + now.plusDays(1) -> 60, + now.plusDays(2) -> 70 + ) + + val result: CheckResult = check.applyCheck(metric, historicMetrics) + + result shouldBe CheckResult( + QcType.SingleMetricAnomalyCheck, + CheckStatus.Error, + "MetricValue of 35 was anomalous compared to previous results. Mean: 60.0; STD: 8.16496580927726", + SingleMetricCheckDescription("STDChangeAnomalyCheck", SimpleMetricDescriptor("Size", Some("no filter"))), + None + ) + } + } +} diff --git a/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala b/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala index 665ba90..b4e769a 100644 --- a/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala +++ b/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala @@ -1,18 +1,20 @@ package com.github.timgent.dataflare.checkssuite -import java.time.Instant +import java.time.{Instant, LocalDateTime, ZoneOffset} +import com.github.timgent.dataflare.utils.DateTimeUtils.InstantExtension import cats.implicits._ import com.github.timgent.dataflare.FlareError +import scala.concurrent.Future import com.github.timgent.dataflare.FlareError.MetricCalculationError import com.github.timgent.dataflare.checks.ArbDualDsCheck.DatasetPair import com.github.timgent.dataflare.checks.CheckDescription.{DualMetricCheckDescription, SingleMetricCheckDescription} import com.github.timgent.dataflare.checks.DatasourceDescription.{DualDsDescription, SingleDsDescription} -import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetricCheck} +import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetricAnomalyCheck, SingleMetricCheck} import com.github.timgent.dataflare.checks._ import com.github.timgent.dataflare.metrics.MetricDescriptor.{SizeMetric, SumValuesMetric} import com.github.timgent.dataflare.metrics.MetricValue.LongMetric -import com.github.timgent.dataflare.metrics.{MetricComparator, MetricDescriptor, MetricFilter, SimpleMetricDescriptor} +import com.github.timgent.dataflare.metrics.{MetricComparator, MetricDescriptor, MetricFilter, MetricValue, SimpleMetricDescriptor} import com.github.timgent.dataflare.repository.{InMemoryMetricsPersister, InMemoryQcResultsRepository} import com.github.timgent.dataflare.thresholds.AbsoluteThreshold import com.github.timgent.dataflare.utils.CommonFixtures._ @@ -570,5 +572,51 @@ class ChecksSuiteTest extends AsyncWordSpec with DatasetSuiteBase with Matchers } } } + + "contains single metric anomaly checks" should { + "calculate required metrics and perform the anomaly checks" in { + val ds = Seq( + NumberString(1, "a"), + NumberString(2, "b") + ).toDS + + val checks: Map[DescribedDs, Seq[SingleMetricAnomalyCheck[LongMetric]]] = Map( + DescribedDs(ds, datasourceDescription) -> + Seq(SingleMetricAnomalyCheck.absoluteChangeAnomalyCheck(0, 0, SizeMetric())) + ) + val checkSuiteDescription = "my first metricsCheckSuite" + val mockMetricsPersister = new InMemoryMetricsPersister() + val metricsBasedChecksSuite = + ChecksSuite(checkSuiteDescription, singleDsChecks = checks, tags = someTags, metricsPersister = mockMetricsPersister) + + for { + _ <- mockMetricsPersister.save( + LocalDateTime.now.minusDays(1).toInstant(ZoneOffset.UTC), + Map( + SingleDsDescription(datasourceDescription) -> Map( + SizeMetric().toSimpleMetricDescriptor -> LongMetric(10) + ) + ) + ) + checkResults: ChecksSuiteResult <- metricsBasedChecksSuite.run(now) + } yield { + checkResults shouldBe ChecksSuiteResult( + CheckSuiteStatus.Error, + checkSuiteDescription, + Seq( + CheckResult( + QcType.SingleMetricAnomalyCheck, + CheckStatus.Error, + "MetricValue of 2 was anomalous compared to previous result of 10", + SingleMetricCheckDescription("AbsoluteChangeAnomalyCheck", SimpleMetricDescriptor("Size", Some("no filter"))), + Some(SingleDsDescription(datasourceDescription)) + ) + ), + now, + someTags + ) + } + } + } } }