Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ jobs:
- 9092:9092
env:
KNET_DOCKER_RUNNING_MODE: server
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_5 }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_6 }}

strategy:
fail-fast: false
Expand All @@ -423,7 +423,7 @@ jobs:
DOTNET_CreateDumpVerboseDiagnostics: ${{ inputs.EnableVerboseDiagnosticProcessDump }}
DOTNET_EnableCrashReport: ${{ inputs.EnableProcessDump }}
DOTNET_CreateDumpLogToFile: ${{ github.workspace }}/coredump.diagnostic
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_5 }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_6 }}

steps:
- name: Restore KNet ${{ matrix.framework }} bin from cache
Expand Down Expand Up @@ -513,7 +513,7 @@ jobs:
DOTNET_CreateDumpVerboseDiagnostics: ${{ inputs.EnableVerboseDiagnosticProcessDump }}
DOTNET_EnableCrashReport: ${{ inputs.EnableProcessDump }}
DOTNET_CreateDumpLogToFile: ${{ github.workspace }}/coredump.diagnostic
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_5 }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_6 }}

steps:
- name: Restore KNet ${{ matrix.framework }} bin from cache
Expand Down Expand Up @@ -627,7 +627,7 @@ jobs:
DOTNET_CreateDumpVerboseDiagnostics: ${{ inputs.EnableVerboseDiagnosticProcessDump }}
DOTNET_EnableCrashReport: ${{ inputs.EnableProcessDump }}
DOTNET_CreateDumpLogToFile: ${{ github.workspace }}/coredump.diagnostic
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_5 }}
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_6 }}

steps:
- name: Restore KNet ${{ matrix.framework }} bin from cache
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
name: "CodeQL Manual"

on:
push:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/generateclasses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ jobs:
DOTNET_CreateDumpDiagnostics: 1
DOTNET_CreateDumpVerboseDiagnostics: 1
DOTNET_EnableCrashReport: 1
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_5 }}
JNet_Version: 2.6.5
JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_6 }}
JNet_Version: 2.6.6

# Steps represent a sequence of tasks that will be executed as part of the job
steps:
Expand Down
2 changes: 1 addition & 1 deletion src/copyright.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2025 MASES s.r.l.
* Copyright (c) 2021-2026 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion src/jvm/knet/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<maven.compiler.target>${java.version}</maven.compiler.target>
<classpathfile>${basedir}/classpathfile.classpath</classpathfile>
<mavenSkipMain>false</mavenSkipMain> <!-- set mavenSkipMain to true to avoid source compilation: use directive in command-line -->
<jnetVersion>2.6.5.0</jnetVersion>
<jnetVersion>2.6.6.0</jnetVersion>
<kafkaVersion>3.9.1</kafkaVersion>
<knetVersion>2.9.6.0</knetVersion>
<rootDir>${basedir}/../../../</rootDir>
Expand Down
51 changes: 39 additions & 12 deletions src/net/KNet/Developed/Org/Apache/Kafka/Tools/StreamsResetter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,46 @@ public static bool ResetApplication(string bootstrapserver, string applicationId
return ResetApplication(false, bootstrapserver, applicationId, inputTopics);
}

/// <summary>
/// Resets an <paramref name="applicationId"/> of Apache Kafka Streams and forces deletion of active members from the group
/// </summary>
/// <param name="bootstrapserver">The bootstrap server of the Apache Kafka cluster</param>
/// <param name="applicationId">The application id to be resetted</param>
/// <param name="inputTopics">Input topics to be resetted</param>
/// <returns><see langword="true"/> if everything goes well, otherwise <see langword="false"/></returns>
/// <exception cref="ArgumentNullException">Either <paramref name="applicationId"/> or <paramref name="bootstrapserver"/> are <see langword="null"/></exception>
public static bool ResetApplicationForced(string bootstrapserver, string applicationId, params string[] inputTopics)
/// <summary>
/// Resets an <paramref name="applicationId"/> of Apache Kafka Streams
/// </summary>
/// <param name="bootstrapserver">The bootstrap server of the Apache Kafka cluster</param>
/// <param name="applicationId">The application id to be resetted</param>
/// <param name="inputTopics">Input topics to be resetted</param>
/// <returns><see langword="true"/> if everything goes well, otherwise <see langword="false"/></returns>
/// <exception cref="ArgumentNullException">Either <paramref name="applicationId"/> or <paramref name="bootstrapserver"/> are <see langword="null"/></exception>
public static bool ResetApplication(string bootstrapserver, string applicationId, IEnumerable<string> inputTopics)
{
return ResetApplication(false, bootstrapserver, applicationId, inputTopics);
}

