Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions proxy/server/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"math/rand"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -232,7 +233,7 @@ func (s *TargetStream) Run(nonce uint32, replyChan chan *pb.ProxyReply) {
for {
msg := s.serviceMethod.NewReply()
err := grpcStream.RecvMsg(msg)
if err == io.EOF {
if isEOF(err) {
return nil
}
if err != nil {
Expand Down Expand Up @@ -321,7 +322,7 @@ func (s *TargetStream) Run(nonce uint32, replyChan chan *pb.ProxyReply) {
// would cancel the errgroup early. Instead, we
// can return nil, and the error will be picked
// up by the receiving goroutine
if err == io.EOF {
if isEOF(err) {
return nil
}
// Otherwise, this is the 'final' error. The underlying
Expand Down Expand Up @@ -679,3 +680,13 @@ func (u *unconnectedClientStream) SendMsg(interface{}) error {
func (u *unconnectedClientStream) RecvMsg(interface{}) error {
return fmt.Errorf("%w: RecvMsg", errUnconnectedClient)
}

func isEOF(err error) bool {
// grpc-go 1.75 and older
if err == io.EOF {
return true
}

// grpc-go 1.76+
return status.Code(err) == codes.Internal && strings.HasSuffix(err.Error(), "cardinality violation: received no response message from non-server-streaming RPC")
}
Loading