From f02f2b180a90e13a0438fcddb41984d878737642 Mon Sep 17 00:00:00 2001 From: rogerogers Date: Wed, 11 Feb 2026 16:13:54 +0800 Subject: [PATCH 1/2] fix: thrift_streaming deprecated Signed-off-by: rogerogers --- thrift_streaming/client/demo_client.go | 70 ++--- thrift_streaming/generate.sh | 2 +- thrift_streaming/handler.go | 16 +- thrift_streaming/kitex_gen/echo/api.go | 271 +----------------- thrift_streaming/kitex_gen/echo/k-api.go | 2 +- .../kitex_gen/echo/testservice/client.go | 84 ++---- .../kitex_gen/echo/testservice/server.go | 2 +- .../kitex_gen/echo/testservice/testservice.go | 216 +++----------- thrift_streaming/main.go | 41 +-- 9 files changed, 135 insertions(+), 569 deletions(-) diff --git a/thrift_streaming/client/demo_client.go b/thrift_streaming/client/demo_client.go index 1689e879..8977f44f 100644 --- a/thrift_streaming/client/demo_client.go +++ b/thrift_streaming/client/demo_client.go @@ -23,8 +23,8 @@ import ( "time" "github.com/cloudwego/kitex/client" - "github.com/cloudwego/kitex/client/streamclient" "github.com/cloudwego/kitex/pkg/endpoint" + "github.com/cloudwego/kitex/pkg/endpoint/cep" "github.com/cloudwego/kitex/pkg/klog" "github.com/cloudwego/kitex/pkg/streaming" "github.com/cloudwego/kitex/pkg/transmeta" @@ -36,12 +36,12 @@ import ( ) var ( - streamClient = testservice.MustNewStreamClient( + streamClient = testservice.MustNewClient( "server_name_for_discovery", - streamclient.WithHostPorts("127.0.0.1:8888"), + client.WithHostPorts("127.0.0.1:8888"), // client middleware - streamclient.WithMiddleware(func(e endpoint.Endpoint) endpoint.Endpoint { + client.WithMiddleware(func(e endpoint.Endpoint) endpoint.Endpoint { return func(ctx context.Context, req, resp interface{}) (err error) { method, _ := kitexutil.GetMethod(ctx) klog.Infof("[%s] streamclient middleware, req = %v", method, req) @@ -52,25 +52,27 @@ var ( }), // send middleware - streamclient.WithSendMiddleware(func(next endpoint.SendEndpoint) endpoint.SendEndpoint { - return func(stream streaming.Stream, req interface{}) (err error) { - method, _ := kitexutil.GetMethod(stream.Context()) - err = next(stream, req) - klog.Infof("[%s] streamclient send middleware, err = %v, req = %v", method, err, req) - return err - } - }), - - // recv middleware - // NOTE: message (response from server) will NOT be available until `next` returns - streamclient.WithRecvMiddleware(func(next endpoint.RecvEndpoint) endpoint.RecvEndpoint { - return func(stream streaming.Stream, resp interface{}) (err error) { - method, _ := kitexutil.GetMethod(stream.Context()) - err = next(stream, resp) - klog.Infof("[%s] streamclient recv middleware, err = %v, resp = %v", method, err, resp) - return err - } - }), + client.WithStreamOptions( + client.WithStreamSendMiddleware(func(next cep.StreamSendEndpoint) cep.StreamSendEndpoint { + return func(ctx context.Context, stream streaming.ClientStream, req interface{}) (err error) { + method, _ := kitexutil.GetMethod(stream.Context()) + err = next(ctx, stream, req) + klog.Infof("[%s] streamclient send middleware, err = %v, req = %v", method, err, req) + return err + } + }), + + // recv middleware + // NOTE: message (response from server) will NOT be available until `next` returns + client.WithStreamRecvMiddleware(func(next cep.StreamRecvEndpoint) cep.StreamRecvEndpoint { + return func(ctx context.Context, stream streaming.ClientStream, resp interface{}) (err error) { + method, _ := kitexutil.GetMethod(stream.Context()) + err = next(ctx, stream, resp) + klog.Infof("[%s] streamclient recv middleware, err = %v, resp = %v", method, err, resp) + return err + } + }), + ), ) pingPongClient = testservice.MustNewClient( @@ -105,7 +107,7 @@ func echoPingPong(cli testservice.Client) { klog.Infof("echoPingPong: rsp = %v", rsp) } -func echoUnary(cli testservice.StreamClient) { +func echoUnary(cli testservice.Client) { ctx := context.Background() req := &test.Request{Message: "hello"} rsp, err := cli.EchoUnary(ctx, req) @@ -116,7 +118,7 @@ func echoUnary(cli testservice.StreamClient) { klog.Infof("echoPingPong: rsp = %v", rsp) } -func echo(cli testservice.StreamClient) { +func echo(cli testservice.Client) { ctx := context.Background() stream, err := cli.Echo(ctx) if err != nil { @@ -128,10 +130,10 @@ func echo(cli testservice.StreamClient) { // Send go func() { defer wg.Done() - defer stream.Close() // Tell the server there'll be no more message from client + defer stream.CloseSend(ctx) // Tell the server there'll be no more message from client for i := 0; i < 3; i++ { req := &test.Request{Message: "client, " + strconv.Itoa(i)} - if err = stream.Send(req); err != nil { + if err = stream.Send(ctx, req); err != nil { klog.Warnf("echo.send: failed, err = " + err.Error()) break } @@ -143,7 +145,7 @@ func echo(cli testservice.StreamClient) { go func() { defer wg.Done() for { - resp, err := stream.Recv() + resp, err := stream.Recv(ctx) // make sure you receive and io.EOF or other non-nil error // otherwise RPCFinish event will not be recorded if err == io.EOF { @@ -159,14 +161,15 @@ func echo(cli testservice.StreamClient) { wg.Wait() } -func echoClient(cli testservice.StreamClient) { +func echoClient(cli testservice.Client) { + ctx := context.Background() stream, err := cli.EchoClient(context.Background()) if err != nil { panic("failed to call Echo: " + err.Error()) } for i := 0; i < 3; i++ { req := &test.Request{Message: "hello, " + strconv.Itoa(i)} - err := stream.Send(req) + err := stream.Send(ctx, req) if err != nil { panic("failed to send Echo: " + err.Error()) } @@ -174,7 +177,7 @@ func echoClient(cli testservice.StreamClient) { } // Recv - resp, err := stream.CloseAndRecv() + resp, err := stream.CloseAndRecv(ctx) if err != nil { klog.Warnf("failed to recv Echo: " + err.Error()) } else { @@ -182,14 +185,15 @@ func echoClient(cli testservice.StreamClient) { } } -func echoServer(cli testservice.StreamClient) { +func echoServer(cli testservice.Client) { + ctx := context.Background() req := &test.Request{Message: "hello"} stream, err := cli.EchoServer(context.Background(), req) if err != nil { panic("failed to call Echo: " + err.Error()) } for { - resp, err := stream.Recv() + resp, err := stream.Recv(ctx) // make sure you receive and io.EOF or other non-nil error // otherwise RPCFinish event will not be recorded if err == io.EOF { diff --git a/thrift_streaming/generate.sh b/thrift_streaming/generate.sh index 6e672217..eeb588d9 100755 --- a/thrift_streaming/generate.sh +++ b/thrift_streaming/generate.sh @@ -31,4 +31,4 @@ echo "kitex: using `which $kitex` $kitex_version" echo -e "\nMake sure you're using kitex >= v0.9.0 && thriftgo >= v0.3.6\n" set -x -$kitex $verbose $module $service $idl +$kitex -streamx $verbose $module $service $idl diff --git a/thrift_streaming/handler.go b/thrift_streaming/handler.go index 5dbcbf5c..a1fd8686 100644 --- a/thrift_streaming/handler.go +++ b/thrift_streaming/handler.go @@ -31,7 +31,7 @@ import ( type TestServiceImpl struct{} // Echo is bidirectional streaming -func (s *TestServiceImpl) Echo(stream echo.TestService_EchoServer) (err error) { +func (s *TestServiceImpl) Echo(ctx context.Context, stream echo.TestService_EchoServer) (err error) { // no need to call `stream.Close()` manually wg := sync.WaitGroup{} wg.Add(2) @@ -44,7 +44,7 @@ func (s *TestServiceImpl) Echo(stream echo.TestService_EchoServer) (err error) { wg.Done() }() for { - msg, recvErr := stream.Recv() + msg, recvErr := stream.Recv(ctx) // make sure you receive and io.EOF or other non-nil error // otherwise RPCFinish event will not be recorded if recvErr == io.EOF { @@ -67,7 +67,7 @@ func (s *TestServiceImpl) Echo(stream echo.TestService_EchoServer) (err error) { }() for i := 0; i < 3; i++ { msg := &echo.Response{Message: "server, " + strconv.Itoa(i)} - if sendErr := stream.Send(msg); sendErr != nil { + if sendErr := stream.Send(ctx, msg); sendErr != nil { err = sendErr return } @@ -79,23 +79,23 @@ func (s *TestServiceImpl) Echo(stream echo.TestService_EchoServer) (err error) { } // EchoClient is client streaming -func (s *TestServiceImpl) EchoClient(stream echo.TestService_EchoClientServer) (err error) { +func (s *TestServiceImpl) EchoClient(ctx context.Context, stream echo.TestService_EchoClientServer) (err error) { for i := 0; i < 3; i++ { - msg, err := stream.Recv() + msg, err := stream.Recv(ctx) if err != nil { return err } klog.Infof("EchoClient: recv message = %s", msg) } - return stream.SendAndClose(&echo.Response{Message: "echoClient"}) + return stream.SendAndClose(ctx, &echo.Response{Message: "echoClient"}) } // EchoServer is server streaming -func (s *TestServiceImpl) EchoServer(req *echo.Request, stream echo.TestService_EchoServerServer) (err error) { +func (s *TestServiceImpl) EchoServer(ctx context.Context, req *echo.Request, stream echo.TestService_EchoServerServer) (err error) { klog.Infof("EchoServer called, req = %+v", req) for i := 0; i < 3; i++ { msg := &echo.Response{Message: "server, " + strconv.Itoa(i)} - if err = stream.Send(msg); err != nil { + if err = stream.Send(ctx, msg); err != nil { return } klog.Infof("EchoServer: sent message = %s", msg) diff --git a/thrift_streaming/kitex_gen/echo/api.go b/thrift_streaming/kitex_gen/echo/api.go index d4fd5cf8..2b990fa6 100644 --- a/thrift_streaming/kitex_gen/echo/api.go +++ b/thrift_streaming/kitex_gen/echo/api.go @@ -1,4 +1,4 @@ -// Code generated by thriftgo (0.3.18). DO NOT EDIT. +// Code generated by thriftgo (0.4.3). DO NOT EDIT. package echo @@ -6,7 +6,6 @@ import ( "context" "fmt" "github.com/cloudwego/kitex/pkg/streaming" - "strings" ) type Request struct { @@ -34,26 +33,6 @@ func (p *Request) String() string { return fmt.Sprintf("Request(%+v)", *p) } -func (p *Request) DeepEqual(ano *Request) bool { - if p == ano { - return true - } else if p == nil || ano == nil { - return false - } - if !p.Field1DeepEqual(ano.Message) { - return false - } - return true -} - -func (p *Request) Field1DeepEqual(src string) bool { - - if strings.Compare(p.Message, src) != 0 { - return false - } - return true -} - var fieldIDToName_Request = map[int16]string{ 1: "message", } @@ -83,36 +62,16 @@ func (p *Response) String() string { return fmt.Sprintf("Response(%+v)", *p) } -func (p *Response) DeepEqual(ano *Response) bool { - if p == ano { - return true - } else if p == nil || ano == nil { - return false - } - if !p.Field1DeepEqual(ano.Message) { - return false - } - return true -} - -func (p *Response) Field1DeepEqual(src string) bool { - - if strings.Compare(p.Message, src) != 0 { - return false - } - return true -} - var fieldIDToName_Response = map[int16]string{ 1: "message", } type TestService interface { - Echo(stream TestService_EchoServer) (err error) + Echo(ctx context.Context, stream TestService_EchoServer) (err error) - EchoClient(stream TestService_EchoClientServer) (err error) + EchoClient(ctx context.Context, stream TestService_EchoClientServer) (err error) - EchoServer(req *Request, stream TestService_EchoServerServer) (err error) + EchoServer(ctx context.Context, req *Request, stream TestService_EchoServerServer) (err error) EchoUnary(ctx context.Context, req *Request) (r *Response, err error) @@ -153,26 +112,6 @@ func (p *TestServiceEchoArgs) String() string { return fmt.Sprintf("TestServiceEchoArgs(%+v)", *p) } -func (p *TestServiceEchoArgs) DeepEqual(ano *TestServiceEchoArgs) bool { - if p == ano { - return true - } else if p == nil || ano == nil { - return false - } - if !p.Field1DeepEqual(ano.Req) { - return false - } - return true -} - -func (p *TestServiceEchoArgs) Field1DeepEqual(src *Request) bool { - - if !p.Req.DeepEqual(src) { - return false - } - return true -} - var fieldIDToName_TestServiceEchoArgs = map[int16]string{ 1: "req", } @@ -211,37 +150,11 @@ func (p *TestServiceEchoResult) String() string { return fmt.Sprintf("TestServiceEchoResult(%+v)", *p) } -func (p *TestServiceEchoResult) DeepEqual(ano *TestServiceEchoResult) bool { - if p == ano { - return true - } else if p == nil || ano == nil { - return false - } - if !p.Field0DeepEqual(ano.Success) { - return false - } - return true -} - -func (p *TestServiceEchoResult) Field0DeepEqual(src *Response) bool { - - if !p.Success.DeepEqual(src) { - return false - } - return true -} - var fieldIDToName_TestServiceEchoResult = map[int16]string{ 0: "success", } -type TestService_EchoServer interface { - streaming.Stream - - Recv() (*Request, error) - - Send(*Response) error -} +type TestService_EchoServer streaming.BidiStreamingServer[Request, Response] type TestServiceEchoClientArgs struct { Req *Request `thrift:"req,1" frugal:"1,default,Request" json:"req"` @@ -277,26 +190,6 @@ func (p *TestServiceEchoClientArgs) String() string { return fmt.Sprintf("TestServiceEchoClientArgs(%+v)", *p) } -func (p *TestServiceEchoClientArgs) DeepEqual(ano *TestServiceEchoClientArgs) bool { - if p == ano { - return true - } else if p == nil || ano == nil { - return false - } - if !p.Field1DeepEqual(ano.Req) { - return false - } - return true -} - -func (p *TestServiceEchoClientArgs) Field1DeepEqual(src *Request) bool { - - if !p.Req.DeepEqual(src) { - return false - } - return true -} - var fieldIDToName_TestServiceEchoClientArgs = map[int16]string{ 1: "req", } @@ -335,37 +228,11 @@ func (p *TestServiceEchoClientResult) String() string { return fmt.Sprintf("TestServiceEchoClientResult(%+v)", *p) } -func (p *TestServiceEchoClientResult) DeepEqual(ano *TestServiceEchoClientResult) bool { - if p == ano { - return true - } else if p == nil || ano == nil { - return false - } - if !p.Field0DeepEqual(ano.Success) { - return false - } - return true -} - -func (p *TestServiceEchoClientResult) Field0DeepEqual(src *Response) bool { - - if !p.Success.DeepEqual(src) { - return false - } - return true -} - var fieldIDToName_TestServiceEchoClientResult = map[int16]string{ 0: "success", } -type TestService_EchoClientServer interface { - streaming.Stream - - Recv() (*Request, error) - - SendAndClose(*Response) error -} +type TestService_EchoClientServer streaming.ClientStreamingServer[Request, Response] type TestServiceEchoServerArgs struct { Req *Request `thrift:"req,1" frugal:"1,default,Request" json:"req"` @@ -401,26 +268,6 @@ func (p *TestServiceEchoServerArgs) String() string { return fmt.Sprintf("TestServiceEchoServerArgs(%+v)", *p) } -func (p *TestServiceEchoServerArgs) DeepEqual(ano *TestServiceEchoServerArgs) bool { - if p == ano { - return true - } else if p == nil || ano == nil { - return false - } - if !p.Field1DeepEqual(ano.Req) { - return false - } - return true -} - -func (p *TestServiceEchoServerArgs) Field1DeepEqual(src *Request) bool { - - if !p.Req.DeepEqual(src) { - return false - } - return true -} - var fieldIDToName_TestServiceEchoServerArgs = map[int16]string{ 1: "req", } @@ -459,35 +306,11 @@ func (p *TestServiceEchoServerResult) String() string { return fmt.Sprintf("TestServiceEchoServerResult(%+v)", *p) } -func (p *TestServiceEchoServerResult) DeepEqual(ano *TestServiceEchoServerResult) bool { - if p == ano { - return true - } else if p == nil || ano == nil { - return false - } - if !p.Field0DeepEqual(ano.Success) { - return false - } - return true -} - -func (p *TestServiceEchoServerResult) Field0DeepEqual(src *Response) bool { - - if !p.Success.DeepEqual(src) { - return false - } - return true -} - var fieldIDToName_TestServiceEchoServerResult = map[int16]string{ 0: "success", } -type TestService_EchoServerServer interface { - streaming.Stream - - Send(*Response) error -} +type TestService_EchoServerServer streaming.ServerStreamingServer[Response] type TestServiceEchoUnaryArgs struct { Req *Request `thrift:"req,1" frugal:"1,default,Request" json:"req"` @@ -523,26 +346,6 @@ func (p *TestServiceEchoUnaryArgs) String() string { return fmt.Sprintf("TestServiceEchoUnaryArgs(%+v)", *p) } -func (p *TestServiceEchoUnaryArgs) DeepEqual(ano *TestServiceEchoUnaryArgs) bool { - if p == ano { - return true - } else if p == nil || ano == nil { - return false - } - if !p.Field1DeepEqual(ano.Req) { - return false - } - return true -} - -func (p *TestServiceEchoUnaryArgs) Field1DeepEqual(src *Request) bool { - - if !p.Req.DeepEqual(src) { - return false - } - return true -} - var fieldIDToName_TestServiceEchoUnaryArgs = map[int16]string{ 1: "req", } @@ -581,26 +384,6 @@ func (p *TestServiceEchoUnaryResult) String() string { return fmt.Sprintf("TestServiceEchoUnaryResult(%+v)", *p) } -func (p *TestServiceEchoUnaryResult) DeepEqual(ano *TestServiceEchoUnaryResult) bool { - if p == ano { - return true - } else if p == nil || ano == nil { - return false - } - if !p.Field0DeepEqual(ano.Success) { - return false - } - return true -} - -func (p *TestServiceEchoUnaryResult) Field0DeepEqual(src *Response) bool { - - if !p.Success.DeepEqual(src) { - return false - } - return true -} - var fieldIDToName_TestServiceEchoUnaryResult = map[int16]string{ 0: "success", } @@ -639,26 +422,6 @@ func (p *TestServiceEchoPingPongArgs) String() string { return fmt.Sprintf("TestServiceEchoPingPongArgs(%+v)", *p) } -func (p *TestServiceEchoPingPongArgs) DeepEqual(ano *TestServiceEchoPingPongArgs) bool { - if p == ano { - return true - } else if p == nil || ano == nil { - return false - } - if !p.Field1DeepEqual(ano.Req) { - return false - } - return true -} - -func (p *TestServiceEchoPingPongArgs) Field1DeepEqual(src *Request) bool { - - if !p.Req.DeepEqual(src) { - return false - } - return true -} - var fieldIDToName_TestServiceEchoPingPongArgs = map[int16]string{ 1: "req", } @@ -697,26 +460,6 @@ func (p *TestServiceEchoPingPongResult) String() string { return fmt.Sprintf("TestServiceEchoPingPongResult(%+v)", *p) } -func (p *TestServiceEchoPingPongResult) DeepEqual(ano *TestServiceEchoPingPongResult) bool { - if p == ano { - return true - } else if p == nil || ano == nil { - return false - } - if !p.Field0DeepEqual(ano.Success) { - return false - } - return true -} - -func (p *TestServiceEchoPingPongResult) Field0DeepEqual(src *Response) bool { - - if !p.Success.DeepEqual(src) { - return false - } - return true -} - var fieldIDToName_TestServiceEchoPingPongResult = map[int16]string{ 0: "success", } diff --git a/thrift_streaming/kitex_gen/echo/k-api.go b/thrift_streaming/kitex_gen/echo/k-api.go index 102e3fa1..a046d43a 100644 --- a/thrift_streaming/kitex_gen/echo/k-api.go +++ b/thrift_streaming/kitex_gen/echo/k-api.go @@ -1,4 +1,4 @@ -// Code generated by Kitex v0.12.0. DO NOT EDIT. +// Code generated by Kitex v0.16.1. DO NOT EDIT. package echo diff --git a/thrift_streaming/kitex_gen/echo/testservice/client.go b/thrift_streaming/kitex_gen/echo/testservice/client.go index 2e0b59a3..b164e2fc 100644 --- a/thrift_streaming/kitex_gen/echo/testservice/client.go +++ b/thrift_streaming/kitex_gen/echo/testservice/client.go @@ -1,4 +1,4 @@ -// Code generated by Kitex v0.12.0. DO NOT EDIT. +// Code generated by Kitex v0.16.1. DO NOT EDIT. package testservice @@ -7,50 +7,36 @@ import ( echo "github.com/cloudwego/kitex-examples/thrift_streaming/kitex_gen/echo" client "github.com/cloudwego/kitex/client" callopt "github.com/cloudwego/kitex/client/callopt" - "github.com/cloudwego/kitex/client/callopt/streamcall" - "github.com/cloudwego/kitex/client/streamclient" + streamcall "github.com/cloudwego/kitex/client/callopt/streamcall" streaming "github.com/cloudwego/kitex/pkg/streaming" transport "github.com/cloudwego/kitex/transport" ) // Client is designed to provide IDL-compatible methods with call-option parameter for kitex framework. type Client interface { - EchoPingPong(ctx context.Context, req *echo.Request, callOptions ...callopt.Option) (r *echo.Response, err error) -} - -// StreamClient is designed to provide Interface for Streaming APIs. -type StreamClient interface { Echo(ctx context.Context, callOptions ...streamcall.Option) (stream TestService_EchoClient, err error) EchoClient(ctx context.Context, callOptions ...streamcall.Option) (stream TestService_EchoClientClient, err error) EchoServer(ctx context.Context, req *echo.Request, callOptions ...streamcall.Option) (stream TestService_EchoServerClient, err error) - EchoUnary(ctx context.Context, req *echo.Request, callOptions ...streamcall.Option) (r *echo.Response, err error) + EchoUnary(ctx context.Context, req *echo.Request, callOptions ...callopt.Option) (r *echo.Response, err error) + EchoPingPong(ctx context.Context, req *echo.Request, callOptions ...callopt.Option) (r *echo.Response, err error) } -type TestService_EchoClient interface { - streaming.Stream - Send(*echo.Request) error - Recv() (*echo.Response, error) -} +type TestService_EchoClient streaming.BidiStreamingClient[echo.Request, echo.Response] -type TestService_EchoClientClient interface { - streaming.Stream - Send(*echo.Request) error - CloseAndRecv() (*echo.Response, error) -} +type TestService_EchoClientClient streaming.ClientStreamingClient[echo.Request, echo.Response] -type TestService_EchoServerClient interface { - streaming.Stream - Recv() (*echo.Response, error) -} +type TestService_EchoServerClient streaming.ServerStreamingClient[echo.Response] // NewClient creates a client for the service defined in IDL. func NewClient(destService string, opts ...client.Option) (Client, error) { var options []client.Option options = append(options, client.WithDestService(destService)) + options = append(options, client.WithTransportProtocol(transport.TTHeaderStreaming)) + options = append(options, opts...) - kc, err := client.NewClient(serviceInfoForClient(), options...) + kc, err := client.NewClient(serviceInfo(), options...) if err != nil { return nil, err } @@ -72,57 +58,27 @@ type kTestServiceClient struct { *kClient } -func (p *kTestServiceClient) EchoPingPong(ctx context.Context, req *echo.Request, callOptions ...callopt.Option) (r *echo.Response, err error) { - ctx = client.NewCtxWithCallOptions(ctx, callOptions) - return p.kClient.EchoPingPong(ctx, req) -} - -// NewStreamClient creates a stream client for the service's streaming APIs defined in IDL. -func NewStreamClient(destService string, opts ...streamclient.Option) (StreamClient, error) { - var options []client.Option - options = append(options, client.WithDestService(destService)) - options = append(options, client.WithTransportProtocol(transport.GRPC)) - options = append(options, streamclient.GetClientOptions(opts)...) - - kc, err := client.NewClient(serviceInfoForStreamClient(), options...) - if err != nil { - return nil, err - } - return &kTestServiceStreamClient{ - kClient: newServiceClient(kc), - }, nil -} - -// MustNewStreamClient creates a stream client for the service's streaming APIs defined in IDL. -// It panics if any error occurs. -func MustNewStreamClient(destService string, opts ...streamclient.Option) StreamClient { - kc, err := NewStreamClient(destService, opts...) - if err != nil { - panic(err) - } - return kc -} - -type kTestServiceStreamClient struct { - *kClient -} - -func (p *kTestServiceStreamClient) Echo(ctx context.Context, callOptions ...streamcall.Option) (stream TestService_EchoClient, err error) { +func (p *kTestServiceClient) Echo(ctx context.Context, callOptions ...streamcall.Option) (stream TestService_EchoClient, err error) { ctx = client.NewCtxWithCallOptions(ctx, streamcall.GetCallOptions(callOptions)) return p.kClient.Echo(ctx) } -func (p *kTestServiceStreamClient) EchoClient(ctx context.Context, callOptions ...streamcall.Option) (stream TestService_EchoClientClient, err error) { +func (p *kTestServiceClient) EchoClient(ctx context.Context, callOptions ...streamcall.Option) (stream TestService_EchoClientClient, err error) { ctx = client.NewCtxWithCallOptions(ctx, streamcall.GetCallOptions(callOptions)) return p.kClient.EchoClient(ctx) } -func (p *kTestServiceStreamClient) EchoServer(ctx context.Context, req *echo.Request, callOptions ...streamcall.Option) (stream TestService_EchoServerClient, err error) { +func (p *kTestServiceClient) EchoServer(ctx context.Context, req *echo.Request, callOptions ...streamcall.Option) (stream TestService_EchoServerClient, err error) { ctx = client.NewCtxWithCallOptions(ctx, streamcall.GetCallOptions(callOptions)) return p.kClient.EchoServer(ctx, req) } -func (p *kTestServiceStreamClient) EchoUnary(ctx context.Context, req *echo.Request, callOptions ...streamcall.Option) (r *echo.Response, err error) { - ctx = client.NewCtxWithCallOptions(ctx, streamcall.GetCallOptions(callOptions)) +func (p *kTestServiceClient) EchoUnary(ctx context.Context, req *echo.Request, callOptions ...callopt.Option) (r *echo.Response, err error) { + ctx = client.NewCtxWithCallOptions(ctx, callOptions) return p.kClient.EchoUnary(ctx, req) } + +func (p *kTestServiceClient) EchoPingPong(ctx context.Context, req *echo.Request, callOptions ...callopt.Option) (r *echo.Response, err error) { + ctx = client.NewCtxWithCallOptions(ctx, callOptions) + return p.kClient.EchoPingPong(ctx, req) +} diff --git a/thrift_streaming/kitex_gen/echo/testservice/server.go b/thrift_streaming/kitex_gen/echo/testservice/server.go index 58fef3c7..b8d2dcaa 100644 --- a/thrift_streaming/kitex_gen/echo/testservice/server.go +++ b/thrift_streaming/kitex_gen/echo/testservice/server.go @@ -1,4 +1,4 @@ -// Code generated by Kitex v0.12.0. DO NOT EDIT. +// Code generated by Kitex v0.16.1. DO NOT EDIT. package testservice import ( diff --git a/thrift_streaming/kitex_gen/echo/testservice/testservice.go b/thrift_streaming/kitex_gen/echo/testservice/testservice.go index 623a90e6..8ba3c4ca 100644 --- a/thrift_streaming/kitex_gen/echo/testservice/testservice.go +++ b/thrift_streaming/kitex_gen/echo/testservice/testservice.go @@ -1,11 +1,10 @@ -// Code generated by Kitex v0.12.0. DO NOT EDIT. +// Code generated by Kitex v0.16.1. DO NOT EDIT. package testservice import ( "context" "errors" - "fmt" echo "github.com/cloudwego/kitex-examples/thrift_streaming/kitex_gen/echo" client "github.com/cloudwego/kitex/client" kitex "github.com/cloudwego/kitex/pkg/serviceinfo" @@ -41,7 +40,7 @@ var serviceMethods = map[string]kitex.MethodInfo{ newTestServiceEchoUnaryArgs, newTestServiceEchoUnaryResult, false, - kitex.WithStreamingMode(kitex.StreamingUnary), + kitex.WithStreamingMode(kitex.StreamingNone), ), "EchoPingPong": kitex.NewMethodInfo( echoPingPongHandler, @@ -53,9 +52,7 @@ var serviceMethods = map[string]kitex.MethodInfo{ } var ( - testServiceServiceInfo = NewServiceInfo() - testServiceServiceInfoForClient = NewServiceInfoForClient() - testServiceServiceInfoForStreamClient = NewServiceInfoForStreamClient() + testServiceServiceInfo = NewServiceInfo() ) // for server @@ -63,98 +60,35 @@ func serviceInfo() *kitex.ServiceInfo { return testServiceServiceInfo } -// for stream client -func serviceInfoForStreamClient() *kitex.ServiceInfo { - return testServiceServiceInfoForStreamClient -} - -// for client -func serviceInfoForClient() *kitex.ServiceInfo { - return testServiceServiceInfoForClient -} - -// NewServiceInfo creates a new ServiceInfo containing all methods +// NewServiceInfo creates a new ServiceInfo func NewServiceInfo() *kitex.ServiceInfo { - return newServiceInfo(true, true, true) -} - -// NewServiceInfo creates a new ServiceInfo containing non-streaming methods -func NewServiceInfoForClient() *kitex.ServiceInfo { - return newServiceInfo(false, false, true) -} -func NewServiceInfoForStreamClient() *kitex.ServiceInfo { - return newServiceInfo(true, true, false) + return newServiceInfo() } -func newServiceInfo(hasStreaming bool, keepStreamingMethods bool, keepNonStreamingMethods bool) *kitex.ServiceInfo { +func newServiceInfo() *kitex.ServiceInfo { serviceName := "TestService" handlerType := (*echo.TestService)(nil) - methods := map[string]kitex.MethodInfo{} - for name, m := range serviceMethods { - if m.IsStreaming() && !keepStreamingMethods { - continue - } - if !m.IsStreaming() && !keepNonStreamingMethods { - continue - } - methods[name] = m - } extra := map[string]interface{}{ "PackageName": "echo", } - if hasStreaming { - extra["streaming"] = hasStreaming - } svcInfo := &kitex.ServiceInfo{ ServiceName: serviceName, HandlerType: handlerType, - Methods: methods, + Methods: serviceMethods, PayloadCodec: kitex.Thrift, - KiteXGenVersion: "v0.12.0", + KiteXGenVersion: "v0.16.1", Extra: extra, } return svcInfo } func echoHandler(ctx context.Context, handler interface{}, arg, result interface{}) error { - st, ok := arg.(*streaming.Args) - if !ok { - return errors.New("TestService.Echo is a thrift streaming method, please call with Kitex StreamClient") - } - stream := &testServiceEchoServer{st.Stream} - return handler.(echo.TestService).Echo(stream) -} - -type testServiceEchoClient struct { - streaming.Stream -} - -func (x *testServiceEchoClient) DoFinish(err error) { - if finisher, ok := x.Stream.(streaming.WithDoFinish); ok { - finisher.DoFinish(err) - } else { - panic(fmt.Sprintf("streaming.WithDoFinish is not implemented by %T", x.Stream)) + st, err := streaming.GetServerStreamFromArg(arg) + if err != nil { + return err } -} -func (x *testServiceEchoClient) Send(m *echo.Request) error { - return x.Stream.SendMsg(m) -} -func (x *testServiceEchoClient) Recv() (*echo.Response, error) { - m := new(echo.Response) - return m, x.Stream.RecvMsg(m) -} - -type testServiceEchoServer struct { - streaming.Stream -} - -func (x *testServiceEchoServer) Send(m *echo.Response) error { - return x.Stream.SendMsg(m) -} - -func (x *testServiceEchoServer) Recv() (*echo.Request, error) { - m := new(echo.Request) - return m, x.Stream.RecvMsg(m) + stream := streaming.NewBidiStreamingServer[echo.Request, echo.Response](st) + return handler.(echo.TestService).Echo(ctx, stream) } func newTestServiceEchoArgs() interface{} { @@ -166,47 +100,12 @@ func newTestServiceEchoResult() interface{} { } func echoClientHandler(ctx context.Context, handler interface{}, arg, result interface{}) error { - st, ok := arg.(*streaming.Args) - if !ok { - return errors.New("TestService.EchoClient is a thrift streaming method, please call with Kitex StreamClient") - } - stream := &testServiceEchoClientServer{st.Stream} - return handler.(echo.TestService).EchoClient(stream) -} - -type testServiceEchoClientClient struct { - streaming.Stream -} - -func (x *testServiceEchoClientClient) DoFinish(err error) { - if finisher, ok := x.Stream.(streaming.WithDoFinish); ok { - finisher.DoFinish(err) - } else { - panic(fmt.Sprintf("streaming.WithDoFinish is not implemented by %T", x.Stream)) - } -} -func (x *testServiceEchoClientClient) Send(m *echo.Request) error { - return x.Stream.SendMsg(m) -} -func (x *testServiceEchoClientClient) CloseAndRecv() (*echo.Response, error) { - if err := x.Stream.Close(); err != nil { - return nil, err + st, err := streaming.GetServerStreamFromArg(arg) + if err != nil { + return err } - m := new(echo.Response) - return m, x.Stream.RecvMsg(m) -} - -type testServiceEchoClientServer struct { - streaming.Stream -} - -func (x *testServiceEchoClientServer) SendAndClose(m *echo.Response) error { - return x.Stream.SendMsg(m) -} - -func (x *testServiceEchoClientServer) Recv() (*echo.Request, error) { - m := new(echo.Request) - return m, x.Stream.RecvMsg(m) + stream := streaming.NewClientStreamingServer[echo.Request, echo.Response](st) + return handler.(echo.TestService).EchoClient(ctx, stream) } func newTestServiceEchoClientArgs() interface{} { @@ -218,40 +117,16 @@ func newTestServiceEchoClientResult() interface{} { } func echoServerHandler(ctx context.Context, handler interface{}, arg, result interface{}) error { - st, ok := arg.(*streaming.Args) - if !ok { - return errors.New("TestService.EchoServer is a thrift streaming method, please call with Kitex StreamClient") + st, err := streaming.GetServerStreamFromArg(arg) + if err != nil { + return err } - stream := &testServiceEchoServerServer{st.Stream} + stream := streaming.NewServerStreamingServer[echo.Response](st) req := new(echo.Request) - if err := st.Stream.RecvMsg(req); err != nil { + if err := stream.RecvMsg(ctx, req); err != nil { return err } - return handler.(echo.TestService).EchoServer(req, stream) -} - -type testServiceEchoServerClient struct { - streaming.Stream -} - -func (x *testServiceEchoServerClient) DoFinish(err error) { - if finisher, ok := x.Stream.(streaming.WithDoFinish); ok { - finisher.DoFinish(err) - } else { - panic(fmt.Sprintf("streaming.WithDoFinish is not implemented by %T", x.Stream)) - } -} -func (x *testServiceEchoServerClient) Recv() (*echo.Response, error) { - m := new(echo.Response) - return m, x.Stream.RecvMsg(m) -} - -type testServiceEchoServerServer struct { - streaming.Stream -} - -func (x *testServiceEchoServerServer) Send(m *echo.Response) error { - return x.Stream.SendMsg(m) + return handler.(echo.TestService).EchoServer(ctx, req, stream) } func newTestServiceEchoServerArgs() interface{} { @@ -263,9 +138,6 @@ func newTestServiceEchoServerResult() interface{} { } func echoUnaryHandler(ctx context.Context, handler interface{}, arg, result interface{}) error { - if streaming.GetStream(ctx) == nil { - return errors.New("TestService.EchoUnary is a thrift streaming unary method, please call with Kitex StreamClient or remove the annotation streaming.mode") - } realArg := arg.(*echo.TestServiceEchoUnaryArgs) realResult := result.(*echo.TestServiceEchoUnaryResult) success, err := handler.(echo.TestService).EchoUnary(ctx, realArg.Req) @@ -275,6 +147,7 @@ func echoUnaryHandler(ctx context.Context, handler interface{}, arg, result inte realResult.Success = success return nil } + func newTestServiceEchoUnaryArgs() interface{} { return echo.NewTestServiceEchoUnaryArgs() } @@ -293,6 +166,7 @@ func echoPingPongHandler(ctx context.Context, handler interface{}, arg, result i realResult.Success = success return nil } + func newTestServiceEchoPingPongArgs() interface{} { return echo.NewTestServiceEchoPingPongArgs() } @@ -302,59 +176,45 @@ func newTestServiceEchoPingPongResult() interface{} { } type kClient struct { - c client.Client + c client.Client + sc client.Streaming } func newServiceClient(c client.Client) *kClient { return &kClient{ - c: c, + c: c, + sc: c.(client.Streaming), } } func (p *kClient) Echo(ctx context.Context) (TestService_EchoClient, error) { - streamClient, ok := p.c.(client.Streaming) - if !ok { - return nil, fmt.Errorf("client not support streaming") - } - res := new(streaming.Result) - err := streamClient.Stream(ctx, "Echo", nil, res) + st, err := p.sc.StreamX(ctx, "Echo") if err != nil { return nil, err } - stream := &testServiceEchoClient{res.Stream} + stream := streaming.NewBidiStreamingClient[echo.Request, echo.Response](st) return stream, nil } func (p *kClient) EchoClient(ctx context.Context) (TestService_EchoClientClient, error) { - streamClient, ok := p.c.(client.Streaming) - if !ok { - return nil, fmt.Errorf("client not support streaming") - } - res := new(streaming.Result) - err := streamClient.Stream(ctx, "EchoClient", nil, res) + st, err := p.sc.StreamX(ctx, "EchoClient") if err != nil { return nil, err } - stream := &testServiceEchoClientClient{res.Stream} + stream := streaming.NewClientStreamingClient[echo.Request, echo.Response](st) return stream, nil } func (p *kClient) EchoServer(ctx context.Context, req *echo.Request) (TestService_EchoServerClient, error) { - streamClient, ok := p.c.(client.Streaming) - if !ok { - return nil, fmt.Errorf("client not support streaming") - } - res := new(streaming.Result) - err := streamClient.Stream(ctx, "EchoServer", nil, res) + st, err := p.sc.StreamX(ctx, "EchoServer") if err != nil { return nil, err } - stream := &testServiceEchoServerClient{res.Stream} - - if err := stream.Stream.SendMsg(req); err != nil { + stream := streaming.NewServerStreamingClient[echo.Response](st) + if err := stream.SendMsg(ctx, req); err != nil { return nil, err } - if err := stream.Stream.Close(); err != nil { + if err := stream.CloseSend(ctx); err != nil { return nil, err } return stream, nil diff --git a/thrift_streaming/main.go b/thrift_streaming/main.go index ee3bed91..e6478da9 100644 --- a/thrift_streaming/main.go +++ b/thrift_streaming/main.go @@ -20,6 +20,7 @@ import ( "log" "github.com/cloudwego/kitex/pkg/endpoint" + "github.com/cloudwego/kitex/pkg/endpoint/sep" "github.com/cloudwego/kitex/pkg/klog" "github.com/cloudwego/kitex/pkg/streaming" "github.com/cloudwego/kitex/pkg/utils/kitexutil" @@ -41,26 +42,28 @@ func main() { } }), - // recv middleware - // NOTE: message (request from client) will NOT be available until `next` returns - server.WithRecvMiddleware(func(next endpoint.RecvEndpoint) endpoint.RecvEndpoint { - return func(stream streaming.Stream, req interface{}) (err error) { - method, _ := kitexutil.GetMethod(stream.Context()) - err = next(stream, req) - klog.Infof("[%s] server recv middleware, err = %#v, req = %#v", method, err, req) - return err - } - }), + server.WithStreamOptions( + // recv middleware + // NOTE: message (request from client) will NOT be available until `next` returns + server.WithStreamRecvMiddleware(func(next sep.StreamRecvEndpoint) sep.StreamRecvEndpoint { + return func(ctx context.Context, stream streaming.ServerStream, req interface{}) (err error) { + method, _ := kitexutil.GetMethod(ctx) + err = next(ctx, stream, req) + klog.Infof("[%s] server recv middleware, err = %#v, req = %#v", method, err, req) + return err + } + }), - // send middleware - server.WithSendMiddleware(func(next endpoint.SendEndpoint) endpoint.SendEndpoint { - return func(stream streaming.Stream, resp interface{}) (err error) { - method, _ := kitexutil.GetMethod(stream.Context()) - err = next(stream, resp) - klog.Infof("[%s] server send middleware, err = %#v, resp = %#v", method, err, resp) - return err - } - }), + // send middleware + server.WithStreamSendMiddleware(func(next sep.StreamSendEndpoint) sep.StreamSendEndpoint { + return func(ctx context.Context, stream streaming.ServerStream, resp interface{}) (err error) { + method, _ := kitexutil.GetMethod(ctx) + err = next(ctx, stream, resp) + klog.Infof("[%s] server send middleware, err = %#v, resp = %#v", method, err, resp) + return err + } + }), + ), ) err := svr.Run() From 2deb29cacbcc3f323bc388ef3e0b740a22302220 Mon Sep 17 00:00:00 2001 From: rogerogers Date: Wed, 11 Feb 2026 21:41:26 +0800 Subject: [PATCH 2/2] fix(thrift_streaming): Use stream.Context() instead of an external context. Signed-off-by: rogerogers --- thrift_streaming/client/demo_client.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/thrift_streaming/client/demo_client.go b/thrift_streaming/client/demo_client.go index 8977f44f..0021e7f0 100644 --- a/thrift_streaming/client/demo_client.go +++ b/thrift_streaming/client/demo_client.go @@ -130,7 +130,7 @@ func echo(cli testservice.Client) { // Send go func() { defer wg.Done() - defer stream.CloseSend(ctx) // Tell the server there'll be no more message from client + defer stream.CloseSend(stream.Context()) // Tell the server there'll be no more message from client for i := 0; i < 3; i++ { req := &test.Request{Message: "client, " + strconv.Itoa(i)} if err = stream.Send(ctx, req); err != nil { @@ -163,13 +163,13 @@ func echo(cli testservice.Client) { func echoClient(cli testservice.Client) { ctx := context.Background() - stream, err := cli.EchoClient(context.Background()) + stream, err := cli.EchoClient(ctx) if err != nil { panic("failed to call Echo: " + err.Error()) } for i := 0; i < 3; i++ { req := &test.Request{Message: "hello, " + strconv.Itoa(i)} - err := stream.Send(ctx, req) + err := stream.Send(stream.Context(), req) if err != nil { panic("failed to send Echo: " + err.Error()) } @@ -186,14 +186,13 @@ func echoClient(cli testservice.Client) { } func echoServer(cli testservice.Client) { - ctx := context.Background() req := &test.Request{Message: "hello"} stream, err := cli.EchoServer(context.Background(), req) if err != nil { panic("failed to call Echo: " + err.Error()) } for { - resp, err := stream.Recv(ctx) + resp, err := stream.Recv(stream.Context()) // make sure you receive and io.EOF or other non-nil error // otherwise RPCFinish event will not be recorded if err == io.EOF {