/// <summary>
/// Resets an <paramref name="applicationId"/> of Apache Kafka Streams and forces deletion of active members from the group
/// </summary>
/// <param name="bootstrapserver">The bootstrap server of the Apache Kafka cluster</param>
/// <param name="applicationId">The application id to be resetted</param>
/// <param name="inputTopics">Input topics to be resetted</param>
/// <returns><see langword="true"/> if everything goes well, otherwise <see langword="false"/></returns>
/// <exception cref="ArgumentNullException">Either <paramref name="applicationId"/> or <paramref name="bootstrapserver"/> are <see langword="null"/></exception>
public static bool ResetApplicationForced(string bootstrapserver, string applicationId, params string[] inputTopics)
{
return ResetApplication(true, bootstrapserver, applicationId, inputTopics);
}

static bool ResetApplication(bool force, string bootstrapserver, string applicationId, string[] inputTopics)
/// <summary>
/// Resets an <paramref name="applicationId"/> of Apache Kafka Streams and forces deletion of active members from the group
/// </summary>
/// <param name="bootstrapserver">The bootstrap server of the Apache Kafka cluster</param>
/// <param name="applicationId">The application id to be resetted</param>
/// <param name="inputTopics">Input topics to be resetted</param>
/// <returns><see langword="true"/> if everything goes well, otherwise <see langword="false"/></returns>
/// <exception cref="ArgumentNullException">Either <paramref name="applicationId"/> or <paramref name="bootstrapserver"/> are <see langword="null"/></exception>
public static bool ResetApplicationForced(string bootstrapserver, string applicationId, IEnumerable<string> inputTopics)
{
return ResetApplication(true, bootstrapserver, applicationId, inputTopics);
}

static bool ResetApplication(bool force, string bootstrapserver, string applicationId, IEnumerable<string> inputTopics)
{
if (bootstrapserver == null) throw new ArgumentNullException(nameof(bootstrapserver));
if (applicationId == null) throw new ArgumentNullException(nameof(applicationId));
Expand All @@ -61,10 +87,11 @@ static bool ResetApplication(bool force, string bootstrapserver, string applicat
strings.Add(bootstrapserver);
strings.Add("--application-id");
strings.Add(applicationId);
if (inputTopics != null && inputTopics.Length != 0)
string topics = inputTopics != null ? string.Join(",", inputTopics) : string.Empty;
if (!string.IsNullOrEmpty(topics))
{
strings.Add("--input-topics");
strings.Add(string.Join(",", inputTopics));
strings.Add(topics);
}
if (force) strings.Add("--force");

Expand Down
2 changes: 1 addition & 1 deletion src/net/KNet/KNet.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
<None Include="..\..\documentation\articles\usage.md" Pack="true" PackagePath="\" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="MASES.JNet" Version="2.6.5">
<PackageReference Include="MASES.JNet" Version="2.6.6">
<IncludeAssets>All</IncludeAssets>
<PrivateAssets>None</PrivateAssets>
</PackageReference>
Expand Down
24 changes: 12 additions & 12 deletions src/net/KNet/Specific/Connect/KNetConnectProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public bool AllocateSinkConnector(string connectorClassName)
}
catch
{
Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Unable to find {connectorClassName}");
JVMBridgeException.Throw<Org.Apache.Kafka.Common.Config.ConfigException>($"Unable to find {connectorClassName}");
return false;
}

Expand All @@ -124,7 +124,7 @@ public bool AllocateSinkConnector(string connectorClassName)
}
catch (Exception ex)
{
Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to create an instance of {connectorClassName}: {ex}");
JVMBridgeException.Throw<Org.Apache.Kafka.Common.Config.ConfigException>($"Failed to create an instance of {connectorClassName}: {ex}");
return false;
}

