亲宝软件园·资讯

展开

微服务通信方式——gRPC

甲由崽 人气:1

 

微服务设计的原则是单一职责、轻量级通信、服务粒度适当,而说到服务通信,我们熟知的有MQ通信,还有REST、Dubbo和Thrift等,这次我来说说gRPC,

谷歌开发的一种数据交换格式,说不定哪天就需要上了呢,多学习总是件好事。

准备:

Idea2019.03/Gradle6.0.1/Maven3.6.3/JDK11.0.4/Proto3.0/gRPC1.29.0

难度: 新手--战士--老兵--大师

目标:

  1. 实现四种模式下的gRPC通信

说明:

为了遇见各种问题,同时保持时效性,我尽量使用最新的软件版本。源码地址,其中的day28:https://github.com/xiexiaobiaohttps://img.qb5200.com/download-x/dubbo-project

1 介绍

先引用一小段官方介绍:

Protocol Buffers - Google's data interchange format. Protocol Buffers (a.k.a., protobuf) are Google's language-neutral, platform-neutral, extensible

mechanism for serializing structured data.

Protocol Buffers,即protobuf,类比如JSON,XML等数据格式,实现了对结构化数据序列化时跨语言,跨平台,可扩展的机制。通过预定义.proto文件,

来协商通信双方的数据交换格式、接口方法,这即是“Protocol”的原译。.proto文件能编译为多种语言对应的代码,从而实现跨平台。

grpc使用http2.0作为通信方式,先说http2.0,它是对1.0的增强,而不是替代,特点:

  • 二进制传输:相比1.0版的文本,2.0使用二进制帧传输数据;
  • 多路复用:一个TCP连接中包含多个帧组成的流,再解析为不同的请求,从而可同时发送多个请求,避免1.0版的队头阻塞;
  • 首部压缩:1.0版的Header部分信息很多,2.0使用HPACK压缩格式减少数据量;
  • 服务端推送:服务端预先主动推送客户端必要的信息,从而减少延迟;

适用场景:定制化接口及数据交换格式,追求高性能,通信对宽带敏感

缺点:大多数HTTP Server尚不支持http2.0,Nginx目前只能将其降级为1.0处理;没有连接池、服务发现和负载均衡的实现;

2 实战步骤

A 普通模式

2.1 建立gradle类型项目,命名为GPRC-project,我上传的Git代码中还包含了maven类型的项目,按照官方说明制作,运行方法略异,见其中.txt文件说明。

2.2 编写.proto文件,GPRC-project\src\main\proto\helloworld.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.biao.grpc.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
} 

以上代码解析,几个java相关参数:

  1. java_multiple_files 是否单独生成在.proto文件中定义的顶级message、enum、和service,否则只生成一个包装了内部类的外部类; java_package 编译生成的java类文件的包位置;java_outer_classname 外部类名称;java_generic_services是否生成各语言版本的基类(已过时);

  2. service Greeter {...} 定义服务和包含的方法;

  3. message 定义消息体结构,这里定义了一个 String类型,且只有一个字符串类型的value成员,该成员编号为1来代替名字,这也是protobuf体积小的原因之一,别的数据描述语言(json、xml)都是通过成员名字标识,而Portobuf通过唯一编号,只是不便于查阅。

 

2.3 编写GPRC-project\build.gradle,包含依赖引入和gradle编译配置:

buildscript {
    repositories {
        mavenCentral()
        maven{ url 'http://maven.aliyun.com/nexus/content/groups/public/'}
    }
    dependencies {
        // protobuf 编译插件,会在右侧gradle--other中,添加Proto相关的任务(共6个)
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.12'
    }
}

//plugins {
//    id 'java'
//    id 'com.google.protobuf'
//    id 'com.google.protobuf' version '0.8.8'
//}

apply plugin: 'java'
apply plugin: 'com.google.protobuf'


group 'com.biao.grpc'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven{ url 'http://maven.aliyun.com/nexus/content/groups/public/'}
}

dependencies {

    //这里必须引入lib目录的j2ee相关jar,否则即使每次手动加入jar依赖,但启动应用时gradle会reimport,
    // 导致一直提示因少依赖而无法解析,这也是gradle引入第三方jar的方式
    compile fileTree(dir: "lib", include: "*.jar")

    testCompile group: 'junit', name: 'junit', version: '4.12'
    implementation 'io.grpc:grpc-netty-shaded:1.29.0'
    implementation 'io.grpc:grpc-protobuf:1.29.0'
    implementation 'io.grpc:grpc-stub:1.29.0'
}

