[FLINK-35989][Connectors/AWS] Log errors on partially failed requests for AWS Kinesis Stream sink#201
[FLINK-35989][Connectors/AWS] Log errors on partially failed requests for AWS Kinesis Stream sink#201aws-nageshvh wants to merge 1 commit intoapache:mainfrom
Conversation
|
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) |
| KinesisAsyncClient get(); | ||
|
|
||
| /** Closes any resources held by this provider. */ | ||
| void close(); |
There was a problem hiding this comment.
Should the interface extend Closeable? Doing so allows for try-with-resources.
There was a problem hiding this comment.
Makes sense. Updated the PR
| streamArn, | ||
| kinesisClientProperties, | ||
| states, | ||
| null); |
There was a problem hiding this comment.
Should we have an anonymous function implementation of client provider here which calls buildClient() instead of providing null and handling null in the other constructor method? This will allow for cleaner code in the other constructor by removing the null handling.
There was a problem hiding this comment.
I ended up refactoring the way client/clientprovider are injected and passed between SinkWriter and Sink which should simplify this much further. PTAL
6d937c3 to
f6e0587
Compare
| try { | ||
| kinesisClientProvider.close(); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException("Failed to close the kinesisClientProvider", e); |
There was a problem hiding this comment.
Is there a specific or even generic connector exception which extends RuntimeException that we can use here?
There was a problem hiding this comment.
I found there was a common exception so replaced it with that
| summary.getCount(), | ||
| summary.getExampleMessage()))); |
There was a problem hiding this comment.
These can be implemented as ErrorSummary.toString(). Then, instead of using StringBuilder to construct string, calling toString() on the hashmap should produce a readable string. This way, we'd have less code to maintain.
There was a problem hiding this comment.
makes sense. was in two minds because the format looked off but ended up changing that for better encapsulation
|
|
||
| // Using a single WARN log with aggregated information provides operational | ||
| // visibility into errors without flooding logs in high-throughput scenarios | ||
| LOG.warn("KDS Sink failed to write, " + errorSummary.toString()); |
There was a problem hiding this comment.
Let's use full class name here for searchability/debug-ability. e.g. KinesisStreamsSinkWriter failed to write records: ...
There was a problem hiding this comment.
Intentionally didn't change it and kept the same since it was being used in multiple places. I don't think we need the full class name since it would show up in Logger anyway. I have expanded KDS into Kinesis Data Stream so it's more readable
f6e0587 to
257ef78
Compare
… for AWS Kinesis Stream sink
257ef78 to
25ab387
Compare
|
Rebased from main and resolved all conflicts |
Purpose of the change
[FLINK-35989][Connectors/AWS] Log errors on partially failed requests for AWS Kinesis Stream sink
Verifying this change
This change added tests and can be verified as follows:
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)
@Public(Evolving))