Open
Conversation
General:
* Factored out a logging and utils package in attempt to reduce circular
dependencies.
Listener:
* Pulled out non-shard operations from listener (CreateStream,
DeleteStream, etc). This is primarily because it is a much safer to
have the shard set and immutable at listener creation. Operations
like SetShard in previous implementation were dangerous if the shard
iterators have already been retrieved. Non-shard operations are now
moved to the Kinetic object.
* Use an embedded struct (listenerConfig) to share common fields in
Listener and its Config.
* Added a FromKinetic to Listener's Config to import any AWS/relevant
config from an existing Kinetic object.
* Fixed the logLevel implementationt to be a bit more sane.
* Separated out testing of Listener's Config to config_test.go
Added Kinetic:
* Added a Kinetic object that can be used to create Kinesis streams and
Firehose Delivery Streams. Note that since the APIs for Kinesis and
Firehose differ, they can co-exist in Kinetic comfortably.
General Notes:
The new Kinetic struct is completely optional, but may be useful in
situations where you do not know what your ShardId is. For example, you
may be spawning off multiple Listeners to handle all shards, or you may
have an external mechanism to determine which ShardId to listen to. The
Kinetic object can be instantiated and the GetShards method can be used
to obtain a list of all shards. Once you have figured out the ShardId,
you can construct a Listener object using:
import "github.com/rewardStyle/kinetic"
import "github.com/rewardStyle/kinetic/listener"
k := kinetic.New(kinetic.NewConfig().
WithCredentials(accessKey, secretKe, securityToken).
WithRegion("us-east-1"))
l := listener.NewListener(
listener.NewConfig(stream, shard).
FromKinetic(k))
This allows you to use the same config from your Kinetic object to
instantiate the Listener. Alternatively, you can opt to create a
Listener directly:
l := listener.NewListener(
listener.NewConfig(stream, shard).
WithCredentials(accessKey, secretKey, securityToken).
WithRegion("us-east"))
I'm not very satisfied with the amount of code duplication between the
Listener's Config and Kinetic's Config. But, with the prototypal
pattern With*, I could not figure out an easy way to maintain the
chaining of the Config using embedded structs.
Furthermore, I'm not to thrilled at the dependency implementation. I
think it would be easier to allow something like this:
import "github.com/rewardStyle/kinetic"
k := kinetic.New(kinetic.NewConfig().
WithCredentials(accessKey, secretKe, securityToken).
WithRegion("us-east-1"))
l := k.NewListener(stream, shard)
Where:
func (k *Kinetic) NewListener(stream, shard string) (*Listener, error) {
return listener.NewListener(
listener.NewConfig(stream, shard).
FromKinetic(k))
}
But, this would require the kinetic package to import the listener
package. Unfortunately, the listener package also needs to import the
kinetic package. Moving logging and utils out to their separate
packages has helped minimize the circular dependencies, but the listener
test cases still rely directly on the kinetic package.
Finally, the tests do require unexported fields. Despite that being a
smell, I am torn between several forces:
* Several of the unexported variables carry important state that really
should not be tampered with.
* I don't like the idea of adding a bunch of accessors for internal
state variables simply for the purpose of testing. Furthermore, this
actually just makes the public API more unstable, as it couples the
API to the internal implementation of the packages.
* The tests really need access to the unexported fields.
TODOS:
* We need to implement checkpointing such that we don't always start at
TRIM_HORIZON. This would probably be done via dependency injection of
an interface, similar to the StatsListener.
* We need to add support for merging and splitting straem. At minimum,
the Kinetic library should provide a mechanism for the calling
application to decide what to do when a stream closes.
* Use a simpler (more DRY) method for configuring Kinetic and Listener objects. TODO: * There should probably be a recover() inside the New functions to handle any panics inside the supplied configuration function. * Should re-evaluate whether embedding should be done by pointer. Could the caller retain a pointer to the structs and therefore gain access to internal state? Do we care? * Should re-evaluate whether much of the config should be exported or unexported. I've been told recently that exported/unexported isn't really about encapsulation, but rather more about stable vs unstable API. I'm not 100% convinced.
* There is probably a race condition in one of the tests. I've disabled it for now. It may not be worth adding complexity to fix the test.
- Use a timeout (bleh!) of 10s and a polling rate of 1s to give consume a chance to see the last planet.
… Introduced first draft of KclReader. Code cleanup based on code review.
…s processRecord message
Version3 listener unstable
… kinesis stream using the kinetic object with a producer and a listener
…eckpointer. Removed node expirary on checkpointer. Added checkpoint stats in consumer's stats collector. Added periodic update of checkpointer size in KclReader.
…t (instead of a binary search tree). Modified markDone so delete elements from the list upon forming a chain of sequence numbers marked done in order to address the potential for unchecked memory growth. Updated all unit tests to reflect the new expected behavior.
… the main (public) objects instead of the respective (private) option configuration structs.
…ency by adding a new concurrency field to the kinesisReaderOptions struct. Also added a new function method option to be able to adjust this parameter.
…ical bug in the producer's doWork exit condition. Added read response timeout to FirehoseWriter.
… Simplified the GetRecord / GetRecords API to only return an error (instead of returning count and size as well). Updated documentation on the Consumer / KinesisReader around rate limiting.
…th the count/size rate limiter.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This is the one. Release candidate v3.0.0-rc1