From d06ecfa9ad9a566e6322c4627100803b02bc28e4 Mon Sep 17 00:00:00 2001
From: Christiano Haesbaert
Date: Thu, 12 Feb 2026 12:25:59 +0100
Subject: [PATCH] WIP: Refactor kube talker

The idea is to be able to run kubetalker as a goroutine, so we can have
Kubernetes enrichment when running quark in Go.
---
 go/quark-kube-talker/quark-kube-talker.go | 279 +------------------
 go/quark/kubetalker/kubetalker.go         | 320 ++++++++++++++++++++++
 go/quark/quark.go                         |  52 ++++
 3 files changed, 386 insertions(+), 265 deletions(-)
 create mode 100644 go/quark/kubetalker/kubetalker.go

diff --git a/go/quark-kube-talker/quark-kube-talker.go b/go/quark-kube-talker/quark-kube-talker.go
index 2f3384e3..f4cdf1e6 100644
--- a/go/quark-kube-talker/quark-kube-talker.go
+++ b/go/quark-kube-talker/quark-kube-talker.go
@@ -1,126 +1,22 @@
 package main
 
 import (
-	"bytes"
-	"context"
-	"encoding/binary"
-	"encoding/json"
 	"fmt"
-	"io"
-	"net/http"
-	"net/url"
 	"os"
-	"sync"
-	"time"
+	"os/signal"
 
 	getopt "github.com/pborman/getopt/v2"
-	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/client-go/informers"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/tools/clientcmd"
-)
 
-var (
-	outputMutex sync.Mutex
-	addMsgLen   bool
+	kubetalker "quark/quark/kubetalker"
 )
 
-func fatal(v any) {
-	fmt.Fprintf(os.Stderr, "quark-kube-talker: fatal: %v\n", v)
-	os.Exit(1)
-}
-
-func fetchClusterVersion(ctx context.Context, clientset *kubernetes.Clientset) error {
-	version, err := clientset.Discovery().ServerVersion()
-	if err != nil {
-		return err
-	}
-	data := map[string]string{
-		"kind":    "ClusterVersion",
-		"version": version.String(),
-	}
-	forwardAny(data)
-
-	return nil
-}
-
-func fetchGCP(ctx context.Context) error {
-	uri := url.URL{
-		Scheme:   "http",
-		Host:     "169.254.169.254",
-		Path:     "/computeMetadata/v1",
-		RawQuery: "recursive=true&alt=json",
-	}
-
-	req, err := http.NewRequestWithContext(ctx, "GET", uri.String(), nil)
-	if err != nil {
-		return err
-	}
-	req.Header.Add("Metadata-Flavor", "Google")
-
-	resp, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return err
-	}
-	if resp.StatusCode != 200 {
-		return fmt.Errorf("HTTP GET error: %s", resp.Status)
-	}
-	body, err := io.ReadAll(resp.Body)
-	if err != nil {
-		return err
-	}
-
-	// Add kind
-	var data map[string]interface{}
-	err = json.Unmarshal(body, &data)
-	if err != nil {
-		return err
-	}
-	data["kind"] = "GcpMeta"
-	forwardAny(data)
-
-	return nil
-}
-
-func fetchConfig(Kflag string) (*rest.Config, error) {
-	var configPath string
-
-	// Try only ENV
-	if Kflag == "ENV" {
-		return rest.InClusterConfig()
-	}
-
-	// Try ENV first, fallback to config path otherwise
-	if Kflag == "" {
-		config, err := rest.InClusterConfig()
-		if err == nil {
-			return config, nil
-		}
-		// config, err := clientcmd.BuildConfigFromFlags("", configPath)
-		// rest.InClusterConfig()
-		if configPath, err = os.UserHomeDir(); err != nil {
-			return nil, err
-		}
-		configPath += "/.kube/config"
-
-		config, err = clientcmd.BuildConfigFromFlags("", configPath)
-		return config, err
-	}
-
-	// Treat Kflag as configPath
-	configPath = Kflag
-	return clientcmd.BuildConfigFromFlags("", configPath)
-}
-
 func main() {
 	var err error
+	var addMsgLen bool
 	var Kflag string
 	var helpFlag bool
 	var nodeName string
-	var config *rest.Config
+	var handle *kubetalker.Handle
 
 	getopt.Flag(&helpFlag, 'h', "print this help")
 	getopt.Flag(&addMsgLen, 'm', "prefix messages with binary length")
@@ -134,167 +30,20 @@ func main() {
 		os.Exit(1)
 	}
 
if nodeName == "" { - nodeName = os.Getenv("QUARK_NODE_NAME") - } - if nodeName == "" { - fatal("can't fetch kubernetes node name") - } - - config, err = fetchConfig(Kflag) - if err != nil { - fatal(err) - } - - clientset, err := kubernetes.NewForConfig(config) + handle, err = kubetalker.Start(addMsgLen, nodeName, Kflag, os.Stdout) if err != nil { - fatal(err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - go fetchClusterVersion(ctx, clientset) - go fetchGCP(ctx) - - gotNode := false - gotNodeChan := make(chan struct{}) - - nodeOptions := func(options *metav1.ListOptions) { - options.FieldSelector = "metadata.name=" + nodeName - } - nodeFactory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, - informers.WithTweakListOptions(nodeOptions)) - nodeInformer := nodeFactory.Core().V1().Nodes().Informer() - nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - node := obj.(*v1.Node) - forwardNode(node) - if !gotNode { - gotNode = true - gotNodeChan <- struct{}{} - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - newNode := newObj.(*v1.Node) - forwardNode(newNode) - }, - DeleteFunc: func(obj interface{}) { - node := obj.(*v1.Node) - forwardNode(node) - }, - }) - - podOptions := func(options *metav1.ListOptions) { - options.FieldSelector = "spec.nodeName=" + nodeName + fmt.Fprintf(os.Stderr, "quark-kube-talker: fatal: %v\n", err) + os.Exit(1) } - podFactory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, - informers.WithTweakListOptions(podOptions)) - podInformer := podFactory.Core().V1().Pods().Informer() - podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - pod := obj.(*v1.Pod) - // We don't care about addition without containerStatuses - // No point in forwarding Pending, when it gets to Running it will have what we want - if len(pod.Status.ContainerStatuses) == 0 || - pod.Status.Phase == v1.PodPending { - return - } - forwardPod(pod) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - oldPod := oldObj.(*v1.Pod) - newPod := newObj.(*v1.Pod) - if oldPod.ResourceVersion == newPod.ResourceVersion { - // Periodic resync will send update events for the same object. - // We don't want to process these. 
-				return
-			}
-			// We don't care about updates without containerStatuses
-			if len(newPod.Status.ContainerStatuses) == 0 ||
-				newPod.Status.Phase == v1.PodPending {
-				return
-			}
-
-			forwardPod(newPod)
-		},
-		DeleteFunc: func(obj interface{}) {
-			if pod, ok := obj.(*v1.Pod); ok {
-				forwardPod(pod)
-			} else {
-				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
-				if !ok {
-					fmt.Fprintf(os.Stderr, "Error decoding object when deleting pod, could not get object from tombstone: %#v\n", obj)
-					return
-				}
-				if pod, ok := tombstone.Obj.(*v1.Pod); ok {
-					forwardPod(pod)
-				} else {
-					fmt.Fprintf(os.Stderr, "Error decoding object when deleting pod, tombstone contained non-Pod object: %#v\n", tombstone.Obj)
-				}
-			}
-		},
-	})
-
-	stopCh := make(chan struct{})
-	defer close(stopCh)
-
-	podFactory.Start(stopCh)
-	nodeFactory.Start(stopCh)
-
-	// Wait for an Add(node) for up to 5 seconds
 	go func() {
-		select {
-		case <-gotNodeChan:
-		case <-time.After(5 * time.Second):
-			fatal("didn't receive node")
-		}
+		sigChan := make(chan os.Signal, 1)
+		signal.Notify(sigChan, os.Interrupt)
+		<-sigChan
+		handle.Stop()
 	}()
-
-	<-stopCh
-}
-
-func forwardNode(node *v1.Node) {
-	node.TypeMeta.Kind = "Node"
-	forwardAny(node)
-}
-
-func forwardPod(pod *v1.Pod) {
-	pod.TypeMeta.Kind = "Pod"
-	forwardAny(pod)
-}
-
-func forwardAny(obj interface{}) {
-	j, err := json.Marshal(obj)
+	err = handle.Wait()
 	if err != nil {
-		fatal(err)
-	}
-
-	if addMsgLen {
-		var buffer bytes.Buffer
-
-		// Golang doesn't export a WriteV, so we have to stash it in a buffer :/
-		err = binary.Write(&buffer, binary.NativeEndian, uint32(len(j)))
-		if err != nil {
-			fatal(err)
-		}
-		_, err = buffer.Write(j)
-		if err != nil {
-			fatal(err)
-		}
-		outputMutex.Lock()
-		_, err = os.Stdout.Write(buffer.Bytes())
-		outputMutex.Unlock()
-		if err != nil {
-			fatal(err)
-		}
-	} else {
-		outputMutex.Lock()
-		_, err = os.Stdout.Write(j)
-		outputMutex.Unlock()
-		if err != nil {
-			fatal(err)
-		}
+		fmt.Fprintf(os.Stderr, "quark-kube-talker: fatal: %v\n", err)
+		os.Exit(1)
 	}
-	// pretty, _ := json.MarshalIndent(obj, "", "  ")
-	// fmt.Fprintf(os.Stderr, "%s\n", pretty)
 }
diff --git a/go/quark/kubetalker/kubetalker.go b/go/quark/kubetalker/kubetalker.go
new file mode 100644
index 00000000..bdf2f01a
--- /dev/null
+++ b/go/quark/kubetalker/kubetalker.go
@@ -0,0 +1,320 @@
+package kubetalker
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"os"
+	"sync"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/clientcmd"
+)
+
+// Handle is a running kubetalker instance created via Start(); it owns
+// the informers and serializes all writes to output.
+type Handle struct {
+	once        sync.Once
+	stopCh      chan struct{}
+	ctx         context.Context
+	cancel      context.CancelFunc
+	err         error
+	outputMutex sync.Mutex
+	output      *os.File
+	addMsgLen   bool
+	podFactory  informers.SharedInformerFactory
+	nodeFactory informers.SharedInformerFactory
+}
+
+func (h *Handle) fetchClusterVersion(ctx context.Context, clientset *kubernetes.Clientset) error {
+	version, err := clientset.Discovery().ServerVersion()
+	if err != nil {
+		return err
+	}
+	data := map[string]string{
+		"kind":    "ClusterVersion",
+		"version": version.String(),
+	}
+	h.forwardAny(data)
+
+	return nil
+}
+
+func (h *Handle) fetchGCP(ctx context.Context) error {
+	uri := url.URL{
+		Scheme:   "http",
+		Host:     "169.254.169.254",
+		Path:     "/computeMetadata/v1",
+		RawQuery: "recursive=true&alt=json",
+	}
+
+	req, err := http.NewRequestWithContext(ctx, "GET", uri.String(), nil)
+	if err != nil {
+		return err
+	}
+	req.Header.Add("Metadata-Flavor", "Google")
+
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != 200 {
+		return fmt.Errorf("HTTP GET error: %s", resp.Status)
+	}
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return err
+	}
+
+	// Add kind
+	var data map[string]interface{}
+	err = json.Unmarshal(body, &data)
+	if err != nil {
+		return err
+	}
+	data["kind"] = "GcpMeta"
+	h.forwardAny(data)
+
+	return nil
+}
+
+func fetchConfig(Kflag string) (*rest.Config, error) {
+	var configPath string
+
+	// Try only ENV
+	if Kflag == "ENV" {
+		return rest.InClusterConfig()
+	}
+
+	// Try ENV first, fallback to config path otherwise
+	if Kflag == "" {
+		config, err := rest.InClusterConfig()
+		if err == nil {
+			return config, nil
+		}
+		if configPath, err = os.UserHomeDir(); err != nil {
+			return nil, err
+		}
+		configPath += "/.kube/config"
+
+		config, err = clientcmd.BuildConfigFromFlags("", configPath)
+		return config, err
+	}
+
+	// Treat Kflag as configPath
+	configPath = Kflag
+	return clientcmd.BuildConfigFromFlags("", configPath)
+}
+
+// Stop shuts the kubetalker down without recording an error and waits
+// for it to wind down.
+func (h *Handle) Stop() {
+	h.Fail(nil)
+	h.Wait()
+}
+
+// Fail initiates shutdown; only the first recorded error survives.
+func (h *Handle) Fail(err error) {
+	h.once.Do(func() {
+		h.err = err
+		h.cancel()
+		close(h.stopCh)
+	})
+}
+
+// Wait blocks until the kubetalker has stopped and returns the error
+// recorded by Fail, if any.
+func (h *Handle) Wait() error {
+	<-h.stopCh
+	<-h.ctx.Done()
+	// If we read h.stopCh, then Shutdown must terminate
+	h.nodeFactory.Shutdown()
+	h.podFactory.Shutdown()
+
+	return h.err
+}
+
+func Start(addMsgLen bool, nodeName string, Kflag string, output *os.File) (*Handle, error) {
+	var err error
+	var config *rest.Config
+	var h *Handle
+
+	if nodeName == "" {
+		nodeName = os.Getenv("QUARK_NODE_NAME")
+	}
+	if nodeName == "" {
+		return nil, fmt.Errorf("can't fetch kubernetes node name")
+	}
+
+	config, err = fetchConfig(Kflag)
+	if err != nil {
+		return nil, err
+	}
+
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	h = &Handle{}
+	h.addMsgLen = addMsgLen
+	h.output = output
+	h.stopCh = make(chan struct{})
+	// 5 second budget for the startup fetches below; Fail also cancels it.
+	h.ctx, h.cancel = context.WithTimeout(context.Background(), 5*time.Second)
+
+	gotNode := false
+	// Buffered so the informer callback never blocks if the watchdog
+	// goroutine below has already timed out and gone away.
+	gotNodeChan := make(chan struct{}, 1)
+
+	nodeOptions := func(options *metav1.ListOptions) {
+		options.FieldSelector = "metadata.name=" + nodeName
+	}
+	h.nodeFactory = informers.NewSharedInformerFactoryWithOptions(clientset, 0,
+		informers.WithTweakListOptions(nodeOptions))
+	nodeInformer := h.nodeFactory.Core().V1().Nodes().Informer()
+	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			node := obj.(*v1.Node)
+			h.forwardNode(node)
+			if !gotNode {
+				gotNode = true
+				gotNodeChan <- struct{}{}
+			}
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			newNode := newObj.(*v1.Node)
+			h.forwardNode(newNode)
+		},
+		DeleteFunc: func(obj interface{}) {
+			node := obj.(*v1.Node)
+			h.forwardNode(node)
+		},
+	})
+
+	podOptions := func(options *metav1.ListOptions) {
+		options.FieldSelector = "spec.nodeName=" + nodeName
+	}
+	h.podFactory = informers.NewSharedInformerFactoryWithOptions(clientset, 0,
+		informers.WithTweakListOptions(podOptions))
+	podInformer := h.podFactory.Core().V1().Pods().Informer()
+	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			pod := obj.(*v1.Pod)
+			// We don't care about addition without containerStatuses
+			// No point in forwarding Pending, when it gets to Running it will have what we want
+			if len(pod.Status.ContainerStatuses) == 0 ||
+				pod.Status.Phase == v1.PodPending {
+				return
+			}
+			h.forwardPod(pod)
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			oldPod := oldObj.(*v1.Pod)
+			newPod := newObj.(*v1.Pod)
+			if oldPod.ResourceVersion == newPod.ResourceVersion {
+				// Periodic resync will send update events for the same object.
+				// We don't want to process these.
+				return
+			}
+			// We don't care about updates without containerStatuses
+			if len(newPod.Status.ContainerStatuses) == 0 ||
+				newPod.Status.Phase == v1.PodPending {
+				return
+			}
+
+			h.forwardPod(newPod)
+		},
+		DeleteFunc: func(obj interface{}) {
+			if pod, ok := obj.(*v1.Pod); ok {
+				h.forwardPod(pod)
+			} else {
+				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+				if !ok {
+					fmt.Fprintf(os.Stderr, "Error decoding object when deleting pod, could not get object from tombstone: %#v\n", obj)
+					return
+				}
+				if pod, ok := tombstone.Obj.(*v1.Pod); ok {
+					h.forwardPod(pod)
+				} else {
+					fmt.Fprintf(os.Stderr, "Error decoding object when deleting pod, tombstone contained non-Pod object: %#v\n", tombstone.Obj)
+				}
+			}
+		},
+	})
+
+	h.podFactory.Start(h.stopCh)
+	h.nodeFactory.Start(h.stopCh)
+
+	// Best-effort enrichment, errors are ignored on purpose.
+	go h.fetchClusterVersion(h.ctx, clientset)
+	go h.fetchGCP(h.ctx)
+
+	// Wait for an Add(node) for up to 5 seconds
+	go func() {
+		tmo := time.NewTimer(5 * time.Second)
+		defer tmo.Stop()
+
+		select {
+		case <-gotNodeChan:
+		case <-tmo.C:
+			h.Fail(fmt.Errorf("didn't receive node"))
+		}
+	}()
+
+	return h, nil
+}
+
+func (h *Handle) forwardNode(node *v1.Node) {
+	node.TypeMeta.Kind = "Node"
+	h.forwardAny(node)
+}
+
+func (h *Handle) forwardPod(pod *v1.Pod) {
+	pod.TypeMeta.Kind = "Pod"
+	h.forwardAny(pod)
+}
+
+func (h *Handle) forwardAny(obj interface{}) {
+	j, err := json.Marshal(obj)
+	if err != nil {
+		h.Fail(err)
+		return
+	}
+
+	if h.addMsgLen {
+		var buffer bytes.Buffer
+
+		// Golang doesn't export a WriteV, so we have to stash it in a buffer :/
+		err = binary.Write(&buffer, binary.NativeEndian, uint32(len(j)))
+		if err != nil {
+			h.Fail(err)
+			return
+		}
+		_, err = buffer.Write(j)
+		if err != nil {
+			h.Fail(err)
+			return
+		}
+		h.outputMutex.Lock()
+		_, err = h.output.Write(buffer.Bytes())
+		h.outputMutex.Unlock()
+		if err != nil {
+			h.Fail(err)
+			return
+		}
+	} else {
+		h.outputMutex.Lock()
+		_, err = h.output.Write(j)
+		h.outputMutex.Unlock()
+		if err != nil {
+			h.Fail(err)
+			return
+		}
+	}
+}
diff --git a/go/quark/quark.go b/go/quark/quark.go
index bc3ef731..0d88918b 100644
--- a/go/quark/quark.go
+++ b/go/quark/quark.go
@@ -53,9 +53,12 @@ import (
 	"errors"
 	"fmt"
 	"net/netip"
+	"os"
 	"strings"
 	"syscall"
 	"unsafe"
+
+	kubetalker "quark/quark/kubetalker"
 	// "encoding/binary"
 )
 
@@ -199,6 +202,9 @@ type Event struct {
 type Queue struct {
 	quarkQueue *C.struct_quark_queue // pointer to the queue structure
 	epollFd    int
+	kubeIn     *os.File // read end of the kube pipe, handed to the C side
+	kubeOut    *os.File // write end of the kube pipe, written by kubetalker
+	handle     *kubetalker.Handle
 }
 
 const (
@@ -268,6 +274,7 @@ type QueueAttr struct {
 	MaxLength      int
 	CacheGraceTime int
 	HoldTime       int
+	Kubernetes     bool
 }
 
 // Documented in https://elastic.github.io/quark/quark_queue_get_stats.3.html.
@@ -328,9 +335,37 @@ func OpenQueue(attr QueueAttr) (*Queue, error) {
 	cattr.max_length = C.int(attr.MaxLength)
 	cattr.cache_grace_time = C.int(attr.CacheGraceTime)
 	cattr.hold_time = C.int(attr.HoldTime)
+
+	if attr.Kubernetes {
+		queue.kubeIn, queue.kubeOut, err = os.Pipe()
+		if err != nil {
+			C.free(unsafe.Pointer(queue.quarkQueue))
+			return nil, err
+		}
+		// XXX WIP: node name and kubeconfig path are hardcoded for testing.
+		queue.handle, err = kubetalker.Start(true, "minikube", "/home/haesbaert/.kube/config", queue.kubeOut)
+		if err != nil {
+			queue.kubeIn.Close()
+			queue.kubeOut.Close()
+			C.free(unsafe.Pointer(queue.quarkQueue))
+			return nil, err
+		}
+
+		cattr.kubefd = C.int(queue.kubeIn.Fd())
+	}
+
 	ok, err := C.quark_queue_open(queue.quarkQueue, &cattr)
 	if ok == -1 {
 		C.free(unsafe.Pointer(queue.quarkQueue))
+		// Same order as Close(): close kubeIn first so writes blocked
+		// on kubeOut error out and Stop() can't hang in Shutdown().
+		if queue.kubeIn != nil {
+			queue.kubeIn.Close()
+		}
+		if queue.handle != nil {
+			queue.handle.Stop()
+		}
+		if queue.kubeOut != nil {
+			queue.kubeOut.Close()
+		}
 		return nil, wrapErrno(err)
 	}
 
@@ -341,9 +376,26 @@ func OpenQueue(attr QueueAttr) (*Queue, error) {
 
 // Close closes the queue.
 func (queue *Queue) Close() {
+	// quark_queue_close() never closes the input kube fd (kubeIn)
 	C.quark_queue_close(queue.quarkQueue)
 	C.free(unsafe.Pointer(queue.quarkQueue))
 	queue.quarkQueue = nil
+	// Close kubeIn, which errors out any blocking writes on kubeOut
+	if queue.kubeIn != nil {
+		queue.kubeIn.Close()
+		queue.kubeIn = nil
+	}
+	// Stop the writers, so we can safely close kubeOut
+	if queue.handle != nil {
+		queue.handle.Stop()
+		queue.handle = nil
+	}
+	// Finally close kubeOut
+	if queue.kubeOut != nil {
+		queue.kubeOut.Close()
+		queue.kubeOut = nil
+	}
 }
 
 func (queue *Queue) GetEvent() (Event, bool) {
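
Reviewer note, not part of the diff: a minimal sketch of how the new
package is meant to be driven from Go, mirroring the refactored main().
The pipe wiring and the consumer loop below are assumptions based on the
OpenQueue() changes, not code in this patch, and "example-node" is a
placeholder. The frame format (native-endian uint32 length followed by
that many bytes of JSON) matches forwardAny() with addMsgLen == true.
Stop(), as the signal handler in the refactored main() does, is what
unblocks Wait() cleanly.

	package main

	import (
		"encoding/binary"
		"encoding/json"
		"fmt"
		"io"
		"os"

		kubetalker "quark/quark/kubetalker"
	)

	func main() {
		// kubetalker writes frames to the write end, we consume from
		// the read end; quark.OpenQueue() does the same wiring but
		// hands the read end to the C side via cattr.kubefd.
		r, w, err := os.Pipe()
		if err != nil {
			panic(err)
		}

		handle, err := kubetalker.Start(true, "example-node", "", w)
		if err != nil {
			panic(err)
		}

		go func() {
			for {
				var msgLen uint32
				if err := binary.Read(r, binary.NativeEndian, &msgLen); err != nil {
					return
				}
				buf := make([]byte, msgLen)
				if _, err := io.ReadFull(r, buf); err != nil {
					return
				}
				var obj map[string]interface{}
				if err := json.Unmarshal(buf, &obj); err != nil {
					continue
				}
				fmt.Printf("kind=%v\n", obj["kind"])
			}
		}()

		// Wait() returns nil after Stop(), or the first error passed
		// to Fail(), e.g. "didn't receive node".
		if err := handle.Wait(); err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
	}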