Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ graceful-close-conn-timeout = 15

# It's a tradeoff between memory and performance.
# possible values:
# 0 => default value
# 0 => default value (16K)
# 1K to 16M
# conn-buffer-size = 0

Expand Down
2 changes: 1 addition & 1 deletion lib/config/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var testProxyConfig = Config{
FrontendKeepalive: KeepAlive{Enabled: true},
ProxyProtocol: "v2",
GracefulWaitBeforeShutdown: 10,
ConnBufferSize: 32 * 1024,
ConnBufferSize: 16 * 1024,
},
},
API: API{
Expand Down
40 changes: 32 additions & 8 deletions pkg/proxy/backend/backend_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
TickerInterval = 5 * time.Second
// DefaultPausedConnTimeout is the maximum duration a paused connection can last before being closed.
DefaultPausedConnTimeout = 8 * time.Hour
// SlowCmdThreshold is the threshold for logging slow proxy command processing.
SlowCmdThreshold = 100 * time.Millisecond
)

const (
Expand Down Expand Up @@ -316,8 +318,17 @@ func (mgr *BackendConnManager) getBackendIO(ctx context.Context, cctx ConnContex
// ExecuteCmd forwards messages between the client and the backend.
// If it finds that the session is ready for redirection, it migrates the session.
func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (err error) {
lockStart := time.Now()
startWall := time.Now()
startTime := monotime.Now()
mgr.processLock.Lock()
lockWait := time.Since(lockStart)
var (
cmd pnet.Command
waitingRedirect bool
holdRequest bool
resumeTriggered bool
)
defer func() {
mgr.SetQuitSourceByErr(err)
mgr.handshakeHandler.OnTraffic(mgr)
Expand All @@ -338,26 +349,41 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (
}
mgr.lastActiveTime = now
mgr.processLock.Unlock()
totalElapsed := time.Since(startWall)
if lockWait >= SlowCmdThreshold || totalElapsed >= SlowCmdThreshold {
fields := []zap.Field{
zap.Stringer("cmd", cmd),
zap.Duration("lock_wait", lockWait),
zap.Duration("duration", totalElapsed),
zap.Bool("resume", resumeTriggered),
zap.Bool("waiting_redirect", waitingRedirect),
zap.Bool("hold_request", holdRequest),
}
if backendAddr := mgr.ServerAddr(); backendAddr != "" {
fields = append(fields, zap.String("backend_addr", backendAddr))
}
mgr.logger.Info("slow execute cmd", fields...)
}
}()
if len(request) < 1 {
err = mysql.ErrMalformPacket
return
}
cmd := pnet.Command(request[0])
cmd = pnet.Command(request[0])

// Once the request is accepted, it's treated in the transaction, so we don't check graceful shutdown here.
if mgr.closeStatus.Load() >= statusClosing {
return
}
if mgr.backendIO.Load() == nil {
resumeTriggered = true
err = mgr.resume(ctx)
if err != nil {
mgr.logger.Error("resume failed", zap.Error(err))
return
}
}
waitingRedirect := mgr.redirectInfo.Load() != nil
var holdRequest bool
waitingRedirect = mgr.redirectInfo.Load() != nil
backendIO := mgr.backendIO.Load()
holdRequest, err = mgr.cmdProcessor.executeCmd(request, mgr.clientIO, backendIO, waitingRedirect)
if !holdRequest {
Expand Down Expand Up @@ -485,9 +511,9 @@ func (mgr *BackendConnManager) processSignals(ctx context.Context) {
mgr.notifyRedirectResult(ctx, rs)
case <-checkBackendTicker.C:
func() {
mgr.checkBackendActive()
mgr.processLock.Lock()
defer mgr.processLock.Unlock()
mgr.checkBackendActiveLocked()
mgr.setKeepAlive()
}()
case <-ctx.Done():
Expand Down Expand Up @@ -664,10 +690,8 @@ func (mgr *BackendConnManager) tryGracefulClose(ctx context.Context) {
mgr.closeStatus.CompareAndSwap(statusNotifyClose, statusClosing)
}

func (mgr *BackendConnManager) checkBackendActive() {
mgr.processLock.Lock()
defer mgr.processLock.Unlock()

// checkBackendActiveLocked assumes processLock is already held.
func (mgr *BackendConnManager) checkBackendActiveLocked() {
if mgr.closeStatus.Load() >= statusNotifyClose {
return
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/proxy/client/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"go.uber.org/zap"
)

const frontendSlowPathThreshold = 100 * time.Millisecond

type ClientConnection struct {
logger *zap.Logger
frontendTLSConfig *tls.Config // the TLS config to connect to clients.
Expand All @@ -28,8 +30,10 @@ func NewClientConnection(logger *zap.Logger, conn net.Conn, frontendTLSConfig *t
hsHandler backend.HandshakeHandler, connID uint64, addr string, frontendReadTimeout int, bcConfig *backend.BCConfig) *ClientConnection {
bemgr := backend.NewBackendConnManager(logger.Named("be"), hsHandler, connID, bcConfig)
bemgr.SetValue(backend.ConnContextKeyConnAddr, addr)
opts := make([]pnet.PacketIOption, 0, 2)
opts := make([]pnet.PacketIOption, 0, 4)
opts = append(opts, pnet.WithWrapError(backend.ErrClientConn))
opts = append(opts, pnet.WithQuickAck())
opts = append(opts, pnet.WithSlowLog("frontend", frontendSlowPathThreshold))
if bcConfig.ProxyProtocol {
opts = append(opts, pnet.WithProxy)
}
Expand Down Expand Up @@ -73,16 +77,28 @@ clean:
func (cc *ClientConnection) processMsg(ctx context.Context) error {
for {
cc.pkt.ResetSequence()
readStart := time.Now()
clientPkt, err := cc.pkt.ReadPacket()
readElapsed := time.Since(readStart)
if err != nil {
cc.connMgr.SetValue(backend.ConnContextKeyClientError, err)
cc.connMgr.SetQuitSourceByErr(err)
return err
}
execStart := time.Now()
err = cc.connMgr.ExecuteCmd(ctx, clientPkt)
execElapsed := time.Since(execStart)
if err != nil {
return err
}
totalElapsed := readElapsed + execElapsed
if totalElapsed >= frontendSlowPathThreshold {
cc.logger.Info("slow frontend command",
zap.Stringer("cmd", pnet.Command(clientPkt[0])),
zap.Duration("read_packet_elapsed", readElapsed),
zap.Duration("execute_cmd_elapsed", execElapsed),
zap.Duration("total_elapsed", totalElapsed))
}
if pnet.Command(clientPkt[0]) == pnet.ComQuit {
return nil
}
Expand Down
54 changes: 52 additions & 2 deletions pkg/proxy/net/packetio.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ var (
ErrInvalidSequence = dbterror.ClassServer.NewStd(errno.ErrInvalidSequence)
)

var requestQuickAck = setQuickAck

const (
DefaultConnBufferSize = 32 * 1024
DefaultConnBufferSize = 16 * 1024
)

type rwStatus int
Expand Down Expand Up @@ -198,6 +200,9 @@ type PacketIO struct {
logger *zap.Logger
remoteAddr net.Addr
wrap error
quickAck bool
slowLogRole string
slowThreshold time.Duration
header [4]byte // reuse memory to reduce allocation
inPackets uint64
outPackets uint64
Expand Down Expand Up @@ -277,6 +282,10 @@ func (p *PacketIO) readOnePacket() ([]byte, bool, error) {

// ReadPacket reads data and removes the header
func (p *PacketIO) ReadPacket() (data []byte, err error) {
start := time.Now()
defer func() {
p.logSlowIO("read_packet", start, len(data), err)
}()
p.readWriter.BeginRW(rwRead)
for more := true; more; {
var buf []byte
Expand Down Expand Up @@ -420,9 +429,14 @@ func (p *PacketIO) OutPackets() uint64 {
}

func (p *PacketIO) Flush() error {
start := time.Now()
if err := p.readWriter.Flush(); err != nil {
return p.wrapErr(errors.Wrap(ErrFlushConn, errors.WithStack(err)))
err = p.wrapErr(errors.Wrap(ErrFlushConn, errors.WithStack(err)))
p.logSlowIO("flush", start, 0, err)
return err
}
p.requestQuickAck()
p.logSlowIO("flush", start, 0, nil)
return nil
}

Expand All @@ -443,6 +457,42 @@ func (p *PacketIO) LastKeepAlive() config.KeepAlive {
return p.lastKeepAlive
}

func (p *PacketIO) requestQuickAck() {
if !p.quickAck {
return
}
if err := requestQuickAck(p.rawConn); err != nil && p.logger != nil {
p.logger.Debug("failed to request tcp quickack", zap.Error(err))
}
}

func (p *PacketIO) logSlowIO(op string, start time.Time, bytes int, err error) {
if p.logger == nil || p.slowThreshold <= 0 {
return
}
elapsed := time.Since(start)
if elapsed < p.slowThreshold {
return
}
fields := []zap.Field{
zap.String("op", op),
zap.Duration("duration", elapsed),
}
if p.slowLogRole != "" {
fields = append(fields, zap.String("role", p.slowLogRole))
}
if bytes > 0 {
fields = append(fields, zap.Int("bytes", bytes))
}
if remoteAddr := p.RemoteAddr(); remoteAddr != nil {
fields = append(fields, zap.Stringer("remote_addr", remoteAddr))
}
if err != nil {
fields = append(fields, zap.Error(err))
}
p.logger.Info("slow packet io", fields...)
}

func (p *PacketIO) GracefulClose() error {
if err := p.readWriter.SetDeadline(time.Now()); err != nil && !errors.Is(err, net.ErrClosed) {
return err
Expand Down
13 changes: 13 additions & 0 deletions pkg/proxy/net/packetio_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,16 @@ func WithReadTimeout(d time.Duration) func(pi *PacketIO) {
pi.readTimeout = d
}
}

func WithQuickAck() func(pi *PacketIO) {
return func(pi *PacketIO) {
pi.quickAck = true
}
}

func WithSlowLog(role string, threshold time.Duration) func(pi *PacketIO) {
return func(pi *PacketIO) {
pi.slowLogRole = role
pi.slowThreshold = threshold
}
}
45 changes: 45 additions & 0 deletions pkg/proxy/net/packetio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package net
import (
"encoding/binary"
"net"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -258,6 +259,50 @@ func TestKeepAlive(t *testing.T) {
)
}

func TestQuickAckRequestedAfterFlush(t *testing.T) {
lg, _ := logger.CreateLoggerForTest(t)
cliConn, srvConn := net.Pipe()
defer func() {
require.NoError(t, cliConn.Close())
require.NoError(t, srvConn.Close())
}()

oldRequester := requestQuickAck
defer func() {
requestQuickAck = oldRequester
}()
var quickAckCalls atomic.Int32
requestQuickAck = func(net.Conn) error {
quickAckCalls.Add(1)
return nil
}

cli := NewPacketIO(cliConn, lg, DefaultConnBufferSize, WithQuickAck())
srv := NewPacketIO(srvConn, lg, DefaultConnBufferSize)

readDone := make(chan struct{})
go func() {
defer close(readDone)
data, err := srv.ReadPacket()
require.NoError(t, err)
require.Equal(t, []byte("hello"), data)
}()
require.NoError(t, cli.WritePacket([]byte("hello"), true))
<-readDone
require.EqualValues(t, 1, quickAckCalls.Load())

writeDone := make(chan struct{})
go func() {
defer close(writeDone)
require.NoError(t, srv.WritePacket([]byte("world"), true))
}()
data, err := cli.ReadPacket()
require.NoError(t, err)
require.Equal(t, []byte("world"), data)
<-writeDone
require.EqualValues(t, 1, quickAckCalls.Load())
}

func TestPredefinedPacket(t *testing.T) {
testTCPConn(t,
func(t *testing.T, cli *PacketIO) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/proxy/net/quickack_default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2023 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

//go:build !linux

package net

import "net"

func setQuickAck(net.Conn) error {
return nil
}
30 changes: 30 additions & 0 deletions pkg/proxy/net/quickack_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2023 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package net

import (
"net"

"golang.org/x/sys/unix"
)

func setQuickAck(conn net.Conn) error {
tcpConn, ok := conn.(*net.TCPConn)
if !ok {
return nil
}

rawConn, err := tcpConn.SyscallConn()
if err != nil {
return err
}

var setErr error
if err = rawConn.Control(func(fd uintptr) {
setErr = unix.SetsockoptInt(int(fd), unix.IPPROTO_TCP, unix.TCP_QUICKACK, 1)
}); err != nil {
return err
}
return setErr
}
Loading