From 1796371a16315d99bdd5cb2b7a9cb99e1ee06137 Mon Sep 17 00:00:00 2001 From: Tim Gent Date: Fri, 30 Oct 2020 11:54:47 +0000 Subject: [PATCH 1/7] Failing test for absoluteChangeAnomalyCheck --- .../timgent/dataflare/checks/QCCheck.scala | 1 + .../checks/metrics/MetricsBasedCheck.scala | 2 - .../metrics/SingleMetricAnomalyCheck.scala | 42 +++++++++++++++ .../checks/metrics/SingleMetricCheck.scala | 2 + .../checkssuite/ChecksSuiteTest.scala | 51 ++++++++++++++++++- 5 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala diff --git a/src/main/scala/com/github/timgent/dataflare/checks/QCCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/QCCheck.scala index 5771099..0770559 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/QCCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/QCCheck.scala @@ -73,5 +73,6 @@ private[dataflare] object QcType extends Enum[QcType] { case object ArbDualDsCheck extends QcType case object ArbitraryCheck extends QcType case object SingleMetricCheck extends QcType + case object SingleMetricAnomalyCheck extends QcType case object DualMetricCheck extends QcType } diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/MetricsBasedCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/MetricsBasedCheck.scala index 84ba24e..397893c 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/metrics/MetricsBasedCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/MetricsBasedCheck.scala @@ -5,8 +5,6 @@ import com.github.timgent.dataflare.checks.{CheckResult, CheckStatus, Datasource private[dataflare] trait MetricsBasedCheck extends QCCheck { - override def qcType: QcType = QcType.SingleMetricCheck - private[dataflare] def getMetricErrorCheckResult(datasourceDescription: DatasourceDescription, err: MetricCalculationError*) = CheckResult( qcType, diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala new file mode 100644 index 0000000..c4bba1d --- /dev/null +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala @@ -0,0 +1,42 @@ +package com.github.timgent.dataflare.checks.metrics + +import java.time.Instant + +import com.github.timgent.dataflare.checks.CheckDescription.SingleMetricCheckDescription +import com.github.timgent.dataflare.checks.QCCheck.SingleDsCheck +import com.github.timgent.dataflare.checks.{CheckDescription, CheckResult, QcType, RawCheckResult} +import com.github.timgent.dataflare.metrics.MetricValue.LongMetric +import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue} +import com.github.timgent.dataflare.repository.MetricsPersister + +import scala.reflect.ClassTag + +/** + * A check based on a single metric and the history of that metric, in order to detect anomalies + * + * @param metric - describes the metric the check will be done on + * @param checkDescription - the user friendly description for this check + * @param check - the check to be done + * @tparam MV - the type of the MetricValue that will be calculated in order to complete this check + */ +case class SingleMetricAnomalyCheck[MV <: MetricValue](metric: MetricDescriptor { type MetricType = MV }, checkDescription: String)( + check: (MV#T, Map[Instant, MV#T]) => RawCheckResult +) extends 
MetricsBasedCheck + with SingleDsCheck { + + override def qcType: QcType = QcType.SingleMetricAnomalyCheck + + override def description: CheckDescription = SingleMetricCheckDescription(checkDescription, metric.toSimpleMetricDescriptor) + + def applyCheck(metric: MV, historicMetrics: Map[Instant, MV#T]): CheckResult = { + check(metric.value, historicMetrics).withDescription(QcType.SingleMetricCheck, description) + } +} + +object SingleMetricAnomalyCheck { + def absoluteChangeAnomalyCheck( + maxReduction: Long, + maxIncrease: Long, + metricDescriptor: MetricDescriptor + ): SingleMetricAnomalyCheck[LongMetric] = ??? +} diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricCheck.scala index f55b365..6c90faf 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricCheck.scala @@ -22,6 +22,8 @@ case class SingleMetricCheck[MV <: MetricValue](metric: MetricDescriptor { type ) extends MetricsBasedCheck with SingleDsCheck { + override def qcType: QcType = QcType.SingleMetricCheck + override def description: CheckDescription = SingleMetricCheckDescription(checkDescription, metric.toSimpleMetricDescriptor) def applyCheck(metric: MV): CheckResult = { diff --git a/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala b/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala index 665ba90..e0ae987 100644 --- a/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala +++ b/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala @@ -1,6 +1,6 @@ package com.github.timgent.dataflare.checkssuite -import java.time.Instant +import java.time.{Instant, LocalDateTime, ZoneOffset} import cats.implicits._ import com.github.timgent.dataflare.FlareError @@ -8,7 +8,7 @@ import com.github.timgent.dataflare.FlareError.MetricCalculationError import com.github.timgent.dataflare.checks.ArbDualDsCheck.DatasetPair import com.github.timgent.dataflare.checks.CheckDescription.{DualMetricCheckDescription, SingleMetricCheckDescription} import com.github.timgent.dataflare.checks.DatasourceDescription.{DualDsDescription, SingleDsDescription} -import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetricCheck} +import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetricAnomalyCheck, SingleMetricCheck} import com.github.timgent.dataflare.checks._ import com.github.timgent.dataflare.metrics.MetricDescriptor.{SizeMetric, SumValuesMetric} import com.github.timgent.dataflare.metrics.MetricValue.LongMetric @@ -570,5 +570,52 @@ class ChecksSuiteTest extends AsyncWordSpec with DatasetSuiteBase with Matchers } } } + + "contains single metric anomaly checks" should { + "calculate metrics based checks on single datasets" in { + val ds = Seq( + NumberString(1, "a"), + NumberString(2, "b") + ).toDS + + val checks: Map[DescribedDs, Seq[SingleMetricAnomalyCheck[LongMetric]]] = Map( + DescribedDs(ds, datasourceDescription) -> + Seq(SingleMetricAnomalyCheck.absoluteChangeAnomalyCheck(0, 0, SizeMetric())) + ) + val checkSuiteDescription = "my first metricsCheckSuite" + val mockMetricsPersister = new InMemoryMetricsPersister() + val metricsBasedChecksSuite = + ChecksSuite(checkSuiteDescription, singleDsChecks = checks, tags = someTags, metricsPersister = mockMetricsPersister) + + for { + _ <- 
mockMetricsPersister.save( + LocalDateTime.now.minusDays(1).toInstant(ZoneOffset.UTC), + Map( + SingleDsDescription(datasourceDescription) -> Map( + SizeMetric().toSimpleMetricDescriptor -> LongMetric(10) + ) + ) + ) + checkResults: ChecksSuiteResult <- metricsBasedChecksSuite.run(now) + } yield { + checkResults shouldBe ChecksSuiteResult( + CheckSuiteStatus.Error, + checkSuiteDescription, + Seq( + CheckResult( + QcType.SingleMetricAnomalyCheck, + CheckStatus.Error, + "MetricValue of 2 was anomalous compared to previous result of 10", + SingleMetricCheckDescription("SizeCheck", SimpleMetricDescriptor("Size", Some("no filter"))), + Some(SingleDsDescription(datasourceDescription)) + ) + ), + now, + someTags + ) + } + } + + } } } From 6a36ec6194771dbed06fecc4a007ad8b14157331 Mon Sep 17 00:00:00 2001 From: Tim Gent Date: Fri, 30 Oct 2020 12:06:54 +0000 Subject: [PATCH 2/7] Implement absoluteChangeAnomalyCheck --- .../metrics/SingleMetricAnomalyCheck.scala | 21 ++++++++++++++++--- .../dataflare/metrics/MetricDescriptor.scala | 1 + 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala index c4bba1d..0914683 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala @@ -4,7 +4,7 @@ import java.time.Instant import com.github.timgent.dataflare.checks.CheckDescription.SingleMetricCheckDescription import com.github.timgent.dataflare.checks.QCCheck.SingleDsCheck -import com.github.timgent.dataflare.checks.{CheckDescription, CheckResult, QcType, RawCheckResult} +import com.github.timgent.dataflare.checks.{CheckDescription, CheckResult, CheckStatus, QcType, RawCheckResult} import com.github.timgent.dataflare.metrics.MetricValue.LongMetric import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue} import com.github.timgent.dataflare.repository.MetricsPersister @@ -37,6 +37,21 @@ object SingleMetricAnomalyCheck { def absoluteChangeAnomalyCheck( maxReduction: Long, maxIncrease: Long, - metricDescriptor: MetricDescriptor - ): SingleMetricAnomalyCheck[LongMetric] = ??? 
+ metricDescriptor: MetricDescriptor.Aux[LongMetric] + ): SingleMetricAnomalyCheck[LongMetric] = + SingleMetricAnomalyCheck[LongMetric](metricDescriptor, "some check description") { (currentMetricValue, historicMetricValues) => + val (lastMetricTimestamp, lastMetricValue) = historicMetricValues.maxBy(_._1) + val isWithinAcceptableRange = + (lastMetricValue + maxIncrease) <= currentMetricValue && (lastMetricValue - maxReduction) >= currentMetricValue + if (isWithinAcceptableRange) + RawCheckResult( + CheckStatus.Success, + s"MetricValue of $currentMetricValue was anomalous compared to previous result of $lastMetricValue" + ) + else + RawCheckResult( + CheckStatus.Error, + s"MetricValue of $currentMetricValue was not anomalous compared to previous result of $lastMetricValue" + ) + } } diff --git a/src/main/scala/com/github/timgent/dataflare/metrics/MetricDescriptor.scala b/src/main/scala/com/github/timgent/dataflare/metrics/MetricDescriptor.scala index 1d34900..893aa5f 100644 --- a/src/main/scala/com/github/timgent/dataflare/metrics/MetricDescriptor.scala +++ b/src/main/scala/com/github/timgent/dataflare/metrics/MetricDescriptor.scala @@ -38,6 +38,7 @@ private[dataflare] trait MetricDescriptor { } object MetricDescriptor { + type Aux[A] = MetricDescriptor { type MetricType = A } /** * A MetricDescriptor which can have the dataset filtered before the metric is calculated From ffdaff3bb6fb4fae54499865c36c79deaddde66c Mon Sep 17 00:00:00 2001 From: Tim Gent Date: Fri, 30 Oct 2020 13:00:16 +0000 Subject: [PATCH 3/7] Wire in Anomaly checks (tests passing) --- .../metrics/SingleMetricAnomalyCheck.scala | 40 ++++++++++++---- .../dataflare/checkssuite/ChecksSuite.scala | 46 +++++++++++++++++-- .../checkssuite/ChecksSuiteTest.scala | 2 +- 3 files changed, 74 insertions(+), 14 deletions(-) diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala index 0914683..14d9cc5 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala @@ -4,10 +4,9 @@ import java.time.Instant import com.github.timgent.dataflare.checks.CheckDescription.SingleMetricCheckDescription import com.github.timgent.dataflare.checks.QCCheck.SingleDsCheck -import com.github.timgent.dataflare.checks.{CheckDescription, CheckResult, CheckStatus, QcType, RawCheckResult} +import com.github.timgent.dataflare.checks._ import com.github.timgent.dataflare.metrics.MetricValue.LongMetric -import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue} -import com.github.timgent.dataflare.repository.MetricsPersister +import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue, SimpleMetricDescriptor} import scala.reflect.ClassTag @@ -26,11 +25,36 @@ case class SingleMetricAnomalyCheck[MV <: MetricValue](metric: MetricDescriptor override def qcType: QcType = QcType.SingleMetricAnomalyCheck - override def description: CheckDescription = SingleMetricCheckDescription(checkDescription, metric.toSimpleMetricDescriptor) + override def description: CheckDescription = + SingleMetricCheckDescription(checkDescription, metric.toSimpleMetricDescriptor) // TODO: Should have more info in the description def applyCheck(metric: MV, historicMetrics: Map[Instant, MV#T]): CheckResult = { - check(metric.value, historicMetrics).withDescription(QcType.SingleMetricCheck, 
description) + check(metric.value, historicMetrics).withDescription(QcType.SingleMetricAnomalyCheck, description) } + + // typeTag required here to enable match of metric on type MV. Without class tag this type check would be fruitless + private[dataflare] final def applyCheckOnMetrics( // TODO: Remove duplication with SingleMetricCheck + metrics: Map[MetricDescriptor, MetricValue], + historicMetrics: Map[Instant, Map[SimpleMetricDescriptor, MetricValue]] + )(implicit classTag: ClassTag[MV]): CheckResult = { + val metricOfInterestOpt: Option[MetricValue] = + metrics.get(metric).map(metricValue => metricValue) + val relevantHistoricMetrics = + historicMetrics + .mapValues { m => + m(metric.toSimpleMetricDescriptor).value + } + .asInstanceOf[Map[Instant, MV#T]] + metricOfInterestOpt match { + case Some(metric) => + metric match { + case metric: MV => applyCheck(metric, relevantHistoricMetrics) + case _ => metricTypeErrorResult + } + case None => metricNotPresentErrorResult + } + } + } object SingleMetricAnomalyCheck { @@ -39,19 +63,19 @@ object SingleMetricAnomalyCheck { maxIncrease: Long, metricDescriptor: MetricDescriptor.Aux[LongMetric] ): SingleMetricAnomalyCheck[LongMetric] = - SingleMetricAnomalyCheck[LongMetric](metricDescriptor, "some check description") { (currentMetricValue, historicMetricValues) => + SingleMetricAnomalyCheck[LongMetric](metricDescriptor, "AbsoluteChangeAnomalyCheck") { (currentMetricValue, historicMetricValues) => val (lastMetricTimestamp, lastMetricValue) = historicMetricValues.maxBy(_._1) val isWithinAcceptableRange = (lastMetricValue + maxIncrease) <= currentMetricValue && (lastMetricValue - maxReduction) >= currentMetricValue if (isWithinAcceptableRange) RawCheckResult( CheckStatus.Success, - s"MetricValue of $currentMetricValue was anomalous compared to previous result of $lastMetricValue" + s"MetricValue of $currentMetricValue was not anomalous compared to previous result of $lastMetricValue" ) else RawCheckResult( CheckStatus.Error, - s"MetricValue of $currentMetricValue was not anomalous compared to previous result of $lastMetricValue" + s"MetricValue of $currentMetricValue was anomalous compared to previous result of $lastMetricValue" ) } } diff --git a/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala b/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala index 2ca1ae3..c4da38c 100644 --- a/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala +++ b/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala @@ -8,8 +8,8 @@ import com.github.timgent.dataflare.checks.ArbDualDsCheck.DatasetPair import com.github.timgent.dataflare.checks.DatasourceDescription.{DualDsDescription, SingleDsDescription} import com.github.timgent.dataflare.checks.QCCheck.{DualDsQCCheck, SingleDsCheck} import com.github.timgent.dataflare.checks._ -import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetricCheck} -import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue, MetricsCalculator} +import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetricAnomalyCheck, SingleMetricCheck} +import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue, MetricsCalculator, SimpleMetricDescriptor} import com.github.timgent.dataflare.repository.{MetricsPersister, NullMetricsPersister, NullQcResultsRepository, QcResultsRepository} import org.apache.spark.sql.Dataset @@ -72,6 +72,12 @@ case class ChecksSuite( (dds, relevantChecks) } + private 
val singleMetricAnomalyChecks: Map[DescribedDs, Seq[SingleMetricAnomalyCheck[_]]] = singleDsChecks.map { + case (dds, checks) => + val relevantChecks = checks.collect { case check: SingleMetricAnomalyCheck[_] => check } + (dds, relevantChecks) + } + private val arbDualDsChecks: Map[DescribedDsPair, Seq[ArbDualDsCheck]] = dualDsChecks.map { case (ddsPair, checks) => val relevantChecks = checks.collect { case check: ArbDualDsCheck => check } @@ -138,6 +144,7 @@ case class ChecksSuite( */ private def getMinimumRequiredMetrics( seqSingleDatasetMetricsChecks: Map[DescribedDs, Seq[SingleMetricCheck[_]]], + seqSingleDatasetMetricAnomalyChecks: Map[DescribedDs, Seq[SingleMetricAnomalyCheck[_]]], seqDualDatasetMetricChecks: Map[DescribedDsPair, Seq[DualMetricCheck[_]]], trackMetrics: Map[DescribedDs, Seq[MetricDescriptor]] ): Map[DescribedDs, List[MetricDescriptor]] = { @@ -146,6 +153,11 @@ case class ChecksSuite( metricDescriptors = checks.map(_.metric).toList } yield (dds, metricDescriptors)).groupBy(_._1).mapValues(_.flatMap(_._2).toList) + val singleDatasetAnomalyMetricDescriptors: Map[DescribedDs, List[MetricDescriptor]] = (for { + (dds, checks) <- seqSingleDatasetMetricAnomalyChecks + metricDescriptors = checks.map(_.metric).toList + } yield (dds, metricDescriptors)).groupBy(_._1).mapValues(_.flatMap(_._2).toList) + val dualDatasetAMetricDescriptors: Map[DescribedDs, List[MetricDescriptor]] = (for { (ddsPair, checks) <- seqDualDatasetMetricChecks describedDatasetA: DescribedDs = ddsPair.ds @@ -159,7 +171,7 @@ case class ChecksSuite( } yield (describedDatasetB, metricDescriptors)).groupBy(_._1).mapValues(_.flatMap(_._2).toList) val allMetricDescriptors: Map[DescribedDs, List[MetricDescriptor]] = - (singleDatasetMetricDescriptors |+| dualDatasetAMetricDescriptors |+| dualDatasetBMetricDescriptors + (singleDatasetMetricDescriptors |+| singleDatasetAnomalyMetricDescriptors |+| dualDatasetAMetricDescriptors |+| dualDatasetBMetricDescriptors |+| trackMetrics.mapValues(_.toList)) .mapValues(_.distinct) @@ -170,7 +182,12 @@ case class ChecksSuite( timestamp: Instant )(implicit ec: ExecutionContext): Future[Seq[CheckResult]] = { val allMetricDescriptors: Map[DescribedDs, List[MetricDescriptor]] = - getMinimumRequiredMetrics(singleMetricChecks, dualMetricChecks, metricsToTrack) + getMinimumRequiredMetrics( + singleMetricChecks, + singleMetricAnomalyChecks, + dualMetricChecks, + metricsToTrack + ) val calculatedMetrics: Map[DescribedDs, Either[MetricCalculationError, Map[MetricDescriptor, MetricValue]]] = allMetricDescriptors.map { case (describedDataset, metricDescriptors) => @@ -178,6 +195,8 @@ case class ChecksSuite( MetricsCalculator.calculateMetrics(describedDataset, metricDescriptors) (describedDataset, metricValues) } + val allPreviousMetricsFut: Future[Map[Instant, Map[SingleDsDescription, Map[SimpleMetricDescriptor, MetricValue]]]] = + metricsPersister.loadAll val metricsToSave = calculatedMetrics.collect { case (describedDataset, Right(metrics)) => @@ -192,6 +211,7 @@ case class ChecksSuite( for { _ <- storedMetricsFut + allPreviousMetrics: Map[Instant, Map[SingleDsDescription, Map[SimpleMetricDescriptor, MetricValue]]] <- allPreviousMetricsFut } yield { val singleDatasetCheckResults: Seq[CheckResult] = singleMetricChecks.toSeq.flatMap { case (dds, checks) => @@ -207,6 +227,22 @@ case class ChecksSuite( checkResults } + val singleDatasetAnomalyCheckResults: Seq[CheckResult] = singleMetricAnomalyChecks.toSeq.flatMap { + case (dds, checks) => + val datasetDescription = 
SingleDsDescription(dds.description) + val maybeMetricsForDs: Either[MetricCalculationError, Map[MetricDescriptor, MetricValue]] = calculatedMetrics(dds) + val checkResults: Seq[CheckResult] = checks.map { check => + maybeMetricsForDs match { + case Left(err) => check.getMetricErrorCheckResult(dds.datasourceDescription, err) + case Right(metricsForDs: Map[MetricDescriptor, MetricValue]) => + val historicMetricsForOurDs: Map[Instant, Map[SimpleMetricDescriptor, MetricValue]] = + allPreviousMetrics.mapValues(_.apply(datasetDescription)) // TODO: Handle error case! + check.applyCheckOnMetrics(metricsForDs, historicMetricsForOurDs).withDatasourceDescription(datasetDescription) + } + } + checkResults + } + val dualDatasetCheckResults: Seq[CheckResult] = dualMetricChecks.toSeq.flatMap { case (ddsPair, checks) => val dds = ddsPair.ds @@ -229,7 +265,7 @@ case class ChecksSuite( checkResults } - singleDatasetCheckResults ++ dualDatasetCheckResults + singleDatasetCheckResults ++ dualDatasetCheckResults ++ singleDatasetAnomalyCheckResults } } diff --git a/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala b/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala index e0ae987..9d3a904 100644 --- a/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala +++ b/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala @@ -606,7 +606,7 @@ class ChecksSuiteTest extends AsyncWordSpec with DatasetSuiteBase with Matchers QcType.SingleMetricAnomalyCheck, CheckStatus.Error, "MetricValue of 2 was anomalous compared to previous result of 10", - SingleMetricCheckDescription("SizeCheck", SimpleMetricDescriptor("Size", Some("no filter"))), + SingleMetricCheckDescription("AbsoluteChangeAnomalyCheck", SimpleMetricDescriptor("Size", Some("no filter"))), Some(SingleDsDescription(datasourceDescription)) ) ), From eccf0b9f1f4da458c02876561e26d5e367896829 Mon Sep 17 00:00:00 2001 From: Tim Gent Date: Fri, 30 Oct 2020 14:36:16 +0000 Subject: [PATCH 4/7] Anomalous checks based on historic std deviation and mean --- .../metrics/SingleMetricAnomalyCheck.scala | 38 ++++++++++- .../timgent/dataflare/utils/stats.scala | 16 +++++ .../checkssuite/ChecksSuiteTest.scala | 68 ++++++++++++++++++- 3 files changed, 119 insertions(+), 3 deletions(-) create mode 100644 src/main/scala/com/github/timgent/dataflare/utils/stats.scala diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala index 14d9cc5..56f619e 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala @@ -61,10 +61,10 @@ object SingleMetricAnomalyCheck { def absoluteChangeAnomalyCheck( maxReduction: Long, maxIncrease: Long, - metricDescriptor: MetricDescriptor.Aux[LongMetric] + metricDescriptor: MetricDescriptor.Aux[LongMetric] // SizeMetric ): SingleMetricAnomalyCheck[LongMetric] = SingleMetricAnomalyCheck[LongMetric](metricDescriptor, "AbsoluteChangeAnomalyCheck") { (currentMetricValue, historicMetricValues) => - val (lastMetricTimestamp, lastMetricValue) = historicMetricValues.maxBy(_._1) + val (_, lastMetricValue) = historicMetricValues.maxBy(_._1) val isWithinAcceptableRange = (lastMetricValue + maxIncrease) <= currentMetricValue && (lastMetricValue - maxReduction) >= currentMetricValue if 
(isWithinAcceptableRange) @@ -78,4 +78,38 @@ object SingleMetricAnomalyCheck { s"MetricValue of $currentMetricValue was anomalous compared to previous result of $lastMetricValue" ) } + + def stdChangeAnomalyCheck( + lowerDiviationFactor: Int = 2, + higherDiviationFactor: Int = 2, + metricDescriptor: MetricDescriptor.Aux[LongMetric] + ): SingleMetricAnomalyCheck[LongMetric] = { + + SingleMetricAnomalyCheck[LongMetric](metricDescriptor, "STDChangeAnomalyCheck") { + (currentMetricValue: Long, historicMetricValues: Map[Instant, Long]) => + import com.github.timgent.dataflare.utils.stats.{mean, stdDev} + + val historicValues = historicMetricValues.values + + val historicMean: Double = mean(historicValues) + val historicSTD: Double = stdDev(historicValues) + + val isValidRange = + currentMetricValue >= historicMean - (lowerDiviationFactor * historicSTD) && + currentMetricValue <= (higherDiviationFactor * historicSTD) + historicMean + + if (isValidRange) + RawCheckResult( + CheckStatus.Success, + s"MetricValue of $currentMetricValue was not anomalous compared to previous results. Mean: $historicMean; STD: $historicSTD" + ) + else + RawCheckResult( + CheckStatus.Error, + s"MetricValue of $currentMetricValue was anomalous compared to previous results. Mean: $historicMean; STD: $historicSTD" + ) + + } + } + } diff --git a/src/main/scala/com/github/timgent/dataflare/utils/stats.scala b/src/main/scala/com/github/timgent/dataflare/utils/stats.scala new file mode 100644 index 0000000..17a36ea --- /dev/null +++ b/src/main/scala/com/github/timgent/dataflare/utils/stats.scala @@ -0,0 +1,16 @@ +package com.github.timgent.dataflare.utils + +object stats { + + import Numeric.Implicits._ + + def mean[T: Numeric](xs: Iterable[T]): Double = xs.sum[T].toDouble() / xs.size + + def variance[T: Numeric](xs: Iterable[T]): Double = { + val avg = mean(xs) + xs.map(_.toDouble).map(a => math.pow(a - avg, 2)).sum / xs.size + } + + def stdDev[T: Numeric](xs: Iterable[T]): Double = math.sqrt(variance(xs)) + +} diff --git a/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala b/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala index 9d3a904..6508fe8 100644 --- a/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala +++ b/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala @@ -4,6 +4,7 @@ import java.time.{Instant, LocalDateTime, ZoneOffset} import cats.implicits._ import com.github.timgent.dataflare.FlareError +import scala.concurrent.Future import com.github.timgent.dataflare.FlareError.MetricCalculationError import com.github.timgent.dataflare.checks.ArbDualDsCheck.DatasetPair import com.github.timgent.dataflare.checks.CheckDescription.{DualMetricCheckDescription, SingleMetricCheckDescription} @@ -12,7 +13,7 @@ import com.github.timgent.dataflare.checks.metrics.{DualMetricCheck, SingleMetri import com.github.timgent.dataflare.checks._ import com.github.timgent.dataflare.metrics.MetricDescriptor.{SizeMetric, SumValuesMetric} import com.github.timgent.dataflare.metrics.MetricValue.LongMetric -import com.github.timgent.dataflare.metrics.{MetricComparator, MetricDescriptor, MetricFilter, SimpleMetricDescriptor} +import com.github.timgent.dataflare.metrics.{MetricComparator, MetricDescriptor, MetricFilter, MetricValue, SimpleMetricDescriptor} import com.github.timgent.dataflare.repository.{InMemoryMetricsPersister, InMemoryQcResultsRepository} import com.github.timgent.dataflare.thresholds.AbsoluteThreshold import 
com.github.timgent.dataflare.utils.CommonFixtures._ @@ -616,6 +617,71 @@ class ChecksSuiteTest extends AsyncWordSpec with DatasetSuiteBase with Matchers } } + "anomaly STD check" in { + + val check: SingleMetricAnomalyCheck[LongMetric] = + SingleMetricAnomalyCheck.stdChangeAnomalyCheck(2, 2, SizeMetric()) + + val metrics: Map[MetricDescriptor, MetricValue] = + Map(SizeMetric() -> LongMetric(60)) + + def inner(n: Int): Map[SimpleMetricDescriptor, MetricValue] = + Map( + SizeMetric().toSimpleMetricDescriptor -> LongMetric(n) + ) + + val historicMetrics: Map[Instant, Map[SimpleMetricDescriptor, MetricValue]] = + Map( + LocalDateTime.now().toInstant(ZoneOffset.UTC) -> inner(50), + LocalDateTime.now().plusDays(1).toInstant(ZoneOffset.UTC) -> inner(60), + LocalDateTime.now().plusDays(2).toInstant(ZoneOffset.UTC) -> inner(70) + ) + + val result: CheckResult = check.applyCheckOnMetrics(metrics, historicMetrics) + + Future.successful { + result shouldBe CheckResult( + QcType.SingleMetricAnomalyCheck, + CheckStatus.Success, + "MetricValue of 60 was not anomalous compared to previous results. Mean: 60.0; STD: 8.16496580927726", + SingleMetricCheckDescription("STDChangeAnomalyCheck", SimpleMetricDescriptor("Size", Some("no filter"))), + None + ) + } + } + + "outlier should be treated as an anomaly" in { + + val check: SingleMetricAnomalyCheck[LongMetric] = + SingleMetricAnomalyCheck.stdChangeAnomalyCheck(2, 2, SizeMetric()) + + val metrics: Map[MetricDescriptor, MetricValue] = + Map(SizeMetric() -> LongMetric(35)) + + def inner(n: Int): Map[SimpleMetricDescriptor, MetricValue] = + Map( + SizeMetric().toSimpleMetricDescriptor -> LongMetric(n) + ) + + val historicMetrics: Map[Instant, Map[SimpleMetricDescriptor, MetricValue]] = + Map( + LocalDateTime.now().toInstant(ZoneOffset.UTC) -> inner(50), + LocalDateTime.now().plusDays(1).toInstant(ZoneOffset.UTC) -> inner(60), + LocalDateTime.now().plusDays(2).toInstant(ZoneOffset.UTC) -> inner(70) + ) + + val result: CheckResult = check.applyCheckOnMetrics(metrics, historicMetrics) + + Future.successful { + result shouldBe CheckResult( + QcType.SingleMetricAnomalyCheck, + CheckStatus.Error, + "MetricValue of 35 was anomalous compared to previous results. 
Mean: 60.0; STD: 8.16496580927726", + SingleMetricCheckDescription("STDChangeAnomalyCheck", SimpleMetricDescriptor("Size", Some("no filter"))), + None + ) + } + } } } } From bbf7c0b403e9d6e86c4624d6ba39ebc47eb92f10 Mon Sep 17 00:00:00 2001 From: Tim Gent Date: Fri, 30 Oct 2020 15:39:17 +0000 Subject: [PATCH 5/7] Remove duplication for applyCheckOnMetrics method --- .../github/timgent/dataflare/FlareError.scala | 19 ++++++++++++++ .../checks/metrics/MetricsBasedCheck.scala | 22 +++++++++++++++- .../metrics/SingleMetricAnomalyCheck.scala | 25 ++++++++----------- .../checks/metrics/SingleMetricCheck.scala | 16 ++++++------ 4 files changed, 58 insertions(+), 24 deletions(-) diff --git a/src/main/scala/com/github/timgent/dataflare/FlareError.scala b/src/main/scala/com/github/timgent/dataflare/FlareError.scala index b380150..0834a59 100644 --- a/src/main/scala/com/github/timgent/dataflare/FlareError.scala +++ b/src/main/scala/com/github/timgent/dataflare/FlareError.scala @@ -38,4 +38,23 @@ object FlareError { |err: ${err.map(_.getMessage)} |msg: $msg""".stripMargin } + + sealed trait MetricLookupError extends FlareError + + case object MetricMissing extends MetricLookupError { + override def datasourceDescription: Option[DatasourceDescription] = None + + override def msg: String = "Unexpected failure - Metric lookup failed - no metric found - please raise an Issue on Github" + + override def err: Option[Throwable] = None + } + + case object LookedUpMetricOfWrongType extends MetricLookupError { + override def datasourceDescription: Option[DatasourceDescription] = None + + override def msg: String = + "Unexpected failure - Metric lookup failed - metric found was of wrong type - please raise an Issue on Github" + + override def err: Option[Throwable] = None + } } diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/MetricsBasedCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/MetricsBasedCheck.scala index 397893c..03adce1 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/metrics/MetricsBasedCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/MetricsBasedCheck.scala @@ -1,10 +1,30 @@ package com.github.timgent.dataflare.checks.metrics -import com.github.timgent.dataflare.FlareError.MetricCalculationError +import com.github.timgent.dataflare.FlareError +import com.github.timgent.dataflare.FlareError.{LookedUpMetricOfWrongType, MetricCalculationError, MetricLookupError, MetricMissing} import com.github.timgent.dataflare.checks.{CheckResult, CheckStatus, DatasourceDescription, QCCheck, QcType} +import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue} + +import scala.reflect.ClassTag private[dataflare] trait MetricsBasedCheck extends QCCheck { + // typeTag required here to enable match of metric on type MV. 
Without class tag this type check would be fruitless + protected def getMetric[MV <: MetricValue](metricDescriptor: MetricDescriptor.Aux[MV], metrics: Map[MetricDescriptor, MetricValue])( + implicit classTag: ClassTag[MV] + ): Either[MetricLookupError, MV] = { + val metricOfInterestOpt: Option[MetricValue] = + metrics.get(metricDescriptor).map(metricValue => metricValue) + metricOfInterestOpt match { + case Some(metric) => + metric match { // TODO: Look into heterogenous maps to avoid this type test - https://github.com/milessabin/shapeless/wiki/Feature-overview:-shapeless-1.2.4#heterogenous-maps + case metric: MV => Right(metric) + case _ => Left(LookedUpMetricOfWrongType) + } + case None => Left(MetricMissing) + } + } + private[dataflare] def getMetricErrorCheckResult(datasourceDescription: DatasourceDescription, err: MetricCalculationError*) = CheckResult( qcType, diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala index 56f619e..9245af6 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala @@ -2,6 +2,7 @@ package com.github.timgent.dataflare.checks.metrics import java.time.Instant +import com.github.timgent.dataflare.FlareError.{LookedUpMetricOfWrongType, MetricMissing} import com.github.timgent.dataflare.checks.CheckDescription.SingleMetricCheckDescription import com.github.timgent.dataflare.checks.QCCheck.SingleDsCheck import com.github.timgent.dataflare.checks._ @@ -33,25 +34,21 @@ case class SingleMetricAnomalyCheck[MV <: MetricValue](metric: MetricDescriptor } // typeTag required here to enable match of metric on type MV. 
Without class tag this type check would be fruitless - private[dataflare] final def applyCheckOnMetrics( // TODO: Remove duplication with SingleMetricCheck + private[dataflare] final def applyCheckOnMetrics( metrics: Map[MetricDescriptor, MetricValue], historicMetrics: Map[Instant, Map[SimpleMetricDescriptor, MetricValue]] )(implicit classTag: ClassTag[MV]): CheckResult = { - val metricOfInterestOpt: Option[MetricValue] = - metrics.get(metric).map(metricValue => metricValue) - val relevantHistoricMetrics = + val maybeMetricOfInterest = getMetric(metric, metrics) + val relevantHistoricMetrics: Map[Instant, MV#T] = historicMetrics - .mapValues { m => - m(metric.toSimpleMetricDescriptor).value + .mapValues { metricsMap => + metricsMap.get(metric.toSimpleMetricDescriptor) } - .asInstanceOf[Map[Instant, MV#T]] - metricOfInterestOpt match { - case Some(metric) => - metric match { - case metric: MV => applyCheck(metric, relevantHistoricMetrics) - case _ => metricTypeErrorResult - } - case None => metricNotPresentErrorResult + .collect { case (instant, Some(metricValue: MV)) => (instant, metricValue.value) } + maybeMetricOfInterest match { + case Left(MetricMissing) => metricNotPresentErrorResult + case Left(LookedUpMetricOfWrongType) => metricTypeErrorResult + case Right(metric) => applyCheck(metric, relevantHistoricMetrics) } } diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricCheck.scala index 6c90faf..9241db2 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricCheck.scala @@ -1,5 +1,7 @@ package com.github.timgent.dataflare.checks.metrics +import com.github.timgent.dataflare.FlareError +import com.github.timgent.dataflare.FlareError.{LookedUpMetricOfWrongType, MetricMissing} import com.github.timgent.dataflare.checks.CheckDescription.SingleMetricCheckDescription import com.github.timgent.dataflare.checks.QCCheck.SingleDsCheck import com.github.timgent.dataflare.checks.{CheckDescription, CheckResult, CheckStatus, QcType, RawCheckResult} @@ -34,15 +36,11 @@ case class SingleMetricCheck[MV <: MetricValue](metric: MetricDescriptor { type private[dataflare] final def applyCheckOnMetrics( metrics: Map[MetricDescriptor, MetricValue] )(implicit classTag: ClassTag[MV]): CheckResult = { - val metricOfInterestOpt: Option[MetricValue] = - metrics.get(metric).map(metricValue => metricValue) - metricOfInterestOpt match { - case Some(metric) => - metric match { // TODO: Look into heterogenous maps to avoid this type test - https://github.com/milessabin/shapeless/wiki/Feature-overview:-shapeless-1.2.4#heterogenous-maps - case metric: MV => applyCheck(metric) - case _ => metricTypeErrorResult - } - case None => metricNotPresentErrorResult + val maybeMetric: Either[FlareError, MV] = getMetric(metric, metrics) + maybeMetric match { + case Left(MetricMissing) => metricNotPresentErrorResult + case Left(LookedUpMetricOfWrongType) => metricTypeErrorResult + case Right(metric) => applyCheck(metric) } } } From 9471b436233c2fa1466f61f9046bb7e0f23d8e3a Mon Sep 17 00:00:00 2001 From: Tim Gent Date: Fri, 30 Oct 2020 16:00:59 +0000 Subject: [PATCH 6/7] Minor tidy ups for code organisation for anomaly checks --- .../metrics/SingleMetricAnomalyCheck.scala | 21 +++--- .../dataflare/checkssuite/ChecksSuite.scala | 7 +- .../SingleMetricAnomalyCheckTest.scala | 61 ++++++++++++++++ 
.../checkssuite/ChecksSuiteTest.scala | 69 +------------------ 4 files changed, 78 insertions(+), 80 deletions(-) create mode 100644 src/test/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheckTest.scala diff --git a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala index 9245af6..4002b77 100644 --- a/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala +++ b/src/main/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheck.scala @@ -7,7 +7,7 @@ import com.github.timgent.dataflare.checks.CheckDescription.SingleMetricCheckDes import com.github.timgent.dataflare.checks.QCCheck.SingleDsCheck import com.github.timgent.dataflare.checks._ import com.github.timgent.dataflare.metrics.MetricValue.LongMetric -import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue, SimpleMetricDescriptor} +import com.github.timgent.dataflare.metrics.{MetricDescriptor, MetricValue} import scala.reflect.ClassTag @@ -36,15 +36,12 @@ case class SingleMetricAnomalyCheck[MV <: MetricValue](metric: MetricDescriptor // typeTag required here to enable match of metric on type MV. Without class tag this type check would be fruitless private[dataflare] final def applyCheckOnMetrics( metrics: Map[MetricDescriptor, MetricValue], - historicMetrics: Map[Instant, Map[SimpleMetricDescriptor, MetricValue]] + historicMetrics: Map[Instant, MetricValue] )(implicit classTag: ClassTag[MV]): CheckResult = { val maybeMetricOfInterest = getMetric(metric, metrics) - val relevantHistoricMetrics: Map[Instant, MV#T] = - historicMetrics - .mapValues { metricsMap => - metricsMap.get(metric.toSimpleMetricDescriptor) - } - .collect { case (instant, Some(metricValue: MV)) => (instant, metricValue.value) } + val relevantHistoricMetrics: Map[Instant, MV#T] = historicMetrics.collect { + case (instant, metricValue: MV) => (instant, metricValue.value) + } maybeMetricOfInterest match { case Left(MetricMissing) => metricNotPresentErrorResult case Left(LookedUpMetricOfWrongType) => metricTypeErrorResult @@ -77,8 +74,8 @@ object SingleMetricAnomalyCheck { } def stdChangeAnomalyCheck( - lowerDiviationFactor: Int = 2, - higherDiviationFactor: Int = 2, + lowerDeviationFactor: Int = 2, + higherDeviationFactor: Int = 2, metricDescriptor: MetricDescriptor.Aux[LongMetric] ): SingleMetricAnomalyCheck[LongMetric] = { @@ -92,8 +89,8 @@ object SingleMetricAnomalyCheck { val historicSTD: Double = stdDev(historicValues) val isValidRange = - currentMetricValue >= historicMean - (lowerDiviationFactor * historicSTD) && - currentMetricValue <= (higherDiviationFactor * historicSTD) + historicMean + currentMetricValue >= historicMean - (lowerDeviationFactor * historicSTD) && + currentMetricValue <= (higherDeviationFactor * historicSTD) + historicMean if (isValidRange) RawCheckResult( diff --git a/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala b/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala index c4da38c..0b8d0b0 100644 --- a/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala +++ b/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala @@ -237,7 +237,12 @@ case class ChecksSuite( case Right(metricsForDs: Map[MetricDescriptor, MetricValue]) => val historicMetricsForOurDs: Map[Instant, Map[SimpleMetricDescriptor, MetricValue]] = 
allPreviousMetrics.mapValues(_.apply(datasetDescription)) // TODO: Handle error case! - check.applyCheckOnMetrics(metricsForDs, historicMetricsForOurDs).withDatasourceDescription(datasetDescription) + val historicMetricsForOurMetric = historicMetricsForOurDs + .mapValues { metricsMap => + metricsMap.get(check.metric.toSimpleMetricDescriptor) + } + .collect { case (instant, Some(metricValue)) => (instant, metricValue) } + check.applyCheckOnMetrics(metricsForDs, historicMetricsForOurMetric).withDatasourceDescription(datasetDescription) } } checkResults diff --git a/src/test/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheckTest.scala b/src/test/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheckTest.scala new file mode 100644 index 0000000..7f79667 --- /dev/null +++ b/src/test/scala/com/github/timgent/dataflare/checks/metrics/SingleMetricAnomalyCheckTest.scala @@ -0,0 +1,61 @@ +package com.github.timgent.dataflare.checks.metrics + +import java.time.Instant + +import com.github.timgent.dataflare.checks.CheckDescription.SingleMetricCheckDescription +import com.github.timgent.dataflare.checks.{CheckResult, CheckStatus, QcType} +import com.github.timgent.dataflare.metrics.MetricDescriptor.SizeMetric +import com.github.timgent.dataflare.metrics.MetricValue.LongMetric +import com.github.timgent.dataflare.metrics.SimpleMetricDescriptor +import com.github.timgent.dataflare.utils.CommonFixtures.now +import com.github.timgent.dataflare.utils.DateTimeUtils.InstantExtension +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class SingleMetricAnomalyCheckTest extends AnyWordSpec with Matchers { + "stdChangeAnomalyCheck" should { + "anomaly STD check" in { + val check: SingleMetricAnomalyCheck[LongMetric] = + SingleMetricAnomalyCheck.stdChangeAnomalyCheck(2, 2, SizeMetric()) + val metric: LongMetric = LongMetric(60) + val historicMetrics: Map[Instant, Long] = + Map( + now -> 50, + now.plusDays(1) -> 60, + now.plusDays(2) -> 70 + ) + + val result: CheckResult = check.applyCheck(metric, historicMetrics) + + result shouldBe CheckResult( + QcType.SingleMetricAnomalyCheck, + CheckStatus.Success, + "MetricValue of 60 was not anomalous compared to previous results. Mean: 60.0; STD: 8.16496580927726", + SingleMetricCheckDescription("STDChangeAnomalyCheck", SimpleMetricDescriptor("Size", Some("no filter"))), + None + ) + } + + "outlier should be treated as an anomaly" in { + val check: SingleMetricAnomalyCheck[LongMetric] = + SingleMetricAnomalyCheck.stdChangeAnomalyCheck(2, 2, SizeMetric()) + val metric: LongMetric = LongMetric(35) + val historicMetrics: Map[Instant, Long] = + Map( + now -> 50, + now.plusDays(1) -> 60, + now.plusDays(2) -> 70 + ) + + val result: CheckResult = check.applyCheck(metric, historicMetrics) + + result shouldBe CheckResult( + QcType.SingleMetricAnomalyCheck, + CheckStatus.Error, + "MetricValue of 35 was anomalous compared to previous results. 
Mean: 60.0; STD: 8.16496580927726", + SingleMetricCheckDescription("STDChangeAnomalyCheck", SimpleMetricDescriptor("Size", Some("no filter"))), + None + ) + } + } +} diff --git a/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala b/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala index 6508fe8..b4e769a 100644 --- a/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala +++ b/src/test/scala/com/github/timgent/dataflare/checkssuite/ChecksSuiteTest.scala @@ -2,6 +2,7 @@ package com.github.timgent.dataflare.checkssuite import java.time.{Instant, LocalDateTime, ZoneOffset} +import com.github.timgent.dataflare.utils.DateTimeUtils.InstantExtension import cats.implicits._ import com.github.timgent.dataflare.FlareError import scala.concurrent.Future @@ -573,7 +574,7 @@ class ChecksSuiteTest extends AsyncWordSpec with DatasetSuiteBase with Matchers } "contains single metric anomaly checks" should { - "calculate metrics based checks on single datasets" in { + "calculate required metrics and perform the anomaly checks" in { val ds = Seq( NumberString(1, "a"), NumberString(2, "b") @@ -616,72 +617,6 @@ class ChecksSuiteTest extends AsyncWordSpec with DatasetSuiteBase with Matchers ) } } - - "anomaly STD check" in { - - val check: SingleMetricAnomalyCheck[LongMetric] = - SingleMetricAnomalyCheck.stdChangeAnomalyCheck(2, 2, SizeMetric()) - - val metrics: Map[MetricDescriptor, MetricValue] = - Map(SizeMetric() -> LongMetric(60)) - - def inner(n: Int): Map[SimpleMetricDescriptor, MetricValue] = - Map( - SizeMetric().toSimpleMetricDescriptor -> LongMetric(n) - ) - - val historicMetrics: Map[Instant, Map[SimpleMetricDescriptor, MetricValue]] = - Map( - LocalDateTime.now().toInstant(ZoneOffset.UTC) -> inner(50), - LocalDateTime.now().plusDays(1).toInstant(ZoneOffset.UTC) -> inner(60), - LocalDateTime.now().plusDays(2).toInstant(ZoneOffset.UTC) -> inner(70) - ) - - val result: CheckResult = check.applyCheckOnMetrics(metrics, historicMetrics) - - Future.successful { - result shouldBe CheckResult( - QcType.SingleMetricAnomalyCheck, - CheckStatus.Success, - "MetricValue of 60 was not anomalous compared to previous results. Mean: 60.0; STD: 8.16496580927726", - SingleMetricCheckDescription("STDChangeAnomalyCheck", SimpleMetricDescriptor("Size", Some("no filter"))), - None - ) - } - } - - "outlier should be treated as an anomaly" in { - - val check: SingleMetricAnomalyCheck[LongMetric] = - SingleMetricAnomalyCheck.stdChangeAnomalyCheck(2, 2, SizeMetric()) - - val metrics: Map[MetricDescriptor, MetricValue] = - Map(SizeMetric() -> LongMetric(35)) - - def inner(n: Int): Map[SimpleMetricDescriptor, MetricValue] = - Map( - SizeMetric().toSimpleMetricDescriptor -> LongMetric(n) - ) - - val historicMetrics: Map[Instant, Map[SimpleMetricDescriptor, MetricValue]] = - Map( - LocalDateTime.now().toInstant(ZoneOffset.UTC) -> inner(50), - LocalDateTime.now().plusDays(1).toInstant(ZoneOffset.UTC) -> inner(60), - LocalDateTime.now().plusDays(2).toInstant(ZoneOffset.UTC) -> inner(70) - ) - - val result: CheckResult = check.applyCheckOnMetrics(metrics, historicMetrics) - - Future.successful { - result shouldBe CheckResult( - QcType.SingleMetricAnomalyCheck, - CheckStatus.Error, - "MetricValue of 35 was anomalous compared to previous results. 
Mean: 60.0; STD: 8.16496580927726", - SingleMetricCheckDescription("STDChangeAnomalyCheck", SimpleMetricDescriptor("Size", Some("no filter"))), - None - ) - } - } } } } From ba46206461ae618245830804fb03fb326e808d66 Mon Sep 17 00:00:00 2001 From: Tim Gent Date: Fri, 30 Oct 2020 16:03:18 +0000 Subject: [PATCH 7/7] Handle error case if there were no previous metrics for a dataset with an anomaly check --- .../timgent/dataflare/checkssuite/ChecksSuite.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala b/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala index 0b8d0b0..d06afad 100644 --- a/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala +++ b/src/main/scala/com/github/timgent/dataflare/checkssuite/ChecksSuite.scala @@ -235,11 +235,12 @@ case class ChecksSuite( maybeMetricsForDs match { case Left(err) => check.getMetricErrorCheckResult(dds.datasourceDescription, err) case Right(metricsForDs: Map[MetricDescriptor, MetricValue]) => - val historicMetricsForOurDs: Map[Instant, Map[SimpleMetricDescriptor, MetricValue]] = - allPreviousMetrics.mapValues(_.apply(datasetDescription)) // TODO: Handle error case! - val historicMetricsForOurMetric = historicMetricsForOurDs - .mapValues { metricsMap => - metricsMap.get(check.metric.toSimpleMetricDescriptor) + val historicMetricsForOurMetric = allPreviousMetrics + .mapValues { ddsToMetricsMap => + for { + metricsMap <- ddsToMetricsMap.get(datasetDescription) + metricValue <- metricsMap.get(check.metric.toSimpleMetricDescriptor) + } yield metricValue } .collect { case (instant, Some(metricValue)) => (instant, metricValue) } check.applyCheckOnMetrics(metricsForDs, historicMetricsForOurMetric).withDatasourceDescription(datasetDescription)
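
Reference note (not part of the patches above): a minimal standalone Scala sketch of the acceptance band that stdChangeAnomalyCheck applies, using the same mean/stdDev formulas as the new stats utility and the historic values (50, 60, 70) from the tests. The object name AnomalyBandSketch and the helper isAnomalous are illustrative only and do not exist in the library.

object AnomalyBandSketch extends App {
  // Same formulas as com.github.timgent.dataflare.utils.stats (population variance).
  def mean(xs: Iterable[Long]): Double = xs.map(_.toDouble).sum / xs.size
  def variance(xs: Iterable[Long]): Double = {
    val avg = mean(xs)
    xs.map(x => math.pow(x.toDouble - avg, 2)).sum / xs.size
  }
  def stdDev(xs: Iterable[Long]): Double = math.sqrt(variance(xs))

  // Historic metric values used in SingleMetricAnomalyCheckTest.
  val historic = Seq(50L, 60L, 70L)
  val historicMean = mean(historic)   // 60.0
  val historicStd  = stdDev(historic) // 8.16496580927726

  // stdChangeAnomalyCheck with both deviation factors set to 2 accepts values in
  // [mean - 2 * std, mean + 2 * std], roughly [43.67, 76.33] here.
  def isAnomalous(current: Long): Boolean =
    current < historicMean - 2 * historicStd || current > historicMean + 2 * historicStd

  println(s"mean=$historicMean stdDev=$historicStd")
  println(s"60 anomalous? ${isAnomalous(60)}") // false -> CheckStatus.Success in the test
  println(s"35 anomalous? ${isAnomalous(35)}") // true  -> CheckStatus.Error in the test
}

Running this reproduces the mean of 60.0 and standard deviation of 8.16496580927726 asserted in SingleMetricAnomalyCheckTest.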