From 0359dc7ffc6caae538c9abbf8757ff4fb1dc2747 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E7=84=B6?= Date: Tue, 10 Mar 2026 17:57:28 +0800 Subject: [PATCH] feat(egress): add blocked hostname webhook fanout --- components/egress/README.md | 3 + components/egress/main.go | 9 ++ .../egress/pkg/constants/configuration.go | 1 + components/egress/pkg/dnsproxy/proxy.go | 20 +++ components/egress/pkg/events/broadcaster.go | 117 +++++++++++++++ components/egress/pkg/events/events_test.go | 139 ++++++++++++++++++ components/egress/pkg/events/webhook.go | 120 +++++++++++++++ components/egress/tests/smoke-dns.sh | 65 +++++++- 8 files changed, 471 insertions(+), 3 deletions(-) create mode 100644 components/egress/pkg/events/broadcaster.go create mode 100644 components/egress/pkg/events/events_test.go create mode 100644 components/egress/pkg/events/webhook.go diff --git a/components/egress/README.md b/components/egress/README.md index 193bfe88..760e9840 100644 --- a/components/egress/README.md +++ b/components/egress/README.md @@ -49,6 +49,9 @@ The egress control is implemented as a **Sidecar** that shares the network names - **127.0.0.1** — so packets redirected by iptables to the proxy (127.0.0.1:15353) are accepted by nft. - **Nameserver IPs** from `/etc/resolv.conf` — so client DNS and proxy upstream work (e.g. private DNS). Nameserver IPs are validated (unspecified and loopback are skipped) and capped. Use `OPENSANDBOX_EGRESS_MAX_NS` (default `3`; `0` = no cap, `1`–`10` = cap). See [SECURITY-RISKS.md](SECURITY-RISKS.md) for trust and scope of this whitelist. +- **Blocked hostname webhook** + - `OPENSANDBOX_EGRESS_DENY_WEBHOOK`: HTTP endpoint URL. When set, egress asynchronously POSTs JSON **only when a hostname is denied**: `{"hostname": "", "timestamp": "", "source": "opensandbox-egress"}`. Default timeout 5s, up to 3 retries with exponential backoff starting at 1s; 4xx is not retried, 5xx/network errors are retried. + - **Allow requirement**: you must allow the webhook host (or its IP/CIDR) in the policy; with default deny, if you don’t explicitly allow it, the webhook traffic will be blocked by egress itself. Example: `{"defaultAction":"deny","egress":[{"action":"allow","target":"webhook.example.com"}]}`. If a broader deny CIDR covers the resolved IP, it will still be blocked—adjust your policy accordingly. - DoH/DoT blocking: - DoT (tcp/udp 853) blocked by default. - Optional DoH over 443: `OPENSANDBOX_EGRESS_BLOCK_DOH_443=true`. If enabled without blocklist, all 443 is dropped. diff --git a/components/egress/main.go b/components/egress/main.go index 20b5ec3c..7966e94b 100644 --- a/components/egress/main.go +++ b/components/egress/main.go @@ -24,6 +24,7 @@ import ( "github.com/alibaba/opensandbox/egress/pkg/constants" "github.com/alibaba/opensandbox/egress/pkg/dnsproxy" + "github.com/alibaba/opensandbox/egress/pkg/events" "github.com/alibaba/opensandbox/egress/pkg/iptables" "github.com/alibaba/opensandbox/egress/pkg/log" slogger "github.com/alibaba/opensandbox/internal/logger" @@ -64,6 +65,14 @@ func main() { } log.Infof("dns proxy started on 127.0.0.1:15353") + if blockWebhookURL := strings.TrimSpace(os.Getenv(constants.EnvBlockedWebhook)); blockWebhookURL != "" { + blockedBroadcaster := events.NewBroadcaster(ctx, events.BroadcasterConfig{QueueSize: 256}) + blockedBroadcaster.AddSubscriber(events.NewWebhookSubscriber(blockWebhookURL)) + proxy.SetBlockedBroadcaster(blockedBroadcaster) + defer blockedBroadcaster.Close() + log.Infof("blocked hostname webhook enabled: %s", blockWebhookURL) + } + exemptDst := dnsproxy.ParseNameserverExemptList() if len(exemptDst) > 0 { log.Infof("nameserver exempt list: %v (proxy upstream in this list will not set SO_MARK)", exemptDst) diff --git a/components/egress/pkg/constants/configuration.go b/components/egress/pkg/constants/configuration.go index 2ed70efb..efb038b7 100644 --- a/components/egress/pkg/constants/configuration.go +++ b/components/egress/pkg/constants/configuration.go @@ -23,6 +23,7 @@ const ( EnvEgressRules = "OPENSANDBOX_EGRESS_RULES" EnvEgressLogLevel = "OPENSANDBOX_EGRESS_LOG_LEVEL" EnvMaxNameservers = "OPENSANDBOX_EGRESS_MAX_NS" + EnvBlockedWebhook = "OPENSANDBOX_EGRESS_DENY_WEBHOOK" // EnvNameserverExempt comma-separated IPs; proxy upstream to these is not marked and is allowed in nft allow set EnvNameserverExempt = "OPENSANDBOX_EGRESS_NAMESERVER_EXEMPT" diff --git a/components/egress/pkg/dnsproxy/proxy.go b/components/egress/pkg/dnsproxy/proxy.go index 4965ee1b..0100e0d5 100644 --- a/components/egress/pkg/dnsproxy/proxy.go +++ b/components/egress/pkg/dnsproxy/proxy.go @@ -25,6 +25,7 @@ import ( "github.com/miekg/dns" + "github.com/alibaba/opensandbox/egress/pkg/events" "github.com/alibaba/opensandbox/egress/pkg/log" "github.com/alibaba/opensandbox/egress/pkg/nftables" "github.com/alibaba/opensandbox/egress/pkg/policy" @@ -41,6 +42,9 @@ type Proxy struct { // optional; called in goroutine when A/AAAA are present onResolved func(domain string, ips []nftables.ResolvedIP) + + // optional broadcaster to notify blocked hostnames + blockedBroadcaster *events.Broadcaster } // New builds a proxy with resolved upstream; listenAddr can be empty for default. @@ -109,6 +113,7 @@ func (p *Proxy) serveDNS(w dns.ResponseWriter, r *dns.Msg) { currentPolicy := p.policy p.policyMu.RUnlock() if currentPolicy != nil && currentPolicy.Evaluate(domain) == policy.ActionDeny { + p.publishBlocked(domain) resp := new(dns.Msg) resp.SetRcode(r, dns.RcodeNameError) _ = w.WriteMsg(resp) @@ -179,6 +184,21 @@ func (p *Proxy) SetOnResolved(fn func(domain string, ips []nftables.ResolvedIP)) p.onResolved = fn } +// SetBlockedBroadcaster wires a broadcaster used to notify blocked hostnames. +func (p *Proxy) SetBlockedBroadcaster(b *events.Broadcaster) { + p.blockedBroadcaster = b +} + +func (p *Proxy) publishBlocked(domain string) { + if p.blockedBroadcaster == nil { + return + } + p.blockedBroadcaster.Publish(events.BlockedEvent{ + Hostname: domain, + Timestamp: time.Now().UTC(), + }) +} + // extractResolvedIPs parses A and AAAA records from resp.Answer into ResolvedIP slice. // // Uses netip.ParseAddr(v.A.String()) which allocates a temporary string per record; typically diff --git a/components/egress/pkg/events/broadcaster.go b/components/egress/pkg/events/broadcaster.go new file mode 100644 index 00000000..bb7bbfbb --- /dev/null +++ b/components/egress/pkg/events/broadcaster.go @@ -0,0 +1,117 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + "sync" + "time" + + "github.com/alibaba/opensandbox/egress/pkg/log" +) + +const defaultQueueSize = 128 + +// BlockedEvent describes a blocked hostname notification. +type BlockedEvent struct { + Hostname string `json:"hostname"` + Timestamp time.Time `json:"timestamp"` +} + +// Subscriber consumes blocked events. +type Subscriber interface { + HandleBlocked(ctx context.Context, ev BlockedEvent) +} + +// BroadcasterConfig defines queue sizing for the broadcaster. +type BroadcasterConfig struct { + QueueSize int +} + +// Broadcaster fans out blocked events to one or more subscribers via channels. +type Broadcaster struct { + ctx context.Context + cancel context.CancelFunc + + mu sync.RWMutex + subscribers []chan BlockedEvent + queueSize int +} + +// NewBroadcaster builds a broadcaster with the given queue size (defaults to 128). +func NewBroadcaster(ctx context.Context, cfg BroadcasterConfig) *Broadcaster { + if cfg.QueueSize <= 0 { + cfg.QueueSize = defaultQueueSize + } + cctx, cancel := context.WithCancel(ctx) + return &Broadcaster{ + ctx: cctx, + cancel: cancel, + queueSize: cfg.QueueSize, + } +} + +// AddSubscriber registers a new subscriber with its own buffered queue and worker. +func (b *Broadcaster) AddSubscriber(sub Subscriber) { + if sub == nil { + return + } + ch := make(chan BlockedEvent, b.queueSize) + + b.mu.Lock() + b.subscribers = append(b.subscribers, ch) + b.mu.Unlock() + + go func() { + for { + select { + case <-b.ctx.Done(): + return + case ev, ok := <-ch: + if !ok { + return + } + sub.HandleBlocked(b.ctx, ev) + } + } + }() +} + +// Publish sends an event to all subscribers; drops and logs when a subscriber queue is full. +func (b *Broadcaster) Publish(event BlockedEvent) { + b.mu.RLock() + subs := append([]chan BlockedEvent(nil), b.subscribers...) + b.mu.RUnlock() + + for _, ch := range subs { + select { + case ch <- event: + default: + log.Warnf("[events] blocked-event queue full; dropping hostname %s", event.Hostname) + } + } +} + +// Close stops all workers and closes subscriber queues. +func (b *Broadcaster) Close() { + b.cancel() + + b.mu.Lock() + for _, ch := range b.subscribers { + close(ch) + } + b.subscribers = nil + b.mu.Unlock() +} diff --git a/components/egress/pkg/events/events_test.go b/components/egress/pkg/events/events_test.go new file mode 100644 index 00000000..f73fecc7 --- /dev/null +++ b/components/egress/pkg/events/events_test.go @@ -0,0 +1,139 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +type captureSubscriber struct { + recv chan BlockedEvent +} + +func (c *captureSubscriber) HandleBlocked(_ context.Context, ev BlockedEvent) { + c.recv <- ev +} + +type blockingSubscriber struct { + block chan struct{} +} + +func (b *blockingSubscriber) HandleBlocked(_ context.Context, ev BlockedEvent) { + // Block until the channel is closed to simulate a slow consumer and trigger backpressure. + <-b.block + _ = ev +} + +func TestBroadcasterFanout(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + b := NewBroadcaster(ctx, BroadcasterConfig{QueueSize: 2}) + + sub1 := &captureSubscriber{recv: make(chan BlockedEvent, 1)} + sub2 := &captureSubscriber{recv: make(chan BlockedEvent, 1)} + b.AddSubscriber(sub1) + b.AddSubscriber(sub2) + + ev := BlockedEvent{Hostname: "example.com.", Timestamp: time.Now()} + b.Publish(ev) + + select { + case got := <-sub1.recv: + if got.Hostname != ev.Hostname { + t.Fatalf("sub1 expected hostname %s, got %s", ev.Hostname, got.Hostname) + } + case <-time.After(2 * time.Second): + t.Fatal("sub1 did not receive event") + } + + select { + case got := <-sub2.recv: + if got.Hostname != ev.Hostname { + t.Fatalf("sub2 expected hostname %s, got %s", ev.Hostname, got.Hostname) + } + case <-time.After(2 * time.Second): + t.Fatal("sub2 did not receive event") + } + + b.Close() +} + +func TestBroadcasterDropsWhenSubscriberBackedUp(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Small queue; blocking subscriber will hold the first event. + b := NewBroadcaster(ctx, BroadcasterConfig{QueueSize: 1}) + block := make(chan struct{}) + sub := &blockingSubscriber{block: block} + b.AddSubscriber(sub) + + ev1 := BlockedEvent{Hostname: "first.example", Timestamp: time.Now()} + ev2 := BlockedEvent{Hostname: "second.example", Timestamp: time.Now()} + + b.Publish(ev1) + // This publish should drop because subscriber is blocked and queue size is 1. + b.Publish(ev2) + + // Allow subscriber to drain and exit. + close(block) + + b.Close() +} + +func TestWebhookSubscriberSendsPayload(t *testing.T) { + var ( + gotMethod string + gotPayload webhookPayload + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotMethod = r.Method + body, _ := io.ReadAll(r.Body) + _ = r.Body.Close() + _ = json.Unmarshal(body, &gotPayload) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + sub := NewWebhookSubscriber(server.URL) + if sub == nil { + t.Fatal("webhook subscriber should not be nil") + } + + ts := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC) + ev := BlockedEvent{Hostname: "Example.com.", Timestamp: ts} + sub.HandleBlocked(context.Background(), ev) + + if gotMethod != http.MethodPost { + t.Fatalf("expected POST, got %s", gotMethod) + } + if gotPayload.Hostname != ev.Hostname { + t.Fatalf("expected hostname %s, got %s", ev.Hostname, gotPayload.Hostname) + } + if gotPayload.Source != webhookSource { + t.Fatalf("expected source %s, got %s", webhookSource, gotPayload.Source) + } + if gotPayload.Timestamp == "" { + t.Fatalf("expected timestamp to be set") + } +} diff --git a/components/egress/pkg/events/webhook.go b/components/egress/pkg/events/webhook.go new file mode 100644 index 00000000..b8287327 --- /dev/null +++ b/components/egress/pkg/events/webhook.go @@ -0,0 +1,120 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/alibaba/opensandbox/egress/pkg/log" +) + +const ( + webhookSource = "opensandbox-egress" + defaultWebhookTimeout = 5 * time.Second + defaultWebhookRetries = 3 + defaultWebhookBackoff = 1 * time.Second +) + +// WebhookSubscriber delivers blocked events to an HTTP endpoint. +type WebhookSubscriber struct { + url string + client *http.Client + timeout time.Duration + maxRetries int + backoff time.Duration +} + +type webhookPayload struct { + Hostname string `json:"hostname"` + Timestamp string `json:"timestamp"` + Source string `json:"source"` +} + +// NewWebhookSubscriber builds a webhook subscriber with hardcoded timeout/retry settings. +func NewWebhookSubscriber(url string) *WebhookSubscriber { + if url == "" { + return nil + } + return &WebhookSubscriber{ + url: url, + client: &http.Client{}, + timeout: defaultWebhookTimeout, + maxRetries: defaultWebhookRetries, + backoff: defaultWebhookBackoff, + } +} + +// HandleBlocked sends the blocked event to the configured webhook with retries. +func (w *WebhookSubscriber) HandleBlocked(ctx context.Context, ev BlockedEvent) { + payload := webhookPayload{ + Hostname: ev.Hostname, + Timestamp: ev.Timestamp.UTC().Format(time.RFC3339), + Source: webhookSource, + } + body, err := json.Marshal(payload) + if err != nil { + log.Warnf("[webhook] failed to marshal payload for hostname %s: %v", ev.Hostname, err) + return + } + + var lastErr error + for attempt := 0; attempt <= w.maxRetries; attempt++ { + reqCtx := ctx + cancel := func() {} + if w.timeout > 0 { + reqCtx, cancel = context.WithTimeout(ctx, w.timeout) + } + + req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, w.url, bytes.NewReader(body)) + if err != nil { + cancel() + lastErr = err + break + } + req.Header.Set("Content-Type", "application/json") + + resp, err := w.client.Do(req) + if err == nil { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + if resp.StatusCode < 300 { + cancel() + return + } + if resp.StatusCode < 500 { + cancel() + log.Warnf("[webhook] non-retriable status %d for hostname %s", resp.StatusCode, payload.Hostname) + return + } + err = fmt.Errorf("status %d", resp.StatusCode) + } + + cancel() + lastErr = err + if attempt < w.maxRetries { + time.Sleep(w.backoff * time.Duration(1</dev/null 2>&1 || true + fi + rm -f "${WEBHOOK_LOG}" >/dev/null 2>&1 || true docker rm -f "${containerName}" >/dev/null 2>&1 || true } trap cleanup EXIT @@ -37,12 +45,51 @@ trap cleanup EXIT info "Building image ${IMG}" docker build -t "${IMG}" -f "${REPO_ROOT}/components/egress/Dockerfile" "${REPO_ROOT}" +info "Starting host webhook on ${WEBHOOK_PORT}" +python - <<'PY' & +import http.server, json, sys, time +from http.server import ThreadingHTTPServer + +LOG_PATH = sys.argv[1] +PORT = int(sys.argv[2]) + +class Handler(http.server.BaseHTTPRequestHandler): + def do_GET(self): + self.send_response(200) + self.end_headers() + self.wfile.write(b'ok') + + def do_POST(self): + length = int(self.headers.get('content-length', 0)) + body = self.rfile.read(length) + with open(LOG_PATH, 'ab') as f: + f.write(body + b'\n') + self.send_response(200) + self.end_headers() + self.wfile.write(b'ok') + + def log_message(self, *args, **kwargs): + return + +ThreadingHTTPServer(('', PORT), Handler).serve_forever() +PY "${WEBHOOK_LOG}" "${WEBHOOK_PORT}" +WEBHOOK_PID=$! + +info "Waiting for webhook server..." +for i in {1..20}; do + if curl -sf "http://127.0.0.1:${WEBHOOK_PORT}" >/dev/null; then + break + fi + sleep 0.3 +done + info "Starting containerName" docker run -d --name "${containerName}" \ --cap-add=NET_ADMIN \ --sysctl net.ipv6.conf.all.disable_ipv6=1 \ --sysctl net.ipv6.conf.default.disable_ipv6=1 \ -e OPENSANDBOX_EGRESS_MODE=dns \ + -e OPENSANDBOX_EGRESS_DENY_WEBHOOK="${WEBHOOK_URL}" \ -p ${POLICY_PORT}:18080 \ "${IMG}" @@ -54,12 +101,12 @@ for i in {1..50}; do sleep 0.5 done -info "Pushing policy (allow by default; deny github.com & 10.0.0.0/8)" +info "Pushing policy (default deny; allow webhook host + *.github.com)" curl -sSf -XPOST "http://127.0.0.1:${POLICY_PORT}/policy" \ - -d '{"defaultAction":"deny","egress":[{"action":"allow","target":"*.github.com"}]}' + -d '{"defaultAction":"deny","egress":[{"action":"allow","target":"host.docker.internal"},{"action":"allow","target":"*.github.com"}]}' run_in_app() { - docker run --rm --network container:"${containerName}" curlimages/curl "$@" + docker run --rm --network container:"${containerName}" --dns 127.0.0.1 curlimages/curl "$@" } pass() { info "PASS: $*"; } @@ -76,4 +123,16 @@ info "Test: allowed domain should succeed (api.github.com)" run_in_app -I https://api.github.com --max-time 10 >/dev/null 2>&1 || fail "api.github.com should succeed" pass "api.github.com allowed" +info "Test: webhook should receive denied hostname (google.com)" +sleep 1 +if [[ -f "${WEBHOOK_LOG}" ]]; then + if grep -q "google.com" "${WEBHOOK_LOG}"; then + pass "webhook received blocked hostname" + else + fail "webhook log missing blocked hostname" + fi +else + fail "webhook log not found" +fi + info "All smoke tests passed." \ No newline at end of file