diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 4650854b23..2630c84f7a 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -399,7 +399,7 @@ jobs:
- 9092:9092
env:
KNET_DOCKER_RUNNING_MODE: server
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_5 }}
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_6 }}
strategy:
fail-fast: false
@@ -423,7 +423,7 @@ jobs:
DOTNET_CreateDumpVerboseDiagnostics: ${{ inputs.EnableVerboseDiagnosticProcessDump }}
DOTNET_EnableCrashReport: ${{ inputs.EnableProcessDump }}
DOTNET_CreateDumpLogToFile: ${{ github.workspace }}/coredump.diagnostic
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_5 }}
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_6 }}
steps:
- name: Restore KNet ${{ matrix.framework }} bin from cache
@@ -513,7 +513,7 @@ jobs:
DOTNET_CreateDumpVerboseDiagnostics: ${{ inputs.EnableVerboseDiagnosticProcessDump }}
DOTNET_EnableCrashReport: ${{ inputs.EnableProcessDump }}
DOTNET_CreateDumpLogToFile: ${{ github.workspace }}/coredump.diagnostic
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_5 }}
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_6 }}
steps:
- name: Restore KNet ${{ matrix.framework }} bin from cache
@@ -627,7 +627,7 @@ jobs:
DOTNET_CreateDumpVerboseDiagnostics: ${{ inputs.EnableVerboseDiagnosticProcessDump }}
DOTNET_EnableCrashReport: ${{ inputs.EnableProcessDump }}
DOTNET_CreateDumpLogToFile: ${{ github.workspace }}/coredump.diagnostic
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_5 }}
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_6 }}
steps:
- name: Restore KNet ${{ matrix.framework }} bin from cache
diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
index 0d325e94a9..2e394697e8 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -9,7 +9,7 @@
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
-name: "CodeQL"
+name: "CodeQL Manual"
on:
push:
diff --git a/.github/workflows/generateclasses.yaml b/.github/workflows/generateclasses.yaml
index b7f4556db0..e3712495e2 100644
--- a/.github/workflows/generateclasses.yaml
+++ b/.github/workflows/generateclasses.yaml
@@ -40,8 +40,8 @@ jobs:
DOTNET_CreateDumpDiagnostics: 1
DOTNET_CreateDumpVerboseDiagnostics: 1
DOTNET_EnableCrashReport: 1
- JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_5 }}
- JNet_Version: 2.6.5
+ JCOBRIDGE_LicensePath: ${{ secrets.JCOBRIDGE_ENCODED_2_6_6 }}
+ JNet_Version: 2.6.6
# Steps represent a sequence of tasks that will be executed as part of the job
steps:
diff --git a/src/copyright.txt b/src/copyright.txt
index 3d20b387f4..6fb0f75acb 100644
--- a/src/copyright.txt
+++ b/src/copyright.txt
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2021-2025 MASES s.r.l.
+* Copyright (c) 2021-2026 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/src/jvm/knet/pom.xml b/src/jvm/knet/pom.xml
index a40d372a7b..018ec53d8e 100644
--- a/src/jvm/knet/pom.xml
+++ b/src/jvm/knet/pom.xml
@@ -43,7 +43,7 @@
${java.version}
${basedir}/classpathfile.classpath
false
- 2.6.5.0
+ 2.6.6.0
3.9.1
2.9.6.0
${basedir}/../../../
diff --git a/src/net/KNet/Developed/Org/Apache/Kafka/Tools/StreamsResetter.cs b/src/net/KNet/Developed/Org/Apache/Kafka/Tools/StreamsResetter.cs
index 95d31aefff..32fcd86aa4 100644
--- a/src/net/KNet/Developed/Org/Apache/Kafka/Tools/StreamsResetter.cs
+++ b/src/net/KNet/Developed/Org/Apache/Kafka/Tools/StreamsResetter.cs
@@ -38,20 +38,46 @@ public static bool ResetApplication(string bootstrapserver, string applicationId
return ResetApplication(false, bootstrapserver, applicationId, inputTopics);
}
- ///
- /// Resets an of Apache Kafka Streams and forces deletion of active members from the group
- ///
- /// The bootstrap server of the Apache Kafka cluster
- /// The application id to be resetted
- /// Input topics to be resetted
- /// if everything goes well, otherwise
- /// Either or are
- public static bool ResetApplicationForced(string bootstrapserver, string applicationId, params string[] inputTopics)
+ ///
+ /// Resets an of Apache Kafka Streams
+ ///
+ /// The bootstrap server of the Apache Kafka cluster
+ /// The application id to be resetted
+ /// Input topics to be resetted
+ /// if everything goes well, otherwise
+ /// Either or are
+ public static bool ResetApplication(string bootstrapserver, string applicationId, IEnumerable inputTopics)
+ {
+ return ResetApplication(false, bootstrapserver, applicationId, inputTopics);
+ }
+
+ ///
+ /// Resets an of Apache Kafka Streams and forces deletion of active members from the group
+ ///
+ /// The bootstrap server of the Apache Kafka cluster
+ /// The application id to be resetted
+ /// Input topics to be resetted
+ /// if everything goes well, otherwise
+ /// Either or are
+ public static bool ResetApplicationForced(string bootstrapserver, string applicationId, params string[] inputTopics)
{
return ResetApplication(true, bootstrapserver, applicationId, inputTopics);
}
- static bool ResetApplication(bool force, string bootstrapserver, string applicationId, string[] inputTopics)
+ ///
+ /// Resets an of Apache Kafka Streams and forces deletion of active members from the group
+ ///
+ /// The bootstrap server of the Apache Kafka cluster
+ /// The application id to be resetted
+ /// Input topics to be resetted
+ /// if everything goes well, otherwise
+ /// Either or are
+ public static bool ResetApplicationForced(string bootstrapserver, string applicationId, IEnumerable inputTopics)
+ {
+ return ResetApplication(true, bootstrapserver, applicationId, inputTopics);
+ }
+
+ static bool ResetApplication(bool force, string bootstrapserver, string applicationId, IEnumerable inputTopics)
{
if (bootstrapserver == null) throw new ArgumentNullException(nameof(bootstrapserver));
if (applicationId == null) throw new ArgumentNullException(nameof(applicationId));
@@ -61,10 +87,11 @@ static bool ResetApplication(bool force, string bootstrapserver, string applicat
strings.Add(bootstrapserver);
strings.Add("--application-id");
strings.Add(applicationId);
- if (inputTopics != null && inputTopics.Length != 0)
+ string topics = inputTopics != null ? string.Join(",", inputTopics) : string.Empty;
+ if (!string.IsNullOrEmpty(topics))
{
strings.Add("--input-topics");
- strings.Add(string.Join(",", inputTopics));
+ strings.Add(topics);
}
if (force) strings.Add("--force");
diff --git a/src/net/KNet/KNet.csproj b/src/net/KNet/KNet.csproj
index b5d394785f..88c1b2cdcc 100644
--- a/src/net/KNet/KNet.csproj
+++ b/src/net/KNet/KNet.csproj
@@ -57,7 +57,7 @@
-
+
All
None
diff --git a/src/net/KNet/Specific/Connect/KNetConnectProxy.cs b/src/net/KNet/Specific/Connect/KNetConnectProxy.cs
index 6fab692689..4d3c31cd8e 100644
--- a/src/net/KNet/Specific/Connect/KNetConnectProxy.cs
+++ b/src/net/KNet/Specific/Connect/KNetConnectProxy.cs
@@ -112,7 +112,7 @@ public bool AllocateSinkConnector(string connectorClassName)
}
catch
{
- Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Unable to find {connectorClassName}");
+ JVMBridgeException.Throw($"Unable to find {connectorClassName}");
return false;
}
@@ -124,7 +124,7 @@ public bool AllocateSinkConnector(string connectorClassName)
}
catch (Exception ex)
{
- Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to create an instance of {connectorClassName}: {ex}");
+ JVMBridgeException.Throw($"Failed to create an instance of {connectorClassName}: {ex}");
return false;
}
@@ -135,7 +135,7 @@ public bool AllocateSinkConnector(string connectorClassName)
}
catch (Exception ex)
{
- Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to register the instance of {connectorClassName}: {ex}");
+ JVMBridgeException.Throw($"Failed to register the instance of {connectorClassName}: {ex}");
return false;
}
}
@@ -156,7 +156,7 @@ public bool AllocateSourceConnector(string connectorClassName)
}
catch
{
- Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Unable to find {connectorClassName}");
+ JVMBridgeException.Throw($"Unable to find {connectorClassName}");
return false;
}
@@ -168,7 +168,7 @@ public bool AllocateSourceConnector(string connectorClassName)
}
catch (Exception ex)
{
- Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to create an instance of {connectorClassName}: {ex}");
+ JVMBridgeException.Throw($"Failed to create an instance of {connectorClassName}: {ex}");
return false;
}
@@ -179,7 +179,7 @@ public bool AllocateSourceConnector(string connectorClassName)
}
catch (Exception ex)
{
- Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to register the instance of {connectorClassName}: {ex}");
+ JVMBridgeException.Throw($"Failed to register the instance of {connectorClassName}: {ex}");
return false;
}
}
@@ -200,7 +200,7 @@ public bool AllocateConnector(string connectorClassName, string uniqueId)
}
catch
{
- Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Unable to find {connectorClassName}");
+ JVMBridgeException.Throw($"Unable to find {connectorClassName}");
return false;
}
@@ -214,7 +214,7 @@ public bool AllocateConnector(string connectorClassName, string uniqueId)
}
catch (Exception ex)
{
- Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to create an instance of {connectorClassName}: {ex}");
+ JVMBridgeException.Throw($"Failed to create an instance of {connectorClassName}: {ex}");
return false;
}
@@ -236,7 +236,7 @@ public bool AllocateTransformation(string transformClassName, string uniqueId)
}
catch
{
- Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Unable to find {transformClassName}");
+ JVMBridgeException.Throw($"Unable to find {transformClassName}");
return false;
}
@@ -250,7 +250,7 @@ public bool AllocateTransformation(string transformClassName, string uniqueId)
}
catch (Exception ex)
{
- Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to create an instance of {transformClassName}: {ex}");
+ JVMBridgeException.Throw($"Failed to create an instance of {transformClassName}: {ex}");
return false;
}
@@ -272,7 +272,7 @@ public bool AllocatePredicate(string predicateClassName, string uniqueId)
}
catch
{
- Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Unable to find {predicateClassName}");
+ JVMBridgeException.Throw($"Unable to find {predicateClassName}");
return false;
}
@@ -286,7 +286,7 @@ public bool AllocatePredicate(string predicateClassName, string uniqueId)
}
catch (Exception ex)
{
- Org.Apache.Kafka.Common.Config.ConfigException.ThrowNew($"Failed to create an instance of {predicateClassName}: {ex}");
+ JVMBridgeException.Throw($"Failed to create an instance of {predicateClassName}: {ex}");
return false;
}
diff --git a/src/net/KNet/Specific/Connect/Transforms/KNetTransformation.cs b/src/net/KNet/Specific/Connect/Transforms/KNetTransformation.cs
index c8f43bce9e..768ec61c0a 100644
--- a/src/net/KNet/Specific/Connect/Transforms/KNetTransformation.cs
+++ b/src/net/KNet/Specific/Connect/Transforms/KNetTransformation.cs
@@ -119,21 +119,21 @@ public virtual ConnectRecord Apply(ConnectRecord record)
{
return Apply(record.CastTo());
}
- else ConnectException.ThrowNew($"Cannot manage directly the input, override the method {nameof(Apply)} with generic {nameof(ConnectRecord)} parameter.");
+ else JVMBridgeException.Throw($"Cannot manage directly the input, override the method {nameof(Apply)} with generic {nameof(ConnectRecord)} parameter.");
return null;
}
///
public virtual SourceRecord Apply(SourceRecord record)
{
- ConnectException.ThrowNew($"Not implemented for {nameof(SourceRecord)}");
+ JVMBridgeException.Throw($"Not implemented for {nameof(SourceRecord)}");
return null;
}
///
public virtual SinkRecord Apply(SinkRecord record)
{
- ConnectException.ThrowNew($"Not implemented for {nameof(SinkRecord)}");
+ JVMBridgeException.Throw($"Not implemented for {nameof(SinkRecord)}");
return null;
}
diff --git a/src/net/KNet/Specific/Connect/Transforms/Predicates/KNetPredicate.cs b/src/net/KNet/Specific/Connect/Transforms/Predicates/KNetPredicate.cs
index 6117dcd8e3..34f2fea6ce 100644
--- a/src/net/KNet/Specific/Connect/Transforms/Predicates/KNetPredicate.cs
+++ b/src/net/KNet/Specific/Connect/Transforms/Predicates/KNetPredicate.cs
@@ -121,21 +121,21 @@ public virtual bool Test(ConnectRecord record)
{
return Test(record.CastTo());
}
- else ConnectException.ThrowNew($"Cannot manage directly the input, override the method {nameof(Test)} with generic {nameof(ConnectRecord)} parameter.");
+ else JVMBridgeException.Throw($"Cannot manage directly the input, override the method {nameof(Test)} with generic {nameof(ConnectRecord)} parameter.");
return false;
}
///
public virtual bool Test(SourceRecord record)
{
- ConnectException.ThrowNew($"Not implemented for {nameof(SourceRecord)}");
+ JVMBridgeException.Throw($"Not implemented for {nameof(SourceRecord)}");
return false;
}
///
public virtual bool Test(SinkRecord record)
{
- ConnectException.ThrowNew($"Not implemented for {nameof(SinkRecord)}");
+ JVMBridgeException.Throw($"Not implemented for {nameof(SinkRecord)}");
return false;
}
diff --git a/src/net/KNet/Specific/Consumer/ConsumerRecord.cs b/src/net/KNet/Specific/Consumer/ConsumerRecord.cs
index af15951c42..d58dd67df3 100644
--- a/src/net/KNet/Specific/Consumer/ConsumerRecord.cs
+++ b/src/net/KNet/Specific/Consumer/ConsumerRecord.cs
@@ -68,7 +68,7 @@ internal ConsumerRecord(Org.Apache.Kafka.Clients.Consumer.ConsumerRecord
public string Topic { get { _topic ??= _record.Topic(); return _topic; } }
///
- public int? LeaderEpoch { get { var epoch = _record.LeaderEpoch(); return epoch.IsEmpty() ? null : epoch.Get(); } }
+ public int? LeaderEpoch { get { var epoch = _record.LeaderEpoch(); return epoch.IsPresent() ? epoch.Get() : null; } }
int? _partition = null;
///
public int Partition => _partition ??= _record.Partition();
diff --git a/src/net/KNet/Specific/Serialization/KNetSerialization.cs b/src/net/KNet/Specific/Serialization/KNetSerialization.cs
index ee9095ac3c..8545031403 100644
--- a/src/net/KNet/Specific/Serialization/KNetSerialization.cs
+++ b/src/net/KNet/Specific/Serialization/KNetSerialization.cs
@@ -500,7 +500,7 @@ public static int DeserializeInt(bool fallbackToKafka, string topic, byte[] data
//}
//else if (data.Length != 4)
//{
- // JVMBridgeException.ThrowNew("Size of data received by DeserializeInt is not 4");
+ // JVMBridgeException.Throw("Size of data received by DeserializeInt is not 4");
// throw new SerializationException();
//}
//else
@@ -544,7 +544,7 @@ public static long DeserializeLong(bool fallbackToKafka, string topic, byte[] da
//}
//else if (data.Length != 8)
//{
- // JVMBridgeException.ThrowNew("Size of data received by DeserializeLong is not 8");
+ // JVMBridgeException.Throw("Size of data received by DeserializeLong is not 8");
// throw new SerializationException();
//}
//else
@@ -588,7 +588,7 @@ public static short DeserializeShort(bool fallbackToKafka, string topic, byte[]
//}
//else if (data.Length != 2)
//{
- // JVMBridgeException.ThrowNew("Size of data received by DeserializeShort is not 2");
+ // JVMBridgeException.Throw("Size of data received by DeserializeShort is not 2");
// throw new SerializationException();
//}
//else
@@ -630,7 +630,7 @@ public static Java.Lang.Void DeserializeVoid(bool fallbackToKafka, string topic,
{
if (data != null || data.Length != 0)
{
- JVMBridgeException.ThrowNew("Data should be null for a VoidDeserializer.");
+ JVMBridgeException.Throw< Java.Lang.IllegalArgumentException>("Data should be null for a VoidDeserializer.");
throw new Java.Lang.IllegalArgumentException();
}
else