Skip to content
This repository was archived by the owner on Jul 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
f0690d3
Add Gitter badge
gitter-badger Feb 23, 2016
2e58a99
Update THIRD-PARTY-NOTICES.txt
Feb 24, 2016
a1676b3
After a rebalance set consumedOffset=commitedOffset and NOT
Feb 24, 2016
9dc7041
Fix git clone url
AlgorithmsAreCool Feb 24, 2016
6a2714d
Remove package binaries cache from repo.
jthelin Feb 24, 2016
8a37d5f
Remove temporary generated files from repo.
jthelin Feb 24, 2016
daea134
Fix inconsistent NuGet package versions.
jthelin Feb 24, 2016
e05997f
Missing NuGet packages.config files.
jthelin Feb 24, 2016
aeecdf9
Update ProduceSimpleHelperOptions.cs
johnstark Feb 24, 2016
4d0eac1
Update ProducePerfTestHelperOptions.cs
johnstark Feb 24, 2016
52ed6e0
Update Readme
riamandii Feb 26, 2016
4daac0b
Property calls instead of method invocation
alexandrnikitin Feb 25, 2016
0c97923
Fix string format parameter in ZookeeperConsumerConnector
alexandrnikitin Feb 25, 2016
5302baf
Use syntax highlighting on code examples
epayet Feb 29, 2016
4601a52
Add standard .gitattributes file, as recommended by GitHub Client app.
jthelin Feb 26, 2016
601ba9c
Small Typo
riamandii Mar 2, 2016
315e22d
Fix some build issues related to VS nuget package restore + references.
jthelin Mar 3, 2016
d807ebd
test typo fix
riamandii Mar 3, 2016
d53f7d0
Intial .nuspec
riamandii Mar 3, 2016
2d94244
Revert "Intial .nuspec"
riamandii Mar 3, 2016
5c4e21e
Intial .nuspec
riamandii Mar 3, 2016
07658e6
[PVS-Studio] ConsumeGroupFindNewLeaderSleepIntervalMs variable assign…
jthelin Mar 5, 2016
ae5ced6
override setting position of fetch offset with flag
ducas Mar 8, 2016
b3c1b07
expose producer method to send multiple producerdata in single operat…
ducas Mar 16, 2016
84fdbed
extract interface for KafkaMessageStream and ConsumerIterator
ducas Mar 16, 2016
04e9f34
addition of the zookeeperconsumerconnector interface, this is to allo…
wha-deploy Mar 13, 2016
ea9d0ac
use kafka message stream interface in zookeeper consumer connector in…
ducas Mar 28, 2016
3760e21
add fetch wait max ms and fetch min bytes to consumer configuration
ducas Mar 28, 2016
2afb968
setup partition info with consumedOffset = auto.offset.reset - 1 for …
Mar 31, 2016
d87ac15
Removing explicit assembly references from nuspec
mhorstma Apr 1, 2016
d761482
Deleted commented out files section
mhorstma Apr 1, 2016
2066c70
Modify Repo name and references to CSharpKafkaClient instead of Kafkanet
riamandii Apr 14, 2016
f32eb7b
Renaming to CSharpClient-for-Kafka
riamandii Apr 18, 2016
9a44d90
1) Fix log loss scenario during rebalance, 2) use api version 0 inste…
Apr 4, 2016
a9159ea
change Connect() to use async connect call and honor receiveTimeoutMs…
danielli90 Apr 19, 2016
7ba6c98
Add support for 0.9.0 and 0.10.0 protocols
gfodor Mar 6, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
root = true

[*]
end_of_line = crlf
indent_style = space
indent_size = 4
17 changes: 17 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Auto detect text files and perform LF normalization
* text=auto

# Custom for Visual Studio
*.cs diff=csharp

# Standard to msysgit
*.doc diff=astextplain
*.DOC diff=astextplain
*.docx diff=astextplain
*.DOCX diff=astextplain
*.dot diff=astextplain
*.DOT diff=astextplain
*.pdf diff=astextplain
*.PDF diff=astextplain
*.rtf diff=astextplain
*.RTF diff=astextplain
6 changes: 3 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ We invite you to become part of this project, regardless of your skill set or av

