Skip to content
This repository was archived by the owner on Sep 6, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package be.dataminded.lighthouse.experimental.datalake.environment

/** Base type for a named datalake environment (e.g. a test, acceptance or production setup). */
abstract class Environment(val name: String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package be.dataminded.lighthouse.experimental.datalake.environment
import be.dataminded.lighthouse.datalake.Datalake.{DefaultEnvironment, PropertyName}

/** Resolves the active [[Environment]] from the datalake's JVM system property. */
trait EnvironmentDispatcher[E <: Environment] {

  // Active environment name: the `PropertyName` system property, or the default when unset.
  private val environmentName: String =
    sys.props.get(PropertyName).getOrElse(DefaultEnvironment)

  /** All environments this dispatcher can dispatch to. */
  def enabledEnvironments: Set[E]

  /** The enabled environment whose name matches the configured one.
    * @throws IllegalArgumentException when no enabled environment carries that name
    */
  def currentEnvironment: E = {
    val matched = enabledEnvironments.find(env => env.name == environmentName)
    matched.getOrElse(throw new IllegalArgumentException(s"DataEnvironment not valid: $environmentName"))
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package be.dataminded.lighthouse.experimental.datalake.links
import org.apache.spark.sql.DataFrame

/** Lowest-level storage abstraction: something a DataFrame can be read from and written to. */
trait DataStorage {
  /** Reads the underlying storage into a single DataFrame. */
  protected def doRead: DataFrame

  /** Persists the given DataFrame to the underlying storage. */
  protected def doWrite(data: DataFrame): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package be.dataminded.lighthouse.experimental.datalake.links

import be.dataminded.lighthouse.datalake._
import be.dataminded.lighthouse.spark.{Orc, SparkFileFormat, SparkSessions}
import org.apache.spark.sql.{DataFrame, Encoder, Row, SaveMode}
import org.apache.spark.sql.types.StructType

/** A path-based storage backed by Spark's file data sources (ORC by default). */
abstract class FileSystemDataStorage(val path: LazyConfig[String],
                                     format: SparkFileFormat = Orc,
                                     saveMode: SaveMode = SaveMode.Overwrite,
                                     partitionedBy: List[String] = List.empty,
                                     options: Map[String, String] = Map.empty,
                                     schema: Option[StructType] = None)
    extends PathBasedDataStorage
    with SparkSessions {

  /** Reads the given path with the configured format/options, applying the explicit schema when one is set. */
  override protected[links] def readFrom(path: String): DataFrame = {
    val reader = spark.read.format(format.toString).options(options)
    schema.fold(reader)(reader.schema).load(path)
  }

  /** Writes `data` to the given path with the configured format, partitioning, options and save mode. */
  override protected[links] def writeTo(path: String, data: DataFrame): Unit = {
    val writer = data.write
      .format(format.toString)
      .partitionBy(partitionedBy: _*)
      .options(options)
      .mode(saveMode)
    writer.save(path)
  }
}

object FileSystemDataStorage {

  /** Builds an untyped (Row-based) file-system link that also supports snapshot/partition views. */
  def untyped(path: LazyConfig[String],
              format: SparkFileFormat = Orc,
              saveMode: SaveMode = SaveMode.Overwrite,
              partitionedBy: List[String] = List.empty,
              options: Map[String, String] = Map.empty,
              schema: Option[StructType] = None): FileSystemDataStorage with UntypedDataLink with UntypedPartitionable =
    new FileSystemDataStorage(path, format, saveMode, partitionedBy, options, schema) with UntypedDataLink
      with UntypedPartitionable

  /** Builds a typed file-system link; rows are decoded to `T` via the implicit Encoder. */
  def typed[T: Encoder](
      path: LazyConfig[String],
      format: SparkFileFormat = Orc,
      saveMode: SaveMode = SaveMode.Overwrite,
      partitionedBy: List[String] = List.empty,
      options: Map[String, String] = Map.empty,
      schema: Option[StructType] = None): FileSystemDataStorage with TypedDataLink[T] with TypedPartitionable[T] =
    new FileSystemDataStorage(path, format, saveMode, partitionedBy, options, schema) with TypedDataLink[T]
      with TypedPartitionable[T] {
      override protected def encoder: Encoder[T] = implicitly
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package be.dataminded.lighthouse.experimental.datalake.links
import be.dataminded.lighthouse.spark.SparkSessions
import org.apache.spark.sql.{DataFrame, Encoder}

/** Test double: always reads the frame supplied at construction and silently drops writes. */
class MockDataStorage(data: DataFrame) extends DataStorage {
  // The constructor frame is returned unchanged; prior writes have no effect.
  override protected[links] def doRead: DataFrame = data
  // Writes are intentionally discarded.
  override protected[links] def doWrite(data: DataFrame): Unit = ()
}

// NOTE(review): SparkSessions is mixed in but `spark` is not referenced here — presumably kept
// so test code gets a session transitively; confirm before removing.
object MockDataStorage extends SparkSessions {

  /** A typed mock link that always reads `data` and ignores writes. */
  def typed[T: Encoder](data: DataFrame): TypedDataLink[T] =
    new MockDataStorage(data) with TypedDataLink[T] {
      override protected def encoder: Encoder[T] = implicitly
    }

  /** An untyped mock link that always reads `data` and ignores writes. */
  def untyped(data: DataFrame): UntypedDataLink =
    new MockDataStorage(data) with UntypedDataLink
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package be.dataminded.lighthouse.experimental.datalake.links

import be.dataminded.lighthouse.spark.SparkSessions
import org.apache.spark.sql.{DataFrame, Encoder, SparkSession}

/** Null-object storage: reads an empty DataFrame and discards all writes. */
class NullDataStorage private(sparkSession: SparkSession) extends DataStorage {
  // Fix: `createDataFrame(Seq.empty)` infers the element type as `Nothing`, and Spark throws an
  // UnsupportedOperationException when it tries to derive a schema for it at read time.
  // `emptyDataFrame` is the supported way to obtain a zero-row, zero-column frame.
  override protected[links] def doRead: DataFrame = sparkSession.emptyDataFrame
  // Writes are intentionally discarded.
  override protected[links] def doWrite(data: DataFrame): Unit = ()
}

object NullDataStorage extends SparkSessions {

  /** A typed null link: reads an empty Dataset[T], ignores writes. */
  def typed[T: Encoder]: TypedDataLink[T] = new NullDataStorage(spark) with TypedDataLink[T] {
    override protected def encoder: Encoder[T] = implicitly
  }

  /** An untyped null link: reads an empty DataFrame, ignores writes.
    * Explicit return type added: public members should not rely on inference, and this
    * matches the `typed` variant above.
    */
  def untyped: UntypedDataLink = new NullDataStorage(spark) with UntypedDataLink
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package be.dataminded.lighthouse.experimental.datalake.links

import java.time.LocalDate

import be.dataminded.lighthouse.datalake.LazyConfig

/** Capability of a path-based link to expose date-snapshot and partition views of itself.
  * `L` is the concrete link flavour (typed or untyped) the derived views must retain.
  */
trait Partitionable[T, L <: TypeableDataLink[T]] {
  this: PathBasedDataStorage with TypeableDataLink[T] =>

  /** A view of this storage rooted at the given snapshot date (optionally with an extra path suffix). */
  def snapshotOf(date: LazyConfig[LocalDate],
                 pathSuffix: Option[String] = None): SnapshotDataStorage with L


  /** A view of this storage rooted at the given partition sub-path. */
  def partitionOf(partitionPath: LazyConfig[String]): PartitionedDataStorage with L

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package be.dataminded.lighthouse.experimental.datalake.links

import be.dataminded.lighthouse.datalake._

/** A view on `sourceDataStorage` rooted at `<source path>/<partitionPath>`. */
abstract class PartitionedDataStorage(val partitionPath: LazyConfig[String]) extends ProxyPathBasedDataStorage {
  // NOTE(review): the interpolated String is lifted to LazyConfig by an implicit conversion from
  // be.dataminded.lighthouse.datalake — presumably by-name, so the abstract `sourceDataStorage`
  // is only dereferenced when path() is evaluated, not during construction; confirm.
  override val path: LazyConfig[String] = s"${sourceDataStorage.path().trim().stripSuffix("/")}/${partitionPath()}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package be.dataminded.lighthouse.experimental.datalake.links
import be.dataminded.lighthouse.datalake.LazyConfig
import org.apache.spark.sql.DataFrame

/** A DataStorage addressed by a single, lazily evaluated path. */
abstract class PathBasedDataStorage extends DataStorage {
  /** Location of the data; re-evaluated on every read and write. */
  def path: LazyConfig[String]

  // Template methods: this class fixes *where* to read/write; subclasses supply *how*.
  override protected final def doRead: DataFrame = readFrom(path())
  override protected final def doWrite(data: DataFrame): Unit = writeTo(path(), data)

  /** Reads a DataFrame from the resolved path. */
  protected[links] def readFrom(path: String): DataFrame
  /** Writes `data` to the resolved path. */
  protected[links] def writeTo(path: String, data: DataFrame): Unit

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package be.dataminded.lighthouse.experimental.datalake.links

import org.apache.spark.sql.DataFrame

/** A path-based storage that keeps its own path but delegates the actual I/O to another storage. */
trait ProxyPathBasedDataStorage extends PathBasedDataStorage {
  /** The storage whose readFrom/writeTo implementations are reused. */
  def sourceDataStorage: PathBasedDataStorage

  override protected[links] def readFrom(path: String): DataFrame = sourceDataStorage.readFrom(path)
  override protected[links] def writeTo(path: String, data: DataFrame): Unit = sourceDataStorage.writeTo(path, data)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package be.dataminded.lighthouse.experimental.datalake.links

import java.time.LocalDate
import java.time.format.DateTimeFormatter

import be.dataminded.lighthouse.datalake._

/** A dated snapshot view over `sourceDataStorage`, located at `<base>/yyyy/MM/dd[/<suffix>]`. */
abstract class SnapshotDataStorage(val snapshotDate: LazyConfig[LocalDate], val pathSuffix: Option[String])
    extends ProxyPathBasedDataStorage {

  // One directory level per date component.
  private val snapshotPattern = DateTimeFormatter.ofPattern("yyyy/MM/dd")

  // NOTE(review): the String block is lifted to LazyConfig by an implicit conversion from
  // be.dataminded.lighthouse.datalake — presumably by-name, so the abstract `sourceDataStorage`
  // is only touched when path() is evaluated; confirm.
  // Fix: the original appended `pathSuffix.getOrElse("")`, which left a trailing "/" whenever no
  // suffix was supplied; the suffix segment is now only appended when present.
  override val path: LazyConfig[String] = {
    val base = s"${sourceDataStorage.path().trim().stripSuffix("/")}/${snapshotDate().format(snapshotPattern)}"
    pathSuffix.fold(base)(suffix => s"$base/$suffix")
  }
}

object SnapshotDataStorage {

  /** Builds a snapshot view whose I/O is delegated to `dataLink`.
    * The type parameter `T` is unused but retained for source compatibility with existing callers.
    */
  def apply[T](dataLink: PathBasedDataStorage,
               snapshotDate: LazyConfig[LocalDate],
               pathSuffix: Option[String]): SnapshotDataStorage =
    new SnapshotDataStorage(snapshotDate, pathSuffix) {
      override val sourceDataStorage: PathBasedDataStorage = dataLink
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package be.dataminded.lighthouse.experimental.datalake.links
import org.apache.spark.sql.Dataset

/** Common read/write interface for data links, whether typed (Dataset[T]) or untyped (T = Row). */
trait TypeableDataLink[T] extends Serializable {
  this: DataStorage =>

  /** Reads the storage as a Dataset of T. */
  def read: Dataset[T]
  /** Writes the given Dataset to the storage. */
  def write(data: Dataset[T]): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package be.dataminded.lighthouse.experimental.datalake.links
import org.apache.spark.sql.{Dataset, Encoder}

/** A data link whose rows are decoded to `T` using the supplied encoder. */
trait TypedDataLink[T] extends TypeableDataLink[T] {
  this: DataStorage =>

  /** Encoder used to convert between the raw DataFrame and Dataset[T]. */
  protected def encoder: Encoder[T]

  override def read: Dataset[T] = doRead.as[T](encoder)
  override def write(data: Dataset[T]): Unit = doWrite(data.toDF())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package be.dataminded.lighthouse.experimental.datalake.links
import java.time.LocalDate

import be.dataminded.lighthouse.datalake.LazyConfig
import org.apache.spark.sql.Encoder

/** Adds snapshot and partition views to a typed, path-based data link. */
trait TypedPartitionable[T] extends Partitionable[T, TypedDataLink[T]] {
  self: PathBasedDataStorage with TypedDataLink[T] =>

  /** A dated snapshot view that shares this link's encoder and delegates I/O back to it. */
  override def snapshotOf(date: LazyConfig[LocalDate],
                          pathSuffix: Option[String]): SnapshotDataStorage with TypedDataLink[T] =
    new SnapshotDataStorage(date, pathSuffix) with TypedDataLink[T] with TypedPartitionable[T] {
      // The view has its own path but reuses the enclosing storage's read/write machinery.
      override def sourceDataStorage: PathBasedDataStorage = self

      override protected def encoder: Encoder[T] = self.encoder
    }

  /** A view over a single partition sub-path, sharing this link's encoder. */
  override def partitionOf(partitionPath: LazyConfig[String]): PartitionedDataStorage with TypedDataLink[T] =
    new PartitionedDataStorage(partitionPath) with TypedDataLink[T] with TypedPartitionable[T] {
      override def sourceDataStorage: PathBasedDataStorage = self

      override protected def encoder: Encoder[T] = self.encoder
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package be.dataminded.lighthouse.experimental.datalake.links
import org.apache.spark.sql.{DataFrame, Dataset, Row}

/** An untyped data link: reads and writes raw DataFrames (Dataset[Row]). */
trait UntypedDataLink extends TypeableDataLink[Row] {
  this: DataStorage =>

  override def read: Dataset[Row] = doRead
  override def write(data: DataFrame): Unit = doWrite(data)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package be.dataminded.lighthouse.experimental.datalake.links

import java.time.LocalDate

import be.dataminded.lighthouse.datalake.LazyConfig
import org.apache.spark.sql.Row

/** Adds snapshot and partition views to an untyped (Row-based), path-based data link. */
trait UntypedPartitionable extends Partitionable[Row, UntypedDataLink] {
  self: PathBasedDataStorage with UntypedDataLink =>

  /** A dated snapshot view that delegates I/O back to this storage. */
  override def snapshotOf(date: LazyConfig[LocalDate],
                          pathSuffix: Option[String]): SnapshotDataStorage with UntypedDataLink =
    new SnapshotDataStorage(date, pathSuffix) with UntypedDataLink with UntypedPartitionable {
      // The view has its own path but reuses the enclosing storage's read/write machinery.
      override def sourceDataStorage: PathBasedDataStorage = self
    }

  /** A view over a single partition sub-path of this storage. */
  override def partitionOf(partitionPath: LazyConfig[String]): PartitionedDataStorage with UntypedDataLink =
    new PartitionedDataStorage(partitionPath) with UntypedDataLink with UntypedPartitionable {
      override def sourceDataStorage: PathBasedDataStorage = self
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package be.dataminded.lighthouse.experimental.datalake

package object links {
  /** A typed data link that can also produce snapshot/partition views of itself. */
  type PartitionableTypedDataLink[T] = TypedDataLink[T] with TypedPartitionable[T]

  /** An untyped (Row-based) data link that can also produce snapshot/partition views of itself. */
  type PartitionableUntypedDataLink = UntypedDataLink with UntypedPartitionable
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package be.dataminded.lighthouse.experimental.metrics
import org.apache.spark.sql.Row

/** Metric that counts rows, accumulating into any numeric type N. */
class Count[N: Numeric] extends Metric[N] {

  /** Every row contributes exactly one unit, whatever its content. */
  override def ofRow(row: Row): N = implicitly[Numeric[N]].one

  /** Partial counts are merged by addition. */
  override def combine(x: N, y: N): N = implicitly[Numeric[N]].plus(x, y)
}

object Count {
  /** Implicitly available count metric for any numeric accumulator type N. */
  implicit def count[N: Numeric]: Count[N] = new Count[N]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package be.dataminded.lighthouse.experimental.metrics

import org.apache.spark.sql.Row

/** Metric that tracks the maximum value of the column `field` across rows. */
class Max[O: Ordering](field: String) extends Metric[O] {

  /** A row contributes the value of its `field` column. */
  override def ofRow(row: Row): O = row.getAs[O](field)

  /** Keeps the larger of two partial results. */
  override def combine(x: O, y: O): O = implicitly[Ordering[O]].max(x, y)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package be.dataminded.lighthouse.experimental.metrics
import org.apache.spark.sql.Row

/** A row-wise aggregation: extract one value per Row, then merge values pairwise with combine. */
trait Metric[M] {
  /** Contribution of a single row to the metric. */
  def ofRow(row: Row): M

  /** Merges two partial results. Presumably expected to be associative so partials
    * can be folded in any grouping — confirm with callers. */
  def combine(x: M, y: M): M
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package be.dataminded.lighthouse.experimental.metrics
import org.apache.spark.sql.{Encoder, Encoders}

import scala.reflect.ClassTag

/** A bundle of metrics computed together; `B` carries all partial results as one value.
  * `ofRow` and `combine` are inherited abstract from Metric[B].
  */
abstract class MetricsBundle[B: ClassTag] extends Metric[B] with Serializable {
  // Kryo encoder: an arbitrary bundle type B has no derivable Spark schema, so the
  // value is serialized opaquely.
  implicit val encoder: Encoder[B] = Encoders.kryo
}

/*
TODO(review): sketch of an implicit MetricsBundle factory; never implemented — remove or finish.
object MetricsBundle {
  implicit def bundle[B]: MetricsBundle[B]
}*/
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package be.dataminded.lighthouse.experimental.metrics

import org.apache.spark.sql.Row

/** Metric that tracks the minimum value of the column `field` across rows. */
class Min[O: Ordering](field: String) extends Metric[O] {
  private val ordering: Ordering[O] = implicitly

  // Explicit type argument for consistency with the sibling Max metric, instead of
  // relying on the return type to drive inference of getAs.
  override def ofRow(row: Row): O = row.getAs[O](field)

  /** Keeps the smaller of two partial results. */
  override def combine(x: O, y: O): O = ordering.min(x, y)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package be.dataminded.lighthouse.experimental.metrics

import org.apache.spark.sql.Row

/** Metric that counts how many rows have a null in the column `field`. */
class NullCount[N: Numeric](field: String) extends Metric[N] {
  private val numeric: Numeric[N] = implicitly

  /** One unit when `field` is null in this row, zero otherwise. */
  override def ofRow(row: Row): N = {
    val fieldIsNull = row.isNullAt(row.fieldIndex(field))
    if (fieldIsNull) numeric.one else numeric.zero
  }

  /** Partial null-counts are merged by addition. */
  override def combine(x: N, y: N): N = numeric.plus(x, y)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package be.dataminded.lighthouse.experimental.metrics

import org.apache.spark.sql.Row

/** Metric that sums the numeric column `field` across rows. */
class Sum[N: Numeric](field: String) extends Metric[N] {

  /** A row contributes the value of its `field` column. */
  override def ofRow(row: Row): N = row.getAs[N](field)

  /** Partial sums are merged by addition. */
  override def combine(x: N, y: N): N = implicitly[Numeric[N]].plus(x, y)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package be.dataminded.lighthouse.experimental.metrics
import org.apache.spark.sql.Row

package object syntax {
  /** Phantom tag marking a value of type T as produced by metric M. */
  trait MetricTag[T, M <: Metric[T]]

  /** A value of T tagged with the metric that produced it. */
  type As[T, M <: Metric[T]] = T with MetricTag[T, M]

  implicit class MetricsSyntax[T, M <: Metric[T]](x: T As M)(implicit metric: M) {
    /** Combines two tagged values using the metric's `combine`.
      * Fix: use the already-bound `metric` parameter instead of redundantly
      * re-resolving it with `implicitly[M]`.
      */
    def ++(y: T As M): T As M =
      metric.combine(x, y).asInstanceOf[T As M]
  }

  implicit class RowSyntax(row: Row) {
    /** Evaluates the implicit metric on this row and tags the result. */
    def calculateThisMetric[T, M <: Metric[T]](implicit metric: M): T As M =
      metric.ofRow(row).asInstanceOf[T As M]
  }
}
Loading