From 749229207bc795c12030ced1743b576fef6b6398 Mon Sep 17 00:00:00 2001
From: Jordan Beacham
Date: Wed, 4 May 2016 16:40:18 -0500
Subject: [PATCH 1/2] Optimize DispatchSerializedData and bubble the
 ProducerResponseStatus up to the producer.Send method.

- Optimized the DispatchSerializedData method in DefaultCallbackHandler.cs to
  use a single foreach loop instead of multiple foreach statements.
- Added Offset and PartitionId to ProducerResponseStatus and bubbled the
  statuses all the way up to the producer.Send method. Kafka intends this
  response to be available to the caller for logging, health checks, etc.
- Added the ShouldReturnOffset() unit test in DefaultCallbackHandlerTest.cs.
- All unit tests pass.
---
 .../Producers/DefaultCallbackHandlerTest.cs   | 56 ++++++++++++++
 .../Producers/DefaultCallbackHandler.cs       | 74 +++++++++++--------
 .../Producers/ICallbackHandler.cs             |  4 +-
 src/KafkaNET.Library/Producers/IProducer.cs   |  3 +-
 .../ProduceDispatchSeralizeResult.cs          |  6 +-
 src/KafkaNET.Library/Producers/Producer.cs    |  6 +-
 .../Responses/ProducerResponse.cs             |  7 +-
 7 files changed, 120 insertions(+), 36 deletions(-)

diff --git a/src/Kafka.Client.Tests/Producers/DefaultCallbackHandlerTest.cs b/src/Kafka.Client.Tests/Producers/DefaultCallbackHandlerTest.cs
index 755e779..3b589c4 100644
--- a/src/Kafka.Client.Tests/Producers/DefaultCallbackHandlerTest.cs
+++ b/src/Kafka.Client.Tests/Producers/DefaultCallbackHandlerTest.cs
@@ -111,5 +111,61 @@ public void ShouldRetryHandleWhenTopicNotFound()
 
             Assert.Fail("Should have caught exception.");
         }
+
+        [TestMethod]
+        [TestCategory(TestCategories.BVT)]
+        public void ShouldReturnOffset()
+        {
+            var partitioner = new Mock>();
+            var config = new ProducerConfiguration(new List());
+            var pool = new Mock();
+            var producer = new Mock();
+            var partitionMetadatas = new List()
+            {
+                new PartitionMetadata(0, new Broker(0, "host1", 1234), Enumerable.Empty(),
+                    Enumerable.Empty())
+            };
+            var metadatas = new List() { new TopicMetadata("test", partitionMetadatas, ErrorMapping.NoError) };
+            producer.SetupGet(p => p.Config)
+                .Returns(
+                    () =>
+                        new SyncProducerConfiguration(new ProducerConfiguration(new List()), 0,
+                            "host1", 1234));
+            producer.Setup(p => p.Send(It.IsAny())).Returns(() => metadatas);
+            var statuses = new Dictionary();
+            var producerResponseStatus = new ProducerResponseStatus()
+            {
+                PartitionId = 0,
+                Topic = "test",
+                Offset = 1,
+                Error = ErrorMapping.NoError
+            };
+            statuses[new TopicAndPartition("test", 0)] = producerResponseStatus;
+            producer.Setup(p => p.Send(It.IsAny()))
+                .Returns(
+                    () =>
+                        new ProducerResponse(1, statuses));
+            pool.Setup(p => p.GetShuffledProducers()).Returns(() => new List() { producer.Object });
+            pool.Setup(p => p.GetProducer(It.IsAny())).Returns(() => producer.Object);
+            var mockPartitionInfo = new Mock();
+            mockPartitionInfo.Setup(m => m.GetBrokerPartitionInfo(0, string.Empty, It.IsAny(), "test"))
+                .Returns(() => new List());
+            var handler = new DefaultCallbackHandler(config, partitioner.Object, new DefaultEncoder(), mockPartitionInfo.Object, pool.Object);
+            List<ProducerResponseStatus> producerResponse;
+            try
+            {
+                producerResponse = handler.Handle(new List>()
+                {
+                    new ProducerData("test", new Message(new byte[100]))
+                });
+            }
+            catch (FailedToSendMessageException)
+            {
+                mockPartitionInfo.Verify(m => m.GetBrokerPartitionInfo(0, string.Empty, It.IsAny(), "test"), Times.Exactly(3));
+                return;
+            }
+
+            Assert.AreEqual(1, producerResponse.FirstOrDefault().Offset);
+        }
     }
 }
diff --git a/src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs b/src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs
index 101d0a1..0646958 100644
--- a/src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs
+++ b/src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs
@@ -67,12 +67,14 @@ private int NextCorrelationId
             get { return Interlocked.Increment(ref correlationId); }
         }
 
-        public void Handle(IEnumerable> events)
+        public List<ProducerResponseStatus> Handle(IEnumerable> events)
        {
             IEnumerable> serializedData = this.Serialize(events);
             ProduceDispatchSeralizeResult outstandingProduceRequests =
                 new ProduceDispatchSeralizeResult(new List { }, serializedData, null, true);
             var remainingRetries = this.producerConfig.ProducerRetries;
             int currentRetryMs = producerConfig.ProducerRetryExponentialBackoffMinMs;
+            // Master list of ProducerResponseStatus to return
+            List<ProducerResponseStatus> producerResponseStatuses = new List<ProducerResponseStatus>();
             var brokers = this.producerConfig.Brokers;
             if (producerConfig.Verbose)
@@ -86,6 +88,8 @@ public void Handle(IEnumerable> events)
             {
                 ProduceDispatchSeralizeResult currentOutstandingRequests =
                     this.DispatchSerializedData(outstandingProduceRequests.FailedProducerDatas, remainingRetries > 1 ? false : true);
+                // Add this round's statuses to the master list
+                producerResponseStatuses.AddRange(currentOutstandingRequests.ProducerResponse);
                 outstandingProduceRequests = currentOutstandingRequests;
                 if (outstandingProduceRequests.HasDataNeedDispatch)
                 {
@@ -121,6 +125,7 @@ public void Handle(IEnumerable> events)
                 Logger.Error(message);
                 throw new FailedToSendMessageException(message, new List(), outstandingProduceRequests, allCount, remainFailedCount);
             }
+            return producerResponseStatuses;
         }
 
         private int ExponentialRetry(int currentRetryMs)
@@ -141,8 +146,10 @@ private ProduceDispatchSeralizeResult DispatchSerializedData(IEnumerable
             List> failedDetail = null;
             var exceptions = new List();
             bool hasDataNeedReprocess = false;
+            // Master list of ProducerResponseStatus to pass to the ProduceDispatchSeralizeResult constructor
+            List<ProducerResponseStatus> producerResponseStatuses = new List<ProducerResponseStatus>();
             try
-            {
+            {
                 IEnumerable>>>> partitionedData = this.PartitionAndCollate(messages);
                 foreach (KeyValuePair>>> keyValuePair in partitionedData)
                 {
@@ -154,37 +161,47 @@ private ProduceDispatchSeralizeResult DispatchSerializedData(IEnumerable
-                    ProducerSendResult>> failedTopicResponse = this.Send(brokerId, messageSetPerBroker);
-                    if (!failedTopicResponse.Success || (failedTopicResponse.ReturnVal != null && failedTopicResponse.ReturnVal.Any()))
-                    {
-                        failedProduceRequests = new List>();
-                        foreach (var failedTopic in failedTopicResponse.ReturnVal)
-                        {
-                            List> failedMessages = eventsPerBrokerMap[failedTopic.Item1];
-                            failedProduceRequests.AddRange(failedMessages);
-                            hasDataNeedReprocess = true;
-                        }
+                    // Get the topic responses from the Send method
+                    ProducerSendResult>> topicResponse = this.Send(brokerId, messageSetPerBroker);
-                        foreach (var topic in failedTopicResponse.ReturnVal.Select(e => e.Item1.Topic).Distinct())
-                        {
-                            // update the metadata in case that the failure caused by kafka broker failover
-                            this.brokerPartitionInfo.UpdateInfo(producerConfig.VersionId, NextCorrelationId,
-                                producerConfig.ClientId, topic);
-                        }
+                    if (topicResponse?.ReturnVal != null)
+                    {
+                        // Track topics so we do not call UpdateInfo on the same topic more than once
+                        var topics = new List();
+                        failedDetail = new List>();
-                        if (lastRetry)
+                        // A single foreach handles errors and retries and adds each topicResponseStatus to the master list
+                        foreach (var topicResponseStatus in topicResponse?.ReturnVal)
                         {
-                            failedDetail = new List>();
-                            foreach (var failedTopic in failedTopicResponse.ReturnVal)
+                            if (topicResponseStatus.Item2.Error != ErrorMapping.NoError)
                             {
-                                failedDetail.Add(new Tuple(brokerId, failedTopic.Item1, failedTopic.Item2));
-                            }
+                                failedProduceRequests = new List>();
+
+                                List> failedMessages = eventsPerBrokerMap[topicResponseStatus.Item1];
+                                failedProduceRequests.AddRange(failedMessages);
+                                hasDataNeedReprocess = true;
+
+                                // Skip topics whose metadata has already been updated
+                                if (!topics.Contains(topicResponseStatus.Item1.Topic))
+                                {
+                                    // update the metadata in case the failure was caused by kafka broker failover
+                                    this.brokerPartitionInfo.UpdateInfo(producerConfig.VersionId, NextCorrelationId,
+                                        producerConfig.ClientId, topicResponseStatus.Item1.Topic);
+                                    topics.Add(topicResponseStatus.Item1.Topic);
+                                }
+
+                                if (lastRetry)
+                                {
+                                    failedDetail.Add(new Tuple(brokerId, topicResponseStatus.Item1, topicResponseStatus.Item2));
+                                }
+                            }
+                            // Add the ProducerResponseStatus to the master list
+                            producerResponseStatuses.Add(topicResponseStatus.Item2);
                         }
                     }
-                    if (failedTopicResponse.Exception != null)
-                        exceptions.Add(failedTopicResponse.Exception);
-
+                    if (topicResponse?.Exception != null)
+                        exceptions.Add(topicResponse.Exception);
                 }
             }
             catch (Exception)
@@ -193,7 +210,7 @@ private ProduceDispatchSeralizeResult DispatchSerializedData(IEnumerable
-            return new ProduceDispatchSeralizeResult(exceptions, failedProduceRequests, failedDetail, hasDataNeedReprocess);
+            return new ProduceDispatchSeralizeResult(exceptions, failedProduceRequests, failedDetail, hasDataNeedReprocess, producerResponseStatuses);
         }
         /// 
         /// Send message of one broker.
         /// 
@@ -268,8 +285,7 @@ private ProducerSendResult
                     r.Value.Error != (short)ErrorMapping.NoError).Select(r => r.ToString())));
                 throw new FailedToSendMessageException(sb.ToString());
             }
-            return new ProducerSendResult>>(response.Statuses.Where(s => s.Value.Error != (short)ErrorMapping.NoError)
-                .Select(s => new Tuple(s.Key, s.Value)));
+            return new ProducerSendResult>>(response.Statuses.Select(s => new Tuple(s.Key, s.Value)));
         }
     }
 }
diff --git a/src/KafkaNET.Library/Producers/ICallbackHandler.cs b/src/KafkaNET.Library/Producers/ICallbackHandler.cs
index 6a82abc..6446147 100644
--- a/src/KafkaNET.Library/Producers/ICallbackHandler.cs
+++ b/src/KafkaNET.Library/Producers/ICallbackHandler.cs
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 
+using Kafka.Client.Responses;
+
 namespace Kafka.Client.Producers
 {
     using System;
@@ -30,6 +32,6 @@ public interface ICallbackHandler : IDisposable
         /// 
         /// The sent request events.
         /// 
-        void Handle(IEnumerable> events);
+        List<ProducerResponseStatus> Handle(IEnumerable> events);
     }
 }
diff --git a/src/KafkaNET.Library/Producers/IProducer.cs b/src/KafkaNET.Library/Producers/IProducer.cs
index 79a9feb..c747e30 100644
--- a/src/KafkaNET.Library/Producers/IProducer.cs
+++ b/src/KafkaNET.Library/Producers/IProducer.cs
@@ -16,6 +16,7 @@
  */
 
 using System.Collections.Generic;
+using Kafka.Client.Responses;
 
 namespace Kafka.Client.Producers
 {
@@ -36,7 +37,7 @@ public interface IProducer : IDisposable
         /// synchronous or the asynchronous producer.
         /// 
         /// The producer data objects that encapsulate the topic, key and message data.
-        void Send(IEnumerable> data);
+        List<ProducerResponseStatus> Send(IEnumerable> data);
 
         /// 
         /// Sends the data to a single topic, partitioned by key, using either the
diff --git a/src/KafkaNET.Library/Producers/ProduceDispatchSeralizeResult.cs b/src/KafkaNET.Library/Producers/ProduceDispatchSeralizeResult.cs
index a40ac1b..a05218d 100644
--- a/src/KafkaNET.Library/Producers/ProduceDispatchSeralizeResult.cs
+++ b/src/KafkaNET.Library/Producers/ProduceDispatchSeralizeResult.cs
@@ -14,17 +14,19 @@ namespace Kafka.Client.Producers
     public class ProduceDispatchSeralizeResult
     {
         public ProduceDispatchSeralizeResult(IEnumerable exceptions,
-            IEnumerable> failedProducerDatas, List> failedDetail, bool hasDataNeedDispatch)
+            IEnumerable> failedProducerDatas, List> failedDetail, bool hasDataNeedDispatch, List<ProducerResponseStatus> producerResponse = null)
         {
             Exceptions = exceptions;
             FailedProducerDatas = failedProducerDatas;
             FailedDetail = failedDetail;
             this.HasDataNeedDispatch = hasDataNeedDispatch;
+            this.ProducerResponse = producerResponse ?? new List<ProducerResponseStatus>();
         }
 
         public IEnumerable> FailedProducerDatas { get; private set; }
         public IEnumerable Exceptions { get; private set; }
         public List> FailedDetail { get; private set; }
         public bool HasDataNeedDispatch { get; private set; }
-    }
+        public List<ProducerResponseStatus> ProducerResponse { get; private set; }
+    }
 }
diff --git a/src/KafkaNET.Library/Producers/Producer.cs b/src/KafkaNET.Library/Producers/Producer.cs
index d640ba9..2fb6481 100644
--- a/src/KafkaNET.Library/Producers/Producer.cs
+++ b/src/KafkaNET.Library/Producers/Producer.cs
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 
+using Kafka.Client.Responses;
+
 namespace Kafka.Client.Producers
 {
     using Kafka.Client.Cfg;
@@ -70,14 +72,14 @@ public Producer(ProducerConfiguration config)
         /// Sends the data to a multiple topics, partitioned by key
         /// 
         /// The producer data objects that encapsulate the topic, key and message data.
-        public void Send(IEnumerable> data)
+        public List<ProducerResponseStatus> Send(IEnumerable> data)
         {
             Guard.NotNull(data, "data");
             Guard.CheckBool(data.Any(), true, "data.Any()");
 
             this.EnsuresNotDisposed();
 
-            this.callbackHandler.Handle(data);
+            return this.callbackHandler.Handle(data);
         }
 
         /// 
diff --git a/src/KafkaNET.Library/Responses/ProducerResponse.cs b/src/KafkaNET.Library/Responses/ProducerResponse.cs
index 61a4179..8afc807 100644
--- a/src/KafkaNET.Library/Responses/ProducerResponse.cs
+++ b/src/KafkaNET.Library/Responses/ProducerResponse.cs
@@ -25,6 +25,8 @@ public class ProducerResponseStatus
     {
         public ErrorMapping Error { get; set; }
         public long Offset { get; set; }
+        public string Topic { get; set; }
+        public int PartitionId { get; set; }
         public override string ToString()
         {
             return string.Format("Error:{0} Offset:{1}", this.Error, this.Offset);
@@ -59,13 +61,16 @@ public ProducerResponse ParseFrom(KafkaBinaryReader reader)
                 {
                     var partitionId = reader.ReadInt32();
                     var error = reader.ReadInt16();
+                    // The broker only returns the offset of the first message per topic/partition pair
                     var offset = reader.ReadInt64();
                     var topicAndPartition = new TopicAndPartition(topic, partitionId);
                     statuses.Add(topicAndPartition, new ProducerResponseStatus()
                     {
                         Error = ErrorMapper.ToError(error),
-                        Offset = offset
+                        Offset = offset,
+                        Topic = topic,
+                        PartitionId = partitionId
                     });
                 }

From e27e82ab3d9f8cc019ca4599dc2691779655447c Mon Sep 17 00:00:00 2001
From: Jordan Beacham
Date: Fri, 6 May 2016 12:00:49 -0500
Subject: [PATCH 2/2] Remove unnecessary null check to avoid confusion.
---
 src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs b/src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs
index 0646958..b6eedc0 100644
--- a/src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs
+++ b/src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs
@@ -171,7 +171,7 @@ private ProduceDispatchSeralizeResult DispatchSerializedData(IEnumerable
                         var topics = new List();
                         failedDetail = new List>();
                         // A single foreach handles errors and retries and adds each topicResponseStatus to the master list
-                        foreach (var topicResponseStatus in topicResponse?.ReturnVal)
+                        foreach (var topicResponseStatus in topicResponse.ReturnVal)
                         {
                             if (topicResponseStatus.Item2.Error != ErrorMapping.NoError)
                             {
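
Usage sketch (illustrative only, not part of the patches above): the snippet below shows how a caller might consume the List of ProducerResponseStatus that producer.Send now returns. It assumes the library's generic Producer and ProducerData types are instantiated with string keys and Message payloads, and that BrokerConfiguration exposes BrokerId, Host, and Port as shown; the broker address, port, and topic name are placeholders, so adjust them to your own configuration.

using System;
using System.Collections.Generic;
using Kafka.Client.Cfg;
using Kafka.Client.Messages;
using Kafka.Client.Producers;
using Kafka.Client.Responses;

class ProducerSendExample
{
    static void Main()
    {
        // Placeholder cluster settings; point these at a real broker.
        var brokers = new List<BrokerConfiguration>
        {
            new BrokerConfiguration { BrokerId = 0, Host = "host1", Port = 9092 }
        };
        var config = new ProducerConfiguration(brokers);

        using (var producer = new Producer<string, Message>(config))
        {
            var data = new List<ProducerData<string, Message>>
            {
                new ProducerData<string, Message>("test", new Message(new byte[100]))
            };

            // Send now returns one status per topic/partition instead of void.
            List<ProducerResponseStatus> statuses = producer.Send(data);

            foreach (var status in statuses)
            {
                // Offset is the offset of the first message in the produced batch for
                // that partition; Error is ErrorMapping.NoError when the send succeeded.
                Console.WriteLine("Topic: {0} Partition: {1} Offset: {2} Error: {3}",
                    status.Topic, status.PartitionId, status.Offset, status.Error);
            }
        }
    }
}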