diff --git a/wayang-platforms/wayang-flink/pom.xml b/wayang-platforms/wayang-flink/pom.xml
index 677e0b1e9..24bc325ba 100644
--- a/wayang-platforms/wayang-flink/pom.xml
+++ b/wayang-platforms/wayang-flink/pom.xml
@@ -116,6 +116,16 @@
flink-hadoop-compatibility_2.12
${flink.version}
+
+ org.apache.flink
+ flink-connector-files
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-streaming-java
+ ${flink.version}
+
org.apache.commons
commons-math3
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/channels/DataStreamChannel.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/channels/DataStreamChannel.java
new file mode 100644
index 000000000..7ed041a19
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/channels/DataStreamChannel.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.channels;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.executionplan.Channel;
+import org.apache.wayang.core.plan.wayangplan.OutputSlot;
+import org.apache.wayang.core.platform.AbstractChannelInstance;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.Executor;
+import org.apache.wayang.flink.execution.FlinkExecutor;
+
+import java.util.OptionalLong;
+
+public class DataStreamChannel extends Channel {
+
+ /**
+ * {@link ChannelInstance} implementation for {@link DataStream}s.
+ */
+ public class Instance extends AbstractChannelInstance {
+
+ private DataStream> dataStream;
+
+ // TODO: size is never assigned, so it is always 0 and getMeasuredCardinality() always defers to the superclass.
+ private long size;
+
+ public Instance(final FlinkExecutor executor,
+ final OptimizationContext.OperatorContext producerOperatorContext,
+ final int producerOutputIndex) {
+ super(executor, producerOperatorContext, producerOutputIndex);
+ }
+
+ public void accept(final DataStream> dataStream) {
+ this.dataStream = dataStream;
+ }
+
+ @SuppressWarnings("unchecked")
+ public DataStream provideDataStream() {
+ return (DataStream) this.dataStream;
+ }
+
+ @Override
+ public OptionalLong getMeasuredCardinality() {
+ return this.size == 0 ? super.getMeasuredCardinality() : OptionalLong.of(this.size);
+ }
+
+ @Override
+ public DataStreamChannel getChannel() {
+ return DataStreamChannel.this;
+ }
+
+ @Override
+ protected void doDispose() {
+ this.dataStream = null;
+ }
+ }
+
+ public static final ChannelDescriptor DESCRIPTOR = new ChannelDescriptor(
+ DataStreamChannel.class, true, false);
+
+ public static final ChannelDescriptor DESCRIPTOR_MANY = new ChannelDescriptor(
+ DataStreamChannel.class, true, false);
+
+ public DataStreamChannel(final ChannelDescriptor descriptor, final OutputSlot> outputSlot) {
+ super(descriptor, outputSlot);
+ assert descriptor == DESCRIPTOR || descriptor == DESCRIPTOR_MANY;
+ this.markForInstrumentation();
+ }
+
+ private DataStreamChannel(final DataStreamChannel parent) {
+ super(parent);
+ }
+
+ @Override
+ public Channel copy() {
+ return new DataStreamChannel(this);
+ }
+
+ @Override
+ public Instance createInstance(final Executor executor,
+ final OptimizationContext.OperatorContext producerOperatorContext,
+ final int producerOutputIndex) {
+ return new Instance((FlinkExecutor) executor, producerOperatorContext, producerOutputIndex);
+ }
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/compiler/FunctionCompiler.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/compiler/FunctionCompiler.java
index e1f3e0e6c..20a423cbf 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/compiler/FunctionCompiler.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/compiler/FunctionCompiler.java
@@ -58,7 +58,7 @@ public class FunctionCompiler {
* @param output type of the transformation
* @return a compiled function
*/
- public MapFunction compile(TransformationDescriptor descriptor) {
+ public static MapFunction compile(TransformationDescriptor descriptor) {
// This is a dummy method but shows the intention of having something compilable in the descriptors.
Function function = descriptor.getJavaImplementation();
return (MapFunction) i -> function.apply(i);
@@ -72,7 +72,7 @@ public MapFunction compile(TransformationDescriptor descripto
* @param output type of the transformation
* @return a compiled function
*/
- public FlatMapFunction compile(FunctionDescriptor.SerializableFunction> flatMapDescriptor) {
+ public static FlatMapFunction compile(FunctionDescriptor.SerializableFunction> flatMapDescriptor) {
return (t, collector) -> flatMapDescriptor.apply(t).forEach(collector::collect);
}
@@ -83,7 +83,7 @@ public FlatMapFunction compile(FunctionDescriptor.SerializableFunct
* @param input/output type of the transformation
* @return a compiled function
*/
- public ReduceFunction compile(ReduceDescriptor descriptor) {
+ public static ReduceFunction compile(ReduceDescriptor descriptor) {
// This is a dummy method but shows the intention of having something compilable in the descriptors.
BiFunction reduce_function = descriptor.getJavaImplementation();
return new ReduceFunction() {
@@ -94,26 +94,26 @@ public T reduce(T t, T t1) throws Exception {
};
}
- public FilterFunction compile(PredicateDescriptor.SerializablePredicate predicateDescriptor) {
- return t -> predicateDescriptor.test(t);
+ public static FilterFunction compile(PredicateDescriptor.SerializablePredicate predicateDescriptor) {
+ return predicateDescriptor::test;
}
- public OutputFormat compile(ConsumerDescriptor.SerializableConsumer consumerDescriptor) {
+ public static OutputFormat compile(ConsumerDescriptor.SerializableConsumer consumerDescriptor) {
return new OutputFormatConsumer(consumerDescriptor);
}
- public KeySelector compileKeySelector(TransformationDescriptor descriptor){
+ public static KeySelector compileKeySelector(TransformationDescriptor descriptor){
return new KeySelectorFunction(descriptor);
}
- public CoGroupFunction compileCoGroup(){
+ public static CoGroupFunction compileCoGroup(){
return new FlinkCoGroupFunction();
}
- public TextOutputFormat.TextFormatter compileOutput(TransformationDescriptor formattingDescriptor) {
+ public static TextOutputFormat.TextFormatter compileOutput(TransformationDescriptor formattingDescriptor) {
Function format = formattingDescriptor.getJavaImplementation();
return new TextOutputFormat.TextFormatter(){
@@ -132,7 +132,7 @@ public String format(T value) {
* @param output type of the transformation
* @return a compiled function
*/
- public MapPartitionFunction compile(MapPartitionsDescriptor descriptor){
+ public static MapPartitionFunction compile(MapPartitionsDescriptor descriptor){
Function, Iterable> function = descriptor.getJavaImplementation();
return new MapPartitionFunction() {
@Override
@@ -146,13 +146,12 @@ public void mapPartition(Iterable iterable, Collector collector) throws Ex
};
}
- public WayangConvergenceCriterion compile(PredicateDescriptor> descriptor){
- FunctionDescriptor.SerializablePredicate> predicate = descriptor.getJavaImplementation();
- return new WayangConvergenceCriterion(predicate);
+ public static WayangConvergenceCriterion compile(PredicateDescriptor> descriptor){
+ return new WayangConvergenceCriterion(descriptor.getJavaImplementation());
}
- public RichFlatMapFunction compile(FunctionDescriptor.ExtendedSerializableFunction> flatMapDescriptor, FlinkExecutionContext exe) {
+ public static RichFlatMapFunction compile(FunctionDescriptor.ExtendedSerializableFunction> flatMapDescriptor, FlinkExecutionContext exe) {
return new RichFlatMapFunction() {
@Override
@@ -168,7 +167,7 @@ public void flatMap(I value, Collector out) throws Exception {
}
- public RichMapFunction compile(TransformationDescriptor mapDescriptor, FlinkExecutionContext fex ) {
+ public static RichMapFunction compile(TransformationDescriptor mapDescriptor, FlinkExecutionContext fex ) {
FunctionDescriptor.ExtendedSerializableFunction map = (FunctionDescriptor.ExtendedSerializableFunction) mapDescriptor.getJavaImplementation();
return new RichMapFunction() {
@@ -186,7 +185,7 @@ public void open(Configuration parameters) throws Exception {
- public RichMapPartitionFunction compile(MapPartitionsDescriptor descriptor, FlinkExecutionContext fex){
+ public static RichMapPartitionFunction compile(MapPartitionsDescriptor descriptor, FlinkExecutionContext fex){
FunctionDescriptor.ExtendedSerializableFunction, Iterable> function =
(FunctionDescriptor.ExtendedSerializableFunction, Iterable>)
descriptor.getJavaImplementation();
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/execution/FlinkExecutor.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/execution/FlinkExecutor.java
index 960f4165c..052e97d47 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/execution/FlinkExecutor.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/execution/FlinkExecutor.java
@@ -19,6 +19,7 @@
package org.apache.wayang.flink.execution;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.OptimizationContext;
@@ -35,7 +36,6 @@
import org.apache.wayang.flink.compiler.FunctionCompiler;
import org.apache.wayang.flink.operators.FlinkExecutionOperator;
import org.apache.wayang.flink.platform.FlinkPlatform;
-import com.esotericsoftware.kryo.serializers.DefaultSerializers;
import java.util.Arrays;
import java.util.Collection;
@@ -56,6 +56,12 @@ public class FlinkExecutor extends PushExecutorTemplate {
*/
public ExecutionEnvironment fee;
+
+ /**
+ * {@link StreamExecutionEnvironment} for bounded and continuous streams.
+ */
+ public StreamExecutionEnvironment sEnv;
+
/**
* Compiler to create flink UDFs.
*/
@@ -76,6 +82,7 @@ public FlinkExecutor(FlinkPlatform flinkPlatform, Job job) {
super(job);
this.platform = flinkPlatform;
this.flinkContextReference = this.platform.getFlinkContext(job);
+ this.sEnv = flinkPlatform.streamExecutionEnvironment;
this.fee = this.flinkContextReference.get();
this.numDefaultPartitions = (int) this.getConfiguration().getLongProperty("wayang.flink.parallelism");
this.fee.setParallelism(this.numDefaultPartitions);
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/BoundedTextFileSourceMapping.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/BoundedTextFileSourceMapping.java
new file mode 100644
index 000000000..4fc7085d5
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/BoundedTextFileSourceMapping.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.flink.operators.FlinkBoundedTextFileSource;
+import org.apache.wayang.flink.platform.FlinkPlatform;
+
+public class BoundedTextFileSourceMapping implements Mapping {
+ @Override
+ public Collection getTransformations() {
+ return Collections.singleton(new PlanTransformation(
+ this.createSubplanPattern(),
+ this.createReplacementSubplanFactory(),
+ FlinkPlatform.getInstance()
+ ));
+ }
+
+ private SubplanPattern createSubplanPattern() {
+ final OperatorPattern> operatorPattern = new OperatorPattern<>(
+ "source", new TextFileSource("", null), false
+ );
+ return SubplanPattern.createSingleton(operatorPattern);
+ }
+
+ private ReplacementSubplanFactory createReplacementSubplanFactory() {
+ return new ReplacementSubplanFactory.OfSingleOperators(
+ (matchedOperator, epoch) -> new FlinkBoundedTextFileSource(matchedOperator).at(epoch)
+ );
+ }
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
index 9a3197937..0ea97860c 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/Mappings.java
@@ -19,16 +19,21 @@
package org.apache.wayang.flink.mapping;
import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.flink.plugin.FlinkBasicPlugin;
import java.util.Arrays;
import java.util.Collection;
/**
- * Register for {@link Mapping}s for this platform.
+ * Register for {@link Mapping}s for {@link FlinkBasicPlugin}.
*/
public class Mappings {
- public static Collection BASIC_MAPPINGS = Arrays.asList(
+ /**
+ * Mappings using Flink's DataSets
+ * @deprecated Flink's DataSet API has been deprecated; move over to bounded streams, using {@link #BOUNDED_STREAM_MAPPINGS} as the 1-to-1 replacement.
+ */
+ public static final Collection BASIC_MAPPINGS = Arrays.asList(
new CartesianMapping(),
new CoGroupMapping(),
new CollectionSourceMapping(),
@@ -60,6 +65,12 @@ public class Mappings {
new ZipWithIdMapping()
);
+ public static final Collection BOUNDED_STREAM_MAPPINGS = Arrays.asList(
+ new BoundedTextFileSourceMapping(),
+ new StreamedMapMapping(),
+ new StreamedJoinMapping(),
+ new StreamedLocalCallbackSinkMapping()
+ );
}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/StreamedJoinMapping.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/StreamedJoinMapping.java
new file mode 100644
index 000000000..45c66e5dc
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/StreamedJoinMapping.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.wayang.basic.operators.JoinOperator;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.operators.FlinkDataStreamJoinOperator;
+import org.apache.wayang.flink.platform.FlinkPlatform;
+
+/**
+ * Mapping from {@link JoinOperator} to {@link FlinkDataStreamJoinOperator}.
+ */
+public class StreamedJoinMapping implements Mapping {
+ @Override
+ public Collection getTransformations() {
+ return Collections.singleton(new PlanTransformation(
+ this.createSubplanPattern(),
+ this.createReplacementSubplanFactory(),
+ FlinkPlatform.getInstance()));
+ }
+
+ private SubplanPattern createSubplanPattern() {
+ final OperatorPattern> operatorPattern = new OperatorPattern<>(
+ "join", new JoinOperator<>(null, null, DataSetType.none(), DataSetType.none()), false);
+ return SubplanPattern.createSingleton(operatorPattern);
+ }
+
+ private ReplacementSubplanFactory createReplacementSubplanFactory() {
+ return new ReplacementSubplanFactory.OfSingleOperators>(
+ (matchedOperator, epoch) -> new FlinkDataStreamJoinOperator<>(matchedOperator).at(epoch));
+ }
+}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/StreamedLocalCallbackSinkMapping.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/StreamedLocalCallbackSinkMapping.java
new file mode 100644
index 000000000..21d7fcb20
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/StreamedLocalCallbackSinkMapping.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.mapping;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.wayang.basic.operators.LocalCallbackSink;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.operators.FlinkDataStreamLocalCallbackSink;
+import org.apache.wayang.flink.platform.FlinkPlatform;
+
+public class StreamedLocalCallbackSinkMapping implements Mapping {
+ @Override
+ public Collection getTransformations() {
+ return Collections.singleton(new PlanTransformation(
+ this.createSubplanPattern(),
+ this.createReplacementSubplanFactory(),
+ FlinkPlatform.getInstance()));
+ }
+
+ private SubplanPattern createSubplanPattern() {
+ final OperatorPattern> operatorPattern = new OperatorPattern<>(
+ "sink", new LocalCallbackSink<>(null, DataSetType.none()),
+ false);
+ return SubplanPattern.createSingleton(operatorPattern);
+ }
+
+ private ReplacementSubplanFactory createReplacementSubplanFactory() {
+ return new ReplacementSubplanFactory.OfSingleOperators>(
+ (matchedOperator, epoch) -> new FlinkDataStreamLocalCallbackSink<>(matchedOperator).at(epoch));
+ }
+}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/StreamedMapMapping.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/StreamedMapMapping.java
new file mode 100644
index 000000000..0d966cb13
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/StreamedMapMapping.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.wayang.basic.operators.MapOperator;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.flink.operators.FlinkDataStreamMapOperator;
+import org.apache.wayang.flink.platform.FlinkPlatform;
+
+/**
+ * Mapping from {@link MapOperator} to {@link FlinkDataStreamMapOperator}.
+ */
+public class StreamedMapMapping implements Mapping {
+ @Override
+ public Collection getTransformations() {
+ return Collections.singleton(new PlanTransformation(
+ this.createSubplanPattern(),
+ this.createReplacementSubplanFactory(),
+ FlinkPlatform.getInstance()));
+ }
+
+ private SubplanPattern createSubplanPattern() {
+ final OperatorPattern> operatorPattern = new OperatorPattern<>(
+ "map", new MapOperator<>(null, DataSetType.none(), DataSetType.none()), false);
+ return SubplanPattern.createSingleton(operatorPattern);
+ }
+
+ private ReplacementSubplanFactory createReplacementSubplanFactory() {
+ return new ReplacementSubplanFactory.OfSingleOperators>(
+ (matchedOperator, epoch) -> new FlinkDataStreamMapOperator<>(matchedOperator).at(epoch));
+ }
+}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/TextFileSourceMapping.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/TextFileSourceMapping.java
index a57bb6643..959d57edc 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/TextFileSourceMapping.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/mapping/TextFileSourceMapping.java
@@ -33,7 +33,6 @@
/**
* Mapping from {@link TextFileSource} to {@link FlinkTextFileSource}.
*/
-@SuppressWarnings("unchecked")
public class TextFileSourceMapping implements Mapping {
@Override
public Collection getTransformations() {
@@ -46,7 +45,7 @@ public Collection getTransformations() {
private SubplanPattern createSubplanPattern() {
- final OperatorPattern operatorPattern = new OperatorPattern(
+ final OperatorPattern> operatorPattern = new OperatorPattern<>(
"source", new TextFileSource("", null), false
);
return SubplanPattern.createSingleton(operatorPattern);
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkBoundedTextFileSource.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkBoundedTextFileSource.java
new file mode 100644
index 000000000..dff1f522e
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkBoundedTextFileSource.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.optimizer.OptimizationContext.OperatorContext;
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.flink.channels.DataStreamChannel;
+import org.apache.wayang.flink.execution.FlinkExecutor;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Opens a Flink bounded {@link DataStream} on a {@link TextFileSource}.
+ */
+public class FlinkBoundedTextFileSource extends TextFileSource implements FlinkExecutionOperator {
+
+ public FlinkBoundedTextFileSource(final TextFileSource that) {
+ super(that);
+ }
+
+ public FlinkBoundedTextFileSource(final String inputUrl) {
+ super(inputUrl);
+ }
+
+ @Override
+ public List getSupportedInputChannels(final int index) {
+ throw new UnsupportedOperationException(String.format("%s does not have input channels.", this));
+ }
+
+ @Override
+ public List getSupportedOutputChannels(final int index) {
+ return Arrays.asList(DataStreamChannel.DESCRIPTOR, DataStreamChannel.DESCRIPTOR_MANY);
+ }
+
+ @Override
+ public Tuple, Collection> evaluate(final ChannelInstance[] inputs,
+ final ChannelInstance[] outputs, final FlinkExecutor flinkExecutor, final OperatorContext operatorContext)
+ throws Exception {
+ assert inputs.length == this.getNumInputs();
+ assert outputs.length == this.getNumOutputs();
+
+ final DataStreamChannel.Instance output = (DataStreamChannel.Instance) outputs[0];
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final FileSource fs = FileSource
+ .forRecordStreamFormat(new TextLineInputFormat(), new Path(this.getInputUrl()))
+ .build();
+
+ final DataStream dataStream = env.fromSource(fs, WatermarkStrategy.noWatermarks(),
+ "FlinkDataStreamFileSource[" + this.getInputUrl() + "]");
+
+ output.accept(dataStream);
+
+ final ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
+ prepareLineageNode.add(LoadProfileEstimators.createFromSpecification(
+ "wayang.flink.textfilesource.load.prepare", flinkExecutor.getConfiguration()));
+
+ final ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
+ mainLineageNode.add(LoadProfileEstimators.createFromSpecification(
+ "wayang.flink.textfilesource.load.main", flinkExecutor.getConfiguration()));
+
+ output.getLineage().addPredecessor(mainLineageNode);
+
+ return prepareLineageNode.collectAndMark();
+ }
+
+ @Override
+ public boolean containsAction() {
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkContinuousTextFileSource.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkContinuousTextFileSource.java
new file mode 100644
index 000000000..2da7d49fa
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkContinuousTextFileSource.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.optimizer.OptimizationContext.OperatorContext;
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.flink.channels.DataStreamChannel;
+import org.apache.wayang.flink.execution.FlinkExecutor;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Opens a Flink continuous {@link DataStream} that monitors a file directory.
+ */
+public class FlinkContinuousTextFileSource extends TextFileSource implements FlinkExecutionOperator {
+
+ /**
+ * Creates an instance from an existing {@link TextFileSource} via the superclass constructor.
+ *
+ * @param that the operator handed to {@code super(that)}
+ */
+ public FlinkContinuousTextFileSource(final TextFileSource that) {
+ super(that);
+ }
+
+ /**
+ * Creates an instance for the given input URL.
+ *
+ * @param inputUrl forwarded to the superclass constructor
+ */
+ public FlinkContinuousTextFileSource(final String inputUrl) {
+ super(inputUrl);
+ }
+
+ /**
+ * Always fails: this operator does not have input channels.
+ *
+ * @throws UnsupportedOperationException on every call
+ */
+ @Override
+ public List getSupportedInputChannels(final int index) {
+ throw new UnsupportedOperationException(String.format("%s does not have input channels.", this));
+ }
+
+ /**
+ * Offers the same {@link DataStreamChannel} descriptors as {@link FlinkBoundedTextFileSource}.
+ *
+ * @param index the output index (not consulted)
+ * @return both {@link DataStreamChannel} descriptors
+ */
+ @Override
+ public List getSupportedOutputChannels(final int index) {
+ return Arrays.asList(DataStreamChannel.DESCRIPTOR, DataStreamChannel.DESCRIPTOR_MANY);
+ }
+
+ /**
+ * Builds a continuously monitoring {@link FileSource} over the input URL, turns it into a
+ * {@link DataStream} and hands that stream to the first output channel.
+ *
+ * @param inputs expected to have {@code getNumInputs()} entries (checked by assertion only)
+ * @param outputs {@code outputs[0]} must be a {@link DataStreamChannel.Instance}
+ * @param flinkExecutor supplies the configuration for the load profile estimators
+ * @param operatorContext used to create the lineage nodes
+ * @return the result of {@code collectAndMark()} on the "prepare" lineage node
+ */
+ @Override
+ public Tuple, Collection> evaluate(final ChannelInstance[] inputs,
+ final ChannelInstance[] outputs, final FlinkExecutor flinkExecutor, final OperatorContext operatorContext)
+ throws Exception {
+ assert inputs.length == this.getNumInputs();
+ assert outputs.length == this.getNumOutputs();
+
+ final DataStreamChannel.Instance output = (DataStreamChannel.Instance) outputs[0];
+
+ // NOTE(review): FlinkExecutor also exposes a StreamExecutionEnvironment (sEnv); confirm which one is intended.
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final FileSource fs = FileSource
+ .forRecordStreamFormat(new TextLineInputFormat(), new Path(this.getInputUrl()))
+ // TODO: the 1-second monitoring interval is hard-coded; it should come from the configuration.
+ .monitorContinuously(Duration.ofSeconds(1))
+ .build();
+
+ final DataStream dataStream = env.fromSource(fs, WatermarkStrategy.noWatermarks(),
+ "FlinkDataStreamFileSource[" + this.getInputUrl() + "]");
+
+ output.accept(dataStream);
+
+ final ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
+ prepareLineageNode.add(LoadProfileEstimators.createFromSpecification(
+ "wayang.flink.textfilesource.load.prepare", flinkExecutor.getConfiguration()));
+
+ final ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
+ mainLineageNode.add(LoadProfileEstimators.createFromSpecification(
+ "wayang.flink.textfilesource.load.main", flinkExecutor.getConfiguration()));
+
+ output.getLineage().addPredecessor(mainLineageNode);
+
+ return prepareLineageNode.collectAndMark();
+ }
+
+ /**
+ * Reports that this operator contains no action, mirroring {@link FlinkBoundedTextFileSource}.
+ *
+ * @return {@code false}; {@code evaluate} only builds the stream and does not call an execute method
+ */
+ @Override
+ public boolean containsAction() {
+ return false;
+ }
+
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkDataStreamCollectionSink.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkDataStreamCollectionSink.java
new file mode 100644
index 000000000..92aa97083
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkDataStreamCollectionSink.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.lang3.Validate;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator;
+import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.flink.channels.DataStreamChannel;
+import org.apache.wayang.flink.execution.FlinkExecutor;
+import org.apache.wayang.java.channels.CollectionChannel;
+
+/**
+ * Converts a {@link DataStreamChannel} into a {@link CollectionChannel} by
+ * executing the incoming {@link DataStream} and gathering all of its elements
+ * in a local {@link List}.
+ */
+public class FlinkDataStreamCollectionSink extends UnaryToUnaryOperator implements FlinkExecutionOperator {
+ /**
+ * Creates a sink whose input and output have the same type.
+ *
+ * @param inputType type of the elements to be collected
+ */
+ public FlinkDataStreamCollectionSink(final DataSetType inputType) {
+ super(inputType, inputType, false);
+ }
+
+ @Override
+ public boolean isSink() {
+ return true;
+ }
+
+ /**
+ * Executes the input stream and passes the collected elements to the output
+ * channel.
+ *
+ * @param inputs exactly one {@link DataStreamChannel.Instance}
+ * @param outputs exactly one {@link CollectionChannel.Instance}
+ * @param flinkExecutor not used by this operator
+ * @param operatorContext forwarded to {@link ExecutionOperator#modelEagerExecution}
+ * @return the result of {@link ExecutionOperator#modelEagerExecution}
+ * @throws Exception if executing the stream or closing its result iterator fails
+ */
+ @Override
+ public Tuple, Collection> evaluate(final ChannelInstance[] inputs,
+ final ChannelInstance[] outputs, final FlinkExecutor flinkExecutor,
+ final OptimizationContext.OperatorContext operatorContext)
+ throws Exception {
+ assert inputs.length == this.getNumInputs();
+ assert outputs.length == this.getNumOutputs();
+
+ final DataStreamChannel.Instance input = (DataStreamChannel.Instance) inputs[0];
+ final CollectionChannel.Instance output = (CollectionChannel.Instance) outputs[0];
+
+ final DataStream dataStreamInput = input.provideDataStream();
+ final List collection = new ArrayList<>();
+ // executeAndCollect() returns a CloseableIterator that must be closed to
+ // release the resources of the job, hence the try-with-resources.
+ try (org.apache.flink.util.CloseableIterator results = dataStreamInput.executeAndCollect()) {
+ results.forEachRemaining(collection::add);
+ }
+
+ output.accept(collection);
+
+ return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
+ }
+
+ /**
+ * @return {@code true}, because {@code evaluate} executes the stream
+ */
+ @Override
+ public boolean containsAction() {
+ return true;
+ }
+
+ @Override
+ public List getSupportedInputChannels(final int index) {
+ return Collections.singletonList(DataStreamChannel.DESCRIPTOR);
+ }
+
+ @Override
+ public List getSupportedOutputChannels(final int index) {
+ return Collections.singletonList(CollectionChannel.DESCRIPTOR);
+ }
+
+ /**
+ * Provides an estimator that forwards the cardinality of the single input.
+ *
+ * @param outputIndex must be {@code 0}, the only output of this operator
+ * @param configuration not used by this operator
+ * @return the estimator, never empty
+ */
+ @Override
+ public Optional createCardinalityEstimator(
+ final int outputIndex,
+ final Configuration configuration) {
+ Validate.inclusiveBetween(0, 0, outputIndex);
+ return Optional.of(new DefaultCardinalityEstimator(1d, 1, this.isSupportingBroadcastInputs(),
+ inputCards -> inputCards[0]));
+ }
+
+ @Override
+ public String getLoadProfileEstimatorConfigurationKey() {
+ return "wayang.flink.collect.load";
+ }
+
+ @Override
+ public boolean isConversion() {
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkDataStreamJoinOperator.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkDataStreamJoinOperator.java
new file mode 100644
index 000000000..0013f3e7f
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkDataStreamJoinOperator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.basic.function.ProjectionDescriptor;
+import org.apache.wayang.basic.operators.JoinOperator;
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction;
+import org.apache.wayang.core.optimizer.OptimizationContext.OperatorContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.flink.channels.DataSetChannel;
+import org.apache.wayang.flink.channels.DataStreamChannel;
+import org.apache.wayang.flink.compiler.FunctionCompiler;
+import org.apache.wayang.flink.execution.FlinkExecutor;
+
+/**
+ * Flink {@link DataStream} implementation of the {@link JoinOperator}.
+ * <p>
+ * Both inputs receive timestamps and watermarks from the configured
+ * {@link WatermarkStrategy}s and are joined on their keys within tumbling
+ * event-time windows of length {@link #duration}. Unless the caller supplies
+ * its own settings, every element is stamped with timestamp {@code 0} and the
+ * window spans 365 days.
+ */
+public class FlinkDataStreamJoinOperator extends JoinOperator implements FlinkExecutionOperator {
+ /**
+ * Pairs the two matching elements of a join in a {@link Tuple2}.
+ * NOTE(review): as a non-static inner class this keeps a reference to the
+ * enclosing operator; confirm that Flink can still serialize it.
+ */
+ class Joiner implements JoinFunction> {
+ @Override
+ public Tuple2 join(final I0 first, final I1 second) throws Exception {
+ return new Tuple2<>(first, second);
+ }
+ }
+
+ final WatermarkStrategy leftWatermarkStrategy;
+ final WatermarkStrategy rightWatermarkStrategy;
+ final Duration duration;
+
+ /**
+ * Copies the given {@link JoinOperator} and applies the default watermark
+ * strategies (timestamp {@code 0} for every element) and a 365-day window.
+ *
+ * @param that operator to copy
+ */
+ public FlinkDataStreamJoinOperator(final JoinOperator that) {
+ super(that);
+ this.leftWatermarkStrategy = WatermarkStrategy
+ .forMonotonousTimestamps()
+ .withTimestampAssigner((e, ts) -> 0L);
+ this.rightWatermarkStrategy = WatermarkStrategy
+ .forMonotonousTimestamps()
+ .withTimestampAssigner((e, ts) -> 0L);
+ this.duration = Duration.ofDays(365);
+ }
+
+ /**
+ * Creates a join with the default watermark strategies (timestamp {@code 0}
+ * for every element) and a 365-day window.
+ *
+ * @param descriptor0 key descriptor for the first input
+ * @param descriptor1 key descriptor for the second input
+ */
+ public FlinkDataStreamJoinOperator(final ProjectionDescriptor descriptor0,
+ final ProjectionDescriptor descriptor1) {
+ this(descriptor0, descriptor1,
+ WatermarkStrategy
+ .forMonotonousTimestamps()
+ .withTimestampAssigner((e, ts) -> 0L),
+ WatermarkStrategy
+ .forMonotonousTimestamps()
+ .withTimestampAssigner((e, ts) -> 0L),
+ Duration.ofDays(365));
+ }
+
+ /**
+ * Creates a join with caller-supplied watermark strategies and window length.
+ *
+ * @param descriptor0 key descriptor for the first input
+ * @param descriptor1 key descriptor for the second input
+ * @param leftWatermarkStrategy strategy applied to the first input
+ * @param rightWatermarkStrategy strategy applied to the second input
+ * @param duration length of the tumbling event-time window
+ */
+ public FlinkDataStreamJoinOperator(final ProjectionDescriptor descriptor0,
+ final ProjectionDescriptor descriptor1, final WatermarkStrategy leftWatermarkStrategy,
+ final WatermarkStrategy rightWatermarkStrategy, final Duration duration) {
+ super(descriptor0, descriptor1);
+ this.leftWatermarkStrategy = leftWatermarkStrategy;
+ this.rightWatermarkStrategy = rightWatermarkStrategy;
+ this.duration = duration;
+ }
+
+ /**
+ * Creates a join from key extractor functions with caller-supplied watermark
+ * strategies and window length.
+ *
+ * @param keyExtractor0 extracts the key of the first input
+ * @param keyExtractor1 extracts the key of the second input
+ * @param input0Class class of the first input's elements
+ * @param input1Class class of the second input's elements
+ * @param keyClass class of the join key
+ * @param leftWatermarkStrategy strategy applied to the first input
+ * @param rightWatermarkStrategy strategy applied to the second input
+ * @param duration length of the tumbling event-time window
+ */
+ public FlinkDataStreamJoinOperator(final SerializableFunction keyExtractor0,
+ final SerializableFunction keyExtractor1, final Class input0Class, final Class input1Class,
+ final Class keyClass, final WatermarkStrategy leftWatermarkStrategy,
+ final WatermarkStrategy rightWatermarkStrategy, final Duration duration) {
+ super(keyExtractor0, keyExtractor1, input0Class, input1Class, keyClass);
+
+ this.leftWatermarkStrategy = leftWatermarkStrategy;
+ this.rightWatermarkStrategy = rightWatermarkStrategy;
+ this.duration = duration;
+ }
+
+ /**
+ * Builds the windowed join of the two input streams and hands the resulting
+ * stream to the output channel without executing it.
+ *
+ * @param inputs exactly two {@link DataStreamChannel.Instance}s
+ * @param outputs exactly one {@link DataStreamChannel.Instance}
+ * @param flinkExecutor not used by this operator
+ * @param operatorContext forwarded to {@link ExecutionOperator#modelLazyExecution}
+ * @return the result of {@link ExecutionOperator#modelLazyExecution}
+ * @throws Exception declared by the overridden method
+ */
+ @Override
+ public Tuple, Collection> evaluate(final ChannelInstance[] inputs,
+ final ChannelInstance[] outputs, final FlinkExecutor flinkExecutor, final OperatorContext operatorContext)
+ throws Exception {
+ assert inputs.length == this.getNumInputs();
+ assert outputs.length == this.getNumOutputs();
+
+ final DataStreamChannel.Instance input0 = (DataStreamChannel.Instance) inputs[0];
+ final DataStreamChannel.Instance input1 = (DataStreamChannel.Instance) inputs[1];
+ final DataStreamChannel.Instance output = (DataStreamChannel.Instance) outputs[0];
+
+ final DataStream dataStream0 = input0.provideDataStream();
+ final DataStream dataStream1 = input1.provideDataStream();
+
+ final DataStream> outputStream = dataStream0
+ .assignTimestampsAndWatermarks(leftWatermarkStrategy)
+ .join(dataStream1.assignTimestampsAndWatermarks(rightWatermarkStrategy))
+ .where(FunctionCompiler.compileKeySelector(keyDescriptor0))
+ .equalTo(FunctionCompiler.compileKeySelector(keyDescriptor1))
+ .window(TumblingEventTimeWindows.of(duration))
+ .apply(new Joiner());
+
+ output.accept(outputStream);
+
+ return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext);
+ }
+
+ /**
+ * @return {@code false}, because {@code evaluate} only composes the stream
+ */
+ @Override
+ public boolean containsAction() {
+ return false;
+ }
+
+ /**
+ * @return the {@link DataStreamChannel} descriptors, matching the casts
+ * performed in {@code evaluate}
+ */
+ @Override
+ public List getSupportedInputChannels(final int index) {
+ assert index <= this.getNumInputs() || (index == 0 && this.getNumInputs() == 0);
+ return Arrays.asList(DataStreamChannel.DESCRIPTOR, DataStreamChannel.DESCRIPTOR_MANY);
+ }
+
+ /**
+ * @return the {@link DataStreamChannel} descriptors, matching the cast
+ * performed in {@code evaluate}
+ */
+ @Override
+ public List getSupportedOutputChannels(final int index) {
+ assert index <= this.getNumOutputs() || (index == 0 && this.getNumOutputs() == 0);
+ return Arrays.asList(DataStreamChannel.DESCRIPTOR, DataStreamChannel.DESCRIPTOR_MANY);
+ }
+}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkDataStreamLocalCallbackSink.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkDataStreamLocalCallbackSink.java
new file mode 100644
index 000000000..d71115ae3
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkDataStreamLocalCallbackSink.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.wayang.basic.operators.LocalCallbackSink;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.flink.channels.DataStreamChannel;
+import org.apache.wayang.flink.execution.FlinkExecutor;
+
+/**
+ * Implementation of the {@link LocalCallbackSink} operator for the Flink
+ * platform that consumes a {@link DataStream}.
+ */
+public class FlinkDataStreamLocalCallbackSink extends LocalCallbackSink
+ implements FlinkExecutionOperator {
+
+ /**
+ * Copies an instance (exclusive of broadcasts).
+ *
+ * @param that that should be copied
+ */
+ public FlinkDataStreamLocalCallbackSink(final LocalCallbackSink that) {
+ super(that);
+ }
+
+ /**
+ * Consumes the input stream. If a collector is set, the stream is executed
+ * and every element is added to the collector; otherwise a print sink is
+ * attached to the stream.
+ *
+ * @param inputs exactly one {@link DataStreamChannel.Instance}
+ * @param outputs expected to be empty
+ * @param flinkExecutor not used by this operator
+ * @param operatorContext forwarded to {@link ExecutionOperator#modelEagerExecution}
+ * @return the result of {@link ExecutionOperator#modelEagerExecution}
+ * @throws Exception if executing the stream or closing its result iterator fails
+ */
+ @Override
+ public Tuple, Collection> evaluate(
+ final ChannelInstance[] inputs,
+ final ChannelInstance[] outputs,
+ final FlinkExecutor flinkExecutor,
+ final OptimizationContext.OperatorContext operatorContext) throws Exception {
+ assert inputs.length == this.getNumInputs();
+ assert outputs.length == this.getNumOutputs();
+
+ final DataStreamChannel.Instance input = (DataStreamChannel.Instance) inputs[0];
+
+ final DataStream dataStreamInput = input.provideDataStream();
+
+ if (this.collector != null) {
+ // executeAndCollect() returns a CloseableIterator that must be closed to
+ // release the resources of the job, hence the try-with-resources.
+ try (org.apache.flink.util.CloseableIterator results = dataStreamInput.executeAndCollect()) {
+ results.forEachRemaining(this.collector::add);
+ }
+ } else {
+ // NOTE(review): print() only registers a sink; nothing in this method
+ // executes the job in this branch - confirm it is executed elsewhere.
+ dataStreamInput.print();
+ }
+
+ return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
+ }
+
+ @Override
+ public String getLoadProfileEstimatorConfigurationKey() {
+ return "wayang.flink.localcallbacksink.load";
+ }
+
+ @Override
+ public List getSupportedInputChannels(final int index) {
+ assert index <= this.getNumInputs() || (index == 0 && this.getNumInputs() == 0);
+ return Arrays.asList(DataStreamChannel.DESCRIPTOR, DataStreamChannel.DESCRIPTOR_MANY);
+ }
+
+ /**
+ * Always fails, because a sink has no output channels.
+ *
+ * @throws UnsupportedOperationException on every call
+ */
+ @Override
+ public List getSupportedOutputChannels(final int index) {
+ throw new UnsupportedOperationException(String.format("%s does not have output channels.", this));
+ }
+
+ @Override
+ public boolean containsAction() {
+ return true;
+ }
+
+ /**
+ * @return a copy of this operator that is again a
+ * {@link FlinkDataStreamLocalCallbackSink}, so the copy keeps
+ * consuming {@link DataStreamChannel}s
+ */
+ @Override
+ protected ExecutionOperator createCopy() {
+ return new FlinkDataStreamLocalCallbackSink(this);
+ }
+}
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkDataStreamMapOperator.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkDataStreamMapOperator.java
new file mode 100644
index 000000000..12491378d
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/operators/FlinkDataStreamMapOperator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.wayang.basic.operators.MapOperator;
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction;
+import org.apache.wayang.core.optimizer.OptimizationContext.OperatorContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.flink.channels.DataStreamChannel;
+import org.apache.wayang.flink.compiler.FunctionCompiler;
+import org.apache.wayang.flink.execution.FlinkExecutor;
+
+/**
+ * Flink {@link DataStream} implementation of the {@link MapOperator}.
+ */
+public class FlinkDataStreamMapOperator extends MapOperator implements FlinkExecutionOperator {
+ /**
+ * Creates a map operator from a function and the classes of its input and
+ * output elements.
+ *
+ * @param function function applied to every element
+ * @param inputTypeClass class of the input elements
+ * @param outputTypeClass class of the output elements
+ */
+ public FlinkDataStreamMapOperator(final SerializableFunction function, final Class inputTypeClass, final Class outputTypeClass) {
+ super(function, inputTypeClass, outputTypeClass);
+ }
+
+ /**
+ * Copies the given {@link MapOperator}.
+ *
+ * @param that operator to copy
+ */
+ public FlinkDataStreamMapOperator(final MapOperator that) {
+ super(that);
+ }
+
+ @Override
+ public List getSupportedInputChannels(final int index) {
+ assert index <= this.getNumInputs() || (index == 0 && this.getNumInputs() == 0);
+ return Arrays.asList(DataStreamChannel.DESCRIPTOR, DataStreamChannel.DESCRIPTOR_MANY);
+ }
+
+ @Override
+ public List getSupportedOutputChannels(final int index) {
+ // An output index has to be checked against the number of outputs.
+ assert index <= this.getNumOutputs() || (index == 0 && this.getNumOutputs() == 0);
+ return Arrays.asList(DataStreamChannel.DESCRIPTOR, DataStreamChannel.DESCRIPTOR_MANY);
+ }
+
+ /**
+ * Appends the compiled map function to the input stream and hands the
+ * resulting stream to the output channel without executing it.
+ *
+ * @param inputs exactly one {@link DataStreamChannel.Instance}
+ * @param outputs exactly one {@link DataStreamChannel.Instance}
+ * @param flinkExecutor not used by this operator
+ * @param operatorContext forwarded to {@link ExecutionOperator#modelLazyExecution}
+ * @return the result of {@link ExecutionOperator#modelLazyExecution}
+ * @throws Exception declared by the overridden method
+ */
+ @Override
+ public Tuple, Collection> evaluate(final ChannelInstance[] inputs,
+ final ChannelInstance[] outputs, final FlinkExecutor flinkExecutor, final OperatorContext operatorContext) throws Exception {
+ assert inputs.length == this.getNumInputs();
+ assert outputs.length == this.getNumOutputs();
+
+ final DataStreamChannel.Instance input = (DataStreamChannel.Instance) inputs[0];
+ final DataStreamChannel.Instance output = (DataStreamChannel.Instance) outputs[0];
+
+ final DataStream stream = input.provideDataStream();
+ final MapFunction mapper = FunctionCompiler.compile(this.functionDescriptor);
+ // The output type is passed to Flink explicitly via returns(...).
+ final DataStream outputStream = stream.map(mapper).returns(this.getOutputType().getDataUnitType().getTypeClass());
+
+ output.accept(outputStream);
+
+ return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext);
+ }
+
+ /**
+ * @return {@code false}, because {@code evaluate} only composes the stream
+ */
+ @Override
+ public boolean containsAction() {
+ return false;
+ }
+
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/platform/FlinkPlatform.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/platform/FlinkPlatform.java
index f3685d92e..d8ab2c926 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/platform/FlinkPlatform.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/platform/FlinkPlatform.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.wayang.basic.plugin.WayangBasic;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.Job;
@@ -60,14 +61,6 @@ public class FlinkPlatform extends Platform {
private static final String[] OPTIONAL_FLINK_PROPERTIES = {
};
- /**
- * Lazy-initialized. Maintains a reference to a {@link ExecutionEnvironment}. This instance's reference, however,
- * does not hold a counted reference, so it might be disposed.
- */
- private FlinkContextReference flinkContextReference = null;
-
- private Logger logger = LogManager.getLogger(this.getClass());
-
public static FlinkPlatform getInstance() {
if (instance == null) {
instance = new FlinkPlatform();
@@ -75,57 +68,71 @@ public static FlinkPlatform getInstance() {
return instance;
}
+ public StreamExecutionEnvironment streamExecutionEnvironment = null;
+
+ /**
+ * Lazy-initialized. Maintains a reference to a
+ * {@link ExecutionEnvironment}. This instance's reference, however,
+ * does not hold a counted reference, so it might be disposed.
+ */
+ private FlinkContextReference flinkContextReference = null;
+
+ private final Logger logger = LogManager.getLogger(this.getClass());
+
private FlinkPlatform() {
super(PLATFORM_NAME, CONFIG_NAME);
}
/**
- * Configures the single maintained {@link ExecutionEnvironment} according to the {@code job} and returns it.
+ * Configures the single maintained {@link ExecutionEnvironment} according to
+ * the {@code job} and returns it.
*
- * @return a {@link FlinkContextReference} wrapping the {@link ExecutionEnvironment}
+ * @return a {@link FlinkContextReference} wrapping the
+ * {@link ExecutionEnvironment}
*/
- public FlinkContextReference getFlinkContext(Job job) {
- Configuration conf = job.getConfiguration();
- String[] jars = getJars(job);
+ public FlinkContextReference getFlinkContext(final Job job) {
+ final Configuration conf = job.getConfiguration();
+ final String[] jars = getJars(job);
- if(this.flinkContextReference == null)
+ if (this.flinkContextReference == null)
switch (conf.getStringProperty("wayang.flink.mode.run")) {
- case "local":
- this.flinkContextReference = new FlinkContextReference(
- job.getCrossPlatformExecutor(),
- ExecutionEnvironment.getExecutionEnvironment(),
- (int) conf.getLongProperty("wayang.flink.parallelism")
- );
- break;
- case "distribution":
- org.apache.flink.configuration.Configuration flinkConfig = new org.apache.flink.configuration.Configuration();
- flinkConfig.setString("rest.client.max-content-length", "1000000000");
- this.flinkContextReference = new FlinkContextReference(
- job.getCrossPlatformExecutor(),
- ExecutionEnvironment.createRemoteEnvironment(
- conf.getStringProperty("wayang.flink.master"),
- Integer.parseInt(conf.getStringProperty("wayang.flink.port")),
- flinkConfig,
- jars
- ),
- (int)conf.getLongProperty("wayang.flink.parallelism")
- );
- break;
- case "collection":
- default:
- this.flinkContextReference = new FlinkContextReference(
- job.getCrossPlatformExecutor(),
- new CollectionEnvironment(),
- 1
- );
- break;
- }
+ case "local":
+ this.flinkContextReference = new FlinkContextReference(
+ job.getCrossPlatformExecutor(),
+ ExecutionEnvironment.getExecutionEnvironment(),
+ (int) conf.getLongProperty("wayang.flink.parallelism"));
+ break;
+ case "distribution":
+ final org.apache.flink.configuration.Configuration flinkConfig = new org.apache.flink.configuration.Configuration();
+ flinkConfig.setString("rest.client.max-content-length", "1000000000");
+ this.flinkContextReference = new FlinkContextReference(
+ job.getCrossPlatformExecutor(),
+ ExecutionEnvironment.createRemoteEnvironment(
+ conf.getStringProperty("wayang.flink.master"),
+ Integer.parseInt(conf.getStringProperty("wayang.flink.port")),
+ flinkConfig,
+ jars),
+ (int) conf.getLongProperty("wayang.flink.parallelism"));
+ this.streamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment(
+ conf.getStringProperty("wayang.flink.master"),
+ Integer.parseInt(conf.getStringProperty("wayang.flink.port")),
+ flinkConfig,
+ jars);
+ break;
+ case "collection":
+ default:
+ this.flinkContextReference = new FlinkContextReference(
+ job.getCrossPlatformExecutor(),
+ new CollectionEnvironment(),
+ 1);
+ break;
+ }
return this.flinkContextReference;
}
@Override
- public void configureDefaults(Configuration configuration) {
+ public void configureDefaults(final Configuration configuration) {
configuration.load(ReflectionUtils.loadResource(DEFAULT_CONFIG_FILE));
}
@@ -135,40 +142,36 @@ public Executor.Factory getExecutorFactory() {
}
@Override
- public LoadProfileToTimeConverter createLoadProfileToTimeConverter(Configuration configuration) {
- int cpuMhz = (int) configuration.getLongProperty("wayang.flink.cpu.mhz");
- int numCores = (int) ( configuration.getLongProperty("wayang.flink.parallelism"));
- double hdfsMsPerMb = configuration.getDoubleProperty("wayang.flink.hdfs.ms-per-mb");
- double networkMsPerMb = configuration.getDoubleProperty("wayang.flink.network.ms-per-mb");
- double stretch = configuration.getDoubleProperty("wayang.flink.stretch");
+ public LoadProfileToTimeConverter createLoadProfileToTimeConverter(final Configuration configuration) {
+ final int cpuMhz = (int) configuration.getLongProperty("wayang.flink.cpu.mhz");
+ final int numCores = (int) (configuration.getLongProperty("wayang.flink.parallelism"));
+ final double hdfsMsPerMb = configuration.getDoubleProperty("wayang.flink.hdfs.ms-per-mb");
+ final double networkMsPerMb = configuration.getDoubleProperty("wayang.flink.network.ms-per-mb");
+ final double stretch = configuration.getDoubleProperty("wayang.flink.stretch");
return LoadProfileToTimeConverter.createTopLevelStretching(
LoadToTimeConverter.createLinearCoverter(1 / (numCores * cpuMhz * 1000d)),
LoadToTimeConverter.createLinearCoverter(hdfsMsPerMb / 1000000d),
LoadToTimeConverter.createLinearCoverter(networkMsPerMb / 1000000d),
(cpuEstimate, diskEstimate, networkEstimate) -> cpuEstimate.plus(diskEstimate).plus(networkEstimate),
- stretch
- );
+ stretch);
}
@Override
- public TimeToCostConverter createTimeToCostConverter(Configuration configuration) {
+ public TimeToCostConverter createTimeToCostConverter(final Configuration configuration) {
return new TimeToCostConverter(
configuration.getDoubleProperty("wayang.flink.costs.fix"),
- configuration.getDoubleProperty("wayang.flink.costs.per-ms")
- );
+ configuration.getDoubleProperty("wayang.flink.costs.per-ms"));
}
-
- private String[] getJars(Job job){
- List jars = new ArrayList<>();
- List clazzs = Arrays.asList(new Class[]{FlinkPlatform.class, WayangBasic.class, WayangContext.class});
+ private String[] getJars(final Job job) {
+ final List jars = new ArrayList<>();
+ final List clazzs = Arrays
+ .asList(new Class[] { FlinkPlatform.class, WayangBasic.class, WayangContext.class });
clazzs.stream().map(
- ReflectionUtils::getDeclaringJar
- ).filter(
- element -> element != null
- ).forEach(jars::add);
-
+ ReflectionUtils::getDeclaringJar).filter(
+ element -> element != null)
+ .forEach(jars::add);
final Set udfJarPaths = job.getUdfJarPaths();
if (udfJarPaths.isEmpty()) {
diff --git a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/plugin/FlinkBasicPlugin.java b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/plugin/FlinkBasicPlugin.java
index bc7df27d6..d2cf564fd 100644
--- a/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/plugin/FlinkBasicPlugin.java
+++ b/wayang-platforms/wayang-flink/src/main/java/org/apache/wayang/flink/plugin/FlinkBasicPlugin.java
@@ -35,7 +35,9 @@
/**
* This {@link Plugin} enables to use the basic Wayang {@link Operator}s on the {@link FlinkPlatform}.
*/
-public class FlinkBasicPlugin implements Plugin{
+public class FlinkBasicPlugin implements Plugin {
+ boolean useBoundedDataStreams = false;
+
@Override
public Collection getRequiredPlatforms() {
return Arrays.asList(FlinkPlatform.getInstance(), JavaPlatform.getInstance());
@@ -43,7 +45,8 @@ public Collection getRequiredPlatforms() {
@Override
public Collection getMappings() {
- return Mappings.BASIC_MAPPINGS;
+
+ return useBoundedDataStreams ? Mappings.BOUNDED_STREAM_MAPPINGS : Mappings.BASIC_MAPPINGS;
}
@Override
@@ -53,6 +56,6 @@ public Collection getChannelConversions() {
@Override
public void setProperties(Configuration configuration) {
-
+ useBoundedDataStreams = configuration.getBooleanProperty("wayang.flink.platforms.useDataStreams", false);
}
}
diff --git a/wayang-platforms/wayang-flink/src/test/java/org/apache/wayang/flink/operators/FlinkDataStreamTests.java b/wayang-platforms/wayang-flink/src/test/java/org/apache/wayang/flink/operators/FlinkDataStreamTests.java
new file mode 100644
index 000000000..e56253769
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/test/java/org/apache/wayang/flink/operators/FlinkDataStreamTests.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.flink.operators;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.basic.function.ProjectionDescriptor;
+import org.apache.wayang.basic.operators.LocalCallbackSink;
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.types.DataUnitType;
+import org.apache.wayang.flink.channels.DataStreamChannel;
+import org.apache.wayang.java.channels.CollectionChannel;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+
+public class FlinkDataStreamTests extends FlinkOperatorTestBase { // runs individual Flink DataStream operators through evaluate() and inspects their output channels
+ @Test
+ public void boundedSourceTest() throws Exception {
+ final String path = FlinkDataStreamTests.class.getResource("dataStreamTest.txt").getPath();
+ // Checks that the bounded text-file source yields at least one element for the test resource resolved above.
+ final FlinkBoundedTextFileSource collectionSource = new FlinkBoundedTextFileSource(path);
+ final DataStreamChannel.Instance output = this.createDataStreamChannelInstance();
+
+ // A source takes no input channels; its single output channel is the DataStream channel created above.
+ final ChannelInstance[] inputs = new ChannelInstance[] {};
+ final ChannelInstance[] outputs = new ChannelInstance[] { output };
+
+ // Run the source operator; the output channel is read back right after.
+ this.evaluate(collectionSource, inputs, outputs);
+
+ final DataStream stream = output.provideDataStream();
+ final Iterator str = stream.executeAndCollect();
+ // NOTE(review): Flink documents that the iterator from executeAndCollect() must be closed; it is never closed here -- confirm.
+ final ArrayList collection = new ArrayList<>();
+ str.forEachRemaining(collection::add);
+
+ assertTrue(collection.size() > 0);
+ }
+ // Chains the text-file source into a local callback sink and expects the sink's target list to end up non-empty.
+ @Test
+ public void localcallbackSinkTest() throws Exception {
+ final String path = FlinkDataStreamTests.class.getResource("dataStreamTest.txt").getPath();
+
+ final FlinkBoundedTextFileSource collectionSource = new FlinkBoundedTextFileSource(path);
+ final DataStreamChannel.Instance output = this.createDataStreamChannelInstance();
+ // `collection` is handed to the collecting sink; the final assertion expects the sink to have filled it.
+ final List collection = new ArrayList<>();
+ final FlinkDataStreamLocalCallbackSink sink = new FlinkDataStreamLocalCallbackSink<>(LocalCallbackSink.createCollectingSink(collection, DataSetType.createDefault(String.class)));
+
+ // The source's output channel is reused as the sink's input channel; the sink itself has no outputs.
+ final ChannelInstance[] sourceInputs = new ChannelInstance[] {};
+ final ChannelInstance[] sourceOutputs = new ChannelInstance[] { output };
+ final ChannelInstance[] sinkInputs = new ChannelInstance[] { output };
+ final ChannelInstance[] sinkOutputs = new ChannelInstance[] { };
+
+ // Run the source first, then the sink, on the shared channel.
+ this.evaluate(collectionSource, sourceInputs, sourceOutputs);
+ this.evaluate(sink, sinkInputs, sinkOutputs);
+
+ assertTrue(collection.isEmpty() == false);
+ }
+ // Maps the stream 1, 2, 3, 4 with i -> i + 5 and checks that every collected value is greater than 5.
+ @Test
+ public void mapTest() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ // The input channel is seeded with a four-element stream; nothing is pushed into the output channel here.
+ final DataStreamChannel.Instance input = this.createDataStreamChannelInstance();
+ input.accept(env.fromData(1, 2, 3, 4));
+ final DataStreamChannel.Instance output = this.createDataStreamChannelInstance();
+
+ // One input channel and one output channel for the map operator.
+ final ChannelInstance[] inputs = new ChannelInstance[] { input };
+ final ChannelInstance[] outputs = new ChannelInstance[] { output };
+
+ // Operator under test: adds 5 to every element.
+ final SerializableFunction add = i -> i + 5;
+ final FlinkDataStreamMapOperator map = new FlinkDataStreamMapOperator(add,
+ Integer.class, Integer.class);
+
+ // Run the map operator; the resulting stream is executed and collected below.
+ this.evaluate(map, inputs, outputs);
+
+ final DataStream stream = output.provideDataStream();
+ final Iterator ints = stream.executeAndCollect();
+
+ final ArrayList collection = new ArrayList<>();
+ ints.forEachRemaining(collection::add);
+ // NOTE(review): allMatch() is true for an empty stream, so this passes even if nothing was collected; consider asserting the size too.
+ assertTrue(collection.stream().allMatch(i -> i > 5));
+ }
+ // Converts the source's DataStream channel into a Java CollectionChannel through FlinkDataStreamCollectionSink.
+ @Test
+ public void javaConversion() throws Exception {
+ final String path = FlinkDataStreamTests.class.getResource("dataStreamTest.txt").getPath();
+
+ final FlinkBoundedTextFileSource collectionSource = new FlinkBoundedTextFileSource(path);
+ final DataStreamChannel.Instance sourceOutput = this.createDataStreamChannelInstance();
+
+ // Source stage: no inputs, one DataStream output channel.
+ final ChannelInstance[] sourceInputs = new ChannelInstance[] {};
+ final ChannelInstance[] sourceOutputs = new ChannelInstance[] { sourceOutput };
+
+ // Run the source.
+ this.evaluate(collectionSource, sourceInputs, sourceOutputs);
+
+ final FlinkDataStreamCollectionSink collectionSink = new FlinkDataStreamCollectionSink<>(
+ DataSetType.createDefault(String.class));
+ final CollectionChannel.Instance sinkOutput = this.createCollectionChannelInstance();
+
+ // Sink stage: consumes the source's DataStream channel and writes to a CollectionChannel.
+ final ChannelInstance[] sinkInputs = new ChannelInstance[] { sourceOutput };
+ final ChannelInstance[] sinkOutputs = new ChannelInstance[] { sinkOutput };
+
+ // Run the sink; the assertion then reads the collection from the sink's output channel.
+ this.evaluate(collectionSink, sinkInputs, sinkOutputs);
+
+ assertTrue(sinkOutput.provideCollection().size() > 0);
+ }
+ // Joins two tuple streams (left key "field0", right key "field1") and checks the result; @RepeatedTest(5) runs it five times.
+ @RepeatedTest(5)
+ public void joinTest() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ // Left tuples have Integer first components 1, 1, 2, 3; right tuples have Integer second components 1, 1, 2, 4.
+ final DataStreamChannel.Instance input1 = this.createDataStreamChannelInstance();
+ input1.accept(
+ env.fromData(new Tuple2<>(1, "b"), new Tuple2<>(1, "c"), new Tuple2<>(2, "d"), new Tuple2<>(3, "e")));
+ final DataStreamChannel.Instance input2 = this.createDataStreamChannelInstance();
+ input2.accept(
+ env.fromData(new Tuple2<>("x", 1), new Tuple2<>("y", 1), new Tuple2<>("z", 2), new Tuple2<>("w", 4)));
+
+ final DataStreamChannel.Instance output = this.createDataStreamChannelInstance();
+
+ // Two input channels (left, right) and one output channel for the join.
+ final ChannelInstance[] inputs = new ChannelInstance[] { input1, input2 };
+ final ChannelInstance[] outputs = new ChannelInstance[] { output };
+
+ // Key extractors: project "field0" from the left tuples and "field1" from the right tuples, both as Integer.
+ final ProjectionDescriptor, Integer> left = new ProjectionDescriptor<>(
+ DataUnitType.createBasicUnchecked(Tuple2.class),
+ DataUnitType.createBasic(Integer.class),
+ "field0");
+ final ProjectionDescriptor, Integer> right = new ProjectionDescriptor<>(
+ DataUnitType.createBasicUnchecked(Tuple2.class),
+ DataUnitType.createBasic(Integer.class),
+ "field1");
+ final FlinkDataStreamJoinOperator, Tuple2, Integer> join = new FlinkDataStreamJoinOperator<>(
+ left, right);
+
+ // Run the join; the resulting stream is executed and collected below.
+ this.evaluate(join, inputs, outputs);
+
+ final DataStream> stream = output.>provideDataStream();
+ final Iterator> ints = stream.executeAndCollect();
+
+ final ArrayList> collection = new ArrayList<>();
+ ints.forEachRemaining(collection::add);
+ // Assuming field0/field1 are the first/second tuple components: key 1 gives 2 x 2 pairs, key 2 gives one, keys 3 and 4 have no partner.
+ assertEquals(5, collection.size());
+ assertTrue(collection.stream()
+ .anyMatch(res -> res.equals(new Tuple2<>(new Tuple2<>(1, "b"), new Tuple2<>("x", 1)))));
+ assertTrue(collection.stream()
+ .anyMatch(res -> res.equals(new Tuple2<>(new Tuple2<>(1, "b"), new Tuple2<>("y", 1)))));
+ assertTrue(collection.stream()
+ .anyMatch(res -> res.equals(new Tuple2<>(new Tuple2<>(1, "c"), new Tuple2<>("x", 1)))));
+ assertTrue(collection.stream()
+ .anyMatch(res -> res.equals(new Tuple2<>(new Tuple2<>(1, "c"), new Tuple2<>("y", 1)))));
+ assertTrue(collection.stream()
+ .anyMatch(res -> res.equals(new Tuple2<>(new Tuple2<>(2, "d"), new Tuple2<>("z", 2)))));
+ }
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-flink/src/test/java/org/apache/wayang/flink/operators/FlinkOperatorTestBase.java b/wayang-platforms/wayang-flink/src/test/java/org/apache/wayang/flink/operators/FlinkOperatorTestBase.java
index bf0672d20..f19ae2903 100644
--- a/wayang-platforms/wayang-flink/src/test/java/org/apache/wayang/flink/operators/FlinkOperatorTestBase.java
+++ b/wayang-platforms/wayang-flink/src/test/java/org/apache/wayang/flink/operators/FlinkOperatorTestBase.java
@@ -18,7 +18,13 @@
package org.apache.wayang.flink.operators;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collection;
+
import org.apache.flink.api.java.ExecutionEnvironment;
+
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.Job;
import org.apache.wayang.core.optimizer.DefaultOptimizationContext;
@@ -28,17 +34,13 @@
import org.apache.wayang.core.platform.CrossPlatformExecutor;
import org.apache.wayang.core.profiling.FullInstrumentationStrategy;
import org.apache.wayang.flink.channels.DataSetChannel;
+import org.apache.wayang.flink.channels.DataStreamChannel;
import org.apache.wayang.flink.execution.FlinkExecutor;
import org.apache.wayang.flink.platform.FlinkPlatform;
import org.apache.wayang.flink.test.ChannelFactory;
import org.apache.wayang.java.channels.CollectionChannel;
-import org.apache.wayang.flink.operators.FlinkExecutionOperator;
-import org.junit.jupiter.api.BeforeEach;
-
-import java.util.Collection;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.junit.jupiter.api.BeforeEach;
/**
* Test base for {@link FlinkExecutionOperator} tests.
@@ -49,51 +51,55 @@ public class FlinkOperatorTestBase {
protected FlinkExecutor flinkExecutor;
- @BeforeEach
- void setUp(){
- this.configuration = new Configuration();
- if(flinkExecutor == null)
- this.flinkExecutor = (FlinkExecutor) FlinkPlatform.getInstance().getExecutorFactory().create(this.mockJob());
- }
-
- Job mockJob() {
- final Job job = mock(Job.class);
- when(job.getConfiguration()).thenReturn(this.configuration);
- when(job.getCrossPlatformExecutor()).thenReturn(new CrossPlatformExecutor(job, new FullInstrumentationStrategy()));
- return job;
+ public ExecutionEnvironment getEnv() { // gives tests direct access to the executor's ExecutionEnvironment
+ return this.flinkExecutor.fee; // `fee` is a field of the FlinkExecutor held by this test base
}
- protected OptimizationContext.OperatorContext createOperatorContext(Operator operator) {
- OptimizationContext optimizationContext = new DefaultOptimizationContext(mockJob());
+ protected OptimizationContext.OperatorContext createOperatorContext(final Operator operator) { // registers `operator` as a one-time operator and returns its context
+ final OptimizationContext optimizationContext = new DefaultOptimizationContext(mockJob()); // each call builds a new context around a newly mocked Job
return optimizationContext.addOneTimeOperator(operator);
}
-
- protected void evaluate(FlinkExecutionOperator operator,
- ChannelInstance[] inputs,
- ChannelInstance[] outputs) throws Exception {
+ protected void evaluate(final FlinkExecutionOperator operator, // operator under test
+ final ChannelInstance[] inputs, // channel instances forwarded as the operator's inputs
+ final ChannelInstance[] outputs) throws Exception { // channel instances forwarded as the operator's outputs
operator.evaluate(inputs, outputs, this.flinkExecutor, this.createOperatorContext(operator));
}
+ protected CollectionChannel.Instance createCollectionChannelInstance() { // CollectionChannel instance built from this test's configuration; no collection is passed along
+ return ChannelFactory.createCollectionChannelInstance(this.configuration);
+ }
- DataSetChannel.Instance createDataSetChannelInstance() {
- return ChannelFactory.createDataSetChannelInstance(this.configuration);
+ protected CollectionChannel.Instance createCollectionChannelInstance(final Collection> collection) { // variant that forwards `collection` to the factory together with this test's configuration
+ return ChannelFactory.createCollectionChannelInstance(collection, this.configuration);
}
- DataSetChannel.Instance createDataSetChannelInstance(Collection> collection) {
- return ChannelFactory.createDataSetChannelInstance(collection, this.flinkExecutor, this.configuration);
+ @BeforeEach
+ void setUp() { // JUnit 5 per-test setup hook
+ this.configuration = new Configuration(); // a new Configuration is created on every call
+ if (flinkExecutor == null) // only create an executor when this instance does not hold one yet
+ this.flinkExecutor = (FlinkExecutor) FlinkPlatform.getInstance().getExecutorFactory()
+ .create(this.mockJob()); // the executor is created for a mocked Job
}
- protected CollectionChannel.Instance createCollectionChannelInstance() {
- return ChannelFactory.createCollectionChannelInstance(this.configuration);
+ Job mockJob() { // builds a Mockito mock of Job with the two getters stubbed below
+ final Job job = mock(Job.class);
+ when(job.getConfiguration()).thenReturn(this.configuration); // the mock hands out this test's configuration
+ when(job.getCrossPlatformExecutor())
+ .thenReturn(new CrossPlatformExecutor(job, new FullInstrumentationStrategy())); // a real CrossPlatformExecutor constructed around the mock
+ return job;
}
- protected CollectionChannel.Instance createCollectionChannelInstance(Collection> collection) {
- return ChannelFactory.createCollectionChannelInstance(collection, this.configuration);
+ DataSetChannel.Instance createDataSetChannelInstance() { // DataSetChannel instance built from this test's configuration; no data is passed along
+ return ChannelFactory.createDataSetChannelInstance(this.configuration);
}
- public ExecutionEnvironment getEnv() {
- return this.flinkExecutor.fee;
+ DataSetChannel.Instance createDataSetChannelInstance(final Collection> collection) { // variant that forwards `collection`, this test's FlinkExecutor and its configuration to the factory
+ return ChannelFactory.createDataSetChannelInstance(collection, this.flinkExecutor, this.configuration);
+ }
+
+ DataStreamChannel.Instance createDataStreamChannelInstance() { // DataStreamChannel instance built from this test's configuration; no stream is passed along
+ return ChannelFactory.createDataStreamChannelInstance(this.configuration);
}
}
diff --git a/wayang-platforms/wayang-flink/src/test/java/org/apache/wayang/flink/test/ChannelFactory.java b/wayang-platforms/wayang-flink/src/test/java/org/apache/wayang/flink/test/ChannelFactory.java
index 1c49ed10e..3befe17ff 100644
--- a/wayang-platforms/wayang-flink/src/test/java/org/apache/wayang/flink/test/ChannelFactory.java
+++ b/wayang-platforms/wayang-flink/src/test/java/org/apache/wayang/flink/test/ChannelFactory.java
@@ -18,18 +18,19 @@
package org.apache.wayang.flink.test;
+import static org.mockito.Mockito.mock;
+
+import java.util.Collection;
+
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.flink.channels.DataSetChannel;
+import org.apache.wayang.flink.channels.DataStreamChannel;
+import org.apache.wayang.flink.execution.FlinkExecutor;
import org.apache.wayang.java.channels.CollectionChannel;
import org.junit.jupiter.api.BeforeEach;
-import org.apache.wayang.flink.execution.FlinkExecutor;
-
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
/**
* Utility to create {@link Channel}s in tests.
@@ -38,39 +39,52 @@ public class ChannelFactory {
private static FlinkExecutor flinkExecutor;
- @BeforeEach
- void setUp() {
- flinkExecutor = mock(FlinkExecutor.class);
- }
-
- public static DataSetChannel.Instance createDataSetChannelInstance(ChannelDescriptor dataSetChannelDescriptor, Configuration configuration){
+ public static DataSetChannel.Instance createDataSetChannelInstance(final ChannelDescriptor dataSetChannelDescriptor, // descriptor used to create the channel
+ final Configuration configuration) { // forwarded to createChannel
return (DataSetChannel.Instance) dataSetChannelDescriptor
- .createChannel(null,configuration)
- .createInstance(flinkExecutor,null,-1);
+ .createChannel(null, configuration) // null is passed as the first argument
+ .createInstance(flinkExecutor, null, -1); // uses the static flinkExecutor field; result is cast to DataSetChannel.Instance
}
- public static DataSetChannel.Instance createDataSetChannelInstance(Configuration configuration){
+ public static DataSetChannel.Instance createDataSetChannelInstance(final Configuration configuration) { // convenience overload that uses DataSetChannel.DESCRIPTOR
return createDataSetChannelInstance(DataSetChannel.DESCRIPTOR, configuration);
}
- public static DataSetChannel.Instance createDataSetChannelInstance(Collection> data,
- FlinkExecutor flinkExecutor,
- Configuration configuration){
- DataSetChannel.Instance instance = createDataSetChannelInstance(configuration);
- instance.accept(flinkExecutor.fee.fromCollection(WayangCollections.asList(data)),flinkExecutor);
+ public static DataSetChannel.Instance createDataSetChannelInstance(final Collection> data, // elements to load into the channel
+ final FlinkExecutor flinkExecutor, // NOTE: this parameter shadows the static field of the same name
+ final Configuration configuration) { // forwarded to the configuration-only overload
+ final DataSetChannel.Instance instance = createDataSetChannelInstance(configuration);
+ instance.accept(flinkExecutor.fee.fromCollection(WayangCollections.asList(data)), flinkExecutor); // the data set is created from `data` via the passed-in executor's `fee`
return instance;
}
- public static CollectionChannel.Instance createCollectionChannelInstance(Configuration configuration) {
+ public static CollectionChannel.Instance createCollectionChannelInstance(final Configuration configuration) { // creates a channel from CollectionChannel.DESCRIPTOR and instantiates it with the static flinkExecutor field
return (CollectionChannel.Instance) CollectionChannel.DESCRIPTOR
.createChannel(null, configuration)
.createInstance(flinkExecutor, null, -1);
}
- public static CollectionChannel.Instance createCollectionChannelInstance(Collection> collection, Configuration configuration) {
- CollectionChannel.Instance instance = createCollectionChannelInstance(configuration);
+ public static CollectionChannel.Instance createCollectionChannelInstance(final Collection> collection, // collection handed to the new instance via accept()
+ final Configuration configuration) { // forwarded to the configuration-only overload
+ final CollectionChannel.Instance instance = createCollectionChannelInstance(configuration); // start from an instance created without data
instance.accept(collection);
return instance;
}
+ public static DataStreamChannel.Instance createDataStreamChannelInstance( // creates a channel from the given descriptor and instantiates it
+ final ChannelDescriptor dataStreamChannelDescriptor, final Configuration configuration) {
+ return (DataStreamChannel.Instance) dataStreamChannelDescriptor // the cast assumes the descriptor yields a DataStreamChannel
+ .createChannel(null, configuration) // null is passed as the first argument
+ .createInstance(flinkExecutor, null, -1); // uses the static flinkExecutor field; in the visible code it is only assigned in setUp()
+ }
+
+ public static DataStreamChannel.Instance createDataStreamChannelInstance(final Configuration configuration) { // convenience overload that uses DataStreamChannel.DESCRIPTOR
+ return createDataStreamChannelInstance(DataStreamChannel.DESCRIPTOR, configuration);
+ }
+
+ @BeforeEach
+ void setUp() { // NOTE(review): instance-level JUnit hook on a class whose factories are static; it presumably never runs when they are called from other test classes, leaving flinkExecutor null -- confirm
+ flinkExecutor = mock(FlinkExecutor.class); // assigns the static field read by the factory methods
+ }
+
}
diff --git a/wayang-platforms/wayang-flink/src/test/resources/org/apache/wayang/flink/operators/dataStreamTest.txt b/wayang-platforms/wayang-flink/src/test/resources/org/apache/wayang/flink/operators/dataStreamTest.txt
new file mode 100644
index 000000000..84058e52e
--- /dev/null
+++ b/wayang-platforms/wayang-flink/src/test/resources/org/apache/wayang/flink/operators/dataStreamTest.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+col1,col2,col3
+col4,col5,col6
\ No newline at end of file