[BAHIR-234] Add ClickHouse Connector for Flink#85
[BAHIR-234] Add ClickHouse Connector for Flink#85pyscala wants to merge 2 commits intoapache:masterfrom
Conversation
| </dependency> | ||
| </dependencies> | ||
|
|
||
| <build> |
...main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java
Outdated
Show resolved
Hide resolved
...main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java
Outdated
Show resolved
Hide resolved
| /** | ||
| * Created by liufangliang on 2020/4/16. | ||
| */ | ||
| public class ClickHouseTableSinkTest { |
flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseTest.java
Outdated
Show resolved
Hide resolved
...tor-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseValidatorTest.java
Outdated
Show resolved
Hide resolved
|
@eskabetxe Is there remained concern on the PR? I'd love to see the PR merged so that I can use it in work :) |
eskabetxe
left a comment
There was a problem hiding this comment.
You should revise your tests, they appear to be executed with a local clickhouse instance
flink-connector-clickhouse/pom.xml
Outdated
| <dependency> | ||
| <groupId>ru.yandex.clickhouse</groupId> | ||
| <artifactId>clickhouse-jdbc</artifactId> | ||
| <version>0.1.50</version> |
There was a problem hiding this comment.
this should be in properties so user can change this version
There was a problem hiding this comment.
thanks for your reply,i will fix it later.
|
|
||
| public class ClickHouseAppendSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction { | ||
| private static final String USERNAME = "user"; | ||
| private static final String PASSWORD = "password"; |
| try { | ||
| Thread.sleep(retryInterval); | ||
| } catch (InterruptedException e) { | ||
| e.printStackTrace(); |
| try { | ||
| copy = new ClickHouseTableSink(address, username, password, database, table, schema, batchSize, commitPadding, retries, retryInterval, ignoreInsertError); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException(e); |
| String[] fieldNames = tableSchema.getFieldNames(); | ||
| String columns = String.join(",", fieldNames); | ||
| String[] questionMark = new String[fieldNames.length]; | ||
| for (int i = 0; i < questionMark.length; i++) { |
There was a problem hiding this comment.
this could be change to a stream map no?
String questionMark = Arrays.stream(fieldNames)
.map(field -> "?")
.reduce((left,right) -> left+","+right)
.get();
| private PreparedStatement pstat; | ||
|
|
||
| @Test | ||
| public void open() throws Exception { |
| for (int i = 0; i < value.getArity(); i++) { | ||
| pstat.setObject(i + 1, value.getField(i)); | ||
| } | ||
|
|
| .retryInterval(3000L); | ||
| Map<String, String> connectorProperties = clickhouse.toConnectorProperties(); | ||
| for (Map.Entry<String, String> entry : connectorProperties.entrySet()) { | ||
| System.out.println(entry.getKey() + ":" + entry.getValue()); |
There was a problem hiding this comment.
we should test that the map is what you expect and not print it to console
There was a problem hiding this comment.
@eskabetxe thanks for your reply, related optimization has been completed, review again.
|
|
||
| @Test | ||
| public void createStreamTableSink() throws Exception { | ||
|
|
There was a problem hiding this comment.
you are testing with local clickhouse?
we should have a testcontainer that create a clickhouse instance to test
Implement Streaming ClickHouseSink,support Flink Table API & Flink SQL for ClickHouse connector
|
@eskabetxe Optimization complete, looking forward to your reply. |
|
How is it going? How to contribute code? |
|
Is this PR still alive? |
Implement Streaming ClickHouseSink,support Flink Table API & Flink SQL
for ClickHouse connector