From 769fc74c76a8bd05d404f99a1905d058852f3e70 Mon Sep 17 00:00:00 2001 From: 2pk03 Date: Wed, 28 Jan 2026 15:05:22 +0100 Subject: [PATCH 1/3] auth v1.5 plumbing and ACL coverage --- Makefile | 5 +- cmd/broker/acl_test.go | 515 ++++++++++++++ cmd/broker/auth_metrics.go | 72 ++ cmd/broker/main.go | 627 +++++++++++++++++- .../templates/operator-deployment.yaml | 24 + .../kafscale/templates/proxy-service.yaml | 8 + deploy/helm/kafscale/values.yaml | 10 + docs/mcp.md | 4 +- docs/metrics.md | 1 + docs/operations.md | 64 ++ docs/protocol.md | 5 +- docs/roadmap.md | 12 +- docs/security.md | 68 +- internal/console/auth.go | 65 ++ pkg/acl/acl.go | 148 +++++ pkg/acl/acl_test.go | 90 +++ pkg/broker/conn_context.go | 48 ++ pkg/broker/proxyproto.go | 179 +++++ pkg/broker/server.go | 25 +- pkg/operator/cluster_controller.go | 18 + pkg/protocol/errors.go | 2 + test/e2e/acl_test.go | 141 ++++ 22 files changed, 2086 insertions(+), 45 deletions(-) create mode 100644 cmd/broker/acl_test.go create mode 100644 cmd/broker/auth_metrics.go create mode 100644 pkg/acl/acl.go create mode 100644 pkg/acl/acl_test.go create mode 100644 pkg/broker/conn_context.go create mode 100644 pkg/broker/proxyproto.go create mode 100644 test/e2e/acl_test.go diff --git a/Makefile b/Makefile index 2363ba0..245a79f 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -.PHONY: proto build test tidy lint generate docker-build docker-build-e2e-client docker-build-etcd-tools docker-clean ensure-minio start-minio stop-containers release-broker-ports test-produce-consume test-produce-consume-debug test-consumer-group test-ops-api test-mcp test-multi-segment-durability test-full test-operator demo demo-platform demo-platform-bootstrap iceberg-demo kafsql-demo platform-demo help clean-kind-all +.PHONY: proto build test tidy lint generate docker-build docker-build-e2e-client docker-build-etcd-tools docker-clean ensure-minio start-minio stop-containers release-broker-ports test-produce-consume test-produce-consume-debug test-consumer-group test-ops-api test-mcp test-multi-segment-durability test-full test-operator test-acl demo demo-platform demo-platform-bootstrap iceberg-demo kafsql-demo platform-demo help clean-kind-all REGISTRY ?= ghcr.io/kafscale STAMP_DIR ?= .build @@ -84,6 +84,9 @@ test: ## Run unit tests + vet + race go vet ./... go test -race ./... +test-acl: ## Run ACL e2e test (requires KAFSCALE_E2E=1) + KAFSCALE_E2E=1 go test -tags=e2e ./test/e2e -run TestACLsE2E + docker-build: docker-build-broker docker-build-operator docker-build-console docker-build-proxy docker-build-mcp docker-build-e2e-client docker-build-etcd-tools docker-build-sql-processor ## Build all container images @mkdir -p $(STAMP_DIR) diff --git a/cmd/broker/acl_test.go b/cmd/broker/acl_test.go new file mode 100644 index 0000000..65edcc8 --- /dev/null +++ b/cmd/broker/acl_test.go @@ -0,0 +1,515 @@ +// Copyright 2025, 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "context" + "encoding/binary" + "io" + "testing" + + "github.com/KafScale/platform/pkg/metadata" + "github.com/KafScale/platform/pkg/protocol" + "github.com/twmb/franz-go/pkg/kmsg" +) + +func TestACLProduceDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[{"action":"fetch","resource":"topic","name":"orders"}]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.ProduceRequest{ + Acks: -1, + TimeoutMs: 1000, + Topics: []protocol.ProduceTopic{ + { + Name: "orders", + Partitions: []protocol.ProducePartition{ + {Partition: 0, Records: testBatchBytes(0, 0, 1)}, + }, + }, + }, + } + payload, err := handler.handleProduce(context.Background(), &protocol.RequestHeader{CorrelationID: 1, APIVersion: 0, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("handleProduce: %v", err) + } + resp := decodeProduceResponse(t, payload, 0) + if len(resp.Topics) != 1 || len(resp.Topics[0].Partitions) != 1 { + t.Fatalf("expected single topic/partition response") + } + if resp.Topics[0].Partitions[0].ErrorCode != protocol.TOPIC_AUTHORIZATION_FAILED { + t.Fatalf("expected topic auth failed, got %d", resp.Topics[0].Partitions[0].ErrorCode) + } + offset, err := store.NextOffset(context.Background(), "orders", 0) + if err != nil { + t.Fatalf("NextOffset: %v", err) + } + if offset != 0 { + t.Fatalf("expected offset unchanged, got %d", offset) + } +} + +func TestACLJoinGroupDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + joinReq := &protocol.JoinGroupRequest{GroupID: "group-a"} + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 5, APIVersion: 4, ClientID: &clientID}, joinReq) + if err != nil { + t.Fatalf("Handle JoinGroup: %v", err) + } + resp := decodeJoinGroupResponse(t, payload, 4) + if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.ErrorCode) + } +} + +func TestACLListOffsetsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.ListOffsetsRequest{ + Topics: []protocol.ListOffsetsTopic{ + { + Name: "orders", + Partitions: []protocol.ListOffsetsPartition{ + {Partition: 0, Timestamp: -1, MaxNumOffsets: 1}, + }, + }, + }, + } + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 7, APIVersion: 4, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle ListOffsets: %v", err) + } + resp := 
decodeListOffsetsResponse(t, 4, payload) + if len(resp.Topics) != 1 || len(resp.Topics[0].Partitions) != 1 { + t.Fatalf("expected single topic/partition response") + } + if resp.Topics[0].Partitions[0].ErrorCode != protocol.TOPIC_AUTHORIZATION_FAILED { + t.Fatalf("expected topic auth failed, got %d", resp.Topics[0].Partitions[0].ErrorCode) + } +} + +func TestACLOffsetFetchDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.OffsetFetchRequest{ + GroupID: "group-a", + Topics: []protocol.OffsetFetchTopic{ + { + Name: "orders", + Partitions: []protocol.OffsetFetchPartition{ + {Partition: 0}, + }, + }, + }, + } + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 8, APIVersion: 5, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle OffsetFetch: %v", err) + } + resp := decodeOffsetFetchResponse(t, payload, 5) + if len(resp.Topics) != 1 || len(resp.Topics[0].Partitions) != 1 { + t.Fatalf("expected single topic/partition response") + } + if resp.Topics[0].Partitions[0].ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.Topics[0].Partitions[0].ErrorCode) + } + if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected top-level group auth failed, got %d", resp.ErrorCode) + } +} + +func TestACLSyncGroupDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.SyncGroupRequest{GroupID: "group-a", GenerationID: 1, MemberID: "member-a"} + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 9, APIVersion: 4, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle SyncGroup: %v", err) + } + resp := decodeSyncGroupResponse(t, payload, 4) + if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.ErrorCode) + } +} + +func TestACLHeartbeatDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.HeartbeatRequest{GroupID: "group-a", GenerationID: 1, MemberID: "member-a"} + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 10, APIVersion: 4, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle Heartbeat: %v", err) + } + resp := decodeHeartbeatResponse(t, payload, 4) + if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.ErrorCode) + } +} + +func TestACLLeaveGroupDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.LeaveGroupRequest{GroupID: "group-a", MemberID: 
"member-a"} + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 11, APIVersion: 0, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle LeaveGroup: %v", err) + } + resp := decodeLeaveGroupResponse(t, payload) + if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.ErrorCode) + } +} + +func TestACLOffsetCommitDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.OffsetCommitRequest{ + GroupID: "group-a", + Topics: []protocol.OffsetCommitTopic{ + { + Name: "orders", + Partitions: []protocol.OffsetCommitPartition{ + {Partition: 0, Offset: 1, Metadata: ""}, + }, + }, + }, + } + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 12, APIVersion: 3, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle OffsetCommit: %v", err) + } + resp := decodeOffsetCommitResponse(t, payload, 3) + if len(resp.Topics) == 0 || len(resp.Topics[0].Partitions) == 0 { + t.Fatalf("expected offset commit response") + } + if resp.Topics[0].Partitions[0].ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.Topics[0].Partitions[0].ErrorCode) + } +} + +func TestACLCreateTopicsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.CreateTopicsRequest{ + Topics: []protocol.CreateTopicConfig{{Name: "orders", NumPartitions: 1, ReplicationFactor: 1}}, + } + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 13, APIVersion: 0, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle CreateTopics: %v", err) + } + resp := decodeCreateTopicsResponse(t, payload, 0) + if len(resp.Topics) != 1 || resp.Topics[0].ErrorCode != protocol.TOPIC_AUTHORIZATION_FAILED { + t.Fatalf("expected topic auth failed, got %+v", resp.Topics) + } +} + +func TestACLDeleteTopicsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.DeleteTopicsRequest{TopicNames: []string{"orders"}} + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 14, APIVersion: 0, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle DeleteTopics: %v", err) + } + resp := decodeDeleteTopicsResponse(t, payload, 0) + if len(resp.Topics) != 1 || resp.Topics[0].ErrorCode != protocol.TOPIC_AUTHORIZATION_FAILED { + t.Fatalf("expected topic auth failed, got %+v", resp.Topics) + } +} + +func TestACLAlterConfigsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := 
&protocol.AlterConfigsRequest{ + Resources: []protocol.AlterConfigsResource{ + { + ResourceType: protocol.ConfigResourceTopic, + ResourceName: "orders", + }, + }, + } + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 15, APIVersion: 1, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle AlterConfigs: %v", err) + } + resp := decodeAlterConfigsResponse(t, payload, 1) + if len(resp.Resources) != 1 || resp.Resources[0].ErrorCode != protocol.TOPIC_AUTHORIZATION_FAILED { + t.Fatalf("expected topic auth failed, got %+v", resp.Resources) + } +} + +func TestACLCreatePartitionsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.CreatePartitionsRequest{ + Topics: []protocol.CreatePartitionsTopic{{Name: "orders", Count: 2}}, + } + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 16, APIVersion: 3, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle CreatePartitions: %v", err) + } + resp := decodeCreatePartitionsResponse(t, payload, 3) + if len(resp.Topics) != 1 || resp.Topics[0].ErrorCode != protocol.TOPIC_AUTHORIZATION_FAILED { + t.Fatalf("expected topic auth failed, got %+v", resp.Topics) + } +} + +func TestACLDeleteGroupsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.DeleteGroupsRequest{Groups: []string{"group-a"}} + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 17, APIVersion: 2, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle DeleteGroups: %v", err) + } + resp := decodeDeleteGroupsResponse(t, payload, 2) + if len(resp.Groups) != 1 || resp.Groups[0].ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %+v", resp.Groups) + } +} + +func decodeOffsetFetchResponse(t *testing.T, payload []byte, version int16) *protocol.OffsetFetchResponse { + t.Helper() + reader := bytes.NewReader(payload) + resp := &protocol.OffsetFetchResponse{} + if err := binary.Read(reader, binary.BigEndian, &resp.CorrelationID); err != nil { + t.Fatalf("read correlation id: %v", err) + } + if version >= 3 { + if err := binary.Read(reader, binary.BigEndian, &resp.ThrottleMs); err != nil { + t.Fatalf("read throttle: %v", err) + } + } + var topicCount int32 + if err := binary.Read(reader, binary.BigEndian, &topicCount); err != nil { + t.Fatalf("read topic count: %v", err) + } + resp.Topics = make([]protocol.OffsetFetchTopicResponse, 0, topicCount) + for i := 0; i < int(topicCount); i++ { + name := readKafkaString(t, reader) + var partCount int32 + if err := binary.Read(reader, binary.BigEndian, &partCount); err != nil { + t.Fatalf("read partition count: %v", err) + } + topic := protocol.OffsetFetchTopicResponse{Name: name} + topic.Partitions = make([]protocol.OffsetFetchPartitionResponse, 0, partCount) + for j := 0; j < int(partCount); j++ { + var part protocol.OffsetFetchPartitionResponse + if err := binary.Read(reader, binary.BigEndian, &part.Partition); err != nil { + t.Fatalf("read partition id: %v", 
err) + } + if err := binary.Read(reader, binary.BigEndian, &part.Offset); err != nil { + t.Fatalf("read offset: %v", err) + } + if version >= 5 { + if err := binary.Read(reader, binary.BigEndian, &part.LeaderEpoch); err != nil { + t.Fatalf("read leader epoch: %v", err) + } + } + part.Metadata = readKafkaNullableString(t, reader) + if err := binary.Read(reader, binary.BigEndian, &part.ErrorCode); err != nil { + t.Fatalf("read error code: %v", err) + } + topic.Partitions = append(topic.Partitions, part) + } + resp.Topics = append(resp.Topics, topic) + } + if version >= 2 { + if err := binary.Read(reader, binary.BigEndian, &resp.ErrorCode); err != nil { + t.Fatalf("read error code: %v", err) + } + } + return resp +} + +func decodeSyncGroupResponse(t *testing.T, payload []byte, version int16) *kmsg.SyncGroupResponse { + t.Helper() + reader := bytes.NewReader(payload) + var corr int32 + if err := binary.Read(reader, binary.BigEndian, &corr); err != nil { + t.Fatalf("read correlation id: %v", err) + } + if version >= 4 { + skipTaggedFields(t, reader) + } + body, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("read response body: %v", err) + } + resp := kmsg.NewPtrSyncGroupResponse() + resp.Version = version + if err := resp.ReadFrom(body); err != nil { + t.Fatalf("decode sync group response: %v", err) + } + return resp +} + +func decodeHeartbeatResponse(t *testing.T, payload []byte, version int16) *kmsg.HeartbeatResponse { + t.Helper() + reader := bytes.NewReader(payload) + var corr int32 + if err := binary.Read(reader, binary.BigEndian, &corr); err != nil { + t.Fatalf("read correlation id: %v", err) + } + if version >= 4 { + skipTaggedFields(t, reader) + } + body, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("read response body: %v", err) + } + resp := kmsg.NewPtrHeartbeatResponse() + resp.Version = version + if err := resp.ReadFrom(body); err != nil { + t.Fatalf("decode heartbeat response: %v", err) + } + return resp +} + +func decodeLeaveGroupResponse(t *testing.T, payload []byte) *protocol.LeaveGroupResponse { + t.Helper() + reader := bytes.NewReader(payload) + resp := &protocol.LeaveGroupResponse{} + if err := binary.Read(reader, binary.BigEndian, &resp.CorrelationID); err != nil { + t.Fatalf("read correlation id: %v", err) + } + if err := binary.Read(reader, binary.BigEndian, &resp.ErrorCode); err != nil { + t.Fatalf("read error code: %v", err) + } + return resp +} + +func decodeAlterConfigsResponse(t *testing.T, payload []byte, version int16) *protocol.AlterConfigsResponse { + t.Helper() + if version != 1 { + t.Fatalf("alter configs decode only supports version 1") + } + reader := bytes.NewReader(payload) + resp := &protocol.AlterConfigsResponse{} + if err := binary.Read(reader, binary.BigEndian, &resp.CorrelationID); err != nil { + t.Fatalf("read correlation id: %v", err) + } + if err := binary.Read(reader, binary.BigEndian, &resp.ThrottleMs); err != nil { + t.Fatalf("read throttle ms: %v", err) + } + var count int32 + if err := binary.Read(reader, binary.BigEndian, &count); err != nil { + t.Fatalf("read resource count: %v", err) + } + resp.Resources = make([]protocol.AlterConfigsResponseResource, 0, count) + for i := 0; i < int(count); i++ { + var code int16 + if err := binary.Read(reader, binary.BigEndian, &code); err != nil { + t.Fatalf("read error code: %v", err) + } + msg := readKafkaNullableString(t, reader) + var rtype int8 + if err := binary.Read(reader, binary.BigEndian, &rtype); err != nil { + t.Fatalf("read resource type: %v", err) + } + name := 
readKafkaString(t, reader) + resp.Resources = append(resp.Resources, protocol.AlterConfigsResponseResource{ + ErrorCode: code, + ErrorMessage: msg, + ResourceType: rtype, + ResourceName: name, + }) + } + return resp +} + +func readKafkaNullableString(t *testing.T, reader *bytes.Reader) *string { + t.Helper() + var length int16 + if err := binary.Read(reader, binary.BigEndian, &length); err != nil { + t.Fatalf("read nullable string length: %v", err) + } + if length < 0 { + return nil + } + buf := make([]byte, length) + if _, err := reader.Read(buf); err != nil { + t.Fatalf("read nullable string: %v", err) + } + value := string(buf) + return &value +} diff --git a/cmd/broker/auth_metrics.go b/cmd/broker/auth_metrics.go new file mode 100644 index 0000000..7562624 --- /dev/null +++ b/cmd/broker/auth_metrics.go @@ -0,0 +1,72 @@ +// Copyright 2025, 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "io" + "sync" + "sync/atomic" + + "github.com/KafScale/platform/pkg/acl" +) + +type authMetrics struct { + deniedTotal uint64 + mu sync.Mutex + byKey map[string]uint64 +} + +func newAuthMetrics() *authMetrics { + return &authMetrics{byKey: make(map[string]uint64)} +} + +func (m *authMetrics) RecordDenied(action acl.Action, resource acl.Resource) { + if m == nil { + return + } + atomic.AddUint64(&m.deniedTotal, 1) + key := fmt.Sprintf("%s|%s", action, resource) + m.mu.Lock() + m.byKey[key]++ + m.mu.Unlock() +} + +func (m *authMetrics) writePrometheus(w io.Writer) { + if m == nil { + return + } + total := atomic.LoadUint64(&m.deniedTotal) + fmt.Fprintln(w, "# HELP kafscale_authz_denied_total Authorization denials across broker APIs.") + fmt.Fprintln(w, "# TYPE kafscale_authz_denied_total counter") + fmt.Fprintf(w, "kafscale_authz_denied_total %d\n", total) + + m.mu.Lock() + defer m.mu.Unlock() + for key, count := range m.byKey { + action, resource := splitAuthMetricKey(key) + fmt.Fprintf(w, "kafscale_authz_denied_total{action=%q,resource=%q} %d\n", action, resource, count) + } +} + +func splitAuthMetricKey(key string) (string, string) { + for i := 0; i < len(key); i++ { + if key[i] == '|' { + return key[:i], key[i+1:] + } + } + return key, "" +} diff --git a/cmd/broker/main.go b/cmd/broker/main.go index d3b8182..d6de330 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -17,6 +17,7 @@ package main import ( "context" + "encoding/json" "errors" "fmt" "log/slog" @@ -30,6 +31,7 @@ import ( "syscall" "time" + "github.com/KafScale/platform/pkg/acl" "github.com/KafScale/platform/pkg/broker" "github.com/KafScale/platform/pkg/cache" controlpb "github.com/KafScale/platform/pkg/gen/control" @@ -85,6 +87,10 @@ type handler struct { flushInterval time.Duration flushOnAck bool adminMetrics *adminMetrics + authorizer *acl.Authorizer + authMetrics *authMetrics + authLogMu sync.Mutex + authLogLast map[string]time.Time } 
type etcdAvailability interface { @@ -95,6 +101,7 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re if h.traceKafka { h.logger.Debug("received request", "api_key", header.APIKey, "api_version", header.APIVersion, "correlation", header.CorrelationID, "client_id", header.ClientID) } + principal := principalFromContext(ctx, header) switch req.(type) { case *protocol.ApiVersionsRequest: errorCode := protocol.NONE @@ -207,6 +214,15 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re } return protocol.EncodeFindCoordinatorResponse(resp, header.APIVersion) case *protocol.JoinGroupRequest: + req := req.(*protocol.JoinGroupRequest) + if !h.allowGroup(principal, req.GroupID, acl.ActionGroupWrite) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupWrite, acl.ResourceGroup, req.GroupID) + return protocol.EncodeJoinGroupResponse(&protocol.JoinGroupResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }, header.APIVersion) + } if !h.etcdAvailable() { return protocol.EncodeJoinGroupResponse(&protocol.JoinGroupResponse{ CorrelationID: header.CorrelationID, @@ -214,12 +230,21 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.REQUEST_TIMED_OUT, }, header.APIVersion) } - resp, err := h.coordinator.JoinGroup(ctx, req.(*protocol.JoinGroupRequest), header.CorrelationID) + resp, err := h.coordinator.JoinGroup(ctx, req, header.CorrelationID) if err != nil { return nil, err } return protocol.EncodeJoinGroupResponse(resp, header.APIVersion) case *protocol.SyncGroupRequest: + req := req.(*protocol.SyncGroupRequest) + if !h.allowGroup(principal, req.GroupID, acl.ActionGroupWrite) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupWrite, acl.ResourceGroup, req.GroupID) + return protocol.EncodeSyncGroupResponse(&protocol.SyncGroupResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }, header.APIVersion) + } if !h.etcdAvailable() { return protocol.EncodeSyncGroupResponse(&protocol.SyncGroupResponse{ CorrelationID: header.CorrelationID, @@ -227,15 +252,30 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.REQUEST_TIMED_OUT, }, header.APIVersion) } - resp, err := h.coordinator.SyncGroup(ctx, req.(*protocol.SyncGroupRequest), header.CorrelationID) + resp, err := h.coordinator.SyncGroup(ctx, req, header.CorrelationID) if err != nil { return nil, err } return protocol.EncodeSyncGroupResponse(resp, header.APIVersion) case *protocol.DescribeGroupsRequest: + req := req.(*protocol.DescribeGroupsRequest) + if !h.allowGroups(principal, req.Groups, acl.ActionGroupRead) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupRead, acl.ResourceGroup, strings.Join(req.Groups, ",")) + results := make([]protocol.DescribeGroupsResponseGroup, 0, len(req.Groups)) + for _, groupID := range req.Groups { + results = append(results, protocol.DescribeGroupsResponseGroup{ + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + GroupID: groupID, + }) + } + return protocol.EncodeDescribeGroupsResponse(&protocol.DescribeGroupsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Groups: results, + }, header.APIVersion) + } return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { if !h.etcdAvailable() { - req := req.(*protocol.DescribeGroupsRequest) results := make([]protocol.DescribeGroupsResponseGroup, 
0, len(req.Groups)) for _, groupID := range req.Groups { results = append(results, protocol.DescribeGroupsResponseGroup{ @@ -249,13 +289,22 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re Groups: results, }, header.APIVersion) } - resp, err := h.coordinator.DescribeGroups(ctx, req.(*protocol.DescribeGroupsRequest), header.CorrelationID) + resp, err := h.coordinator.DescribeGroups(ctx, req, header.CorrelationID) if err != nil { return nil, err } return protocol.EncodeDescribeGroupsResponse(resp, header.APIVersion) }) case *protocol.ListGroupsRequest: + if !h.allowGroup(principal, "*", acl.ActionGroupRead) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupRead, acl.ResourceGroup, "*") + return protocol.EncodeListGroupsResponse(&protocol.ListGroupsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + Groups: nil, + }, header.APIVersion) + } return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { if !h.etcdAvailable() { return protocol.EncodeListGroupsResponse(&protocol.ListGroupsResponse{ @@ -272,6 +321,15 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re return protocol.EncodeListGroupsResponse(resp, header.APIVersion) }) case *protocol.HeartbeatRequest: + req := req.(*protocol.HeartbeatRequest) + if !h.allowGroup(principal, req.GroupID, acl.ActionGroupWrite) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupWrite, acl.ResourceGroup, req.GroupID) + return protocol.EncodeHeartbeatResponse(&protocol.HeartbeatResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }, header.APIVersion) + } if !h.etcdAvailable() { return protocol.EncodeHeartbeatResponse(&protocol.HeartbeatResponse{ CorrelationID: header.CorrelationID, @@ -279,20 +337,50 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.REQUEST_TIMED_OUT, }, header.APIVersion) } - resp := h.coordinator.Heartbeat(ctx, req.(*protocol.HeartbeatRequest), header.CorrelationID) + resp := h.coordinator.Heartbeat(ctx, req, header.CorrelationID) return protocol.EncodeHeartbeatResponse(resp, header.APIVersion) case *protocol.LeaveGroupRequest: + req := req.(*protocol.LeaveGroupRequest) + if !h.allowGroup(principal, req.GroupID, acl.ActionGroupWrite) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupWrite, acl.ResourceGroup, req.GroupID) + return protocol.EncodeLeaveGroupResponse(&protocol.LeaveGroupResponse{ + CorrelationID: header.CorrelationID, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }) + } if !h.etcdAvailable() { return protocol.EncodeLeaveGroupResponse(&protocol.LeaveGroupResponse{ CorrelationID: header.CorrelationID, ErrorCode: protocol.REQUEST_TIMED_OUT, }) } - resp := h.coordinator.LeaveGroup(ctx, req.(*protocol.LeaveGroupRequest), header.CorrelationID) + resp := h.coordinator.LeaveGroup(ctx, req, header.CorrelationID) return protocol.EncodeLeaveGroupResponse(resp) case *protocol.OffsetCommitRequest: + req := req.(*protocol.OffsetCommitRequest) + if !h.allowGroup(principal, req.GroupID, acl.ActionGroupWrite) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupWrite, acl.ResourceGroup, req.GroupID) + topics := make([]protocol.OffsetCommitTopicResponse, 0, len(req.Topics)) + for _, topic := range req.Topics { + partitions := make([]protocol.OffsetCommitPartitionResponse, 0, len(topic.Partitions)) + for _, part := range topic.Partitions 
{ + partitions = append(partitions, protocol.OffsetCommitPartitionResponse{ + Partition: part.Partition, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }) + } + topics = append(topics, protocol.OffsetCommitTopicResponse{ + Name: topic.Name, + Partitions: partitions, + }) + } + return protocol.EncodeOffsetCommitResponse(&protocol.OffsetCommitResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Topics: topics, + }) + } if !h.etcdAvailable() { - req := req.(*protocol.OffsetCommitRequest) topics := make([]protocol.OffsetCommitTopicResponse, 0, len(req.Topics)) for _, topic := range req.Topics { partitions := make([]protocol.OffsetCommitPartitionResponse, 0, len(topic.Partitions)) @@ -313,14 +401,39 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re Topics: topics, }) } - resp, err := h.coordinator.OffsetCommit(ctx, req.(*protocol.OffsetCommitRequest), header.CorrelationID) + resp, err := h.coordinator.OffsetCommit(ctx, req, header.CorrelationID) if err != nil { return nil, err } return protocol.EncodeOffsetCommitResponse(resp) case *protocol.OffsetFetchRequest: + req := req.(*protocol.OffsetFetchRequest) + if !h.allowGroup(principal, req.GroupID, acl.ActionGroupRead) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupRead, acl.ResourceGroup, req.GroupID) + topics := make([]protocol.OffsetFetchTopicResponse, 0, len(req.Topics)) + for _, topic := range req.Topics { + partitions := make([]protocol.OffsetFetchPartitionResponse, 0, len(topic.Partitions)) + for _, part := range topic.Partitions { + partitions = append(partitions, protocol.OffsetFetchPartitionResponse{ + Partition: part.Partition, + Offset: -1, + LeaderEpoch: -1, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }) + } + topics = append(topics, protocol.OffsetFetchTopicResponse{ + Name: topic.Name, + Partitions: partitions, + }) + } + return protocol.EncodeOffsetFetchResponse(&protocol.OffsetFetchResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Topics: topics, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }, header.APIVersion) + } if !h.etcdAvailable() { - req := req.(*protocol.OffsetFetchRequest) topics := make([]protocol.OffsetFetchTopicResponse, 0, len(req.Topics)) for _, topic := range req.Topics { partitions := make([]protocol.OffsetFetchPartitionResponse, 0, len(topic.Partitions)) @@ -344,14 +457,18 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.REQUEST_TIMED_OUT, }, header.APIVersion) } - resp, err := h.coordinator.OffsetFetch(ctx, req.(*protocol.OffsetFetchRequest), header.CorrelationID) + resp, err := h.coordinator.OffsetFetch(ctx, req, header.CorrelationID) if err != nil { return nil, err } return protocol.EncodeOffsetFetchResponse(resp, header.APIVersion) case *protocol.OffsetForLeaderEpochRequest: return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { - return h.handleOffsetForLeaderEpoch(ctx, header, req.(*protocol.OffsetForLeaderEpochRequest)) + offsetReq := req.(*protocol.OffsetForLeaderEpochRequest) + if !h.allowTopics(principal, topicsFromOffsetForLeaderEpoch(offsetReq), acl.ActionFetch) { + return h.unauthorizedOffsetForLeaderEpoch(principal, header, offsetReq) + } + return h.handleOffsetForLeaderEpoch(ctx, header, offsetReq) }) case *protocol.DescribeConfigsRequest: return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { @@ -359,18 +476,29 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re }) case 
*protocol.AlterConfigsRequest: return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { - return h.handleAlterConfigs(ctx, header, req.(*protocol.AlterConfigsRequest)) + alterReq := req.(*protocol.AlterConfigsRequest) + if !h.allowAdmin(principal) { + return h.unauthorizedAlterConfigs(principal, header, alterReq) + } + return h.handleAlterConfigs(ctx, header, alterReq) }) case *protocol.CreatePartitionsRequest: return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { - return h.handleCreatePartitions(ctx, header, req.(*protocol.CreatePartitionsRequest)) + createReq := req.(*protocol.CreatePartitionsRequest) + if !h.allowAdmin(principal) { + return h.unauthorizedCreatePartitions(principal, header, createReq) + } + return h.handleCreatePartitions(ctx, header, createReq) }) case *protocol.DeleteGroupsRequest: return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { + deleteReq := req.(*protocol.DeleteGroupsRequest) + if !h.allowGroups(principal, deleteReq.Groups, acl.ActionGroupAdmin) { + return h.unauthorizedDeleteGroups(principal, header, deleteReq) + } if !h.etcdAvailable() { - req := req.(*protocol.DeleteGroupsRequest) - results := make([]protocol.DeleteGroupsResponseGroup, 0, len(req.Groups)) - for _, groupID := range req.Groups { + results := make([]protocol.DeleteGroupsResponseGroup, 0, len(deleteReq.Groups)) + for _, groupID := range deleteReq.Groups { results = append(results, protocol.DeleteGroupsResponseGroup{ Group: groupID, ErrorCode: protocol.REQUEST_TIMED_OUT, @@ -382,18 +510,30 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re Groups: results, }, header.APIVersion) } - resp, err := h.coordinator.DeleteGroups(ctx, req.(*protocol.DeleteGroupsRequest), header.CorrelationID) + resp, err := h.coordinator.DeleteGroups(ctx, deleteReq, header.CorrelationID) if err != nil { return nil, err } return protocol.EncodeDeleteGroupsResponse(resp, header.APIVersion) }) case *protocol.CreateTopicsRequest: - return h.handleCreateTopics(ctx, header, req.(*protocol.CreateTopicsRequest)) + createReq := req.(*protocol.CreateTopicsRequest) + if !h.allowAdmin(principal) { + return h.unauthorizedCreateTopics(principal, header, createReq) + } + return h.handleCreateTopics(ctx, header, createReq) case *protocol.DeleteTopicsRequest: - return h.handleDeleteTopics(ctx, header, req.(*protocol.DeleteTopicsRequest)) + deleteReq := req.(*protocol.DeleteTopicsRequest) + if !h.allowAdmin(principal) { + return h.unauthorizedDeleteTopics(principal, header, deleteReq) + } + return h.handleDeleteTopics(ctx, header, deleteReq) case *protocol.ListOffsetsRequest: - return h.handleListOffsets(ctx, header, req.(*protocol.ListOffsetsRequest)) + listReq := req.(*protocol.ListOffsetsRequest) + if !h.allowTopics(principal, topicsFromListOffsets(listReq), acl.ActionFetch) { + return h.unauthorizedListOffsets(principal, header, listReq) + } + return h.handleListOffsets(ctx, header, listReq) default: return nil, ErrUnsupportedAPI } @@ -471,6 +611,7 @@ func (h *handler) metricsHandler(w http.ResponseWriter, r *http.Request) { } h.writeRuntimeMetrics(w) h.adminMetrics.writePrometheus(w) + h.authMetrics.writePrometheus(w) } func (h *handler) recordS3Op(op string, latency time.Duration, err error) { @@ -487,6 +628,35 @@ func (h *handler) recordProduceLatency(latency time.Duration) { h.produceLatency.Observe(float64(latency.Milliseconds())) } +func (h *handler) recordAuthzDenied(action acl.Action, resource acl.Resource) { + if h.authMetrics == nil { + return + } + 
h.authMetrics.RecordDenied(action, resource) +} + +func (h *handler) recordAuthzDeniedWithPrincipal(principal string, action acl.Action, resource acl.Resource, name string) { + h.recordAuthzDenied(action, resource) + h.logAuthzDenied(principal, action, resource, name) +} + +func (h *handler) logAuthzDenied(principal string, action acl.Action, resource acl.Resource, name string) { + if h == nil || h.logger == nil { + return + } + key := fmt.Sprintf("%s|%s|%s|%s", action, resource, principal, name) + now := time.Now() + h.authLogMu.Lock() + last := h.authLogLast[key] + if now.Sub(last) < time.Minute { + h.authLogMu.Unlock() + return + } + h.authLogLast[key] = now + h.authLogMu.Unlock() + h.logger.Warn("authorization denied", "principal", principal, "action", action, "resource", resource, "name", name) +} + func (h *handler) withAdminMetrics(apiKey int16, fn func() ([]byte, error)) ([]byte, error) { start := time.Now() payload, err := fn() @@ -494,6 +664,231 @@ func (h *handler) withAdminMetrics(apiKey int16, fn func() ([]byte, error)) ([]b return payload, err } +func principalFromContext(ctx context.Context, header *protocol.RequestHeader) string { + if info := broker.ConnInfoFromContext(ctx); info != nil { + if strings.TrimSpace(info.Principal) != "" { + return strings.TrimSpace(info.Principal) + } + } + if header == nil || header.ClientID == nil { + return "anonymous" + } + if strings.TrimSpace(*header.ClientID) == "" { + return "anonymous" + } + return *header.ClientID +} + +func (h *handler) allowTopic(principal string, topic string, action acl.Action) bool { + if h.authorizer == nil || !h.authorizer.Enabled() { + return true + } + return h.authorizer.Allows(principal, action, acl.ResourceTopic, topic) +} + +func (h *handler) allowTopics(principal string, topics []string, action acl.Action) bool { + for _, topic := range topics { + if !h.allowTopic(principal, topic, action) { + return false + } + } + return true +} + +func (h *handler) allowGroup(principal string, group string, action acl.Action) bool { + if h.authorizer == nil || !h.authorizer.Enabled() { + return true + } + return h.authorizer.Allows(principal, action, acl.ResourceGroup, group) +} + +func (h *handler) allowGroups(principal string, groups []string, action acl.Action) bool { + for _, group := range groups { + if !h.allowGroup(principal, group, action) { + return false + } + } + return true +} + +func (h *handler) allowCluster(principal string, action acl.Action) bool { + if h.authorizer == nil || !h.authorizer.Enabled() { + return true + } + return h.authorizer.Allows(principal, action, acl.ResourceCluster, "cluster") +} + +func (h *handler) allowAdmin(principal string) bool { + return h.allowCluster(principal, acl.ActionAdmin) +} + +func topicsFromListOffsets(req *protocol.ListOffsetsRequest) []string { + topics := make([]string, 0, len(req.Topics)) + for _, topic := range req.Topics { + topics = append(topics, topic.Name) + } + return topics +} + +func topicsFromOffsetForLeaderEpoch(req *protocol.OffsetForLeaderEpochRequest) []string { + topics := make([]string, 0, len(req.Topics)) + for _, topic := range req.Topics { + topics = append(topics, topic.Name) + } + return topics +} + +func topicsFromCreateTopics(req *protocol.CreateTopicsRequest) []string { + topics := make([]string, 0, len(req.Topics)) + for _, topic := range req.Topics { + topics = append(topics, topic.Name) + } + return topics +} + +func topicsFromCreatePartitions(req *protocol.CreatePartitionsRequest) []string { + topics := make([]string, 0, 
len(req.Topics)) + for _, topic := range req.Topics { + topics = append(topics, topic.Name) + } + return topics +} + +func (h *handler) unauthorizedOffsetForLeaderEpoch(principal string, header *protocol.RequestHeader, req *protocol.OffsetForLeaderEpochRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionFetch, acl.ResourceTopic, strings.Join(topicsFromOffsetForLeaderEpoch(req), ",")) + respTopics := make([]protocol.OffsetForLeaderEpochTopicResponse, 0, len(req.Topics)) + for _, topic := range req.Topics { + partitions := make([]protocol.OffsetForLeaderEpochPartitionResponse, 0, len(topic.Partitions)) + for _, part := range topic.Partitions { + partitions = append(partitions, protocol.OffsetForLeaderEpochPartitionResponse{ + Partition: part.Partition, + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + LeaderEpoch: -1, + EndOffset: -1, + }) + } + respTopics = append(respTopics, protocol.OffsetForLeaderEpochTopicResponse{ + Name: topic.Name, + Partitions: partitions, + }) + } + return protocol.EncodeOffsetForLeaderEpochResponse(&protocol.OffsetForLeaderEpochResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Topics: respTopics, + }, header.APIVersion) +} + +func (h *handler) unauthorizedAlterConfigs(principal string, header *protocol.RequestHeader, req *protocol.AlterConfigsRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionAdmin, acl.ResourceCluster, "cluster") + resources := make([]protocol.AlterConfigsResponseResource, 0, len(req.Resources)) + for _, resource := range req.Resources { + errorCode := protocol.CLUSTER_AUTHORIZATION_FAILED + if resource.ResourceType == protocol.ConfigResourceTopic { + errorCode = protocol.TOPIC_AUTHORIZATION_FAILED + } + resources = append(resources, protocol.AlterConfigsResponseResource{ + ErrorCode: errorCode, + ResourceType: resource.ResourceType, + ResourceName: resource.ResourceName, + }) + } + return protocol.EncodeAlterConfigsResponse(&protocol.AlterConfigsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Resources: resources, + }, header.APIVersion) +} + +func (h *handler) unauthorizedCreatePartitions(principal string, header *protocol.RequestHeader, req *protocol.CreatePartitionsRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionAdmin, acl.ResourceTopic, strings.Join(topicsFromCreatePartitions(req), ",")) + results := make([]protocol.CreatePartitionsResponseTopic, 0, len(req.Topics)) + for _, topic := range req.Topics { + results = append(results, protocol.CreatePartitionsResponseTopic{ + Name: topic.Name, + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + }) + } + return protocol.EncodeCreatePartitionsResponse(&protocol.CreatePartitionsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Topics: results, + }, header.APIVersion) +} + +func (h *handler) unauthorizedDeleteGroups(principal string, header *protocol.RequestHeader, req *protocol.DeleteGroupsRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupAdmin, acl.ResourceGroup, strings.Join(req.Groups, ",")) + results := make([]protocol.DeleteGroupsResponseGroup, 0, len(req.Groups)) + for _, groupID := range req.Groups { + results = append(results, protocol.DeleteGroupsResponseGroup{ + Group: groupID, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }) + } + return protocol.EncodeDeleteGroupsResponse(&protocol.DeleteGroupsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Groups: results, + }, 
header.APIVersion) +} + +func (h *handler) unauthorizedCreateTopics(principal string, header *protocol.RequestHeader, req *protocol.CreateTopicsRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionAdmin, acl.ResourceTopic, strings.Join(topicsFromCreateTopics(req), ",")) + results := make([]protocol.CreateTopicResult, 0, len(req.Topics)) + for _, topic := range req.Topics { + results = append(results, protocol.CreateTopicResult{ + Name: topic.Name, + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + ErrorMessage: "unauthorized", + }) + } + return protocol.EncodeCreateTopicsResponse(&protocol.CreateTopicsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Topics: results, + }, header.APIVersion) +} + +func (h *handler) unauthorizedDeleteTopics(principal string, header *protocol.RequestHeader, req *protocol.DeleteTopicsRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionAdmin, acl.ResourceTopic, strings.Join(req.TopicNames, ",")) + results := make([]protocol.DeleteTopicResult, 0, len(req.TopicNames)) + for _, name := range req.TopicNames { + results = append(results, protocol.DeleteTopicResult{ + Name: name, + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + ErrorMessage: "unauthorized", + }) + } + return protocol.EncodeDeleteTopicsResponse(&protocol.DeleteTopicsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Topics: results, + }, header.APIVersion) +} + +func (h *handler) unauthorizedListOffsets(principal string, header *protocol.RequestHeader, req *protocol.ListOffsetsRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionFetch, acl.ResourceTopic, strings.Join(topicsFromListOffsets(req), ",")) + topicResponses := make([]protocol.ListOffsetsTopicResponse, 0, len(req.Topics)) + for _, topic := range req.Topics { + partitions := make([]protocol.ListOffsetsPartitionResponse, 0, len(topic.Partitions)) + for _, part := range topic.Partitions { + partitions = append(partitions, protocol.ListOffsetsPartitionResponse{ + Partition: part.Partition, + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + LeaderEpoch: -1, + }) + } + topicResponses = append(topicResponses, protocol.ListOffsetsTopicResponse{ + Name: topic.Name, + Partitions: partitions, + }) + } + return protocol.EncodeListOffsetsResponse(header.APIVersion, &protocol.ListOffsetsResponse{ + CorrelationID: header.CorrelationID, + Topics: topicResponses, + }) +} + func (h *handler) handleProduce(ctx context.Context, header *protocol.RequestHeader, req *protocol.ProduceRequest) ([]byte, error) { start := time.Now() defer func() { @@ -502,12 +897,27 @@ func (h *handler) handleProduce(ctx context.Context, header *protocol.RequestHea topicResponses := make([]protocol.ProduceTopicResponse, 0, len(req.Topics)) now := time.Now().UnixMilli() var producedMessages int64 + principal := principalFromContext(ctx, header) for _, topic := range req.Topics { if h.traceKafka { h.logger.Debug("produce request received", "topic", topic.Name, "partitions", len(topic.Partitions), "acks", req.Acks, "timeout_ms", req.TimeoutMs) } partitionResponses := make([]protocol.ProducePartitionResponse, 0, len(topic.Partitions)) + if !h.allowTopic(principal, topic.Name, acl.ActionProduce) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionProduce, acl.ResourceTopic, topic.Name) + for _, part := range topic.Partitions { + partitionResponses = append(partitionResponses, protocol.ProducePartitionResponse{ + Partition: part.Partition, + ErrorCode: 
protocol.TOPIC_AUTHORIZATION_FAILED, + }) + } + topicResponses = append(topicResponses, protocol.ProduceTopicResponse{ + Name: topic.Name, + Partitions: partitionResponses, + }) + continue + } for _, part := range topic.Partitions { if !h.etcdAvailable() { partitionResponses = append(partitionResponses, protocol.ProducePartitionResponse{ @@ -852,9 +1262,19 @@ func (h *handler) handleOffsetForLeaderEpoch(ctx context.Context, header *protoc func (h *handler) handleDescribeConfigs(ctx context.Context, header *protocol.RequestHeader, req *protocol.DescribeConfigsRequest) ([]byte, error) { resources := make([]protocol.DescribeConfigsResponseResource, 0, len(req.Resources)) + principal := principalFromContext(ctx, header) for _, resource := range req.Resources { switch resource.ResourceType { case protocol.ConfigResourceTopic: + if !h.allowTopic(principal, resource.ResourceName, acl.ActionFetch) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionFetch, acl.ResourceTopic, resource.ResourceName) + resources = append(resources, protocol.DescribeConfigsResponseResource{ + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + ResourceType: resource.ResourceType, + ResourceName: resource.ResourceName, + }) + continue + } cfg, err := h.store.FetchTopicConfig(ctx, resource.ResourceName) if err != nil { resources = append(resources, protocol.DescribeConfigsResponseResource{ @@ -872,6 +1292,15 @@ func (h *handler) handleDescribeConfigs(ctx context.Context, header *protocol.Re Configs: configs, }) case protocol.ConfigResourceBroker: + if !h.allowAdmin(principal) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionAdmin, acl.ResourceCluster, "cluster") + resources = append(resources, protocol.DescribeConfigsResponseResource{ + ErrorCode: protocol.CLUSTER_AUTHORIZATION_FAILED, + ResourceType: resource.ResourceType, + ResourceName: resource.ResourceName, + }) + continue + } configs := h.brokerConfigEntries(resource.ConfigNames) resources = append(resources, protocol.DescribeConfigsResponseResource{ ErrorCode: protocol.NONE, @@ -880,6 +1309,15 @@ func (h *handler) handleDescribeConfigs(ctx context.Context, header *protocol.Re Configs: configs, }) default: + if !h.allowAdmin(principal) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionAdmin, acl.ResourceCluster, "cluster") + resources = append(resources, protocol.DescribeConfigsResponseResource{ + ErrorCode: protocol.CLUSTER_AUTHORIZATION_FAILED, + ResourceType: resource.ResourceType, + ResourceName: resource.ResourceName, + }) + continue + } resources = append(resources, protocol.DescribeConfigsResponseResource{ ErrorCode: protocol.INVALID_REQUEST, ResourceType: resource.ResourceType, @@ -1231,6 +1669,7 @@ func (h *handler) handleFetch(ctx context.Context, header *protocol.RequestHeade var fetchedMessages int64 zeroID := [16]byte{} idToName := map[[16]byte]string{} + principal := principalFromContext(ctx, header) for _, topic := range req.Topics { if topic.TopicID != zeroID { meta, err := h.store.Metadata(ctx, nil) @@ -1265,6 +1704,22 @@ func (h *handler) handleFetch(ctx context.Context, header *protocol.RequestHeade continue } } + if !h.allowTopic(principal, topicName, acl.ActionFetch) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionFetch, acl.ResourceTopic, topicName) + partitionResponses := make([]protocol.FetchPartitionResponse, 0, len(topic.Partitions)) + for _, part := range topic.Partitions { + partitionResponses = append(partitionResponses, protocol.FetchPartitionResponse{ + Partition: part.Partition, + ErrorCode: 
protocol.TOPIC_AUTHORIZATION_FAILED, + }) + } + topicResponses = append(topicResponses, protocol.FetchTopicResponse{ + Name: topicName, + TopicID: topic.TopicID, + Partitions: partitionResponses, + }) + continue + } partitionResponses := make([]protocol.FetchPartitionResponse, 0, len(topic.Partitions)) for _, part := range topic.Partitions { if h.traceKafka { @@ -1497,6 +1952,7 @@ func newHandler(store metadata.Store, s3Client storage.S3Client, brokerInfo prot autoPartitions = 1 } health := broker.NewS3HealthMonitor(s3HealthConfigFromEnv()) + authorizer := buildAuthorizerFromEnv(logger) return &handler{ apiVersions: generateApiVersions(), store: store, @@ -1535,6 +1991,9 @@ func newHandler(store metadata.Store, s3Client storage.S3Client, brokerInfo prot flushInterval: flushInterval, flushOnAck: flushOnAck, adminMetrics: newAdminMetrics(), + authorizer: authorizer, + authMetrics: newAuthMetrics(), + authLogLast: make(map[string]time.Time), } } @@ -1602,8 +2061,9 @@ func main() { startControlServer(ctx, controlAddr, handler, logger) kafkaAddr := envOrDefault("KAFSCALE_BROKER_ADDR", defaultKafkaAddr) srv := &broker.Server{ - Addr: kafkaAddr, - Handler: handler, + Addr: kafkaAddr, + Handler: handler, + ConnContextFunc: buildConnContextFunc(logger), } if err := srv.ListenAndServe(ctx); err != nil { logger.Error("broker server error", "error", err) @@ -1700,6 +2160,129 @@ func buildS3ConfigsFromEnv() (storage.S3Config, storage.S3Config, bool, bool, bo return writeCfg, readCfg, false, usingDefaultMinio, credsProvided, useReadReplica } +func buildConnContextFunc(logger *slog.Logger) broker.ConnContextFunc { + source := strings.TrimSpace(os.Getenv("KAFSCALE_PRINCIPAL_SOURCE")) + if source == "" { + source = "client_id" + } + proxyProtocol := parseEnvBool("KAFSCALE_PROXY_PROTOCOL", false) + if strings.EqualFold(source, "proxy_addr") { + proxyProtocol = true + } + if strings.EqualFold(source, "client_id") && !proxyProtocol { + if parseEnvBool("KAFSCALE_ACL_ENABLED", false) { + if logger == nil { + logger = slog.Default() + } + logger.Warn("ACL enabled with client_id principal source; client.id is spoofable without trusted edge auth") + } + return nil + } + if logger == nil { + logger = slog.Default() + } + if parseEnvBool("KAFSCALE_ACL_ENABLED", false) && strings.EqualFold(source, "client_id") { + logger.Warn("ACL enabled with client_id principal source; client.id is spoofable without trusted edge auth") + } + if proxyProtocol && strings.EqualFold(source, "client_id") { + logger.Warn("proxy protocol enabled but principal source remains client_id; proxy identity is unused") + } + return func(conn net.Conn) (net.Conn, *broker.ConnContext, error) { + info := &broker.ConnContext{} + var proxyInfo *broker.ProxyInfo + if proxyProtocol { + wrapped, parsed, err := broker.ReadProxyProtocol(conn) + if err != nil { + logger.Warn("proxy protocol parse failed", "error", err) + return conn, nil, err + } + if parsed == nil { + return conn, nil, fmt.Errorf("proxy protocol required but header missing") + } + if wrapped != nil { + conn = wrapped + } + proxyInfo = parsed + if proxyInfo != nil { + info.ProxyAddr = proxyInfo.SourceAddr + info.RemoteAddr = proxyInfo.SourceAddr + } + } + if info.RemoteAddr == "" { + info.RemoteAddr = conn.RemoteAddr().String() + } + switch strings.ToLower(source) { + case "remote_addr": + info.Principal = hostFromAddr(info.RemoteAddr) + case "proxy_addr": + if proxyInfo != nil && proxyInfo.SourceAddr != "" { + info.Principal = hostFromAddr(proxyInfo.SourceAddr) + } else { + info.Principal 
= hostFromAddr(info.RemoteAddr)
+			}
+		case "client_id":
+			// Default; use client.id from the Kafka request header.
+		default:
+			logger.Warn("unknown principal source; defaulting to client_id", "source", source)
+		}
+		return conn, info, nil
+	}
+}
+
+func hostFromAddr(addr string) string {
+	if addr == "" {
+		return ""
+	}
+	host, _, err := net.SplitHostPort(addr)
+	if err != nil {
+		return addr
+	}
+	return host
+}
+
+func buildAuthorizerFromEnv(logger *slog.Logger) *acl.Authorizer {
+	if !parseEnvBool("KAFSCALE_ACL_ENABLED", false) {
+		return acl.NewAuthorizer(acl.Config{Enabled: false})
+	}
+	failOpen := parseEnvBool("KAFSCALE_ACL_FAIL_OPEN", false)
+	if logger == nil {
+		logger = slog.Default()
+	}
+	var raw []byte
+	if inline := strings.TrimSpace(os.Getenv("KAFSCALE_ACL_JSON")); inline != "" {
+		raw = []byte(inline)
+	} else if path := strings.TrimSpace(os.Getenv("KAFSCALE_ACL_FILE")); path != "" {
+		data, err := os.ReadFile(path)
+		if err != nil {
+			if failOpen {
+				logger.Warn("failed to read ACL file; ACL disabled", "error", err)
+				return acl.NewAuthorizer(acl.Config{Enabled: false})
+			}
+			logger.Warn("failed to read ACL file; default deny enabled", "error", err)
+			return acl.NewAuthorizer(acl.Config{Enabled: true, DefaultPolicy: "deny"})
+		}
+		raw = data
+	} else {
+		if failOpen {
+			logger.Warn("ACL enabled but no config provided; ACL disabled")
+			return acl.NewAuthorizer(acl.Config{Enabled: false})
+		}
+		logger.Warn("ACL enabled but no config provided; default deny enabled")
+		return acl.NewAuthorizer(acl.Config{Enabled: true, DefaultPolicy: "deny"})
+	}
+	var cfg acl.Config
+	if err := json.Unmarshal(raw, &cfg); err != nil {
+		if failOpen {
+			logger.Warn("failed to parse ACL config; ACL disabled", "error", err)
+			return acl.NewAuthorizer(acl.Config{Enabled: false})
+		}
+		logger.Warn("failed to parse ACL config; default deny enabled", "error", err)
+		return acl.NewAuthorizer(acl.Config{Enabled: true, DefaultPolicy: "deny"})
+	}
+	cfg.Enabled = true
+	return acl.NewAuthorizer(cfg)
+}
+
 func s3HealthConfigFromEnv() broker.S3HealthConfig {
 	return broker.S3HealthConfig{
 		Window: time.Duration(parseEnvInt("KAFSCALE_S3_HEALTH_WINDOW_SEC", 60)) * time.Second,
diff --git a/deploy/helm/kafscale/templates/operator-deployment.yaml b/deploy/helm/kafscale/templates/operator-deployment.yaml
index dde40cd..37368aa 100644
--- a/deploy/helm/kafscale/templates/operator-deployment.yaml
+++ b/deploy/helm/kafscale/templates/operator-deployment.yaml
@@ -81,6 +81,30 @@ spec:
             value: "{{ join "," .Values.operator.etcdEndpoints }}"
           - name: KAFSCALE_OPERATOR_LEADER_KEY
             value: "{{ .Values.operator.leaderKey }}"
+{{- if .Values.operator.acl.enabled }}
+          - name: KAFSCALE_ACL_ENABLED
+            value: "true"
+{{- end }}
+{{- if .Values.operator.acl.configJson }}
+          - name: KAFSCALE_ACL_JSON
+            value: {{ .Values.operator.acl.configJson | toJson }}
+{{- end }}
+{{- if .Values.operator.acl.configFile }}
+          - name: KAFSCALE_ACL_FILE
+            value: "{{ .Values.operator.acl.configFile }}"
+{{- end }}
+{{- if .Values.operator.acl.failOpen }}
+          - name: KAFSCALE_ACL_FAIL_OPEN
+            value: "true"
+{{- end }}
+{{- if .Values.operator.auth.principalSource }}
+          - name: KAFSCALE_PRINCIPAL_SOURCE
+            value: "{{ .Values.operator.auth.principalSource }}"
+{{- end }}
+{{- if .Values.operator.auth.proxyProtocol }}
+          - name: KAFSCALE_PROXY_PROTOCOL
+            value: "true"
+{{- end }}
       resources:
 {{- if .Values.operator.resources }}
 {{ toYaml .Values.operator.resources | indent 12 }}
b/deploy/helm/kafscale/templates/proxy-service.yaml index ff1198d..e884889 100644 --- a/deploy/helm/kafscale/templates/proxy-service.yaml +++ b/deploy/helm/kafscale/templates/proxy-service.yaml @@ -21,8 +21,16 @@ metadata: labels: {{ include "kafscale.labels" . | indent 4 }} app.kubernetes.io/component: proxy + {{- with .Values.proxy.service.annotations }} + annotations: +{{ toYaml . | indent 4 }} + {{- end }} spec: type: {{ .Values.proxy.service.type }} + {{- if .Values.proxy.service.loadBalancerSourceRanges }} + loadBalancerSourceRanges: +{{ toYaml .Values.proxy.service.loadBalancerSourceRanges | indent 4 }} + {{- end }} selector: {{ include "kafscale.componentSelectorLabels" (dict "root" . "component" "proxy") | indent 4 }} ports: diff --git a/deploy/helm/kafscale/values.yaml b/deploy/helm/kafscale/values.yaml index 95efc7c..f671a0b 100644 --- a/deploy/helm/kafscale/values.yaml +++ b/deploy/helm/kafscale/values.yaml @@ -44,6 +44,14 @@ operator: etcdEndpoints: - "http://etcd:2379" leaderKey: "kafscale-operator" + acl: + enabled: false + configJson: "" + configFile: "" + failOpen: false + auth: + principalSource: "" + proxyProtocol: false podAnnotations: {} resources: {} nodeSelector: {} @@ -127,6 +135,8 @@ proxy: service: type: LoadBalancer port: 9092 + annotations: {} + loadBalancerSourceRanges: [] mcp: enabled: false diff --git a/docs/mcp.md b/docs/mcp.md index 69281a8..e6632b4 100644 --- a/docs/mcp.md +++ b/docs/mcp.md @@ -82,8 +82,8 @@ Mutation tools (future, gated by auth + RBAC): ## Security and Guardrails -Kafscale currently does not enforce auth on broker/admin APIs. For MCP, we -must ship secure-by-default to avoid "one prompt away from prod changes". +Kafscale v1.5 introduces basic broker ACLs, but MCP remains read-only for security reasons. +For MCP, we ship secure-by-default to avoid "one prompt away from prod changes". Requirements: diff --git a/docs/metrics.md b/docs/metrics.md index a7c483b..84dbb29 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -52,6 +52,7 @@ Broker metrics are emitted directly by the broker process. | `kafscale_admin_requests_total` | Counter | `api` | Count of admin API requests by API name. | | `kafscale_admin_request_errors_total` | Counter | `api` | Count of admin API errors by API name. | | `kafscale_admin_request_latency_ms_avg` | Gauge | `api` | Average admin API latency (ms). | +| `kafscale_authz_denied_total` | Counter | `action`, `resource` | Count of authorization denials by action/resource. | | `kafscale_produce_latency_ms` | Histogram | - | Produce request latency distribution (use p95 in PromQL). | | `kafscale_consumer_lag` | Histogram | - | Consumer lag distribution (use p95 in PromQL). | | `kafscale_consumer_lag_max` | Gauge | - | Maximum observed consumer lag. | diff --git a/docs/operations.md b/docs/operations.md index b0f7b9f..a537078 100644 --- a/docs/operations.md +++ b/docs/operations.md @@ -72,10 +72,74 @@ helm upgrade --install kafscale deploy/helm/kafscale \ - **TLS** – Terminate TLS at your ingress or service mesh; broker/console TLS env flags are not wired in v1. - **Admin APIs** – Create/Delete Topics are enabled by default. Set `KAFSCALE_ALLOW_ADMIN_APIS=false` on broker pods to disable them, and gate external access via mTLS, ingress auth, or network policies. - **Network policies** – If your cluster enforces policies, allow the operator + brokers to reach etcd and S3 endpoints and lock everything else down. +- **ACLs (v1.5)** – Optional, basic ACL enforcement is available at the broker. 
Identity comes from Kafka `client.id` until SASL is introduced. Configure via `KAFSCALE_ACL_ENABLED` plus either `KAFSCALE_ACL_JSON` or `KAFSCALE_ACL_FILE`. Use `KAFSCALE_ACL_FAIL_OPEN=true` to allow traffic when the ACL config is missing/invalid (default is fail-closed).
+  - **Principal source** – Set `KAFSCALE_PRINCIPAL_SOURCE` to `client_id` (default), `remote_addr`, or `proxy_addr`. Use `proxy_addr` with `KAFSCALE_PROXY_PROTOCOL=true` to derive principals from a trusted TCP proxy (PROXY protocol v1/v2).
+  - **Auth denials** – Broker logs emit a rate-limited `authorization denied` entry with principal/action/resource context.
+  - **Trust boundary** – Only enable `proxy_addr`/PROXY protocol when brokers are reachable *only* through a trusted LB or sidecar that injects the header. Do not expose brokers directly, or clients can spoof identity.
+
+Example ACL values (Helm operator):
+```yaml
+operator:
+  acl:
+    enabled: true
+    configJson: |
+      {"default_policy":"deny","principals":[
+        {"name":"analytics","allow":[{"action":"fetch","resource":"topic","name":"orders-*"}]}
+      ]}
+  auth:
+    principalSource: "proxy_addr"
+    proxyProtocol: true
+```
 - **Health / metrics** – Prometheus can scrape `/metrics` on the brokers and operator for early detection of S3 pressure or degraded nodes. The operator exposes metrics on port `8080` and the Helm chart can create a metrics Service, ServiceMonitor, and PrometheusRule.
 - **Startup gating** – Broker pods exit immediately if they cannot read metadata or write a probe object to S3 during startup, so Kubernetes restarts them rather than leaving a stuck listener in place.
 - **Leader IDs** – Each broker advertises a numeric `NodeID` in etcd. In the single-node demo you’ll always see `Leader=0` in the Console’s topic detail because the only broker has ID `0`. In real clusters those IDs align with the broker addresses the operator published; if you see `Leader=3`, look for the broker with `NodeID 3` in the metadata payload.
+
+### Proxy TLS via LoadBalancer (Recommended)
+
+The proxy is the external Kafka entrypoint. For TLS in v1.5, terminate TLS at
+the cloud LoadBalancer by supplying Service annotations in the Helm values.
+This keeps broker traffic plaintext inside the cluster while enabling TLS for
+clients.
+
+Example (provider-specific annotations omitted here):
+
+```yaml
+proxy:
+  enabled: true
+  service:
+    type: LoadBalancer
+    port: 9092
+    annotations:
+      # Add your cloud provider TLS annotations here (ACM / GCP / Azure, etc.)
+    loadBalancerSourceRanges:
+      - 203.0.113.0/24
+```
+
+If you need in-cluster ACME/Let’s Encrypt support, treat it as an optional
+stack (Traefik or another TCP-capable gateway + cert-manager) and keep it off
+by default to avoid extra operational dependencies.
+
+#### Provider Examples (Verify with your cloud)
+
+These snippets show common patterns; annotation keys vary by provider and
+feature. Always validate against your cloud provider docs.
+
+AWS NLB TLS (example):
+```yaml
+proxy:
+  service:
+    annotations:
+      service.beta.kubernetes.io/aws-load-balancer-type: "nlb"
+      service.beta.kubernetes.io/aws-load-balancer-ssl-cert: "arn:aws:acm:REGION:ACCOUNT:certificate/ID"
+      service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9092"
+      service.beta.kubernetes.io/aws-load-balancer-backend-protocol: "tcp"
+```
+
+GCP / Azure:
+- L4 LoadBalancers typically do TCP pass-through; TLS termination often requires
+  a provider gateway/ingress (or a TCP-capable ingress controller).
+- If you terminate TLS outside the Service, keep the proxy Service as plain TCP.
+
 ## Ops API Examples
 
 Kafscale exposes Kafka admin APIs for operator workflows (consumer group visibility,
diff --git a/docs/protocol.md b/docs/protocol.md
index 6bb91f6..fc56e6f 100644
--- a/docs/protocol.md
+++ b/docs/protocol.md
@@ -77,4 +77,7 @@ Kafscale implements a focused subset of the Kafka protocol. Versions below refle
 | v2.0 | mTLS | Certificate-based auth |
 | v2.0 | SASL/OAUTHBEARER | Enterprise SSO |
 
-Until auth lands, Kafscale responds to SASL handshake attempts with `UNSUPPORTED_SASL_MECHANISM` (error code 33).
+v1.5 focuses on basic broker ACLs with identity derived from `client.id`; terminate TLS at the proxy/LB.
+
+Until SASL lands, Kafscale rejects SASL handshake/auth requests as unsupported
+API keys and closes the connection (no SASL-specific error payload is returned).
diff --git a/docs/roadmap.md b/docs/roadmap.md
index cf05531..e2abcf3 100644
--- a/docs/roadmap.md
+++ b/docs/roadmap.md
@@ -35,14 +35,4 @@ This roadmap tracks completed work and open gaps. It is intentionally high level
 - Admin ops API e2e coverage
 - Security review (TLS/auth)
 - End-to-end tests multi-segment restart durability
-
-## Open
-
-### Release Planning
-
-- v1.5: authentication groundwork and security hardening
-- v2.0: SASL support and ACL authorization
-
-### Testing and Hardening
-
-- Performance benchmarks
+- Authentication / ACL and security hardening
\ No newline at end of file
diff --git a/docs/security.md b/docs/security.md
index a509d91..dae5c9f 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -26,8 +26,9 @@ and the boundaries of what is and is not supported in v1.
 - **Authentication**: none at the Kafka protocol layer. Brokers accept any
   client connection. The console UI supports basic auth via
   `KAFSCALE_UI_USERNAME` / `KAFSCALE_UI_PASSWORD`.
-- **Authorization**: none. All broker APIs are unauthenticated and authorized
-  implicitly. This includes admin APIs such as CreatePartitions and DeleteGroups.
+- **Authorization**: optional in v1.5. When ACLs are enabled, broker APIs are
+  authorized by the configured rules; when disabled, all broker APIs are
+  implicitly allowed (including admin APIs like CreatePartitions/DeleteGroups).
 - **Transport Security**: TLS termination is expected at the ingress or mesh
   layer in v1; brokers and the console speak plaintext by default.
 - **Secrets Handling**: S3 credentials are read from Kubernetes secrets and are
@@ -45,12 +46,100 @@ and the boundaries of what is and is not supported in v1.
 - Use least-privilege IAM roles for S3 access and restrict etcd endpoints.
 - Treat the console as privileged; do not expose it publicly without auth.
 
+## v1.5 Auth (Basic ACLs)
+
+Kafscale v1.5 introduces optional, basic ACL enforcement at the broker. TLS is
+still expected to terminate at your load balancer / ingress or service mesh.
+
+### What Is Supported
+
+- **ACL enforcement** for topic, group, and admin operations.
+- **Principal identity** derived from the Kafka `client.id` (ClientID) until
+  SASL auth is introduced.
+- **Allow/Deny rules** for topic/group names, where a trailing `*` matches by
+  prefix (see the sketch below).
+- **Proxy protocol identity** when deployed behind a trusted LB/sidecar.
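+
+To make the evaluation order concrete, here is a minimal sketch using the
+`pkg/acl` package added in this change (deny rules are checked first, then
+allow rules, then the `default_policy`; the principal names are illustrative):
+
+```go
+package main
+
+import (
+	"fmt"
+
+	"github.com/KafScale/platform/pkg/acl"
+)
+
+func main() {
+	auth := acl.NewAuthorizer(acl.Config{
+		Enabled:       true,
+		DefaultPolicy: "deny",
+		Principals: []acl.PrincipalRules{{
+			Name:  "analytics-service",
+			Allow: []acl.Rule{{Action: acl.ActionFetch, Resource: acl.ResourceTopic, Name: "orders-*"}},
+		}},
+	})
+
+	fmt.Println(auth.Allows("analytics-service", acl.ActionFetch, acl.ResourceTopic, "orders-2025")) // true: "orders-*" prefix match
+	fmt.Println(auth.Allows("analytics-service", acl.ActionFetch, acl.ResourceTopic, "payments"))    // false: no rule matches, default deny
+	fmt.Println(auth.Allows("other-client", acl.ActionFetch, acl.ResourceTopic, "orders-2025"))      // false: unknown principal, default deny
+}
+```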
+ +### Enabling ACLs + +Set the following environment variables on broker pods: + +```bash +KAFSCALE_ACL_ENABLED=true +KAFSCALE_ACL_JSON='{ + "default_policy": "deny", + "principals": [ + { + "name": "analytics-service", + "allow": [ + {"action": "fetch", "resource": "topic", "name": "orders-*"}, + {"action": "group_read", "resource": "group", "name": "analytics-*"} + ] + }, + { + "name": "ops-admin", + "allow": [ + {"action": "admin", "resource": "cluster", "name": "*"} + ] + } + ] +}' +``` + +You can also supply `KAFSCALE_ACL_FILE=/path/to/acl.json` instead of inline JSON. +Set `KAFSCALE_ACL_FAIL_OPEN=true` to allow traffic if the ACL config is missing +or invalid. Default is fail-closed (deny). + +### Actions and Resources + +- **Actions**: `produce`, `fetch`, `group_read`, `group_write`, `group_admin`, `admin` +- **Resources**: `topic`, `group`, `cluster` + +### Client Configuration + +Set `client.id` in your Kafka clients to the principal name used in ACLs. +Until SASL is implemented, this is the default identity Kafscale uses for ACL +checks. You can also derive principals from network identity when the proxy +protocol is enabled (see Operations docs for `KAFSCALE_PRINCIPAL_SOURCE`). Only +enable proxy-derived identity when brokers are reachable solely through a +trusted proxy/LB. + ## Known Gaps - No SASL or mTLS authentication for Kafka protocol clients. -- No ACLs or RBAC at the broker layer. +- ACLs are optional and rely on `client.id` or network-derived identity; no + strong client auth yet. - No multi-tenant isolation. -- Admin APIs are writable without auth; UI is read-only by policy, not enforcement. +- Admin APIs are writable without auth if ACLs are disabled; UI is read-only by + policy, not enforcement. ## Roadmap diff --git a/internal/console/auth.go b/internal/console/auth.go index 7cc57f8..cd5fe56 100644 --- a/internal/console/auth.go +++ b/internal/console/auth.go @@ -20,6 +20,7 @@ import ( "crypto/subtle" "encoding/base64" "encoding/json" + "net" "net/http" "sync" "time" @@ -39,6 +40,7 @@ type authManager struct { ttl time.Duration mu sync.Mutex sessions map[string]time.Time + limiter *loginRateLimiter } type authConfigResponse struct { @@ -70,6 +72,7 @@ func newAuthManager(cfg AuthConfig) *authManager { password: cfg.Password, ttl: 12 * time.Hour, sessions: make(map[string]time.Time), + limiter: newLoginRateLimiter(20, time.Minute), } } @@ -109,6 +112,13 @@ func (a *authManager) handleLogin(w http.ResponseWriter, r *http.Request) { }) return } + if a.limiter != nil { + ip := remoteIP(r) + if !a.limiter.Allow(ip) { + http.Error(w, "rate limit exceeded", http.StatusTooManyRequests) + return + } + } var payload loginRequest if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { http.Error(w, "invalid payload", http.StatusBadRequest) @@ -219,3 +229,58 @@ func generateToken(size int) (string, error) { } return base64.RawURLEncoding.EncodeToString(raw), nil } + +type loginRateLimiter struct { + limit int + window time.Duration + mu sync.Mutex + hits map[string][]time.Time +} + +func newLoginRateLimiter(limit int, window time.Duration) *loginRateLimiter { + if limit <= 0 || window <= 0 { + return nil + } + return &loginRateLimiter{ + limit: limit, + window: window, + hits: make(map[string][]time.Time), + } +} + +func (l *loginRateLimiter) Allow(key string) bool { + if l == nil { + return true + } + now := time.Now() + cutoff := now.Add(-l.window) + l.mu.Lock() + defer l.mu.Unlock() + hits := l.hits[key] + idx := 0 + for _, ts := range hits { + if ts.After(cutoff) 
{ + hits[idx] = ts + idx++ + } + } + hits = hits[:idx] + if len(hits) >= l.limit { + l.hits[key] = hits + return false + } + hits = append(hits, now) + l.hits[key] = hits + return true +} + +func remoteIP(r *http.Request) string { + if r == nil { + return "unknown" + } + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil || host == "" { + return r.RemoteAddr + } + return host +} diff --git a/pkg/acl/acl.go b/pkg/acl/acl.go new file mode 100644 index 0000000..c86b26c --- /dev/null +++ b/pkg/acl/acl.go @@ -0,0 +1,148 @@ +// Copyright 2025, 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package acl + +import "strings" + +type Action string + +type Resource string + +const ( + ActionAny Action = "*" + ActionProduce Action = "produce" + ActionFetch Action = "fetch" + ActionGroupRead Action = "group_read" + ActionGroupWrite Action = "group_write" + ActionGroupAdmin Action = "group_admin" + ActionAdmin Action = "admin" +) + +const ( + ResourceAny Resource = "*" + ResourceTopic Resource = "topic" + ResourceGroup Resource = "group" + ResourceCluster Resource = "cluster" +) + +type Rule struct { + Action Action `json:"action"` + Resource Resource `json:"resource"` + Name string `json:"name"` +} + +type PrincipalRules struct { + Name string `json:"name"` + Allow []Rule `json:"allow"` + Deny []Rule `json:"deny"` +} + +type Config struct { + Enabled bool `json:"enabled"` + DefaultPolicy string `json:"default_policy"` + Principals []PrincipalRules `json:"principals"` +} + +type Authorizer struct { + enabled bool + defaultAllow bool + principals map[string]PrincipalRules +} + +func NewAuthorizer(cfg Config) *Authorizer { + defaultAllow := strings.EqualFold(strings.TrimSpace(cfg.DefaultPolicy), "allow") + principals := make(map[string]PrincipalRules, len(cfg.Principals)) + for _, p := range cfg.Principals { + name := strings.TrimSpace(p.Name) + if name == "" { + continue + } + principals[name] = p + } + return &Authorizer{ + enabled: cfg.Enabled, + defaultAllow: defaultAllow, + principals: principals, + } +} + +func (a *Authorizer) Enabled() bool { + if a == nil { + return false + } + return a.enabled +} + +func (a *Authorizer) Allows(principal string, action Action, resource Resource, name string) bool { + if a == nil || !a.enabled { + return true + } + principal = strings.TrimSpace(principal) + if principal == "" { + principal = "anonymous" + } + rules, ok := a.principals[principal] + if !ok { + return a.defaultAllow + } + for _, rule := range rules.Deny { + if matches(rule, action, resource, name) { + return false + } + } + for _, rule := range rules.Allow { + if matches(rule, action, resource, name) { + return true + } + } + return a.defaultAllow +} + +func matches(rule Rule, action Action, resource Resource, name string) bool { + if !actionMatches(rule.Action, action) { + return false + } + if !resourceMatches(rule.Resource, 
resource) { + return false + } + return nameMatches(rule.Name, name) +} + +func actionMatches(rule Action, action Action) bool { + if rule == "" || rule == ActionAny { + return true + } + return strings.EqualFold(string(rule), string(action)) +} + +func resourceMatches(rule Resource, resource Resource) bool { + if rule == "" || rule == ResourceAny { + return true + } + return strings.EqualFold(string(rule), string(resource)) +} + +func nameMatches(ruleName, name string) bool { + ruleName = strings.TrimSpace(ruleName) + if ruleName == "" || ruleName == "*" { + return true + } + if strings.HasSuffix(ruleName, "*") { + prefix := strings.TrimSuffix(ruleName, "*") + return strings.HasPrefix(name, prefix) + } + return ruleName == name +} diff --git a/pkg/acl/acl_test.go b/pkg/acl/acl_test.go new file mode 100644 index 0000000..70693c0 --- /dev/null +++ b/pkg/acl/acl_test.go @@ -0,0 +1,90 @@ +// Copyright 2025, 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package acl + +import "testing" + +func TestAuthorizerDefaultAllow(t *testing.T) { + auth := NewAuthorizer(Config{ + Enabled: true, + DefaultPolicy: "allow", + }) + if !auth.Allows("unknown", ActionFetch, ResourceTopic, "orders") { + t.Fatalf("expected default allow") + } +} + +func TestAuthorizerDefaultDeny(t *testing.T) { + auth := NewAuthorizer(Config{ + Enabled: true, + DefaultPolicy: "deny", + }) + if auth.Allows("unknown", ActionFetch, ResourceTopic, "orders") { + t.Fatalf("expected default deny") + } +} + +func TestAuthorizerAllowOverridesDefaultDeny(t *testing.T) { + auth := NewAuthorizer(Config{ + Enabled: true, + DefaultPolicy: "deny", + Principals: []PrincipalRules{ + { + Name: "client-a", + Allow: []Rule{{Action: ActionFetch, Resource: ResourceTopic, Name: "orders"}}, + }, + }, + }) + if !auth.Allows("client-a", ActionFetch, ResourceTopic, "orders") { + t.Fatalf("expected allow rule to permit access") + } + if auth.Allows("client-a", ActionFetch, ResourceTopic, "payments") { + t.Fatalf("expected deny for unmatched topic") + } +} + +func TestAuthorizerDenyOverridesAllow(t *testing.T) { + auth := NewAuthorizer(Config{ + Enabled: true, + DefaultPolicy: "allow", + Principals: []PrincipalRules{ + { + Name: "client-a", + Allow: []Rule{{Action: ActionFetch, Resource: ResourceTopic, Name: "orders"}}, + Deny: []Rule{{Action: ActionFetch, Resource: ResourceTopic, Name: "orders"}}, + }, + }, + }) + if auth.Allows("client-a", ActionFetch, ResourceTopic, "orders") { + t.Fatalf("expected deny to override allow") + } +} + +func TestAuthorizerWildcardName(t *testing.T) { + auth := NewAuthorizer(Config{ + Enabled: true, + DefaultPolicy: "deny", + Principals: []PrincipalRules{ + { + Name: "client-a", + Allow: []Rule{{Action: ActionProduce, Resource: ResourceTopic, Name: "orders-*"}}, + }, + }, + }) + if !auth.Allows("client-a", ActionProduce, ResourceTopic, "orders-2025") { + 
t.Fatalf("expected prefix wildcard match") + } +} diff --git a/pkg/broker/conn_context.go b/pkg/broker/conn_context.go new file mode 100644 index 0000000..2cbb808 --- /dev/null +++ b/pkg/broker/conn_context.go @@ -0,0 +1,48 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package broker + +import "context" + +// ConnContext carries connection-scoped identity data for auth decisions. +type ConnContext struct { + Principal string + RemoteAddr string + ProxyAddr string +} + +type connContextKey struct{} + +// ContextWithConnInfo attaches connection info to a context. +func ContextWithConnInfo(ctx context.Context, info *ConnContext) context.Context { + if info == nil { + return ctx + } + return context.WithValue(ctx, connContextKey{}, info) +} + +// ConnInfoFromContext returns connection info if present. +func ConnInfoFromContext(ctx context.Context) *ConnContext { + if ctx == nil { + return nil + } + if v := ctx.Value(connContextKey{}); v != nil { + if info, ok := v.(*ConnContext); ok { + return info + } + } + return nil +} diff --git a/pkg/broker/proxyproto.go b/pkg/broker/proxyproto.go new file mode 100644 index 0000000..62b1400 --- /dev/null +++ b/pkg/broker/proxyproto.go @@ -0,0 +1,179 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package broker + +import ( + "bufio" + "bytes" + "encoding/binary" + "fmt" + "io" + "net" +) + +var proxyV2Signature = []byte{'\r', '\n', '\r', '\n', 0x00, '\r', '\n', 'Q', 'U', 'I', 'T', '\n'} + +// ProxyInfo captures parsed proxy protocol metadata. +type ProxyInfo struct { + SourceAddr string + DestAddr string + SourceIP string + DestIP string + SourcePort int + DestPort int +} + +// ReadProxyProtocol consumes a PROXY protocol header (v1 or v2) if present. +// It returns a wrapped connection that preserves buffered bytes. 
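+//
+// For reference, a v1 header is a single text line (illustrative values, not
+// taken from a live capture):
+//
+//	PROXY TCP4 203.0.113.7 10.0.0.12 52814 9092\r\n
+//
+// A v2 header is binary: the 12-byte signature above, a version/command byte
+// (version in the high nibble, command in the low nibble), a family/protocol
+// byte (address family in the high nibble, transport in the low nibble), a
+// big-endian uint16 payload length, and then the address payload.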
+func ReadProxyProtocol(conn net.Conn) (net.Conn, *ProxyInfo, error) {
+	br := bufio.NewReader(conn)
+	info, err := parseProxyHeader(br)
+	return wrapConnWithReader(conn, br), info, err
+}
+
+func parseProxyHeader(br *bufio.Reader) (*ProxyInfo, error) {
+	peek, err := br.Peek(5)
+	if err != nil {
+		if err == io.EOF {
+			return nil, nil
+		}
+		return nil, err
+	}
+	if bytes.Equal(peek, []byte("PROXY")) {
+		return parseProxyV1(br)
+	}
+	if bytes.HasPrefix(peek, []byte{'\r', '\n', '\r', '\n', 0x00}) {
+		sig, err := br.Peek(len(proxyV2Signature))
+		if err != nil {
+			return nil, err
+		}
+		if bytes.Equal(sig, proxyV2Signature) {
+			return parseProxyV2(br)
+		}
+	}
+	return nil, nil
+}
+
+func parseProxyV1(br *bufio.Reader) (*ProxyInfo, error) {
+	line, err := br.ReadString('\n')
+	if err != nil {
+		return nil, err
+	}
+	parts := bytes.Fields([]byte(line))
+	if len(parts) < 6 {
+		return nil, fmt.Errorf("proxy v1 header malformed")
+	}
+	srcIP := string(parts[2])
+	dstIP := string(parts[3])
+	srcPort := string(parts[4])
+	dstPort := string(parts[5])
+	return &ProxyInfo{
+		SourceAddr: net.JoinHostPort(srcIP, srcPort),
+		DestAddr:   net.JoinHostPort(dstIP, dstPort),
+		SourceIP:   srcIP,
+		DestIP:     dstIP,
+		SourcePort: atoiOrZero(srcPort),
+		DestPort:   atoiOrZero(dstPort),
+	}, nil
+}
+
+func parseProxyV2(br *bufio.Reader) (*ProxyInfo, error) {
+	header := make([]byte, 16)
+	if _, err := io.ReadFull(br, header); err != nil {
+		return nil, err
+	}
+	if !bytes.Equal(header[:12], proxyV2Signature) {
+		return nil, fmt.Errorf("proxy v2 signature mismatch")
+	}
+	length := int(binary.BigEndian.Uint16(header[14:16]))
+	payload := make([]byte, length)
+	if _, err := io.ReadFull(br, payload); err != nil {
+		return nil, err
+	}
+	family := header[13] >> 4 // address family is the high nibble; the low nibble is the transport protocol
+	switch family {
+	case 0x1:
+		return parseProxyV2Inet(payload)
+	case 0x2:
+		return parseProxyV2Inet6(payload)
+	default:
+		return nil, nil
+	}
+}
+
+func parseProxyV2Inet(payload []byte) (*ProxyInfo, error) {
+	if len(payload) < 12 {
+		return nil, fmt.Errorf("proxy v2 inet payload too short")
+	}
+	srcIP := net.IP(payload[0:4]).String()
+	dstIP := net.IP(payload[4:8]).String()
+	srcPort := int(binary.BigEndian.Uint16(payload[8:10]))
+	dstPort := int(binary.BigEndian.Uint16(payload[10:12]))
+	return &ProxyInfo{
+		SourceAddr: net.JoinHostPort(srcIP, fmt.Sprintf("%d", srcPort)),
+		DestAddr:   net.JoinHostPort(dstIP, fmt.Sprintf("%d", dstPort)),
+		SourceIP:   srcIP,
+		DestIP:     dstIP,
+		SourcePort: srcPort,
+		DestPort:   dstPort,
+	}, nil
+}
+
+func parseProxyV2Inet6(payload []byte) (*ProxyInfo, error) {
+	if len(payload) < 36 {
+		return nil, fmt.Errorf("proxy v2 inet6 payload too short")
+	}
+	srcIP := net.IP(payload[0:16]).String()
+	dstIP := net.IP(payload[16:32]).String()
+	srcPort := int(binary.BigEndian.Uint16(payload[32:34]))
+	dstPort := int(binary.BigEndian.Uint16(payload[34:36]))
+	return &ProxyInfo{
+		SourceAddr: net.JoinHostPort(srcIP, fmt.Sprintf("%d", srcPort)),
+		DestAddr:   net.JoinHostPort(dstIP, fmt.Sprintf("%d", dstPort)),
+		SourceIP:   srcIP,
+		DestIP:     dstIP,
+		SourcePort: srcPort,
+		DestPort:   dstPort,
+	}, nil
+}
+
+type connWithReader struct {
+	net.Conn
+	reader *bufio.Reader
+}
+
+func wrapConnWithReader(conn net.Conn, reader *bufio.Reader) net.Conn {
+	if reader == nil {
+		return conn
+	}
+	return &connWithReader{Conn: conn, reader: reader}
+}
+
+func (c *connWithReader) Read(p []byte) (int, error) {
+	return c.reader.Read(p)
+}
+
+func atoiOrZero(value string) int {
+	var out int
+	for i := 0; i < len(value); i++ {
+		ch := value[i]
+		if ch < '0' || ch > 
'9' { + return 0 + } + out = out*10 + int(ch-'0') + } + return out +} diff --git a/pkg/broker/server.go b/pkg/broker/server.go index 5f14b20..23ec3e1 100644 --- a/pkg/broker/server.go +++ b/pkg/broker/server.go @@ -33,12 +33,16 @@ type Handler interface { // Server implements minimal Kafka TCP handling for milestone 1. type Server struct { - Addr string - Handler Handler - listener net.Listener - wg sync.WaitGroup + Addr string + Handler Handler + ConnContextFunc ConnContextFunc + listener net.Listener + wg sync.WaitGroup } +// ConnContextFunc can wrap a connection and attach connection-scoped context data. +type ConnContextFunc func(conn net.Conn) (net.Conn, *ConnContext, error) + // ListenAndServe starts accepting Kafka protocol connections. func (s *Server) ListenAndServe(ctx context.Context) error { if s.Handler == nil { @@ -95,6 +99,19 @@ func (s *Server) handleConnection(conn net.Conn) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() defer conn.Close() + if s.ConnContextFunc != nil { + wrapped, info, err := s.ConnContextFunc(conn) + if err != nil { + log.Printf("connection context setup failed: %v", err) + return + } + if wrapped != nil { + conn = wrapped + } + if info != nil { + ctx = ContextWithConnInfo(ctx, info) + } + } for { frame, err := protocol.ReadFrame(conn) if err != nil { diff --git a/pkg/operator/cluster_controller.go b/pkg/operator/cluster_controller.go index d3e996b..063123f 100644 --- a/pkg/operator/cluster_controller.go +++ b/pkg/operator/cluster_controller.go @@ -229,6 +229,24 @@ func (r *ClusterReconciler) brokerContainer(cluster *kafscalev1alpha1.KafscaleCl Value: cluster.Spec.Config.CacheSize, }) } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_ACL_ENABLED")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_ACL_ENABLED", Value: val}) + } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_ACL_JSON")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_ACL_JSON", Value: val}) + } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_ACL_FILE")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_ACL_FILE", Value: val}) + } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_ACL_FAIL_OPEN")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_ACL_FAIL_OPEN", Value: val}) + } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_PRINCIPAL_SOURCE")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_PRINCIPAL_SOURCE", Value: val}) + } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_PROXY_PROTOCOL")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_PROXY_PROTOCOL", Value: val}) + } var envFrom []corev1.EnvFromSource if cluster.Spec.S3.CredentialsSecretRef != "" { envFrom = append(envFrom, corev1.EnvFromSource{ diff --git a/pkg/protocol/errors.go b/pkg/protocol/errors.go index c2d2c8c..b0de86e 100644 --- a/pkg/protocol/errors.go +++ b/pkg/protocol/errors.go @@ -33,6 +33,8 @@ const ( INVALID_CONFIG int16 = 40 TOPIC_ALREADY_EXISTS int16 = 36 TOPIC_AUTHORIZATION_FAILED int16 = 29 + GROUP_AUTHORIZATION_FAILED int16 = 30 + CLUSTER_AUTHORIZATION_FAILED int16 = 31 INVALID_PARTITIONS int16 = 37 UNSUPPORTED_VERSION int16 = 35 ) diff --git a/test/e2e/acl_test.go b/test/e2e/acl_test.go new file mode 100644 index 0000000..97b4794 --- /dev/null +++ b/test/e2e/acl_test.go @@ -0,0 +1,141 @@ +// Copyright 2025, 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build e2e + +package e2e + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" +) + +func TestACLsE2E(t *testing.T) { + const enableEnv = "KAFSCALE_E2E" + if os.Getenv(enableEnv) != "1" { + t.Skipf("set %s=1 to run integration harness", enableEnv) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + etcd, endpoints := startEmbeddedEtcd(t) + defer etcd.Close() + + brokerAddr := freeAddr(t) + metricsAddr := freeAddr(t) + controlAddr := freeAddr(t) + + aclJSON := `{"default_policy":"deny","principals":[` + + `{"name":"allowed","allow":[` + + `{"action":"produce","resource":"topic","name":"orders-*" },` + + `{"action":"fetch","resource":"topic","name":"orders-*" },` + + `{"action":"group_read","resource":"group","name":"group-allowed"},` + + `{"action":"group_write","resource":"group","name":"group-allowed"}` + + `]},` + + `{"name":"denied","allow":[]}` + + `]}` + + brokerCmd, brokerLogs := startBrokerWithEtcdACL(t, ctx, brokerAddr, metricsAddr, controlAddr, endpoints, aclJSON) + t.Cleanup(func() { stopBroker(t, brokerCmd) }) + waitForBroker(t, brokerLogs, brokerAddr) + + allowedClient, err := kgo.NewClient( + kgo.SeedBrokers(brokerAddr), + kgo.AllowAutoTopicCreation(), + kgo.DisableIdempotentWrite(), + kgo.ClientID("allowed"), + ) + if err != nil { + t.Fatalf("create allowed client: %v\nbroker logs:\n%s", err, brokerLogs.String()) + } + defer allowedClient.Close() + + deniedClient, err := kgo.NewClient( + kgo.SeedBrokers(brokerAddr), + kgo.AllowAutoTopicCreation(), + kgo.DisableIdempotentWrite(), + kgo.ClientID("denied"), + ) + if err != nil { + t.Fatalf("create denied client: %v\nbroker logs:\n%s", err, brokerLogs.String()) + } + defer deniedClient.Close() + + topic := fmt.Sprintf("orders-%d", time.Now().UnixNano()) + produceErr := allowedClient.ProduceSync(ctx, &kgo.Record{Topic: topic, Value: []byte("ok")}).FirstErr() + if produceErr != nil { + t.Fatalf("allowed produce failed: %v\nbroker logs:\n%s", produceErr, brokerLogs.String()) + } + + denyErr := deniedClient.ProduceSync(ctx, &kgo.Record{Topic: topic, Value: []byte("deny")}).FirstErr() + if denyErr == nil { + t.Fatalf("expected denied produce error") + } + var kerrVal *kerr.Error + if !errors.As(denyErr, &kerrVal) || kerrVal.Code != kerr.TopicAuthorizationFailed.Code { + t.Fatalf("expected topic authorization failed, got: %v", denyErr) + } + + listReq := kmsg.NewPtrListGroupsRequest() + listReq.Version = 5 + listReq.StatesFilter = []string{"Stable"} + listReq.TypesFilter = []string{"classic"} + listResp, err := listReq.RequestWith(ctx, deniedClient) + if err != nil { + t.Fatalf("list groups request failed: %v\nbroker logs:\n%s", err, brokerLogs.String()) + } + if listResp.ErrorCode != kerr.GroupAuthorizationFailed.Code { + 
t.Fatalf("expected group authorization failed, got %d", listResp.ErrorCode) + } +} + +func startBrokerWithEtcdACL(t *testing.T, ctx context.Context, brokerAddr, metricsAddr, controlAddr string, endpoints []string, aclJSON string) (*exec.Cmd, *bytes.Buffer) { + t.Helper() + brokerCmd := exec.CommandContext(ctx, "go", "run", filepath.Join(repoRoot(t), "cmd", "broker")) + configureProcessGroup(brokerCmd) + brokerCmd.Env = append(os.Environ(), + "KAFSCALE_AUTO_CREATE_TOPICS=true", + "KAFSCALE_AUTO_CREATE_PARTITIONS=1", + "KAFSCALE_USE_MEMORY_S3=1", + "KAFSCALE_ACL_ENABLED=true", + fmt.Sprintf("KAFSCALE_ACL_JSON=%s", aclJSON), + fmt.Sprintf("KAFSCALE_BROKER_ADDR=%s", brokerAddr), + fmt.Sprintf("KAFSCALE_METRICS_ADDR=%s", metricsAddr), + fmt.Sprintf("KAFSCALE_CONTROL_ADDR=%s", controlAddr), + fmt.Sprintf("KAFSCALE_ETCD_ENDPOINTS=%s", strings.Join(endpoints, ",")), + ) + var brokerLogs bytes.Buffer + logWriter := io.MultiWriter(&brokerLogs, mustLogFile(t, "broker-acl.log")) + brokerCmd.Stdout = logWriter + brokerCmd.Stderr = logWriter + if err := brokerCmd.Start(); err != nil { + t.Fatalf("start broker: %v", err) + } + return brokerCmd, &brokerLogs +} From d127e93b64c336fad23d5f9566cb2d47ff49c290 Mon Sep 17 00:00:00 2001 From: 2pk03 Date: Wed, 28 Jan 2026 15:17:33 +0100 Subject: [PATCH 2/3] harden proxy protocol handling --- cmd/broker/main.go | 12 ++++++++++-- docs/operations.md | 3 +++ docs/security.md | 3 ++- pkg/broker/proxyproto.go | 25 ++++++++++++++++++++++++- 4 files changed, 39 insertions(+), 4 deletions(-) diff --git a/cmd/broker/main.go b/cmd/broker/main.go index d6de330..2a6bf02 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -647,6 +647,9 @@ func (h *handler) logAuthzDenied(principal string, action acl.Action, resource a key := fmt.Sprintf("%s|%s|%s|%s", action, resource, principal, name) now := time.Now() h.authLogMu.Lock() + if len(h.authLogLast) > 10000 { + h.authLogLast = make(map[string]time.Time) + } last := h.authLogLast[key] if now.Sub(last) < time.Minute { h.authLogMu.Unlock() @@ -2199,13 +2202,18 @@ func buildConnContextFunc(logger *slog.Logger) broker.ConnContextFunc { if parsed == nil { return conn, nil, fmt.Errorf("proxy protocol required but header missing") } + if parsed.Local { + logger.Warn("proxy protocol local command received; using socket remote addr") + } if wrapped != nil { conn = wrapped } proxyInfo = parsed if proxyInfo != nil { - info.ProxyAddr = proxyInfo.SourceAddr - info.RemoteAddr = proxyInfo.SourceAddr + if !proxyInfo.Local && proxyInfo.SourceAddr != "" { + info.ProxyAddr = proxyInfo.SourceAddr + info.RemoteAddr = proxyInfo.SourceAddr + } } } if info.RemoteAddr == "" { diff --git a/docs/operations.md b/docs/operations.md index a537078..13c607a 100644 --- a/docs/operations.md +++ b/docs/operations.md @@ -76,6 +76,9 @@ helm upgrade --install kafscale deploy/helm/kafscale \ - **Principal source** – Set `KAFSCALE_PRINCIPAL_SOURCE` to `client_id` (default), `remote_addr`, or `proxy_addr`. Use `proxy_addr` with `KAFSCALE_PROXY_PROTOCOL=true` to derive principals from a trusted TCP proxy (PROXY protocol v1/v2). - **Auth denials** – Broker logs emit a rate-limited `authorization denied` entry with principal/action/resource context. - **Trust boundary** – Only enable `proxy_addr`/PROXY protocol when brokers are reachable *only* through a trusted LB or sidecar that injects the header. Do not expose brokers directly, or clients can spoof identity. 
+  - **Fail-closed** – When `KAFSCALE_PROXY_PROTOCOL=true`, brokers reject connections that do not include a valid PROXY header.
+  - **Header limits** – PROXY v1 headers are capped at 256 bytes; oversized headers are rejected.
+  - **Health checks** – PROXY v2 `LOCAL` connections are accepted (no identity); ensure LB health checks don’t rely on ACL-protected operations.
 
 Example ACL values (Helm operator):
 ```yaml
diff --git a/docs/security.md b/docs/security.md
index dae5c9f..aca60b0 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -101,7 +101,8 @@ Until SASL is implemented, this is the default identity Kafscale uses for ACL
 checks. You can also derive principals from network identity when the proxy
 protocol is enabled (see Operations docs for `KAFSCALE_PRINCIPAL_SOURCE`). Only
 enable proxy-derived identity when brokers are reachable solely through a
-trusted proxy/LB.
+trusted proxy/LB. PROXY v1 headers are capped at 256 bytes; oversized headers
+are rejected.
 
 ## Known Gaps
 
diff --git a/pkg/broker/proxyproto.go b/pkg/broker/proxyproto.go
index 62b1400..b5b1780 100644
--- a/pkg/broker/proxyproto.go
+++ b/pkg/broker/proxyproto.go
@@ -34,6 +34,7 @@ type ProxyInfo struct {
 	DestIP     string
 	SourcePort int
 	DestPort   int
+	Local      bool
 }
 
 // ReadProxyProtocol consumes a PROXY protocol header (v1 or v2) if present.
@@ -68,7 +69,7 @@ func parseProxyHeader(br *bufio.Reader) (*ProxyInfo, error) {
 }
 
 func parseProxyV1(br *bufio.Reader) (*ProxyInfo, error) {
-	line, err := br.ReadString('\n')
+	line, err := readProxyV1Line(br, 256)
 	if err != nil {
 		return nil, err
 	}
@@ -98,11 +99,15 @@ func parseProxyV2(br *bufio.Reader) (*ProxyInfo, error) {
 	if !bytes.Equal(header[:12], proxyV2Signature) {
 		return nil, fmt.Errorf("proxy v2 signature mismatch")
 	}
+	cmd := header[12] & 0x0f
 	length := int(binary.BigEndian.Uint16(header[14:16]))
 	payload := make([]byte, length)
 	if _, err := io.ReadFull(br, payload); err != nil {
 		return nil, err
 	}
+	if cmd == 0x0 {
+		return &ProxyInfo{Local: true}, nil
+	}
 	family := header[13] >> 4 // address family is the high nibble; the low nibble is the transport protocol
 	switch family {
@@ -177,3 +182,21 @@ func atoiOrZero(value string) int {
 	}
 	return out
 }
+
+func readProxyV1Line(br *bufio.Reader, maxLen int) (string, error) {
+	if maxLen <= 0 {
+		maxLen = 256
+	}
+	buf := make([]byte, 0, maxLen)
+	for len(buf) < maxLen {
+		b, err := br.ReadByte()
+		if err != nil {
+			return "", err
+		}
+		buf = append(buf, b)
+		if b == '\n' {
+			return string(buf), nil
+		}
+	}
+	return "", fmt.Errorf("proxy v1 header too long")
+}

From c41f927dbe4baa76bdc3f55c38a94ba6941927a6 Mon Sep 17 00:00:00 2001
From: 2pk03
Date: Wed, 28 Jan 2026 15:20:50 +0100
Subject: [PATCH 3/3] ignore notes directory

---
 .gitignore | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/.gitignore b/.gitignore
index 0aa647c..47666b6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -44,6 +44,9 @@ coverage*.out
 /test/pkg/
 /test/e2e/logs/
 
+# Local notes
+/notes/
+
 # IDE/editor state
 .idea/
 .vscode/