Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.

- Support objectOverrides using `.spec.objectOverrides`.
See [objectOverrides concepts page](https://docs.stackable.tech/home/nightly/concepts/overrides/#object-overrides) for details ([#927]).
- Added support for OPA/TLS ([#928]).
- Added experimental support for `4.1.1` ([#929])
- Enable the [restart-controller](https://docs.stackable.tech/home/nightly/commons-operator/restarter/), so that the Pods are automatically restarted on config changes ([#930], [#932]).

Expand All @@ -27,6 +28,7 @@ All notable changes to this project will be documented in this file.
[#915]: https://github.com/stackabletech/kafka-operator/pull/915
[#925]: https://github.com/stackabletech/kafka-operator/pull/925
[#927]: https://github.com/stackabletech/kafka-operator/pull/927
[#928]: https://github.com/stackabletech/kafka-operator/pull/928
[#929]: https://github.com/stackabletech/kafka-operator/pull/929
[#930]: https://github.com/stackabletech/kafka-operator/pull/930
[#932]: https://github.com/stackabletech/kafka-operator/pull/932
Expand Down
3 changes: 3 additions & 0 deletions rust/operator-binary/src/config/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub fn broker_kafka_container_commands(
containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &
{set_realm_env}

{import_opa_tls_cert}

{broker_start_command}

wait_for_termination $!
Expand All @@ -42,6 +44,7 @@ pub fn broker_kafka_container_commands(
true => format!("export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {STACKABLE_KERBEROS_KRB5_PATH})"),
false => "".to_string(),
},
import_opa_tls_cert = kafka_security.copy_opa_tls_cert_command(),
broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, product_version),
}
}
Expand Down
68 changes: 67 additions & 1 deletion rust/operator-binary/src/crd/authorization.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,78 @@
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_operator::{
commons::opa::OpaConfig,
client::Client,
commons::opa::{OpaApiVersion, OpaConfig},
k8s_openapi::api::core::v1::ConfigMap,
schemars::{self, JsonSchema},
};

use crate::crd::v1alpha1;

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("Failed to fetch OPA ConfigMap {configmap_name}"))]
FetchOpaConfigMap {
source: stackable_operator::client::Error,
configmap_name: String,
namespace: String,
},

#[snafu(display("invalid OpaConfig"))]
InvalidOpaConfig {
source: stackable_operator::commons::opa::Error,
},

#[snafu(display("object defines no namespace"))]
ObjectHasNoNamespace,
}

#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct KafkaAuthorization {
// no doc - docs in the OpaConfig struct.
pub opa: Option<OpaConfig>,
}

pub struct KafkaAuthorizationConfig {
pub opa_connect: String,
pub secret_class: Option<String>,
}

impl KafkaAuthorization {
pub async fn get_opa_config(
self,
client: &Client,
kafka: &v1alpha1::KafkaCluster,
) -> Result<Option<KafkaAuthorizationConfig>, Error> {
let auth_config = if let Some(opa) = self.opa {
let namespace = kafka
.metadata
.namespace
.as_deref()
.context(ObjectHasNoNamespaceSnafu)?;
// Resolve the secret class from the ConfigMap
let secret_class = client
.get::<ConfigMap>(&opa.config_map_name, namespace)
.await
.with_context(|_| FetchOpaConfigMapSnafu {
configmap_name: &opa.config_map_name,
namespace,
})?
.data
.and_then(|mut data| data.remove("OPA_SECRET_CLASS"));
let opa_connect = opa
.full_document_url_from_config_map(client, kafka, Some("allow"), OpaApiVersion::V1)
.await
.context(InvalidOpaConfigSnafu)?;
Some(KafkaAuthorizationConfig {
opa_connect,
secret_class,
})
} else {
None
};

Ok(auth_config)
}
}
4 changes: 4 additions & 0 deletions rust/operator-binary/src/crd/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ mod tests {
}]),
"internalTls".to_string(),
Some("tls".to_string()),
None,
);
let cluster_info = default_cluster_info();
// "simple-kafka-broker-default"
Expand Down Expand Up @@ -462,6 +463,7 @@ mod tests {
ResolvedAuthenticationClasses::new(vec![]),
"tls".to_string(),
Some("tls".to_string()),
None,
);
let config =
get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info)
Expand Down Expand Up @@ -518,6 +520,7 @@ mod tests {
ResolvedAuthenticationClasses::new(vec![]),
"".to_string(),
None,
None,
);

let config =
Expand Down Expand Up @@ -605,6 +608,7 @@ mod tests {
}]),
"tls".to_string(),
Some("tls".to_string()),
None,
);
let cluster_info = default_cluster_info();
// "simple-kafka-broker-default"
Expand Down
63 changes: 63 additions & 0 deletions rust/operator-binary/src/crd/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,19 @@ pub enum Error {

#[snafu(display("kerberos enablement requires TLS activation"))]
KerberosRequiresTls,

#[snafu(display("failed to build OPA TLS certificate volume"))]
OpaTlsCertSecretClassVolumeBuild {
source: stackable_operator::builder::pod::volume::SecretOperatorVolumeSourceBuilderError,
},
}

