diff --git a/.gitignore b/.gitignore index 0aa647c..47666b6 100644 --- a/.gitignore +++ b/.gitignore @@ -44,6 +44,9 @@ coverage*.out /test/pkg/ /test/e2e/logs/ +# Local notes +/notes/ + # IDE/editor state .idea/ .vscode/ diff --git a/Makefile b/Makefile index 2363ba0..245a79f 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -.PHONY: proto build test tidy lint generate docker-build docker-build-e2e-client docker-build-etcd-tools docker-clean ensure-minio start-minio stop-containers release-broker-ports test-produce-consume test-produce-consume-debug test-consumer-group test-ops-api test-mcp test-multi-segment-durability test-full test-operator demo demo-platform demo-platform-bootstrap iceberg-demo kafsql-demo platform-demo help clean-kind-all +.PHONY: proto build test tidy lint generate docker-build docker-build-e2e-client docker-build-etcd-tools docker-clean ensure-minio start-minio stop-containers release-broker-ports test-produce-consume test-produce-consume-debug test-consumer-group test-ops-api test-mcp test-multi-segment-durability test-full test-operator test-acl demo demo-platform demo-platform-bootstrap iceberg-demo kafsql-demo platform-demo help clean-kind-all REGISTRY ?= ghcr.io/kafscale STAMP_DIR ?= .build @@ -84,6 +84,9 @@ test: ## Run unit tests + vet + race go vet ./... go test -race ./... +test-acl: ## Run ACL e2e test (requires KAFSCALE_E2E=1) + KAFSCALE_E2E=1 go test -tags=e2e ./test/e2e -run TestACLsE2E + docker-build: docker-build-broker docker-build-operator docker-build-console docker-build-proxy docker-build-mcp docker-build-e2e-client docker-build-etcd-tools docker-build-sql-processor ## Build all container images @mkdir -p $(STAMP_DIR) diff --git a/cmd/broker/acl_test.go b/cmd/broker/acl_test.go new file mode 100644 index 0000000..65edcc8 --- /dev/null +++ b/cmd/broker/acl_test.go @@ -0,0 +1,515 @@ +// Copyright 2025, 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "bytes" + "context" + "encoding/binary" + "io" + "testing" + + "github.com/KafScale/platform/pkg/metadata" + "github.com/KafScale/platform/pkg/protocol" + "github.com/twmb/franz-go/pkg/kmsg" +) + +func TestACLProduceDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[{"action":"fetch","resource":"topic","name":"orders"}]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.ProduceRequest{ + Acks: -1, + TimeoutMs: 1000, + Topics: []protocol.ProduceTopic{ + { + Name: "orders", + Partitions: []protocol.ProducePartition{ + {Partition: 0, Records: testBatchBytes(0, 0, 1)}, + }, + }, + }, + } + payload, err := handler.handleProduce(context.Background(), &protocol.RequestHeader{CorrelationID: 1, APIVersion: 0, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("handleProduce: %v", err) + } + resp := decodeProduceResponse(t, payload, 0) + if len(resp.Topics) != 1 || len(resp.Topics[0].Partitions) != 1 { + t.Fatalf("expected single topic/partition response") + } + if resp.Topics[0].Partitions[0].ErrorCode != protocol.TOPIC_AUTHORIZATION_FAILED { + t.Fatalf("expected topic auth failed, got %d", resp.Topics[0].Partitions[0].ErrorCode) + } + offset, err := store.NextOffset(context.Background(), "orders", 0) + if err != nil { + t.Fatalf("NextOffset: %v", err) + } + if offset != 0 { + t.Fatalf("expected offset unchanged, got %d", offset) + } +} + +func TestACLJoinGroupDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + joinReq := &protocol.JoinGroupRequest{GroupID: "group-a"} + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 5, APIVersion: 4, ClientID: &clientID}, joinReq) + if err != nil { + t.Fatalf("Handle JoinGroup: %v", err) + } + resp := decodeJoinGroupResponse(t, payload, 4) + if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.ErrorCode) + } +} + +func TestACLListOffsetsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.ListOffsetsRequest{ + Topics: []protocol.ListOffsetsTopic{ + { + Name: "orders", + Partitions: []protocol.ListOffsetsPartition{ + {Partition: 0, Timestamp: -1, MaxNumOffsets: 1}, + }, + }, + }, + } + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 7, APIVersion: 4, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle ListOffsets: %v", err) + } + resp := decodeListOffsetsResponse(t, 4, payload) + if len(resp.Topics) != 1 || len(resp.Topics[0].Partitions) != 1 { + t.Fatalf("expected single topic/partition response") + } + if resp.Topics[0].Partitions[0].ErrorCode != protocol.TOPIC_AUTHORIZATION_FAILED { + t.Fatalf("expected topic auth failed, got %d", resp.Topics[0].Partitions[0].ErrorCode) + } +} + +func TestACLOffsetFetchDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + 
t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.OffsetFetchRequest{ + GroupID: "group-a", + Topics: []protocol.OffsetFetchTopic{ + { + Name: "orders", + Partitions: []protocol.OffsetFetchPartition{ + {Partition: 0}, + }, + }, + }, + } + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 8, APIVersion: 5, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle OffsetFetch: %v", err) + } + resp := decodeOffsetFetchResponse(t, payload, 5) + if len(resp.Topics) != 1 || len(resp.Topics[0].Partitions) != 1 { + t.Fatalf("expected single topic/partition response") + } + if resp.Topics[0].Partitions[0].ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.Topics[0].Partitions[0].ErrorCode) + } + if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected top-level group auth failed, got %d", resp.ErrorCode) + } +} + +func TestACLSyncGroupDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.SyncGroupRequest{GroupID: "group-a", GenerationID: 1, MemberID: "member-a"} + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 9, APIVersion: 4, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle SyncGroup: %v", err) + } + resp := decodeSyncGroupResponse(t, payload, 4) + if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.ErrorCode) + } +} + +func TestACLHeartbeatDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.HeartbeatRequest{GroupID: "group-a", GenerationID: 1, MemberID: "member-a"} + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 10, APIVersion: 4, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle Heartbeat: %v", err) + } + resp := decodeHeartbeatResponse(t, payload, 4) + if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.ErrorCode) + } +} + +func TestACLLeaveGroupDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.LeaveGroupRequest{GroupID: "group-a", MemberID: "member-a"} + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 11, APIVersion: 0, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle LeaveGroup: %v", err) + } + resp := decodeLeaveGroupResponse(t, payload) + if resp.ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.ErrorCode) + } +} + +func TestACLOffsetCommitDenied(t *testing.T) { 
+ t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.OffsetCommitRequest{ + GroupID: "group-a", + Topics: []protocol.OffsetCommitTopic{ + { + Name: "orders", + Partitions: []protocol.OffsetCommitPartition{ + {Partition: 0, Offset: 1, Metadata: ""}, + }, + }, + }, + } + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 12, APIVersion: 3, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle OffsetCommit: %v", err) + } + resp := decodeOffsetCommitResponse(t, payload, 3) + if len(resp.Topics) == 0 || len(resp.Topics[0].Partitions) == 0 { + t.Fatalf("expected offset commit response") + } + if resp.Topics[0].Partitions[0].ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %d", resp.Topics[0].Partitions[0].ErrorCode) + } +} + +func TestACLCreateTopicsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.CreateTopicsRequest{ + Topics: []protocol.CreateTopicConfig{{Name: "orders", NumPartitions: 1, ReplicationFactor: 1}}, + } + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 13, APIVersion: 0, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle CreateTopics: %v", err) + } + resp := decodeCreateTopicsResponse(t, payload, 0) + if len(resp.Topics) != 1 || resp.Topics[0].ErrorCode != protocol.TOPIC_AUTHORIZATION_FAILED { + t.Fatalf("expected topic auth failed, got %+v", resp.Topics) + } +} + +func TestACLDeleteTopicsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.DeleteTopicsRequest{TopicNames: []string{"orders"}} + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 14, APIVersion: 0, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle DeleteTopics: %v", err) + } + resp := decodeDeleteTopicsResponse(t, payload, 0) + if len(resp.Topics) != 1 || resp.Topics[0].ErrorCode != protocol.TOPIC_AUTHORIZATION_FAILED { + t.Fatalf("expected topic auth failed, got %+v", resp.Topics) + } +} + +func TestACLAlterConfigsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.AlterConfigsRequest{ + Resources: []protocol.AlterConfigsResource{ + { + ResourceType: protocol.ConfigResourceTopic, + ResourceName: "orders", + }, + }, + } + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 15, APIVersion: 1, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle AlterConfigs: %v", err) + } + resp := decodeAlterConfigsResponse(t, payload, 1) + if len(resp.Resources) != 1 || 
resp.Resources[0].ErrorCode != protocol.TOPIC_AUTHORIZATION_FAILED { + t.Fatalf("expected topic auth failed, got %+v", resp.Resources) + } +} + +func TestACLCreatePartitionsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.CreatePartitionsRequest{ + Topics: []protocol.CreatePartitionsTopic{{Name: "orders", Count: 2}}, + } + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 16, APIVersion: 3, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle CreatePartitions: %v", err) + } + resp := decodeCreatePartitionsResponse(t, payload, 3) + if len(resp.Topics) != 1 || resp.Topics[0].ErrorCode != protocol.TOPIC_AUTHORIZATION_FAILED { + t.Fatalf("expected topic auth failed, got %+v", resp.Topics) + } +} + +func TestACLDeleteGroupsDenied(t *testing.T) { + t.Setenv("KAFSCALE_ACL_ENABLED", "true") + t.Setenv("KAFSCALE_ACL_JSON", `{"default_policy":"deny","principals":[{"name":"client-a","allow":[]}]}`) + + store := metadata.NewInMemoryStore(defaultMetadata()) + handler := newTestHandler(store) + + clientID := "client-a" + req := &protocol.DeleteGroupsRequest{Groups: []string{"group-a"}} + payload, err := handler.Handle(context.Background(), &protocol.RequestHeader{CorrelationID: 17, APIVersion: 2, ClientID: &clientID}, req) + if err != nil { + t.Fatalf("Handle DeleteGroups: %v", err) + } + resp := decodeDeleteGroupsResponse(t, payload, 2) + if len(resp.Groups) != 1 || resp.Groups[0].ErrorCode != protocol.GROUP_AUTHORIZATION_FAILED { + t.Fatalf("expected group auth failed, got %+v", resp.Groups) + } +} + +func decodeOffsetFetchResponse(t *testing.T, payload []byte, version int16) *protocol.OffsetFetchResponse { + t.Helper() + reader := bytes.NewReader(payload) + resp := &protocol.OffsetFetchResponse{} + if err := binary.Read(reader, binary.BigEndian, &resp.CorrelationID); err != nil { + t.Fatalf("read correlation id: %v", err) + } + if version >= 3 { + if err := binary.Read(reader, binary.BigEndian, &resp.ThrottleMs); err != nil { + t.Fatalf("read throttle: %v", err) + } + } + var topicCount int32 + if err := binary.Read(reader, binary.BigEndian, &topicCount); err != nil { + t.Fatalf("read topic count: %v", err) + } + resp.Topics = make([]protocol.OffsetFetchTopicResponse, 0, topicCount) + for i := 0; i < int(topicCount); i++ { + name := readKafkaString(t, reader) + var partCount int32 + if err := binary.Read(reader, binary.BigEndian, &partCount); err != nil { + t.Fatalf("read partition count: %v", err) + } + topic := protocol.OffsetFetchTopicResponse{Name: name} + topic.Partitions = make([]protocol.OffsetFetchPartitionResponse, 0, partCount) + for j := 0; j < int(partCount); j++ { + var part protocol.OffsetFetchPartitionResponse + if err := binary.Read(reader, binary.BigEndian, &part.Partition); err != nil { + t.Fatalf("read partition id: %v", err) + } + if err := binary.Read(reader, binary.BigEndian, &part.Offset); err != nil { + t.Fatalf("read offset: %v", err) + } + if version >= 5 { + if err := binary.Read(reader, binary.BigEndian, &part.LeaderEpoch); err != nil { + t.Fatalf("read leader epoch: %v", err) + } + } + part.Metadata = readKafkaNullableString(t, reader) + if err := binary.Read(reader, binary.BigEndian, &part.ErrorCode); err != nil { + t.Fatalf("read error code: %v", err) + } + 
topic.Partitions = append(topic.Partitions, part) + } + resp.Topics = append(resp.Topics, topic) + } + if version >= 2 { + if err := binary.Read(reader, binary.BigEndian, &resp.ErrorCode); err != nil { + t.Fatalf("read error code: %v", err) + } + } + return resp +} + +func decodeSyncGroupResponse(t *testing.T, payload []byte, version int16) *kmsg.SyncGroupResponse { + t.Helper() + reader := bytes.NewReader(payload) + var corr int32 + if err := binary.Read(reader, binary.BigEndian, &corr); err != nil { + t.Fatalf("read correlation id: %v", err) + } + if version >= 4 { + skipTaggedFields(t, reader) + } + body, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("read response body: %v", err) + } + resp := kmsg.NewPtrSyncGroupResponse() + resp.Version = version + if err := resp.ReadFrom(body); err != nil { + t.Fatalf("decode sync group response: %v", err) + } + return resp +} + +func decodeHeartbeatResponse(t *testing.T, payload []byte, version int16) *kmsg.HeartbeatResponse { + t.Helper() + reader := bytes.NewReader(payload) + var corr int32 + if err := binary.Read(reader, binary.BigEndian, &corr); err != nil { + t.Fatalf("read correlation id: %v", err) + } + if version >= 4 { + skipTaggedFields(t, reader) + } + body, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("read response body: %v", err) + } + resp := kmsg.NewPtrHeartbeatResponse() + resp.Version = version + if err := resp.ReadFrom(body); err != nil { + t.Fatalf("decode heartbeat response: %v", err) + } + return resp +} + +func decodeLeaveGroupResponse(t *testing.T, payload []byte) *protocol.LeaveGroupResponse { + t.Helper() + reader := bytes.NewReader(payload) + resp := &protocol.LeaveGroupResponse{} + if err := binary.Read(reader, binary.BigEndian, &resp.CorrelationID); err != nil { + t.Fatalf("read correlation id: %v", err) + } + if err := binary.Read(reader, binary.BigEndian, &resp.ErrorCode); err != nil { + t.Fatalf("read error code: %v", err) + } + return resp +} + +func decodeAlterConfigsResponse(t *testing.T, payload []byte, version int16) *protocol.AlterConfigsResponse { + t.Helper() + if version != 1 { + t.Fatalf("alter configs decode only supports version 1") + } + reader := bytes.NewReader(payload) + resp := &protocol.AlterConfigsResponse{} + if err := binary.Read(reader, binary.BigEndian, &resp.CorrelationID); err != nil { + t.Fatalf("read correlation id: %v", err) + } + if err := binary.Read(reader, binary.BigEndian, &resp.ThrottleMs); err != nil { + t.Fatalf("read throttle ms: %v", err) + } + var count int32 + if err := binary.Read(reader, binary.BigEndian, &count); err != nil { + t.Fatalf("read resource count: %v", err) + } + resp.Resources = make([]protocol.AlterConfigsResponseResource, 0, count) + for i := 0; i < int(count); i++ { + var code int16 + if err := binary.Read(reader, binary.BigEndian, &code); err != nil { + t.Fatalf("read error code: %v", err) + } + msg := readKafkaNullableString(t, reader) + var rtype int8 + if err := binary.Read(reader, binary.BigEndian, &rtype); err != nil { + t.Fatalf("read resource type: %v", err) + } + name := readKafkaString(t, reader) + resp.Resources = append(resp.Resources, protocol.AlterConfigsResponseResource{ + ErrorCode: code, + ErrorMessage: msg, + ResourceType: rtype, + ResourceName: name, + }) + } + return resp +} + +func readKafkaNullableString(t *testing.T, reader *bytes.Reader) *string { + t.Helper() + var length int16 + if err := binary.Read(reader, binary.BigEndian, &length); err != nil { + t.Fatalf("read nullable string length: %v", err) + } + if length < 
0 { + return nil + } + buf := make([]byte, length) + if _, err := reader.Read(buf); err != nil { + t.Fatalf("read nullable string: %v", err) + } + value := string(buf) + return &value +} diff --git a/cmd/broker/auth_metrics.go b/cmd/broker/auth_metrics.go new file mode 100644 index 0000000..7562624 --- /dev/null +++ b/cmd/broker/auth_metrics.go @@ -0,0 +1,72 @@ +// Copyright 2025, 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "io" + "sync" + "sync/atomic" + + "github.com/KafScale/platform/pkg/acl" +) + +type authMetrics struct { + deniedTotal uint64 + mu sync.Mutex + byKey map[string]uint64 +} + +func newAuthMetrics() *authMetrics { + return &authMetrics{byKey: make(map[string]uint64)} +} + +func (m *authMetrics) RecordDenied(action acl.Action, resource acl.Resource) { + if m == nil { + return + } + atomic.AddUint64(&m.deniedTotal, 1) + key := fmt.Sprintf("%s|%s", action, resource) + m.mu.Lock() + m.byKey[key]++ + m.mu.Unlock() +} + +func (m *authMetrics) writePrometheus(w io.Writer) { + if m == nil { + return + } + total := atomic.LoadUint64(&m.deniedTotal) + fmt.Fprintln(w, "# HELP kafscale_authz_denied_total Authorization denials across broker APIs.") + fmt.Fprintln(w, "# TYPE kafscale_authz_denied_total counter") + fmt.Fprintf(w, "kafscale_authz_denied_total %d\n", total) + + m.mu.Lock() + defer m.mu.Unlock() + for key, count := range m.byKey { + action, resource := splitAuthMetricKey(key) + fmt.Fprintf(w, "kafscale_authz_denied_total{action=%q,resource=%q} %d\n", action, resource, count) + } +} + +func splitAuthMetricKey(key string) (string, string) { + for i := 0; i < len(key); i++ { + if key[i] == '|' { + return key[:i], key[i+1:] + } + } + return key, "" +} diff --git a/cmd/broker/main.go b/cmd/broker/main.go index d3b8182..2a6bf02 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -17,6 +17,7 @@ package main import ( "context" + "encoding/json" "errors" "fmt" "log/slog" @@ -30,6 +31,7 @@ import ( "syscall" "time" + "github.com/KafScale/platform/pkg/acl" "github.com/KafScale/platform/pkg/broker" "github.com/KafScale/platform/pkg/cache" controlpb "github.com/KafScale/platform/pkg/gen/control" @@ -85,6 +87,10 @@ type handler struct { flushInterval time.Duration flushOnAck bool adminMetrics *adminMetrics + authorizer *acl.Authorizer + authMetrics *authMetrics + authLogMu sync.Mutex + authLogLast map[string]time.Time } type etcdAvailability interface { @@ -95,6 +101,7 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re if h.traceKafka { h.logger.Debug("received request", "api_key", header.APIKey, "api_version", header.APIVersion, "correlation", header.CorrelationID, "client_id", header.ClientID) } + principal := principalFromContext(ctx, header) switch req.(type) { case *protocol.ApiVersionsRequest: errorCode := protocol.NONE @@ -207,6 +214,15 @@ 
func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re } return protocol.EncodeFindCoordinatorResponse(resp, header.APIVersion) case *protocol.JoinGroupRequest: + req := req.(*protocol.JoinGroupRequest) + if !h.allowGroup(principal, req.GroupID, acl.ActionGroupWrite) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupWrite, acl.ResourceGroup, req.GroupID) + return protocol.EncodeJoinGroupResponse(&protocol.JoinGroupResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }, header.APIVersion) + } if !h.etcdAvailable() { return protocol.EncodeJoinGroupResponse(&protocol.JoinGroupResponse{ CorrelationID: header.CorrelationID, @@ -214,12 +230,21 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.REQUEST_TIMED_OUT, }, header.APIVersion) } - resp, err := h.coordinator.JoinGroup(ctx, req.(*protocol.JoinGroupRequest), header.CorrelationID) + resp, err := h.coordinator.JoinGroup(ctx, req, header.CorrelationID) if err != nil { return nil, err } return protocol.EncodeJoinGroupResponse(resp, header.APIVersion) case *protocol.SyncGroupRequest: + req := req.(*protocol.SyncGroupRequest) + if !h.allowGroup(principal, req.GroupID, acl.ActionGroupWrite) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupWrite, acl.ResourceGroup, req.GroupID) + return protocol.EncodeSyncGroupResponse(&protocol.SyncGroupResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }, header.APIVersion) + } if !h.etcdAvailable() { return protocol.EncodeSyncGroupResponse(&protocol.SyncGroupResponse{ CorrelationID: header.CorrelationID, @@ -227,15 +252,30 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.REQUEST_TIMED_OUT, }, header.APIVersion) } - resp, err := h.coordinator.SyncGroup(ctx, req.(*protocol.SyncGroupRequest), header.CorrelationID) + resp, err := h.coordinator.SyncGroup(ctx, req, header.CorrelationID) if err != nil { return nil, err } return protocol.EncodeSyncGroupResponse(resp, header.APIVersion) case *protocol.DescribeGroupsRequest: + req := req.(*protocol.DescribeGroupsRequest) + if !h.allowGroups(principal, req.Groups, acl.ActionGroupRead) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupRead, acl.ResourceGroup, strings.Join(req.Groups, ",")) + results := make([]protocol.DescribeGroupsResponseGroup, 0, len(req.Groups)) + for _, groupID := range req.Groups { + results = append(results, protocol.DescribeGroupsResponseGroup{ + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + GroupID: groupID, + }) + } + return protocol.EncodeDescribeGroupsResponse(&protocol.DescribeGroupsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Groups: results, + }, header.APIVersion) + } return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { if !h.etcdAvailable() { - req := req.(*protocol.DescribeGroupsRequest) results := make([]protocol.DescribeGroupsResponseGroup, 0, len(req.Groups)) for _, groupID := range req.Groups { results = append(results, protocol.DescribeGroupsResponseGroup{ @@ -249,13 +289,22 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re Groups: results, }, header.APIVersion) } - resp, err := h.coordinator.DescribeGroups(ctx, req.(*protocol.DescribeGroupsRequest), header.CorrelationID) + resp, err := h.coordinator.DescribeGroups(ctx, req, header.CorrelationID) if err != nil { 
return nil, err } return protocol.EncodeDescribeGroupsResponse(resp, header.APIVersion) }) case *protocol.ListGroupsRequest: + if !h.allowGroup(principal, "*", acl.ActionGroupRead) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupRead, acl.ResourceGroup, "*") + return protocol.EncodeListGroupsResponse(&protocol.ListGroupsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + Groups: nil, + }, header.APIVersion) + } return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { if !h.etcdAvailable() { return protocol.EncodeListGroupsResponse(&protocol.ListGroupsResponse{ @@ -272,6 +321,15 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re return protocol.EncodeListGroupsResponse(resp, header.APIVersion) }) case *protocol.HeartbeatRequest: + req := req.(*protocol.HeartbeatRequest) + if !h.allowGroup(principal, req.GroupID, acl.ActionGroupWrite) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupWrite, acl.ResourceGroup, req.GroupID) + return protocol.EncodeHeartbeatResponse(&protocol.HeartbeatResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }, header.APIVersion) + } if !h.etcdAvailable() { return protocol.EncodeHeartbeatResponse(&protocol.HeartbeatResponse{ CorrelationID: header.CorrelationID, @@ -279,20 +337,50 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.REQUEST_TIMED_OUT, }, header.APIVersion) } - resp := h.coordinator.Heartbeat(ctx, req.(*protocol.HeartbeatRequest), header.CorrelationID) + resp := h.coordinator.Heartbeat(ctx, req, header.CorrelationID) return protocol.EncodeHeartbeatResponse(resp, header.APIVersion) case *protocol.LeaveGroupRequest: + req := req.(*protocol.LeaveGroupRequest) + if !h.allowGroup(principal, req.GroupID, acl.ActionGroupWrite) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupWrite, acl.ResourceGroup, req.GroupID) + return protocol.EncodeLeaveGroupResponse(&protocol.LeaveGroupResponse{ + CorrelationID: header.CorrelationID, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }) + } if !h.etcdAvailable() { return protocol.EncodeLeaveGroupResponse(&protocol.LeaveGroupResponse{ CorrelationID: header.CorrelationID, ErrorCode: protocol.REQUEST_TIMED_OUT, }) } - resp := h.coordinator.LeaveGroup(ctx, req.(*protocol.LeaveGroupRequest), header.CorrelationID) + resp := h.coordinator.LeaveGroup(ctx, req, header.CorrelationID) return protocol.EncodeLeaveGroupResponse(resp) case *protocol.OffsetCommitRequest: + req := req.(*protocol.OffsetCommitRequest) + if !h.allowGroup(principal, req.GroupID, acl.ActionGroupWrite) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupWrite, acl.ResourceGroup, req.GroupID) + topics := make([]protocol.OffsetCommitTopicResponse, 0, len(req.Topics)) + for _, topic := range req.Topics { + partitions := make([]protocol.OffsetCommitPartitionResponse, 0, len(topic.Partitions)) + for _, part := range topic.Partitions { + partitions = append(partitions, protocol.OffsetCommitPartitionResponse{ + Partition: part.Partition, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }) + } + topics = append(topics, protocol.OffsetCommitTopicResponse{ + Name: topic.Name, + Partitions: partitions, + }) + } + return protocol.EncodeOffsetCommitResponse(&protocol.OffsetCommitResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Topics: topics, + }) + } if !h.etcdAvailable() { - 
req := req.(*protocol.OffsetCommitRequest) topics := make([]protocol.OffsetCommitTopicResponse, 0, len(req.Topics)) for _, topic := range req.Topics { partitions := make([]protocol.OffsetCommitPartitionResponse, 0, len(topic.Partitions)) @@ -313,14 +401,39 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re Topics: topics, }) } - resp, err := h.coordinator.OffsetCommit(ctx, req.(*protocol.OffsetCommitRequest), header.CorrelationID) + resp, err := h.coordinator.OffsetCommit(ctx, req, header.CorrelationID) if err != nil { return nil, err } return protocol.EncodeOffsetCommitResponse(resp) case *protocol.OffsetFetchRequest: + req := req.(*protocol.OffsetFetchRequest) + if !h.allowGroup(principal, req.GroupID, acl.ActionGroupRead) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupRead, acl.ResourceGroup, req.GroupID) + topics := make([]protocol.OffsetFetchTopicResponse, 0, len(req.Topics)) + for _, topic := range req.Topics { + partitions := make([]protocol.OffsetFetchPartitionResponse, 0, len(topic.Partitions)) + for _, part := range topic.Partitions { + partitions = append(partitions, protocol.OffsetFetchPartitionResponse{ + Partition: part.Partition, + Offset: -1, + LeaderEpoch: -1, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }) + } + topics = append(topics, protocol.OffsetFetchTopicResponse{ + Name: topic.Name, + Partitions: partitions, + }) + } + return protocol.EncodeOffsetFetchResponse(&protocol.OffsetFetchResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Topics: topics, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }, header.APIVersion) + } if !h.etcdAvailable() { - req := req.(*protocol.OffsetFetchRequest) topics := make([]protocol.OffsetFetchTopicResponse, 0, len(req.Topics)) for _, topic := range req.Topics { partitions := make([]protocol.OffsetFetchPartitionResponse, 0, len(topic.Partitions)) @@ -344,14 +457,18 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re ErrorCode: protocol.REQUEST_TIMED_OUT, }, header.APIVersion) } - resp, err := h.coordinator.OffsetFetch(ctx, req.(*protocol.OffsetFetchRequest), header.CorrelationID) + resp, err := h.coordinator.OffsetFetch(ctx, req, header.CorrelationID) if err != nil { return nil, err } return protocol.EncodeOffsetFetchResponse(resp, header.APIVersion) case *protocol.OffsetForLeaderEpochRequest: return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { - return h.handleOffsetForLeaderEpoch(ctx, header, req.(*protocol.OffsetForLeaderEpochRequest)) + offsetReq := req.(*protocol.OffsetForLeaderEpochRequest) + if !h.allowTopics(principal, topicsFromOffsetForLeaderEpoch(offsetReq), acl.ActionFetch) { + return h.unauthorizedOffsetForLeaderEpoch(principal, header, offsetReq) + } + return h.handleOffsetForLeaderEpoch(ctx, header, offsetReq) }) case *protocol.DescribeConfigsRequest: return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { @@ -359,18 +476,29 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re }) case *protocol.AlterConfigsRequest: return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { - return h.handleAlterConfigs(ctx, header, req.(*protocol.AlterConfigsRequest)) + alterReq := req.(*protocol.AlterConfigsRequest) + if !h.allowAdmin(principal) { + return h.unauthorizedAlterConfigs(principal, header, alterReq) + } + return h.handleAlterConfigs(ctx, header, alterReq) }) case *protocol.CreatePartitionsRequest: return h.withAdminMetrics(header.APIKey, func() ([]byte, 
error) { - return h.handleCreatePartitions(ctx, header, req.(*protocol.CreatePartitionsRequest)) + createReq := req.(*protocol.CreatePartitionsRequest) + if !h.allowAdmin(principal) { + return h.unauthorizedCreatePartitions(principal, header, createReq) + } + return h.handleCreatePartitions(ctx, header, createReq) }) case *protocol.DeleteGroupsRequest: return h.withAdminMetrics(header.APIKey, func() ([]byte, error) { + deleteReq := req.(*protocol.DeleteGroupsRequest) + if !h.allowGroups(principal, deleteReq.Groups, acl.ActionGroupAdmin) { + return h.unauthorizedDeleteGroups(principal, header, deleteReq) + } if !h.etcdAvailable() { - req := req.(*protocol.DeleteGroupsRequest) - results := make([]protocol.DeleteGroupsResponseGroup, 0, len(req.Groups)) - for _, groupID := range req.Groups { + results := make([]protocol.DeleteGroupsResponseGroup, 0, len(deleteReq.Groups)) + for _, groupID := range deleteReq.Groups { results = append(results, protocol.DeleteGroupsResponseGroup{ Group: groupID, ErrorCode: protocol.REQUEST_TIMED_OUT, @@ -382,18 +510,30 @@ func (h *handler) Handle(ctx context.Context, header *protocol.RequestHeader, re Groups: results, }, header.APIVersion) } - resp, err := h.coordinator.DeleteGroups(ctx, req.(*protocol.DeleteGroupsRequest), header.CorrelationID) + resp, err := h.coordinator.DeleteGroups(ctx, deleteReq, header.CorrelationID) if err != nil { return nil, err } return protocol.EncodeDeleteGroupsResponse(resp, header.APIVersion) }) case *protocol.CreateTopicsRequest: - return h.handleCreateTopics(ctx, header, req.(*protocol.CreateTopicsRequest)) + createReq := req.(*protocol.CreateTopicsRequest) + if !h.allowAdmin(principal) { + return h.unauthorizedCreateTopics(principal, header, createReq) + } + return h.handleCreateTopics(ctx, header, createReq) case *protocol.DeleteTopicsRequest: - return h.handleDeleteTopics(ctx, header, req.(*protocol.DeleteTopicsRequest)) + deleteReq := req.(*protocol.DeleteTopicsRequest) + if !h.allowAdmin(principal) { + return h.unauthorizedDeleteTopics(principal, header, deleteReq) + } + return h.handleDeleteTopics(ctx, header, deleteReq) case *protocol.ListOffsetsRequest: - return h.handleListOffsets(ctx, header, req.(*protocol.ListOffsetsRequest)) + listReq := req.(*protocol.ListOffsetsRequest) + if !h.allowTopics(principal, topicsFromListOffsets(listReq), acl.ActionFetch) { + return h.unauthorizedListOffsets(principal, header, listReq) + } + return h.handleListOffsets(ctx, header, listReq) default: return nil, ErrUnsupportedAPI } @@ -471,6 +611,7 @@ func (h *handler) metricsHandler(w http.ResponseWriter, r *http.Request) { } h.writeRuntimeMetrics(w) h.adminMetrics.writePrometheus(w) + h.authMetrics.writePrometheus(w) } func (h *handler) recordS3Op(op string, latency time.Duration, err error) { @@ -487,6 +628,38 @@ func (h *handler) recordProduceLatency(latency time.Duration) { h.produceLatency.Observe(float64(latency.Milliseconds())) } +func (h *handler) recordAuthzDenied(action acl.Action, resource acl.Resource) { + if h.authMetrics == nil { + return + } + h.authMetrics.RecordDenied(action, resource) +} + +func (h *handler) recordAuthzDeniedWithPrincipal(principal string, action acl.Action, resource acl.Resource, name string) { + h.recordAuthzDenied(action, resource) + h.logAuthzDenied(principal, action, resource, name) +} + +func (h *handler) logAuthzDenied(principal string, action acl.Action, resource acl.Resource, name string) { + if h == nil || h.logger == nil { + return + } + key := fmt.Sprintf("%s|%s|%s|%s", action, resource, 
principal, name) + now := time.Now() + h.authLogMu.Lock() + if len(h.authLogLast) > 10000 { + h.authLogLast = make(map[string]time.Time) + } + last := h.authLogLast[key] + if now.Sub(last) < time.Minute { + h.authLogMu.Unlock() + return + } + h.authLogLast[key] = now + h.authLogMu.Unlock() + h.logger.Warn("authorization denied", "principal", principal, "action", action, "resource", resource, "name", name) +} + func (h *handler) withAdminMetrics(apiKey int16, fn func() ([]byte, error)) ([]byte, error) { start := time.Now() payload, err := fn() @@ -494,6 +667,231 @@ func (h *handler) withAdminMetrics(apiKey int16, fn func() ([]byte, error)) ([]b return payload, err } +func principalFromContext(ctx context.Context, header *protocol.RequestHeader) string { + if info := broker.ConnInfoFromContext(ctx); info != nil { + if strings.TrimSpace(info.Principal) != "" { + return strings.TrimSpace(info.Principal) + } + } + if header == nil || header.ClientID == nil { + return "anonymous" + } + if strings.TrimSpace(*header.ClientID) == "" { + return "anonymous" + } + return *header.ClientID +} + +func (h *handler) allowTopic(principal string, topic string, action acl.Action) bool { + if h.authorizer == nil || !h.authorizer.Enabled() { + return true + } + return h.authorizer.Allows(principal, action, acl.ResourceTopic, topic) +} + +func (h *handler) allowTopics(principal string, topics []string, action acl.Action) bool { + for _, topic := range topics { + if !h.allowTopic(principal, topic, action) { + return false + } + } + return true +} + +func (h *handler) allowGroup(principal string, group string, action acl.Action) bool { + if h.authorizer == nil || !h.authorizer.Enabled() { + return true + } + return h.authorizer.Allows(principal, action, acl.ResourceGroup, group) +} + +func (h *handler) allowGroups(principal string, groups []string, action acl.Action) bool { + for _, group := range groups { + if !h.allowGroup(principal, group, action) { + return false + } + } + return true +} + +func (h *handler) allowCluster(principal string, action acl.Action) bool { + if h.authorizer == nil || !h.authorizer.Enabled() { + return true + } + return h.authorizer.Allows(principal, action, acl.ResourceCluster, "cluster") +} + +func (h *handler) allowAdmin(principal string) bool { + return h.allowCluster(principal, acl.ActionAdmin) +} + +func topicsFromListOffsets(req *protocol.ListOffsetsRequest) []string { + topics := make([]string, 0, len(req.Topics)) + for _, topic := range req.Topics { + topics = append(topics, topic.Name) + } + return topics +} + +func topicsFromOffsetForLeaderEpoch(req *protocol.OffsetForLeaderEpochRequest) []string { + topics := make([]string, 0, len(req.Topics)) + for _, topic := range req.Topics { + topics = append(topics, topic.Name) + } + return topics +} + +func topicsFromCreateTopics(req *protocol.CreateTopicsRequest) []string { + topics := make([]string, 0, len(req.Topics)) + for _, topic := range req.Topics { + topics = append(topics, topic.Name) + } + return topics +} + +func topicsFromCreatePartitions(req *protocol.CreatePartitionsRequest) []string { + topics := make([]string, 0, len(req.Topics)) + for _, topic := range req.Topics { + topics = append(topics, topic.Name) + } + return topics +} + +func (h *handler) unauthorizedOffsetForLeaderEpoch(principal string, header *protocol.RequestHeader, req *protocol.OffsetForLeaderEpochRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionFetch, acl.ResourceTopic, strings.Join(topicsFromOffsetForLeaderEpoch(req), 
",")) + respTopics := make([]protocol.OffsetForLeaderEpochTopicResponse, 0, len(req.Topics)) + for _, topic := range req.Topics { + partitions := make([]protocol.OffsetForLeaderEpochPartitionResponse, 0, len(topic.Partitions)) + for _, part := range topic.Partitions { + partitions = append(partitions, protocol.OffsetForLeaderEpochPartitionResponse{ + Partition: part.Partition, + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + LeaderEpoch: -1, + EndOffset: -1, + }) + } + respTopics = append(respTopics, protocol.OffsetForLeaderEpochTopicResponse{ + Name: topic.Name, + Partitions: partitions, + }) + } + return protocol.EncodeOffsetForLeaderEpochResponse(&protocol.OffsetForLeaderEpochResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Topics: respTopics, + }, header.APIVersion) +} + +func (h *handler) unauthorizedAlterConfigs(principal string, header *protocol.RequestHeader, req *protocol.AlterConfigsRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionAdmin, acl.ResourceCluster, "cluster") + resources := make([]protocol.AlterConfigsResponseResource, 0, len(req.Resources)) + for _, resource := range req.Resources { + errorCode := protocol.CLUSTER_AUTHORIZATION_FAILED + if resource.ResourceType == protocol.ConfigResourceTopic { + errorCode = protocol.TOPIC_AUTHORIZATION_FAILED + } + resources = append(resources, protocol.AlterConfigsResponseResource{ + ErrorCode: errorCode, + ResourceType: resource.ResourceType, + ResourceName: resource.ResourceName, + }) + } + return protocol.EncodeAlterConfigsResponse(&protocol.AlterConfigsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Resources: resources, + }, header.APIVersion) +} + +func (h *handler) unauthorizedCreatePartitions(principal string, header *protocol.RequestHeader, req *protocol.CreatePartitionsRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionAdmin, acl.ResourceTopic, strings.Join(topicsFromCreatePartitions(req), ",")) + results := make([]protocol.CreatePartitionsResponseTopic, 0, len(req.Topics)) + for _, topic := range req.Topics { + results = append(results, protocol.CreatePartitionsResponseTopic{ + Name: topic.Name, + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + }) + } + return protocol.EncodeCreatePartitionsResponse(&protocol.CreatePartitionsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Topics: results, + }, header.APIVersion) +} + +func (h *handler) unauthorizedDeleteGroups(principal string, header *protocol.RequestHeader, req *protocol.DeleteGroupsRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionGroupAdmin, acl.ResourceGroup, strings.Join(req.Groups, ",")) + results := make([]protocol.DeleteGroupsResponseGroup, 0, len(req.Groups)) + for _, groupID := range req.Groups { + results = append(results, protocol.DeleteGroupsResponseGroup{ + Group: groupID, + ErrorCode: protocol.GROUP_AUTHORIZATION_FAILED, + }) + } + return protocol.EncodeDeleteGroupsResponse(&protocol.DeleteGroupsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Groups: results, + }, header.APIVersion) +} + +func (h *handler) unauthorizedCreateTopics(principal string, header *protocol.RequestHeader, req *protocol.CreateTopicsRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionAdmin, acl.ResourceTopic, strings.Join(topicsFromCreateTopics(req), ",")) + results := make([]protocol.CreateTopicResult, 0, len(req.Topics)) + for _, topic := range req.Topics { + results = 
append(results, protocol.CreateTopicResult{ + Name: topic.Name, + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + ErrorMessage: "unauthorized", + }) + } + return protocol.EncodeCreateTopicsResponse(&protocol.CreateTopicsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Topics: results, + }, header.APIVersion) +} + +func (h *handler) unauthorizedDeleteTopics(principal string, header *protocol.RequestHeader, req *protocol.DeleteTopicsRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionAdmin, acl.ResourceTopic, strings.Join(req.TopicNames, ",")) + results := make([]protocol.DeleteTopicResult, 0, len(req.TopicNames)) + for _, name := range req.TopicNames { + results = append(results, protocol.DeleteTopicResult{ + Name: name, + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + ErrorMessage: "unauthorized", + }) + } + return protocol.EncodeDeleteTopicsResponse(&protocol.DeleteTopicsResponse{ + CorrelationID: header.CorrelationID, + ThrottleMs: 0, + Topics: results, + }, header.APIVersion) +} + +func (h *handler) unauthorizedListOffsets(principal string, header *protocol.RequestHeader, req *protocol.ListOffsetsRequest) ([]byte, error) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionFetch, acl.ResourceTopic, strings.Join(topicsFromListOffsets(req), ",")) + topicResponses := make([]protocol.ListOffsetsTopicResponse, 0, len(req.Topics)) + for _, topic := range req.Topics { + partitions := make([]protocol.ListOffsetsPartitionResponse, 0, len(topic.Partitions)) + for _, part := range topic.Partitions { + partitions = append(partitions, protocol.ListOffsetsPartitionResponse{ + Partition: part.Partition, + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + LeaderEpoch: -1, + }) + } + topicResponses = append(topicResponses, protocol.ListOffsetsTopicResponse{ + Name: topic.Name, + Partitions: partitions, + }) + } + return protocol.EncodeListOffsetsResponse(header.APIVersion, &protocol.ListOffsetsResponse{ + CorrelationID: header.CorrelationID, + Topics: topicResponses, + }) +} + func (h *handler) handleProduce(ctx context.Context, header *protocol.RequestHeader, req *protocol.ProduceRequest) ([]byte, error) { start := time.Now() defer func() { @@ -502,12 +900,27 @@ func (h *handler) handleProduce(ctx context.Context, header *protocol.RequestHea topicResponses := make([]protocol.ProduceTopicResponse, 0, len(req.Topics)) now := time.Now().UnixMilli() var producedMessages int64 + principal := principalFromContext(ctx, header) for _, topic := range req.Topics { if h.traceKafka { h.logger.Debug("produce request received", "topic", topic.Name, "partitions", len(topic.Partitions), "acks", req.Acks, "timeout_ms", req.TimeoutMs) } partitionResponses := make([]protocol.ProducePartitionResponse, 0, len(topic.Partitions)) + if !h.allowTopic(principal, topic.Name, acl.ActionProduce) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionProduce, acl.ResourceTopic, topic.Name) + for _, part := range topic.Partitions { + partitionResponses = append(partitionResponses, protocol.ProducePartitionResponse{ + Partition: part.Partition, + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + }) + } + topicResponses = append(topicResponses, protocol.ProduceTopicResponse{ + Name: topic.Name, + Partitions: partitionResponses, + }) + continue + } for _, part := range topic.Partitions { if !h.etcdAvailable() { partitionResponses = append(partitionResponses, protocol.ProducePartitionResponse{ @@ -852,9 +1265,19 @@ func (h *handler) handleOffsetForLeaderEpoch(ctx 
context.Context, header *protoc func (h *handler) handleDescribeConfigs(ctx context.Context, header *protocol.RequestHeader, req *protocol.DescribeConfigsRequest) ([]byte, error) { resources := make([]protocol.DescribeConfigsResponseResource, 0, len(req.Resources)) + principal := principalFromContext(ctx, header) for _, resource := range req.Resources { switch resource.ResourceType { case protocol.ConfigResourceTopic: + if !h.allowTopic(principal, resource.ResourceName, acl.ActionFetch) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionFetch, acl.ResourceTopic, resource.ResourceName) + resources = append(resources, protocol.DescribeConfigsResponseResource{ + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + ResourceType: resource.ResourceType, + ResourceName: resource.ResourceName, + }) + continue + } cfg, err := h.store.FetchTopicConfig(ctx, resource.ResourceName) if err != nil { resources = append(resources, protocol.DescribeConfigsResponseResource{ @@ -872,6 +1295,15 @@ func (h *handler) handleDescribeConfigs(ctx context.Context, header *protocol.Re Configs: configs, }) case protocol.ConfigResourceBroker: + if !h.allowAdmin(principal) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionAdmin, acl.ResourceCluster, "cluster") + resources = append(resources, protocol.DescribeConfigsResponseResource{ + ErrorCode: protocol.CLUSTER_AUTHORIZATION_FAILED, + ResourceType: resource.ResourceType, + ResourceName: resource.ResourceName, + }) + continue + } configs := h.brokerConfigEntries(resource.ConfigNames) resources = append(resources, protocol.DescribeConfigsResponseResource{ ErrorCode: protocol.NONE, @@ -880,6 +1312,15 @@ func (h *handler) handleDescribeConfigs(ctx context.Context, header *protocol.Re Configs: configs, }) default: + if !h.allowAdmin(principal) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionAdmin, acl.ResourceCluster, "cluster") + resources = append(resources, protocol.DescribeConfigsResponseResource{ + ErrorCode: protocol.CLUSTER_AUTHORIZATION_FAILED, + ResourceType: resource.ResourceType, + ResourceName: resource.ResourceName, + }) + continue + } resources = append(resources, protocol.DescribeConfigsResponseResource{ ErrorCode: protocol.INVALID_REQUEST, ResourceType: resource.ResourceType, @@ -1231,6 +1672,7 @@ func (h *handler) handleFetch(ctx context.Context, header *protocol.RequestHeade var fetchedMessages int64 zeroID := [16]byte{} idToName := map[[16]byte]string{} + principal := principalFromContext(ctx, header) for _, topic := range req.Topics { if topic.TopicID != zeroID { meta, err := h.store.Metadata(ctx, nil) @@ -1265,6 +1707,22 @@ func (h *handler) handleFetch(ctx context.Context, header *protocol.RequestHeade continue } } + if !h.allowTopic(principal, topicName, acl.ActionFetch) { + h.recordAuthzDeniedWithPrincipal(principal, acl.ActionFetch, acl.ResourceTopic, topicName) + partitionResponses := make([]protocol.FetchPartitionResponse, 0, len(topic.Partitions)) + for _, part := range topic.Partitions { + partitionResponses = append(partitionResponses, protocol.FetchPartitionResponse{ + Partition: part.Partition, + ErrorCode: protocol.TOPIC_AUTHORIZATION_FAILED, + }) + } + topicResponses = append(topicResponses, protocol.FetchTopicResponse{ + Name: topicName, + TopicID: topic.TopicID, + Partitions: partitionResponses, + }) + continue + } partitionResponses := make([]protocol.FetchPartitionResponse, 0, len(topic.Partitions)) for _, part := range topic.Partitions { if h.traceKafka { @@ -1497,6 +1955,7 @@ func newHandler(store metadata.Store, 
s3Client storage.S3Client, brokerInfo prot autoPartitions = 1 } health := broker.NewS3HealthMonitor(s3HealthConfigFromEnv()) + authorizer := buildAuthorizerFromEnv(logger) return &handler{ apiVersions: generateApiVersions(), store: store, @@ -1535,6 +1994,9 @@ func newHandler(store metadata.Store, s3Client storage.S3Client, brokerInfo prot flushInterval: flushInterval, flushOnAck: flushOnAck, adminMetrics: newAdminMetrics(), + authorizer: authorizer, + authMetrics: newAuthMetrics(), + authLogLast: make(map[string]time.Time), } } @@ -1602,8 +2064,9 @@ func main() { startControlServer(ctx, controlAddr, handler, logger) kafkaAddr := envOrDefault("KAFSCALE_BROKER_ADDR", defaultKafkaAddr) srv := &broker.Server{ - Addr: kafkaAddr, - Handler: handler, + Addr: kafkaAddr, + Handler: handler, + ConnContextFunc: buildConnContextFunc(logger), } if err := srv.ListenAndServe(ctx); err != nil { logger.Error("broker server error", "error", err) @@ -1700,6 +2163,134 @@ func buildS3ConfigsFromEnv() (storage.S3Config, storage.S3Config, bool, bool, bo return writeCfg, readCfg, false, usingDefaultMinio, credsProvided, useReadReplica } +func buildConnContextFunc(logger *slog.Logger) broker.ConnContextFunc { + source := strings.TrimSpace(os.Getenv("KAFSCALE_PRINCIPAL_SOURCE")) + if source == "" { + source = "client_id" + } + proxyProtocol := parseEnvBool("KAFSCALE_PROXY_PROTOCOL", false) + if strings.EqualFold(source, "proxy_addr") { + proxyProtocol = true + } + if strings.EqualFold(source, "client_id") && !proxyProtocol { + if parseEnvBool("KAFSCALE_ACL_ENABLED", false) { + if logger == nil { + logger = slog.Default() + } + logger.Warn("ACL enabled with client_id principal source; client.id is spoofable without trusted edge auth") + } + return nil + } + if logger == nil { + logger = slog.Default() + } + if parseEnvBool("KAFSCALE_ACL_ENABLED", false) && strings.EqualFold(source, "client_id") { + logger.Warn("ACL enabled with client_id principal source; client.id is spoofable without trusted edge auth") + } + if proxyProtocol && strings.EqualFold(source, "client_id") { + logger.Warn("proxy protocol enabled but principal source remains client_id; proxy identity is unused") + } + return func(conn net.Conn) (net.Conn, *broker.ConnContext, error) { + info := &broker.ConnContext{} + var proxyInfo *broker.ProxyInfo + if proxyProtocol { + wrapped, parsed, err := broker.ReadProxyProtocol(conn) + if err != nil { + logger.Warn("proxy protocol parse failed", "error", err) + return conn, nil, err + } + if parsed == nil { + return conn, nil, fmt.Errorf("proxy protocol required but header missing") + } + if parsed.Local { + logger.Warn("proxy protocol local command received; using socket remote addr") + } + if wrapped != nil { + conn = wrapped + } + proxyInfo = parsed + if proxyInfo != nil { + if !proxyInfo.Local && proxyInfo.SourceAddr != "" { + info.ProxyAddr = proxyInfo.SourceAddr + info.RemoteAddr = proxyInfo.SourceAddr + } + } + } + if info.RemoteAddr == "" { + info.RemoteAddr = conn.RemoteAddr().String() + } + switch strings.ToLower(source) { + case "remote_addr": + info.Principal = hostFromAddr(info.RemoteAddr) + case "proxy_addr": + if proxyInfo != nil && proxyInfo.SourceAddr != "" { + info.Principal = hostFromAddr(proxyInfo.SourceAddr) + } else { + info.Principal = hostFromAddr(info.RemoteAddr) + } + case "client_id": + // Default; use client.id from the Kafka request header. 
+ default: + logger.Warn("unknown principal source; defaulting to client_id", "source", source) + } + return conn, info, nil + } +} + +func hostFromAddr(addr string) string { + if addr == "" { + return "" + } + host, _, err := net.SplitHostPort(addr) + if err != nil { + return addr + } + return host +} + +func buildAuthorizerFromEnv(logger *slog.Logger) *acl.Authorizer { + if !parseEnvBool("KAFSCALE_ACL_ENABLED", false) { + return acl.NewAuthorizer(acl.Config{Enabled: false}) + } + failOpen := parseEnvBool("KAFSCALE_ACL_FAIL_OPEN", false) + if logger == nil { + logger = slog.Default() + } + var raw []byte + if inline := strings.TrimSpace(os.Getenv("KAFSCALE_ACL_JSON")); inline != "" { + raw = []byte(inline) + } else if path := strings.TrimSpace(os.Getenv("KAFSCALE_ACL_FILE")); path != "" { + data, err := os.ReadFile(path) + if err != nil { + if failOpen { + logger.Warn("failed to read ACL file; ACL disabled", "error", err) + return acl.NewAuthorizer(acl.Config{Enabled: false}) + } + logger.Warn("failed to read ACL file; default deny enabled", "error", err) + return acl.NewAuthorizer(acl.Config{Enabled: true, DefaultPolicy: "deny"}) + } + raw = data + } else { + if failOpen { + logger.Warn("ACL enabled but no config provided; ACL disabled") + return acl.NewAuthorizer(acl.Config{Enabled: false}) + } + logger.Warn("ACL enabled but no config provided; default deny enabled") + return acl.NewAuthorizer(acl.Config{Enabled: true, DefaultPolicy: "deny"}) + } + var cfg acl.Config + if err := json.Unmarshal(raw, &cfg); err != nil { + if failOpen { + logger.Warn("failed to parse ACL config; ACL disabled", "error", err) + return acl.NewAuthorizer(acl.Config{Enabled: false}) + } + logger.Warn("failed to parse ACL config; default deny enabled", "error", err) + return acl.NewAuthorizer(acl.Config{Enabled: true, DefaultPolicy: "deny"}) + } + cfg.Enabled = true + return acl.NewAuthorizer(cfg) +} + func s3HealthConfigFromEnv() broker.S3HealthConfig { return broker.S3HealthConfig{ Window: time.Duration(parseEnvInt("KAFSCALE_S3_HEALTH_WINDOW_SEC", 60)) * time.Second, diff --git a/deploy/helm/kafscale/templates/operator-deployment.yaml b/deploy/helm/kafscale/templates/operator-deployment.yaml index dde40cd..37368aa 100644 --- a/deploy/helm/kafscale/templates/operator-deployment.yaml +++ b/deploy/helm/kafscale/templates/operator-deployment.yaml @@ -81,6 +81,30 @@ spec: value: "{{ join "," .Values.operator.etcdEndpoints }}" - name: KAFSCALE_OPERATOR_LEADER_KEY value: "{{ .Values.operator.leaderKey }}" +{{- if .Values.operator.acl.enabled }} + - name: KAFSCALE_ACL_ENABLED + value: "true" +{{- end }} +{{- if .Values.operator.acl.configJson }} + - name: KAFSCALE_ACL_JSON + value: "{{ .Values.operator.acl.configJson }}" +{{- end }} +{{- if .Values.operator.acl.configFile }} + - name: KAFSCALE_ACL_FILE + value: "{{ .Values.operator.acl.configFile }}" +{{- end }} +{{- if .Values.operator.acl.failOpen }} + - name: KAFSCALE_ACL_FAIL_OPEN + value: "true" +{{- end }} +{{- if .Values.operator.auth.principalSource }} + - name: KAFSCALE_PRINCIPAL_SOURCE + value: "{{ .Values.operator.auth.principalSource }}" +{{- end }} +{{- if .Values.operator.auth.proxyProtocol }} + - name: KAFSCALE_PROXY_PROTOCOL + value: "true" +{{- end }} resources: {{- if .Values.operator.resources }} {{ toYaml .Values.operator.resources | indent 12 }} diff --git a/deploy/helm/kafscale/templates/proxy-service.yaml b/deploy/helm/kafscale/templates/proxy-service.yaml index ff1198d..e884889 100644 --- a/deploy/helm/kafscale/templates/proxy-service.yaml +++ 
b/deploy/helm/kafscale/templates/proxy-service.yaml @@ -21,8 +21,16 @@ metadata: labels: {{ include "kafscale.labels" . | indent 4 }} app.kubernetes.io/component: proxy + {{- with .Values.proxy.service.annotations }} + annotations: +{{ toYaml . | indent 4 }} + {{- end }} spec: type: {{ .Values.proxy.service.type }} + {{- if .Values.proxy.service.loadBalancerSourceRanges }} + loadBalancerSourceRanges: +{{ toYaml .Values.proxy.service.loadBalancerSourceRanges | indent 4 }} + {{- end }} selector: {{ include "kafscale.componentSelectorLabels" (dict "root" . "component" "proxy") | indent 4 }} ports: diff --git a/deploy/helm/kafscale/values.yaml b/deploy/helm/kafscale/values.yaml index 95efc7c..f671a0b 100644 --- a/deploy/helm/kafscale/values.yaml +++ b/deploy/helm/kafscale/values.yaml @@ -44,6 +44,14 @@ operator: etcdEndpoints: - "http://etcd:2379" leaderKey: "kafscale-operator" + acl: + enabled: false + configJson: "" + configFile: "" + failOpen: false + auth: + principalSource: "" + proxyProtocol: false podAnnotations: {} resources: {} nodeSelector: {} @@ -127,6 +135,8 @@ proxy: service: type: LoadBalancer port: 9092 + annotations: {} + loadBalancerSourceRanges: [] mcp: enabled: false diff --git a/docs/mcp.md b/docs/mcp.md index 69281a8..e6632b4 100644 --- a/docs/mcp.md +++ b/docs/mcp.md @@ -82,8 +82,8 @@ Mutation tools (future, gated by auth + RBAC): ## Security and Guardrails -Kafscale currently does not enforce auth on broker/admin APIs. For MCP, we -must ship secure-by-default to avoid "one prompt away from prod changes". +Kafscale v1.5 introduces basic broker ACLs, but MCP remains read-only for security reasons. +For MCP, we ship secure-by-default to avoid "one prompt away from prod changes". Requirements: diff --git a/docs/metrics.md b/docs/metrics.md index a7c483b..84dbb29 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -52,6 +52,7 @@ Broker metrics are emitted directly by the broker process. | `kafscale_admin_requests_total` | Counter | `api` | Count of admin API requests by API name. | | `kafscale_admin_request_errors_total` | Counter | `api` | Count of admin API errors by API name. | | `kafscale_admin_request_latency_ms_avg` | Gauge | `api` | Average admin API latency (ms). | +| `kafscale_authz_denied_total` | Counter | `action`, `resource` | Count of authorization denials by action/resource. | | `kafscale_produce_latency_ms` | Histogram | - | Produce request latency distribution (use p95 in PromQL). | | `kafscale_consumer_lag` | Histogram | - | Consumer lag distribution (use p95 in PromQL). | | `kafscale_consumer_lag_max` | Gauge | - | Maximum observed consumer lag. | diff --git a/docs/operations.md b/docs/operations.md index b0f7b9f..13c607a 100644 --- a/docs/operations.md +++ b/docs/operations.md @@ -72,10 +72,77 @@ helm upgrade --install kafscale deploy/helm/kafscale \ - **TLS** – Terminate TLS at your ingress or service mesh; broker/console TLS env flags are not wired in v1. - **Admin APIs** – Create/Delete Topics are enabled by default. Set `KAFSCALE_ALLOW_ADMIN_APIS=false` on broker pods to disable them, and gate external access via mTLS, ingress auth, or network policies. - **Network policies** – If your cluster enforces policies, allow the operator + brokers to reach etcd and S3 endpoints and lock everything else down. +- **ACLs (v1.5)** – Optional, basic ACL enforcement is available at the broker. Identity comes from Kafka `client.id` until SASL is introduced. Configure via `KAFSCALE_ACL_ENABLED` plus either `KAFSCALE_ACL_JSON` or `KAFSCALE_ACL_FILE`. 
Use `KAFSCALE_ACL_FAIL_OPEN=true` to allow traffic when the ACL config is missing/invalid (default is fail-closed).
+  - **Principal source** – Set `KAFSCALE_PRINCIPAL_SOURCE` to `client_id` (default), `remote_addr`, or `proxy_addr`. Use `proxy_addr` with `KAFSCALE_PROXY_PROTOCOL=true` to derive principals from a trusted TCP proxy (PROXY protocol v1/v2).
+  - **Auth denials** – Broker logs emit a rate-limited `authorization denied` entry with principal/action/resource context.
+  - **Trust boundary** – Only enable `proxy_addr`/PROXY protocol when brokers are reachable *only* through a trusted LB or sidecar that injects the header. Do not expose brokers directly, or clients can spoof identity.
+  - **Fail-closed** – When `KAFSCALE_PROXY_PROTOCOL=true`, brokers reject connections that do not include a valid PROXY header.
+  - **Header limits** – PROXY v1 headers are capped at 256 bytes; oversized headers are rejected.
+  - **Health checks** – PROXY v2 `LOCAL` connections are accepted (no identity); ensure LB health checks don’t rely on ACL-protected operations.
+
+Example ACL values (Helm operator):
+```yaml
+operator:
+  acl:
+    enabled: true
+    configJson: |
+      {"default_policy":"deny","principals":[
+        {"name":"analytics","allow":[{"action":"fetch","resource":"topic","name":"orders-*"}]}
+      ]}
+  auth:
+    principalSource: "proxy_addr"
+    proxyProtocol: true
+```
 - **Health / metrics** – Prometheus can scrape `/metrics` on the brokers and operator for early detection of S3 pressure or degraded nodes. The operator exposes metrics on port `8080` and the Helm chart can create a metrics Service, ServiceMonitor, and PrometheusRule.
 - **Startup gating** – Broker pods exit immediately if they cannot read metadata or write a probe object to S3 during startup, so Kubernetes restarts them rather than leaving a stuck listener in place.
 - **Leader IDs** – Each broker advertises a numeric `NodeID` in etcd. In the single-node demo you’ll always see `Leader=0` in the Console’s topic detail because the only broker has ID `0`. In real clusters those IDs align with the broker addresses the operator published; if you see `Leader=3`, look for the broker with `NodeID 3` in the metadata payload.
 
+### Proxy TLS via LoadBalancer (Recommended)
+
+The proxy is the external Kafka entrypoint. For TLS in v1.5, terminate TLS at
+the cloud LoadBalancer by supplying Service annotations in the Helm values.
+This keeps broker traffic plaintext inside the cluster while enabling TLS for
+clients.
+
+Example (provider-specific annotations omitted here):
+
+```yaml
+proxy:
+  enabled: true
+  service:
+    type: LoadBalancer
+    port: 9092
+    annotations:
+      # Add your cloud provider TLS annotations here (ACM / GCP / Azure, etc.)
+    loadBalancerSourceRanges:
+      - 203.0.113.0/24
+```
+
+If you need in-cluster ACME/Let’s Encrypt support, deploy it as an optional
+stack (Traefik or another TCP-capable gateway + cert-manager). Keep it off by
+default to avoid extra operational dependencies.
+
+#### Provider Examples (Verify with your cloud)
+
+These snippets show common patterns; annotation keys vary by provider and
+feature. Always validate against your cloud provider docs. 
+ +AWS NLB TLS (example): +```yaml +proxy: + service: + annotations: + service.beta.kubernetes.io/aws-load-balancer-type: "nlb" + service.beta.kubernetes.io/aws-load-balancer-ssl-cert: "arn:aws:acm:REGION:ACCOUNT:certificate/ID" + service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "9092" + service.beta.kubernetes.io/aws-load-balancer-backend-protocol: "tcp" +``` + +GCP / Azure: +- L4 LoadBalancers typically do TCP pass-through; TLS termination often requires + a provider gateway/ingress (or a TCP-capable ingress controller). +- If you terminate TLS outside the Service, keep the proxy Service as plain TCP. + ## Ops API Examples Kafscale exposes Kafka admin APIs for operator workflows (consumer group visibility, diff --git a/docs/protocol.md b/docs/protocol.md index 6bb91f6..fc56e6f 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -77,4 +77,7 @@ Kafscale implements a focused subset of the Kafka protocol. Versions below refle | v2.0 | mTLS | Certificate-based auth | | v2.0 | SASL/OAUTHBEARER | Enterprise SSO | -Until auth lands, Kafscale responds to SASL handshake attempts with `UNSUPPORTED_SASL_MECHANISM` (error code 33). +v1.5 focuses on basic broker ACLs with identity derived from `client.id`; terminate TLS at the proxy/LB. + +Until SASL lands, Kafscale rejects SASL handshake/auth requests as unsupported +API keys and closes the connection (no SASL-specific error payload is returned). diff --git a/docs/roadmap.md b/docs/roadmap.md index cf05531..e2abcf3 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -35,14 +35,4 @@ This roadmap tracks completed work and open gaps. It is intentionally high level - Admin ops API e2e coverage - Security review (TLS/auth) - End-to-end tests multi-segment restart durability - -## Open - -### Release Planning - -- v1.5: authentication groundwork and security hardening -- v2.0: SASL support and ACL authorization - -### Testing and Hardening - -- Performance benchmarks +- Authentication / ACL and security hardening \ No newline at end of file diff --git a/docs/security.md b/docs/security.md index a509d91..aca60b0 100644 --- a/docs/security.md +++ b/docs/security.md @@ -26,8 +26,9 @@ and the boundaries of what is and is not supported in v1. - **Authentication**: none at the Kafka protocol layer. Brokers accept any client connection. The console UI supports basic auth via `KAFSCALE_UI_USERNAME` / `KAFSCALE_UI_PASSWORD`. -- **Authorization**: none. All broker APIs are unauthenticated and authorized - implicitly. This includes admin APIs such as CreatePartitions and DeleteGroups. +- **Authorization**: optional in v1.5. When ACLs are enabled, broker APIs are + authorized by the configured rules; when disabled, all broker APIs are + implicitly allowed (including admin APIs like CreatePartitions/DeleteGroups). - **Transport Security**: TLS termination is expected at the ingress or mesh layer in v1; brokers and the console speak plaintext by default. - **Secrets Handling**: S3 credentials are read from Kubernetes secrets and are @@ -45,12 +46,72 @@ and the boundaries of what is and is not supported in v1. - Use least-privilege IAM roles for S3 access and restrict etcd endpoints. - Treat the console as privileged; do not expose it publicly without auth. +## v1.5 Auth (Basic ACLs) + +Kafscale v1.5 introduces optional, basic ACL enforcement at the broker. TLS is +still expected to terminate at your load balancer / ingress or service mesh. + +### What Is Supported + +- **ACL enforcement** for topic, group, and admin operations. 
+- **Principal identity** derived from the Kafka `client.id` (ClientID) until
+  SASL auth is introduced.
+- **Allow/Deny rules** with wildcard topic/group names (a trailing `*`
+  matches by prefix).
+- **Proxy protocol identity** when deployed behind a trusted LB/sidecar.
+
+### Enabling ACLs
+
+Set the following environment variables on broker pods:
+
+```bash
+KAFSCALE_ACL_ENABLED=true
+KAFSCALE_ACL_JSON='{
+  "default_policy": "deny",
+  "principals": [
+    {
+      "name": "analytics-service",
+      "allow": [
+        {"action": "fetch", "resource": "topic", "name": "orders-*"},
+        {"action": "group_read", "resource": "group", "name": "analytics-*"}
+      ]
+    },
+    {
+      "name": "ops-admin",
+      "allow": [
+        {"action": "admin", "resource": "cluster", "name": "*"}
+      ]
+    }
+  ]
+}'
+```
+
+You can also supply `KAFSCALE_ACL_FILE=/path/to/acl.json` instead of inline JSON.
+Set `KAFSCALE_ACL_FAIL_OPEN=true` to allow traffic if the ACL config is missing
+or invalid. Default is fail-closed (deny).
+
+### Actions and Resources
+
+- **Actions**: `produce`, `fetch`, `group_read`, `group_write`, `group_admin`, `admin`
+- **Resources**: `topic`, `group`, `cluster`
+
+### Client Configuration
+
+Set `client.id` in your Kafka clients to the principal name used in ACLs; a
+minimal client sketch follows the Known Gaps list below. Until SASL is
+implemented, this is the default identity Kafscale uses for ACL checks. You can
+also derive principals from network identity when the proxy protocol is enabled
+(see Operations docs for `KAFSCALE_PRINCIPAL_SOURCE`). Only enable
+proxy-derived identity when brokers are reachable solely through a trusted
+proxy/LB. PROXY v1 headers are capped at 256 bytes; oversized headers are
+rejected.
+
 ## Known Gaps
 
 - No SASL or mTLS authentication for Kafka protocol clients.
-- No ACLs or RBAC at the broker layer.
+- ACLs are optional and rely on `client.id` or network-derived identity; no
+  strong client auth yet.
 - No multi-tenant isolation.
-- Admin APIs are writable without auth; UI is read-only by policy, not enforcement.
+- Admin APIs are writable without auth if ACLs are disabled; UI is read-only by
+  policy, not enforcement.
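+
+A minimal client sketch presenting the `analytics-service` principal via
+`client.id`, using franz-go (the client library Kafscale's own e2e tests use);
+the broker address and topic are placeholders:
+
+```go
+package main
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/twmb/franz-go/pkg/kgo"
+)
+
+func main() {
+	client, err := kgo.NewClient(
+		kgo.SeedBrokers("proxy.example.com:9092"), // placeholder address
+		kgo.ClientID("analytics-service"),         // the ACL principal until SASL lands
+		kgo.ConsumeTopics("orders-2025"),          // fetch is allowed by the policy above
+	)
+	if err != nil {
+		panic(err)
+	}
+	defer client.Close()
+
+	// Producing to orders-* would fail with TOPIC_AUTHORIZATION_FAILED under
+	// the example policy, since analytics-service only holds a fetch rule.
+	fetches := client.PollFetches(context.Background())
+	fmt.Println("records fetched:", fetches.NumRecords())
+}
+```
+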
## Roadmap diff --git a/internal/console/auth.go b/internal/console/auth.go index 7cc57f8..cd5fe56 100644 --- a/internal/console/auth.go +++ b/internal/console/auth.go @@ -20,6 +20,7 @@ import ( "crypto/subtle" "encoding/base64" "encoding/json" + "net" "net/http" "sync" "time" @@ -39,6 +40,7 @@ type authManager struct { ttl time.Duration mu sync.Mutex sessions map[string]time.Time + limiter *loginRateLimiter } type authConfigResponse struct { @@ -70,6 +72,7 @@ func newAuthManager(cfg AuthConfig) *authManager { password: cfg.Password, ttl: 12 * time.Hour, sessions: make(map[string]time.Time), + limiter: newLoginRateLimiter(20, time.Minute), } } @@ -109,6 +112,13 @@ func (a *authManager) handleLogin(w http.ResponseWriter, r *http.Request) { }) return } + if a.limiter != nil { + ip := remoteIP(r) + if !a.limiter.Allow(ip) { + http.Error(w, "rate limit exceeded", http.StatusTooManyRequests) + return + } + } var payload loginRequest if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { http.Error(w, "invalid payload", http.StatusBadRequest) @@ -219,3 +229,58 @@ func generateToken(size int) (string, error) { } return base64.RawURLEncoding.EncodeToString(raw), nil } + +type loginRateLimiter struct { + limit int + window time.Duration + mu sync.Mutex + hits map[string][]time.Time +} + +func newLoginRateLimiter(limit int, window time.Duration) *loginRateLimiter { + if limit <= 0 || window <= 0 { + return nil + } + return &loginRateLimiter{ + limit: limit, + window: window, + hits: make(map[string][]time.Time), + } +} + +func (l *loginRateLimiter) Allow(key string) bool { + if l == nil { + return true + } + now := time.Now() + cutoff := now.Add(-l.window) + l.mu.Lock() + defer l.mu.Unlock() + hits := l.hits[key] + idx := 0 + for _, ts := range hits { + if ts.After(cutoff) { + hits[idx] = ts + idx++ + } + } + hits = hits[:idx] + if len(hits) >= l.limit { + l.hits[key] = hits + return false + } + hits = append(hits, now) + l.hits[key] = hits + return true +} + +func remoteIP(r *http.Request) string { + if r == nil { + return "unknown" + } + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil || host == "" { + return r.RemoteAddr + } + return host +} diff --git a/pkg/acl/acl.go b/pkg/acl/acl.go new file mode 100644 index 0000000..c86b26c --- /dev/null +++ b/pkg/acl/acl.go @@ -0,0 +1,148 @@ +// Copyright 2025, 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
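+
+// Package acl implements a minimal allow/deny authorizer keyed by principal
+// name, configured via KAFSCALE_ACL_JSON or KAFSCALE_ACL_FILE. For a known
+// principal, deny rules are evaluated first, then allow rules, then the
+// config-level default policy.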
+
+package acl
+
+import "strings"
+
+// Action identifies a class of Kafka operations subject to authorization.
+type Action string
+
+// Resource identifies the kind of object an Action applies to.
+type Resource string
+
+const (
+	ActionAny        Action = "*"
+	ActionProduce    Action = "produce"
+	ActionFetch      Action = "fetch"
+	ActionGroupRead  Action = "group_read"
+	ActionGroupWrite Action = "group_write"
+	ActionGroupAdmin Action = "group_admin"
+	ActionAdmin      Action = "admin"
+)
+
+const (
+	ResourceAny     Resource = "*"
+	ResourceTopic   Resource = "topic"
+	ResourceGroup   Resource = "group"
+	ResourceCluster Resource = "cluster"
+)
+
+// Rule matches one action against a named resource. An empty Action,
+// Resource, or Name matches anything; a Name ending in "*" matches by prefix.
+type Rule struct {
+	Action   Action   `json:"action"`
+	Resource Resource `json:"resource"`
+	Name     string   `json:"name"`
+}
+
+// PrincipalRules groups the allow and deny rules for a single principal.
+type PrincipalRules struct {
+	Name  string `json:"name"`
+	Allow []Rule `json:"allow"`
+	Deny  []Rule `json:"deny"`
+}
+
+// Config is the JSON-serializable ACL configuration supplied via
+// KAFSCALE_ACL_JSON or KAFSCALE_ACL_FILE.
+type Config struct {
+	Enabled       bool             `json:"enabled"`
+	DefaultPolicy string           `json:"default_policy"`
+	Principals    []PrincipalRules `json:"principals"`
+}
+
+// Authorizer evaluates ACL decisions for named principals.
+type Authorizer struct {
+	enabled      bool
+	defaultAllow bool
+	principals   map[string]PrincipalRules
+}
+
+// NewAuthorizer builds an Authorizer from cfg, dropping principals with
+// empty names.
+func NewAuthorizer(cfg Config) *Authorizer {
+	defaultAllow := strings.EqualFold(strings.TrimSpace(cfg.DefaultPolicy), "allow")
+	principals := make(map[string]PrincipalRules, len(cfg.Principals))
+	for _, p := range cfg.Principals {
+		name := strings.TrimSpace(p.Name)
+		if name == "" {
+			continue
+		}
+		principals[name] = p
+	}
+	return &Authorizer{
+		enabled:      cfg.Enabled,
+		defaultAllow: defaultAllow,
+		principals:   principals,
+	}
+}
+
+// Enabled reports whether ACL enforcement is active.
+func (a *Authorizer) Enabled() bool {
+	if a == nil {
+		return false
+	}
+	return a.enabled
+}
+
+// Allows reports whether principal may perform action on the named resource.
+// Deny rules are evaluated first, then allow rules, then the default policy;
+// an empty principal is treated as "anonymous".
+func (a *Authorizer) Allows(principal string, action Action, resource Resource, name string) bool {
+	if a == nil || !a.enabled {
+		return true
+	}
+	principal = strings.TrimSpace(principal)
+	if principal == "" {
+		principal = "anonymous"
+	}
+	rules, ok := a.principals[principal]
+	if !ok {
+		return a.defaultAllow
+	}
+	for _, rule := range rules.Deny {
+		if matches(rule, action, resource, name) {
+			return false
+		}
+	}
+	for _, rule := range rules.Allow {
+		if matches(rule, action, resource, name) {
+			return true
+		}
+	}
+	return a.defaultAllow
+}
+
+func matches(rule Rule, action Action, resource Resource, name string) bool {
+	if !actionMatches(rule.Action, action) {
+		return false
+	}
+	if !resourceMatches(rule.Resource, resource) {
+		return false
+	}
+	return nameMatches(rule.Name, name)
+}
+
+func actionMatches(rule Action, action Action) bool {
+	if rule == "" || rule == ActionAny {
+		return true
+	}
+	return strings.EqualFold(string(rule), string(action))
+}
+
+func resourceMatches(rule Resource, resource Resource) bool {
+	if rule == "" || rule == ResourceAny {
+		return true
+	}
+	return strings.EqualFold(string(rule), string(resource))
+}
+
+func nameMatches(ruleName, name string) bool {
+	ruleName = strings.TrimSpace(ruleName)
+	if ruleName == "" || ruleName == "*" {
+		return true
+	}
+	if strings.HasSuffix(ruleName, "*") {
+		prefix := strings.TrimSuffix(ruleName, "*")
+		return strings.HasPrefix(name, prefix)
+	}
+	return ruleName == name
+}
diff --git a/pkg/acl/acl_test.go b/pkg/acl/acl_test.go
new file mode 100644
index 0000000..70693c0
--- /dev/null
+++ b/pkg/acl/acl_test.go
@@ -0,0 +1,90 @@
+// Copyright 2025, 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com).
+// This project is supported and financed by Scalytics, Inc. (www.scalytics.io).
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package acl + +import "testing" + +func TestAuthorizerDefaultAllow(t *testing.T) { + auth := NewAuthorizer(Config{ + Enabled: true, + DefaultPolicy: "allow", + }) + if !auth.Allows("unknown", ActionFetch, ResourceTopic, "orders") { + t.Fatalf("expected default allow") + } +} + +func TestAuthorizerDefaultDeny(t *testing.T) { + auth := NewAuthorizer(Config{ + Enabled: true, + DefaultPolicy: "deny", + }) + if auth.Allows("unknown", ActionFetch, ResourceTopic, "orders") { + t.Fatalf("expected default deny") + } +} + +func TestAuthorizerAllowOverridesDefaultDeny(t *testing.T) { + auth := NewAuthorizer(Config{ + Enabled: true, + DefaultPolicy: "deny", + Principals: []PrincipalRules{ + { + Name: "client-a", + Allow: []Rule{{Action: ActionFetch, Resource: ResourceTopic, Name: "orders"}}, + }, + }, + }) + if !auth.Allows("client-a", ActionFetch, ResourceTopic, "orders") { + t.Fatalf("expected allow rule to permit access") + } + if auth.Allows("client-a", ActionFetch, ResourceTopic, "payments") { + t.Fatalf("expected deny for unmatched topic") + } +} + +func TestAuthorizerDenyOverridesAllow(t *testing.T) { + auth := NewAuthorizer(Config{ + Enabled: true, + DefaultPolicy: "allow", + Principals: []PrincipalRules{ + { + Name: "client-a", + Allow: []Rule{{Action: ActionFetch, Resource: ResourceTopic, Name: "orders"}}, + Deny: []Rule{{Action: ActionFetch, Resource: ResourceTopic, Name: "orders"}}, + }, + }, + }) + if auth.Allows("client-a", ActionFetch, ResourceTopic, "orders") { + t.Fatalf("expected deny to override allow") + } +} + +func TestAuthorizerWildcardName(t *testing.T) { + auth := NewAuthorizer(Config{ + Enabled: true, + DefaultPolicy: "deny", + Principals: []PrincipalRules{ + { + Name: "client-a", + Allow: []Rule{{Action: ActionProduce, Resource: ResourceTopic, Name: "orders-*"}}, + }, + }, + }) + if !auth.Allows("client-a", ActionProduce, ResourceTopic, "orders-2025") { + t.Fatalf("expected prefix wildcard match") + } +} diff --git a/pkg/broker/conn_context.go b/pkg/broker/conn_context.go new file mode 100644 index 0000000..2cbb808 --- /dev/null +++ b/pkg/broker/conn_context.go @@ -0,0 +1,48 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package broker + +import "context" + +// ConnContext carries connection-scoped identity data for auth decisions. 
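+// The principal source (client_id, remote_addr, or proxy_addr) is selected
+// via KAFSCALE_PRINCIPAL_SOURCE on the broker.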
+type ConnContext struct { + Principal string + RemoteAddr string + ProxyAddr string +} + +type connContextKey struct{} + +// ContextWithConnInfo attaches connection info to a context. +func ContextWithConnInfo(ctx context.Context, info *ConnContext) context.Context { + if info == nil { + return ctx + } + return context.WithValue(ctx, connContextKey{}, info) +} + +// ConnInfoFromContext returns connection info if present. +func ConnInfoFromContext(ctx context.Context) *ConnContext { + if ctx == nil { + return nil + } + if v := ctx.Value(connContextKey{}); v != nil { + if info, ok := v.(*ConnContext); ok { + return info + } + } + return nil +} diff --git a/pkg/broker/proxyproto.go b/pkg/broker/proxyproto.go new file mode 100644 index 0000000..b5b1780 --- /dev/null +++ b/pkg/broker/proxyproto.go @@ -0,0 +1,202 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package broker + +import ( + "bufio" + "bytes" + "encoding/binary" + "fmt" + "io" + "net" +) + +var proxyV2Signature = []byte{'\r', '\n', '\r', '\n', 0x00, '\r', '\n', 'Q', 'U', 'I', 'T', '\n'} + +// ProxyInfo captures parsed proxy protocol metadata. +type ProxyInfo struct { + SourceAddr string + DestAddr string + SourceIP string + DestIP string + SourcePort int + DestPort int + Local bool +} + +// ReadProxyProtocol consumes a PROXY protocol header (v1 or v2) if present. +// It returns a wrapped connection that preserves buffered bytes. 
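+// If no PROXY header is detected, the returned *ProxyInfo is nil and reads
+// continue from the buffered stream unchanged; enforcing mandatory headers
+// (fail-closed mode) is left to the caller.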
+func ReadProxyProtocol(conn net.Conn) (net.Conn, *ProxyInfo, error) {
+	br := bufio.NewReader(conn)
+	info, err := parseProxyHeader(br)
+	return wrapConnWithReader(conn, br), info, err
+}
+
+func parseProxyHeader(br *bufio.Reader) (*ProxyInfo, error) {
+	peek, err := br.Peek(5)
+	if err != nil {
+		if err == io.EOF {
+			return nil, nil
+		}
+		return nil, err
+	}
+	if bytes.Equal(peek, []byte("PROXY")) {
+		return parseProxyV1(br)
+	}
+	if bytes.HasPrefix(peek, []byte{'\r', '\n', '\r', '\n', 0x00}) {
+		sig, err := br.Peek(len(proxyV2Signature))
+		if err != nil {
+			return nil, err
+		}
+		if bytes.Equal(sig, proxyV2Signature) {
+			return parseProxyV2(br)
+		}
+	}
+	return nil, nil
+}
+
+func parseProxyV1(br *bufio.Reader) (*ProxyInfo, error) {
+	line, err := readProxyV1Line(br, 256)
+	if err != nil {
+		return nil, err
+	}
+	parts := bytes.Fields([]byte(line))
+	if len(parts) < 6 {
+		return nil, fmt.Errorf("proxy v1 header malformed")
+	}
+	srcIP := string(parts[2])
+	dstIP := string(parts[3])
+	srcPort := string(parts[4])
+	dstPort := string(parts[5])
+	return &ProxyInfo{
+		SourceAddr: net.JoinHostPort(srcIP, srcPort),
+		DestAddr:   net.JoinHostPort(dstIP, dstPort),
+		SourceIP:   srcIP,
+		DestIP:     dstIP,
+		SourcePort: atoiOrZero(srcPort),
+		DestPort:   atoiOrZero(dstPort),
+	}, nil
+}
+
+func parseProxyV2(br *bufio.Reader) (*ProxyInfo, error) {
+	header := make([]byte, 16)
+	if _, err := io.ReadFull(br, header); err != nil {
+		return nil, err
+	}
+	if !bytes.Equal(header[:12], proxyV2Signature) {
+		return nil, fmt.Errorf("proxy v2 signature mismatch")
+	}
+	// Byte 12 packs the version (high nibble, must be 0x2) and the command
+	// (low nibble: 0x0 LOCAL, 0x1 PROXY).
+	if header[12]>>4 != 0x2 {
+		return nil, fmt.Errorf("proxy v2 version mismatch")
+	}
+	cmd := header[12] & 0x0f
+	length := int(binary.BigEndian.Uint16(header[14:16]))
+	payload := make([]byte, length)
+	if _, err := io.ReadFull(br, payload); err != nil {
+		return nil, err
+	}
+	if cmd == 0x0 {
+		return &ProxyInfo{Local: true}, nil
+	}
+	// Byte 13 packs the address family in the HIGH nibble and the transport
+	// protocol in the low nibble; masking the low nibble would misread
+	// TCP-over-IPv6 (0x21) as IPv4, so shift instead.
+	family := header[13] >> 4
+	switch family {
+	case 0x1:
+		return parseProxyV2Inet(payload)
+	case 0x2:
+		return parseProxyV2Inet6(payload)
+	default:
+		return nil, nil
+	}
+}
+
+func parseProxyV2Inet(payload []byte) (*ProxyInfo, error) {
+	if len(payload) < 12 {
+		return nil, fmt.Errorf("proxy v2 inet payload too short")
+	}
+	srcIP := net.IP(payload[0:4]).String()
+	dstIP := net.IP(payload[4:8]).String()
+	srcPort := int(binary.BigEndian.Uint16(payload[8:10]))
+	dstPort := int(binary.BigEndian.Uint16(payload[10:12]))
+	return &ProxyInfo{
+		SourceAddr: net.JoinHostPort(srcIP, fmt.Sprintf("%d", srcPort)),
+		DestAddr:   net.JoinHostPort(dstIP, fmt.Sprintf("%d", dstPort)),
+		SourceIP:   srcIP,
+		DestIP:     dstIP,
+		SourcePort: srcPort,
+		DestPort:   dstPort,
+	}, nil
+}
+
+func parseProxyV2Inet6(payload []byte) (*ProxyInfo, error) {
+	if len(payload) < 36 {
+		return nil, fmt.Errorf("proxy v2 inet6 payload too short")
+	}
+	srcIP := net.IP(payload[0:16]).String()
+	dstIP := net.IP(payload[16:32]).String()
+	srcPort := int(binary.BigEndian.Uint16(payload[32:34]))
+	dstPort := int(binary.BigEndian.Uint16(payload[34:36]))
+	return &ProxyInfo{
+		SourceAddr: net.JoinHostPort(srcIP, fmt.Sprintf("%d", srcPort)),
+		DestAddr:   net.JoinHostPort(dstIP, fmt.Sprintf("%d", dstPort)),
+		SourceIP:   srcIP,
+		DestIP:     dstIP,
+		SourcePort: srcPort,
+		DestPort:   dstPort,
+	}, nil
+}
+
+type connWithReader struct {
+	net.Conn
+	reader *bufio.Reader
+}
+
+func wrapConnWithReader(conn net.Conn, reader *bufio.Reader) net.Conn {
+	if reader == nil {
+		return conn
+	}
+	return &connWithReader{Conn: conn, reader: reader}
+}
+
+func (c *connWithReader) Read(p []byte) (int, error) {
+	return c.reader.Read(p)
+}
+
+// atoiOrZero parses a non-negative decimal string, returning 0 if any
+// non-digit is present.
+func atoiOrZero(value string) int 
{ + var out int + for i := 0; i < len(value); i++ { + ch := value[i] + if ch < '0' || ch > '9' { + return 0 + } + out = out*10 + int(ch-'0') + } + return out +} + +func readProxyV1Line(br *bufio.Reader, maxLen int) (string, error) { + if maxLen <= 0 { + maxLen = 256 + } + buf := make([]byte, 0, maxLen) + for len(buf) < maxLen { + b, err := br.ReadByte() + if err != nil { + return "", err + } + buf = append(buf, b) + if b == '\n' { + return string(buf), nil + } + } + return "", fmt.Errorf("proxy v1 header too long") +} diff --git a/pkg/broker/server.go b/pkg/broker/server.go index 5f14b20..23ec3e1 100644 --- a/pkg/broker/server.go +++ b/pkg/broker/server.go @@ -33,12 +33,16 @@ type Handler interface { // Server implements minimal Kafka TCP handling for milestone 1. type Server struct { - Addr string - Handler Handler - listener net.Listener - wg sync.WaitGroup + Addr string + Handler Handler + ConnContextFunc ConnContextFunc + listener net.Listener + wg sync.WaitGroup } +// ConnContextFunc can wrap a connection and attach connection-scoped context data. +type ConnContextFunc func(conn net.Conn) (net.Conn, *ConnContext, error) + // ListenAndServe starts accepting Kafka protocol connections. func (s *Server) ListenAndServe(ctx context.Context) error { if s.Handler == nil { @@ -95,6 +99,19 @@ func (s *Server) handleConnection(conn net.Conn) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() defer conn.Close() + if s.ConnContextFunc != nil { + wrapped, info, err := s.ConnContextFunc(conn) + if err != nil { + log.Printf("connection context setup failed: %v", err) + return + } + if wrapped != nil { + conn = wrapped + } + if info != nil { + ctx = ContextWithConnInfo(ctx, info) + } + } for { frame, err := protocol.ReadFrame(conn) if err != nil { diff --git a/pkg/operator/cluster_controller.go b/pkg/operator/cluster_controller.go index d3e996b..063123f 100644 --- a/pkg/operator/cluster_controller.go +++ b/pkg/operator/cluster_controller.go @@ -229,6 +229,24 @@ func (r *ClusterReconciler) brokerContainer(cluster *kafscalev1alpha1.KafscaleCl Value: cluster.Spec.Config.CacheSize, }) } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_ACL_ENABLED")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_ACL_ENABLED", Value: val}) + } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_ACL_JSON")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_ACL_JSON", Value: val}) + } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_ACL_FILE")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_ACL_FILE", Value: val}) + } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_ACL_FAIL_OPEN")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_ACL_FAIL_OPEN", Value: val}) + } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_PRINCIPAL_SOURCE")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_PRINCIPAL_SOURCE", Value: val}) + } + if val := strings.TrimSpace(os.Getenv("KAFSCALE_PROXY_PROTOCOL")); val != "" { + env = append(env, corev1.EnvVar{Name: "KAFSCALE_PROXY_PROTOCOL", Value: val}) + } var envFrom []corev1.EnvFromSource if cluster.Spec.S3.CredentialsSecretRef != "" { envFrom = append(envFrom, corev1.EnvFromSource{ diff --git a/pkg/protocol/errors.go b/pkg/protocol/errors.go index c2d2c8c..b0de86e 100644 --- a/pkg/protocol/errors.go +++ b/pkg/protocol/errors.go @@ -33,6 +33,8 @@ const ( INVALID_CONFIG int16 = 40 TOPIC_ALREADY_EXISTS int16 = 36 TOPIC_AUTHORIZATION_FAILED int16 = 29 + GROUP_AUTHORIZATION_FAILED int16 = 30 + 
CLUSTER_AUTHORIZATION_FAILED int16 = 31 INVALID_PARTITIONS int16 = 37 UNSUPPORTED_VERSION int16 = 35 ) diff --git a/test/e2e/acl_test.go b/test/e2e/acl_test.go new file mode 100644 index 0000000..97b4794 --- /dev/null +++ b/test/e2e/acl_test.go @@ -0,0 +1,141 @@ +// Copyright 2025, 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build e2e + +package e2e + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" +) + +func TestACLsE2E(t *testing.T) { + const enableEnv = "KAFSCALE_E2E" + if os.Getenv(enableEnv) != "1" { + t.Skipf("set %s=1 to run integration harness", enableEnv) + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + etcd, endpoints := startEmbeddedEtcd(t) + defer etcd.Close() + + brokerAddr := freeAddr(t) + metricsAddr := freeAddr(t) + controlAddr := freeAddr(t) + + aclJSON := `{"default_policy":"deny","principals":[` + + `{"name":"allowed","allow":[` + + `{"action":"produce","resource":"topic","name":"orders-*" },` + + `{"action":"fetch","resource":"topic","name":"orders-*" },` + + `{"action":"group_read","resource":"group","name":"group-allowed"},` + + `{"action":"group_write","resource":"group","name":"group-allowed"}` + + `]},` + + `{"name":"denied","allow":[]}` + + `]}` + + brokerCmd, brokerLogs := startBrokerWithEtcdACL(t, ctx, brokerAddr, metricsAddr, controlAddr, endpoints, aclJSON) + t.Cleanup(func() { stopBroker(t, brokerCmd) }) + waitForBroker(t, brokerLogs, brokerAddr) + + allowedClient, err := kgo.NewClient( + kgo.SeedBrokers(brokerAddr), + kgo.AllowAutoTopicCreation(), + kgo.DisableIdempotentWrite(), + kgo.ClientID("allowed"), + ) + if err != nil { + t.Fatalf("create allowed client: %v\nbroker logs:\n%s", err, brokerLogs.String()) + } + defer allowedClient.Close() + + deniedClient, err := kgo.NewClient( + kgo.SeedBrokers(brokerAddr), + kgo.AllowAutoTopicCreation(), + kgo.DisableIdempotentWrite(), + kgo.ClientID("denied"), + ) + if err != nil { + t.Fatalf("create denied client: %v\nbroker logs:\n%s", err, brokerLogs.String()) + } + defer deniedClient.Close() + + topic := fmt.Sprintf("orders-%d", time.Now().UnixNano()) + produceErr := allowedClient.ProduceSync(ctx, &kgo.Record{Topic: topic, Value: []byte("ok")}).FirstErr() + if produceErr != nil { + t.Fatalf("allowed produce failed: %v\nbroker logs:\n%s", produceErr, brokerLogs.String()) + } + + denyErr := deniedClient.ProduceSync(ctx, &kgo.Record{Topic: topic, Value: []byte("deny")}).FirstErr() + if denyErr == nil { + t.Fatalf("expected denied produce error") + } + var kerrVal *kerr.Error + if !errors.As(denyErr, &kerrVal) || kerrVal.Code != kerr.TopicAuthorizationFailed.Code { + t.Fatalf("expected topic authorization 
failed, got: %v", denyErr) + } + + listReq := kmsg.NewPtrListGroupsRequest() + listReq.Version = 5 + listReq.StatesFilter = []string{"Stable"} + listReq.TypesFilter = []string{"classic"} + listResp, err := listReq.RequestWith(ctx, deniedClient) + if err != nil { + t.Fatalf("list groups request failed: %v\nbroker logs:\n%s", err, brokerLogs.String()) + } + if listResp.ErrorCode != kerr.GroupAuthorizationFailed.Code { + t.Fatalf("expected group authorization failed, got %d", listResp.ErrorCode) + } +} + +func startBrokerWithEtcdACL(t *testing.T, ctx context.Context, brokerAddr, metricsAddr, controlAddr string, endpoints []string, aclJSON string) (*exec.Cmd, *bytes.Buffer) { + t.Helper() + brokerCmd := exec.CommandContext(ctx, "go", "run", filepath.Join(repoRoot(t), "cmd", "broker")) + configureProcessGroup(brokerCmd) + brokerCmd.Env = append(os.Environ(), + "KAFSCALE_AUTO_CREATE_TOPICS=true", + "KAFSCALE_AUTO_CREATE_PARTITIONS=1", + "KAFSCALE_USE_MEMORY_S3=1", + "KAFSCALE_ACL_ENABLED=true", + fmt.Sprintf("KAFSCALE_ACL_JSON=%s", aclJSON), + fmt.Sprintf("KAFSCALE_BROKER_ADDR=%s", brokerAddr), + fmt.Sprintf("KAFSCALE_METRICS_ADDR=%s", metricsAddr), + fmt.Sprintf("KAFSCALE_CONTROL_ADDR=%s", controlAddr), + fmt.Sprintf("KAFSCALE_ETCD_ENDPOINTS=%s", strings.Join(endpoints, ",")), + ) + var brokerLogs bytes.Buffer + logWriter := io.MultiWriter(&brokerLogs, mustLogFile(t, "broker-acl.log")) + brokerCmd.Stdout = logWriter + brokerCmd.Stderr = logWriter + if err := brokerCmd.Start(); err != nil { + t.Fatalf("start broker: %v", err) + } + return brokerCmd, &brokerLogs +}
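
A minimal standalone sketch of the `pkg/acl` evaluation order (deny rules, then
allow rules, then the default policy), using only the `Config`, `NewAuthorizer`,
and `Allows` API added above; the policy JSON mirrors the docs examples:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/KafScale/platform/pkg/acl"
)

func main() {
	raw := []byte(`{"default_policy":"deny","principals":[
		{"name":"analytics","allow":[{"action":"fetch","resource":"topic","name":"orders-*"}]}
	]}`)

	var cfg acl.Config
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	cfg.Enabled = true // mirrors buildAuthorizerFromEnv forcing enforcement on

	auth := acl.NewAuthorizer(cfg)
	fmt.Println(auth.Allows("analytics", acl.ActionFetch, acl.ResourceTopic, "orders-2025"))   // true: prefix wildcard match
	fmt.Println(auth.Allows("analytics", acl.ActionProduce, acl.ResourceTopic, "orders-2025")) // false: no allow rule, default deny
	fmt.Println(auth.Allows("unknown", acl.ActionFetch, acl.ResourceTopic, "orders-2025"))     // false: unknown principal, default deny
}
```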