Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,22 @@

package org.apache.wayang.api.sql.calcite.optimizer;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.*;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCostImpl;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.prepare.Prepare;
Expand All @@ -37,7 +45,7 @@
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
import org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.sql2rel.SqlToRelConverter;
Expand All @@ -54,10 +62,7 @@
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import com.google.common.collect.ImmutableList;

public class Optimizer {

Expand All @@ -67,178 +72,173 @@ public class Optimizer {
private final VolcanoPlanner volcanoPlanner;

public Optimizer(
CalciteConnectionConfig config,
SqlValidator sqlValidator,
SqlToRelConverter sqlToRelConverter,
VolcanoPlanner volcanoPlanner) {
final CalciteConnectionConfig config,
final SqlValidator sqlValidator,
final SqlToRelConverter sqlToRelConverter,
final VolcanoPlanner volcanoPlanner) {
this.config = config;
this.sqlValidator = sqlValidator;
this.sqlToRelConverter = sqlToRelConverter;
this.volcanoPlanner = volcanoPlanner;
}

public static Optimizer create(
CalciteSchema calciteSchema,
Properties configProperties,
RelDataTypeFactory typeFactory) {
final CalciteSchema calciteSchema,
final Properties configProperties,
final RelDataTypeFactory typeFactory) {

CalciteConnectionConfig config = new CalciteConnectionConfigImpl(configProperties);
final CalciteConnectionConfig config = new CalciteConnectionConfigImpl(configProperties);

CalciteCatalogReader catalogReader = new CalciteCatalogReader(
final CalciteCatalogReader catalogReader = new CalciteCatalogReader(
calciteSchema.root(),
ImmutableList.of(calciteSchema.name),
typeFactory,
config
);
config);

SqlOperatorTable operatorTable = new ChainedSqlOperatorTable(ImmutableList.of(SqlStdOperatorTable.instance()));
final SqlOperatorTable operatorTable = SqlOperatorTables.chain(
ImmutableList.of(SqlStdOperatorTable.instance()));

SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
final SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
.withLenientOperatorLookup(config.lenientOperatorLookup())
.withSqlConformance(config.conformance())
.withConformance(config.conformance())
.withDefaultNullCollation(config.defaultNullCollation())
.withIdentifierExpansion(true);

SqlValidator validator = SqlValidatorUtil.newValidator(operatorTable, catalogReader, typeFactory, validatorConfig);
final SqlValidator validator = SqlValidatorUtil.newValidator(operatorTable, catalogReader, typeFactory,
validatorConfig);

VolcanoPlanner planner = new VolcanoPlanner(RelOptCostImpl.FACTORY, Contexts.of(config));
final VolcanoPlanner planner = new VolcanoPlanner(RelOptCostImpl.FACTORY, Contexts.of(config));
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);

RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
final RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));

SqlToRelConverter.Config converterConfig = SqlToRelConverter.config()
final SqlToRelConverter.Config converterConfig = SqlToRelConverter.config()
.withTrimUnusedFields(true)
.withExpand(false);

SqlToRelConverter converter = new SqlToRelConverter(
final SqlToRelConverter converter = new SqlToRelConverter(
null,
validator,
catalogReader,
cluster,
StandardConvertletTable.INSTANCE,
converterConfig
);
converterConfig);

return new Optimizer(config, validator, converter, planner);
}



//To remove
public static Optimizer create(WayangSchema wayangSchema) {
RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
// To remove
public static Optimizer create(final WayangSchema wayangSchema) {
final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();

// Configuration
Properties configProperties = new Properties();
final Properties configProperties = new Properties();
configProperties.put(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), Boolean.TRUE.toString());
configProperties.put(CalciteConnectionProperty.UNQUOTED_CASING.camelName(), Casing.UNCHANGED.toString());
configProperties.put(CalciteConnectionProperty.QUOTED_CASING.camelName(), Casing.UNCHANGED.toString());

CalciteConnectionConfig config = new CalciteConnectionConfigImpl(configProperties);
final CalciteConnectionConfig config = new CalciteConnectionConfigImpl(configProperties);

CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false);
final CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false);
rootSchema.add(wayangSchema.getSchemaName(), wayangSchema);
Prepare.CatalogReader catalogReader = new CalciteCatalogReader(
final Prepare.CatalogReader catalogReader = new CalciteCatalogReader(
rootSchema,
Collections.singletonList(wayangSchema.getSchemaName()),
typeFactory,
config
);
config);

SqlOperatorTable operatorTable = new ChainedSqlOperatorTable(ImmutableList.of(SqlStdOperatorTable.instance()));
final SqlOperatorTable operatorTable = SqlOperatorTables.chain(
ImmutableList.of(SqlStdOperatorTable.instance()));

SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
final SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
.withLenientOperatorLookup(config.lenientOperatorLookup())
.withSqlConformance(config.conformance())
.withConformance(config.conformance())
.withDefaultNullCollation(config.defaultNullCollation())
.withIdentifierExpansion(true);

SqlValidator validator = SqlValidatorUtil.newValidator(operatorTable, catalogReader, typeFactory, validatorConfig);
final SqlValidator validator = SqlValidatorUtil.newValidator(operatorTable, catalogReader, typeFactory,
validatorConfig);

VolcanoPlanner planner = new VolcanoPlanner(RelOptCostImpl.FACTORY, Contexts.of(config));
final VolcanoPlanner planner = new VolcanoPlanner(RelOptCostImpl.FACTORY, Contexts.of(config));
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);

RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
final RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));

SqlToRelConverter.Config converterConfig = SqlToRelConverter.config()
final SqlToRelConverter.Config converterConfig = SqlToRelConverter.config()
.withTrimUnusedFields(true)
.withExpand(false);

SqlToRelConverter converter = new SqlToRelConverter(
final SqlToRelConverter converter = new SqlToRelConverter(
null,
validator,
catalogReader,
cluster,
StandardConvertletTable.INSTANCE,
converterConfig
);
converterConfig);

return new Optimizer(config, validator, converter, planner);
}


public SqlNode parseSql(String sql) throws SqlParseException {
SqlParser.Config parserConfig = SqlParser.config()
public SqlNode parseSql(final String sql) throws SqlParseException {
final SqlParser.Config parserConfig = SqlParser.config()
.withCaseSensitive(config.caseSensitive())
.withQuotedCasing(config.quotedCasing())
.withUnquotedCasing(config.unquotedCasing())
.withConformance(config.conformance());

SqlParser parser = SqlParser.create(sql, parserConfig);
final SqlParser parser = SqlParser.create(sql, parserConfig);

return parser.parseStmt();
}

public SqlNode validate(SqlNode sqlNode) {
public SqlNode validate(final SqlNode sqlNode) {
return sqlValidator.validate(sqlNode);
}

public RelNode convert(SqlNode sqlNode) {
RelRoot root = sqlToRelConverter.convertQuery(sqlNode, false, true);
public RelNode convert(final SqlNode sqlNode) {
final RelRoot root = sqlToRelConverter.convertQuery(sqlNode, false, true);
return root.rel;
}

//TODO: create a basic ruleset
public RelNode optimize(RelNode node, RelTraitSet requiredTraitSet, RuleSet rules) {
Program program = Programs.of(RuleSets.ofList(rules));
public RelNode optimize(final RelNode node, final RelTraitSet requiredTraitSet, final RuleSet rules) {
final Program program = Programs.of(RuleSets.ofList(rules));

return program.run(
volcanoPlanner,
node,
requiredTraitSet,
Collections.emptyList(),
Collections.emptyList()
);
Collections.emptyList());
}

public static WayangPlan convert(RelNode relNode) {
public static WayangPlan convert(final RelNode relNode) {
return convert(relNode, new ArrayList<>());
}

public static WayangPlan convert(RelNode relNode, Collection<Record> collector) {
public static WayangPlan convert(final RelNode relNode, final Collection<Record> collector) {

LocalCallbackSink<Record> sink = LocalCallbackSink.createCollectingSink(collector, Record.class);
final LocalCallbackSink<Record> sink = LocalCallbackSink.createCollectingSink(collector, Record.class);

Operator op = new WayangRelConverter().convert(relNode);
final Operator op = new WayangRelConverter().convert(relNode);

op.connectTo(0, sink, 0);
return new WayangPlan(sink);
}

public static WayangPlan convertWithConfig(RelNode relNode, Configuration configuration, Collection<Record> collector) {
LocalCallbackSink<Record> sink = LocalCallbackSink.createCollectingSink(collector, Record.class);
public static WayangPlan convertWithConfig(final RelNode relNode, final Configuration configuration,
final Collection<Record> collector) {
final LocalCallbackSink<Record> sink = LocalCallbackSink.createCollectingSink(collector, Record.class);

Operator op = new WayangRelConverter(configuration).convert(relNode);
final Operator op = new WayangRelConverter(configuration).convert(relNode);

op.connectTo(0, sink, 0);
return new WayangPlan(sink);
}


public static class ConfigProperties {

public static Properties getDefaults() {
Properties configProperties = new Properties();
final Properties configProperties = new Properties();
configProperties.put(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), Boolean.TRUE.toString());
configProperties.put(CalciteConnectionProperty.UNQUOTED_CASING.camelName(), Casing.UNCHANGED.toString());
configProperties.put(CalciteConnectionProperty.QUOTED_CASING.camelName(), Casing.UNCHANGED.toString());
Expand All @@ -247,7 +247,4 @@ public static Properties getDefaults() {

}


}


Loading