From daa3d56fd5e00c3ee5f2a08d6370bd9e3cbeecbe Mon Sep 17 00:00:00 2001 From: Greg Fodor Date: Wed, 16 Mar 2016 16:40:51 -0700 Subject: [PATCH 1/4] Add support for throttle time field --- .../Response/FetchResponseTest.cs | 2 +- src/KafkaNET.Library/KafkaConnection.cs | 2 +- src/KafkaNET.Library/Responses/FetchResponse.cs | 17 ++++++++++++++--- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/Kafka.Client.Tests/Response/FetchResponseTest.cs b/src/Kafka.Client.Tests/Response/FetchResponseTest.cs index c7a72f0..761fdfd 100644 --- a/src/Kafka.Client.Tests/Response/FetchResponseTest.cs +++ b/src/Kafka.Client.Tests/Response/FetchResponseTest.cs @@ -39,7 +39,7 @@ public void ShouldAbleToParseFetchResponse() writer.Write(messageStream.GetBuffer(), 0, (int)messageStream.Length); stream.Seek(0, SeekOrigin.Begin); var reader = new KafkaBinaryReader(stream); - var response = new FetchResponse.Parser().ParseFrom(reader); + var response = new FetchResponse.Parser(0).ParseFrom(reader); var set = response.MessageSet("topic1", 111); set.Should().NotBeNull(); var messages = set.Messages.ToList(); diff --git a/src/KafkaNET.Library/KafkaConnection.cs b/src/KafkaNET.Library/KafkaConnection.cs index 475ba7d..18b9cf7 100644 --- a/src/KafkaNET.Library/KafkaConnection.cs +++ b/src/KafkaNET.Library/KafkaConnection.cs @@ -139,7 +139,7 @@ public FetchResponse Send(FetchRequest request) { this.EnsuresNotDisposed(); Guard.NotNull(request, "request"); - return this.Handle(request.RequestBuffer.GetBuffer(), new FetchResponse.Parser()); + return this.Handle(request.RequestBuffer.GetBuffer(), new FetchResponse.Parser(request.VersionId)); } /// diff --git a/src/KafkaNET.Library/Responses/FetchResponse.cs b/src/KafkaNET.Library/Responses/FetchResponse.cs index 0e6e0f8..f8f7d9e 100644 --- a/src/KafkaNET.Library/Responses/FetchResponse.cs +++ b/src/KafkaNET.Library/Responses/FetchResponse.cs @@ -39,17 +39,19 @@ public FetchResponse(int correlationId, IEnumerable data) this.TopicDataDict = data.GroupBy(x => x.Topic, x => x) .ToDictionary(x => x.Key, x => x.ToList().FirstOrDefault()); } - public FetchResponse(int correlationId, IEnumerable data, int size) + public FetchResponse(int correlationId, IEnumerable data, int size, int throttleTime) { Guard.NotNull(data, "data"); this.CorrelationId = correlationId; this.TopicDataDict = data.GroupBy(x => x.Topic, x => x) .ToDictionary(x => x.Key, x => x.ToList().FirstOrDefault()); this.Size = size; + this.ThrottleTime = throttleTime; } public int Size { get; private set; } public int CorrelationId { get; private set; } + public int ThrottleTime { get; private set; } public Dictionary TopicDataDict { get; private set; } public BufferedMessageSet MessageSet(string topic, int partition) @@ -92,13 +94,22 @@ public PartitionData PartitionData(string topic, int partition) public class Parser : IResponseParser { + private int versionId; + + public Parser(int versionId) + { + this.versionId = versionId; + } + public FetchResponse ParseFrom(KafkaBinaryReader reader) { - int size = 0, correlationId = 0, dataCount = 0; + int size = 0, correlationId = 0, dataCount = 0, throttleTime = 0; try { size = reader.ReadInt32(); correlationId = reader.ReadInt32(); + if (versionId > 0) + throttleTime = reader.ReadInt32(); dataCount = reader.ReadInt32(); var data = new TopicData[dataCount]; for (int i = 0; i < dataCount; i++) @@ -106,7 +117,7 @@ public FetchResponse ParseFrom(KafkaBinaryReader reader) data[i] = TopicData.ParseFrom(reader); } - return new FetchResponse(correlationId, data, size); + return new FetchResponse(correlationId, data, size, throttleTime); } catch (OutOfMemoryException mex) { From 935353eb2d1325c5d708f1c7312eadc8f11454af Mon Sep 17 00:00:00 2001 From: Greg Fodor Date: Wed, 16 Mar 2016 16:47:42 -0700 Subject: [PATCH 2/4] Update tests for v1 FetchResponse --- .../Response/FetchResponseTest.cs | 50 ++++++++++++++----- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/src/Kafka.Client.Tests/Response/FetchResponseTest.cs b/src/Kafka.Client.Tests/Response/FetchResponseTest.cs index 761fdfd..72f800d 100644 --- a/src/Kafka.Client.Tests/Response/FetchResponseTest.cs +++ b/src/Kafka.Client.Tests/Response/FetchResponseTest.cs @@ -19,32 +19,58 @@ public class FetchResponseTest { [TestMethod] [TestCategory(TestCategories.BVT)] - public void ShouldAbleToParseFetchResponse() + public void ShouldAbleToParseV0FetchResponse() { var stream = new MemoryStream(); + WriteTestFetchResponse(stream, 0); + var reader = new KafkaBinaryReader(stream); + var response = new FetchResponse.Parser(0).ParseFrom(reader); + response.ThrottleTime.ShouldBeEquivalentTo(0); + var set = response.MessageSet("topic1", 111); + set.Should().NotBeNull(); + var messages = set.Messages.ToList(); + messages.Count().Should().Be(1); + messages.First().Payload.Length.Should().Be(100); + } + + [TestMethod] + [TestCategory(TestCategories.BVT)] + public void ShouldAbleToParseV1FetchResponse() + { + var stream = new MemoryStream(); + WriteTestFetchResponse(stream, 1); + var reader = new KafkaBinaryReader(stream); + var response = new FetchResponse.Parser(1).ParseFrom(reader); + response.ThrottleTime.ShouldBeEquivalentTo(456); + var set = response.MessageSet("topic1", 111); + set.Should().NotBeNull(); + var messages = set.Messages.ToList(); + messages.Count().Should().Be(1); + messages.First().Payload.Length.Should().Be(100); + } + + private static void WriteTestFetchResponse(MemoryStream stream, int versionId) + { var writer = new KafkaBinaryWriter(stream); writer.Write(1); writer.Write(123); // correlation id + if (versionId > 0) + { + writer.Write(456); + } writer.Write(1); // data count writer.WriteShortString("topic1"); writer.Write(1); // partition count writer.Write(111); //partition id - writer.Write((short)ErrorMapping.NoError); + writer.Write((short) ErrorMapping.NoError); writer.Write(1011L); // hw var messageStream = new MemoryStream(); var messageWriter = new KafkaBinaryWriter(messageStream); - new BufferedMessageSet(new List() { new Message(new byte[100]) }, 0).WriteTo(messageWriter); - writer.Write((int)messageStream.Length); - writer.Write(messageStream.GetBuffer(), 0, (int)messageStream.Length); + new BufferedMessageSet(new List() {new Message(new byte[100])}, 0).WriteTo(messageWriter); + writer.Write((int) messageStream.Length); + writer.Write(messageStream.GetBuffer(), 0, (int) messageStream.Length); stream.Seek(0, SeekOrigin.Begin); - var reader = new KafkaBinaryReader(stream); - var response = new FetchResponse.Parser(0).ParseFrom(reader); - var set = response.MessageSet("topic1", 111); - set.Should().NotBeNull(); - var messages = set.Messages.ToList(); - messages.Count().Should().Be(1); - messages.First().Payload.Length.Should().Be(100); } } } From 230f9c2a9cd725403175c9e6555602ededf74080 Mon Sep 17 00:00:00 2001 From: Greg Fodor Date: Wed, 16 Mar 2016 16:48:57 -0700 Subject: [PATCH 3/4] Re-order method signature to reflect protocol ordering --- src/KafkaNET.Library/Responses/FetchResponse.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/KafkaNET.Library/Responses/FetchResponse.cs b/src/KafkaNET.Library/Responses/FetchResponse.cs index f8f7d9e..405c1fa 100644 --- a/src/KafkaNET.Library/Responses/FetchResponse.cs +++ b/src/KafkaNET.Library/Responses/FetchResponse.cs @@ -39,14 +39,14 @@ public FetchResponse(int correlationId, IEnumerable data) this.TopicDataDict = data.GroupBy(x => x.Topic, x => x) .ToDictionary(x => x.Key, x => x.ToList().FirstOrDefault()); } - public FetchResponse(int correlationId, IEnumerable data, int size, int throttleTime) + public FetchResponse(int correlationId, int throttleTime, IEnumerable data, int size) { Guard.NotNull(data, "data"); this.CorrelationId = correlationId; + this.ThrottleTime = throttleTime; this.TopicDataDict = data.GroupBy(x => x.Topic, x => x) .ToDictionary(x => x.Key, x => x.ToList().FirstOrDefault()); this.Size = size; - this.ThrottleTime = throttleTime; } public int Size { get; private set; } @@ -117,7 +117,7 @@ public FetchResponse ParseFrom(KafkaBinaryReader reader) data[i] = TopicData.ParseFrom(reader); } - return new FetchResponse(correlationId, data, size, throttleTime); + return new FetchResponse(correlationId, throttleTime, data, size); } catch (OutOfMemoryException mex) { From b1d706cabfeaa1eefd78ae66cf09b7639377ec26 Mon Sep 17 00:00:00 2001 From: Greg Fodor Date: Wed, 16 Mar 2016 23:17:13 -0700 Subject: [PATCH 4/4] Missing comment --- src/Kafka.Client.Tests/Response/FetchResponseTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Kafka.Client.Tests/Response/FetchResponseTest.cs b/src/Kafka.Client.Tests/Response/FetchResponseTest.cs index 72f800d..8864629 100644 --- a/src/Kafka.Client.Tests/Response/FetchResponseTest.cs +++ b/src/Kafka.Client.Tests/Response/FetchResponseTest.cs @@ -56,7 +56,7 @@ private static void WriteTestFetchResponse(MemoryStream stream, int versionId) writer.Write(123); // correlation id if (versionId > 0) { - writer.Write(456); + writer.Write(456); // throttle time } writer.Write(1); // data count writer.WriteShortString("topic1");