5 changes: 4 additions & 1 deletion cmd/inch/main.go
@@ -61,7 +61,7 @@ func (m *Main) ParseFlags(args []string) error {
fs := flag.NewFlagSet("inch", flag.ContinueOnError)
fs.BoolVar(&m.inch.Verbose, "v", false, "Verbose")
fs.BoolVar(&m.inch.V2, "v2", false, "Writing into InfluxDB 2.0")
fs.StringVar(&m.inch.Token, "token", "", "InfluxDB 2.0 Authorization token")
fs.StringVar(&m.inch.Token, "token", "", "InfluxDB 2.0 or 3 Authorization token")
fs.StringVar(&m.inch.ReportHost, "report-host", "", "Host to send metrics")
fs.StringVar(&m.inch.ReportUser, "report-user", "", "User for Host to send metrics")
fs.StringVar(&m.inch.ReportPassword, "report-password", "", "Password Host to send metrics")
@@ -92,6 +92,9 @@ func (m *Main) ParseFlags(args []string) error {
fs.BoolVar(&m.inch.Gzip, "gzip", false, "Use gzip compression")
fs.StringVar(&m.inch.Precision, "precision", "ns", "Precision of writes")
noSetup := fs.Bool("no-setup", false, "Don't ping or set up tables/buckets on run (this is useful for load testing kapacitor)")
fs.BoolVar(&m.inch.V3, "v3", false, "Use v3 write endpoint (only compatible with v3 write endpoint)")
fs.BoolVar(&m.inch.V3NoSync, "v3-no-sync", false, "Disable waiting for durability before ack")
fs.BoolVar(&m.inch.V3AcceptPartial, "v3-accept-partial", false, "Accept lines in batch successfully even if subsequent lines error")

if err := fs.Parse(args); err != nil {
return err
2 changes: 1 addition & 1 deletion go.mod
@@ -1,5 +1,5 @@
module github.com/influxdata/inch

go 1.17
go 1.24

require github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
105 changes: 63 additions & 42 deletions inch.go
@@ -74,7 +74,7 @@ type Simulator struct {

// Decay factor used when weighting average latency returned by server.
alpha float64
V2 bool
V2 bool // even with the V2 flag, the v1 write endpoint of v2 is used
Token string
Verbose bool
ReportHost string
@@ -102,6 +102,9 @@ type Simulator struct {
TargetMaxLatency time.Duration
Gzip bool
Precision string
V3 bool // enables the v3 native write endpoint which has additional semantics; db and precision are required
V3NoSync bool // v3 supports a "no-sync" option, when true will ACK write as soon as possible without waiting for wal durability
V3AcceptPartial bool // allow partial write success when some lines in a batch fail to write

Database string
RetentionPolicy string // Write to a specific retention policy
@@ -159,7 +162,7 @@ func (s *Simulator) Validate() error {
el = append(el, errors.New("number of fields must be > 0"))
}

// validate reporting client is accessable
// validate reporting client is accessible
if s.ReportHost != "" {
var err error
s.clt, err = client.NewHTTPClient(client.HTTPConfig{
@@ -183,6 +186,19 @@ func (s *Simulator) Validate() error {
el = append(el, fmt.Errorf("invalid precision: %s", s.Precision))
}

if s.Concurrency <= 0 {
fmt.Fprintf(s.Stdout, "Warning: concurrency set to non-positive; resetting to 1: your setting %v\n", s.Concurrency)
s.Concurrency = 1
}

if s.Concurrency >= 50 {
fmt.Fprintf(s.Stdout, "Warning: concurrency has diminishing returns; over 50 concurrent writers is likely bottlenecking: your setting: %v\n", s.Concurrency)
}

if !s.V3 && (s.V3NoSync || s.V3AcceptPartial) {
fmt.Fprintf(s.Stdout, "Warning: InfluxDB 3 flag(s) set to true, but V3 write endpoint not being used; flags will have no effect.\n")
}

if len(el) > 0 {
return el
}
@@ -218,11 +234,14 @@ func (s *Simulator) Run(ctx context.Context) error {
fmt.Fprintf(s.Stdout, "Retention Policy: %s\n", s.RetentionPolicy)
fmt.Fprintf(s.Stdout, "Write Consistency: %s\n", s.Consistency)
fmt.Fprintf(s.Stdout, "Writing into InfluxDB 2.0: %t\n", s.V2)
fmt.Fprintf(s.Stdout, "InfluxDB 2.0 Authorization Token: %s\n", s.Token)
fmt.Fprintf(s.Stdout, "InfluxDB 2 or 3 Authorization Token: %s\n", s.Token)
fmt.Fprintf(s.Stdout, "Precision: %s\n", s.Precision)
fmt.Fprintf(s.Stdout, "Writing into InfluxDB 3: %t\n", s.V3)
fmt.Fprintf(s.Stdout, "InfluxDB 3 no-sync: %t\n", s.V3NoSync)
fmt.Fprintf(s.Stdout, "InfluxDB 3 accept partial writes: %t\n", s.V3AcceptPartial)

if s.V2 == true && s.Token == "" {
fmt.Println("ERROR: Need to provide a token in order to write into InfluxDB 2.0")
if (s.V2 || s.V3) && s.Token == "" {
fmt.Println("ERROR: Need to provide a token in order to write into InfluxDB 2 or 3")
return err
}

@@ -275,7 +294,7 @@ func (s *Simulator) Run(ctx context.Context) error {
}

// Stream batches from a separate goroutine.
ch := s.generateBatches()
ch := s.generateBatches(s.Concurrency)

// Start clients.
var wg sync.WaitGroup
@@ -350,8 +369,11 @@ func (s *Simulator) makeField(val int) []string {
}

// generateBatches returns a channel for streaming batches.
func (s *Simulator) generateBatches() <-chan []byte {
ch := make(chan []byte, 10)
Contributor Author

A channel of size 10 means 10 batches get buffered, but if the concurrency is higher they can drain unevenly, creating delays for the concurrent writers because a batch isn't available yet: the writing can stutter on inch's data-generation side.

If the channel buffering batches is slightly larger than the concurrency, each writer should see a smoother pattern where the next batch is already available as soon as it has finished writing the previous one, without waiting on generation.

This PR increases CPU usage by increasing the concurrent work of batch generation and writing to the DB under test. It also increases memory usage when concurrency is high, but very high concurrency might be better simulated with multiple inch instances running.

func (s *Simulator) generateBatches(concurrency int) <-chan []byte {
// use concurrency setting to adjust internal buffering. We should attempt to make as many batches
// as there are concurrent writers up to some limit ~50.
pendingBatches := max(concurrency, 9) + 1 // use 10 (9+1) to keep close to previous behavior for default concurrency of 1
ch := make(chan []byte, min(pendingBatches, 51)) // memory use is channel size * batch size * line size: could be significant

go func() {
values := make([]int, len(s.Tags))
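
To make the sizing rule above concrete, here is a minimal standalone sketch that reproduces the capacity expression from the new generateBatches; the helper name and the sample concurrency values are illustrative only:

```go
package main

import "fmt"

// batchChannelCap mirrors the sizing expression in generateBatches: buffer
// roughly one batch per concurrent writer, but never fewer than the old
// default of 10 and never more than 51.
func batchChannelCap(concurrency int) int {
	pendingBatches := max(concurrency, 9) + 1
	return min(pendingBatches, 51)
}

func main() {
	for _, c := range []int{1, 8, 32, 64, 200} {
		fmt.Printf("concurrency=%d -> channel capacity=%d\n", c, batchChannelCap(c))
	}
}
```

With the default concurrency of 1 this keeps the previous capacity of 10, tracks the writer count once concurrency exceeds 9, and caps at 51 to bound memory use.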
@@ -440,7 +462,7 @@ func (s *Simulator) generateBatches() <-chan []byte {
return ch
}

var space []byte = []byte(" ")
var space = []byte(" ")

func (s *Simulator) formatWrites(buf *bytes.Buffer, measurement []byte, tags []byte, fieldValues string, timestamp int64, timeDivisor int64) {
buf.Write(measurement) // Write measurement
@@ -623,8 +645,8 @@ func (s *Simulator) quartileResponse(q float64) time.Duration {

// runClient executes a client to send points in a separate goroutine.
func (s *Simulator) runClient(ctx context.Context, ch <-chan []byte) {
b := bytes.NewBuffer(make([]byte, 0, 1024))
g := gzip.NewWriter(b)
gzipBackingBuffer := make([]byte, 0, 1024)
g := gzip.NewWriter(bytes.NewBuffer(gzipBackingBuffer))

for {
select {
@@ -636,35 +658,31 @@ func (s *Simulator) runClient(ctx context.Context, ch <-chan []byte) {
return
}

// Keep trying batch until successful.
// Stop client if it cannot connect.
for {
b.Reset()
b := bytes.NewBuffer(gzipBackingBuffer) // releases previous buffer in 'b'
b.Reset() // resets backing buffer to zero length

if s.Gzip {
g.Reset(b)
if s.Gzip {
g.Reset(b)

if _, err := g.Write(buf); err != nil {
fmt.Fprintln(s.Stderr, err)
fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
os.Exit(1)
}
if _, err := g.Write(buf); err != nil {
fmt.Fprintln(s.Stderr, err)
fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
os.Exit(1)
}

if err := g.Close(); err != nil {
fmt.Fprintln(s.Stderr, err)
fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
os.Exit(1)
}
} else {
_, err := io.Copy(b, bytes.NewReader(buf))
if err != nil {
fmt.Fprintln(s.Stderr, err)
fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
os.Exit(1)
}
if err := g.Close(); err != nil {
fmt.Fprintln(s.Stderr, err)
fmt.Fprintf(s.Stderr, "Exiting due to fatal errors: %v.\n", err)
os.Exit(1)
}
} else {
b = bytes.NewBuffer(buf) // don't copy; just take ownership of the buffer
}

if err := s.sendBatch(b.Bytes()); err == ErrConnectionRefused {
// Keep trying batch until successful.
// Stop client if it cannot connect.
for {
if err := s.sendBatch(b.Bytes()); errors.Is(err, ErrConnectionRefused) {
return
} else if err != nil {
fmt.Fprintln(s.Stderr, err)
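
The restructured runClient keeps one backing slice alive across batches and Resets the gzip.Writer onto a fresh bytes.Buffer each iteration instead of allocating new ones. A standalone sketch of that reuse pattern, assuming hypothetical batch contents:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

func main() {
	// One backing slice for the whole lifetime of the worker, as in runClient.
	backing := make([]byte, 0, 1024)
	g := gzip.NewWriter(bytes.NewBuffer(backing))

	for _, batch := range [][]byte{
		[]byte("cpu,host=a value=1 1\n"),
		[]byte("cpu,host=a value=2 2\n"),
	} {
		b := bytes.NewBuffer(backing) // new Buffer over the same storage
		b.Reset()                     // truncate to zero length before reuse

		g.Reset(b) // reuse the gzip writer rather than allocating a new one
		if _, err := g.Write(batch); err != nil {
			panic(err)
		}
		if err := g.Close(); err != nil {
			panic(err)
		}
		fmt.Printf("compressed %d bytes into %d\n", len(batch), b.Len())
	}
}
```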
@@ -719,7 +737,7 @@ var defaultSetupFn = func(s *Simulator) error {
}

req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
if s.V2 == true {
if s.V2 || s.V3 {
req.Header.Set("Authorization", "Token "+s.Token)
}

@@ -744,7 +762,10 @@ var defaultSetupFn = func(s *Simulator) error {
// It's the caller's responsibility to close the response body.
var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io.ReadCloser, err error) {
var url string
if s.RetentionPolicy == "" {

if s.V3 {
url = fmt.Sprintf("%s/api/v3/write_lp?db=%s&precision=%s&no_sync=%v&accept_partial=%v", s.Host, s.Database, s.Precision, s.V3NoSync, s.V3AcceptPartial)
} else if s.RetentionPolicy == "" {
url = fmt.Sprintf("%s/write?db=%s&precision=%s&consistency=%s", s.Host, s.Database, s.Precision, s.Consistency)
} else {
url = fmt.Sprintf("%s/write?db=%s&rp=%s&precision=%s&consistency=%s", s.Host, s.Database, s.RetentionPolicy, s.Precision, s.Consistency)
@@ -755,7 +776,7 @@ var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io.
return 0, nil, err
}

if s.V2 == true {
if s.V2 || s.V3 {
req.Header.Set("Authorization", "Token "+s.Token)
}

@@ -787,7 +808,7 @@ var defaultWriteBatch = func(s *Simulator, buf []byte) (statusCode int, body io.

// sendBatch writes a batch to the server. Continually retries until successful.
func (s *Simulator) sendBatch(buf []byte) error {
// Don't send the batch anywhere..
// Don't send the batch anywhere.
if s.DryRun {
return nil
}
@@ -854,17 +875,17 @@ func (s *Simulator) sendBatch(buf []byte) error {
// slower than the desired maximum latency. We use a weighted moving average
// to determine that, favouring recent latencies over historic ones.
//
// The implementation is pretty ghetto at the moment, it has the following
// The implementation is pretty primitive at the moment, it has the following
// rules:
//
// - wma reponse time faster than desired latency and currentDelay > 0?
// - wma response time faster than desired latency and currentDelay > 0?
// * reduce currentDelay by 1/n * 0.25 * (desired latency - wma latency).
// - response time slower than desired latency?
// * increase currentDelay by 1/n * 0.25 * (desired latency - wma response).
// - currentDelay < 100ms?
// * set currentDelay to 0
//
// n is the number of concurent writers. The general rule then, is that
// n is the number of concurrent writers. The general rule then, is that
// we look at how far away from the desired latency and move a quarter of the
// way there in total (over all writers). If we're coming un under the max
// latency and our writers are using a delay (currentDelay > 0) then we will
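
Read literally, the rules above move the delay a quarter of the remaining gap per adjustment, split across the n writers, and snap small delays back to zero. A minimal sketch of those rules as documented; this is one interpretation of the comment rather than a copy of inch's actual implementation, and the sample numbers are made up:

```go
package main

import (
	"fmt"
	"time"
)

// adjustDelay applies the documented rules: step a quarter of the way from
// the weighted-moving-average latency towards the desired maximum, divided
// across n concurrent writers, and reset delays under 100ms to zero.
func adjustDelay(currentDelay, wmaLatency, maxLatency time.Duration, n int) time.Duration {
	step := time.Duration(float64(maxLatency-wmaLatency) * 0.25 / float64(n))

	if wmaLatency < maxLatency && currentDelay > 0 {
		currentDelay -= step // under target with a delay in place: back off
	} else if wmaLatency > maxLatency {
		currentDelay -= step // step is negative here, so the delay grows
	}

	if currentDelay < 100*time.Millisecond {
		currentDelay = 0
	}
	return currentDelay
}

func main() {
	// Four writers, 1s target, server currently responding in 800ms on average.
	fmt.Println(adjustDelay(500*time.Millisecond, 800*time.Millisecond, time.Second, 4)) // 487.5ms
}
```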