Skip to content
This repository was archived by the owner on Sep 6, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=2.3.2
version=2.5.3
align = true
maxColumn = 120
18 changes: 12 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import sbt.Opts.resolver.sonatypeStaging
lazy val buildSettings = Seq(
organization := "be.dataminded",
scalaVersion := scala211,
crossScalaVersions := Nil,
crossScalaVersions := supportedScalaVersions,
// Ensure code quality
scalafmtOnCompile := true,
// Memory settings to be able to test with Spark
Expand All @@ -15,14 +15,21 @@ lazy val buildSettings = Seq(
javaOptions ++= Seq(
"-Xms768M",
"-Xmx2048M",
"-XX:+CMSClassUnloadingEnabled",
"-XX:+UseStringDeduplication",
"-XX:+UseG1GC",
"-Dspark.sql.shuffle.partitions=2",
"-Dspark.shuffle.sort.bypassMergeThreshold=2",
"-Dlighthouse.environment=test"
),
scalacOptions ++= Seq(
"-deprecation",
"-optimize",
"-deprecation", // Emit warning and location for usages of deprecated APIs.
"-encoding", "utf-8", // Specify character encoding used by source files.
"-explaintypes", // Explain type errors in more detail.
"-feature", // Emit warning and location for usages of features that should be imported explicitly.
"-language:existentials", // Existential types (besides wildcard types) can be written and inferred
"-language:experimental.macros", // Allow macro definition (besides implementation and application)
"-language:higherKinds", // Allow higher-kinded types
"-language:implicitConversions", // Allow definition of implicit functions called views
"-unchecked",
"-Ydelambdafy:inline",
"-Ypartial-unification",
Expand Down Expand Up @@ -61,8 +68,7 @@ lazy val lighthouse = (project in file("lighthouse-core"))
lazy val `lighthouse-testing` = (project in file("lighthouse-testing"))
.settings(
buildSettings,
libraryDependencies ++= Seq(sparkSql, sparkHive, scalaTest, betterFiles),
crossScalaVersions := supportedScalaVersions
libraryDependencies ++= Seq(sparkSql, sparkHive, scalaTest, betterFiles)
)

lazy val `lighthouse-demo` = (project in file("lighthouse-demo"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class LighthouseConfigurationParser extends OptionParser[LighthouseConfiguration

opt[String]('e', "environment")
.action((environment, config) => config.copy(environment = environment))
.withFallback(fallbackEnvironment)
.withFallback(() => fallbackEnvironment())
.validate(item => if (item.nonEmpty) success else failure("The configured environment for Lighthouse is empty"))
.required()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ trait Datalake {
*/
def getDataLink(dataUID: DataUID): DataLink = enabledEnvironment(dataUID)

protected def environment(name: String)(f: EnvironmentBuilder => EnvironmentBuilder): Unit = name match {
case envName if envName == environmentName => enabledEnvironment = f(newEmptyEnvironment()).result()
case _ =>
}
/**
 * Declares the data links of one named environment.
 *
 * Only the declaration whose `name` equals `environmentName` has an effect: `f` is applied to a fresh, empty
 * builder and the built result is stored in `enabledEnvironment`. Declarations for any other name are ignored.
 *
 * @param name The name of the environment being declared
 * @param f    Function that fills the builder with the environment's data links
 */
protected def environment(name: String)(f: EnvironmentBuilder => EnvironmentBuilder): Unit =
name match {
case envName if envName == environmentName => enabledEnvironment = f(newEmptyEnvironment()).result()
// Not the active environment: deliberately do nothing.
case _ =>
}

private def newEmptyEnvironment() = new mutable.MapBuilder[DataUID, DataLink, Environment](Map.empty)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ class AwsSsmParamStore() extends ParameterStore {
* @param ssmPath The AWS SSM path where to retrieve the parameter value from
* @return An anonymous function allowing to retrieve the configuration path
*/
def lookup(ssmPath: String): LazyConfig[String] = LazyConfig {
val getParameterRequest = new GetParameterRequest().withName(ssmPath).withWithDecryption(true)
client.getParameter(getParameterRequest).getParameter.getValue
}
def lookup(ssmPath: String): LazyConfig[String] =
LazyConfig {
// Build a request for the parameter named `ssmPath`, with decryption switched on.
val getParameterRequest = new GetParameterRequest().withName(ssmPath).withWithDecryption(true)
// Call the SSM client and unwrap the response down to the parameter's value.
// NOTE(review): presumably this body only runs when the LazyConfig is evaluated — confirm against LazyConfig.apply.
client.getParameter(getParameterRequest).getParameter.getValue
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class FileParamStore(path: String, overrides: Map[String, String] = Map.empty) e
* Returns the lookup function to find a particular setting
* @param key The key to retrieve
*/
def lookup(key: String): LazyConfig[String] = LazyConfig {
config.getString(key)
}
def lookup(key: String): LazyConfig[String] =
LazyConfig {
// Read the string stored under `key` from `config`.
config.getString(key)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,29 @@ object RichSparkFunctions extends LazyLogging {
/*
* Print schema originally returns Unit, wrapping it in the SparkFunction allows you to chain the method
*/
def printSchema(): SparkFunction[A] = function.map { dataSet =>
dataSet.printSchema()
dataSet
}
def printSchema(): SparkFunction[A] =
function.map { dataSet =>
// Side effect only: print the schema of the value produced by `function`...
dataSet.printSchema()
// ...then hand the same value back so further calls can be chained.
dataSet
}

/**
 * Maps the value produced by `function` through `as[T]`.
 *
 * @tparam T The target element type; an implicit `Encoder[T]` must be available
 */
def as[T: Encoder]: SparkFunction[Dataset[T]] = function.map(_.as[T])

def cache(storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): SparkFunction[A] = function.map {
_.persist(storageLevel)
}
/**
 * Calls `persist` on the value produced by `function`.
 *
 * @param storageLevel The storage level passed to `persist`; defaults to `StorageLevel.MEMORY_ONLY`
 */
def cache(storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): SparkFunction[A] =
function.map {
_.persist(storageLevel)
}

def dropCache(): SparkFunction[A] = function.map {
_.unpersist()
}
/**
 * Calls `unpersist()` on the value produced by `function`; the counterpart of `cache`.
 */
def dropCache(): SparkFunction[A] =
function.map {
_.unpersist()
}

def write(sink: Sink, sinks: Sink*): SparkFunction[A] = {
if (sinks.isEmpty) function.map { data =>
sink.write(data); data
} else (sink +: sinks).foldLeft(function.cache())((f, sink) => f.write(sink))
}
else (sink +: sinks).foldLeft(function.cache())((f, sink) => f.write(sink))
}

def count(): SparkFunction[Long] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ object Sources {

/**
 * Wraps `dataLink.read()` in a [[SparkFunction]] via `SparkFunction.of`.
 *
 * @param dataLink The link to read from
 */
def fromDataLink(dataLink: DataLink): SparkFunction[DataFrame] = SparkFunction.of(dataLink.read())

def fromOrc(path: String): SparkFunction[DataFrame] = SparkFunction { spark =>
spark.read.orc(path)
}
/**
 * Creates a [[SparkFunction]] that reads `path` with the session's ORC reader.
 *
 * @param path The location handed to `spark.read.orc`
 */
def fromOrc(path: String): SparkFunction[DataFrame] =
SparkFunction { spark =>
spark.read.orc(path)
}

def fromText(path: String) = SparkFunction { spark =>
import spark.implicits._
spark.read.text(path).as[String]
}
/**
 * Creates a [[SparkFunction]] that reads `path` with the session's text reader and converts the result with
 * `as[String]`.
 *
 * NOTE(review): unlike `fromOrc` and `fromCsv`, this public method has no declared return type; consider
 * annotating it explicitly.
 *
 * @param path The location handed to `spark.read.text`
 */
def fromText(path: String) =
SparkFunction { spark =>
// Brings the implicit encoders into scope that `.as[String]` needs.
import spark.implicits._
spark.read.text(path).as[String]
}

def fromCsv(path: String): SparkFunction[DataFrame] = SparkFunction { spark =>
spark.read.option("inferSchema", "true").option("header", "true").csv(path)
}
/**
 * Creates a [[SparkFunction]] that reads `path` with the session's CSV reader, with the `inferSchema` and
 * `header` options both set to `"true"`.
 *
 * @param path The location handed to the CSV reader
 */
def fromCsv(path: String): SparkFunction[DataFrame] =
SparkFunction { spark =>
spark.read.option("inferSchema", "true").option("header", "true").csv(path)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ object SparkFunction {
override def run(spark: SparkSession): A = function(spark)
}

def of[A](block: => A): SparkFunction[A] = new SparkFunction[A] {
override def run(spark: SparkSession): A = block
}
/**
 * Lifts a block that does not need the `SparkSession` into a [[SparkFunction]].
 *
 * `block` is a by-name parameter, so it is not evaluated here; it is evaluated every time `run` is invoked.
 *
 * @param block The computation to wrap
 */
def of[A](block: => A): SparkFunction[A] =
new SparkFunction[A] {
// The session argument is intentionally unused.
override def run(spark: SparkSession): A = block
}

implicit val monad: Monad[SparkFunction] = new Monad[SparkFunction] {
override def pure[A](x: A): SparkFunction[A] = SparkFunction(_ => x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,11 @@ class SparkFunctionTest
val orders = Sources.fromCsv(ordersPath)

// I wrap this in a SparkFunction again because I need the SparkSession to import the implicits
def countOrdersByCustomer(orders: DataFrame): SparkFunction[DataFrame] = SparkFunction { spark =>
import spark.implicits._
orders.groupBy('customerId).agg(count('id).as("count"))
}
def countOrdersByCustomer(orders: DataFrame): SparkFunction[DataFrame] =
SparkFunction { spark =>
// Needed for the symbol-to-column syntax ('customerId, 'id) used below.
import spark.implicits._
// One row per customerId, with the count of `id` exposed as column "count".
orders.groupBy('customerId).agg(count('id).as("count"))
}

// No need for SparkSession, just a normal function
def joinCustomersWithOrders(customers: DataFrame, ordersByCustomer: DataFrame): DataFrame = {
Expand Down Expand Up @@ -225,15 +226,15 @@ class SparkFunctionTest

// Plain function (no SparkSession needed): returns `persons.distinct()`.
def dedup(persons: Dataset[RawPerson]): Dataset[RawPerson] = persons.distinct()

def normalize(persons: Dataset[RawPerson]): SparkFunction[Dataset[BasePerson]] = SparkFunction {
spark: SparkSession =>
def normalize(persons: Dataset[RawPerson]): SparkFunction[Dataset[BasePerson]] =
SparkFunction { spark: SparkSession =>
import spark.implicits._

persons.map { raw =>
val tokens = raw.name.split(" ")
BasePerson(tokens(0), tokens(1), raw.age)
}
}
}

// Identity step: returns its argument unchanged.
def returnBase(basePersons: Dataset[BasePerson]): Dataset[BasePerson] = basePersons
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,34 +35,38 @@ trait AirplanePipeline {
.mapN(buildView)
.write(AirplaneDatalake("master" -> "view"))

private def cleanDailyWeather(dailyWeather: DataFrame) = SparkFunction { spark =>
import spark.implicits._

val toCelsius = udf((temp: Int) => ((temp - 32) / 1.8).toInt)

dailyWeather
.select('WBAN, 'YearMonthDay, 'Tmin, 'Tmax, 'Tavg, 'PrecipTotal, 'AvgSpeed)
.withColumn("Tmin", toCelsius('Tmin))
.withColumn("Tmax", toCelsius('Tmax))
.withColumn("Tavg", toCelsius('Tavg))
.withColumnRenamed("AvgSpeed", "WAvgSpeed")
}

protected def cleanWeatherStations(weatherStations: DataFrame) = SparkFunction { spark =>
import spark.implicits._

weatherStations.select('WBAN, 'CallSign).withColumnRenamed("CallSign", "IAT").distinct()
}

private def cleanAirlines(airlines: DataFrame) = SparkFunction { spark =>
import spark.implicits._

val timestamp = udf((year: String, month: String, day: String) => f"$year${month.toInt}%02d${day.toInt}%02d".toInt)

airlines
.select('Origin, 'Dest, 'Year, 'Month, 'DayofMonth, 'DayOfWeek, 'ArrDelay)
.withColumn("YearMonthDay", timestamp('Year, 'Month, 'DayofMonth))
}
/**
 * Builds a [[SparkFunction]] that trims the daily weather data to the columns used downstream, converts the
 * three temperature columns with `toCelsius`, and renames `AvgSpeed` to `WAvgSpeed`.
 *
 * @param dailyWeather The raw daily weather data
 */
private def cleanDailyWeather(dailyWeather: DataFrame) =
SparkFunction { spark =>
// Needed for the symbol-to-column syntax ('WBAN, 'Tmin, ...) used below.
import spark.implicits._

// (temp - 32) / 1.8 is the Fahrenheit-to-Celsius formula; `.toInt` drops the fractional part.
// NOTE(review): assumes the temperature columns hold integer Fahrenheit values — confirm against the data.
val toCelsius = udf((temp: Int) => ((temp - 32) / 1.8).toInt)

dailyWeather
.select('WBAN, 'YearMonthDay, 'Tmin, 'Tmax, 'Tavg, 'PrecipTotal, 'AvgSpeed)
.withColumn("Tmin", toCelsius('Tmin))
.withColumn("Tmax", toCelsius('Tmax))
.withColumn("Tavg", toCelsius('Tavg))
.withColumnRenamed("AvgSpeed", "WAvgSpeed")
}

/**
 * Builds a [[SparkFunction]] that keeps only `WBAN` and `CallSign` from the weather stations, renames
 * `CallSign` to `IAT`, and removes duplicate rows.
 *
 * @param weatherStations The raw weather station data
 */
protected def cleanWeatherStations(weatherStations: DataFrame) =
SparkFunction { spark =>
// Needed for the symbol-to-column syntax used below.
import spark.implicits._

weatherStations.select('WBAN, 'CallSign).withColumnRenamed("CallSign", "IAT").distinct()
}

/**
 * Builds a [[SparkFunction]] that trims the airline data to the columns used downstream and adds a
 * `YearMonthDay` column computed from `Year`, `Month` and `DayofMonth`.
 *
 * @param airlines The raw airline data
 */
private def cleanAirlines(airlines: DataFrame) =
SparkFunction { spark =>
// Needed for the symbol-to-column syntax used below.
import spark.implicits._

// Concatenates the year with the month and day, each zero-padded to two digits, and parses the result
// as an Int (e.g. "2008", "1", "3" gives 20080103).
// NOTE(review): the `.toInt` calls throw on non-numeric input — confirm the columns are always numeric strings.
val timestamp =
udf((year: String, month: String, day: String) => f"$year${month.toInt}%02d${day.toInt}%02d".toInt)

airlines
.select('Origin, 'Dest, 'Year, 'Month, 'DayofMonth, 'DayOfWeek, 'ArrDelay)
.withColumn("YearMonthDay", timestamp('Year, 'Month, 'DayofMonth))
}

private def dailyWeatherWithStation(dailyWeather: DataFrame, weatherStations: DataFrame) = {
dailyWeather.join(weatherStations, "WBAN").drop("WBAN")
Expand Down
6 changes: 3 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import sbt._
object Dependencies {

private val amazonSdkVersion = "1.11.659"
private val sparkVersion = "2.4.4"
private val sparkVersion = "2.4.5"

val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion % Provided
val sparkSql = "org.apache.spark" %% "spark-sql" % sparkVersion % Provided
Expand All @@ -18,15 +18,15 @@ object Dependencies {
val betterFiles = "com.github.pathikrit" %% "better-files" % "3.8.0"

// Database connectivity
val scalikejdbc = "org.scalikejdbc" %% "scalikejdbc" % "3.4.0"
val scalikejdbc = "org.scalikejdbc" %% "scalikejdbc" % "3.4.2"
val h2 = "com.h2database" % "h2" % "1.4.200" % Test

// Amazon AWS
val awsSdkS3 = "com.amazonaws" % "aws-java-sdk-s3" % amazonSdkVersion % Provided
val awsSdkSsm = "com.amazonaws" % "aws-java-sdk-ssm" % amazonSdkVersion % Provided
val amazonSdk = Seq(awsSdkS3, awsSdkSsm)

val scalaTest = "org.scalatest" %% "scalatest" % "3.1.0"
val scalaTest = "org.scalatest" %% "scalatest" % "3.1.2"
val testDependencies = Seq(scalaTest % Test, h2)

val commonDependencies: Seq[ModuleID] =
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.3.7
sbt.version=1.3.10
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")
addSbtPlugin("com.frugalmechanic" % "fm-sbt-s3-resolver" % "0.19.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.3.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-site" % "1.4.0")

// Publish to Maven Central
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.8.1")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.1")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")