sourceSets {
    main {
        proto {
            // .proto文件目录
            srcDir 'src/main/proto'
        }
        java {
            // include self written and generated code, 源代码生成到一个单独的目录
            srcDirs 'src/main/java','generated-sources/main/java'
        }
    }
    // remove the test configuration - at least in your example you don't have a special test proto file
/*    test {
        proto {
            srcDir 'src/test/proto'
        }
        proto {
            srcDir 'src/test/java'
        }
    }*/
}

protobuf {
    // Configure the protoc executable
    protoc {
        // Download from repositories ,从仓库下载,
        artifact = "com.google.protobuf:protoc:3.11.0"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.29.0'
        }
    }
    //'src' 改为'generated-sources',则会变更.proto文件对应的java类文件生成目录
    generateProtoTasks.generatedFilesBaseDir = 'src/main/java'

    generateProtoTasks {
        // all() returns the collection of all protoc tasks
        all()*.plugins {
            grpc {}
        }

        // In addition to all(),you may get the task collection by various
        // criteria:

        // (Java only) returns tasks for a sourceSet
        ofSourceSet('main')
    }
}

以上代码解析:

  1. Moduleapply plugin: 'com.google.protobuf' 引入protobuf-gradle-plugin,作为protobuf 编译插件,会在右侧gradle--other中,自动添加proto相关的任务(共6个)
  2. compile fileTree(dir: "lib", include: "*.jar") 这里必须引入lib目录的j2ee相关jar,否则即使每次手动加入jar依赖,但启动应用时gradle会reimport,导致一直提示因少依赖而无法解析,这也是gradle引入第三方jar的正确方式
  3. implementation 'io.grpc:grpc-protobuf:1.29.0',gradle新版语法,implementation 仅仅对当前的Module提供接口,对外隐藏不必要的接口,而compile(新版升级为 api )依赖的库将会完全参与编译和打包,
  4. protobuf {...}中则声明protoc-gen-grpc-java插件来源的.proto文件源目录及生成目标目录,

 

2.4 运行右侧 task :gradle --> other --> generateProto,则自动生成类文件和接口文件(XXXGrpc),并且很贴心的是,如果原.proto文件有注释,

生成的文件中会自动带上原注释内容。可以看到,helloworld.proto 和 stream.proto 生成的对应的文件,前者为 6 个,后者为 1 个,因java_multiple_files参数不同。

生成文件后,将文件移动到src/main/java对应的包下面,并将build.gradle与自动生成文件的部分注释掉,否则启动应用时,又会自动生成,导致IDE提示类重复。

 

2.5 看下源码,包com.biao.grpc.helloworld下面生成了对应于helloworld.proto的类和接口,包括服务、请求消息结构体和响应消息结构体。

com.biao.grpc.helloworld.GreeterGrpcgetSayHelloMethod 方法即约定了RPC的方法、请求/响应数据类型,并获取方法全名:

@io.grpc.stub.annotations.RpcMethod(
      fullMethodName = SERVICE_NAME + '/' + "SayHello",
      requestType = com.biao.grpc.helloworld.HelloRequest.class,
      responseType = com.biao.grpc.helloworld.HelloReply.class,
      methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
  public static io.grpc.MethodDescriptor<com.biao.grpc.helloworld.HelloRequest,
      com.biao.grpc.helloworld.HelloReply> getSayHelloMethod() {
    io.grpc.MethodDescriptor<com.biao.grpc.helloworld.HelloRequest, com.biao.grpc.helloworld.HelloReply> getSayHelloMethod;
    if ((getSayHelloMethod = GreeterGrpc.getSayHelloMethod) == null) {
      synchronized (GreeterGrpc.class) {
        if ((getSayHelloMethod = GreeterGrpc.getSayHelloMethod) == null) {
          GreeterGrpc.getSayHelloMethod = getSayHelloMethod =
              io.grpc.MethodDescriptor.<com.biao.grpc.helloworld.HelloRequest, com.biao.grpc.helloworld.HelloReply>newBuilder()
              .setType(io.grpc.MethodDescriptor.MethodType.UNARY)
              .setFullMethodName(generateFullMethodName(SERVICE_NAME, "SayHello"))
              .setSampledToLocalTracing(true)
              .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
                  com.biao.grpc.helloworld.HelloRequest.getDefaultInstance()))
              .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
                  com.biao.grpc.helloworld.HelloReply.getDefaultInstance()))
              .setSchemaDescriptor(new GreeterMethodDescriptorSupplier("SayHello"))
              .build();
        }
      }
    }
    return getSayHelloMethod;
  }
 