Expand All @@ -135,7 +135,7 @@ public bool AllocateSinkConnector(string connectorClassName)
}
catch (Exception ex)
{
Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to register the instance of {connectorClassName}: {ex}");
JVMBridgeException.Throw<Org.Apache.Kafka.Common.Config.ConfigException>($"Failed to register the instance of {connectorClassName}: {ex}");
return false;
}
}
Expand All @@ -156,7 +156,7 @@ public bool AllocateSourceConnector(string connectorClassName)
}
catch
{
Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Unable to find {connectorClassName}");
JVMBridgeException.Throw<Org.Apache.Kafka.Common.Config.ConfigException>($"Unable to find {connectorClassName}");
return false;
}

Expand All @@ -168,7 +168,7 @@ public bool AllocateSourceConnector(string connectorClassName)
}
catch (Exception ex)
{
Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to create an instance of {connectorClassName}: {ex}");
JVMBridgeException.Throw<Org.Apache.Kafka.Common.Config.ConfigException>($"Failed to create an instance of {connectorClassName}: {ex}");
return false;
}

Expand All @@ -179,7 +179,7 @@ public bool AllocateSourceConnector(string connectorClassName)
}
catch (Exception ex)
{
Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to register the instance of {connectorClassName}: {ex}");
JVMBridgeException.Throw<Org.Apache.Kafka.Common.Config.ConfigException>($"Failed to register the instance of {connectorClassName}: {ex}");
return false;
}
}
Expand All @@ -200,7 +200,7 @@ public bool AllocateConnector(string connectorClassName, string uniqueId)
}
catch
{
Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Unable to find {connectorClassName}");
JVMBridgeException.Throw<Org.Apache.Kafka.Common.Config.ConfigException>($"Unable to find {connectorClassName}");
return false;
}

Expand All @@ -214,7 +214,7 @@ public bool AllocateConnector(string connectorClassName, string uniqueId)
}
catch (Exception ex)
{
Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to create an instance of {connectorClassName}: {ex}");
JVMBridgeException.Throw<Org.Apache.Kafka.Common.Config.ConfigException>($"Failed to create an instance of {connectorClassName}: {ex}");
return false;
}

Expand All @@ -236,7 +236,7 @@ public bool AllocateTransformation(string transformClassName, string uniqueId)
}
catch
{
Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Unable to find {transformClassName}");
JVMBridgeException.Throw<Org.Apache.Kafka.Common.Config.ConfigException>($"Unable to find {transformClassName}");
return false;
}

Expand All @@ -250,7 +250,7 @@ public bool AllocateTransformation(string transformClassName, string uniqueId)
}
catch (Exception ex)
{
Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to create an instance of {transformClassName}: {ex}");
JVMBridgeException.Throw<Org.Apache.Kafka.Common.Config.ConfigException>($"Failed to create an instance of {transformClassName}: {ex}");
return false;
}

Expand All @@ -272,7 +272,7 @@ public bool AllocatePredicate(string predicateClassName, string uniqueId)
}
catch
{
Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Unable to find {predicateClassName}");
JVMBridgeException.Throw<Org.Apache.Kafka.Common.Config.ConfigException>($"Unable to find {predicateClassName}");
return false;
}

Expand All @@ -286,7 +286,7 @@ public bool AllocatePredicate(string predicateClassName, string uniqueId)
}
catch (Exception ex)
{
Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to create an instance of {predicateClassName}: {ex}");
JVMBridgeException.Throw<Org.Apache.Kafka.Common.Config.ConfigException>($"Failed to create an instance of {predicateClassName}: {ex}");
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,21 @@ public virtual ConnectRecord Apply(ConnectRecord record)
{
return Apply(record.CastTo<SinkRecord>());
}
else ConnectException.ThrowNew($"Cannot manage directly the input, override the method {nameof(Apply)} with generic {nameof(ConnectRecord)} parameter.");
else JVMBridgeException.Throw<ConnectException>($"Cannot manage directly the input, override the method {nameof(Apply)} with generic {nameof(ConnectRecord)} parameter.");
return null;
}

/// <inheritdoc cref="IKNetTransformation.Apply(SourceRecord)"/>
public virtual SourceRecord Apply(SourceRecord record)
{
ConnectException.ThrowNew($"Not implemented for {nameof(SourceRecord)}");
JVMBridgeException.Throw<ConnectException>($"Not implemented for {nameof(SourceRecord)}");
return null;
}

