diff --git a/src/Kafka.Client.Tests/Producers/DefaultCallbackHandlerTest.cs b/src/Kafka.Client.Tests/Producers/DefaultCallbackHandlerTest.cs index 755e779..3b589c4 100644 --- a/src/Kafka.Client.Tests/Producers/DefaultCallbackHandlerTest.cs +++ b/src/Kafka.Client.Tests/Producers/DefaultCallbackHandlerTest.cs @@ -111,5 +111,61 @@ public void ShouldRetryHandleWhenTopicNotFound() Assert.Fail("Should have caught exception."); } + + [TestMethod] + [TestCategory(TestCategories.BVT)] + public void ShouldReturnOffset() + { + var partitioner = new Mock>(); + var config = new ProducerConfiguration(new List()); + var pool = new Mock(); + var producer = new Mock(); + var partitionMetadatas = new List() + { + new PartitionMetadata(0, new Broker(0, "host1", 1234), Enumerable.Empty(), + Enumerable.Empty()) + }; + var metadatas = new List() { new TopicMetadata("test", partitionMetadatas, ErrorMapping.NoError) }; + producer.SetupGet(p => p.Config) + .Returns( + () => + new SyncProducerConfiguration(new ProducerConfiguration(new List()), 0, + "host1", 1234)); + producer.Setup(p => p.Send(It.IsAny())).Returns(() => metadatas); + var statuses = new Dictionary(); + var producerResponseStatus = new ProducerResponseStatus() + { + PartitionId = 0, + Topic = "test", + Offset = 1, + Error = ErrorMapping.NoError + }; + statuses[new TopicAndPartition("test", 0)] = producerResponseStatus; + producer.Setup(p => p.Send(It.IsAny())) + .Returns( + () => + new ProducerResponse(1, statuses)); + pool.Setup(p => p.GetShuffledProducers()).Returns(() => new List() { producer.Object }); + pool.Setup(p => p.GetProducer(It.IsAny())).Returns(() => producer.Object); + var mockPartitionInfo = new Mock(); + mockPartitionInfo.Setup(m => m.GetBrokerPartitionInfo(0, string.Empty, It.IsAny(), "test")) + .Returns(() => new List()); + var handler = new DefaultCallbackHandler(config, partitioner.Object, new DefaultEncoder(), mockPartitionInfo.Object, pool.Object); + List producerResponse; + try + { + producerResponse = handler.Handle(new List>() + { + new ProducerData("test", new Message(new byte[100])) + }); + } + catch (FailedToSendMessageException) + { + mockPartitionInfo.Verify(m => m.GetBrokerPartitionInfo(0, string.Empty, It.IsAny(), "test"), Times.Exactly(3)); + return; + } + + Assert.AreEqual(1, producerResponse.FirstOrDefault().Offset); + } } } diff --git a/src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs b/src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs index 101d0a1..b6eedc0 100644 --- a/src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs +++ b/src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs @@ -67,12 +67,14 @@ private int NextCorrelationId get { return Interlocked.Increment(ref correlationId); } } - public void Handle(IEnumerable> events) + public List Handle(IEnumerable> events) { IEnumerable> serializedData = this.Serialize(events); ProduceDispatchSeralizeResult outstandingProduceRequests = new ProduceDispatchSeralizeResult(new List { }, serializedData, null, true); var remainingRetries = this.producerConfig.ProducerRetries; int currentRetryMs = producerConfig.ProducerRetryExponentialBackoffMinMs; + //new up master list of ProducerResponseStatus to return + List producerResponseStatuses = new List(); var brokers = this.producerConfig.Brokers; if (producerConfig.Verbose) @@ -86,6 +88,8 @@ public void Handle(IEnumerable> events) { ProduceDispatchSeralizeResult currentOutstandingRequests = this.DispatchSerializedData(outstandingProduceRequests.FailedProducerDatas, remainingRetries > 1 ? false : true); + //Add producerResponseStatuses to response master list + producerResponseStatuses.AddRange(currentOutstandingRequests.ProducerResponse); outstandingProduceRequests = currentOutstandingRequests; if (outstandingProduceRequests.HasDataNeedDispatch) { @@ -121,6 +125,7 @@ public void Handle(IEnumerable> events) Logger.Error(message); throw new FailedToSendMessageException(message, new List(), outstandingProduceRequests, allCount, remainFailedCount); } + return producerResponseStatuses; } private int ExponentialRetry(int currentRetryMs) @@ -141,8 +146,10 @@ private ProduceDispatchSeralizeResult DispatchSerializedData(IEnumerable> failedDetail = null; var exceptions = new List(); bool hasDataNeedReprocess = false; + //new up master list of ProducerResponseStatus to add to ProduceDispatchSerializeResult constructor + List producerResponseStatuses = new List(); try - { + { IEnumerable>>>> partitionedData = this.PartitionAndCollate(messages); foreach (KeyValuePair>>> keyValuePair in partitionedData) { @@ -154,37 +161,47 @@ private ProduceDispatchSeralizeResult DispatchSerializedData(IEnumerable>> failedTopicResponse = this.Send(brokerId, messageSetPerBroker); - if (!failedTopicResponse.Success || (failedTopicResponse.ReturnVal != null && failedTopicResponse.ReturnVal.Any())) - { - failedProduceRequests = new List>(); - foreach (var failedTopic in failedTopicResponse.ReturnVal) - { - List> failedMessages = eventsPerBrokerMap[failedTopic.Item1]; - failedProduceRequests.AddRange(failedMessages); - hasDataNeedReprocess = true; - } + //Get topic responses from send method + ProducerSendResult>> topicResponse = this.Send(brokerId, messageSetPerBroker); - foreach (var topic in failedTopicResponse.ReturnVal.Select(e => e.Item1.Topic).Distinct()) - { - // update the metadata in case that the failure caused by kafka broker failover - this.brokerPartitionInfo.UpdateInfo(producerConfig.VersionId, NextCorrelationId, - producerConfig.ClientId, topic); - } + if (topicResponse?.ReturnVal != null) + { + //New up list of topics to ensure we do not updateInfo on the same topic more than once + var topics = new List(); + failedDetail = new List>(); - if (lastRetry) + //Use a single for each to do proper error handling and retries and add the topicResponseStatus to the master list + foreach (var topicResponseStatus in topicResponse.ReturnVal) { - failedDetail = new List>(); - foreach (var failedTopic in failedTopicResponse.ReturnVal) + if (topicResponseStatus.Item2.Error != ErrorMapping.NoError) { - failedDetail.Add(new Tuple(brokerId, failedTopic.Item1, failedTopic.Item2)); - } + failedProduceRequests = new List>(); + + List> failedMessages = eventsPerBrokerMap[topicResponseStatus.Item1]; + failedProduceRequests.AddRange(failedMessages); + hasDataNeedReprocess = true; + + //If not in list of topics whose metadata has already been updated + if (!topics.Contains(topicResponseStatus.Item1.Topic)) + { + // update the metadata in case that the failure caused by kafka broker failover + this.brokerPartitionInfo.UpdateInfo(producerConfig.VersionId, NextCorrelationId, + producerConfig.ClientId, topicResponseStatus.Item1.Topic); + topics.Add(topicResponseStatus.Item1.Topic); + } + + if (lastRetry) + { + failedDetail.Add(new Tuple(brokerId, topicResponseStatus.Item1, topicResponseStatus.Item2)); + } + } + //Add ProducerResponseStatus to master list + producerResponseStatuses.Add(topicResponseStatus.Item2); } } - if (failedTopicResponse.Exception != null) - exceptions.Add(failedTopicResponse.Exception); - + if (topicResponse?.Exception != null) + exceptions.Add(topicResponse.Exception); } } catch (Exception) @@ -193,7 +210,7 @@ private ProduceDispatchSeralizeResult DispatchSerializedData(IEnumerable(exceptions, failedProduceRequests, failedDetail, hasDataNeedReprocess); + return new ProduceDispatchSeralizeResult(exceptions, failedProduceRequests, failedDetail, hasDataNeedReprocess, producerResponseStatuses); } /// /// Send message of one broker. @@ -268,8 +285,7 @@ private ProducerSendResult r.Value.Error != (short)ErrorMapping.NoError).Select(r => r.ToString()))); throw new FailedToSendMessageException(sb.ToString()); } - return new ProducerSendResult>>(response.Statuses.Where(s => s.Value.Error != (short)ErrorMapping.NoError) - .Select(s => new Tuple(s.Key, s.Value))); + return new ProducerSendResult>>(response.Statuses.Select(s => new Tuple(s.Key, s.Value))); } } } diff --git a/src/KafkaNET.Library/Producers/ICallbackHandler.cs b/src/KafkaNET.Library/Producers/ICallbackHandler.cs index 6a82abc..6446147 100644 --- a/src/KafkaNET.Library/Producers/ICallbackHandler.cs +++ b/src/KafkaNET.Library/Producers/ICallbackHandler.cs @@ -15,6 +15,8 @@ * limitations under the License. */ +using Kafka.Client.Responses; + namespace Kafka.Client.Producers { using System; @@ -30,6 +32,6 @@ public interface ICallbackHandler : IDisposable /// /// The sent request events. /// - void Handle(IEnumerable> events); + List Handle(IEnumerable> events); } } diff --git a/src/KafkaNET.Library/Producers/IProducer.cs b/src/KafkaNET.Library/Producers/IProducer.cs index 79a9feb..c747e30 100644 --- a/src/KafkaNET.Library/Producers/IProducer.cs +++ b/src/KafkaNET.Library/Producers/IProducer.cs @@ -16,6 +16,7 @@ */ using System.Collections.Generic; +using Kafka.Client.Responses; namespace Kafka.Client.Producers { @@ -36,7 +37,7 @@ public interface IProducer : IDisposable /// synchronous or the asynchronous producer. /// /// The producer data objects that encapsulate the topic, key and message data. - void Send(IEnumerable> data); + List Send(IEnumerable> data); /// /// Sends the data to a single topic, partitioned by key, using either the diff --git a/src/KafkaNET.Library/Producers/ProduceDispatchSeralizeResult.cs b/src/KafkaNET.Library/Producers/ProduceDispatchSeralizeResult.cs index a40ac1b..a05218d 100644 --- a/src/KafkaNET.Library/Producers/ProduceDispatchSeralizeResult.cs +++ b/src/KafkaNET.Library/Producers/ProduceDispatchSeralizeResult.cs @@ -14,17 +14,19 @@ namespace Kafka.Client.Producers public class ProduceDispatchSeralizeResult { public ProduceDispatchSeralizeResult(IEnumerable exceptions, - IEnumerable> failedProducerDatas, List> failedDetail, bool hasDataNeedDispatch) + IEnumerable> failedProducerDatas, List> failedDetail, bool hasDataNeedDispatch, List producerResponse = null) { Exceptions = exceptions; FailedProducerDatas = failedProducerDatas; FailedDetail = failedDetail; this.HasDataNeedDispatch = hasDataNeedDispatch; + this.ProducerResponse = producerResponse ?? new List(); } public IEnumerable> FailedProducerDatas { get; private set; } public IEnumerable Exceptions { get; private set; } public List> FailedDetail { get; private set; } public bool HasDataNeedDispatch { get; private set; } - } + public List ProducerResponse { get; private set; } +} } diff --git a/src/KafkaNET.Library/Producers/Producer.cs b/src/KafkaNET.Library/Producers/Producer.cs index d640ba9..2fb6481 100644 --- a/src/KafkaNET.Library/Producers/Producer.cs +++ b/src/KafkaNET.Library/Producers/Producer.cs @@ -15,6 +15,8 @@ * limitations under the License. */ +using Kafka.Client.Responses; + namespace Kafka.Client.Producers { using Kafka.Client.Cfg; @@ -70,14 +72,14 @@ public Producer(ProducerConfiguration config) /// Sends the data to a multiple topics, partitioned by key /// /// The producer data objects that encapsulate the topic, key and message data. - public void Send(IEnumerable> data) + public List Send(IEnumerable> data) { Guard.NotNull(data, "data"); Guard.CheckBool(data.Any(), true, "data.Any()"); this.EnsuresNotDisposed(); - this.callbackHandler.Handle(data); + return this.callbackHandler.Handle(data); } /// diff --git a/src/KafkaNET.Library/Responses/ProducerResponse.cs b/src/KafkaNET.Library/Responses/ProducerResponse.cs index 61a4179..8afc807 100644 --- a/src/KafkaNET.Library/Responses/ProducerResponse.cs +++ b/src/KafkaNET.Library/Responses/ProducerResponse.cs @@ -25,6 +25,8 @@ public class ProducerResponseStatus { public ErrorMapping Error { get; set; } public long Offset { get; set; } + public string Topic { get; set; } + public int PartitionId { get; set; } public override string ToString() { return string.Format("Error:{0} Offset:{1}", this.Error, this.Offset); @@ -59,13 +61,16 @@ public ProducerResponse ParseFrom(KafkaBinaryReader reader) { var partitionId = reader.ReadInt32(); var error = reader.ReadInt16(); + //only returns first message offset per Topic and Partition pair var offset = reader.ReadInt64(); var topicAndPartition = new TopicAndPartition(topic, partitionId); statuses.Add(topicAndPartition, new ProducerResponseStatus() { Error = ErrorMapper.ToError(error), - Offset = offset + Offset = offset, + Topic = topic, + PartitionId = partitionId }); }