↓前回記事たち↓
本記事では、双方向ストリーミングを扱います。
双方向ストリーミング
双方向ストリーミングとは、複数リクエストに対して複数レスポンスを返す通信形式のことを言います。
クライアントから複数文字列を1秒ごとに送信し、それに対する文字列のレスポンスをリクエストごとに受け取る、といった簡易的なサーバを作ってみたいと思います。
まず、protoファイルは以下のように記述します。
message UploadAndGetProgressRequest {
string name = 1;
}
message UploadAndGetProgressResponse {
string progress = 1;
}
// サービス定義
service ListService {
rpc UploadAndGetProgress (stream UploadAndGetProgressRequest) returns (stream UploadAndGetProgressResponse);
}
サービス定義で引数と返り値の両方にstream
キーワードを記述することで、該当rpcメソッドが双方向ストリーミング対応になります。
いつも通りprotocコマンドでgrpcソースコードを作成します。
$ protoc -I. --go_out=. --go-grpc_out=. *.proto
サーバ
以下、サーバで使用するstream用のインターフェースです。
type ListService_UploadAndGetProgressServer interface {
Send(*UploadAndGetProgressResponse) error
Recv() (*UploadAndGetProgressRequest, error)
grpc.ServerStream
}
Send
やRecv
メソッドでリクエストの取得やレスポンス送信ができる他、Serverstream
インターフェースのメソッド一式も持っています。
これを利用したサーバ側の実装例が以下のようになります。
func (*Server) UploadAndGetProgress(stream pb.ListService_UploadAndGetProgressServer) error {
for {
req, err := stream.Recv()
if err != nil {
if err == io.EOF {
log.Println("Server stream finished.")
return nil
}
return err
}
name := req.Name
log.Println("Server received:", name)
err = stream.Send(&pb.UploadAndGetProgressResponse{
Progress: fmt.Sprintf("Progressed at the server: %s", name),
})
if err != nil {
return err
}
}
}
EOFエラーが発生するまでRecvメソッドを繰り返し実行し、リクエストの取得を試みます。
リクエストを取得するたびにSendメソッドでメッセージをクライアントに返します。
クライアント
以下、クライアントで使用するstream用のインターフェースです。
type ListService_UploadAndGetProgressClient interface {
Send(*UploadAndGetProgressRequest) error
Recv() (*UploadAndGetProgressResponse, error)
grpc.ClientStream
}
サーバ用のインターフェースと同様にSend
やRecv
メソッドでリクエストの送信やレスポンス取得ができるようになっています。
また、ClientSteam
インターフェースのメソッド一式も持っています。
これを利用したクライアント側の実装例が以下のようになります。
func uploadAndGetProgress(client pb.ListServiceClient) {
stream, err := client.UploadAndGetProgress(context.Background())
if err != nil {
log.Fatal(err)
}
// 送信ゴルーチン
go func() {
for i := 0; i < 5; i++ {
err = stream.Send(&pb.UploadAndGetProgressRequest{
Name: fmt.Sprintf("Name%d", i),
})
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second * 1)
}
err = stream.CloseSend()
if err != nil {
log.Fatal(err)
}
}()
// 受信ゴルーチン
ch := make(chan int)
go func() {
for {
res, err := stream.Recv()
if err != nil {
if err == io.EOF {
log.Println("Client stream finished.")
break
}
log.Fatal(err)
}
progress := res.Progress
log.Println("Client received:", progress)
}
close(ch)
}()
// 受信ゴルーチンが終了するまで待機
<-ch
}
クライアントではゴルーチンを、以下の2つ実行しています。
ゴルーチン1つ目
リクエスト送信用のゴルーチンです。1秒の間隔を空けて、サーバに文字列を5回送信します。
送信が全て完了するとClientStreamインターフェースのCloseSendメソッドを実行し、クライアント側のクローズをサーバに知らせます。
ゴルーチン2つ目
レスポンス受信用のゴルーチンです。forで繰り返しレスポンスの取得を試みます。
サーバからEOFが返されると(サーバ側メソッドでreturn nil
するとクライアントにEOFが渡されます)、forループを抜けてメインで初期化したチャネルをcloseします。
ゴルーチンが終了する前にメイン処理が終わってしまうのを防ぐため、メインでは<-ch
部分でチャネルに値が入る、もしくはチャネルがcloseされるまでメイン処理をストップさせます。
受信ゴルーチンでチャネルがcloseされるとこの部分が実行され、メイン処理がそのまま終了するという流れになります。
コメント