diff --git a/cmd/inch/main.go b/cmd/inch/main.go
index f593051..ddb8f8a 100644
--- a/cmd/inch/main.go
+++ b/cmd/inch/main.go
@@ -61,7 +61,7 @@ func (m *Main) ParseFlags(args []string) error {
 	fs := flag.NewFlagSet("inch", flag.ContinueOnError)
 	fs.BoolVar(&m.inch.Verbose, "v", false, "Verbose")
 	fs.BoolVar(&m.inch.V2, "v2", false, "Writing into InfluxDB 2.0")
-	fs.StringVar(&m.inch.Token, "token", "", "InfluxDB 2.0 Authorization token")
+	fs.StringVar(&m.inch.Token, "token", "", "InfluxDB 2.0 or 3 Authorization token")
 	fs.StringVar(&m.inch.ReportHost, "report-host", "", "Host to send metrics")
 	fs.StringVar(&m.inch.ReportUser, "report-user", "", "User for Host to send metrics")
 	fs.StringVar(&m.inch.ReportPassword, "report-password", "", "Password Host to send metrics")
@@ -92,6 +92,9 @@ func (m *Main) ParseFlags(args []string) error {
 	fs.BoolVar(&m.inch.Gzip, "gzip", false, "Use gzip compression")
 	fs.StringVar(&m.inch.Precision, "precision", "ns", "Precision of writes")
 	noSetup := fs.Bool("no-setup", false, "Don't ping or set up tables/buckets on run (this is useful for load testing kapacitor)")
+	fs.BoolVar(&m.inch.V3, "v3", false, "Use the v3 write endpoint (only compatible with InfluxDB 3)")
+	fs.BoolVar(&m.inch.V3NoSync, "v3-no-sync", false, "Disable waiting for durability before ack")
+	fs.BoolVar(&m.inch.V3AcceptPartial, "v3-accept-partial", false, "Accept lines in a batch even if subsequent lines error")
 
 	if err := fs.Parse(args); err != nil {
 		return err
diff --git a/go.mod b/go.mod
index bca333d..0af3084 100644
--- a/go.mod
+++ b/go.mod
@@ -1,5 +1,5 @@
 module github.com/influxdata/inch
 
-go 1.17
+go 1.24
 
 require github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
diff --git a/inch.go b/inch.go
index ac9ad8b..d0a8848 100644
--- a/inch.go
+++ b/inch.go
@@ -74,7 +74,7 @@ type Simulator struct {
 	// Decay factor used when weighting average latency returned by server.
 	alpha float64
 
-	V2             bool
+	V2             bool // even with the V2 flag set, the v1 write endpoint of InfluxDB 2 is used
 	Token          string
 	Verbose        bool
 	ReportHost     string
@@ -102,6 +102,9 @@ type Simulator struct {
 	TargetMaxLatency time.Duration
 	Gzip             bool
 	Precision        string
+	V3               bool // enables the v3 native write endpoint, which has additional semantics; db and precision are required
+	V3NoSync         bool // v3 supports a "no-sync" option; when true, writes are ACKed as soon as possible without waiting for WAL durability
+	V3AcceptPartial  bool // allow partial write success when some lines in a batch fail to write
 
 	Database        string
 	RetentionPolicy string // Write to a specific retention policy
@@ -159,7 +162,7 @@ func (s *Simulator) Validate() error {
 		el = append(el, errors.New("number of fields must be > 0"))
 	}
 
-	// validate reporting client is accessable
+	// validate reporting client is accessible
 	if s.ReportHost != "" {
 		var err error
 		s.clt, err = client.NewHTTPClient(client.HTTPConfig{
@@ -183,6 +186,19 @@ func (s *Simulator) Validate() error {
 		el = append(el, fmt.Errorf("invalid precision: %s", s.Precision))
 	}
 
+	if s.Concurrency <= 0 {
+		fmt.Fprintf(s.Stdout, "Warning: concurrency set to non-positive value %v; resetting to 1\n", s.Concurrency)
+		s.Concurrency = 1
+	}
+
+	if s.Concurrency >= 50 {
+		fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; 50 or more concurrent writers is likely a bottleneck (your setting: %v)\n", s.Concurrency)
+	}
+
+	if !s.V3 && (s.V3NoSync || s.V3AcceptPartial) {
+		fmt.Fprintf(s.Stdout, "Warning: InfluxDB 3 flag(s) set to true, but the V3 write endpoint is not being used; flags will have no effect.\n")
+	}
+
 	if len(el) > 0 {
 		return el
 	}
@@ -218,11 +234,14 @@ func (s *Simulator) Run(ctx context.Context) error {
 	fmt.Fprintf(s.Stdout, "Retention Policy: %s\n", s.RetentionPolicy)
 	fmt.Fprintf(s.Stdout, "Write Consistency: %s\n", s.Consistency)
 	fmt.Fprintf(s.Stdout, "Writing into InfluxDB 2.0: %t\n", s.V2)
-	fmt.Fprintf(s.Stdout, "InfluxDB 2.0 Authorization Token: %s\n", s.Token)
+	fmt.Fprintf(s.Stdout, "InfluxDB 2 or 3 Authorization Token: %s\n", s.Token)
 	fmt.Fprintf(s.Stdout, "Precision: %s\n", s.Precision)
+	fmt.Fprintf(s.Stdout, "Writing into InfluxDB 3: %t\n", s.V3)
+	fmt.Fprintf(s.Stdout, "InfluxDB 3 no-sync: %t\n", s.V3NoSync)
+	fmt.Fprintf(s.Stdout, "InfluxDB 3 accept partial writes: %t\n", s.V3AcceptPartial)
 
-	if s.V2 == true && s.Token == "" {
-		fmt.Println("ERROR: Need to provide a token in order to write into InfluxDB 2.0")
+	if (s.V2 || s.V3) && s.Token == "" {
+		fmt.Println("ERROR: Need to provide a token in order to write into InfluxDB 2 or 3")
 		return err
 	}
 
@@ -275,7 +294,7 @@ func (s *Simulator) Run(ctx context.Context) error {
 	}
 
 	// Stream batches from a separate goroutine.
-	ch := s.generateBatches()
+	ch := s.generateBatches(s.Concurrency)
 
 	// Start clients.
 	var wg sync.WaitGroup
@@ -350,8 +369,11 @@ func (s *Simulator) makeField(val int) []string {
 }
 
 // generateBatches returns a channel for streaming batches.
-func (s *Simulator) generateBatches() <-chan []byte {
-	ch := make(chan []byte, 10)
+func (s *Simulator) generateBatches(concurrency int) <-chan []byte {
+	// Use the concurrency setting to adjust internal buffering. We should attempt to make as many batches
+	// as there are concurrent writers, up to a limit of ~50.
+	pendingBatches := max(concurrency, 9) + 1 // use 10 (9+1) to keep close to previous behavior for default concurrency of 1
+	ch := make(chan []byte, min(pendingBatches, 51)) // memory use is channel size * batch size * line size: could be significant
 
 	go func() {
 		values := make([]int, len(s.Tags))
@@ -440,7 +462,7 @@ func (s *Simulator) generateBatches() <-chan []byte {
 	return ch
 }
 
-var space []byte = []byte(" ")
+var space = []byte(" ")
 
 func (s *Simulator) formatWrites(buf *bytes.Buffer, measurement []byte, tags []byte, fieldValues string, timestamp int64, timeDivisor int64) {
 	buf.Write(measurement) // Write measurement
@@ -623,8 +645,8 @@ func (s *Simulator) quartileResponse(q float64) time.Duration {
 
 // runClient executes a client to send points in a separate goroutine.
 func (s *Simulator) runClient(ctx context.Context, ch <-chan []byte) {
-	b := bytes.NewBuffer(make([]byte, 0, 1024))
-	g := gzip.NewWriter(b)
+	gzipBackingBuffer := make([]byte, 0, 1024)
+	g := gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer))
 
 	for {
 		select {
@@ -636,35 +658,31 @@ func (s *Simulator) runClient(ctx context.Context, ch <-chan []byte) {
 				return
 			}
 
-			// Keep trying batch until successful.
-			// Stop client if it cannot connect.
-			for {
-				b.Reset()
+			b := bytes.NewBuffer(gzipBackingBuffer) // releases previous buffer in 'b'
+			b.Reset()                               // resets backing buffer to zero length
 
-				if s.Gzip {
-					g.Reset(b)
+			if s.Gzip {
+				g.Reset(b)
 
-					if _, err := g.Write(buf); err != nil {
-						fmt.Fprintln(s.Stderr, err)
-						fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
-						os.Exit(1)
-					}
+				if _, err := g.Write(buf); err != nil {
+					fmt.Fprintln(s.Stderr, err)
+					fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
+					os.Exit(1)
+				}
 
-					if err := g.Close(); err != nil {
-						fmt.Fprintln(s.Stderr, err)
-						fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
-						os.Exit(1)
-					}
-				} else {
-					_, err := io.Copy(b, bytes.NewReader(buf))
-					if err != nil {
-						fmt.Fprintln(s.Stderr, err)
-						fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
-						os.Exit(1)
-					}
+				if err := g.Close(); err != nil {
+					fmt.Fprintln(s.Stderr, err)
+					fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
+					os.Exit(1)
 				}
+			} else {
+				b = bytes.NewBuffer(buf) // don't copy; just take ownership of the buffer
+			}
 
-			if err := s.sendBatch(b.Bytes()); err == ErrConnectionRefused {
+			// Keep trying batch until successful.
+			// Stop client if it cannot connect.
+			for {
+				if err := s.sendBatch(b.Bytes()); errors.Is(err, ErrConnectionRefused) {
 					return
 				} else if err != nil {
 					fmt.Fprintln(s.Stderr, err)
@@ -719,7 +737,7 @@ var defaultSetupFn = func(s *Simulator) error {
 	}
 
 	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
-	if s.V2 == true {
+	if s.V2 || s.V3 {
 		req.Header.Set("Authorization", "Token "+s.Token)
 	}
 
@@ -744,7 +762,10 @@
 // It's the caller's responsibility to close the response body.
 var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io.ReadCloser, err error) {
 	var url string
-	if s.RetentionPolicy == "" {
+
+	if s.V3 {
+		url = fmt.Sprintf("%s/api/v3/write_lp?db=%s&precision=%s&no_sync=%v&accept_partial=%v", s.Host, s.Database, s.Precision, s.V3NoSync, s.V3AcceptPartial)
+	} else if s.RetentionPolicy == "" {
 		url = fmt.Sprintf("%s/write?db=%s&precision=%s&consistency=%s", s.Host, s.Database, s.Precision, s.Consistency)
 	} else {
 		url = fmt.Sprintf("%s/write?db=%s&rp=%s&precision=%s&consistency=%s", s.Host, s.Database, s.RetentionPolicy, s.Precision, s.Consistency)
@@ -755,7 +776,7 @@ var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io.
 		return 0, nil, err
 	}
 
-	if s.V2 == true {
+	if s.V2 || s.V3 {
 		req.Header.Set("Authorization", "Token "+s.Token)
 	}
 
@@ -787,7 +808,7 @@ var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io.
 
 // sendBatch writes a batch to the server. Continually retries until successful.
 func (s *Simulator) sendBatch(buf []byte) error {
-	// Don't send the batch anywhere..
+	// Don't send the batch anywhere.
 	if s.DryRun {
 		return nil
 	}
@@ -854,17 +875,17 @@ func (s *Simulator) sendBatch(buf []byte) error {
 	// slower than the desired maximum latency. We use a weighted moving average
 	// to determine that, favouring recent latencies over historic ones.
 	//
-	// The implementation is pretty ghetto at the moment, it has the following
+	// The implementation is pretty primitive at the moment, it has the following
 	// rules:
 	//
-	//  - wma reponse time faster than desired latency and currentDelay > 0?
+	//  - wma response time faster than desired latency and currentDelay > 0?
 	//      * reduce currentDelay by 1/n * 0.25 * (desired latency - wma latency).
 	//  - response time slower than desired latency?
 	//      * increase currentDelay by 1/n * 0.25 * (desired latency - wma response).
 	//  - currentDelay < 100ms?
 	//      * set currentDelay to 0
 	//
-	// n is the number of concurent writers. The general rule then, is that
+	// n is the number of concurrent writers. The general rule then, is that
 	// we look at how far away from the desired latency and move a quarter of the
 	// way there in total (over all writers). If we're coming un under the max
 	// latency and our writers are using a delay (currentDelay > 0) then we will
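
For illustration only (not part of the patch): a minimal, hypothetical Go sketch of the request the new V3 code path issues. It POSTs line protocol to /api/v3/write_lp with the db, precision, no_sync and accept_partial query parameters and the Token authorization header, mirroring the URL that defaultWriteBatch builds above when s.V3 is set. The host, database, token and sample point below are placeholder values.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	host := "http://localhost:8181" // placeholder: an InfluxDB 3 instance
	db := "stress"                  // placeholder database name
	token := "my-token"             // placeholder authorization token

	// A single line-protocol point (placeholder data).
	point := []byte("m0,tag0=value0 v0=1i 1736290000000000000\n")

	// Same URL shape as the patched defaultWriteBatch produces for V3 writes.
	url := fmt.Sprintf("%s/api/v3/write_lp?db=%s&precision=%s&no_sync=%v&accept_partial=%v",
		host, db, "ns", false, false)

	req, err := http.NewRequest("POST", url, bytes.NewReader(point))
	if err != nil {
		panic(err)
	}
	// Token auth header, as set for both the V2 and V3 paths in the patch.
	req.Header.Set("Authorization", "Token "+token)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	msg, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, string(msg))
}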