2.6 建立server端:

public class HelloWorld_Server {
    private static final Logger logger = Logger.getLogger(HelloWorld_Server.class.getName());


    private int port = 50051;
    private Server server;

    private void start() throws IOException{
        server = ServerBuilder.forPort(port)
                .addService(new GreeterImpl())
                .build()
                .start();
        logger.info("Server started, listening on "+ port);

        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run(){

                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                HelloWorld_Server.this.stop();
                System.err.println("*** server shut down");
            }
        });
    }

    private void stop(){
        if (server != null){
            server.shutdown();
        }
    }

    // block 一直到退出程序
    private void blockUntilShutdown() throws InterruptedException {
        if (server != null){
            server.awaitTermination();
        }
    }


    public  static  void main(String[] args) throws IOException, InterruptedException {

        final HelloWorld_Server server = new HelloWorld_Server();
        server.start();
        server.blockUntilShutdown();
    }


    // 实现 定义一个实现服务接口的类
    private class GreeterImpl extends com.biao.grpc.helloworld.GreeterGrpc.GreeterImplBase {

        @Override
        public void sayHello(com.biao.grpc.helloworld.HelloRequest req, 
                             StreamObserver<com.biao.grpc.helloworld.HelloReply> responseObserver){
            com.biao.grpc.helloworld.HelloReply reply = com.biao.grpc.helloworld.HelloReply.
                    newBuilder()
                    .setMessage(("Hello "+req.getName()))
                    .build();
            responseObserver.onNext(reply);
            responseObserver.onCompleted();
            System.out.println("Message from gRPC-Client:" + req.getName());
        }
    }
}

以上代码解析:通过GreeterImpl扩展GreeterGrpc.GreeterImplBase具体实现了gRPC服务的方法,作为服务端响应请求的业务逻辑。

 

2.7 建立client端:

public class HelloWorld_Client {
    private final ManagedChannel channel;
    private final com.biao.grpc.helloworld.GreeterGrpc.GreeterBlockingStub blockingStub;
    private static final Logger logger = Logger.getLogger(HelloWorld_Client.class.getName());

    public HelloWorld_Client(String host,int port){
        channel = ManagedChannelBuilder.forAddress(host,port)
                .usePlaintext()
                .build();

        blockingStub = com.biao.grpc.helloworld.GreeterGrpc.newBlockingStub(channel);
    }


    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    public  void greet(String name){
        com.biao.grpc.helloworld.HelloRequest request = com.biao.grpc.helloworld.HelloRequest
                .newBuilder()
                .setName(name)
                .build();
        com.biao.grpc.helloworld.HelloReply response;
        try{
            response = blockingStub.sayHello(request);
        } catch (StatusRuntimeException e)
        {
            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
            return;
        }
        logger.info("Message from gRPC-Server: "+response.getMessage());
    }

    public static void main(String[] args) throws InterruptedException {
        HelloWorld_Client client = new HelloWorld_Client("127.0.0.1",50051);
        try{
            String user = "world";
            if (args.length > 0){
                user = args[0];
            }
            client.greet(user);
        }finally {
            client.shutdown();
        }
    }
}

以上代码解析:在greet方法中,引用HelloRequest和HelloReply,并发起gRPC业务请求。

 

2.8 先运行com.biao.grpc.helloworld.HelloWorld_Server,再运行com.biao.grpc.helloworld.HelloWorld_Client, 输出以下为成功:

B 流模式

gRPC有四种通信模式:

  • 普通模式:一次请求对应一次响应,和普通方法请求一样;
  • 客户端流模式:客户端使用流模式传入多个请求对象,服务端返回一个响应结果;
  • 服务端流模式:一个请求对象,服务端使用流模式传回多个结果对象;
  • 双向流模式:客户端流式和服务端流式组合;

前面有说过,gRPC使用http/2通信,数据传输使用二进制帧,帧是HTTP2.0通信的最小单位,而消息由一或多个帧组成,流是比消息更大的通讯单位,

是TCP连接中的一个虚拟通道。每个数据流以消息的形式发送,消息中的帧可以乱序发送,然后再根据每个帧首部的流标识符重新组装为流。

前面的helloworld算第1种,我这里写了后 3 种模式,根据stream.proto开发,使用同一个server端:com.biao.grpc.stream.StreamServer,其中实现

