Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ For programmatic access, add SoftClient4ES to your project:
resolvers += "Softnetwork" at "https://softnetwork.jfrog.io/artifactory/releases/"

// Choose your Elasticsearch version
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-java-client" % "0.17.2"
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-java-client" % "0.17.3"
// Add the community extensions for materialized views (optional)
libraryDependencies += "app.softnetwork.elastic" %% "softclient4es8-community-extensions" % "0.1.0"
```
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ThisBuild / organization := "app.softnetwork"

name := "softclient4es"

ThisBuild / version := "0.17.2"
ThisBuild / version := "0.17.3"

ThisBuild / scalaVersion := scala213

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ trait IndicesApi extends ElasticClientHelpers {

executeDeleteIndex(index) match {
case success @ ElasticSuccess(true) =>
invalidateSchema(index)
logger.info(s"✅ Index '$index' deleted successfully")
success
case success @ ElasticSuccess(_) =>
Expand Down
14 changes: 7 additions & 7 deletions documentation/sql/dml_statements.md
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,13 @@ ON CONFLICT (uuid) DO UPDATE;
`COPY INTO` transparently supports remote file systems by auto-detecting the URI scheme in the `FROM` path.
No SQL syntax change is required — simply use the appropriate URI scheme.

| URI scheme | File system | Required JAR |
| --- | --- | --- |
| `s3a://` or `s3://` | AWS S3 | `hadoop-aws` |
| `abfs://`, `abfss://`, `wasb://`, `wasbs://` | Azure ADLS Gen2 / Blob Storage | `hadoop-azure` |
| `gs://` | Google Cloud Storage | `gcs-connector-hadoop3` |
| `hdfs://` | HDFS | _(bundled with hadoop-client)_ |
| _(no scheme / local path)_ | Local filesystem | _(no extra JAR needed)_ |
| URI scheme | File system | Required JAR |
|----------------------------------------------|--------------------------------|--------------------------------|
| `s3a://` or `s3://` | AWS S3 | `hadoop-aws` |
| `abfs://`, `abfss://`, `wasb://`, `wasbs://` | Azure ADLS Gen2 / Blob Storage | `hadoop-azure` |
| `gs://` | Google Cloud Storage | `gcs-connector-hadoop3` |
| `hdfs://` | HDFS | _(bundled with hadoop-client)_ |
| _(no scheme / local path)_ | Local filesystem | _(no extra JAR needed)_ |

> **Important:** Cloud connector JARs are declared as `provided` dependencies and are **not bundled** in the library.
> They must be present in the runtime classpath (e.g. added to the CLI assembly or the application's fat-jar).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ package object sql {
override def sql: String = s"""'$value'"""
override def baseType: SQLType = SQLTypes.Varchar

override def ddl: String = s""""$value""""
override def ddl: String = s""""${value.replace("\\", "\\\\").replace("\"", "\\\"")}""""
}

case object IdValue extends Value[String]("_id") with TokenRegex {
Expand Down
63 changes: 14 additions & 49 deletions sql/src/main/scala/app/softnetwork/elastic/sql/parser/Parser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package app.softnetwork.elastic.sql.parser

import app.softnetwork.elastic.sql.PainlessContextType.Processor
import app.softnetwork.elastic.sql._
import app.softnetwork.elastic.sql.function.time.DateTimeFunction
import app.softnetwork.elastic.sql.function._
Expand Down Expand Up @@ -230,42 +229,24 @@ object Parser
case None => None
}

def script: PackratParser[PainlessScript] =
("SCRIPT" ~ "AS") ~ start ~ (identifierWithArithmeticExpression |
def scriptValue: PackratParser[PainlessScript] = identifierWithArithmeticExpression |
identifierWithTransformation |
identifierWithIntervalFunction |
identifierWithFunction) ~ end ^^ { case _ ~ _ ~ s ~ _ => s }
identifierWithFunction

def script: PackratParser[PainlessScript] =
("SCRIPT" ~ "AS") ~ start ~ scriptValue ~ end ^^ { case _ ~ _ ~ s ~ _ => s }

