.NET Core使用gRPC
有梦想的鱼i 人气:01.什么是gRPC
1.基本介绍
gRPC 一开始由 google 开发,是一款语言中立、平台中立、开源的远程过程调用(RPC)系统,所以叫g(google)RPC。支持主流开发语言(C, C++, Python, PHP, Ruby, NodeJS, C#, Objective-C、Golang
2.proto文件
用于定义协议接口和数据格式,不同的语言,相同的文件,可以理解为一项约定,序列化支持 PB(Protocol buffer)和 JSON,PB 是一种语言无关的高性能序列化框架,基于 HTTP/2 + PB, 保障了 RPC 调用的高性能。
说这么多感觉还是很模糊,上面只是介绍了gRPC是什么,在我看来其实它大致的作用跟WebServices
、WCF
差不多,在某个维度上可以说都是作为远程调用,只不过所处的时代和本身的特性,以及生态的发展下,导致它成为目前比较火热的原因之一,具体的内容后面再讨论,先用起来,再深入了解,接下来我们使用.Net Core 先搭建一个简单的Demo,来亲自上手实践一下。
其实背景就是最近在做一个项目,需要做一个公司内部的Nuget包,大概的业务就是Nuget包请求微服务数据
,开始想直接使用http的方式,基于整体项目结构后面定了使用gRPC,既学即用,刚好也可以在实际项目应用中,查漏补缺。
3.上手实践
1.使用vs首先创建一个NetCore gRPC
项目,得到一个项目结构如下,框架默认包含一个已经预先定义
的协议文件
和服务接口
,如果使用其他的方式也很简单直接引用相关的包,然后添加以下服务就可以了
2.我们自己创建一个自己的接口,定义一个协议文件mytestdemo.proto,然后定义一些方法,主要包含如下几类,其他的一些用法可以在网上搜到,或者去看文档,只是简单列一下
1.有参数有返回值
2.无参数有返回值 ,无参使用google.protobuf.Empty
3.集合作为返回值,必须使用repeated
标记
如果你真的不熟悉protobuf的定义方式和写法,这个无伤大雅,可以使用
工具生成
syntax = "proto3"; //引入集合包 import "google/protobuf/empty.proto"; //命名空间 option csharp_namespace = "GrpcDemo"; //包名 package MyTest; //接口定义 service MyTestDemo { rpc MultipleParam(MultipleRequestPara) returns (MultipleRespone); rpc NoParam(google.protobuf.Empty) returns (SingeRespone); rpc CollectionParam(google.protobuf.Empty) returns (CollectionResponePara); } //多参数请求参数 message MultipleRequestPara { int32 Id = 1; string Name = 2;//参数个数 bool IsExists =3; } message SingeRespone { bool Success =1; TestEntity a1 = 2; message TestEntity{ int32 Id =1; } } //多参数返回 message MultipleRespone { bool Success =1; } //返回集合参数 message CollectionResponePara { repeated CollectionChildrenRespone1 param1 =1; repeated CollectionChildrenRespone2 param2 =2; repeated int32 param3 =3; } //集合属性1 message CollectionChildrenRespone1 { int32 Id =1; } //集合属性2 message CollectionChildrenRespone2 { string Name =1; }
3.右键类,选择添加,选择连接的服务,添加gRPC,或者直接修改项目文件,将新建的proto添加到类中
3.1 重新生成,然后创建服务代码MyTestService
,如下代码
3.2 在启动类中映射gRPC app.MapGrpcService<MyTestService>();
否则会报service is unimplemented.
/// <summary> /// 继承自MyTestDemo.MyTestDemoBase /// </summary> public class MyTestService : MyTestDemo.MyTestDemoBase { public override async Task<MultipleRespone> MultipleParam(MultipleRequestPara request, ServerCallContext context) { return await Task.FromResult(new MultipleRespone { Success = true, }); } public override async Task<SingeRespone> NoParam(Empty request, ServerCallContext context) { TestEntity t = new TestEntity(); t.Id = 1; return await Task.FromResult(new SingeRespone { Success = true, entity = t }); ; } public override async Task<CollectionResponePara> CollectionParam(Empty request, ServerCallContext context) { CollectionResponePara collectionResponePara = new CollectionResponePara(); CollectionChildrenRespone1 a = new CollectionChildrenRespone1 { Id = 1 }; CollectionChildrenRespone2 b = new CollectionChildrenRespone2 { Name = "jeck" }; collectionResponePara.Param1.Add(a); collectionResponePara.Param2.Add(b); return await Task.FromResult(collectionResponePara); } }
4.创建客户端,将proto文件拷贝过去调用,添加服务为客户端模式,然后添加如下代码
using (var channel = GrpcChannel.ForAddress("https://localhost:7245")) { var client = new MyTestDemo.MyTestDemoClient(channel); //多参数调用 var reply = client.MultipleParam(new MultipleRequestPara { Id = 123, Name = "sa", IsExists = true }); //无参调用 var singeRespone = client.NoParam(new Google.Protobuf.WellKnownTypes.Empty()); //调用集合 var collectionResponePara = client.CollectionParam(new Google.Protobuf.WellKnownTypes.Empty()); }
2.gRPC流
gRPC中支持4种流,分别是:
1.简单 RPC(Unary RPC)
它的特点是传入一个请求对象,返回一个请求对象
2.服务端流式 RPC (Server streaming RPC)
客户端传入一个请求对象,服务端可以返回多个结果对象,形象的表示就是客户端传入一个股票的id,服务端就将股票的信息远远不断地返回
3.客户端流式 RPC (Client streaming RPC)
客户端源源不断的传入多个请求对象,服务端返回一个结果对象,形象的表示例如上位机采集实时将采集数据,源源不断的传入服务器
4.双向流式 RPC (Bi-directional streaming RPC)
结合服务端和客户端流,传入多请求,返回多个结果,相当于建立长连接,可以进行相互的操作
下面我们就主要介绍几类主要的流的使用以及步骤
1.服务端流、客户端流、双向流
服务端流主要的特征就是服务端会源源不断的响应数据到客户端
1.首先还是创建protobuf
文件,声明
一个服务端流
的rpc接口ExcuteServerStream
和一个客户端流接口ExcuteClientStream
syntax = "proto3"; option csharp_namespace = "GrpcDemo"; package streamtest; service StreamTest { //服务端流定义 rpc ExcuteServerStream(StreamForClientRequest) returns (stream StreamForClientRespones); //客户端流定义 rpc ExcuteServerStream(StreamForClientRequest) returns (stream StreamForClientRespones); //双向流 rpc ExcuteMutualStream(stream StreamForClientRequest) returns ( stream StreamForClientRespones); } //调用流的请求对象 message StreamForClientRequest{ int32 Id=1; } //调用端流的返回对象 message StreamForClientRespones{ repeated int32 Number=1;//集合 }
2.重新生成服务引用,然后创建对应的实现接口StreamTestService
并重写生成的服务,然后在启动程序映射服务接口
//服务端流接口 public override async Task ExcuteServerStream(StreamForClientRequest req,IServerStreamWriter<StreamForClientRespones> resStream,ServerCallContext context) { //list集合作为模拟数据源 var list = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 }; foreach (var item in list) { Console.WriteLine($"********{item}*******"); var ele = new StreamForClientRespones(); ele.Number.Add(item); //写入流中 await resStream.WriteAsync(ele); //模拟源源不断的数据响应 await Task.Delay(1000); } } //客户端流接口 public override async Task<StreamForClientRespones> ExcuteClientStream( IAsyncStreamReader<StreamForClientRequest> requestStream, ServerCallContext context) { StreamForClientRespones intArrayModel = new StreamForClientRespones(); //获取请求流中的数据 while (await requestStream.MoveNext()) { intArrayModel.Number.Add(requestStream.Current.Id + 1); Console.WriteLine($"ExcuteClientStream Number {requestStream.Current.Id} 获取到并处理."); Thread.Sleep(100); } return intArrayModel; } //双向流 public override async Task ExcuteMutualStream(IAsyncStreamReader<StreamForClientRequest> reqStream,IServerStreamWriter<StreamForClientRespones> resStream,ServerCallContext context) { int i = 0; //从流中获取请求 while (await reqStream.MoveNext()) { i++; var ele = new StreamForClientRespones(); ele.Number.Add(i); //写入响应流 await resStream.WriteAsync(ele); await Task.Delay(500); } }
3.创建客户端调用,把服务端的protobuf
文件拷贝到客户端,然后生成,启动调用
//调用服务端流 using (var channel = GrpcChannel.ForAddress("https://localhost:7245")) { var client = new StreamTest.StreamTestClient(channel); //调用服务端流 var reply = client.ExcuteServerStream(new StreamForClientRequest { Id =1}); //利用线程取消 //CancellationTokenSource cts = new CancellationTokenSource(); //指定在2s后进行取消操作 //cts.CancelAfter(TimeSpan.FromSeconds(2.5)); //var reply = client.ExcuteServerStream(new StreamForClientRequest { Id = 1 }, cancellationToken: cts.Token); await foreach (var resp in reply.ResponseStream.ReadAllAsync()) { Console.WriteLine(resp.Number[0]); } } //调用客户端流 using (var channel = GrpcChannel.ForAddress("https://localhost:7245")) { var client = new StreamTest.StreamTestClient(channel); //调用客户端流接口 var reply = client.ExcuteClientStream(); //模拟源源不断的数据发送 for (int i = 0; i < 10; i++) { await reply.RequestStream.WriteAsync(new StreamForClientRequest() { Id = new Random().Next(0, 20) }); await Task.Delay(100); } Console.WriteLine("*************发送完毕*******************"); await reply.RequestStream.CompleteAsync(); //接受结果 foreach (var item in reply.ResponseAsync.Result.Number) { Console.WriteLine($"This is {item} Result"); } } //双向流 using (var channel = GrpcChannel.ForAddress("https://localhost:7245")) { var client = new StreamTest.StreamTestClient(channel); //调用双向流接口 var reply = client.ExcuteMutualStream(); //获取流放入线程 var bathCatRespTask = Task.Run(async () => { await foreach (var resp in reply.ResponseStream.ReadAllAsync()) { Console.WriteLine(resp.Number[0]); } }); //写入流 for (int i = 0; i < 10; i++) { await reply.RequestStream.WriteAsync(new StreamForClientRequest() { Id = new Random().Next(0, 20) }); await Task.Delay(100); } //发送完毕 await reply.RequestStream.CompleteAsync(); //开始接收响应 await bathCatRespTask; }
2.NetCore Web
项目作为客户端
1.首先还是先引入proto文件,然后生成客户端
2.在web项目中的控制器中,我们就不能直接简陋的使用 using的方式来连接gRPC服务端了,可以利用内置的依赖注入的模式来完成
3.下载Grpc.Net.ClientFactory
包,然后在`Program将客户端添加到依赖注入容器
builder.Services.AddGrpcClient<MyTestDemo.MyTestDemoClient>(option => { option.Address = new Uri("https://localhost:7245"); });
4.然后在控制器中直接注入,就可以使用
public class gRPCTestController : ControllerBase { private readonly MyTestDemoClient _client; public gRPCTestController(MyTestDemoClient client) { _client = client; } [HttpGet(Name = "Excute")] public async Task<string> Get() { var a = await _client.NoParamAsync(new Google.Protobuf.WellKnownTypes.Empty()); var str = a.Success.ToString(); return str; } }
5.调用出现如下问题 ,使用dotnet dev-certs https --trust
3.gRPC AOP拦截
有时候我们想在gRPC服务执行前后做一些操作,这时候可以使用其Aop拦截
,如果你要问拦截器可以做什么,我不太想解释,继续往下看,拦截器方法定义在Interceptor
类中,服务端和客户端拦截是一样的原理,下面列举一些拦截器:
名称 | 特点 |
---|---|
BlockingUnaryCall | 拦截阻塞调用 |
AsyncUnaryCall | 拦截异步调用 |
AsyncServerStreamingCall | 拦截异步服务端流调用 |
AsyncClientStreamingCall | 拦截异步客户端流调用 |
AsyncDuplexStreamingCall | 拦截异步双向流调用 |
UnaryServerHandler | 用于拦截和传入普通调用的服务器端处理程序 |
ClientStreamingSerHandler | 用于拦截客户端流调用的服务器端处理程序 |
ServerStreamingSerHandler | 用于拦截服务端流调用的服务器端处理程序 |
DuplexStreamingSerHandler | 用于拦截双向流调用的服务器端处理程序 |
1.声明一个UnaryServerHandlerInterceptor
类型的自定义拦截器,用于拦截和传入普通调用的服务器端处理程序
,然后继承自Grpc.Core.Interceptors.Interceptor类, 重写已经定义的方法UnaryServerHandler
public class UnaryServerHandlerInterceptor : Interceptor { public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>( TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation) { Console.WriteLine("执行调用前"); var result = await continuation(request, context); Console.WriteLine("执行调用后"); // 或向 客户端附加 一些信息 // 也可以 用try catch 做异常日志 // 可以从 context中取出 调用方ip,做ip限制 // 可以 监控continuation 的 执行时间 return result; } }
2.然后在注入容器时加入选项
builder.Services.AddGrpc(option => { option.EnableDetailedErrors = true; //加入服务端拦截器选项 option.Interceptors.Add<UnaryServerHandlerInterceptor>(); });
加载全部内容