Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 37 additions & 34 deletions thrift_streaming/client/demo_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"time"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/client/streamclient"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/endpoint/cep"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/streaming"
"github.com/cloudwego/kitex/pkg/transmeta"
Expand All @@ -36,12 +36,12 @@ import (
)

var (
streamClient = testservice.MustNewStreamClient(
streamClient = testservice.MustNewClient(
"server_name_for_discovery",
streamclient.WithHostPorts("127.0.0.1:8888"),
client.WithHostPorts("127.0.0.1:8888"),

// client middleware
streamclient.WithMiddleware(func(e endpoint.Endpoint) endpoint.Endpoint {
client.WithMiddleware(func(e endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
method, _ := kitexutil.GetMethod(ctx)
klog.Infof("[%s] streamclient middleware, req = %v", method, req)
Expand All @@ -52,25 +52,27 @@ var (
}),

// send middleware
streamclient.WithSendMiddleware(func(next endpoint.SendEndpoint) endpoint.SendEndpoint {
return func(stream streaming.Stream, req interface{}) (err error) {
method, _ := kitexutil.GetMethod(stream.Context())
err = next(stream, req)
klog.Infof("[%s] streamclient send middleware, err = %v, req = %v", method, err, req)
return err
}
}),

// recv middleware
// NOTE: message (response from server) will NOT be available until `next` returns
streamclient.WithRecvMiddleware(func(next endpoint.RecvEndpoint) endpoint.RecvEndpoint {
return func(stream streaming.Stream, resp interface{}) (err error) {
method, _ := kitexutil.GetMethod(stream.Context())
err = next(stream, resp)
klog.Infof("[%s] streamclient recv middleware, err = %v, resp = %v", method, err, resp)
return err
}
}),
client.WithStreamOptions(
client.WithStreamSendMiddleware(func(next cep.StreamSendEndpoint) cep.StreamSendEndpoint {
return func(ctx context.Context, stream streaming.ClientStream, req interface{}) (err error) {
method, _ := kitexutil.GetMethod(stream.Context())
err = next(ctx, stream, req)
klog.Infof("[%s] streamclient send middleware, err = %v, req = %v", method, err, req)
return err
}
}),

// recv middleware
// NOTE: message (response from server) will NOT be available until `next` returns
client.WithStreamRecvMiddleware(func(next cep.StreamRecvEndpoint) cep.StreamRecvEndpoint {
return func(ctx context.Context, stream streaming.ClientStream, resp interface{}) (err error) {
method, _ := kitexutil.GetMethod(stream.Context())
err = next(ctx, stream, resp)
klog.Infof("[%s] streamclient recv middleware, err = %v, resp = %v", method, err, resp)
return err
}
}),
),
)

pingPongClient = testservice.MustNewClient(
Expand Down Expand Up @@ -105,7 +107,7 @@ func echoPingPong(cli testservice.Client) {
klog.Infof("echoPingPong: rsp = %v", rsp)
}

func echoUnary(cli testservice.StreamClient) {
func echoUnary(cli testservice.Client) {
ctx := context.Background()
req := &test.Request{Message: "hello"}
rsp, err := cli.EchoUnary(ctx, req)
Expand All @@ -116,7 +118,7 @@ func echoUnary(cli testservice.StreamClient) {
klog.Infof("echoPingPong: rsp = %v", rsp)
}

func echo(cli testservice.StreamClient) {
func echo(cli testservice.Client) {
ctx := context.Background()
stream, err := cli.Echo(ctx)
if err != nil {
Expand All @@ -128,10 +130,10 @@ func echo(cli testservice.StreamClient) {
// Send
go func() {
defer wg.Done()
defer stream.Close() // Tell the server there'll be no more message from client
defer stream.CloseSend(stream.Context()) // Tell the server there'll be no more message from client
for i := 0; i < 3; i++ {
req := &test.Request{Message: "client, " + strconv.Itoa(i)}
if err = stream.Send(req); err != nil {
if err = stream.Send(ctx, req); err != nil {
klog.Warnf("echo.send: failed, err = " + err.Error())
break
}
Expand All @@ -143,7 +145,7 @@ func echo(cli testservice.StreamClient) {
go func() {
defer wg.Done()
for {
resp, err := stream.Recv()
resp, err := stream.Recv(ctx)
// make sure you receive and io.EOF or other non-nil error
// otherwise RPCFinish event will not be recorded
if err == io.EOF {
Expand All @@ -159,37 +161,38 @@ func echo(cli testservice.StreamClient) {
wg.Wait()
}

func echoClient(cli testservice.StreamClient) {
stream, err := cli.EchoClient(context.Background())
func echoClient(cli testservice.Client) {
ctx := context.Background()
stream, err := cli.EchoClient(ctx)
if err != nil {
panic("failed to call Echo: " + err.Error())
}
for i := 0; i < 3; i++ {
req := &test.Request{Message: "hello, " + strconv.Itoa(i)}
err := stream.Send(req)
err := stream.Send(stream.Context(), req)
if err != nil {
panic("failed to send Echo: " + err.Error())
}
klog.Infof("sent: %+v", req)
}

// Recv
resp, err := stream.CloseAndRecv()
resp, err := stream.CloseAndRecv(ctx)
if err != nil {
klog.Warnf("failed to recv Echo: " + err.Error())
} else {
klog.Infof("recv: %+v", resp)
}
}

func echoServer(cli testservice.StreamClient) {
func echoServer(cli testservice.Client) {
req := &test.Request{Message: "hello"}
stream, err := cli.EchoServer(context.Background(), req)
if err != nil {
panic("failed to call Echo: " + err.Error())
}
for {
resp, err := stream.Recv()
resp, err := stream.Recv(stream.Context())
// make sure you receive and io.EOF or other non-nil error
// otherwise RPCFinish event will not be recorded
if err == io.EOF {
Expand Down
2 changes: 1 addition & 1 deletion thrift_streaming/generate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ echo "kitex: using `which $kitex` $kitex_version"
echo -e "\nMake sure you're using kitex >= v0.9.0 && thriftgo >= v0.3.6\n"

set -x
$kitex $verbose $module $service $idl
$kitex -streamx $verbose $module $service $idl
16 changes: 8 additions & 8 deletions thrift_streaming/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
type TestServiceImpl struct{}

// Echo is bidirectional streaming
func (s *TestServiceImpl) Echo(stream echo.TestService_EchoServer) (err error) {
func (s *TestServiceImpl) Echo(ctx context.Context, stream echo.TestService_EchoServer) (err error) {
// no need to call `stream.Close()` manually
wg := sync.WaitGroup{}
wg.Add(2)
Expand All @@ -44,7 +44,7 @@ func (s *TestServiceImpl) Echo(stream echo.TestService_EchoServer) (err error) {
wg.Done()
}()
for {
msg, recvErr := stream.Recv()
msg, recvErr := stream.Recv(ctx)
// make sure you receive and io.EOF or other non-nil error
// otherwise RPCFinish event will not be recorded
if recvErr == io.EOF {
Expand All @@ -67,7 +67,7 @@ func (s *TestServiceImpl) Echo(stream echo.TestService_EchoServer) (err error) {
}()
for i := 0; i < 3; i++ {
msg := &echo.Response{Message: "server, " + strconv.Itoa(i)}
if sendErr := stream.Send(msg); sendErr != nil {
if sendErr := stream.Send(ctx, msg); sendErr != nil {
err = sendErr
return
}
Expand All @@ -79,23 +79,23 @@ func (s *TestServiceImpl) Echo(stream echo.TestService_EchoServer) (err error) {
}

// EchoClient is client streaming
func (s *TestServiceImpl) EchoClient(stream echo.TestService_EchoClientServer) (err error) {
func (s *TestServiceImpl) EchoClient(ctx context.Context, stream echo.TestService_EchoClientServer) (err error) {
for i := 0; i < 3; i++ {
msg, err := stream.Recv()
msg, err := stream.Recv(ctx)
if err != nil {
return err
}
klog.Infof("EchoClient: recv message = %s", msg)
}
return stream.SendAndClose(&echo.Response{Message: "echoClient"})
return stream.SendAndClose(ctx, &echo.Response{Message: "echoClient"})
}

// EchoServer is server streaming
func (s *TestServiceImpl) EchoServer(req *echo.Request, stream echo.TestService_EchoServerServer) (err error) {
func (s *TestServiceImpl) EchoServer(ctx context.Context, req *echo.Request, stream echo.TestService_EchoServerServer) (err error) {
klog.Infof("EchoServer called, req = %+v", req)
for i := 0; i < 3; i++ {
msg := &echo.Response{Message: "server, " + strconv.Itoa(i)}
if err = stream.Send(msg); err != nil {
if err = stream.Send(ctx, msg); err != nil {
return
}
klog.Infof("EchoServer: sent message = %s", msg)
Expand Down
Loading
Loading