Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core
Zhang_Xiang 人气:0前置条件:
《Dapr运用》
《Dapr 运用之 Java gRPC 调用篇》
《Dapr 运用之集成 Asp.Net Core Grpc 调用篇》
搭建 RabbitMQ
Docker 搭建 RabbitMQ 服务
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
创建 rabbiqmq.yaml
apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: messagebus spec: type: pubsub.rabbitmq metadata: - name: host value: "amqp://localhost:5672" # Required. Example: "rabbitmq.default.svc.cluster.local:5672" - name: consumerID value: "61415901178272324029" # Required. Any unique ID. Example: "myConsumerID" - name: durable value: "true" # Optional. Default: "false" - name: deletedWhenUnused value: "false" # Optional. Default: "false" - name: autoAck value: "false" # Optional. Default: "false" - name: deliveryMode value: "2" # Optional. Default: "0". Values between 0 - 2. - name: requeueInFailure value: "true" # Optional. Default: "false".
改造 StorageService.Api
目的:把 StorageService 从 Grpc 客户端改造为 Grpc 服务端,并 Sub Storage.Reduce 主题,完成减库存操作。
- 删除 Storage 中无用的代码 StorageController.cs
修改 Program.cs 中的 CreateHostBuilder 代码为
public static IHostBuilder CreateHostBuilder(string[] args) { return Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder.ConfigureKestrel(options => { options.Listen(IPAddress.Loopback, 5003, listenOptions => { listenOptions.Protocols = HttpProtocols.Http2; }); }); webBuilder.UseStartup<Startup>(); }); }
添加 DaprClientService
public sealed class DaprClientService : DaprClient.DaprClientBase { public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context) { var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope(); topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce"); return Task.FromResult(topicSubscriptionsEnvelope); } }
Dapr 运行时将调用此方法获取 StorageServcie 关注的主题列表。
修改 Startup.cs
/// <summary> /// This method gets called by the runtime. Use this method to add services to the container. /// </summary> /// <param name="services">Services.</param> public void ConfigureServices(IServiceCollection services) { services.AddGrpc(); services.AddDbContextPool<StorageContext>(options => { options.UseMySql(Configuration.GetConnectionString("MysqlConnection")); }); }
/// <summary> /// This method gets called by the runtime. Use this method to configure the HTTP request pipeline. /// </summary> /// <param name="app">app.</param> /// <param name="env">env.</param> public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapSubscribeHandler(); endpoints.MapGrpcService<DaprClientService>(); }); }
复制 rabbimq.yaml 文件到 components 文件夹中,删除 redis_messagebus.yaml 文件
启动 StorageService 服务
dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
- 使用 Java 开发一个 Order 服务端,Order 服务提供的功能为
- 下单
- 查看订单详情
- 获取订单列表
在当前上下文中着重处理的是下单功能,以及下单成功后 Java 服务端将发布一个事件到 Storage.Reduce 主题,即减少库存。
创建 CreateOrder.proto 文件
syntax = "proto3"; package daprexamples; option java_outer_classname = "CreateOrderProtos"; option java_package = "generate.protos"; service OrderService { rpc CreateOrder (CreateOrderRequest) returns (CreateOrderResponse); rpc RetrieveOrder(RetrieveOrderRequest) returns(RetrieveOrderResponse); rpc GetOrderList(GetOrderListRequest) returns(GetOrderListResponse); } message CreateOrderRequest { string ProductID = 1; //Product ID int32 Amount=2; //Product Amount string CustomerID=3; //Customer ID } message CreateOrderResponse { bool Succeed = 1; //Create Order Result,true:success,false:fail } message RetrieveOrderRequest{ string OrderID=1; } message RetrieveOrderResponse{ Order Order=1; } message GetOrderListRequest{ string CustomerID=1; } message GetOrderListResponse{ repeated Order Orders=1; } message Order{ string ID=1; string ProductID=2; int32 Amount=3; string CustomerID=4; }
使用 protoc 生成 Java 代码
protoc -I=C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples --java_out=C:\Users\JR\DaprDemos\java\examples\src\main\java C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples\CreateOrder.proto
- 引用 MyBatis 做为 Mapper 工具
- 修改 HelloWorldService.java 文件,提取 GrpcHelloWorldDaprService.java 到单独的包中,在此文件中添加
createOrder()
、getOrderList()
、retrieveOrder()
三个函数的实现 - 复制 rabbimq.yaml 文件到 components 文件夹中,删除原有 redis_messagebus.yaml 文件
启动 OrderService 服务
dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
创建 Golang Grpc 客户端,该客户端需要完成创建订单 Grpc 调用,订单创建成功发布扣除库存事件
引用 CreateOrder.proto 文件,并生成 CreateOrder.pb.go 文件
如未安装 protoc-gen-gogo ,通过一下命令获取并安装
go get github.com/gogo/protobuf/gogoproto
安装 protoc-gen-gogo
go install github.com/gogo/protobuf/gogoproto
根据 proto 文件生成代码
protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\CreateOrder.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
客户端代码,创建订单
... response, err := client.InvokeService(context.Background(), &pb.InvokeServiceEnvelope{ Id: "OrderService", Data: createOrderRequestData, Method: "createOrder", }) if err != nil { fmt.Println(err) return } ...
添加 DataToPublish.proto 文件,此文件作为事件发布数据结构
syntax = "proto3"; package daprexamples; option java_outer_classname = "DataToPublishProtos"; option java_package = "generate.protos"; message StorageReduceData { string ProductID = 1; int32 Amount=2; }
生成 DataToPublish 代码
protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\DataToPublish.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
修改 main.go 代码,根据 createOrder 结果判断是否要发布信息到消息队列
... createOrderResponse := &daprexamples.CreateOrderResponse{} if err := proto.Unmarshal(response.Data.Value, createOrderResponse); err != nil { fmt.Println(err) return } fmt.Println(createOrderResponse.Succeed) if !createOrderResponse.Succeed { //下单失败 return } storageReduceData := &daprexamples.StorageReduceData{ ProductID: createOrderRequest.ProductID, Amount: createOrderRequest.Amount, } storageReduceDataData, err := jsoniter.ConfigFastest.Marshal(storageReduceData) //ptypes.MarshalAny(storageReduceData) if err != nil { fmt.Println(err) return } _, err = client.PublishEvent(context.Background(), &pb.PublishEventEnvelope{ Topic: "Storage.Reduce", Data: &any.Any{Value: storageReduceDataData}, }) fmt.Println(storageReduceDataData) if err != nil { fmt.Println(err) } else { fmt.Println("Published message!") } ...
注意: 发送数据前,使用 jsoniter 转换数据为 json 字符串,原因是如果直接传输 Grpc 流,当前版本(0.3.x) Dapr runtime 打包数据时使用 Json 打包,解包使用 String ,导致数据不一致。
- 复制 rabbimq.yaml 文件到 components 文件夹,删除原有 redis_messagebus.yaml 文件
启动 golang Grpc 客户端
dapr run --app-id client go run main.go
输出
== APP == true == APP == Published message!
RabbitMQ
- 在浏览器中输入
http://localhost:15672/
,账号和密码均为 guest - 查看 Connections ,有3个连接
- 这个3个连接来自配置了 messagebus.yaml 组件的三个服务
查看 Exchanges
Name Type Features Message rate in Message rate out (AMQP default) direct D Storage.Reduce fanout D amq.direct direct D amq.fanout fanout D ...
着重看 Storage.Reduce ,可以看出 Dapr 运行时创建了一个 fanout 类型的 Exchange ,这表明该 Exhange 中的数据是广播的。
查看 Queues
Dapr 运行时创建了 storageService-Storage.Reduce ,该 Queue 绑定了 Storage.Reduce Exchange ,所以可以收到 Storage.Reduce 的广播数据。
- 在浏览器中输入
DotNet Core StorageService.Api 改造以完成 Sub 事件
打开 DaprClientService.cs 文件,更改内容为
public sealed class DaprClientService : DaprClient.DaprClientBase { private readonly StorageContext _storageContext; public DaprClientService(StorageContext storageContext) { _storageContext = storageContext; } public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context) { var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope(); topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce"); return Task.FromResult(topicSubscriptionsEnvelope); } public override async Task<Empty> OnTopicEvent(CloudEventEnvelope request, ServerCallContext context) { if (request.Topic.Equals("Storage.Reduce")) { StorageReduceData storageReduceData = StorageReduceData.Parser.ParseJson(request.Data.Value.ToStringUtf8()); Console.WriteLine("ProductID:" + storageReduceData.ProductID); Console.WriteLine("Amount:" + storageReduceData.Amount); await HandlerStorageReduce(storageReduceData); } return new Empty(); } private async Task HandlerStorageReduce(StorageReduceData storageReduceData) { Guid productID = Guid.Parse(storageReduceData.ProductID); Storage storageFromDb = await _storageContext.Storage.FirstOrDefaultAsync(q => q.ProductID.Equals(productID)); if (storageFromDb == null) { return; } if (storageFromDb.Amount < storageReduceData.Amount) { return; } storageFromDb.Amount -= storageReduceData.Amount; Console.WriteLine(storageFromDb.Amount); await _storageContext.SaveChangesAsync(); }
- 说明
- 添加
GetTopicSubscriptions()
将完成对主题的关注- 当应用停止时,RabbitMQ 中的 Queue 自动删除
- 添加
OnTopicEvent()
重写,此方法将完成对 Sub 主题的事件处理
HandlerStorageReduce
用于减少库存
- 添加
启动 DotNet Core StorageService.Api Grpc 服务,启动 Java OrderService Grpc 服务,启动 Go Grpc 客户端
DotNet Core
dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
Java
dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
go
dapr run --app-id client go run main.go
go grpc 输出为
== APP == true == APP == Published message!
查看 MySql Storage 数据库,对应产品库存减少 20
至此,通过 Dapr runtime 完成了 Go 和 Java 之间的 Grpc 调用,并通过 RabbitMQ 组件完成了 Pub/Sub
源码地址
加载全部内容