From 59d52355532da6c0da310e399798648e9c8eed10 Mon Sep 17 00:00:00 2001
From: Phil Bracikowski
Date: Tue, 19 Aug 2025 12:22:47 -0700
Subject: [PATCH] feat: update inch options and url to support influxdb 3 core

Adds an option to use the v3 native write endpoint so that advanced write
behavior, such as the "no-sync" option, can be exercised.

Additionally, sizes the channel for buffered write batches to reflect the
concurrency setting, fixes an issue where retrying a write needlessly
recompressed or copied the write batch, and reduces allocations and copies
by reusing buffers and letting bytes.Buffer take ownership of existing
buffers.

Updates Go to version 1.24.

* closes #46
---
 cmd/inch/main.go |   5 ++-
 go.mod           |   2 +-
 inch.go          | 105 ++++++++++++++++++++++++++++-------------
 3 files changed, 68 insertions(+), 44 deletions(-)

diff --git a/cmd/inch/main.go b/cmd/inch/main.go
index f593051..ddb8f8a 100644
--- a/cmd/inch/main.go
+++ b/cmd/inch/main.go
@@ -61,7 +61,7 @@ func (m *Main) ParseFlags(args []string) error {
 	fs := flag.NewFlagSet("inch", flag.ContinueOnError)
 	fs.BoolVar(&m.inch.Verbose, "v", false, "Verbose")
 	fs.BoolVar(&m.inch.V2, "v2", false, "Writing into InfluxDB 2.0")
-	fs.StringVar(&m.inch.Token, "token", "", "InfluxDB 2.0 Authorization token")
+	fs.StringVar(&m.inch.Token, "token", "", "InfluxDB 2.0 or 3 Authorization token")
 	fs.StringVar(&m.inch.ReportHost, "report-host", "", "Host to send metrics")
 	fs.StringVar(&m.inch.ReportUser, "report-user", "", "User for Host to send metrics")
 	fs.StringVar(&m.inch.ReportPassword, "report-password", "", "Password Host to send metrics")
@@ -92,6 +92,9 @@ func (m *Main) ParseFlags(args []string) error {
 	fs.BoolVar(&m.inch.Gzip, "gzip", false, "Use gzip compression")
 	fs.StringVar(&m.inch.Precision, "precision", "ns", "Precision of writes")
 	noSetup := fs.Bool("no-setup", false, "Don't ping or set up tables/buckets on run (this is useful for load testing kapacitor)")
+	fs.BoolVar(&m.inch.V3, "v3", false, "Use the v3 native write endpoint (requires InfluxDB 3)")
+	fs.BoolVar(&m.inch.V3NoSync, "v3-no-sync", false, "Disable waiting for WAL durability before the write is acknowledged")
+	fs.BoolVar(&m.inch.V3AcceptPartial, "v3-accept-partial", false, "Accept valid lines in a batch even if other lines error")
 
 	if err := fs.Parse(args); err != nil {
 		return err
diff --git a/go.mod b/go.mod
index bca333d..0af3084 100644
--- a/go.mod
+++ b/go.mod
@@ -1,5 +1,5 @@
 module github.com/influxdata/inch
 
-go 1.17
+go 1.24
 
 require github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
diff --git a/inch.go b/inch.go
index ac9ad8b..d0a8848 100644
--- a/inch.go
+++ b/inch.go
@@ -74,7 +74,7 @@ type Simulator struct {
 	// Decay factor used when weighting average latency returned by server.
 	alpha float64
 
-	V2         bool
+	V2         bool // even with the V2 flag set, the v1 compatibility write endpoint of InfluxDB 2 is used
 	Token      string
 	Verbose    bool
 	ReportHost string
@@ -102,6 +102,9 @@ type Simulator struct {
 	TargetMaxLatency time.Duration
 	Gzip             bool
 	Precision        string
+	V3               bool // enables the v3 native write endpoint, which has additional semantics; db and precision are required
+	V3NoSync         bool // v3 supports a "no-sync" option; when true, writes are acknowledged as soon as possible, without waiting for WAL durability
+	V3AcceptPartial  bool // allow partial write success when some lines in a batch fail to write
 
 	Database        string
 	RetentionPolicy string // Write to a specific retention policy
@@ -159,7 +162,7 @@ func (s *Simulator) Validate() error {
 		el = append(el, errors.New("number of fields must be > 0"))
 	}
 
-	// validate reporting client is accessable
+	// validate reporting client is accessible
 	if s.ReportHost != "" {
 		var err error
 		s.clt, err = client.NewHTTPClient(client.HTTPConfig{
@@ -183,6 +186,19 @@ func (s *Simulator) Validate() error {
 		el = append(el, fmt.Errorf("invalid precision: %s", s.Precision))
 	}
 
+	if s.Concurrency <= 0 {
+		fmt.Fprintf(s.Stdout, "Warning: concurrency %v is not positive; resetting to 1\n", s.Concurrency)
+		s.Concurrency = 1
+	}
+
+	if s.Concurrency >= 50 {
+		fmt.Fprintf(s.Stdout, "Warning: concurrency of %v is high; 50 or more concurrent writers typically see diminishing returns and may bottleneck the client\n", s.Concurrency)
+	}
+
+	if !s.V3 && (s.V3NoSync || s.V3AcceptPartial) {
+		fmt.Fprintf(s.Stdout, "Warning: InfluxDB 3 flag(s) set, but the v3 write endpoint is not in use; they will have no effect.\n")
+	}
+
 	if len(el) > 0 {
 		return el
 	}
@@ -218,11 +234,14 @@ func (s *Simulator) Run(ctx context.Context) error {
 	fmt.Fprintf(s.Stdout, "Retention Policy: %s\n", s.RetentionPolicy)
 	fmt.Fprintf(s.Stdout, "Write Consistency: %s\n", s.Consistency)
 	fmt.Fprintf(s.Stdout, "Writing into InfluxDB 2.0: %t\n", s.V2)
-	fmt.Fprintf(s.Stdout, "InfluxDB 2.0 Authorization Token: %s\n", s.Token)
+	fmt.Fprintf(s.Stdout, "InfluxDB 2 or 3 Authorization Token: %s\n", s.Token)
 	fmt.Fprintf(s.Stdout, "Precision: %s\n", s.Precision)
+	fmt.Fprintf(s.Stdout, "Writing into InfluxDB 3: %t\n", s.V3)
+	fmt.Fprintf(s.Stdout, "InfluxDB 3 no-sync: %t\n", s.V3NoSync)
+	fmt.Fprintf(s.Stdout, "InfluxDB 3 accept partial writes: %t\n", s.V3AcceptPartial)
 
-	if s.V2 == true && s.Token == "" {
-		fmt.Println("ERROR: Need to provide a token in order to write into InfluxDB 2.0")
+	if (s.V2 || s.V3) && s.Token == "" {
+		fmt.Println("ERROR: Need to provide a token in order to write into InfluxDB 2 or 3")
 		return err
 	}
@@ -275,7 +294,7 @@ func (s *Simulator) Run(ctx context.Context) error {
 	}
 
 	// Stream batches from a separate goroutine.
-	ch := s.generateBatches()
+	ch := s.generateBatches(s.Concurrency)
 
 	// Start clients.
 	var wg sync.WaitGroup
@@ -350,8 +369,11 @@ func (s *Simulator) makeField(val int) []string {
 }
 
 // generateBatches returns a channel for streaming batches.
-func (s *Simulator) generateBatches() <-chan []byte {
-	ch := make(chan []byte, 10)
+func (s *Simulator) generateBatches(concurrency int) <-chan []byte {
+	// Use the concurrency setting to size internal buffering: keep roughly as many
+	// pending batches as there are concurrent writers, up to a limit of ~50.
+	pendingBatches := max(concurrency, 9) + 1 // yields 10 (9+1) for the default concurrency of 1, matching previous behavior
+	ch := make(chan []byte, min(pendingBatches, 51)) // memory use is channel size * batch size * line size and can be significant
 
 	go func() {
 		values := make([]int, len(s.Tags))
@@ -440,7 +462,7 @@ func (s *Simulator) generateBatches() <-chan []byte {
 	return ch
 }
 
-var space []byte = []byte(" ")
+var space = []byte(" ")
 
 func (s *Simulator) formatWrites(buf *bytes.Buffer, measurement []byte, tags []byte, fieldValues string, timestamp int64, timeDivisor int64) {
 	buf.Write(measurement) // Write measurement
@@ -623,8 +645,8 @@ func (s *Simulator) quartileResponse(q float64) time.Duration {
 
 // runClient executes a client to send points in a separate goroutine.
 func (s *Simulator) runClient(ctx context.Context, ch <-chan []byte) {
-	b := bytes.NewBuffer(make([]byte, 0, 1024))
-	g := gzip.NewWriter(b)
+	gzipBackingBuffer := make([]byte, 0, 1024)
+	g := gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer))
 
 	for {
 		select {
@@ -636,35 +658,31 @@ func (s *Simulator) runClient(ctx context.Context, ch <-chan []byte) {
 				return
 			}
 
-			// Keep trying batch until successful.
-			// Stop client if it cannot connect.
-			for {
-				b.Reset()
+			b := bytes.NewBuffer(gzipBackingBuffer) // re-wrap the shared backing buffer; any buffer 'b' owned on the previous iteration is released
+			b.Reset()                               // reset the backing buffer to zero length
 
-				if s.Gzip {
-					g.Reset(b)
+			if s.Gzip {
+				g.Reset(b)
 
-					if _, err := g.Write(buf); err != nil {
-						fmt.Fprintln(s.Stderr, err)
-						fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
-						os.Exit(1)
-					}
+				if _, err := g.Write(buf); err != nil {
+					fmt.Fprintln(s.Stderr, err)
+					fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
+					os.Exit(1)
+				}
 
-					if err := g.Close(); err != nil {
-						fmt.Fprintln(s.Stderr, err)
-						fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
-						os.Exit(1)
-					}
-				} else {
-					_, err := io.Copy(b, bytes.NewReader(buf))
-					if err != nil {
-						fmt.Fprintln(s.Stderr, err)
-						fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
-						os.Exit(1)
-					}
+				if err := g.Close(); err != nil {
+					fmt.Fprintln(s.Stderr, err)
+					fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
+					os.Exit(1)
 				}
+			} else {
+				b = bytes.NewBuffer(buf) // don't copy; just take ownership of the buffer
+			}
 
-			if err := s.sendBatch(b.Bytes()); err == ErrConnectionRefused {
+			// Keep trying batch until successful.
+			// Stop client if it cannot connect.
+			for {
+				if err := s.sendBatch(b.Bytes()); errors.Is(err, ErrConnectionRefused) {
 					return
 				} else if err != nil {
 					fmt.Fprintln(s.Stderr, err)
@@ -719,7 +737,7 @@ var defaultSetupFn = func(s *Simulator) error {
 	}
 
 	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
-	if s.V2 == true {
+	if s.V2 || s.V3 {
 		req.Header.Set("Authorization", "Token "+s.Token)
 	}
@@ -744,7 +762,10 @@
 // It's the caller's responsibility to close the response body.
 var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io.ReadCloser, err error) {
 	var url string
-	if s.RetentionPolicy == "" {
+
+	if s.V3 {
+		url = fmt.Sprintf("%s/api/v3/write_lp?db=%s&precision=%s&no_sync=%v&accept_partial=%v", s.Host, s.Database, s.Precision, s.V3NoSync, s.V3AcceptPartial)
+	} else if s.RetentionPolicy == "" {
 		url = fmt.Sprintf("%s/write?db=%s&precision=%s&consistency=%s", s.Host, s.Database, s.Precision, s.Consistency)
 	} else {
 		url = fmt.Sprintf("%s/write?db=%s&rp=%s&precision=%s&consistency=%s", s.Host, s.Database, s.RetentionPolicy, s.Precision, s.Consistency)
@@ -755,7 +776,7 @@ var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io.
 		return 0, nil, err
 	}
 
-	if s.V2 == true {
+	if s.V2 || s.V3 {
 		req.Header.Set("Authorization", "Token "+s.Token)
 	}
@@ -787,7 +808,7 @@ var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io.
 
 // sendBatch writes a batch to the server. Continually retries until successful.
 func (s *Simulator) sendBatch(buf []byte) error {
-	// Don't send the batch anywhere..
+	// Don't send the batch anywhere.
 	if s.DryRun {
 		return nil
 	}
@@ -854,17 +875,17 @@ func (s *Simulator) sendBatch(buf []byte) error {
 	// slower than the desired maximum latency. We use a weighted moving average
 	// to determine that, favouring recent latencies over historic ones.
 	//
-	// The implementation is pretty ghetto at the moment, it has the following
+	// The implementation is pretty primitive at the moment; it has the following
 	// rules:
 	//
-	// - wma reponse time faster than desired latency and currentDelay > 0?
+	// - wma response time faster than desired latency and currentDelay > 0?
 	//   * reduce currentDelay by 1/n * 0.25 * (desired latency - wma latency).
 	// - response time slower than desired latency?
 	//   * increase currentDelay by 1/n * 0.25 * (desired latency - wma response).
 	// - currentDelay < 100ms?
 	//   * set currentDelay to 0
 	//
-	// n is the number of concurent writers. The general rule then, is that
+	// n is the number of concurrent writers. The general rule, then, is that
 	// we look at how far away from the desired latency and move a quarter of the
-	// way there in total (over all writers). If we're coming un under the max
+	// way there in total (over all writers). If we're coming in under the max
 	// latency and our writers are using a delay (currentDelay > 0) then we will
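
The batch-channel sizing is easy to sanity-check in isolation. Below is a
minimal sketch of the arithmetic generateBatches now uses; the pendingBatches
helper is a stand-in for illustration, not a function in the patch:

package main

import "fmt"

// pendingBatches mirrors the channel-capacity arithmetic in generateBatches:
// at least 10 slots (the previous fixed size), growing with concurrency,
// capped at 51 to bound memory use.
func pendingBatches(concurrency int) int {
	return min(max(concurrency, 9)+1, 51)
}

func main() {
	for _, c := range []int{1, 10, 50, 200} {
		fmt.Printf("concurrency=%d -> channel capacity %d\n", c, pendingBatches(c))
	}
	// Output:
	// concurrency=1 -> channel capacity 10
	// concurrency=10 -> channel capacity 11
	// concurrency=50 -> channel capacity 51
	// concurrency=200 -> channel capacity 51
}

Note that the min and max builtins used here (and in the patch) require
Go 1.21 or newer, which the go 1.24 bump satisfies.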
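
The allocation win in runClient comes from letting bytes.Buffer take ownership
of a finished batch instead of copying it. A self-contained sketch of the
difference, using a made-up batch:

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	batch := []byte("cpu,host=a value=1 1\n") // hypothetical finished batch

	// Old path: io.Copy duplicated every batch into the client's buffer,
	// costing a copy (and possibly a reallocation) per send.
	copied := bytes.NewBuffer(make([]byte, 0, 1024))
	if _, err := io.Copy(copied, bytes.NewReader(batch)); err != nil {
		panic(err)
	}

	// New path: bytes.NewBuffer takes ownership of the slice; nothing is
	// copied. This is safe only because generateBatches never touches a
	// batch again after sending it down the channel.
	owned := bytes.NewBuffer(batch)

	fmt.Println(copied.Len(), owned.Len()) // 21 21
}

This is also why the retry loop now wraps only sendBatch: the compressed (or
owned) bytes are built once per batch, so a retry no longer recompresses or
recopies them.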
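
Finally, a rough sketch of the request the new s.V3 branch of
defaultWriteBatch produces. The host, port (8181 is the InfluxDB 3 Core
default), database, token, and batch below are all placeholder values:

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	host, db, token := "http://localhost:8181", "stress", "my-token"
	batch := []byte("cpu,host=a value=1 1\n")

	// Mirrors the URL defaultWriteBatch builds when -v3 is set, here with
	// no-sync enabled and partial writes rejected.
	url := fmt.Sprintf("%s/api/v3/write_lp?db=%s&precision=%s&no_sync=%v&accept_partial=%v",
		host, db, "ns", true, false)

	req, err := http.NewRequest("POST", url, bytes.NewReader(batch))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Token "+token)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}

The equivalent inch invocation would be something like:
inch -v3 -v3-no-sync -host http://localhost:8181 -token my-token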