From e9bb17a68d453281dc9d408144208239b1f23072 Mon Sep 17 00:00:00 2001 From: fanyang Date: Mon, 9 Feb 2026 19:00:05 +0800 Subject: [PATCH] Pipeline vfs raw parsing and writes --- cmd_vfs_raw.go | 42 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/cmd_vfs_raw.go b/cmd_vfs_raw.go index 5a0bdb2..1d42a1b 100644 --- a/cmd_vfs_raw.go +++ b/cmd_vfs_raw.go @@ -300,9 +300,45 @@ var vfsRawCmd = &cli.Command{ r = f } - return jsonParseThenAppend(r, func(e *vfsEvent) error { - return appender.AppendRow(e.Timestamp, e.Probe, e.Tid, e.ReturnValue, - e.Path, e.Inode, e.Offset, e.Length) + events := make(chan *vfsEvent, 1024) + writeErrCh := make(chan error, 1) + done := make(chan struct{}) + + go func() { + defer close(done) + for e := range events { + if err := appender.AppendRow(e.Timestamp, e.Probe, e.Tid, e.ReturnValue, + e.Path, e.Inode, e.Offset, e.Length); err != nil { + writeErrCh <- err + return + } + } + }() + + parseErr := jsonParseThenAppend(r, func(e *vfsEvent) error { + select { + case events <- e: + return nil + case err := <-writeErrCh: + if err != nil { + return err + } + return errors.New("writer stopped") + } }) + + close(events) + <-done + + if parseErr != nil { + return parseErr + } + + select { + case err := <-writeErrCh: + return err + default: + return nil + } }, }