亲宝软件园·资讯

展开

Python gRPC流式通信协议详细讲解

雕弓 人气:0

ProtoBuf 协议

gRPC使用 protocol buffer 协议做为接口描述语言(IDL) ,来定义接口及传递的消息数据类型。

gRPC的message 与 service 均须在protobuf 文件中定义好,才能进行编译。 该文件须按protobuf 语法编写, 也比较简单,当前只须学习gRPC用到的部分。

如下例:

service HelloService {
  rpc SayHello (HelloRequest) returns (HelloResponse);
}
message HelloRequest {
  string greeting = 1;
}
message HelloResponse {
  string reply = 1;
}

说明:

service interface_name {
rpc api_name( request ) returns ( response );
}

Message: 相当于接口函数的参数类型与返回值类型 ,需要分开定义

详细 protobuf 使用教程,请参考菜鸟教程tutorialspoint 的教程 https://www.tutorialspoint.com/protobuf/index.htm

gRPC 4种通信模式介绍

1. 单向RPC

gRPC的术语为unary RPC,这种方式与函数调用类似, client 发送1条请求,server回1条响应。

rpc SayHello(HelloRequest) returns (HelloResponse);

2. 服务器流式处理 RPC

客户端向服务器发送1条请求,服务器以回应多条响应消息,客户机从返回的流中读取数据,直至没有更多消息。 这时要在响应类型前加1个 stream关键字修饰。

rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);

3. 客户端流式处理 RPC

由客户端写入一系列消息并将其发送到服务。 客户端完成消息写入后,它将等待服务器读取消息并返回其响应. 这种模式,要在request的类型前加 stream 修饰。

rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);

4. 双向流式处理 RPC

其中双方使用读写流发送一系列消息。这两个流独立运行,因此客户端和服务器可以按照它们喜欢的任何顺序进行读取和写入:例如,服务器可以等待接收所有客户端消息,然后再写入响应,或者它可以交替读取消息然后写入消息,或者读取和写入的某种其他组合。将保留每个流中消息的顺序。此模式下, request 与 response类型均需要用stream修饰。

rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);

说明

流程处理实现过程

1. 用protobuf 定义接口

下面以实例说明: users.proto ,

syntax = "proto3";
package users;
message User {
  string username = 1;
  uint32 user_id = 2;
}
message CreateUserRequest {
  string username = 1;
  string password = 2;
  string email = 3;
}
message CreateUserResult {
  User user = 1;
}
message GetUsersRequest {
  repeated User user = 1;
}
service Users {
  rpc CreateUser (users.CreateUserRequest) returns (users.CreateUserResult);
  rpc GetUsers (users.GetUsersRequest) returns (stream users.GetUsersResult);
}
message GetUsersResult {
  User user = 1;
}

2. 根据.protobuf文件生成客户方与服务方代码

首先要安装 grpcio-tools package:

pip install grpcio-tools

进入proto文件所在目录,执行如下命令

python -m grpc_tools.protoc

\ --proto_path=.

\ --python_out=.

\ --grpc_python_out=.

\ proto文件名

参数说明

本例 :

python -m grpc_tools.protoc --proto_path=. --python_out=. --grpc_python_out=. users.proto

生成的文件有两个: Users_pb2.py 与 Users_pb2_grpc.py,

3. 服务器端代码

from concurrent import futures
import time
import grpc
import users_pb2_grpc as users_service
import users_pb2 as users_messages
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
class UsersService(users_service.UsersServicer):
    def CreateUser(self, request, context):
        metadata = dict(context.invocation_metadata())
        print(metadata)
        user = users_messages.User(username=request.username, user_id=1)
        return users_messages.CreateUserResult(user=user)
    def GetUsers(self, request, context):
        for user in request.user:
            user = users_messages.User(
                username=user.username, user_id=user.user_id
            )
            yield users_messages.GetUsersResult(user=user)
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    users_service.add_UsersServicer_to_server(UsersService(), server)
    server.add_insecure_port('0.0.0.0:50051')
    server.start()
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)
if __name__ == '__main__':
    serve()

4. 客户端侧代码

import sys
import grpc
import users_pb2_grpc as users_service
import users_pb2 as users_messages
def run():
    channel = grpc.insecure_channel('localhost:50051')
    try:
        grpc.channel_ready_future(channel).result(timeout=10)
    except grpc.FutureTimeoutError:
        sys.exit('Error connecting to server')
    else:
        stub = users_service.UsersStub(channel)
        metadata = [('ip', '127.0.0.1')]
        response = stub.CreateUser(
            users_messages.CreateUserRequest(username='tom'),
            metadata=metadata,
        )
        if response:
            print("User created:", response.user.username)
        request = users_messages.GetUsersRequest(
            user=[users_messages.User(username="alexa", user_id=1),
                  users_messages.User(username="christie", user_id=1)]
        )
        response = stub.GetUsers(request)
        for resp in response:
            print(resp)
if __name__ == '__main__':
    run()

5. 测试代码

打开两个终端窗口,分别运行grpc_server.py, grpc_client.py

可以看到client.py 窗口显示

(enva) D:\workplace\python\enva\test1>py grpc_client.py
User created: tom
user {
  username: "alexa"
  user_id: 1
}
user {
  username: "christie"
  user_id: 1
}

服务器窗口同时显示

(enva) D:\workplace\python\enva\test1>py grpc_server.py
{'user-agent': 'grpc-python/1.50.0 grpc-c/28.0.0 (windows; chttp2)', 'ip': '127.0.0.1'}

学习小记

流式处理编程,其实比较简单,只是流式处理一方要构建多条mesage,接口方法会自动逐条发送,接收侧也只须遍历读取即可。流式处理用来发送大文件,如图片,视频之类,比REST有明显优势,而且有规范接口,也便于团队合作。

加载全部内容

相关教程
猜你喜欢
用户评论