diff --git a/conf/proxy.toml b/conf/proxy.toml index 57c4b837..f3f7372f 100644 --- a/conf/proxy.toml +++ b/conf/proxy.toml @@ -34,7 +34,7 @@ graceful-close-conn-timeout = 15 # It's a tradeoff between memory and performance. # possible values: -# 0 => default value +# 0 => default value (16K) # 1K to 16M # conn-buffer-size = 0 diff --git a/lib/config/proxy_test.go b/lib/config/proxy_test.go index 7f49be50..3549d028 100644 --- a/lib/config/proxy_test.go +++ b/lib/config/proxy_test.go @@ -25,7 +25,7 @@ var testProxyConfig = Config{ FrontendKeepalive: KeepAlive{Enabled: true}, ProxyProtocol: "v2", GracefulWaitBeforeShutdown: 10, - ConnBufferSize: 32 * 1024, + ConnBufferSize: 16 * 1024, }, }, API: API{ diff --git a/pkg/proxy/backend/backend_conn_mgr.go b/pkg/proxy/backend/backend_conn_mgr.go index 42542f68..a7d8f83c 100644 --- a/pkg/proxy/backend/backend_conn_mgr.go +++ b/pkg/proxy/backend/backend_conn_mgr.go @@ -46,6 +46,8 @@ const ( TickerInterval = 5 * time.Second // DefaultPausedConnTimeout is the maximum duration a paused connection can last before being closed. DefaultPausedConnTimeout = 8 * time.Hour + // SlowCmdThreshold is the threshold for logging slow proxy command processing. + SlowCmdThreshold = 100 * time.Millisecond ) const ( @@ -316,8 +318,17 @@ func (mgr *BackendConnManager) getBackendIO(ctx context.Context, cctx ConnContex // ExecuteCmd forwards messages between the client and the backend. // If it finds that the session is ready for redirection, it migrates the session. func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (err error) { + lockStart := time.Now() + startWall := time.Now() startTime := monotime.Now() mgr.processLock.Lock() + lockWait := time.Since(lockStart) + var ( + cmd pnet.Command + waitingRedirect bool + holdRequest bool + resumeTriggered bool + ) defer func() { mgr.SetQuitSourceByErr(err) mgr.handshakeHandler.OnTraffic(mgr) @@ -338,26 +349,41 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) ( } mgr.lastActiveTime = now mgr.processLock.Unlock() + totalElapsed := time.Since(startWall) + if lockWait >= SlowCmdThreshold || totalElapsed >= SlowCmdThreshold { + fields := []zap.Field{ + zap.Stringer("cmd", cmd), + zap.Duration("lock_wait", lockWait), + zap.Duration("duration", totalElapsed), + zap.Bool("resume", resumeTriggered), + zap.Bool("waiting_redirect", waitingRedirect), + zap.Bool("hold_request", holdRequest), + } + if backendAddr := mgr.ServerAddr(); backendAddr != "" { + fields = append(fields, zap.String("backend_addr", backendAddr)) + } + mgr.logger.Info("slow execute cmd", fields...) + } }() if len(request) < 1 { err = mysql.ErrMalformPacket return } - cmd := pnet.Command(request[0]) + cmd = pnet.Command(request[0]) // Once the request is accepted, it's treated in the transaction, so we don't check graceful shutdown here. if mgr.closeStatus.Load() >= statusClosing { return } if mgr.backendIO.Load() == nil { + resumeTriggered = true err = mgr.resume(ctx) if err != nil { mgr.logger.Error("resume failed", zap.Error(err)) return } } - waitingRedirect := mgr.redirectInfo.Load() != nil - var holdRequest bool + waitingRedirect = mgr.redirectInfo.Load() != nil backendIO := mgr.backendIO.Load() holdRequest, err = mgr.cmdProcessor.executeCmd(request, mgr.clientIO, backendIO, waitingRedirect) if !holdRequest { @@ -485,9 +511,9 @@ func (mgr *BackendConnManager) processSignals(ctx context.Context) { mgr.notifyRedirectResult(ctx, rs) case <-checkBackendTicker.C: func() { - mgr.checkBackendActive() mgr.processLock.Lock() defer mgr.processLock.Unlock() + mgr.checkBackendActiveLocked() mgr.setKeepAlive() }() case <-ctx.Done(): @@ -664,10 +690,8 @@ func (mgr *BackendConnManager) tryGracefulClose(ctx context.Context) { mgr.closeStatus.CompareAndSwap(statusNotifyClose, statusClosing) } -func (mgr *BackendConnManager) checkBackendActive() { - mgr.processLock.Lock() - defer mgr.processLock.Unlock() - +// checkBackendActiveLocked assumes processLock is already held. +func (mgr *BackendConnManager) checkBackendActiveLocked() { if mgr.closeStatus.Load() >= statusNotifyClose { return } diff --git a/pkg/proxy/client/client_conn.go b/pkg/proxy/client/client_conn.go index 7659681b..d473657b 100644 --- a/pkg/proxy/client/client_conn.go +++ b/pkg/proxy/client/client_conn.go @@ -16,6 +16,8 @@ import ( "go.uber.org/zap" ) +const frontendSlowPathThreshold = 100 * time.Millisecond + type ClientConnection struct { logger *zap.Logger frontendTLSConfig *tls.Config // the TLS config to connect to clients. @@ -28,8 +30,10 @@ func NewClientConnection(logger *zap.Logger, conn net.Conn, frontendTLSConfig *t hsHandler backend.HandshakeHandler, connID uint64, addr string, frontendReadTimeout int, bcConfig *backend.BCConfig) *ClientConnection { bemgr := backend.NewBackendConnManager(logger.Named("be"), hsHandler, connID, bcConfig) bemgr.SetValue(backend.ConnContextKeyConnAddr, addr) - opts := make([]pnet.PacketIOption, 0, 2) + opts := make([]pnet.PacketIOption, 0, 4) opts = append(opts, pnet.WithWrapError(backend.ErrClientConn)) + opts = append(opts, pnet.WithQuickAck()) + opts = append(opts, pnet.WithSlowLog("frontend", frontendSlowPathThreshold)) if bcConfig.ProxyProtocol { opts = append(opts, pnet.WithProxy) } @@ -73,16 +77,28 @@ clean: func (cc *ClientConnection) processMsg(ctx context.Context) error { for { cc.pkt.ResetSequence() + readStart := time.Now() clientPkt, err := cc.pkt.ReadPacket() + readElapsed := time.Since(readStart) if err != nil { cc.connMgr.SetValue(backend.ConnContextKeyClientError, err) cc.connMgr.SetQuitSourceByErr(err) return err } + execStart := time.Now() err = cc.connMgr.ExecuteCmd(ctx, clientPkt) + execElapsed := time.Since(execStart) if err != nil { return err } + totalElapsed := readElapsed + execElapsed + if totalElapsed >= frontendSlowPathThreshold { + cc.logger.Info("slow frontend command", + zap.Stringer("cmd", pnet.Command(clientPkt[0])), + zap.Duration("read_packet_elapsed", readElapsed), + zap.Duration("execute_cmd_elapsed", execElapsed), + zap.Duration("total_elapsed", totalElapsed)) + } if pnet.Command(clientPkt[0]) == pnet.ComQuit { return nil } diff --git a/pkg/proxy/net/packetio.go b/pkg/proxy/net/packetio.go index a8b9ec08..60ae6c6c 100644 --- a/pkg/proxy/net/packetio.go +++ b/pkg/proxy/net/packetio.go @@ -44,8 +44,10 @@ var ( ErrInvalidSequence = dbterror.ClassServer.NewStd(errno.ErrInvalidSequence) ) +var requestQuickAck = setQuickAck + const ( - DefaultConnBufferSize = 32 * 1024 + DefaultConnBufferSize = 16 * 1024 ) type rwStatus int @@ -198,6 +200,9 @@ type PacketIO struct { logger *zap.Logger remoteAddr net.Addr wrap error + quickAck bool + slowLogRole string + slowThreshold time.Duration header [4]byte // reuse memory to reduce allocation inPackets uint64 outPackets uint64 @@ -277,6 +282,10 @@ func (p *PacketIO) readOnePacket() ([]byte, bool, error) { // ReadPacket reads data and removes the header func (p *PacketIO) ReadPacket() (data []byte, err error) { + start := time.Now() + defer func() { + p.logSlowIO("read_packet", start, len(data), err) + }() p.readWriter.BeginRW(rwRead) for more := true; more; { var buf []byte @@ -420,9 +429,14 @@ func (p *PacketIO) OutPackets() uint64 { } func (p *PacketIO) Flush() error { + start := time.Now() if err := p.readWriter.Flush(); err != nil { - return p.wrapErr(errors.Wrap(ErrFlushConn, errors.WithStack(err))) + err = p.wrapErr(errors.Wrap(ErrFlushConn, errors.WithStack(err))) + p.logSlowIO("flush", start, 0, err) + return err } + p.requestQuickAck() + p.logSlowIO("flush", start, 0, nil) return nil } @@ -443,6 +457,42 @@ func (p *PacketIO) LastKeepAlive() config.KeepAlive { return p.lastKeepAlive } +func (p *PacketIO) requestQuickAck() { + if !p.quickAck { + return + } + if err := requestQuickAck(p.rawConn); err != nil && p.logger != nil { + p.logger.Debug("failed to request tcp quickack", zap.Error(err)) + } +} + +func (p *PacketIO) logSlowIO(op string, start time.Time, bytes int, err error) { + if p.logger == nil || p.slowThreshold <= 0 { + return + } + elapsed := time.Since(start) + if elapsed < p.slowThreshold { + return + } + fields := []zap.Field{ + zap.String("op", op), + zap.Duration("duration", elapsed), + } + if p.slowLogRole != "" { + fields = append(fields, zap.String("role", p.slowLogRole)) + } + if bytes > 0 { + fields = append(fields, zap.Int("bytes", bytes)) + } + if remoteAddr := p.RemoteAddr(); remoteAddr != nil { + fields = append(fields, zap.Stringer("remote_addr", remoteAddr)) + } + if err != nil { + fields = append(fields, zap.Error(err)) + } + p.logger.Info("slow packet io", fields...) +} + func (p *PacketIO) GracefulClose() error { if err := p.readWriter.SetDeadline(time.Now()); err != nil && !errors.Is(err, net.ErrClosed) { return err diff --git a/pkg/proxy/net/packetio_options.go b/pkg/proxy/net/packetio_options.go index 9d4a27de..4ebbe1b9 100644 --- a/pkg/proxy/net/packetio_options.go +++ b/pkg/proxy/net/packetio_options.go @@ -49,3 +49,16 @@ func WithReadTimeout(d time.Duration) func(pi *PacketIO) { pi.readTimeout = d } } + +func WithQuickAck() func(pi *PacketIO) { + return func(pi *PacketIO) { + pi.quickAck = true + } +} + +func WithSlowLog(role string, threshold time.Duration) func(pi *PacketIO) { + return func(pi *PacketIO) { + pi.slowLogRole = role + pi.slowThreshold = threshold + } +} diff --git a/pkg/proxy/net/packetio_test.go b/pkg/proxy/net/packetio_test.go index cd8f061f..f8f2d405 100644 --- a/pkg/proxy/net/packetio_test.go +++ b/pkg/proxy/net/packetio_test.go @@ -6,6 +6,7 @@ package net import ( "encoding/binary" "net" + "sync/atomic" "testing" "time" @@ -258,6 +259,50 @@ func TestKeepAlive(t *testing.T) { ) } +func TestQuickAckRequestedAfterFlush(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + cliConn, srvConn := net.Pipe() + defer func() { + require.NoError(t, cliConn.Close()) + require.NoError(t, srvConn.Close()) + }() + + oldRequester := requestQuickAck + defer func() { + requestQuickAck = oldRequester + }() + var quickAckCalls atomic.Int32 + requestQuickAck = func(net.Conn) error { + quickAckCalls.Add(1) + return nil + } + + cli := NewPacketIO(cliConn, lg, DefaultConnBufferSize, WithQuickAck()) + srv := NewPacketIO(srvConn, lg, DefaultConnBufferSize) + + readDone := make(chan struct{}) + go func() { + defer close(readDone) + data, err := srv.ReadPacket() + require.NoError(t, err) + require.Equal(t, []byte("hello"), data) + }() + require.NoError(t, cli.WritePacket([]byte("hello"), true)) + <-readDone + require.EqualValues(t, 1, quickAckCalls.Load()) + + writeDone := make(chan struct{}) + go func() { + defer close(writeDone) + require.NoError(t, srv.WritePacket([]byte("world"), true)) + }() + data, err := cli.ReadPacket() + require.NoError(t, err) + require.Equal(t, []byte("world"), data) + <-writeDone + require.EqualValues(t, 1, quickAckCalls.Load()) +} + func TestPredefinedPacket(t *testing.T) { testTCPConn(t, func(t *testing.T, cli *PacketIO) { diff --git a/pkg/proxy/net/quickack_default.go b/pkg/proxy/net/quickack_default.go new file mode 100644 index 00000000..e8e4769d --- /dev/null +++ b/pkg/proxy/net/quickack_default.go @@ -0,0 +1,12 @@ +// Copyright 2023 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//go:build !linux + +package net + +import "net" + +func setQuickAck(net.Conn) error { + return nil +} diff --git a/pkg/proxy/net/quickack_linux.go b/pkg/proxy/net/quickack_linux.go new file mode 100644 index 00000000..ef5a37d3 --- /dev/null +++ b/pkg/proxy/net/quickack_linux.go @@ -0,0 +1,30 @@ +// Copyright 2023 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package net + +import ( + "net" + + "golang.org/x/sys/unix" +) + +func setQuickAck(conn net.Conn) error { + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + return nil + } + + rawConn, err := tcpConn.SyscallConn() + if err != nil { + return err + } + + var setErr error + if err = rawConn.Control(func(fd uintptr) { + setErr = unix.SetsockoptInt(int(fd), unix.IPPROTO_TCP, unix.TCP_QUICKACK, 1) + }); err != nil { + return err + } + return setErr +} diff --git a/pkg/proxy/net/socket.go b/pkg/proxy/net/socket.go new file mode 100644 index 00000000..2a63d74a --- /dev/null +++ b/pkg/proxy/net/socket.go @@ -0,0 +1,25 @@ +// Copyright 2023 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package net + +import ( + "net" + + "github.com/pingcap/tiproxy/lib/util/errors" +) + +var ( + ErrSetNoDelay = errors.New("failed to set tcp no delay") +) + +func SetNoDelay(conn net.Conn, noDelay bool) error { + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + return errors.Wrapf(ErrSetNoDelay, "not net.TCPConn") + } + if err := tcpConn.SetNoDelay(noDelay); err != nil { + return errors.Wrap(ErrSetNoDelay, err) + } + return nil +} diff --git a/pkg/proxy/net/socket_test.go b/pkg/proxy/net/socket_test.go new file mode 100644 index 00000000..4b83b45f --- /dev/null +++ b/pkg/proxy/net/socket_test.go @@ -0,0 +1,38 @@ +// Copyright 2023 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package net + +import ( + "net" + "syscall" + "testing" + + "github.com/pingcap/tiproxy/pkg/testkit" + "github.com/stretchr/testify/require" +) + +func TestSetNoDelay(t *testing.T) { + testkit.TestTCPConn(t, + func(*testing.T, net.Conn) {}, + func(t *testing.T, conn net.Conn) { + require.NoError(t, SetNoDelay(conn, true)) + + tcpConn, ok := conn.(*net.TCPConn) + require.True(t, ok) + rawConn, err := tcpConn.SyscallConn() + require.NoError(t, err) + + var ( + value int + serr error + cerr error + ) + cerr = rawConn.Control(func(fd uintptr) { + value, serr = syscall.GetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_NODELAY) + }) + require.NoError(t, cerr) + require.NoError(t, serr) + require.NotZero(t, value) + }, 1) +} diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 069ed00c..59f716e5 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -205,6 +205,9 @@ func (s *SQLServer) onConn(ctx context.Context, conn net.Conn, addr string) { if err := keepalive.SetKeepalive(conn, config.KeepAlive{Enabled: tcpKeepAlive}); err != nil { logger.Warn("failed to set tcp keep alive option", zap.Error(err)) } + if err := pnet.SetNoDelay(conn, true); err != nil { + logger.Warn("failed to set tcp no delay option", zap.Error(err)) + } clientConn.Run(ctx) }