Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions internal/tmpbbs/pullpeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"
)

type pullPeer struct {
Expand All @@ -28,6 +29,8 @@ type pullPeer struct {
maxResults int32
}

const minClientTimeout = 15 * time.Second

func newPullPeer(address string, caCertPool *x509.CertPool, interval time.Duration,
postStore *PostStore,
) (*pullPeer, error) {
Expand All @@ -46,7 +49,14 @@ func newPullPeer(address string, caCertPool *x509.CertPool, interval time.Durati
creds = insecure.NewCredentials()
}

clientConn, err := grpc.NewClient(address, grpc.WithTransportCredentials(creds))
clientConn, err := grpc.NewClient(address,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: grpcKeepAliveTime,
Timeout: grpcKeepAliveTimeout,
PermitWithoutStream: true,
}),
grpc.WithTransportCredentials(creds),
)
if err != nil {
return nil, err
}
Expand All @@ -72,10 +82,13 @@ func (pp *pullPeer) run(initialWait time.Duration) {
}

func (pp *pullPeer) sync(ctx context.Context) int {
response, err := pp.client.Get(ctx, &proto.PostSyncRequest{Id: pp.lastIDSynced, MaxResults: pp.maxResults},
clientCtx, cancel := context.WithTimeout(ctx, max(pp.interval-5*time.Second, minClientTimeout))
defer cancel()

response, err := pp.client.Get(clientCtx, &proto.PostSyncRequest{Id: pp.lastIDSynced, MaxResults: pp.maxResults},
grpc.UseCompressor(gzip.Name))
if err != nil {
pp.logger.ErrorContext(ctx, err.Error())
pp.logger.ErrorContext(ctx, "peer sync failed", "error", err)

return 0
}
Expand Down
21 changes: 19 additions & 2 deletions internal/tmpbbs/servegrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,18 @@ import (
"crypto/tls"
"log/slog"
"net"
"time"

"github.com/mmb/tmpbbs/internal/tmpbbs/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)

const (
grpcKeepAliveTime = 25 * time.Second
grpcKeepAliveTimeout = 10 * time.Second
grpcServerKeepAliveMinTime = 20 * time.Second
)

// ServeGRPC creates and configures a grpc.Server then starts listening.
Expand All @@ -25,6 +33,15 @@ func ServeGRPC(ctx context.Context, listenAddress string, tlsCertFile string, tl

var grpcServer *grpc.Server

keepAliveEnforcementPolicy := grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: grpcServerKeepAliveMinTime,
PermitWithoutStream: true,
})
keepAliveParams := grpc.KeepaliveParams(keepalive.ServerParameters{
Time: grpcKeepAliveTime,
Timeout: grpcKeepAliveTimeout,
})

if tlsCertFile != "" && tlsKeyFile != "" {
var certificate tls.Certificate

Expand All @@ -38,9 +55,9 @@ func ServeGRPC(ctx context.Context, listenAddress string, tlsCertFile string, tl
ClientAuth: tls.NoClientCert,
MinVersion: tls.VersionTLS13,
}
grpcServer = grpc.NewServer(grpc.Creds(credentials.NewTLS(config)))
grpcServer = grpc.NewServer(keepAliveEnforcementPolicy, keepAliveParams, grpc.Creds(credentials.NewTLS(config)))
} else {
grpcServer = grpc.NewServer()
grpcServer = grpc.NewServer(keepAliveEnforcementPolicy, keepAliveParams)
}

proto.RegisterPostSyncServer(grpcServer, postSyncServer)
Expand Down
Loading