diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0dd3674c..4b6d5799 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@ All notable changes to this project will be documented in this file.
 ### Changed

 - Refactor: move server configuration properties from the command line to configuration files. ([#911]).
+- Add support for ZooKeeper to KRaft migration ([#923]).
 - Bump testing-tools to `0.3.0-stackable0.0.0-dev` ([#925]).

 ### Removed
@@ -26,6 +27,7 @@ All notable changes to this project will be documented in this file.
 [#911]: https://github.com/stackabletech/kafka-operator/pull/911
 [#914]: https://github.com/stackabletech/kafka-operator/pull/914
 [#915]: https://github.com/stackabletech/kafka-operator/pull/915
+[#923]: https://github.com/stackabletech/kafka-operator/pull/923
 [#925]: https://github.com/stackabletech/kafka-operator/pull/925
 [#927]: https://github.com/stackabletech/kafka-operator/pull/927
 [#928]: https://github.com/stackabletech/kafka-operator/pull/928
diff --git a/deploy/helm/kafka-operator/crds/crds.yaml b/deploy/helm/kafka-operator/crds/crds.yaml
index 0e975901..d98f1ccb 100644
--- a/deploy/helm/kafka-operator/crds/crds.yaml
+++ b/deploy/helm/kafka-operator/crds/crds.yaml
@@ -787,6 +787,62 @@ spec:
                 - configMapName
                 type: object
               type: object
+              brokerIdPodConfigMapName:
+                description: |-
+                  Enables users to manually assign Kafka broker IDs.
+
+                  Name of a ConfigMap containing a mapping of pod names to broker IDs.
+                  The ConfigMap must contain a key for every broker pod in the cluster with the following format:
+                  `<pod-name>: <broker-id>`
+
+                  Example:
+                  ```
+                  ---
+                  apiVersion: v1
+                  kind: ConfigMap
+                  metadata:
+                    name: brokeridmapping
+                  data:
+                    simple-kafka-broker-default-0: "2000"
+                    simple-kafka-broker-default-1: "2001"
+                    simple-kafka-broker-default-2: "2002"
+                  ```
+                  This is necessary when migrating from ZooKeeper to KRaft mode to retain existing broker IDs,
+                  because previously broker IDs were generated by Kafka and not by the operator.
+                nullable: true
+                type: string
+              metadataManager:
+                description: |-
+                  Metadata manager to use for the Kafka cluster.
+
+                  IMPORTANT: This property will be removed as soon as Kafka 3.x support is dropped.
+
+                  Possible values are `zookeeper` and `kraft`.
+
+                  If not set, defaults to:
+
+                  - `zookeeper` for Kafka versions below `4.0.0`.
+                  - `kraft` for Kafka versions `4.0.0` and higher.
+
+                  Using `zookeeper` for Kafka versions `4.0.0` and higher is not supported.
+
+                  When set to `kraft`, the operator will perform the following actions:
+
+                  * Generate the Kafka cluster ID.
+                  * Assign broker roles and configure controller quorum voters in the `broker.properties` files.
+                  * Format storage before (re)starting Kafka brokers.
+                  * Remove ZooKeeper-related configuration options from the `broker.properties` files.
+
+                  These actions are **mandatory** in KRaft mode, and some of them are exclusive to it:
+                  they **cannot** be performed in ZooKeeper mode.
+
+                  This property is also useful when migrating from ZooKeeper to KRaft mode, because it permits the operator
+                  to reconcile controllers while still using ZooKeeper for brokers.
+                enum:
+                - zookeeper
+                - kraft
+                nullable: true
+                type: string
               tls:
                 default:
                   internalSecretClass: tls
@@ -830,7 +886,7 @@ spec:
                   Provide the name of the ZooKeeper [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery)
                   here.
When using the [Stackable operator for Apache ZooKeeper](https://docs.stackable.tech/home/nightly/zookeeper/) to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. - This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper suppport was dropped. + This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper support was dropped. Please use the 'controller' role instead. nullable: true type: string diff --git a/docs/modules/kafka/examples/kraft_migration/01-setup.yaml b/docs/modules/kafka/examples/kraft_migration/01-setup.yaml new file mode 100644 index 00000000..bb765307 --- /dev/null +++ b/docs/modules/kafka/examples/kraft_migration/01-setup.yaml @@ -0,0 +1,98 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + stackable.tech/vendor: Stackable + name: kraft-migration +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperCluster +metadata: + name: simple-zk + namespace: kraft-migration +spec: + image: + productVersion: 3.9.4 + pullPolicy: IfNotPresent + servers: + roleGroups: + default: + replicas: 1 +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperZnode +metadata: + name: simple-kafka-znode + namespace: kraft-migration +spec: + clusterRef: + name: simple-zk +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: kafka-internal-tls +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-kafka-internal-tls-ca + namespace: kraft-migration + autoGenerate: true +--- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: kafka-client-auth-tls +spec: + provider: + tls: + clientCertSecretClass: kafka-client-auth-secret +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: kafka-client-auth-secret +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-tls-kafka-client-ca + namespace: kraft-migration + autoGenerate: true +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: broker-ids + namespace: kraft-migration +data: + simple-kafka-broker-default-0: "2000" + simple-kafka-broker-default-1: "2001" + simple-kafka-broker-default-2: "2002" +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: zookeeper + brokerIdPodConfigMapName: broker-ids + authentication: + - authenticationClass: kafka-client-auth-tls + tls: + internalSecretClass: kafka-internal-tls + serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokers: + roleGroups: + default: + replicas: 3 diff --git a/docs/modules/kafka/examples/kraft_migration/02-start-controllers.yaml b/docs/modules/kafka/examples/kraft_migration/02-start-controllers.yaml new file mode 100644 index 00000000..8b1cb1e8 --- /dev/null +++ b/docs/modules/kafka/examples/kraft_migration/02-start-controllers.yaml @@ -0,0 +1,34 @@ +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: zookeeper + authentication: + - authenticationClass: kafka-client-auth-tls + tls: + internalSecretClass: kafka-internal-tls + serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokerIdPodConfigMapName: broker-ids + brokers: + envOverrides: + 
KAFKA_CLUSTER_ID: "aC1zl524Svm_uIjcvUGWSw" + roleGroups: + default: + replicas: 3 + controllers: + roleGroups: + default: + replicas: 3 + envOverrides: + KAFKA_CLUSTER_ID: "aC1zl524Svm_uIjcvUGWSw" + configOverrides: + controller.properties: + zookeeper.metadata.migration.enable: "true" # Enable migration mode so the controller can read metadata from ZooKeeper. diff --git a/docs/modules/kafka/examples/kraft_migration/03-migrate-metadata.yaml b/docs/modules/kafka/examples/kraft_migration/03-migrate-metadata.yaml new file mode 100644 index 00000000..ee065951 --- /dev/null +++ b/docs/modules/kafka/examples/kraft_migration/03-migrate-metadata.yaml @@ -0,0 +1,40 @@ +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: zookeeper + authentication: + - authenticationClass: kafka-client-auth-tls + tls: + internalSecretClass: kafka-internal-tls + serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokerIdPodConfigMapName: broker-ids + brokers: + envOverrides: + KAFKA_CLUSTER_ID: "aC1zl524Svm_uIjcvUGWSw" + roleGroups: + default: + replicas: 3 + configOverrides: + broker.properties: + inter.broker.protocol.version: "3.9" # - Latest value known to Kafka 3.9.1 + zookeeper.metadata.migration.enable: "true" # - Enable migration mode so the broker can participate in metadata migration. + controller.listener.names: "CONTROLLER" + controller.quorum.voters: "2110489703@simple-kafka-controller-default-0.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093,2110489704@simple-kafka-controller-default-1.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093,2110489705@simple-kafka-controller-default-2.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093" + controllers: + roleGroups: + default: + replicas: 3 + envOverrides: + KAFKA_CLUSTER_ID: "aC1zl524Svm_uIjcvUGWSw" + configOverrides: + controller.properties: + zookeeper.metadata.migration.enable: "true" # Enable migration mode so the controller can read metadata from ZooKeeper. 
diff --git a/docs/modules/kafka/examples/kraft_migration/04-migrate-brokers.yaml b/docs/modules/kafka/examples/kraft_migration/04-migrate-brokers.yaml new file mode 100644 index 00000000..9224b37d --- /dev/null +++ b/docs/modules/kafka/examples/kraft_migration/04-migrate-brokers.yaml @@ -0,0 +1,40 @@ +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: zookeeper + authentication: + - authenticationClass: kafka-client-auth-tls + tls: + internalSecretClass: kafka-internal-tls + serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokerIdPodConfigMapName: broker-ids + brokers: + envOverrides: + KAFKA_CLUSTER_ID: "aC1zl524Svm_uIjcvUGWSw" + roleGroups: + default: + replicas: 3 + configOverrides: + broker.properties: + controller.listener.names: "CONTROLLER" + controller.quorum.voters: "2110489703@simple-kafka-controller-default-0.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093,2110489704@simple-kafka-controller-default-1.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093,2110489705@simple-kafka-controller-default-2.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093" + process.roles: "broker" + node.id: "${env:REPLICA_ID}" + controllers: + roleGroups: + default: + replicas: 3 + envOverrides: + KAFKA_CLUSTER_ID: "aC1zl524Svm_uIjcvUGWSw" + configOverrides: + controller.properties: + zookeeper.metadata.migration.enable: "true" # Enable migration mode so the controller can read metadata from ZooKeeper. diff --git a/docs/modules/kafka/examples/kraft_migration/05-kraft-mode.yaml b/docs/modules/kafka/examples/kraft_migration/05-kraft-mode.yaml new file mode 100644 index 00000000..a4fbd1fe --- /dev/null +++ b/docs/modules/kafka/examples/kraft_migration/05-kraft-mode.yaml @@ -0,0 +1,33 @@ +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: kraft + authentication: + - authenticationClass: kafka-client-auth-tls + tls: + internalSecretClass: kafka-internal-tls + serverSecretClass: tls + brokerIdPodConfigMapName: broker-ids + brokers: + envOverrides: + KAFKA_CLUSTER_ID: "aC1zl524Svm_uIjcvUGWSw" + roleGroups: + default: + replicas: 3 + configOverrides: + broker.properties: + controller.listener.names: "CONTROLLER" + controllers: + roleGroups: + default: + replicas: 3 + envOverrides: + KAFKA_CLUSTER_ID: "aC1zl524Svm_uIjcvUGWSw" diff --git a/docs/modules/kafka/examples/mirror_maker/01-setup-source.yaml b/docs/modules/kafka/examples/mirror_maker/01-setup-source.yaml new file mode 100644 index 00000000..4d95005a --- /dev/null +++ b/docs/modules/kafka/examples/mirror_maker/01-setup-source.yaml @@ -0,0 +1,90 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + stackable.tech/vendor: Stackable + name: mm-migration +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperCluster +metadata: + name: zookeeper + namespace: mm-migration +spec: + image: + productVersion: 3.9.4 + pullPolicy: IfNotPresent + servers: + roleGroups: + default: + replicas: 1 +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperZnode +metadata: + name: source-znode + namespace: mm-migration +spec: + clusterRef: + name: zookeeper +--- +apiVersion: 
secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: source-internal-tls +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-source-internal-tls-ca + namespace: mm-migration + autoGenerate: true +--- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: source-client-auth +spec: + provider: + tls: + clientCertSecretClass: source-client-auth-secret +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: source-client-auth-secret +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-tls-source-client-ca + namespace: mm-migration + autoGenerate: true +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: source + namespace: mm-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: zookeeper + authentication: + - authenticationClass: source-client-auth + tls: + internalSecretClass: source-internal-tls + serverSecretClass: tls + zookeeperConfigMapName: source-znode + brokers: + roleGroups: + default: + replicas: 1 + configOverrides: + broker.properties: + offsets.topic.replication.factor: "1" # https://github.com/stackabletech/kafka-operator/issues/587 diff --git a/docs/modules/kafka/examples/mirror_maker/02-setup-target.yaml b/docs/modules/kafka/examples/mirror_maker/02-setup-target.yaml new file mode 100644 index 00000000..a69421fa --- /dev/null +++ b/docs/modules/kafka/examples/mirror_maker/02-setup-target.yaml @@ -0,0 +1,63 @@ +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: target-internal-tls +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-target-internal-tls-ca + namespace: mm-migration + autoGenerate: true +--- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: target-client-auth +spec: + provider: + tls: + clientCertSecretClass: target-client-auth-secret +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: target-client-auth-secret +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-tls-target-client-ca + namespace: mm-migration + autoGenerate: true +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: target + namespace: mm-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: kraft + authentication: + - authenticationClass: target-client-auth + tls: + internalSecretClass: target-internal-tls + serverSecretClass: tls + brokers: + roleGroups: + default: + replicas: 1 + configOverrides: + broker.properties: + offsets.topic.replication.factor: "1" # https://github.com/stackabletech/kafka-operator/issues/587 + controllers: + roleGroups: + default: + replicas: 1 diff --git a/docs/modules/kafka/examples/mirror_maker/README.md b/docs/modules/kafka/examples/mirror_maker/README.md new file mode 100644 index 00000000..df069e6d --- /dev/null +++ b/docs/modules/kafka/examples/mirror_maker/README.md @@ -0,0 +1,46 @@ + +### Setup + +k create --save-config -f docs/modules/kafka/examples/mirror_maker/01-setup-source.yaml +k create --save-config -f docs/modules/kafka/examples/mirror_maker/02-setup-target.yaml + +k cp -n mm-migration -c kafka target-broker-default-0:/stackable/tls-kafka-server/keystore.p12 docs/modules/kafka/examples/mirror_maker/keystore.p12 +k cp -n mm-migration -c kafka 
target-broker-default-0:/stackable/tls-kafka-server/truststore.p12 docs/modules/kafka/examples/mirror_maker/truststore.p12
+
+k cp -n mm-migration -c kafka docs/modules/kafka/examples/mirror_maker/truststore.p12 source-broker-default-0:/stackable/truststore.p12
+k cp -n mm-migration -c kafka docs/modules/kafka/examples/mirror_maker/keystore.p12 source-broker-default-0:/stackable/keystore.p12
+
+k cp -n mm-migration -c kafka docs/modules/kafka/examples/mirror_maker/mm.properties source-broker-default-0:/stackable/mm.properties
+
+### Create a topic and publish some data
+
+/stackable/kafka/bin/kafka-topics.sh --create --topic test --partitions 1 --bootstrap-server source-broker-default-bootstrap.mm-migration.svc.cluster.local:9093 --command-config /stackable/config/client.properties
+
+/stackable/kafka/bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=source-broker-default-bootstrap.mm-migration.svc.cluster.local:9093 --payload-monotonic --throughput 1 --num-records 100 --producer.config /stackable/config/client.properties --topic test
+
+/stackable/kafka/bin/kafka-console-consumer.sh --bootstrap-server source-broker-default-bootstrap.mm-migration.svc.cluster.local:9093 --consumer.config /stackable/config/client.properties --topic test --offset earliest --partition 0 --timeout-ms 10000
+
+### Run MirrorMaker
+
+EXTRA_ARGS="" /stackable/kafka/bin/connect-mirror-maker.sh /stackable/mm.properties
+
+### Verify the topic is mirrored
+
+/stackable/kafka/bin/kafka-topics.sh --list --bootstrap-server target-broker-default-bootstrap.mm-migration.svc.cluster.local:9093 --command-config /stackable/config/client.properties
+
+/stackable/kafka/bin/kafka-console-consumer.sh --bootstrap-server target-broker-default-bootstrap.mm-migration.svc.cluster.local:9093 --consumer.config /stackable/config/client.properties --topic source.test --offset earliest --partition 0 --timeout-ms 10000
+
+### Cleanup
+
+k delete -n mm-migration kafkaclusters source
+k delete -n mm-migration kafkaclusters target
+k delete -n mm-migration zookeeperznodes source-znode
+k delete -n mm-migration zookeeperclusters zookeeper
+k delete -n mm-migration secretclasses source-internal-tls
+k delete -n mm-migration secretclasses source-client-auth-secret
+k delete -n mm-migration secretclasses target-internal-tls
+k delete -n mm-migration secretclasses target-client-auth-secret
+k delete -n mm-migration authenticationclasses target-client-auth
+k delete -n mm-migration authenticationclasses source-client-auth
+k delete -n mm-migration persistentvolumeclaims --all
+k delete ns mm-migration
diff --git a/docs/modules/kafka/examples/mirror_maker/mm.properties b/docs/modules/kafka/examples/mirror_maker/mm.properties
new file mode 100644
index 00000000..b8360527
--- /dev/null
+++ b/docs/modules/kafka/examples/mirror_maker/mm.properties
@@ -0,0 +1,38 @@
+# Specify any number of cluster aliases
+clusters = source, target
+
+# Connection information for each cluster.
+# This is a comma-separated list of host:port pairs for each cluster,
+# for example: "A_host1:9092, A_host2:9092, A_host3:9092"
+source.bootstrap.servers = source-broker-default-bootstrap.mm-migration.svc.cluster.local:9093
+target.bootstrap.servers = target-broker-default-bootstrap.mm-migration.svc.cluster.local:9093
+
+# Enable and configure individual replication flows
+source->target.enabled = true
+
+# Regex that defines which topics get replicated, e.g. "foo-.*"
For eg "foo-.*" +source->target.topics = test + +# Needed for mm2 internal topics if there is only one broker running per cluster +offset.storage.replication.factor=1 +config.storage.replication.factor=1 +status.storage.replication.factor=1 + +# SSL configuration +target.security.protocol=SSL +target.ssl.truststore.password= +target.ssl.truststore.location=/stackable/truststore.p12 +target.ssl.truststore.type=PKCS12 +#keystore location in case client.auth is set to required +target.ssl.keystore.password= +target.ssl.keystore.location=/stackable/keystore.p12 +target.ssl.keystore.type=PKCS12 + +source.security.protocol=SSL +source.ssl.truststore.password= +source.ssl.truststore.location=/stackable/tls-kafka-server/truststore.p12 +source.ssl.truststore.type=PKCS12 +#keystore location in case client.auth is set to required +source.ssl.keystore.password= +source.ssl.keystore.location=/stackable/tls-kafka-server/keystore.p12 +source.ssl.keystore.type=PKCS12 diff --git a/docs/modules/kafka/pages/usage-guide/kraft-controller.adoc b/docs/modules/kafka/pages/usage-guide/kraft-controller.adoc index ea5c4946..cfeffba9 100644 --- a/docs/modules/kafka/pages/usage-guide/kraft-controller.adoc +++ b/docs/modules/kafka/pages/usage-guide/kraft-controller.adoc @@ -83,7 +83,7 @@ KRaft mode requires major configuration changes compared to ZooKeeper: * `cluster-id`: This is set to the `metadata.name` of the KafkaCluster resource during initial formatting * `node.id`: This is a calculated integer, hashed from the `role` and `rolegroup` and added `replica` id. * `process.roles`: Will always only be `broker` or `controller`. Mixed `broker,controller` servers are not supported. -* The operator configures a static voter list containing the controller pods. Controllers are not dynamicaly managed. +* The operator configures a static voter list containing the controller pods. Controllers are not dynamically managed. == Known Issues @@ -110,3 +110,157 @@ The Stackable Kafka operator currently does not support the migration. The https://developers.redhat.com/articles/2024/11/27/dynamic-kafka-controller-quorum[Dynamic scaling] is only supported from Kafka version 3.9.0. If you are using older versions, automatic scaling may not work properly (e.g. adding or removing controller replicas). + +== Kraft migration guide + +The operator version `26.3.0` adds support for migrating Kafka clusters from ZooKeeper to KRaft mode. + +This guide describes the steps required to migrate an existing Kafka cluster managed by the Stackable Kafka operator from ZooKeeper to KRaft mode. + +NOTE: Before starting the migration we recommend to reduce producer/consumer operations to a minimum or even pause them completely if possible to reduce the risk of data loss during the migration. + +To make the migration step as clear as possible, we'll use a complete working example throughout this guide. +The example cluster will be kept minimal without any additional configuration. + +We start by creating a dedicated namespace to work in and deploy the Kafka cluster including ZooKeeper and credentials. + +[source,yaml] +---- +include::example$kraft_migration/01-setup.yaml[] +---- + +=== Requirements + +* Kafka clusters **must** be set up with the Stackable Kafka operator version `26.3.0` or higher. Kafka clusters set up with a previous operator version **cannot be upgraded** without migrating all broker instances first. This is because broker id management must be handed over from Kafka to the operator. 
+
+To be able to migrate the existing brokers, we need to preserve their broker IDs.
+Similar to the cluster ID, we can obtain the broker IDs from the `meta.properties` file on each broker pod.
+We then need to inform the operator to use these IDs instead of generating new ones.
+This is done by creating a ConfigMap containing the ID mapping and pointing the `spec.clusterConfig.brokerIdPodConfigMapName` property of the `KafkaCluster` resource to it.
+
+These two properties must be preserved for the rest of the migration process and the lifetime of the cluster.
+
+The complete example `KafkaCluster` resource after applying the required changes looks as follows:
+
+[source,yaml]
+----
+include::example$kraft_migration/02-start-controllers.yaml[]
+----
+
+We `kubectl apply` the updated resource and wait for brokers and controllers to become ready.
+
+=== 2. Migrate metadata
+
+In this step we will perform the following actions:
+
+1. Obtain the controller quorum configuration.
+2. Enable metadata migration mode on the brokers.
+3. Configure the controller quorum on the brokers.
+4. Apply the changes and restart the broker pods.
+
+The exact value of the quorum must be obtained from the `/tmp/controller.properties` file on one of the controller pods.
+To start the metadata migration, we need to add `zookeeper.metadata.migration.enable: "true"` and the controller quorum configuration to the broker configuration.
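+A minimal sketch for copying the quorum value out of a running controller, reusing the pod and container names from this example:
+
+[source,bash]
+----
+# Print the generated controller.quorum.voters value.
+kubectl exec -n kraft-migration simple-kafka-controller-default-0 -c kafka -- \
+  grep 'controller.quorum.voters' /tmp/controller.properties
+----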
+For this step, the complete example `KafkaCluster` resource looks as follows:
+
+[source,yaml]
+----
+include::example$kraft_migration/03-migrate-metadata.yaml[]
+----
+
+After applying the changes, we restart the brokers and wait for them to become ready again.
+
+[source,bash]
+----
+kubectl rollout restart statefulset simple-kafka-broker-default -n kraft-migration
+----
+
+Finally, we check that the metadata migration was successful:
+
+[source,bash]
+----
+kubectl logs -n kraft-migration simple-kafka-controller-default-0 | grep -i 'completed migration' \
+|| kubectl logs -n kraft-migration simple-kafka-controller-default-1 | grep -i 'completed migration' \
+|| kubectl logs -n kraft-migration simple-kafka-controller-default-2 | grep -i 'completed migration'
+
+...
+[2025-12-22 09:23:53,372] INFO [KRaftMigrationDriver id=2110489705] Completed migration of metadata from ZooKeeper to KRaft. 0 records were generated in 102 ms across 0 batches. The average time spent waiting on a batch was -1.00 ms. The record types were {}. The current metadata offset is now 280 with an epoch of 3. Saw 0 brokers in the migrated metadata []. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
+----
+
+=== 3. Migrate brokers
+
+NOTE: This is the last step before fully switching to KRaft mode. In case of unforeseen issues, it is also the last point at which we can still roll back to ZooKeeper mode.
+
+In this step we will perform the following actions:
+
+1. Remove the migration properties from the previous step on the brokers.
+2. Assign KRaft role properties to the brokers.
+3. Apply the changes and restart the broker pods.
+
+We need to preserve the quorum configuration added in the previous step.
+
+For this step, the complete example `KafkaCluster` resource looks as follows:
+
+[source,yaml]
+----
+include::example$kraft_migration/04-migrate-brokers.yaml[]
+----
+
+=== 4. Enable KRaft mode
+
+After this step, the cluster will be fully running in KRaft mode and cannot be rolled back to ZooKeeper mode anymore.
+
+In this step we will perform the following actions:
+
+1. Put the cluster in KRaft mode by updating the `spec.clusterConfig.metadataManager` property.
+2. Remove the KRaft quorum configuration from the broker pods.
+3. Remove the ZooKeeper migration flag from the controllers.
+4. Apply the changes and restart all pods.
+
+We need to preserve the `KAFKA_CLUSTER_ID` environment variable for the rest of the lifetime of this cluster.
+
+The complete example `KafkaCluster` resource after applying the required changes looks as follows:
+
+[source,yaml]
+----
+include::example$kraft_migration/05-kraft-mode.yaml[]
+----
+
+Verify that the cluster is healthy and that consumer/producer operations work as expected.
+
+=== 5. Cleanup
+
+Before proceeding with this step, please ensure that the Kafka cluster is fully operational in KRaft mode.
+
+In this step we remove the now unused ZooKeeper cluster and related resources.
+
+If the ZooKeeper cluster also serves use cases other than Kafka, you can skip this step.
+
+In our example we can remove the ZooKeeper cluster and the ZNode resource as follows:
+
+[source,bash]
+----
+kubectl delete -n kraft-migration zookeeperznodes simple-kafka-znode
+kubectl delete -n kraft-migration zookeeperclusters simple-zk
+----
+
+=== 6. Next steps
+
+After successfully migrating to KRaft mode, consider updating the Kafka version to `4.0.0` or higher to benefit from the latest features and improvements in KRaft mode.
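+One way to sanity-check the quorum after the switch (or already after step 4) is Kafka's built-in quorum tool. A sketch, run from inside one of the broker pods and reusing the bootstrap address and client configuration assumed by this guide's examples:
+
+[source,bash]
+----
+# The output should list one leader and the expected set of voters.
+/stackable/kafka/bin/kafka-metadata-quorum.sh \
+  --bootstrap-server simple-kafka-broker-default-bootstrap.kraft-migration.svc.cluster.local:9093 \
+  --command-config /stackable/config/client.properties \
+  describe --status
+----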
diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs
index 252d28c0..a4540001 100644
--- a/rust/operator-binary/src/config/command.rs
+++ b/rust/operator-binary/src/config/command.rs
@@ -11,15 +11,13 @@ use crate::{
         KafkaPodDescriptor, STACKABLE_CONFIG_DIR, STACKABLE_KERBEROS_KRB5_PATH,
         role::{broker::BROKER_PROPERTIES_FILE, controller::CONTROLLER_PROPERTIES_FILE},
         security::KafkaTlsSecurity,
-        v1alpha1,
     },
-    product_logging::STACKABLE_LOG_DIR,
+    product_logging::{BROKER_ID_POD_MAP_DIR, STACKABLE_LOG_DIR},
 };

 /// Returns the commands to start the main Kafka container
 pub fn broker_kafka_container_commands(
-    kafka: &v1alpha1::KafkaCluster,
-    cluster_id: &str,
+    kraft_mode: bool,
     controller_descriptors: Vec<KafkaPodDescriptor>,
     kafka_security: &KafkaTlsSecurity,
     product_version: &str,
@@ -45,44 +43,49 @@ pub fn broker_kafka_container_commands(
             false => "".to_string(),
         },
         import_opa_tls_cert = kafka_security.copy_opa_tls_cert_command(),
-        broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, product_version),
+        broker_start_command = broker_start_command(kraft_mode, controller_descriptors, product_version),
     }
 }

 fn broker_start_command(
-    kafka: &v1alpha1::KafkaCluster,
-    cluster_id: &str,
+    kraft_mode: bool,
     controller_descriptors: Vec<KafkaPodDescriptor>,
     product_version: &str,
 ) -> String {
-    if kafka.is_controller_configured() {
-        formatdoc! {"
-            POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$')
+    let common_command = formatdoc! {"
+        export POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$')
         export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET))

+        if [ -f \"{broker_id_pod_map_dir}/$POD_NAME\" ]; then
+            REPLICA_ID=$(cat \"{broker_id_pod_map_dir}/$POD_NAME\")
+        fi
+
         cp {config_dir}/{properties_file} /tmp/{properties_file}
         config-utils template /tmp/{properties_file}

         cp {config_dir}/jaas.properties /tmp/jaas.properties
         config-utils template /tmp/jaas.properties
+        ",
+        broker_id_pod_map_dir = BROKER_ID_POD_MAP_DIR,
+        config_dir = STACKABLE_CONFIG_DIR,
+        properties_file = BROKER_PROPERTIES_FILE,
+    };

-            bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command}
+    if kraft_mode {
+        formatdoc! {"
+            {common_command}
+
+            bin/kafka-storage.sh format --cluster-id \"$KAFKA_CLUSTER_ID\" --config /tmp/{properties_file} --ignore-formatted {initial_controller_command}

             bin/kafka-server-start.sh /tmp/{properties_file} &
             ",
-            config_dir = STACKABLE_CONFIG_DIR,
             properties_file = BROKER_PROPERTIES_FILE,
             initial_controller_command = initial_controllers_command(&controller_descriptors, product_version),
         }
     } else {
{" - cp {config_dir}/{properties_file} /tmp/{properties_file} - config-utils template /tmp/{properties_file} - - cp {config_dir}/jaas.properties /tmp/jaas.properties - config-utils template /tmp/jaas.properties + {common_command} bin/kafka-server-start.sh /tmp/{properties_file} &", - config_dir = STACKABLE_CONFIG_DIR, properties_file = BROKER_PROPERTIES_FILE, } } @@ -131,7 +134,6 @@ wait_for_termination() "#; pub fn controller_kafka_container_command( - cluster_id: &str, controller_descriptors: Vec, product_version: &str, ) -> String { @@ -148,7 +150,7 @@ pub fn controller_kafka_container_command( config-utils template /tmp/{properties_file} - bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} + bin/kafka-storage.sh format --cluster-id \"$KAFKA_CLUSTER_ID\" --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} bin/kafka-server-start.sh /tmp/{properties_file} & wait_for_termination $! diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 1f0b8c56..41d7796c 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -162,14 +162,17 @@ impl KafkaListenerConfig { .join(",") } - /// Returns the `listener.security.protocol.map` for the Kafka `broker.properties` config. - pub fn listener_security_protocol_map_for_listener( - &self, - listener_name: &KafkaListenerName, - ) -> Option { + /// Returns the `listener.security.protocol.map` for the Kraft controller. + /// This map must include the internal broker listener too. + pub fn listener_security_protocol_map_for_controller(&self) -> String { self.listener_security_protocol_map - .get(listener_name) - .map(|protocol| format!("{listener_name}:{protocol}")) + .iter() + .filter(|(name, _)| { + *name == &KafkaListenerName::Internal || *name == &KafkaListenerName::Controller + }) + .map(|(name, protocol)| format!("{name}:{protocol}")) + .collect::>() + .join(",") } } diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index dbc6bef8..fb556cbd 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -24,6 +24,7 @@ use stackable_operator::{ utils::cluster_info::KubernetesClusterInfo, versioned::versioned, }; +use strum::{Display, EnumIter, EnumString}; use crate::{ config::node_id_hasher::node_id_hash32_offset, @@ -59,6 +60,11 @@ pub const STACKABLE_KERBEROS_KRB5_PATH: &str = "/stackable/kerberos/krb5.conf"; #[derive(Snafu, Debug)] pub enum Error { + #[snafu(display( + "The ZooKeeper metadata manager is not supported for Kafka version 4 and higher" + ))] + Kafka4RequiresKraftMetadataManager, + #[snafu(display("The Kafka role [{role}] is missing from spec"))] MissingRole { role: String }, @@ -163,9 +169,61 @@ pub mod versioned { /// Provide the name of the ZooKeeper [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery) /// here. When using the [Stackable operator for Apache ZooKeeper](DOCS_BASE_URL_PLACEHOLDER/zookeeper/) /// to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. - /// This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper suppport was dropped. + /// This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper support was dropped. /// Please use the 'controller' role instead. pub zookeeper_config_map_name: Option, + + /// Metadata manager to use for the Kafka cluster. 
+            ///
+            /// IMPORTANT: This property will be removed as soon as Kafka 3.x support is dropped.
+            ///
+            /// Possible values are `zookeeper` and `kraft`.
+            ///
+            /// If not set, defaults to:
+            ///
+            /// - `zookeeper` for Kafka versions below `4.0.0`.
+            /// - `kraft` for Kafka versions `4.0.0` and higher.
+            ///
+            /// Using `zookeeper` for Kafka versions `4.0.0` and higher is not supported.
+            ///
+            /// When set to `kraft`, the operator will perform the following actions:
+            ///
+            /// * Generate the Kafka cluster ID.
+            /// * Assign broker roles and configure controller quorum voters in the `broker.properties` files.
+            /// * Format storage before (re)starting Kafka brokers.
+            /// * Remove ZooKeeper-related configuration options from the `broker.properties` files.
+            ///
+            /// These actions are **mandatory** in KRaft mode, and some of them are exclusive to it:
+            /// they **cannot** be performed in ZooKeeper mode.
+            ///
+            /// This property is also useful when migrating from ZooKeeper to KRaft mode, because it permits the operator
+            /// to reconcile controllers while still using ZooKeeper for brokers.
+            #[serde(skip_serializing_if = "Option::is_none")]
+            pub metadata_manager: Option<MetadataManager>,
+
+            /// Enables users to manually assign Kafka broker IDs.
+            ///
+            /// Name of a ConfigMap containing a mapping of pod names to broker IDs.
+            /// The ConfigMap must contain a key for every broker pod in the cluster with the following format:
+            /// `<pod-name>: <broker-id>`
+            ///
+            /// Example:
+            /// ```
+            /// ---
+            /// apiVersion: v1
+            /// kind: ConfigMap
+            /// metadata:
+            ///   name: brokeridmapping
+            /// data:
+            ///   simple-kafka-broker-default-0: "2000"
+            ///   simple-kafka-broker-default-1: "2001"
+            ///   simple-kafka-broker-default-2: "2002"
+            /// ```
+            /// This is necessary when migrating from ZooKeeper to KRaft mode to retain existing broker IDs,
+            /// because previously broker IDs were generated by Kafka and not by the operator.
+            ///
+            #[serde(skip_serializing_if = "Option::is_none")]
+            pub broker_id_pod_config_map_name: Option<String>,
         }
 }

@@ -177,6 +235,8 @@ impl Default for v1alpha1::KafkaClusterConfig {
             tls: tls::default_kafka_tls(),
             vector_aggregator_config_map_name: None,
             zookeeper_config_map_name: None,
+            metadata_manager: None,
+            broker_id_pod_config_map_name: None,
         }
     }
 }

@@ -191,30 +251,46 @@ impl HasStatusCondition for v1alpha1::KafkaCluster {
 }

 impl v1alpha1::KafkaCluster {
-    /// Supporting Kraft alongside Zookeeper requires a couple of CRD checks
-    /// - If Kafka 4 and higher is used, no zookeeper config map ref has to be provided
-    /// - Configuring the controller role means no zookeeper config map ref has to be provided
-    pub fn check_kraft_vs_zookeeper(&self, product_version: &str) -> Result<(), Error> {
-        if product_version.starts_with("4.") && self.spec.controllers.is_none() {
-            return Err(Error::Kafka4RequiresKraft);
-        }
-
-        if self.spec.controllers.is_some()
-            && self.spec.cluster_config.zookeeper_config_map_name.is_some()
-        {
-            return Err(Error::KraftAndZookeeperConfigured);
+    pub fn effective_metadata_manager(&self) -> Result<MetadataManager, Error> {
+        match &self.spec.cluster_config.metadata_manager {
+            Some(manager) => match manager.clone() {
+                MetadataManager::ZooKeeper => {
+                    if self.spec.image.product_version().starts_with("4.") {
+                        Err(Error::Kafka4RequiresKraftMetadataManager)
+                    } else {
+                        Ok(MetadataManager::ZooKeeper)
+                    }
+                }
+                _ => Ok(MetadataManager::KRaft),
+            },
+            None => {
+                if self.spec.image.product_version().starts_with("4.") {
+                    Ok(MetadataManager::KRaft)
+                } else {
+                    Ok(MetadataManager::ZooKeeper)
+                }
+            }
         }
-
-        Ok(())
-    }
-
-    pub fn is_controller_configured(&self) -> bool {
-        self.spec.controllers.is_some()
     }

-    // The cluster-id for Kafka
+    /// The Kafka cluster ID when running in KRaft mode.
+    ///
+    /// In ZooKeeper mode the cluster ID is a UUID generated by Kafka itself, and users typically
+    /// do not need to deal with it.
+    ///
+    /// When in KRaft mode, the cluster ID is passed on as the environment variable `KAFKA_CLUSTER_ID`.
+    ///
+    /// When migrating to KRaft mode, users *must* set this variable via `envOverrides` to the value
+    /// found in the `cluster/id` ZooKeeper node or in the `meta.properties` file.
+    ///
+    /// For freshly installed clusters, users do not need to deal with the cluster ID.
     pub fn cluster_id(&self) -> Option<&str> {
-        self.metadata.name.as_deref()
+        self.effective_metadata_manager()
+            .ok()
+            .and_then(|manager| match manager {
+                MetadataManager::KRaft => self.metadata.name.as_deref(),
+                _ => None,
+            })
     }

     /// The name of the load-balanced Kubernetes Service providing the bootstrap address.
Kafka clients will use this
@@ -412,6 +488,25 @@ pub struct KafkaClusterStatus {
     pub conditions: Vec<ClusterCondition>,
 }

+#[derive(
+    Clone,
+    Debug,
+    Deserialize,
+    Display,
+    EnumIter,
+    Eq,
+    Hash,
+    JsonSchema,
+    PartialEq,
+    Serialize,
+    EnumString,
+)]
+#[serde(rename_all = "lowercase")]
+pub enum MetadataManager {
+    ZooKeeper,
+    KRaft,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/rust/operator-binary/src/crd/role/broker.rs b/rust/operator-binary/src/crd/role/broker.rs
index 00d614b9..70ac85d0 100644
--- a/rust/operator-binary/src/crd/role/broker.rs
+++ b/rust/operator-binary/src/crd/role/broker.rs
@@ -107,11 +107,15 @@ impl Configuration for BrokerConfigFragment {

     fn compute_env(
         &self,
-        _resource: &Self::Configurable,
+        resource: &Self::Configurable,
         _role_name: &str,
     ) -> Result<BTreeMap<String, Option<String>>, stackable_operator::product_config_utils::Error> {
-        Ok(BTreeMap::new())
+        let mut result = BTreeMap::new();
+        if let Some(cluster_id) = resource.cluster_id() {
+            result.insert("KAFKA_CLUSTER_ID".to_string(), Some(cluster_id.to_string()));
+        }
+        Ok(result)
     }

     fn compute_cli(
diff --git a/rust/operator-binary/src/crd/role/controller.rs b/rust/operator-binary/src/crd/role/controller.rs
index 5b9513a5..bf1468b6 100644
--- a/rust/operator-binary/src/crd/role/controller.rs
+++ b/rust/operator-binary/src/crd/role/controller.rs
@@ -97,11 +97,15 @@ impl Configuration for ControllerConfigFragment {

     fn compute_env(
         &self,
-        _resource: &Self::Configurable,
+        resource: &Self::Configurable,
         _role_name: &str,
     ) -> Result<BTreeMap<String, Option<String>>, stackable_operator::product_config_utils::Error> {
-        Ok(BTreeMap::new())
+        let mut result = BTreeMap::new();
+        if let Some(cluster_id) = resource.cluster_id() {
+            result.insert("KAFKA_CLUSTER_ID".to_string(), Some(cluster_id.to_string()));
+        }
+        Ok(result)
     }

     fn compute_cli(
diff --git a/rust/operator-binary/src/crd/role/mod.rs b/rust/operator-binary/src/crd/role/mod.rs
index 47210ea4..bc3f4df2 100644
--- a/rust/operator-binary/src/crd/role/mod.rs
+++ b/rust/operator-binary/src/crd/role/mod.rs
@@ -33,6 +33,11 @@ use crate::{
 /// Env var
 pub const KAFKA_NODE_ID_OFFSET: &str = "NODE_ID_OFFSET";

+/// Past versions of the operator didn't set this explicitly and allowed Kafka to generate random IDs.
+/// To support KRaft migration, this must be carried over to `KAFKA_NODE_ID`, so the operator needs
+/// to know its value for each broker Pod.
+pub const KAFKA_BROKER_ID: &str = "broker.id";
+
 // See: https://kafka.apache.org/documentation/#brokerconfigs
 /// The node ID associated with the roles this process is playing when process.roles is non-empty.
 /// This is required configuration when running in KRaft mode.
@@ -66,10 +71,6 @@ pub const KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: &str = "listener.security.protoc
 /// For example: localhost:9092,localhost:9093,localhost:9094.
 pub const KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS: &str = "controller.quorum.bootstrap.servers";

-/// Map of id/endpoint information for the set of voters in a comma-separated list of {id}@{host}:{port} entries.
-/// For example: 1@localhost:9092,2@localhost:9093,3@localhost:9094 -pub const KAFKA_CONTROLLER_QUORUM_VOTERS: &str = "controller.quorum.voters"; - #[derive(Snafu, Debug)] pub enum Error { #[snafu(display("fragment validation failure"))] diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index dd5b4358..9a205b5f 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -768,6 +768,33 @@ impl KafkaTlsSecurity { KafkaListenerName::Controller.listener_ssl_truststore_type(), "PKCS12".to_string(), ); + + // The TLS properties for the internal broker listener are needed by the Kraft controllers + // too during metadata migration from ZooKeeper to Kraft mode. + config.insert( + KafkaListenerName::Internal.listener_ssl_keystore_location(), + format!("{}/keystore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), + ); + config.insert( + KafkaListenerName::Internal.listener_ssl_keystore_password(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + KafkaListenerName::Internal.listener_ssl_keystore_type(), + "PKCS12".to_string(), + ); + config.insert( + KafkaListenerName::Internal.listener_ssl_truststore_location(), + format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR), + ); + config.insert( + KafkaListenerName::Internal.listener_ssl_truststore_password(), + Self::SSL_STORE_PASSWORD.to_string(), + ); + config.insert( + KafkaListenerName::Internal.listener_ssl_truststore_type(), + "PKCS12".to_string(), + ); // We set either client tls with authentication or client tls without authentication // If authentication is explicitly required we do not want to have any other CAs to // be trusted. diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 4a4ac599..d6566b9d 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -274,11 +274,6 @@ pub async fn reconcile_kafka( .resolve(DOCKER_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION) .context(ResolveProductImageSnafu)?; - // check Kraft vs ZooKeeper and fail if misconfigured - kafka - .check_kraft_vs_zookeeper(&resolved_product_image.product_version) - .context(MisconfiguredKafkaClusterSnafu)?; - let mut cluster_resources = ClusterResources::new( APP_NAME, OPERATOR_NAME, @@ -571,7 +566,8 @@ fn validated_product_config( ), ); - if kafka.is_controller_configured() { + // TODO: need this if because controller_role() raises an error + if kafka.spec.controllers.is_some() { roles.insert( KafkaRole::Controller.to_string(), ( diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs index b7990be6..8336f5f7 100644 --- a/rust/operator-binary/src/product_logging.rs +++ b/rust/operator-binary/src/product_logging.rs @@ -15,6 +15,7 @@ use crate::crd::{ v1alpha1, }; +pub const BROKER_ID_POD_MAP_DIR: &str = "/stackable/broker-id-pod-map"; pub const STACKABLE_LOG_CONFIG_DIR: &str = "/stackable/log_config"; pub const STACKABLE_LOG_DIR: &str = "/stackable/log"; // log4j diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index ded83c59..b423727c 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -15,13 +15,13 @@ use stackable_operator::{ use crate::{ crd::{ - JVM_SECURITY_PROPERTIES_FILE, KafkaPodDescriptor, STACKABLE_LISTENER_BOOTSTRAP_DIR, - STACKABLE_LISTENER_BROKER_DIR, + 
JVM_SECURITY_PROPERTIES_FILE, KafkaPodDescriptor, MetadataManager,
+        STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR,
         listener::{KafkaListenerConfig, KafkaListenerName, node_address_cmd},
         role::{
-            AnyConfig, KAFKA_ADVERTISED_LISTENERS, KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS,
-            KAFKA_CONTROLLER_QUORUM_VOTERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS,
-            KAFKA_LOG_DIRS, KAFKA_NODE_ID, KAFKA_PROCESS_ROLES, KafkaRole,
+            AnyConfig, KAFKA_ADVERTISED_LISTENERS, KAFKA_BROKER_ID,
+            KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP,
+            KAFKA_LISTENERS, KAFKA_LOG_DIRS, KAFKA_NODE_ID, KAFKA_PROCESS_ROLES, KafkaRole,
         },
         security::KafkaTlsSecurity,
         v1alpha1,
@@ -34,6 +34,9 @@ use crate::{

 #[derive(Snafu, Debug)]
 pub enum Error {
+    #[snafu(display("invalid metadata manager"))]
+    InvalidMetadataManager { source: crate::crd::Error },
+
     #[snafu(display("failed to build ConfigMap for {}", rolegroup))]
     BuildRoleGroupConfig {
         source: stackable_operator::builder::configmap::Error,
@@ -44,7 +47,7 @@ pub enum Error {
         "failed to serialize [{JVM_SECURITY_PROPERTIES_FILE}] for {}",
         rolegroup
     ))]
-    JvmSecurityPoperties {
+    JvmSecurityProperties {
         source: product_config::writer::PropertiesWriterError,
         rolegroup: String,
     },
@@ -93,13 +96,21 @@ pub fn build_rolegroup_config_map(
 ) -> Result<ConfigMap, Error> {
     let kafka_config_file_name = merged_config.config_file_name();

+    let metadata_manager = kafka
+        .effective_metadata_manager()
+        .context(InvalidMetadataManagerSnafu)?;
+
     let mut kafka_config = server_properties_file(
-        kafka.is_controller_configured(),
+        metadata_manager == MetadataManager::KRaft,
         &rolegroup.role,
         pod_descriptors,
         listener_config,
         opa_connect_string,
-        resolved_product_image.product_version.starts_with("3.7"), // needs_quorum_voters
+        kafka
+            .spec
+            .cluster_config
+            .broker_id_pod_config_map_name
+            .is_some(),
     )?;

     match merged_config {
@@ -163,7 +174,7 @@ pub fn build_rolegroup_config_map(
         .add_data(
             JVM_SECURITY_PROPERTIES_FILE,
             to_java_properties_string(jvm_sec_props.iter()).with_context(|_| {
-                JvmSecurityPopertiesSnafu {
+                JvmSecurityPropertiesSnafu {
                     rolegroup: rolegroup.role_group.clone(),
                 }
             })?,
@@ -176,7 +187,7 @@ pub fn build_rolegroup_config_map(
                     .iter()
                     .map(|(k, v)| (k, v)),
             )
-            .with_context(|_| JvmSecurityPopertiesSnafu {
+            .with_context(|_| JvmSecurityPropertiesSnafu {
                 rolegroup: rolegroup.role_group.clone(),
             })?,
         )
@@ -213,7 +224,7 @@ fn server_properties_file(
     pod_descriptors: &[KafkaPodDescriptor],
     listener_config: &KafkaListenerConfig,
     opa_connect_string: Option<&str>,
-    needs_quorum_voters: bool,
+    disable_broker_id_generation: bool,
 ) -> Result<BTreeMap<String, String>, Error> {
     let kraft_controllers = kraft_controllers(pod_descriptors);

                 (
                     KAFKA_LISTENER_SECURITY_PROTOCOL_MAP.to_string(),
                     listener_config
-                        .listener_security_protocol_map_for_listener(&KafkaListenerName::Controller)
-                        .unwrap_or("".to_string())),
+                        .listener_security_protocol_map_for_controller()),
             ]);

-            if needs_quorum_voters {
-                let kraft_voters =
-                    kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?;
+            result.insert(
+                "inter.broker.listener.name".to_string(),
+                KafkaListenerName::Internal.to_string(),
+            );

-                result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]);
+            // The ZooKeeper connection is needed for migration from ZooKeeper to KRaft mode.
+            // It is not needed once the controller is fully running in KRaft mode.
+            if !kraft_mode {
+                result.insert(
+                    "zookeeper.connect".to_string(),
+                    "${env:ZOOKEEPER}".to_string(),
+                );
             }
-
             Ok(result)
         }
         KafkaRole::Broker => {
@@ -278,6 +294,10 @@
                     KAFKA_LISTENER_SECURITY_PROTOCOL_MAP.to_string(),
                     listener_config.listener_security_protocol_map(),
                 ),
+                (
+                    "inter.broker.listener.name".to_string(),
+                    KafkaListenerName::Internal.to_string(),
+                ),
             ]);

             if kraft_mode {
@@ -285,6 +305,10 @@
                 // Running in KRaft mode
                 result.extend([
+                    (
+                        "broker.id.generation.enable".to_string(),
+                        "false".to_string(),
+                    ),
                     (KAFKA_NODE_ID.to_string(), "${env:REPLICA_ID}".to_string()),
                     (
                         KAFKA_PROCESS_ROLES.to_string(),
@@ -299,19 +323,25 @@
                         kraft_controllers.clone(),
                     ),
                 ]);
-
-                if needs_quorum_voters {
-                    let kraft_voters =
-                        kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?;
-
-                    result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]);
-                }
             } else {
                 // Running with ZooKeeper enabled
                 result.extend([(
                     "zookeeper.connect".to_string(),
                     "${env:ZOOKEEPER}".to_string(),
                 )]);
+                // We are in zookeeper mode and the user has defined a broker id mapping
+                // so we disable automatic id generation.
+                // This check ensures that existing clusters running in ZooKeeper mode do not
+                // suddenly break after the introduction of this change.
+                if disable_broker_id_generation {
+                    result.extend([
+                        (
+                            "broker.id.generation.enable".to_string(),
+                            "false".to_string(),
+                        ),
+                        (KAFKA_BROKER_ID.to_string(), "${env:REPLICA_ID}".to_string()),
+                    ]);
+                }
             }

             // Enable OPA authorization
@@ -358,21 +388,6 @@ fn kraft_controllers(pod_descriptors: &[KafkaPodDescriptor]) -> Option<String> {
     }
 }

-fn kraft_voters(pod_descriptors: &[KafkaPodDescriptor]) -> Option<String> {
-    let result = pod_descriptors
-        .iter()
-        .filter(|pd| pd.role == KafkaRole::Controller.to_string())
-        .map(|desc| desc.as_quorum_voter())
-        .collect::<Vec<String>>()
-        .join(",");
-
-    if result.is_empty() {
-        None
-    } else {
-        Some(result)
-    }
-}
-
 // Generate JAAS configuration file for Kerberos authentication
 // or an empty string if Kerberos is not enabled.
// See https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html diff --git a/rust/operator-binary/src/resource/statefulset.rs b/rust/operator-binary/src/resource/statefulset.rs index 10ae0366..5b9acaef 100644 --- a/rust/operator-binary/src/resource/statefulset.rs +++ b/rust/operator-binary/src/resource/statefulset.rs @@ -51,8 +51,8 @@ use crate::{ crd::{ self, APP_NAME, KAFKA_HEAP_OPTS, LISTENER_BOOTSTRAP_VOLUME_NAME, LISTENER_BROKER_VOLUME_NAME, LOG_DIRS_VOLUME_NAME, METRICS_PORT, METRICS_PORT_NAME, - STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, STACKABLE_LISTENER_BOOTSTRAP_DIR, - STACKABLE_LISTENER_BROKER_DIR, + MetadataManager, STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, + STACKABLE_LISTENER_BOOTSTRAP_DIR, STACKABLE_LISTENER_BROKER_DIR, role::{ AnyConfig, KAFKA_NODE_ID_OFFSET, KafkaRole, broker::BrokerContainer, controller::ControllerContainer, @@ -64,14 +64,17 @@ use crate::{ kerberos::add_kerberos_pod_config, operations::graceful_shutdown::add_graceful_shutdown_config, product_logging::{ - MAX_KAFKA_LOG_FILES_SIZE, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR, kafka_log_opts, - kafka_log_opts_env_var, + BROKER_ID_POD_MAP_DIR, MAX_KAFKA_LOG_FILES_SIZE, STACKABLE_LOG_CONFIG_DIR, + STACKABLE_LOG_DIR, kafka_log_opts, kafka_log_opts_env_var, }, utils::build_recommended_labels, }; #[derive(Snafu, Debug)] pub enum Error { + #[snafu(display("invalid metadata manager"))] + InvalidMetadataManager { source: crate::crd::Error }, + #[snafu(display("failed to add kerberos config"))] AddKerberosConfig { source: crate::kerberos::Error }, @@ -284,7 +287,9 @@ pub fn build_broker_rolegroup_statefulset( ..EnvVar::default() }); - let cluster_id = kafka.cluster_id().context(ClusterIdMissingSnafu)?; + let metadata_manager = kafka + .effective_metadata_manager() + .context(InvalidMetadataManagerSnafu)?; cb_kafka .image_from_product_image(resolved_product_image) @@ -296,8 +301,7 @@ pub fn build_broker_rolegroup_statefulset( "-c".to_string(), ]) .args(vec![broker_kafka_container_commands( - kafka, - cluster_id, + metadata_manager == MetadataManager::KRaft, // we need controller pods kafka .pod_descriptors( @@ -438,6 +442,22 @@ pub fn build_broker_rolegroup_statefulset( ) .context(AddListenerVolumeSnafu)?; } + + if let Some(broker_id_config_map_name) = + &kafka.spec.cluster_config.broker_id_pod_config_map_name + { + pod_builder + .add_volume( + VolumeBuilder::new("broker-id-pod-map-dir") + .with_config_map(broker_id_config_map_name) + .build(), + ) + .context(AddVolumeSnafu)?; + cb_kafka + .add_volume_mount("broker-id-pod-map-dir", BROKER_ID_POD_MAP_DIR) + .context(AddVolumeMountSnafu)?; + } + pod_builder .metadata(metadata) .image_pull_secrets_from_product_image(resolved_product_image) @@ -636,6 +656,22 @@ pub fn build_controller_rolegroup_statefulset( ..EnvVar::default() }); + // Controllers need the ZooKeeper connection string for migration + if let Some(zookeeper_config_map_name) = &kafka.spec.cluster_config.zookeeper_config_map_name { + env.push(EnvVar { + name: "ZOOKEEPER".to_string(), + value_from: Some(EnvVarSource { + config_map_key_ref: Some(ConfigMapKeySelector { + name: zookeeper_config_map_name.to_string(), + key: "ZOOKEEPER".to_string(), + ..ConfigMapKeySelector::default() + }), + ..EnvVarSource::default() + }), + ..EnvVar::default() + }) + }; + cb_kafka .image_from_product_image(resolved_product_image) .command(vec![ @@ -646,7 +682,6 @@ pub fn build_controller_rolegroup_statefulset( "-c".to_string(), ]) .args(vec![controller_kafka_container_command( - 
kafka.cluster_id().context(ClusterIdMissingSnafu)?, kafka .pod_descriptors(Some(kafka_role), cluster_info, kafka_security.client_port()) .context(BuildPodDescriptorsSnafu)?, diff --git a/tests/templates/kuttl/operations-kraft/20-install-kafka.yaml.j2 b/tests/templates/kuttl/operations-kraft/20-install-kafka.yaml.j2 index fd95c8ef..704cacaa 100644 --- a/tests/templates/kuttl/operations-kraft/20-install-kafka.yaml.j2 +++ b/tests/templates/kuttl/operations-kraft/20-install-kafka.yaml.j2 @@ -16,8 +16,9 @@ spec: productVersion: "{{ test_scenario['values']['kafka-kraft'] }}" {% endif %} pullPolicy: IfNotPresent -{% if lookup('env', 'VECTOR_AGGREGATOR') %} clusterConfig: + metadataManager: kraft +{% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} controllers: diff --git a/tests/templates/kuttl/operations-kraft/25-pause-kafka.yaml.j2 b/tests/templates/kuttl/operations-kraft/25-pause-kafka.yaml.j2 index 2be3f573..563e72a5 100644 --- a/tests/templates/kuttl/operations-kraft/25-pause-kafka.yaml.j2 +++ b/tests/templates/kuttl/operations-kraft/25-pause-kafka.yaml.j2 @@ -16,8 +16,9 @@ spec: productVersion: "{{ test_scenario['values']['kafka-kraft'] }}" {% endif %} pullPolicy: IfNotPresent -{% if lookup('env', 'VECTOR_AGGREGATOR') %} clusterConfig: + metadataManager: kraft +{% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} controllers: diff --git a/tests/templates/kuttl/operations-kraft/30-stop-kafka.yaml.j2 b/tests/templates/kuttl/operations-kraft/30-stop-kafka.yaml.j2 index b11dd670..cafaf9ba 100644 --- a/tests/templates/kuttl/operations-kraft/30-stop-kafka.yaml.j2 +++ b/tests/templates/kuttl/operations-kraft/30-stop-kafka.yaml.j2 @@ -16,8 +16,9 @@ spec: productVersion: "{{ test_scenario['values']['kafka-kraft'] }}" {% endif %} pullPolicy: IfNotPresent -{% if lookup('env', 'VECTOR_AGGREGATOR') %} clusterConfig: + metadataManager: kraft +{% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} brokers: diff --git a/tests/templates/kuttl/operations-kraft/50-restart-kafka.yaml.j2 b/tests/templates/kuttl/operations-kraft/50-restart-kafka.yaml.j2 index 13a19572..a6ad4ec2 100644 --- a/tests/templates/kuttl/operations-kraft/50-restart-kafka.yaml.j2 +++ b/tests/templates/kuttl/operations-kraft/50-restart-kafka.yaml.j2 @@ -15,8 +15,9 @@ spec: {% else %} productVersion: "{{ test_scenario['values']['kafka-kraft'] }}" {% endif %} -{% if lookup('env', 'VECTOR_AGGREGATOR') %} clusterConfig: + metadataManager: kraft +{% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} controllers: diff --git a/tests/templates/kuttl/operations-kraft/60-scale-controller-up.yaml.j2 b/tests/templates/kuttl/operations-kraft/60-scale-controller-up.yaml.j2 index c1b64e7e..3fdc5c4d 100644 --- a/tests/templates/kuttl/operations-kraft/60-scale-controller-up.yaml.j2 +++ b/tests/templates/kuttl/operations-kraft/60-scale-controller-up.yaml.j2 @@ -16,8 +16,9 @@ spec: {% else %} productVersion: "{{ test_scenario['values']['kafka-kraft'] }}" {% endif %} -{% if lookup('env', 'VECTOR_AGGREGATOR') %} clusterConfig: + metadataManager: kraft +{% if lookup('env', 'VECTOR_AGGREGATOR') %} vectorAggregatorConfigMapName: vector-aggregator-discovery {% endif %} controllers: diff --git a/tests/templates/kuttl/operations-kraft/70-scale-controller-down.yaml.j2 
b/tests/templates/kuttl/operations-kraft/70-scale-controller-down.yaml.j2
index ba70a1b5..a077213b 100644
--- a/tests/templates/kuttl/operations-kraft/70-scale-controller-down.yaml.j2
+++ b/tests/templates/kuttl/operations-kraft/70-scale-controller-down.yaml.j2
@@ -16,8 +16,9 @@ spec:
 {% else %}
     productVersion: "{{ test_scenario['values']['kafka-kraft'] }}"
 {% endif %}
-{% if lookup('env', 'VECTOR_AGGREGATOR') %}
   clusterConfig:
+    metadataManager: kraft
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
     vectorAggregatorConfigMapName: vector-aggregator-discovery
 {% endif %}
   controllers:
diff --git a/tests/templates/kuttl/operations-kraft/80-scale-broker-down.yaml.j2 b/tests/templates/kuttl/operations-kraft/80-scale-broker-down.yaml.j2
index d532d273..d788a9c9 100644
--- a/tests/templates/kuttl/operations-kraft/80-scale-broker-down.yaml.j2
+++ b/tests/templates/kuttl/operations-kraft/80-scale-broker-down.yaml.j2
@@ -24,6 +24,7 @@ spec:
 {% endif %}
-{% if lookup('env', 'VECTOR_AGGREGATOR') %}
   clusterConfig:
+    metadataManager: kraft
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
     vectorAggregatorConfigMapName: vector-aggregator-discovery
 {% endif %}
   controllers:
diff --git a/tests/templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2 b/tests/templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2
index 282686e9..95d85da6 100644
--- a/tests/templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2
+++ b/tests/templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2
@@ -69,6 +69,7 @@ spec:
 {% endif %}
     pullPolicy: IfNotPresent
   clusterConfig:
+    metadataManager: kraft
    authentication:
      - authenticationClass: test-kafka-client-auth-tls
    tls:
diff --git a/tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2 b/tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2
index e88820f0..93e1d415 100644
--- a/tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2
+++ b/tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2
@@ -32,6 +32,9 @@ spec:
     productVersion: "{{ test_scenario['values']['upgrade_old'] }}"
     pullPolicy: IfNotPresent
   clusterConfig:
+    # Need to set this explicitly because the default would be zookeeper for 3.9.1
+    # but we don't want to test zookeeper -> kraft migration here
+    metadataManager: kraft
{% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
    authentication:
      - authenticationClass: test-kafka-client-auth-tls