From 7ac913832335726ebd39b2ecef75c5408a7d6344 Mon Sep 17 00:00:00 2001
From: David Javakhishvili
Date: Wed, 10 Apr 2019 09:42:17 +0300
Subject: [PATCH] Implementation of typed links, metrics and generic cleansing
 task

---
 .../datalake/environment/Environment.scala    |  3 ++
 .../environment/EnvironmentDispatcher.scala   | 13 +++++
 .../datalake/links/DataStorage.scala          |  8 +++
 .../links/FileSystemDataStorage.scala         | 52 +++++++++++++++++++
 .../datalake/links/MockDataStorage.scala      | 16 ++++++
 .../datalake/links/NullDataStorage.scala      | 17 ++++++
 .../datalake/links/Partitionable.scala        | 16 ++++++
 .../links/PartitionedDataStorage.scala        |  7 +++
 .../datalake/links/PathBasedDataStorage.scala | 14 +++++
 .../links/ProxyPathBasedDataStorage.scala     | 10 ++++
 .../datalake/links/SnapshotDataStorage.scala  | 25 +++++++++
 .../datalake/links/TypeableDataLink.scala     |  9 ++++
 .../datalake/links/TypedDataLink.scala        | 11 ++++
 .../datalake/links/TypedPartitionable.scala   | 24 +++++++++
 .../datalake/links/UntypedDataLink.scala      |  9 ++++
 .../datalake/links/UntypedPartitionable.scala | 21 ++++++++
 .../experimental/datalake/links/package.scala |  7 +++
 .../experimental/metrics/Count.scala          | 13 +++++
 .../lighthouse/experimental/metrics/Max.scala | 11 ++++
 .../experimental/metrics/Metric.scala         |  8 +++
 .../experimental/metrics/MetricsBundle.scala  |  8 +++
 .../lighthouse/experimental/metrics/Min.scala | 11 ++++
 .../experimental/metrics/NullCount.scala      | 15 ++++++
 .../lighthouse/experimental/metrics/Sum.scala | 11 ++++
 .../experimental/metrics/syntax/package.scala | 17 ++++++
 .../tasks/GenericCleansingTask.scala          | 22 ++++++++
 26 files changed, 378 insertions(+)
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/environment/Environment.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/environment/EnvironmentDispatcher.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/DataStorage.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/FileSystemDataStorage.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/MockDataStorage.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/NullDataStorage.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/Partitionable.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/PartitionedDataStorage.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/PathBasedDataStorage.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/ProxyPathBasedDataStorage.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/SnapshotDataStorage.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/TypeableDataLink.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/TypedDataLink.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/TypedPartitionable.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/UntypedDataLink.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/UntypedPartitionable.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/package.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Count.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Max.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Metric.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/MetricsBundle.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Min.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/NullCount.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Sum.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/syntax/package.scala
 create mode 100644 lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/tasks/GenericCleansingTask.scala

diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/environment/Environment.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/environment/Environment.scala
new file mode 100644
index 0000000..30e9cc0
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/environment/Environment.scala
@@ -0,0 +1,3 @@
+package be.dataminded.lighthouse.experimental.datalake.environment
+
+abstract class Environment(val name: String)
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/environment/EnvironmentDispatcher.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/environment/EnvironmentDispatcher.scala
new file mode 100644
index 0000000..e38bfa4
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/environment/EnvironmentDispatcher.scala
@@ -0,0 +1,13 @@
+package be.dataminded.lighthouse.experimental.datalake.environment
+import be.dataminded.lighthouse.datalake.Datalake.{DefaultEnvironment, PropertyName}
+
+trait EnvironmentDispatcher[E <: Environment] {
+  private val environmentName: String =
+    Option(System.getProperty(PropertyName)).getOrElse(DefaultEnvironment)
+
+  def enabledEnvironments: Set[E]
+
+  def currentEnvironment: E =
+    enabledEnvironments.find(_.name == environmentName)
+      .getOrElse(throw new IllegalArgumentException(s"Environment not valid: $environmentName"))
+}
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/DataStorage.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/DataStorage.scala
new file mode 100644
index 0000000..da1a7eb
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/DataStorage.scala
@@ -0,0 +1,8 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+import org.apache.spark.sql.DataFrame
+
+trait DataStorage {
+  protected def doRead: DataFrame
+
+  protected def doWrite(data: DataFrame): Unit
+}
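Usage sketch (not part of the patch): wiring environments through the dispatcher would look roughly like this; the Acceptance/Production objects and their names are hypothetical, while PropertyName and DefaultEnvironment come from the existing Datalake object the trait imports.

    import be.dataminded.lighthouse.experimental.datalake.environment.{Environment, EnvironmentDispatcher}

    // Hypothetical environments; the names must match the JVM property the trait reads.
    object Acceptance extends Environment("acc")
    object Production extends Environment("prod")

    object MyEnvironments extends EnvironmentDispatcher[Environment] {
      override def enabledEnvironments: Set[Environment] = Set(Acceptance, Production)
    }

    // MyEnvironments.currentEnvironment resolves the environment named by the
    // Datalake system property, falling back to the library default.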
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/FileSystemDataStorage.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/FileSystemDataStorage.scala
new file mode 100644
index 0000000..d850400
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/FileSystemDataStorage.scala
@@ -0,0 +1,52 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+
+import be.dataminded.lighthouse.datalake._
+import be.dataminded.lighthouse.spark.{Orc, SparkFileFormat, SparkSessions}
+import org.apache.spark.sql.{DataFrame, Encoder, SaveMode}
+import org.apache.spark.sql.types.StructType
+
+abstract class FileSystemDataStorage(val path: LazyConfig[String],
+                                     format: SparkFileFormat = Orc,
+                                     saveMode: SaveMode = SaveMode.Overwrite,
+                                     partitionedBy: List[String] = List.empty,
+                                     options: Map[String, String] = Map.empty,
+                                     schema: Option[StructType] = None)
+    extends PathBasedDataStorage
+    with SparkSessions {
+
+  override protected[links] def readFrom(path: String): DataFrame = schema match {
+    case Some(s) => spark.read.format(format.toString).options(options).schema(s).load(path)
+    case None    => spark.read.format(format.toString).options(options).load(path)
+  }
+
+  override protected[links] def writeTo(path: String, data: DataFrame): Unit =
+    data.write
+      .format(format.toString)
+      .partitionBy(partitionedBy: _*)
+      .options(options)
+      .mode(saveMode)
+      .save(path)
+}
+
+object FileSystemDataStorage {
+  def untyped(path: LazyConfig[String],
+              format: SparkFileFormat = Orc,
+              saveMode: SaveMode = SaveMode.Overwrite,
+              partitionedBy: List[String] = List.empty,
+              options: Map[String, String] = Map.empty,
+              schema: Option[StructType] = None): FileSystemDataStorage with UntypedDataLink with UntypedPartitionable =
+    new FileSystemDataStorage(path, format, saveMode, partitionedBy, options, schema) with UntypedDataLink
+    with UntypedPartitionable
+
+  def typed[T: Encoder](
+      path: LazyConfig[String],
+      format: SparkFileFormat = Orc,
+      saveMode: SaveMode = SaveMode.Overwrite,
+      partitionedBy: List[String] = List.empty,
+      options: Map[String, String] = Map.empty,
+      schema: Option[StructType] = None): FileSystemDataStorage with TypedDataLink[T] with TypedPartitionable[T] =
+    new FileSystemDataStorage(path, format, saveMode, partitionedBy, options, schema) with TypedDataLink[T]
+    with TypedPartitionable[T] {
+      override protected def encoder: Encoder[T] = implicitly
+    }
+}
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/MockDataStorage.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/MockDataStorage.scala
new file mode 100644
index 0000000..043f897
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/MockDataStorage.scala
@@ -0,0 +1,16 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+import be.dataminded.lighthouse.spark.SparkSessions
+import org.apache.spark.sql.{DataFrame, Encoder}
+
+class MockDataStorage(data: DataFrame) extends DataStorage {
+  override protected[links] def doRead: DataFrame = data
+  override protected[links] def doWrite(data: DataFrame): Unit = ()
+}
+
+object MockDataStorage extends SparkSessions {
+  def typed[T: Encoder](data: DataFrame): TypedDataLink[T] = new MockDataStorage(data) with TypedDataLink[T] {
+    override protected def encoder: Encoder[T] = implicitly
+  }
+
+  def untyped(data: DataFrame): UntypedDataLink = new MockDataStorage(data) with UntypedDataLink
+}
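Construction sketch for the two factory methods (not part of the patch; the path strings and the Customer record are hypothetical, and the by-name lift from String to LazyConfig is assumed to come from be.dataminded.lighthouse.datalake._, the same conversion the patch itself relies on):

    import be.dataminded.lighthouse.datalake._ // assumed implicit lift: String => LazyConfig[String]
    import be.dataminded.lighthouse.experimental.datalake.links.FileSystemDataStorage
    import org.apache.spark.sql.SparkSession

    object StorageDemo {
      final case class Customer(id: Long, name: String) // hypothetical record type

      val spark = SparkSession.builder().master("local[*]").getOrCreate()
      import spark.implicits._ // supplies Encoder[Customer]

      // Same on-disk layout; the typed variant reads/writes Dataset[Customer],
      // the untyped one reads/writes plain DataFrames.
      val raw       = FileSystemDataStorage.untyped("/lake/raw/customers")
      val customers = FileSystemDataStorage.typed[Customer]("/lake/clean/customers")
    }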
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/NullDataStorage.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/NullDataStorage.scala
new file mode 100644
index 0000000..1787de0
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/NullDataStorage.scala
@@ -0,0 +1,17 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+
+import be.dataminded.lighthouse.spark.SparkSessions
+import org.apache.spark.sql.{DataFrame, Encoder, SparkSession}
+
+class NullDataStorage private (sparkSession: SparkSession) extends DataStorage {
+  override protected[links] def doRead: DataFrame = sparkSession.emptyDataFrame
+  override protected[links] def doWrite(data: DataFrame): Unit = ()
+}
+
+object NullDataStorage extends SparkSessions {
+  def typed[T: Encoder]: TypedDataLink[T] = new NullDataStorage(spark) with TypedDataLink[T] {
+    override protected def encoder: Encoder[T] = implicitly
+  }
+
+  def untyped: UntypedDataLink = new NullDataStorage(spark) with UntypedDataLink
+}
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/Partitionable.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/Partitionable.scala
new file mode 100644
index 0000000..3192816
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/Partitionable.scala
@@ -0,0 +1,16 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+
+import java.time.LocalDate
+
+import be.dataminded.lighthouse.datalake.LazyConfig
+
+trait Partitionable[T, L <: TypeableDataLink[T]] {
+  this: PathBasedDataStorage with TypeableDataLink[T] =>
+
+  def snapshotOf(date: LazyConfig[LocalDate],
+                 pathSuffix: Option[String] = None): SnapshotDataStorage with L
+
+
+  def partitionOf(partitionPath: LazyConfig[String]): PartitionedDataStorage with L
+
+}
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/PartitionedDataStorage.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/PartitionedDataStorage.scala
new file mode 100644
index 0000000..3dd2d5d
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/PartitionedDataStorage.scala
@@ -0,0 +1,7 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+
+import be.dataminded.lighthouse.datalake._
+
+abstract class PartitionedDataStorage(val partitionPath: LazyConfig[String]) extends ProxyPathBasedDataStorage {
+  override val path: LazyConfig[String] = s"${sourceDataStorage.path().trim().stripSuffix("/")}/${partitionPath()}"
+}
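A dry-run sketch of how the mock and null storages compose (not part of the patch; DryRunDemo is a hypothetical wrapper):

    import be.dataminded.lighthouse.experimental.datalake.links.{MockDataStorage, NullDataStorage}
    import org.apache.spark.sql.DataFrame

    object DryRunDemo {
      // Read from an in-memory mock, discard the output: useful for wiring tests.
      def dryRun(input: DataFrame): Unit = {
        val source = MockDataStorage.untyped(input) // serves `input` on every read
        val sink   = NullDataStorage.untyped        // accepts and drops every write
        sink.write(source.read)
      }
    }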
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/PathBasedDataStorage.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/PathBasedDataStorage.scala
new file mode 100644
index 0000000..4e30170
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/PathBasedDataStorage.scala
@@ -0,0 +1,14 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+import be.dataminded.lighthouse.datalake.LazyConfig
+import org.apache.spark.sql.DataFrame
+
+abstract class PathBasedDataStorage extends DataStorage {
+  def path: LazyConfig[String]
+
+  override protected final def doRead: DataFrame = readFrom(path())
+  override protected final def doWrite(data: DataFrame): Unit = writeTo(path(), data)
+
+  protected[links] def readFrom(path: String): DataFrame
+  protected[links] def writeTo(path: String, data: DataFrame): Unit
+
+}
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/ProxyPathBasedDataStorage.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/ProxyPathBasedDataStorage.scala
new file mode 100644
index 0000000..4eb3e4f
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/ProxyPathBasedDataStorage.scala
@@ -0,0 +1,10 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+
+import org.apache.spark.sql.DataFrame
+
+trait ProxyPathBasedDataStorage extends PathBasedDataStorage {
+  def sourceDataStorage: PathBasedDataStorage
+
+  override protected[links] def readFrom(path: String): DataFrame = sourceDataStorage.readFrom(path)
+  override protected[links] def writeTo(path: String, data: DataFrame): Unit = sourceDataStorage.writeTo(path, data)
+}
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/SnapshotDataStorage.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/SnapshotDataStorage.scala
new file mode 100644
index 0000000..a2ab3ae
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/SnapshotDataStorage.scala
@@ -0,0 +1,25 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
+
+import be.dataminded.lighthouse.datalake._
+
+abstract class SnapshotDataStorage(val snapshotDate: LazyConfig[LocalDate], val pathSuffix: Option[String])
+    extends ProxyPathBasedDataStorage {
+
+  override val path: LazyConfig[String] =
+    s"${sourceDataStorage.path().trim().stripSuffix("/")}/${snapshotDate().format(DateTimeFormatter.ofPattern("yyyy/MM/dd"))}/${pathSuffix
+      .getOrElse("")}"
+}
+
+object SnapshotDataStorage {
+  def apply(dataLink: PathBasedDataStorage,
+            snapshotDate: LazyConfig[LocalDate],
+            pathSuffix: Option[String]): SnapshotDataStorage = {
+
+    new SnapshotDataStorage(snapshotDate, pathSuffix) {
+      override val sourceDataStorage: PathBasedDataStorage = dataLink
+    }
+  }
+}
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/TypeableDataLink.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/TypeableDataLink.scala
new file mode 100644
index 0000000..e530ee8
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/TypeableDataLink.scala
@@ -0,0 +1,9 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+import org.apache.spark.sql.Dataset
+
+trait TypeableDataLink[T] extends Serializable {
+  this: DataStorage =>
+
+  def read: Dataset[T]
+  def write(data: Dataset[T]): Unit
+}
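Path derivation sketch (not part of the patch; the base path is hypothetical, and the by-name lift to LazyConfig is assumed to come from be.dataminded.lighthouse.datalake._, the same conversion SnapshotDataStorage itself uses):

    import java.time.LocalDate
    import be.dataminded.lighthouse.datalake._
    import be.dataminded.lighthouse.experimental.datalake.links._

    object SnapshotPathDemo {
      // For a storage rooted at "/lake/raw/events", the snapshot's path() resolves to
      //   /lake/raw/events/2019/04/10/clicks
      // and with pathSuffix = None to /lake/raw/events/2019/04/10/
      // (note the trailing slash left behind by getOrElse("")).
      def snapshotFor(storage: PathBasedDataStorage with UntypedDataLink with UntypedPartitionable) =
        storage.snapshotOf(LocalDate.of(2019, 4, 10), Some("clicks"))
    }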
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/TypedDataLink.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/TypedDataLink.scala
new file mode 100644
index 0000000..eff9b7a
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/TypedDataLink.scala
@@ -0,0 +1,11 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+import org.apache.spark.sql.{Dataset, Encoder}
+
+trait TypedDataLink[T] extends TypeableDataLink[T] {
+  this: DataStorage =>
+
+  protected def encoder: Encoder[T]
+
+  override def read: Dataset[T] = doRead.as[T](encoder)
+  override def write(data: Dataset[T]): Unit = doWrite(data.toDF())
+}
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/TypedPartitionable.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/TypedPartitionable.scala
new file mode 100644
index 0000000..c729f00
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/TypedPartitionable.scala
@@ -0,0 +1,24 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+import java.time.LocalDate
+
+import be.dataminded.lighthouse.datalake.LazyConfig
+import org.apache.spark.sql.Encoder
+
+trait TypedPartitionable[T] extends Partitionable[T, TypedDataLink[T]] {
+  self: PathBasedDataStorage with TypedDataLink[T] =>
+
+  override def snapshotOf(date: LazyConfig[LocalDate],
+                          pathSuffix: Option[String]): SnapshotDataStorage with TypedDataLink[T] =
+    new SnapshotDataStorage(date, pathSuffix) with TypedDataLink[T] with TypedPartitionable[T] {
+      override def sourceDataStorage: PathBasedDataStorage = self
+
+      override protected def encoder: Encoder[T] = self.encoder
+    }
+
+  override def partitionOf(partitionPath: LazyConfig[String]): PartitionedDataStorage with TypedDataLink[T] =
+    new PartitionedDataStorage(partitionPath) with TypedDataLink[T] with TypedPartitionable[T] {
+      override def sourceDataStorage: PathBasedDataStorage = self
+
+      override protected def encoder: Encoder[T] = self.encoder
+    }
+}
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/UntypedDataLink.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/UntypedDataLink.scala
new file mode 100644
index 0000000..c3b610b
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/UntypedDataLink.scala
@@ -0,0 +1,9 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+
+trait UntypedDataLink extends TypeableDataLink[Row] {
+  this: DataStorage =>
+
+  override def read: Dataset[Row] = doRead
+  override def write(data: DataFrame): Unit = doWrite(data)
+}
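A small sketch of the reading contract the two link flavours give you (not part of the patch; Customer and the wrapper object are hypothetical):

    import be.dataminded.lighthouse.experimental.datalake.links.{TypedDataLink, UntypedDataLink}

    object LinkContractDemo {
      final case class Customer(id: Long, name: String) // hypothetical record type

      // Typed links give compile-checked column access; untyped links fall
      // back to stringly-typed Row getters.
      def names(typed: TypedDataLink[Customer]): Array[String] = typed.read.collect().map(_.name)
      def names(untyped: UntypedDataLink): Array[String]       = untyped.read.collect().map(_.getAs[String]("name"))
    }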
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/UntypedPartitionable.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/UntypedPartitionable.scala
new file mode 100644
index 0000000..5b3d2f5
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/UntypedPartitionable.scala
@@ -0,0 +1,21 @@
+package be.dataminded.lighthouse.experimental.datalake.links
+
+import java.time.LocalDate
+
+import be.dataminded.lighthouse.datalake.LazyConfig
+import org.apache.spark.sql.Row
+
+trait UntypedPartitionable extends Partitionable[Row, UntypedDataLink] {
+  self: PathBasedDataStorage with UntypedDataLink =>
+
+  override def snapshotOf(date: LazyConfig[LocalDate],
+                          pathSuffix: Option[String]): SnapshotDataStorage with UntypedDataLink =
+    new SnapshotDataStorage(date, pathSuffix) with UntypedDataLink with UntypedPartitionable {
+      override def sourceDataStorage: PathBasedDataStorage = self
+    }
+
+  override def partitionOf(partitionPath: LazyConfig[String]): PartitionedDataStorage with UntypedDataLink =
+    new PartitionedDataStorage(partitionPath) with UntypedDataLink with UntypedPartitionable {
+      override def sourceDataStorage: PathBasedDataStorage = self
+    }
+}
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/package.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/package.scala
new file mode 100644
index 0000000..d811aaf
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/datalake/links/package.scala
@@ -0,0 +1,7 @@
+package be.dataminded.lighthouse.experimental.datalake
+
+package object links {
+  type PartitionableTypedDataLink[T] = TypedDataLink[T] with TypedPartitionable[T]
+
+  type PartitionableUntypedDataLink = UntypedDataLink with UntypedPartitionable
+}
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Count.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Count.scala
new file mode 100644
index 0000000..a0ec8d4
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Count.scala
@@ -0,0 +1,13 @@
+package be.dataminded.lighthouse.experimental.metrics
+import org.apache.spark.sql.Row
+
+class Count[N: Numeric] extends Metric[N] {
+  private val numeric: Numeric[N] = implicitly
+
+  override def ofRow(row: Row): N = numeric.one
+  override def combine(x: N, y: N): N = numeric.plus(x, y)
+}
+
+object Count {
+  implicit def count[N: Numeric]: Count[N] = new Count[N]
+}
\ No newline at end of file
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Max.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Max.scala
new file mode 100644
index 0000000..76a45c9
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Max.scala
@@ -0,0 +1,11 @@
+package be.dataminded.lighthouse.experimental.metrics
+
+import org.apache.spark.sql.Row
+
+class Max[O: Ordering](field: String) extends Metric[O] {
+  private val ordering: Ordering[O] = implicitly
+
+  override def ofRow(row: Row): O = row.getAs[O](field)
+
+  override def combine(x: O, y: O): O = ordering.max(x, y)
+}
\ No newline at end of file
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Metric.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Metric.scala
new file mode 100644
index 0000000..2c658fe
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Metric.scala
@@ -0,0 +1,8 @@
+package be.dataminded.lighthouse.experimental.metrics
+import org.apache.spark.sql.Row
+
+trait Metric[M] {
+  def ofRow(row: Row): M
+
+  def combine(x: M, y: M): M
+}
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/MetricsBundle.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/MetricsBundle.scala
new file mode 100644
index 0000000..e029c7f
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/MetricsBundle.scala
@@ -0,0 +1,8 @@
+package be.dataminded.lighthouse.experimental.metrics
+import org.apache.spark.sql.{Encoder, Encoders}
+
+import scala.reflect.ClassTag
+
+abstract class MetricsBundle[B: ClassTag] extends Metric[B] with Serializable {
+  implicit val encoder: Encoder[B] = Encoders.kryo
+}
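To make the Metric contract concrete, here is a minimal local fold over a handful of rows (not part of the patch; a plain Scala collection stands in for a distributed Dataset):

    import org.apache.spark.sql.Row
    import be.dataminded.lighthouse.experimental.metrics.Count

    object MetricFoldDemo {
      // Every row contributes numeric.one; combine adds up the partial counts.
      val count        = new Count[Long]
      val rows         = Seq(Row("a"), Row("b"), Row("c"))
      val total: Long  = rows.map(count.ofRow).reduce(count.combine) // 3L
    }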
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Min.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Min.scala
new file mode 100644
index 0000000..a31e7b8
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Min.scala
@@ -0,0 +1,11 @@
+package be.dataminded.lighthouse.experimental.metrics
+
+import org.apache.spark.sql.Row
+
+class Min[O: Ordering](field: String) extends Metric[O] {
+  private val ordering: Ordering[O] = implicitly
+
+  override def ofRow(row: Row): O = row.getAs[O](field)
+
+  override def combine(x: O, y: O): O = ordering.min(x, y)
+}
\ No newline at end of file
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/NullCount.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/NullCount.scala
new file mode 100644
index 0000000..511a492
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/NullCount.scala
@@ -0,0 +1,15 @@
+package be.dataminded.lighthouse.experimental.metrics
+
+import org.apache.spark.sql.Row
+
+class NullCount[N: Numeric](field: String) extends Metric[N] {
+  private val numeric: Numeric[N] = implicitly
+
+  override def ofRow(row: Row): N =
+    if (row.isNullAt(row.fieldIndex(field)))
+      numeric.one
+    else
+      numeric.zero
+
+  override def combine(x: N, y: N): N = numeric.plus(x, y)
+}
\ No newline at end of file
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Sum.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Sum.scala
new file mode 100644
index 0000000..b823d55
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/Sum.scala
@@ -0,0 +1,11 @@
+package be.dataminded.lighthouse.experimental.metrics
+
+import org.apache.spark.sql.Row
+
+class Sum[N: Numeric](field: String) extends Metric[N] {
+  private val numeric: Numeric[N] = implicitly
+
+  override def ofRow(row: Row): N = row.getAs[N](field)
+
+  override def combine(x: N, y: N): N = numeric.plus(x, y)
+}
\ No newline at end of file
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/syntax/package.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/syntax/package.scala
new file mode 100644
index 0000000..9ce0061
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/metrics/syntax/package.scala
@@ -0,0 +1,17 @@
+package be.dataminded.lighthouse.experimental.metrics
+import org.apache.spark.sql.Row
+
+package object syntax {
+  trait MetricTag[T, M <: Metric[T]]
+  type As[T, M <: Metric[T]] = T with MetricTag[T, M]
+
+  implicit class MetricsSyntax[T, M <: Metric[T]](x: T As M)(implicit metric: M) {
+    def ++(y: T As M): T As M =
+      metric.combine(x, y).asInstanceOf[T As M]
+  }
+
+  implicit class RowSyntax(row: Row) {
+    def calculateThisMetric[T, M <: Metric[T]](implicit metric: M): T As M =
+      metric.ofRow(row).asInstanceOf[T As M]
+  }
+}
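The tagging trick above keeps metric values of the same underlying type distinct at compile time; a sketch (not part of the patch; the implicit Count[Long] is resolved from the Count companion defined earlier):

    import be.dataminded.lighthouse.experimental.metrics.Count
    import be.dataminded.lighthouse.experimental.metrics.syntax._
    import org.apache.spark.sql.Row

    object TaggedMetricsDemo {
      // calculateThisMetric tags the computed value with its metric type.
      val a = Row("x").calculateThisMetric[Long, Count[Long]]
      val b = Row("y").calculateThisMetric[Long, Count[Long]]

      // ++ only combines values carrying the same tag, so a Count cannot be
      // accidentally added to, say, a Sum over the same Long type.
      val total: Long As Count[Long] = a ++ b // == 2L
    }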
diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/tasks/GenericCleansingTask.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/tasks/GenericCleansingTask.scala
new file mode 100644
index 0000000..52ebc06
--- /dev/null
+++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/experimental/tasks/GenericCleansingTask.scala
@@ -0,0 +1,22 @@
+package be.dataminded.lighthouse.experimental.tasks
+import be.dataminded.lighthouse.experimental.datalake.links.{TypedDataLink, UntypedDataLink}
+import be.dataminded.lighthouse.experimental.metrics.MetricsBundle
+import org.apache.spark.sql.{Dataset, Encoder, Row}
+
+abstract class GenericCleansingTask[O: Encoder, B: MetricsBundle] extends Serializable {
+  private val metricsBundle: MetricsBundle[B] = implicitly
+
+  private lazy val sourceData = source.read.cache()
+
+  def source: UntypedDataLink
+  def sink: TypedDataLink[O]
+
+  def transform(row: Row): O
+
+  def clean: Dataset[O] = sourceData.map(transform)
+
+  def metrics: B = {
+    import metricsBundle.encoder
+    sourceData.map(metricsBundle.ofRow).reduce(metricsBundle.combine _)
+  }
+}
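For context, a concrete task on top of this base class would look roughly like the following sketch (not part of the patch; the CleanEvent type, paths, and column names are hypothetical, and the String-to-LazyConfig lift is assumed from be.dataminded.lighthouse.datalake._):

    import be.dataminded.lighthouse.datalake._
    import be.dataminded.lighthouse.experimental.datalake.links.FileSystemDataStorage
    import be.dataminded.lighthouse.experimental.metrics.{Count, MetricsBundle}
    import be.dataminded.lighthouse.experimental.tasks.GenericCleansingTask
    import org.apache.spark.sql.{Row, SparkSession}

    final case class CleanEvent(id: Long, kind: String) // hypothetical output record

    object EventCleansing {
      val spark = SparkSession.builder().master("local[*]").getOrCreate()
      import spark.implicits._ // supplies Encoder[CleanEvent]

      // A one-metric bundle: just count source rows while cleansing.
      implicit val countBundle: MetricsBundle[Long] = new MetricsBundle[Long] {
        private val count = new Count[Long]
        override def ofRow(row: Row): Long = count.ofRow(row)
        override def combine(x: Long, y: Long): Long = count.combine(x, y)
      }

      val task = new GenericCleansingTask[CleanEvent, Long] {
        override def source = FileSystemDataStorage.untyped("/lake/raw/events")
        override def sink   = FileSystemDataStorage.typed[CleanEvent]("/lake/clean/events")
        override def transform(row: Row): CleanEvent =
          CleanEvent(row.getAs[Long]("id"), row.getAs[String]("kind"))
      }

      def main(args: Array[String]): Unit = {
        // The base class declares the sink but leaves the final write to the caller.
        task.sink.write(task.clean)
        println(s"rows seen: ${task.metrics}")
      }
    }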