Contribute
----------
Contributions to Kafkanet are welcome. Here is how you can contribute to Kafkanet:
* [Submit bugs](https://github.com/Microsoft/Kafkanet/issues) and help us verify fixes
* [Submit pull requests](https://github.com/Microsoft/Kafkanet/pulls) for bug fixes and features and discuss existing proposals
Contributions to CSharpClient-for-Kafka are welcome. Here is how you can contribute to CSharpClient-for-Kafka:
* [Submit bugs](https://github.com/Microsoft/CSharpClient-for-Kafka/issues) and help us verify fixes
* [Submit pull requests](https://github.com/Microsoft/CSharpClient-for-Kafka/pulls) for bug fixes and features and discuss existing proposals

Contribution License Agreement
------------------------------
Expand Down
2 changes: 1 addition & 1 deletion LICENSE.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Kafkanet
CSharpClient-for-Kafka
Copyright © Microsoft Corporation
All rights reserved.
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
Expand Down
114 changes: 61 additions & 53 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
# Kafkanet
# CSharpClient-for-Kafka

[![Join the chat at https://gitter.im/Microsoft/Kafkanet](https://badges.gitter.im/Microsoft/Kafkanet.svg)](https://gitter.im/Microsoft/Kafkanet?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
.Net implementation of the Apache Kafka Protocol that provides basic functionality through Producer/Consumer classes. The project also offers balanced consumer implementation.
The project is a fork from ExactTarget's Kafka-net Client, tuned to serve Microsoft's Big Data needs. At Microsoft we run Kafka on multiple Windows Clusters with JBOD machines.
The client works with Kafka 0.8.1.
The project is a fork from ExactTarget's Kafka-net Client.

## Related documentation
* [Kafka documentation](https://kafka.apache.org/documentation.html)
* [Zookeeper documentation](https://cwiki.apache.org/confluence/display/ZOOKEEPER/Index)
* [Kafka client protocol](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol)

## Build Kafkanet
* Clone Kafkanet through ```git clone https://github.com/Microsoft/ChakraCore.git```
## Build CSharpClient-for-Kafka
* Clone CSharpClient-for-Kafka through ```git clone https://github.com/Microsoft/CSharpClient-for-Kafka.git```
* Open `src\KafkaNETLibraryAndConsole.sln` in Visual Studio
* Build Solution

Expand Down Expand Up @@ -38,63 +39,70 @@ The client works with Kafka 0.8.1.
The Producer can send one message or an entire batch to Kafka. When sending a batch you can send to multiple topics at once
#### Producer Usage

var brokerConfig = new BrokerConfiguration()
{
BrokerId = this.brokerId,
Host = this.kafkaServerName,
Port = this.kafkaPort
};
var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
kafkaProducer = new Producer(config);
// here you construct you batch or single message object
var batch=ConstructBatch();
kafkaProducer.Send(batch);
```c#
var brokerConfig = new BrokerConfiguration()
{
BrokerId = this.brokerId,
Host = this.kafkaServerName,
Port = this.kafkaPort
};
var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
kafkaProducer = new Producer(config);
// here you construct your batch or a single message object
var batch=ConstructBatch();
kafkaProducer.Send(batch);
```

### Simple Consumer

The simple Consumer allows full control for retrieving data. You could instantiate a Consumer directly by providing a ConsumerConfiguration and then calling Fetch.
Kafkanet has a higher level wrapper around Consumer which allows consumer reuse and other benefits
CSharpClient-for-Kafka has a higher level wrapper around Consumer which allows consumer reuse and other benefits
#### Consumer Usage

// create the Consumer higher level manager
var managerConfig = new KafkaSimpleManagerConfiguration()
{
FetchSize = FetchSize,
BufferSize = BufferSize,
Zookeeper = m_zookeeper
};
m_consumerManager = new KafkaSimpleManager<int, Kafka.Client.Messages.Message>(managerConfig);
// get all available partitions for a topic through the manager
var allPartitions = m_consumerManager.GetTopicPartitionsFromZK(m_topic);
// Refresh metadata and grab a consumer for desired partitions
m_consumerManager.RefreshMetadata(0, m_consumerId, 0, m_topic, true);
var partitionConsumer = m_consumerManager.GetConsumer(m_topic, partitionId);
```c#
// create the Consumer higher level manager
var managerConfig = new KafkaSimpleManagerConfiguration()
{
FetchSize = FetchSize,
BufferSize = BufferSize,
Zookeeper = m_zookeeper
};
m_consumerManager = new KafkaSimpleManager<int, Kafka.Client.Messages.Message>(managerConfig);
// get all available partitions for a topic through the manager
var allPartitions = m_consumerManager.GetTopicPartitionsFromZK(m_topic);
// Refresh metadata and grab a consumer for desired partitions
m_consumerManager.RefreshMetadata(0, m_consumerId, 0, m_topic, true);
var partitionConsumer = m_consumerManager.GetConsumer(m_topic, partitionId);
```
### Balanced Consumer

The balanced consumer manages partition assignment for each instance in the same consumer group. Rebalances are triggered by ZooKeeper changes.
#### Balanced Consumer Usage

// Here we create a balanced consumer on one consumer machine for consumerGroupId. All machines consuming for this group will get balanced together
ConsumerConfiguration config = new ConsumerConfiguration
{
AutoCommit = false,
GroupId = consumerGroupId
ConsumerId = uniqueConsumerId
MaxFetchBufferLength = m_BufferMaxNoOfMessages,
FetchSize = fetchSize,
AutoOffsetReset = OffsetRequest.LargestTime,
NumberOfTries = 20,
ZooKeeper = new ZooKeeperConfiguration(zookeeperString, 30000, 30000, 2000)
};
var balancedConsumer = new ZookeeperConsumerConnector(config, true, m_ConsumerRebalanceHandler, m_ZKDisconnectHandler, m_ZKExpireHandler);
// grab streams for desired topics
var streams = m_ZooKeeperConsumerConnector.CreateMessageStreams(m_TopicMap, new DefaultDecoder());
var KafkaMessageStream = streams[m_Topic][0];
// start consuming stream
foreach (Message message in m_KafkaMessageStream.GetCancellable(cancellationTokenSource.Token))
....

```c#
// Here we create a balanced consumer on one consumer machine for consumerGroupId. All machines consuming for this group will get balanced together
ConsumerConfiguration config = new ConsumerConfiguration
{
AutoCommit = false,
GroupId = consumerGroupId
ConsumerId = uniqueConsumerId
MaxFetchBufferLength = m_BufferMaxNoOfMessages,
FetchSize = fetchSize,
AutoOffsetReset = OffsetRequest.LargestTime,
NumberOfTries = 20,
ZooKeeper = new ZooKeeperConfiguration(zookeeperString, 30000, 30000, 2000)
};
var balancedConsumer = new ZookeeperConsumerConnector(config, true, m_ConsumerRebalanceHandler, m_ZKDisconnectHandler, m_ZKExpireHandler);
// grab streams for desired topics
var streams = m_ZooKeeperConsumerConnector.CreateMessageStreams(m_TopicMap, new DefaultDecoder());
var KafkaMessageStream = streams[m_Topic][0];
// start consuming stream
foreach (Message message in m_KafkaMessageStream.GetCancellable(cancellationTokenSource.Token))
....
```

## Contribute

Contributions to Kafkanet are welcome. Here is how you can contribute to Kafkanet:
* [Submit bugs](https://github.com/Microsoft/Kafkanet/issues) and help us verify fixes
* [Submit pull requests](https://github.com/Microsoft/Kafkanet/pulls) for bug fixes and features and discuss existing proposals
Contributions to CSharpClient-for-Kafka are welcome. Here is how you can contribute to CSharpClient-for-Kafka:
* [Submit bugs](https://github.com/Microsoft/CSharpClient-for-Kafka/issues) and help us verify fixes
* [Submit pull requests](https://github.com/Microsoft/CSharpClient-for-Kafka/pulls) for bug fixes and features and discuss existing proposals
2 changes: 1 addition & 1 deletion THIRD-PARTY-NOTICES.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Kafkanet uses third party material from the projects listed below. The original copyright notice and the license under which Microsoft received such third party material are set forth below. Microsoft reserves all other rights not expressly granted, whether by implication, estoppel or otherwise.

.NET Kafka Client
Copyright (c) salesforce.com, inc. (formerly Exact Target)
Copyright (c) salesforce.com, inc. (formerly ExactTarget)
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
See the Apache Version 2.0 License for specific language governing permissions and limitations under the License.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Expand Down
31 changes: 30 additions & 1 deletion src/Kafka.Client.Tests/CompressionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
* limitations under the License.
*/

using System;

namespace Kafka.Client.Tests
{
using FluentAssertions;
Expand All @@ -34,7 +36,7 @@ public class CompressionTests
public void CompressAndDecompressMessageUsingSnappyCompressionCodec()
{
var messageBytes = new byte[] { 1, 2, 3, 4, 5 };
var message = new Message(messageBytes,CompressionCodecs.SnappyCompressionCodec);
var message = new Message(messageBytes, CompressionCodecs.SnappyCompressionCodec);
Message compressedMsg = CompressionUtils.Compress(new List<Message>() { message }, CompressionCodecs.SnappyCompressionCodec, 0);
var decompressed = CompressionUtils.Decompress(compressedMsg, 0);
int i = 0;
Expand Down Expand Up @@ -106,6 +108,33 @@ public void CompressAndDecompress3MessagesUsingDefaultCompressionCodec()
Assert.AreEqual(3, i);
}

[TestMethod]
[TestCategory(TestCategories.BVT)]
public void CompressV1MessagePutsV1MagicOnCompressedMessage()
{
byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5 };
Message message = new Message(DateTime.UtcNow, messageBytes, CompressionCodecs.DefaultCompressionCodec);
Message compressedMsg = CompressionUtils.Compress(new List<Message>() { message }, 0);
Assert.AreEqual(compressedMsg.Magic, 1);
}

[TestMethod]
[TestCategory(TestCategories.BVT)]
public void DecompressWhenWrapperMessageProvidesLogWriteTimestamp()
{
byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5 };
Message message1 = new Message(100L, TimestampTypes.CreateTime, messageBytes, CompressionCodecs.DefaultCompressionCodec);
Message compressedMsg = CompressionUtils.Compress(new List<Message>() { message1 }, CompressionCodecs.DefaultCompressionCodec, 0, 123L);

var decompressed = CompressionUtils.Decompress(compressedMsg, 0);

foreach (var decompressedMessage in decompressed.Messages)
{
decompressedMessage.TimestampType.ShouldBeEquivalentTo(TimestampTypes.LogAppendTime);
decompressedMessage.Timestamp.ShouldBeEquivalentTo(123L);
}
}

[TestMethod]
[TestCategory(TestCategories.BVT)]
public void CreateCompressedBufferedMessageSet()
Expand Down
27 changes: 13 additions & 14 deletions src/Kafka.Client.Tests/Kafka.Client.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,20 @@
<Prefer32Bit>false</Prefer32Bit>
</PropertyGroup>
<ItemGroup>
<Reference Include="FluentAssertions, Version=4.2.2.0, Culture=neutral, PublicKeyToken=33f2691a05b67b6a, processorArchitecture=MSIL">
<Reference Include="FluentAssertions">
<HintPath>..\packages\FluentAssertions.4.2.2\lib\net45\FluentAssertions.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="FluentAssertions.Core, Version=4.2.2.0, Culture=neutral, PublicKeyToken=33f2691a05b67b6a, processorArchitecture=MSIL">
<Reference Include="FluentAssertions.Core">
<HintPath>..\packages\FluentAssertions.4.2.2\lib\net45\FluentAssertions.Core.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="log4net, Version=1.2.13.0, Culture=neutral, PublicKeyToken=669e0ddf0bb1aa2a, processorArchitecture=MSIL">
<HintPath>..\packages\log4net.2.0.3\lib\net40-full\log4net.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework" />
<Reference Include="Moq, Version=4.2.1510.2205, Culture=neutral, PublicKeyToken=69f491c39445e920, processorArchitecture=MSIL">
<Reference Include="Moq">
<HintPath>..\packages\Moq.4.2.1510.2205\lib\net40\Moq.dll</HintPath>
<Private>True</Private>
</Reference>
Expand All @@ -92,6 +96,10 @@
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.XML" />
<Reference Include="System.Xml.Linq" />
<Reference Include="ZooKeeperNet, Version=3.4.6.1, Culture=neutral, PublicKeyToken=fefd2c046da35b56, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\lib\ZooKeeperNet.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="BrokerTest.cs" />
Expand Down Expand Up @@ -130,22 +138,13 @@
<Folder Include="ZooKeeper\" />
</ItemGroup>
<ItemGroup>
<None Include="App.config">
<SubType>Designer</SubType>
</None>
<None Include="App.config" />
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
<Import Project="$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets" Condition="Exists('$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets')" />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!--<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
<PropertyGroup>
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
</PropertyGroup>
<Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" />
</Target>-->
<!--<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />-->
</Project>
63 changes: 51 additions & 12 deletions src/Kafka.Client.Tests/MessageSetTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,34 @@ public class MessageSetTests
{
[TestMethod]
[TestCategory(TestCategories.BVT)]
public void BufferedMessageSetWriteToValidSequence()
public void BufferedMessageSetWriteToValidSequenceForV0Message()
{
RunMessageSetWriteValidSequestTest(false);
}

[TestMethod]
[TestCategory(TestCategories.BVT)]
public void BufferedMessageSetWriteToValidSequenceForV1Message()
{
RunMessageSetWriteValidSequestTest(true);
}

private void RunMessageSetWriteValidSequestTest(bool useV1Message)
{
byte[] messageBytes = { 1, 2, 3, 4, 5 };
var msg1 = new Message(messageBytes) { Offset = 0 };
var msg2 = new Message(messageBytes);
Message msg1, msg2;

if (useV1Message)
{
msg1 = new Message(123L, TimestampTypes.CreateTime, messageBytes) {Offset = 0};
msg2 = new Message(123L, TimestampTypes.CreateTime, messageBytes);
}
else
{
msg1 = new Message(messageBytes) {Offset = 0};
msg2 = new Message(messageBytes);
}

msg2.Offset = 1;
MessageSet messageSet = new BufferedMessageSet(new List<Message>() { msg1, msg2 }, 0);
var ms = new MemoryStream();
Expand All @@ -48,10 +71,11 @@ public void BufferedMessageSetWriteToValidSequence()
{
reader.ReadInt64().Should().Be(i); // offset
var msgLength = reader.ReadInt32(); // length
msgLength.Should().Be(Message.DefaultHeaderSize + msg1.PayloadSize);
msgLength.Should().Be((useV1Message ? Message.V1HeaderSize : Message.V0HeaderSize) + msg1.PayloadSize);
reader.ReadUInt32().Should().Be(Crc32Hasher.ComputeCrcUint32(ms.GetBuffer(), baseOffset + 8 + 4 + 4, msgLength - 4));
reader.ReadByte().Should().Be(0); // magic
reader.ReadByte().Should().Be(useV1Message ? (byte)1 : (byte)0); // magic
reader.ReadByte().Should().Be(msg1.Attributes);
if (useV1Message) reader.ReadInt64().Should().Be(123L); // Timestamp
reader.ReadInt32().Should().Be(-1); // key length
reader.ReadInt32().Should().Be(messageBytes.Length); // message length
reader.ReadBytes(messageBytes.Length).SequenceEqual(messageBytes).Should().BeTrue();
Expand All @@ -63,13 +87,28 @@ public void BufferedMessageSetWriteToValidSequence()
[TestCategory(TestCategories.BVT)]
public void SetSizeValid()
{
byte[] messageBytes = new byte[] { 1, 2, 3, 4, 5 };
Message msg1 = new Message(messageBytes);
Message msg2 = new Message(messageBytes);
MessageSet messageSet = new BufferedMessageSet(new List<Message>() { msg1, msg2 }, 0);
Assert.AreEqual(
2 * (8 + 4 + Message.DefaultHeaderSize + messageBytes.Length),
messageSet.SetSize);
for (var i = 0; i < 2; i++)
{
var useV0Message = i == 1;
byte[] messageBytes = new byte[] {1, 2, 3, 4, 5};
Message msg1, msg2;

if (useV0Message)
{
msg1 = new Message(messageBytes);
msg2 = new Message(messageBytes);
}
else
{
msg1 = new Message(123L, TimestampTypes.CreateTime, messageBytes);
msg2 = new Message(123L, TimestampTypes.CreateTime, messageBytes);
}

MessageSet messageSet = new BufferedMessageSet(new List<Message>() {msg1, msg2}, 0);
Assert.AreEqual(
2 * (8 + 4 + (useV0Message ? Message.V0HeaderSize : Message.V1HeaderSize) + messageBytes.Length),
messageSet.SetSize);
}
}
}
}
Loading