亲宝软件园·资讯

展开

通过Dapr实现一个简单的基于.net的微服务电商系统(六)——一步一步教你如何撸Dapr之Actor服务

a1010 人气:0

  我个人认为Actor应该是Dapr里比较重头的部分也是Dapr一直在讲的所谓“stateful applications”真正具体的一个实现(个人认为),上一章讲到有状态服务可能很多同学看到后的第一反应是“不就是个分布式缓存吗”。那今天就讲讲Actor,看看这个东西到底能不能算得上有状态服务,同时由于篇幅有限,这里只会快速的过一遍Actor相关的概念,着重还是代码层面的实现。

目录:
一、通过Dapr实现一个简单的基于.net的微服务电商系统

二、通过Dapr实现一个简单的基于.net的微服务电商系统(二)——通讯框架讲解

三、通过Dapr实现一个简单的基于.net的微服务电商系统(三)——一步一步教你如何撸Dapr

四、通过Dapr实现一个简单的基于.net的微服务电商系统(四)——一步一步教你如何撸Dapr之订阅发布

五、通过Dapr实现一个简单的基于.net的微服务电商系统(五)——一步一步教你如何撸Dapr之状态管理

六、通过Dapr实现一个简单的基于.net的微服务电商系统(六)——一步一步教你如何撸Dapr之Actor服务
附录:(如果你觉得对你有用,请给个star)
一、电商Demo地址

二、通讯框架地址

  最早我接触到Actor应该是微软的Orleans框架(熟悉Actor或者Orleans的同学这一大段可以直接跳过),百度Actor关键词一大堆“通用并发编程模型”可能让人云里雾里的,其实它并不是一个特别复杂的概念。什么是并发编程?这个概念大家应该很熟悉了,现在主流的web服务器(如.netcore的kestrel或者dotnetty)几乎都是支持并行访问的,通过线程池充分调度操作系统的多线程来并行完成任务。在传统的多线程模式中如果多个线程同时访问某个数据并对其进行非幂等操作,往往是线程不安全的。

  在单应用时代我们可以很方便的通过lock关键字或者semaphore信号量或者concurrent线程安全集合或者Interlocked这样的CAS原子操作去规避多线程访问导致的数据不安全,亦或者直接采用以数据库事务为基础的乐观 or 悲观事务来实现,而一旦我们的应用由于吞吐瓶颈需要以集群的方式部署时或者分布式部署后对数据库也进行了拆分后,上面的那些方案都会失效或者会导致高昂的成本(比如数据库分布式事务协调机制)。这个时候往往需要引入一些分布式组件比如zookeeper或者redis锁来解决。这也是分布式系统比较常用的数据一致性方案。而actor则是提出了一个新的在分布式环境下解决多线程污染数据的思路。

  actor概念相对比较复杂这里就不展开了,简单粗暴的来理解就是在内存里为每一个actor对象维护了一个消息队列,当任意的请求不管该请求是来自于其他进程的线程亦或是当前进程的线程,都会将请求写入该消息队列,而Actor对象会监听该队列,当收到消息后Actor会处理该请求,在请求处理期间,外部线程会被阻塞在消息队列中,并且新的请求也会入队等待,直到actor对象完成操作后从队列里取出下一个请求处理直到整个队列为空。同时每一个actor对象在其临界区内的内存是私有的,并不会被其他线程共享,从而就实现了内存安全。这样当我们客户端发起数个请求访问一个或多个Actor对象时每个请求都会进入对应的Actor对象的消息队列(术语叫Mailboxs)并等待actor消费。同时Dapr框架会确保同一个Actor对象在同一时间在整个分布式系统中只会被激活一个实例!从而确保了你无论从分布式系统的任意角落访问某个Actor对象(user?id=1),总能得到唯一的一个实例

  Dapr框架会确保你的Actor实例永远能够被访问到(正确激活),哪怕对象在长时间未被访问后系统回收休眠亦或者在未处理的异常导致其崩溃后

  正确使用Actor唯一的要求就只有一条,由于Actor是一个内存并发模型所以不要在并发访问Actor时去做任意的可能的IO阻塞(比如读取数据库)!

  开始撸码,首先我们做一个RPC服务,看看多线程访问下的数据会是什么个情况,再对比一下Actor模式!在RPC层我们创建一个接口,代表产品服务,其有两个方法对应读取产品以及减扣库存

   接着我们在servicesample层实现一下这个服务(这里直接创建一个静态变量模拟多线程下访问共享内存数据的场景)

   接着我们在clientsample发起对着两个服务的RPC调用

   现在我们通过并发测试统计jmter对其进行并发测试,并发1000个线程去减100个库存,最后我们通过postman去访问get方法看看结果是什么

 

