From ca42be1810a8eb9694d3ea987638d462e21ba2f6 Mon Sep 17 00:00:00 2001 From: Dmytro Hambal Date: Wed, 8 Apr 2020 18:43:49 +0300 Subject: [PATCH 1/2] Scala 2.12 compatibility. Apache Spark has embraced GA support for 2.12 since version 2.4.1, which was released nearly a year ago. The Amazon EMR v6.0.0, characterised by the adoption of Hadoop 3.2.1 (sic!) and Scala 2.12 (although the default JDK is still Corretto 8), was released a month ago, hence I presume it would be nice to have cross-builds for the Lighthouse. This PR was cherry-picked from #27, where you can observe the matrix builds, as currently the Circle CI config supports only 2.11. Changelog: - enable cross-version builds for the projects; - fix compile-time warning. --- build.sbt | 9 ++++----- .../config/LighthouseConfigurationParser.scala | 2 +- project/build.properties | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/build.sbt b/build.sbt index 46f3f41..9d64402 100644 --- a/build.sbt +++ b/build.sbt @@ -6,7 +6,7 @@ import sbt.Opts.resolver.sonatypeStaging lazy val buildSettings = Seq( organization := "be.dataminded", scalaVersion := scala211, - crossScalaVersions := Nil, + crossScalaVersions := supportedScalaVersions, // Ensure code quality scalafmtOnCompile := true, // Memory settings to be able to test with Spark @@ -15,14 +15,14 @@ lazy val buildSettings = Seq( javaOptions ++= Seq( "-Xms768M", "-Xmx2048M", - "-XX:+CMSClassUnloadingEnabled", + "-XX:+UseStringDeduplication", + "-XX:+UseG1GC", "-Dspark.sql.shuffle.partitions=2", "-Dspark.shuffle.sort.bypassMergeThreshold=2", "-Dlighthouse.environment=test" ), scalacOptions ++= Seq( "-deprecation", - "-optimize", "-unchecked", "-Ydelambdafy:inline", "-Ypartial-unification", @@ -61,8 +61,7 @@ lazy val lighthouse = (project in file("lighthouse-core")) lazy val `lighthouse-testing` = (project in file("lighthouse-testing")) .settings( buildSettings, - libraryDependencies ++= Seq(sparkSql, sparkHive, scalaTest, betterFiles), - 
crossScalaVersions := supportedScalaVersions + libraryDependencies ++= Seq(sparkSql, sparkHive, scalaTest, betterFiles) ) lazy val `lighthouse-demo` = (project in file("lighthouse-demo")) diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/config/LighthouseConfigurationParser.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/config/LighthouseConfigurationParser.scala index 1341438..e49b8e1 100644 --- a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/config/LighthouseConfigurationParser.scala +++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/config/LighthouseConfigurationParser.scala @@ -29,7 +29,7 @@ class LighthouseConfigurationParser extends OptionParser[LighthouseConfiguration opt[String]('e', "environment") .action((environment, config) => config.copy(environment = environment)) - .withFallback(fallbackEnvironment) + .withFallback(() => fallbackEnvironment()) .validate(item => if (item.nonEmpty) success else failure("The configured environment for Lighthouse is empty")) .required() diff --git a/project/build.properties b/project/build.properties index 2930ee9..a919a9b 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.7 \ No newline at end of file +sbt.version=1.3.8 From 7537a10a752ef5c77161814111128124c06ec045 Mon Sep 17 00:00:00 2001 From: Dmytro Hambal Date: Fri, 5 Jun 2020 12:49:00 +0300 Subject: [PATCH 2/2] Bump dependencies, formatting. 
--- .scalafmt.conf | 2 +- build.sbt | 9 ++- .../lighthouse/datalake/Datalake.scala | 9 +-- .../paramstore/AwsSsmParamStore.scala | 9 +-- .../paramstore/FileParamStore.scala | 7 ++- .../pipeline/RichSparkFunctions.scala | 26 ++++---- .../lighthouse/pipeline/Sources.scala | 23 +++---- .../lighthouse/pipeline/SparkFunction.scala | 7 ++- .../pipeline/SparkFunctionTest.scala | 15 ++--- .../lighthouse/demo/AirplanePipeline.scala | 60 ++++++++++--------- project/Dependencies.scala | 6 +- project/build.properties | 2 +- project/plugins.sbt | 4 +- 13 files changed, 101 insertions(+), 78 deletions(-) diff --git a/.scalafmt.conf b/.scalafmt.conf index 3732cd5..dec5cfc 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,3 +1,3 @@ -version=2.3.2 +version=2.5.3 align = true maxColumn = 120 \ No newline at end of file diff --git a/build.sbt b/build.sbt index 9d64402..b3e1083 100644 --- a/build.sbt +++ b/build.sbt @@ -22,7 +22,14 @@ lazy val buildSettings = Seq( "-Dlighthouse.environment=test" ), scalacOptions ++= Seq( - "-deprecation", + "-deprecation", // Emit warning and location for usages of deprecated APIs. + "-encoding", "utf-8", // Specify character encoding used by source files. + "-explaintypes", // Explain type errors in more detail. + "-feature", // Emit warning and location for usages of features that should be imported explicitly. 
+ "-language:existentials", // Existential types (besides wildcard types) can be written and inferred + "-language:experimental.macros", // Allow macro definition (besides implementation and application) + "-language:higherKinds", // Allow higher-kinded types + "-language:implicitConversions", // Allow definition of implicit functions called views "-unchecked", "-Ydelambdafy:inline", "-Ypartial-unification", diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/datalake/Datalake.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/datalake/Datalake.scala index ce3b162..bba5d5f 100644 --- a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/datalake/Datalake.scala +++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/datalake/Datalake.scala @@ -39,10 +39,11 @@ trait Datalake { */ def getDataLink(dataUID: DataUID): DataLink = enabledEnvironment(dataUID) - protected def environment(name: String)(f: EnvironmentBuilder => EnvironmentBuilder): Unit = name match { - case envName if envName == environmentName => enabledEnvironment = f(newEmptyEnvironment()).result() - case _ => - } + protected def environment(name: String)(f: EnvironmentBuilder => EnvironmentBuilder): Unit = + name match { + case envName if envName == environmentName => enabledEnvironment = f(newEmptyEnvironment()).result() + case _ => + } private def newEmptyEnvironment() = new mutable.MapBuilder[DataUID, DataLink, Environment](Map.empty) } diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/paramstore/AwsSsmParamStore.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/paramstore/AwsSsmParamStore.scala index e7063a8..1b9161b 100644 --- a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/paramstore/AwsSsmParamStore.scala +++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/paramstore/AwsSsmParamStore.scala @@ -18,10 +18,11 @@ class AwsSsmParamStore() extends ParameterStore { * @param ssmPath The AWS SSM path where to 
retrieve the parameter value from * @return An anonymous function allowing to retrieve the configuration path */ - def lookup(ssmPath: String): LazyConfig[String] = LazyConfig { - val getParameterRequest = new GetParameterRequest().withName(ssmPath).withWithDecryption(true) - client.getParameter(getParameterRequest).getParameter.getValue - } + def lookup(ssmPath: String): LazyConfig[String] = + LazyConfig { + val getParameterRequest = new GetParameterRequest().withName(ssmPath).withWithDecryption(true) + client.getParameter(getParameterRequest).getParameter.getValue + } } /** diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/paramstore/FileParamStore.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/paramstore/FileParamStore.scala index 6acae32..57ad718 100644 --- a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/paramstore/FileParamStore.scala +++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/paramstore/FileParamStore.scala @@ -21,7 +21,8 @@ class FileParamStore(path: String, overrides: Map[String, String] = Map.empty) e * Returns the lookup function to find a particular setting * @param key The key to retrieve */ - def lookup(key: String): LazyConfig[String] = LazyConfig { - config.getString(key) - } + def lookup(key: String): LazyConfig[String] = + LazyConfig { + config.getString(key) + } } diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/pipeline/RichSparkFunctions.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/pipeline/RichSparkFunctions.scala index 6751963..427f8d1 100644 --- a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/pipeline/RichSparkFunctions.scala +++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/pipeline/RichSparkFunctions.scala @@ -13,25 +13,29 @@ object RichSparkFunctions extends LazyLogging { /* * Print schema originally returns Unit, wrapping it in the SparkFunction allows you to chain the method */ - def printSchema(): 
SparkFunction[A] = function.map { dataSet => - dataSet.printSchema() - dataSet - } + def printSchema(): SparkFunction[A] = + function.map { dataSet => + dataSet.printSchema() + dataSet + } def as[T: Encoder]: SparkFunction[Dataset[T]] = function.map(_.as[T]) - def cache(storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): SparkFunction[A] = function.map { - _.persist(storageLevel) - } + def cache(storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): SparkFunction[A] = + function.map { + _.persist(storageLevel) + } - def dropCache(): SparkFunction[A] = function.map { - _.unpersist() - } + def dropCache(): SparkFunction[A] = + function.map { + _.unpersist() + } def write(sink: Sink, sinks: Sink*): SparkFunction[A] = { if (sinks.isEmpty) function.map { data => sink.write(data); data - } else (sink +: sinks).foldLeft(function.cache())((f, sink) => f.write(sink)) + } + else (sink +: sinks).foldLeft(function.cache())((f, sink) => f.write(sink)) } def count(): SparkFunction[Long] = { diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/pipeline/Sources.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/pipeline/Sources.scala index 0623f61..4e88800 100644 --- a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/pipeline/Sources.scala +++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/pipeline/Sources.scala @@ -7,16 +7,19 @@ object Sources { def fromDataLink(dataLink: DataLink): SparkFunction[DataFrame] = SparkFunction.of(dataLink.read()) - def fromOrc(path: String): SparkFunction[DataFrame] = SparkFunction { spark => - spark.read.orc(path) - } + def fromOrc(path: String): SparkFunction[DataFrame] = + SparkFunction { spark => + spark.read.orc(path) + } - def fromText(path: String) = SparkFunction { spark => - import spark.implicits._ - spark.read.text(path).as[String] - } + def fromText(path: String) = + SparkFunction { spark => + import spark.implicits._ + spark.read.text(path).as[String] + } - def fromCsv(path: String): 
SparkFunction[DataFrame] = SparkFunction { spark => - spark.read.option("inferSchema", "true").option("header", "true").csv(path) - } + def fromCsv(path: String): SparkFunction[DataFrame] = + SparkFunction { spark => + spark.read.option("inferSchema", "true").option("header", "true").csv(path) + } } diff --git a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/pipeline/SparkFunction.scala b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/pipeline/SparkFunction.scala index 441181c..b2d3751 100644 --- a/lighthouse-core/src/main/scala/be/dataminded/lighthouse/pipeline/SparkFunction.scala +++ b/lighthouse-core/src/main/scala/be/dataminded/lighthouse/pipeline/SparkFunction.scala @@ -22,9 +22,10 @@ object SparkFunction { override def run(spark: SparkSession): A = function(spark) } - def of[A](block: => A): SparkFunction[A] = new SparkFunction[A] { - override def run(spark: SparkSession): A = block - } + def of[A](block: => A): SparkFunction[A] = + new SparkFunction[A] { + override def run(spark: SparkSession): A = block + } implicit val monad: Monad[SparkFunction] = new Monad[SparkFunction] { override def pure[A](x: A): SparkFunction[A] = SparkFunction(_ => x) diff --git a/lighthouse-core/src/test/scala/be/dataminded/lighthouse/pipeline/SparkFunctionTest.scala b/lighthouse-core/src/test/scala/be/dataminded/lighthouse/pipeline/SparkFunctionTest.scala index 0e8fecf..d03d508 100644 --- a/lighthouse-core/src/test/scala/be/dataminded/lighthouse/pipeline/SparkFunctionTest.scala +++ b/lighthouse-core/src/test/scala/be/dataminded/lighthouse/pipeline/SparkFunctionTest.scala @@ -138,10 +138,11 @@ class SparkFunctionTest val orders = Sources.fromCsv(ordersPath) // I wrap this in a SparkFunction again because I need the SparkSession to import the implicits - def countOrdersByCustomer(orders: DataFrame): SparkFunction[DataFrame] = SparkFunction { spark => - import spark.implicits._ - orders.groupBy('customerId).agg(count('id).as("count")) - } + def 
countOrdersByCustomer(orders: DataFrame): SparkFunction[DataFrame] = + SparkFunction { spark => + import spark.implicits._ + orders.groupBy('customerId).agg(count('id).as("count")) + } // No need for SparkSession, just a normal function def joinCustomersWithOrders(customers: DataFrame, ordersByCustomer: DataFrame): DataFrame = { @@ -225,15 +226,15 @@ class SparkFunctionTest def dedup(persons: Dataset[RawPerson]): Dataset[RawPerson] = persons.distinct() - def normalize(persons: Dataset[RawPerson]): SparkFunction[Dataset[BasePerson]] = SparkFunction { - spark: SparkSession => + def normalize(persons: Dataset[RawPerson]): SparkFunction[Dataset[BasePerson]] = + SparkFunction { spark: SparkSession => import spark.implicits._ persons.map { raw => val tokens = raw.name.split(" ") BasePerson(tokens(0), tokens(1), raw.age) } - } + } def returnBase(basePersons: Dataset[BasePerson]): Dataset[BasePerson] = basePersons } diff --git a/lighthouse-demo/src/main/scala/be/dataminded/lighthouse/demo/AirplanePipeline.scala b/lighthouse-demo/src/main/scala/be/dataminded/lighthouse/demo/AirplanePipeline.scala index 37293ad..066508c 100644 --- a/lighthouse-demo/src/main/scala/be/dataminded/lighthouse/demo/AirplanePipeline.scala +++ b/lighthouse-demo/src/main/scala/be/dataminded/lighthouse/demo/AirplanePipeline.scala @@ -35,34 +35,38 @@ trait AirplanePipeline { .mapN(buildView) .write(AirplaneDatalake("master" -> "view")) - private def cleanDailyWeather(dailyWeather: DataFrame) = SparkFunction { spark => - import spark.implicits._ - - val toCelsius = udf((temp: Int) => ((temp - 32) / 1.8).toInt) - - dailyWeather - .select('WBAN, 'YearMonthDay, 'Tmin, 'Tmax, 'Tavg, 'PrecipTotal, 'AvgSpeed) - .withColumn("Tmin", toCelsius('Tmin)) - .withColumn("Tmax", toCelsius('Tmax)) - .withColumn("Tavg", toCelsius('Tavg)) - .withColumnRenamed("AvgSpeed", "WAvgSpeed") - } - - protected def cleanWeatherStations(weatherStations: DataFrame) = SparkFunction { spark => - import spark.implicits._ - - 
weatherStations.select('WBAN, 'CallSign).withColumnRenamed("CallSign", "IAT").distinct() - } - - private def cleanAirlines(airlines: DataFrame) = SparkFunction { spark => - import spark.implicits._ - - val timestamp = udf((year: String, month: String, day: String) => f"$year${month.toInt}%02d${day.toInt}%02d".toInt) - - airlines - .select('Origin, 'Dest, 'Year, 'Month, 'DayofMonth, 'DayOfWeek, 'ArrDelay) - .withColumn("YearMonthDay", timestamp('Year, 'Month, 'DayofMonth)) - } + private def cleanDailyWeather(dailyWeather: DataFrame) = + SparkFunction { spark => + import spark.implicits._ + + val toCelsius = udf((temp: Int) => ((temp - 32) / 1.8).toInt) + + dailyWeather + .select('WBAN, 'YearMonthDay, 'Tmin, 'Tmax, 'Tavg, 'PrecipTotal, 'AvgSpeed) + .withColumn("Tmin", toCelsius('Tmin)) + .withColumn("Tmax", toCelsius('Tmax)) + .withColumn("Tavg", toCelsius('Tavg)) + .withColumnRenamed("AvgSpeed", "WAvgSpeed") + } + + protected def cleanWeatherStations(weatherStations: DataFrame) = + SparkFunction { spark => + import spark.implicits._ + + weatherStations.select('WBAN, 'CallSign).withColumnRenamed("CallSign", "IAT").distinct() + } + + private def cleanAirlines(airlines: DataFrame) = + SparkFunction { spark => + import spark.implicits._ + + val timestamp = + udf((year: String, month: String, day: String) => f"$year${month.toInt}%02d${day.toInt}%02d".toInt) + + airlines + .select('Origin, 'Dest, 'Year, 'Month, 'DayofMonth, 'DayOfWeek, 'ArrDelay) + .withColumn("YearMonthDay", timestamp('Year, 'Month, 'DayofMonth)) + } private def dailyWeatherWithStation(dailyWeather: DataFrame, weatherStations: DataFrame) = { dailyWeather.join(weatherStations, "WBAN").drop("WBAN") diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1d78445..75ef5c2 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -3,7 +3,7 @@ import sbt._ object Dependencies { private val amazonSdkVersion = "1.11.659" - private val sparkVersion = "2.4.4" + private val 
sparkVersion = "2.4.5" val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion % Provided val sparkSql = "org.apache.spark" %% "spark-sql" % sparkVersion % Provided @@ -18,7 +18,7 @@ object Dependencies { val betterFiles = "com.github.pathikrit" %% "better-files" % "3.8.0" // Database connectivity - val scalikejdbc = "org.scalikejdbc" %% "scalikejdbc" % "3.4.0" + val scalikejdbc = "org.scalikejdbc" %% "scalikejdbc" % "3.4.2" val h2 = "com.h2database" % "h2" % "1.4.200" % Test // Amazon AWS @@ -26,7 +26,7 @@ object Dependencies { val awsSdkSsm = "com.amazonaws" % "aws-java-sdk-ssm" % amazonSdkVersion % Provided val amazonSdk = Seq(awsSdkS3, awsSdkSsm) - val scalaTest = "org.scalatest" %% "scalatest" % "3.1.0" + val scalaTest = "org.scalatest" %% "scalatest" % "3.1.2" val testDependencies = Seq(scalaTest % Test, h2) val commonDependencies: Seq[ModuleID] = diff --git a/project/build.properties b/project/build.properties index a919a9b..797e7cc 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.8 +sbt.version=1.3.10 diff --git a/project/plugins.sbt b/project/plugins.sbt index 6d5a0f2..10e23d2 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,9 +1,9 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0") addSbtPlugin("com.frugalmechanic" % "fm-sbt-s3-resolver" % "0.19.0") -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.3.0") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.0") addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1") addSbtPlugin("com.typesafe.sbt" % "sbt-site" % "1.4.0") // Publish to Maven Central -addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.8.1") +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.1") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")