def column: PackratParser[Column] =
ident ~ extension_type ~ (script | multiFields) ~ defaultVal ~ notNull ~ comment ~ (options | success(
ListMap.empty[String, Value[_]]
)) ^^ { case name ~ dt ~ mfs ~ dv ~ nn ~ ct ~ opts =>
mfs match {
case script: PainlessScript =>
val ctx = PainlessContext(Processor)
val scr = script.painless(Some(ctx))
val temp = s"$ctx$scr"
val ret =
temp.split(";") match {
case Array(single) if single.trim.startsWith("return ") =>
val stripReturn = single.trim.stripPrefix("return ").trim
s"ctx.$name = $stripReturn"
case multiple =>
val last = multiple.last.trim
val temp = multiple.dropRight(1) :+ s" ctx.$name = $last"
temp.mkString(";")
}
Column(
name,
dt,
Some(
ScriptProcessor(
script = script.sql,
column = name,
dataType = dt,
source = ret
)
),
Some(ScriptProcessor.fromScript(name, script, Some(dt))),
Nil,
dv,
nn,
Expand Down Expand Up @@ -521,27 +502,9 @@ object Parser

def alterColumnScript: PackratParser[AlterColumnScript] =
alterColumnIfExists ~ ident ~ "SET" ~ script ^^ { case ie ~ name ~ _ ~ ns =>
val ctx = PainlessContext(Processor)
val scr = ns.painless(Some(ctx))
val temp = s"$ctx$scr"
val ret =
temp.split(";") match {
case Array(single) if single.trim.startsWith("return ") =>
val stripReturn = single.trim.stripPrefix("return ").trim
s"ctx.$name = $stripReturn"
case multiple =>
val last = multiple.last.trim
val temp = multiple.dropRight(1) :+ s" ctx.$name = $last"
temp.mkString(";")
}
AlterColumnScript(
name,
ScriptProcessor(
script = ns.sql,
column = name,
dataType = ns.out,
source = ret
),
ScriptProcessor.fromScript(name, ns, Some(ns.out)),
ifExists = ie
)
}
Expand Down Expand Up @@ -1025,10 +988,12 @@ object Parser

/** UPDATE table SET col1 = v1, col2 = v2 [WHERE ...] */
def update: PackratParser[Update] =
("UPDATE" ~> ident) ~ ("SET" ~> repsep(ident ~ "=" ~ value, separator)) ~ where.? ^^ {
case table ~ assigns ~ w =>
val values = ListMap(assigns.map { case col ~ _ ~ v => col -> v }: _*)
Update(table, values, w)
("UPDATE" ~> ident) ~ ("SET" ~> repsep(
ident ~ "=" ~ (value | scriptValue),
separator
)) ~ where.? ^^ { case table ~ assigns ~ w =>
val values = ListMap(assigns.map { case col ~ _ ~ v => col -> v }: _*)
Update(table, values, w)
}

/** DELETE FROM table [WHERE ...] */
Expand Down Expand Up @@ -1094,7 +1059,7 @@ trait Parser
val endStruct: Parser[String] = "}"

def objectValue: PackratParser[ObjectValue] =
lparen ~> rep1sep(option, comma) <~ rparen ^^ { opts =>
lparen ~> repsep(option, comma) <~ rparen ^^ { opts =>
ObjectValue(ListMap(opts: _*))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import app.softnetwork.elastic.sql.{
Identifier,
LongValue,
LongValues,
Null,
ParamValue,
PiValue,
RandomValue,
Expand All @@ -38,9 +39,11 @@ package object `type` {
trait TypeParser { self: Parser =>

def literal: PackratParser[StringValue] =
(("\"" ~> """([^"\\]|\\.)*""".r <~ "\"") | ("'" ~> """([^'\\]|\\.)*""".r <~ "'")) ^^ { str =>
StringValue(str)
}
(("\"" ~> """([^"\\]|\\.)*""".r <~ "\"") ^^ { str =>
StringValue(str.replace("\\\"", "\"").replace("\\\\", "\\"))
}) | (("'" ~> """([^'\\]|\\.)*""".r <~ "'") ^^ { str =>
StringValue(str.replace("\\'", "'").replace("\\\\", "\\"))
})

def long: PackratParser[LongValue] =
"""(-)?(0|[1-9]\d*)""".r ^^ (str => LongValue(str.toLong))
Expand All @@ -59,6 +62,9 @@ package object `type` {
def param: PackratParser[ParamValue.type] =
"?" ^^ (_ => ParamValue)

// Parses the SQL NULL literal (case-insensitive, with a word boundary so that
// identifiers like "NULLABLE" are not matched) into the singleton Null value.
def nullValue: PackratParser[Null.type] =
"(?i)NULL\\b".r ^^ (_ => Null)

def literals: PackratParser[Value[_]] = "[" ~> repsep(literal, ",") <~ "]" ^^ { list =>
StringValues(list)
}
Expand All @@ -78,7 +84,7 @@ package object `type` {
def array: PackratParser[Value[_]] = literals | longs | doubles | booleans

def value: PackratParser[Value[_]] =
literal | pi | random | double | long | boolean | param | array
literal | pi | random | double | long | boolean | nullValue | param | array

def identifierWithValue: Parser[Identifier] = (value ^^ functionAsIdentifier) >> cast

Expand Down
41 changes: 28 additions & 13 deletions sql/src/main/scala/app/softnetwork/elastic/sql/query/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -453,20 +453,31 @@ package object query {
}
}

case class Update(table: String, values: ListMap[String, Value[_]], where: Option[Where])
case class Update(table: String, values: ListMap[String, PainlessScript], where: Option[Where])
extends DmlStatement {
override def sql: String = s"UPDATE $table SET ${values
.map { case (k, v) => s"$k = ${v.value}" }
.mkString(", ")}${where.map(w => s" ${w.sql}").getOrElse("")}"
.map { case (k, v) =>
v match {
case value: Value[_] => s"$k = ${value.value}"
case painlessScript => s"$k = ${painlessScript.sql}"
}
}
.mkString(", ")}${where.map(w => s"${w.sql}").getOrElse("")}"

lazy val customPipeline: IngestPipeline = IngestPipeline(
s"update-$table-${Instant.now}",
s"update-$table-${Instant.now.toEpochMilli}",
IngestPipelineType.Custom,
values.map { case (k, v) =>
SetProcessor(
column = k,
value = v
)
v match {
case value: Value[_] =>
SetProcessor(
pipelineType = IngestPipelineType.Custom,
column = k,
value = value
)
case script =>
ScriptProcessor.fromScript(k, script, pipelineType = IngestPipelineType.Custom)
}
}.toSeq
)

Expand Down Expand Up @@ -833,11 +844,15 @@ package object query {
case None => Nil
}

lazy val tableType: TableType = (options.get("type") match {
case Some(value) =>
value match {
case s: StringValue => Some(TableType(s.value))
case _ => None
// Resolves the table's type from the index mappings' "_meta.type" entry
// (previously read from the table options). Any missing or non-string value
// falls back to TableType.Regular.
// NOTE(review): assumes "_meta" is stored as an ObjectValue whose "type" key
// holds a StringValue — confirm against how Table._meta is serialized.
lazy val tableType: TableType = (mappings.get("_meta") match {
case Some(meta) =>
meta match {
// "_meta" must be an object; anything else yields no type and the default applies
case o: ObjectValue =>
o.value.get("type") match {
// only a string discriminator is accepted as a table type
case Some(s: StringValue) => Some(TableType(s.value))
case _ => None
}
case _ => None
}
case None => None
}).getOrElse(TableType.Regular)
Expand Down
99 changes: 72 additions & 27 deletions sql/src/main/scala/app/softnetwork/elastic/sql/schema/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,35 @@ package object schema {

}

object ScriptProcessor {
/** Builds a [[ScriptProcessor]] for an ingest pipeline from a parsed painless script.
*
* The painless source is rewritten so that its final expression is assigned to
* `ctx.<column>`, which is the form Elasticsearch ingest script processors expect.
*
* @param column       name of the document field the script result is written to
* @param script       the parsed SQL script to compile to painless
* @param dataType     optional output type override; defaults to the script's own output type
* @param pipelineType which pipeline flavor the processor belongs to
*/
def fromScript(
column: String,
script: PainlessScript,
dataType: Option[SQLType] = None,
pipelineType: IngestPipelineType = IngestPipelineType.Default
): ScriptProcessor = {
// Render the script in a Processor painless context; the context prefix
// carries any preamble (declarations) the rendering produced.
val ctx = PainlessContext(PainlessContextType.Processor)
val scr = script.painless(Some(ctx))
val painless = s"$ctx$scr"
// Rewrite the last statement into an assignment to ctx.<column>.
// NOTE(review): splitting on ';' assumes no painless string literal contains
// a semicolon — verify for scripts built from user-supplied SQL strings.
val source = painless.split(";") match {
// single-statement script of the form "return <expr>" -> "ctx.col = <expr>"
case Array(single) if single.trim.startsWith("return ") =>
val stripped = single.trim.stripPrefix("return ").trim
s"ctx.$column = $stripped"
// multi-statement script: keep the preamble, assign the final expression
case parts =>
val last = parts.last.trim
val updated = parts.dropRight(1) :+ s" ctx.$column = $last"
updated.mkString(";")
}
ScriptProcessor(
pipelineType = pipelineType,
script = script.sql,
column = column,
// fall back to the script's inferred output type when no explicit type is given
dataType = dataType.getOrElse(script.out),
source = source
)
}
}

case class RenameProcessor(
pipelineType: IngestPipelineType = IngestPipelineType.Default,
description: Option[String] = None,
Expand Down Expand Up @@ -908,29 +937,43 @@ package object schema {
)
)
}
.map("script" -> _) ++ ListMap(
"multi_fields" -> ObjectValue(
ListMap(multiFields.map(field => field.name -> ObjectValue(field._meta)): _*)
)
) ++ (if (lineage.nonEmpty) {
// ✅ Lineage as map of paths
ListMap(
"lineage" -> ObjectValue(
lineage.map { case (pathId, chain) =>
pathId -> ObjectValues(
chain.map { case (table, column) =>
ObjectValue(
ListMap(
"table" -> StringValue(table),
"column" -> StringValue(column)
)
)
}
)
}
)
)
} else ListMap.empty)
.map("script" -> _) ++ (if (multiFields.nonEmpty)
ListMap(
"multi_fields" -> ObjectValue(
ListMap(
multiFields.map(field =>
field.name -> ObjectValue(field._meta)
): _*
)
)
)
else ListMap.empty[String, Value[_]]) ++ (if (lineage.nonEmpty) {
// ✅ Lineage as map of paths
ListMap(
"lineage" -> ObjectValue(
lineage.map { case (pathId, chain) =>
pathId -> ObjectValues(
chain.map {
case (
table,
column
) =>
ObjectValue(
ListMap(
"table" -> StringValue(
table
),
"column" -> StringValue(
column
)
)
)
}
)
}
)
)
} else ListMap.empty)
}

def updateStruct(): Column = {
Expand Down Expand Up @@ -1443,9 +1486,11 @@ package object schema {
"columns" -> ObjectValue(cols.map { case (name, col) => name -> ObjectValue(col._meta) })
) ++ ListMap(
"type" -> StringValue(tableType.name)
) ++ ListMap(
"materialized_views" -> StringValues(materializedViews.map(StringValue))
)
) ++ (if (materializedViews.nonEmpty)
ListMap(
"materialized_views" -> StringValues(materializedViews.map(StringValue))
)
else ListMap.empty[String, Value[_]])

def update(): Table = {
val updated =
Expand Down Expand Up @@ -1490,7 +1535,7 @@ package object schema {
""
}
val separator = if (partitionBy.nonEmpty) "," else ""
s"$separator\nOPTIONS = ${Seq(
s"$separator\nOPTIONS ${Seq(
mappingOpts,
settingsOpts,
aliasesOpts
Expand Down
Loading