减库存前

 

 

 

 并行访问1000次

  

  可以看到由于没有并发控制,我们的库存被扣负了。现在我们开始对其进行Actor改造。首先我们将接口继承iactorservice并申明服务的方法为actor(这一步的目的是为类型生成actor代理)

    [RemoteService("servicesample", "product")]
    public interface IProductService : IActorService
    {
        [RemoteFunc(FuncType.Actor)]
        Task<ProductOutput> Get(ProductInput input);
        [RemoteFunc(FuncType.Actor)]
        Task<ProductOutput> ReduceStock(ProductInput input);
    }

  接着我们让入参类继承一个基类,这个基类需要派生类重写其Actorid字段。原因是Actor是通过全局唯一标识符在内部被标识的,访问相同标识会被路由到同一个actor。

    public class ProductInput : ActorSendDto
    {
        public int PorductId { get; set; }
        public int ReduceStock { get; set; }
        public override string ActorId { get; set; }
    }

  接下来我们改造一下clientsample的调用方法,这里修改的部分不多,只是把代理生成的方式替换了一下

        public async Task<dynamic> GetProduct()
        {
            var actorService = serviceProxyFactory.CreateActorProxy<IProductService>();
            return await actorService.Get(new ProductInput() { ActorId = "1", PorductId = 1 });
        }
        public async Task<dynamic> ProductReduceStock()
        {
            var actorService = serviceProxyFactory.CreateActorProxy<IProductService>();
            return await actorService.ReduceStock(new ProductInput() { ActorId = "1", PorductId = 1, ReduceStock = 1 });
        }

  接着我们对servicesample进行改造,首先我们需要在hostbuilder里替换掉默认的OxygenStartup,OxygenActorStartup会帮我们扫描类型生成对应的actor代理(其他代码无变化,略)

           .ConfigureWebHostDefaults(webhostbuilder => {
               //注册成为oxygen服务节点
               webhostbuilder.StartOxygenServer<OxygenActorStartup>((config) => {
                   config.Port = 80;
                   config.PubSubCompentName = "pubsub";
                   config.StateStoreCompentName = "statestore";
                   config.TracingHeaders = "Authentication";
               });
           })

  接着我们需要将之前的商品持久化PO类继承一个基类ActorStateModel,该基类会强制派生类重写两个属性AutoSave和ReminderSeconds,前者代表是否自动持久化(调用Actor SDK的Statemanage持久化到中间件,第二个代表如果开启持久化,是瞬时持久化还是由Actor的Timer按照周期持久化,这里的设计有点类似于redis aof模式下的always和everysec,前者(ReminderSeconds=0)采用每一次变更同步一次,性能损耗较大,后者采用每n(取决于ReminderSeconds设置)秒通过timer异步同步一次,同时我在Actor代理中添加了版本管理,并不会导致你的ReminderSeconds设置了周期同步后到时间就会请求你的同步委托,而是检测到版本变化后才会请求),这里我测试就直接开启自动同步并使用always模式

    public class ProductPo : ActorStateModel
    {
        public int Id { get; set; }
        public string Name { get; set; }
        public int Stock { get; set; }
        public override bool AutoSave { get; set; } = true;
        public override int ReminderSeconds => 0;
    }

  最后我们对ProductService进行改造,如下:

    public class ProductService : BaseActorService<ProductPo>, IProductService
    {
        static int visitCount = 0;
        static ProductPo ProductPoInstance;
        public async Task<ProductOutput> Get(ProductInput input)
        {
            ActorData ??= new ProductPo() { Id = 1, Name = "小白菜", Stock = 100 };
            return new ProductOutput() { Message = $"第{visitCount}次请求成功,当前库存剩余{ActorData.Stock}" };
        }
        public async Task<ProductOutput> ReduceStock(ProductInput input)
        {
            Interlocked.Increment(ref visitCount);
            await Task.Delay(new Random(Guid.NewGuid().GetHashCode()).Next(20, 50));//模拟数据库耗时
            ActorData ??= new ProductPo() { Id = 1, Name = "小白菜", Stock = 100 };
            if (ActorData.Stock >= input.ReduceStock)
            {
                await Task.Delay(new Random(Guid.NewGuid().GetHashCode()).Next(50, 100));//模拟数据库耗时
                ActorData.Stock -= input.ReduceStock;
            }
            return new ProductOutput() { Message = $"第{visitCount}次请求成功,当前库存剩余{ActorData.Stock}" };
        }

        public override async Task SaveData(ProductPo model, ILifetimeScope scope)
        {
            Console.WriteLine("同步请求被调用了,此处可以进行数据库持久化!");
            await Task.CompletedTask;
        }
    }

  可以看到我的服务继承了一个基类BaseActorService,并需要传递一个类型为ActorStateModel的泛型,这样在我的服务里不再通过IO去拉取ProductPoInstance,而是直接使用ActorData这个泛型实例进行各种操作即可,所以我删除掉了对应的数据库模拟耗时(避免actor队列访问阻塞),最后你必须重写BaseActorService的SaveData方法,该方法就是上文提到的同步委托,当我们开启AutoSave时,ReminderSeconds=0会在actor被调用操作完成后激活该委托,ReminderSeconds>0时会被定时器定期根据actor对比版本后判断是否需要激活。同时无论哪种方式我都在actor代理内部维护了一个channel异步队列通过异步订阅发布的方式实现非阻塞式的actor持久化而不用担心持久化导致的io阻塞问题。SaveData入参返回的一个ILifetimeScope容器可以很方便的获取到你的repository或者直接获取ef的上下文进行对应的数据库持久化操作(这里需要注意一下,Actor持久化有两层意思,第一层意思是Actor sdk会自带一个StateManager,当Component开启actor支持后,可以通过StateManager将actor对象写入中间件,而这里提供的SaveData是我封装的一个通过订阅发布异步调用的委托,方便开发人员持久化到数据库用的,非actor原生自带的设计)。

  最后我们需要扩展我们的Component,需要开启Actor持久化支持,编辑文件后用kubectl apply -f x.yaml即可:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: actorStateStore
    value: "true"
  - name: redisHost
    value: redis.infrastructure.svc.cluster.local:6379
  - name: keyPrefix
    value: none

  接下来我们看看通过jmter重新请求后的情况

 

  可以看到Actor确实解决了并发访问安全的问题,同时也能看到我们的委托被正确的调用了。

  总结一下,Actor确实通过其特殊的设计模式解决了并发访问数据安全的问题,同时也带来了一些问题诸如需要特定框架支持,诸如Actor行为内不能阻塞等等限制,不过相比其带来的无锁对象访问来讲,这点限制都是可以克服的,至少在特定场景下比如抢票、发红包等等有一定并发同时又需要确保数据一致的场景,Actor算是一个可选方案。至于更多的场景探索则需要同学们自己去摸索了,今天的分享就到这里。下期不出意外的话我们会分享一下Dapr的服务限流

加载全部内容

相关教程
猜你喜欢
用户评论