/// Helper struct combining TLS settings for server and internal with the resolved AuthenticationClasses
pub struct KafkaTlsSecurity {
resolved_authentication_classes: ResolvedAuthenticationClasses,
internal_secret_class: String,
server_secret_class: Option<String>,
opa_secret_class: Option<String>,
}

impl KafkaTlsSecurity {
Expand All @@ -75,6 +81,9 @@ impl KafkaTlsSecurity {
pub const INTERNAL_PORT: u16 = 19092;
// - TLS internal
const INTER_BROKER_LISTENER_NAME: &'static str = "inter.broker.listener.name";
const OPA_TLS_MOUNT_PATH: &str = "/stackable/tls-opa";
// opa
const OPA_TLS_VOLUME_NAME: &str = "tls-opa";
pub const SECURE_BOOTSTRAP_PORT: u16 = 9095;
pub const SECURE_CLIENT_PORT: u16 = 9093;
pub const SECURE_CLIENT_PORT_NAME: &'static str = "kafka-tls";
Expand All @@ -94,11 +103,13 @@ impl KafkaTlsSecurity {
resolved_authentication_classes: ResolvedAuthenticationClasses,
internal_secret_class: String,
server_secret_class: Option<String>,
opa_secret_class: Option<String>,
) -> Self {
Self {
resolved_authentication_classes,
internal_secret_class,
server_secret_class,
opa_secret_class,
}
}

Expand All @@ -107,6 +118,7 @@ impl KafkaTlsSecurity {
pub async fn new_from_kafka_cluster(
client: &Client,
kafka: &v1alpha1::KafkaCluster,
opa_secret_class: Option<String>,
) -> Result<Self, Error> {
Ok(KafkaTlsSecurity {
resolved_authentication_classes: ResolvedAuthenticationClasses::from_references(
Expand All @@ -128,6 +140,7 @@ impl KafkaTlsSecurity {
.tls
.as_ref()
.and_then(|tls| tls.server_secret_class.clone()),
opa_secret_class,
})
}

Expand Down Expand Up @@ -166,6 +179,22 @@ impl KafkaTlsSecurity {
self.kerberos_secret_class().is_some()
}

fn has_opa_tls_enabled(&self) -> bool {
self.opa_secret_class.is_some()
}

pub fn copy_opa_tls_cert_command(&self) -> String {
match self.has_opa_tls_enabled() {
true => format!(
"keytool -importcert -file {opa_mount_path}/ca.crt -keystore {tls_dir}/truststore.p12 -storepass '{tls_password}' -alias opa-ca -noprompt",
opa_mount_path = Self::OPA_TLS_MOUNT_PATH,
tls_dir = Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR,
tls_password = Self::SSL_STORE_PASSWORD,
),
false => "".to_string(),
}
}

pub fn kerberos_secret_class(&self) -> Option<String> {
if let Some(kerberos) = self
.resolved_authentication_classes
Expand Down Expand Up @@ -486,6 +515,24 @@ impl KafkaTlsSecurity {
.context(AddVolumeMountSnafu)?;
}

if let Some(secret_class) = &self.opa_secret_class {
cb_kafka
.add_volume_mount(Self::OPA_TLS_VOLUME_NAME, Self::OPA_TLS_MOUNT_PATH)
.context(AddVolumeMountSnafu)?;

pod_builder
.add_volume(
VolumeBuilder::new(Self::OPA_TLS_VOLUME_NAME)
.ephemeral(
SecretOperatorVolumeSourceBuilder::new(secret_class)
.build()
.context(OpaTlsCertSecretClassVolumeBuildSnafu)?,
)
.build(),
)
.context(AddVolumeSnafu)?;
}

Ok(())
}

Expand Down Expand Up @@ -664,6 +711,22 @@ impl KafkaTlsSecurity {
);
}

//OPA Tls
if self.opa_secret_class.is_some() {
config.insert(
"opa.authorizer.truststore.path".to_string(),
format!("{}/truststore.p12", Self::STACKABLE_TLS_KAFKA_INTERNAL_DIR),
);
config.insert(
"opa.authorizer.truststore.password".to_string(),
Self::SSL_STORE_PASSWORD.to_string(),
);
config.insert(
"opa.authorizer.truststore.type".to_string(),
"PKCS12".to_string(),
);
}

// common
config.insert(
Self::INTER_BROKER_LISTENER_NAME.to_string(),
Expand Down
49 changes: 27 additions & 22 deletions rust/operator-binary/src/kafka_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use snafu::{ResultExt, Snafu};
use stackable_operator::{
cluster_resources::{ClusterResourceApplyStrategy, ClusterResources},
commons::{
opa::OpaApiVersion,
product_image_selection::{self},
rbac::build_rbac_resources,
},
Expand Down Expand Up @@ -36,7 +35,7 @@ use strum::{EnumDiscriminants, IntoStaticStr};
use crate::{
crd::{
self, APP_NAME, DOCKER_IMAGE_BASE_NAME, JVM_SECURITY_PROPERTIES_FILE, KafkaClusterStatus,
OPERATOR_NAME,
OPERATOR_NAME, authorization,
listener::get_kafka_listener_config,
role::{
AnyConfig, KafkaRole, broker::BROKER_PROPERTIES_FILE,
Expand Down Expand Up @@ -122,11 +121,6 @@ pub enum Error {
source: stackable_operator::cluster_resources::Error,
},

#[snafu(display("invalid OpaConfig"))]
InvalidOpaConfig {
source: stackable_operator::commons::opa::Error,
},

#[snafu(display("failed to delete orphaned resources"))]
DeleteOrphans {
source: stackable_operator::cluster_resources::Error,
Expand Down Expand Up @@ -212,6 +206,9 @@ pub enum Error {
BuildListener {
source: crate::resource::listener::Error,
},

#[snafu(display("object defines no namespace"))]
GetOpaConfig { source: authorization::Error },
}
type Result<T, E = Error> = std::result::Result<T, E>;

Expand All @@ -231,7 +228,6 @@ impl ReconcilerError for Error {
Error::InvalidProductConfig { .. } => None,
Error::BuildDiscoveryConfig { .. } => None,
Error::ApplyDiscoveryConfig { .. } => None,
Error::InvalidOpaConfig { .. } => None,
Error::DeleteOrphans { .. } => None,
Error::FailedToInitializeSecurityContext { .. } => None,
Error::CreateClusterResources { .. } => None,
Expand All @@ -253,6 +249,7 @@ impl ReconcilerError for Error {
Error::BuildListener { .. } => None,
Error::InvalidKafkaListeners { .. } => None,
Error::BuildPodDescriptors { .. } => None,
Error::GetOpaConfig { .. } => None,
}
}
}
Expand Down Expand Up @@ -298,7 +295,28 @@ pub async fn reconcile_kafka(
&ctx.product_config,
)?;

let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster(client, kafka)
// Assemble the OPA connection string from the discovery and the given path if provided
// Will be passed as --override parameter in the cli in the stateful set
let opa_config = &kafka
.spec
.cluster_config
.authorization
.clone()
.get_opa_config(client, kafka)
.await
.context(GetOpaConfigSnafu)?;

let opa_connect = opa_config
.as_ref()
.map(|auth_config| auth_config.opa_connect.clone());

let opa_secret_class = if let Some(opa_config) = opa_config.as_ref() {
opa_config.secret_class.to_owned()
} else {
None
};

let kafka_security = KafkaTlsSecurity::new_from_kafka_cluster(client, kafka, opa_secret_class)
.await
.context(FailedToInitializeSecurityContextSnafu)?;

Expand All @@ -314,19 +332,6 @@ pub async fn reconcile_kafka(
.validate_authentication_methods()
.context(FailedToValidateAuthenticationMethodSnafu)?;

// Assemble the OPA connection string from the discovery and the given path if provided
// Will be passed as --override parameter in the cli in the state ful set
let opa_connect = if let Some(opa_spec) = &kafka.spec.cluster_config.authorization.opa {
Some(
opa_spec
.full_document_url_from_config_map(client, kafka, Some("allow"), OpaApiVersion::V1)
.await
.context(InvalidOpaConfigSnafu)?,
)
} else {
None
};

let mut ss_cond_builder = StatefulSetConditionBuilder::default();

let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
Expand Down
2 changes: 2 additions & 0 deletions tests/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ releases:
operatorVersion: 0.0.0-dev
kafka:
operatorVersion: 0.0.0-dev
opa:
operatorVersion: 0.0.0-dev
9 changes: 9 additions & 0 deletions tests/templates/kuttl/opa/00-patch-ns.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{% if test_scenario['values']['openshift'] == 'true' %}
# see https://github.com/stackabletech/issues/issues/566
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}'
timeout: 120
{% endif %}
Loading