※前回記事:
※前前回記事:
grpcのストリーム通信について
今回の記事では、grpcのストリーム通信を扱います。
grpcでは1リクエスト1レスポンス方式のUnary通信に加え、ストリーム通信を実装することができます。
ストリーム通信は以下の3通りあります。
サーバストリーミング
1リクエストに対して複数のレスポンスが返る通信形式です。
クライアントストリーミング
複数リクエストに対して1レスポンスが返る通信形式です。
双方向ストリーミング
複数リクエストに対して複数レスポンスが返る通信形式です。
サーバストリーミング
Protoファイルは以下のような実装となります。
message GetListRequest {
}
message GetListResponse {
string name = 1;
}
service ListService {
rpc GetList (GetListRequest) returns (stream GetListResponse);
}
サービス定義の返り値指定の箇所にstream
を加えているのがわかります。
これにより対象のRPCメソッドがサーバストリーミング形式となります。
Protocコマンドでgoファイルを実装すると、サーバ用の以下のようなコードが生成されます。
func (UnimplementedListServiceServer) GetList(*GetListRequest, ListService_GetListServer) error {
return status.Errorf(codes.Unimplemented, "method GetList not implemented")
}
RPCメソッドの引数にListService_GetListServer
インターフェースを指定するようになっていますが、このインターフェースはSend
メソッドを内包しており、このメソッドでレスポンスを返せるようになっています。
type ListService_GetListServer interface {
Send(*GetListResponse) error
grpc.ServerStream
}
...
func (x *listServiceGetListServer) Send(m *GetListResponse) error {
return x.ServerStream.SendMsg(m)
}
サーバ側での実装例としては、下記のようになります。
func (*Server) GetList(req *pb.GetListRequest, stream pb.ListService_GetListServer) error {
for i := 0; i < 5; i++ {
err := stream.Send(&pb.GetListResponse{
Name: fmt.Sprintf("This is list %d", i),
})
if err != nil {
return err
}
log.Println("stream sent:", i)
time.Sleep(1 * time.Second)
}
return nil
}
この例では5回forループを回し、毎ループでレスポンスを返しています。
最後にnilをreturnして通信をcloseさせています。
一方、クライアント側の生成コードは以下のようになります。
func (c *listServiceClient) GetList(ctx context.Context, in *GetListRequest, opts ...grpc.CallOption) (ListService_GetListClient, error) {
...
}
RPCメソッドの実行でListService_GetListClient
インターフェースを返すのですが
このインターフェースはRecv
メソッドを内包しており、このメソッドでサーバ側から返される複数のレスポンスを都度受け取ります。
type ListService_GetListClient interface {
Recv() (*GetListResponse, error)
grpc.ClientStream
}
type listServiceGetListClient struct {
grpc.ClientStream
}
func (x *listServiceGetListClient) Recv() (*GetListResponse, error) {
m := new(GetListResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
以上を踏まえたクライアント側の実装例としては、下記のようになります。
func getList(client pb.ListServiceClient) {
stream, err := client.GetList(context.Background(), &pb.GetListRequest{})
if err != nil {
log.Fatal(err)
}
for {
res, err := stream.Recv()
if err != nil {
if err == io.EOF {
log.Println("stream finished.")
return
}
log.Fatal(err)
}
log.Println("response:", res.Name)
}
}
この例ではforで無限ループを回し、ループごとにRecvでレスポンス取得を試みます。
レスポンス取得できた場合は内容を出力し、サーバがデータを返しきった(io.EOF
エラーが発生)した段階でクライアント処理も終了させています。
以上のサーバ、クライアントを実装してサーバ実行し、クライアントを実行すると以下のような出力がされます(1秒ごとに1行)。
$ go run client/main.go
2022/11/17 02:11:01 response: This is list 0
2022/11/17 02:11:02 response: This is list 1
2022/11/17 02:11:03 response: This is list 2
2022/11/17 02:11:04 response: This is list 3
2022/11/17 02:11:05 response: This is list 4
クライアントストリーミング
Protoファイルは以下のような実装となります。
message UploadListRequest {
string name = 1;
}
message UploadListResponse {
string result = 1;
}
service ListService {
rpc UploadList (stream UploadListRequest) returns (UploadListResponse);
}
今回はサーバストリーミングと異なり、サービス定義の引数の箇所にstream
を加えているのがわかります。
これにより対象のRPCメソッドがクライアントストリーミング形式となります。
Protocコマンドでgoファイルを実装すると、サーバ用の以下のようなコードが生成されます。
func (UnimplementedListServiceServer) UploadList(ListService_UploadListServer) error {
return status.Errorf(codes.Unimplemented, "method UploadList not implemented")
}
RPCメソッドの引数にListService_UploadListServer
インターフェースを指定するようになっていますが、このインターフェースはSendAndClose
メソッドとRecv
を内包しています。
Recvメソッドはクライアントからの複数リクエストを都度受け取るために使用し、SendAndCloseはレスポンスを返すとともにストリームをcloseします。
type ListService_UploadListServer interface {
SendAndClose(*UploadListResponse) error
Recv() (*UploadListRequest, error)
grpc.ServerStream
}
...
func (x *listServiceUploadListServer) SendAndClose(m *UploadListResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *listServiceUploadListServer) Recv() (*UploadListRequest, error) {
m := new(UploadListRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
サーバ側での実装例としては、下記のようになります。
func (*Server) UploadList(stream pb.ListService_UploadListServer) error {
res := ""
for {
req, err := stream.Recv()
if err != nil {
if err == io.EOF {
stream.SendAndClose(&pb.UploadListResponse{
Result: res,
})
log.Println("request:", res)
log.Println("stream finished.")
return nil
}
return err
}
res += fmt.Sprintf("%s\n", req.Name)
}
}
この例では無限forループを回し、毎ループでリクエストの取得を試みます。
リクエストを取得する度にres変数に加えていき、クライアントからのリクエスト送信が終了する(io.EOF
エラーを受け取る)とレスポンスを返却してストリームをcloseし、終了します。
一方、クライアント側の生成コードは以下のようになります。
func (c *listServiceClient) UploadList(ctx context.Context, opts ...grpc.CallOption) (ListService_UploadListClient, error) {
...
}
RPCメソッドの実行でListService_UploadListClient
インターフェースを返すのですが、このインターフェースはSend
メソッドとCloseAndRecv
メソッドを内包しています。
Sendメソッドはリクエストを送信する際に使用し、CloseAndRecvメソッドはリクエスト送信の終了を意味するio.EOFエラーを送信するとともに、サーバからレスポンスを受け取るためのものです。
type ListService_UploadListClient interface {
Send(*UploadListRequest) error
CloseAndRecv() (*UploadListResponse, error)
grpc.ClientStream
}
....
func (x *listServiceUploadListClient) Send(m *UploadListRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *listServiceUploadListClient) CloseAndRecv() (*UploadListResponse, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(UploadListResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
以上を踏まえたクライアント側の実装例としては、下記のようになります。
func uploadList(client pb.ListServiceClient) {
stream, err := client.UploadList(context.Background())
if err != nil {
log.Fatal(err)
}
for i := 0; i < 5; i++ {
err := stream.Send(&pb.UploadListRequest{
Name: fmt.Sprintf("This is list %d", i),
})
if err != nil {
log.Fatal(err)
}
log.Println("stream sent:", i)
time.Sleep(1 * time.Second)
}
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatal(err)
}
log.Println("response:", res.Result)
}
この例では5回forループを回し、ループごとにSendでリクエストを送ります。
リクエストを送りきった後にCloseAndRecvでリクエスト送信完了通知&レスポンスを取得します。
以上のサーバ、クライアントを実装してサーバ実行し、クライアントを実行すると、リクエストを送信しきった後に以下のような出力がされます。
2022/11/17 02:39:33 response: This is list 0
This is list 1
This is list 2
This is list 3
This is list 4
次回の記事では、双方向ストリーミングについて扱おうと思います。
コメント