了客户端流模式、服务端流模式和双向流模式3种通信模式的具体方法实现,

public class StreamServer {

    private static int port = 8883;
    private static io.grpc.Server server;

    public void run() {
        ServiceImpl serviceImpl = new ServiceImpl();
        server = io.grpc.ServerBuilder.forPort(port).addService(serviceImpl).build();
        try {
            server.start();
            System.out.println("Server start success on port:" + port);
            server.awaitTermination();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 实现 定义一个实现服务接口的类
    private static class ServiceImpl extends StreamServiceGrpc.StreamServiceImplBase {

        @Override
        public void serverSideStreamFun(Stream.RequestData request, StreamObserver<Stream.ResponseData> responseObserver) {
            // TODO Auto-generated method stub
            System.out.println("请求参数:" + request.getText());
            for (int i = 0; i < 10; i++) {
                responseObserver.onNext(Stream.ResponseData.newBuilder()
                        .setText("你好" + i)
                        .build());
            }
            responseObserver.onCompleted();
        }

        @Override
        public StreamObserver<Stream.RequestData> clientSideStreamFun(StreamObserver<Stream.ResponseData> responseObserver) {
            // TODO Auto-generated method stub
            return new StreamObserver<Stream.RequestData>() {
                private Stream.ResponseData.Builder builder = Stream.ResponseData.newBuilder();

                @Override
                public void onNext(Stream.RequestData value) {
                    // TODO Auto-generated method stub
                    System.out.println("请求参数:" + value.getText());

                }

                @Override
                public void onError(Throwable t) {
                    // TODO Auto-generated method stub

                }

                @Override
                public void onCompleted() {
                    // TODO Auto-generated method stub
                    builder.setText("数据接收完成");
                    responseObserver.onNext(builder.build());
                    responseObserver.onCompleted();
                }
            };
        }

        @Override
        public StreamObserver<Stream.RequestData> twoWayStreamFun(StreamObserver<Stream.ResponseData> responseObserver) {
            // TODO Auto-generated method stub
            return new StreamObserver<Stream.RequestData>() {

                @Override
                public void onNext(Stream.RequestData value) {
                    // TODO Auto-generated method stub
                    System.out.println("请求参数:" + value.getText());
                    responseObserver.onNext(Stream.ResponseData.newBuilder()
                            .setText(value.getText() + ",欢迎你的加入")
                            .build());
                }

                @Override
                public void onError(Throwable t) {
                    // TODO Auto-generated method stub
                    t.printStackTrace();
                }

                @Override
                public void onCompleted() {
                    // TODO Auto-generated method stub
                    responseObserver.onCompleted();
                }
            };
        }
    }

    public static void main(String[] args) {
        StreamServer server = new StreamServer();
        server.run();
    }

}
 

另外我写了3个client端,代码分析,略!运行后即可看到效果,我这里给个双向流的结果例子:

附:手动编译

为了加强动手能力,我这里也做个手动编译生成java代码的步骤:

Java代码生成编译器下载: https://repo.maven.apache.org/maven2/io/grpc/protoc-gen-grpc-java/

下载安装protobuf,https://github.com/protocolbuffers/protobuf/releases?after=v3.0.0-alpha-4

To install protobuf, you need to install the protocol compiler (used to compile .proto files) and the protobuf runtime for your chosen programming language.

要使用protobuf,需先安装协议编译器(protocol compiler),用于编译.proto文件,并且作为protobuf的运行时环境。其实安装protobuf等价于安装其编译环境protoc。

 

环境变量设置,将protoc的解压目录添加到Path下:

 

CMD下使用protoc --version命令输出如下即为成功:

手动编译:进入 .proto 文件所在目录, 使用protoc.exe生成消息结构体,下图中标号 2 :

protoc <待编译文件> --java_out=<输出文件保存路径>

使用protoc-gen-grpc-java生成服务接口:下图中标号 3 :

protoc *.proto --plugin=protoc-gen-grpc-java=C:\protobuf-3.0-beta\protoc-gen-grpc-java-1.9.1-windows-x86_64.exe --grpc-java_out=./

 

全文完!


我的其他文章:

  • 1 分布式任务调度系统
  • 2 Dubbo学习系列之十八(Skywalking服务跟踪)
  • 3 Spring优雅整合Redis缓存
  • 4 SOFARPC模式下的Consul注册中心
  • 5 八种控制线程顺序的方法

       只写原创,敬请关注 

加载全部内容

相关教程
猜你喜欢
用户评论