/// <inheritdoc cref="IKNetTransformation.Apply(SinkRecord)"/>
public virtual SinkRecord Apply(SinkRecord record)
{
ConnectException.ThrowNew($"Not implemented for {nameof(SinkRecord)}");
JVMBridgeException.Throw<ConnectException>($"Not implemented for {nameof(SinkRecord)}");
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,21 @@ public virtual bool Test(ConnectRecord record)
{
return Test(record.CastTo<SinkRecord>());
}
else ConnectException.ThrowNew($"Cannot manage directly the input, override the method {nameof(Test)} with generic {nameof(ConnectRecord)} parameter.");
else JVMBridgeException.Throw<ConnectException>($"Cannot manage directly the input, override the method {nameof(Test)} with generic {nameof(ConnectRecord)} parameter.");
return false;
}

/// <inheritdoc cref="IKNetPredicate.Test(SourceRecord)"/>
public virtual bool Test(SourceRecord record)
{
ConnectException.ThrowNew($"Not implemented for {nameof(SourceRecord)}");
JVMBridgeException.Throw<ConnectException>($"Not implemented for {nameof(SourceRecord)}");
return false;
}

/// <inheritdoc cref="IKNetPredicate.Test(SinkRecord)"/>
public virtual bool Test(SinkRecord record)
{
ConnectException.ThrowNew($"Not implemented for {nameof(SinkRecord)}");
JVMBridgeException.Throw<ConnectException>($"Not implemented for {nameof(SinkRecord)}");
return false;
}

Expand Down
2 changes: 1 addition & 1 deletion src/net/KNet/Specific/Consumer/ConsumerRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ internal ConsumerRecord(Org.Apache.Kafka.Clients.Consumer.ConsumerRecord<TJVMK,
/// <inheritdoc cref="Org.Apache.Kafka.Clients.Consumer.ConsumerRecord{K, V}.Topic"/>
public string Topic { get { _topic ??= _record.Topic(); return _topic; } }
/// <inheritdoc cref="Org.Apache.Kafka.Clients.Consumer.ConsumerRecord{K, V}.LeaderEpoch"/>
public int? LeaderEpoch { get { var epoch = _record.LeaderEpoch(); return epoch.IsEmpty() ? null : epoch.Get(); } }
public int? LeaderEpoch { get { var epoch = _record.LeaderEpoch(); return epoch.IsPresent() ? epoch.Get() : null; } }
int? _partition = null;
/// <inheritdoc cref="Org.Apache.Kafka.Clients.Consumer.ConsumerRecord{K, V}.Partition"/>
public int Partition => _partition ??= _record.Partition();
Expand Down
8 changes: 4 additions & 4 deletions src/net/KNet/Specific/Serialization/KNetSerialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ public static int DeserializeInt(bool fallbackToKafka, string topic, byte[] data
//}
//else if (data.Length != 4)
//{
// JVMBridgeException<SerializationException>.ThrowNew("Size of data received by DeserializeInt is not 4");
// JVMBridgeException.Throw<SerializationException>("Size of data received by DeserializeInt is not 4");
// throw new SerializationException();
//}
//else
Expand Down Expand Up @@ -544,7 +544,7 @@ public static long DeserializeLong(bool fallbackToKafka, string topic, byte[] da
//}
//else if (data.Length != 8)
//{
// JVMBridgeException<SerializationException>.ThrowNew("Size of data received by DeserializeLong is not 8");
// JVMBridgeException.Throw<SerializationException>("Size of data received by DeserializeLong is not 8");
// throw new SerializationException();
//}
//else
Expand Down Expand Up @@ -588,7 +588,7 @@ public static short DeserializeShort(bool fallbackToKafka, string topic, byte[]
//}
//else if (data.Length != 2)
//{
// JVMBridgeException<SerializationException>.ThrowNew("Size of data received by DeserializeShort is not 2");
// JVMBridgeException.Throw<SerializationException>("Size of data received by DeserializeShort is not 2");
// throw new SerializationException();
//}
//else
Expand Down Expand Up @@ -630,7 +630,7 @@ public static Java.Lang.Void DeserializeVoid(bool fallbackToKafka, string topic,
{
if (data != null || data.Length != 0)
{
JVMBridgeException<Java.Lang.IllegalArgumentException>.ThrowNew("Data should be null for a VoidDeserializer.");
JVMBridgeException.Throw< Java.Lang.IllegalArgumentException>("Data should be null for a VoidDeserializer.");
throw new Java.Lang.IllegalArgumentException();
}
else
Expand Down
Loading