rust异步编程详细讲解
上后左爱 人气:0简化版本 Future
// wake 函数 reactor发现状态是ready 通知executor 函数 trait SimpleFuture { type Output; fn poll(&mut self, wake: fn()) -> Poll<Self::Output>; // fn poll(&mut self, wake: u32) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending, } future 返回的是Poll枚举 , 状态有Ready ,pending 状态 executor 调用 Future 任务,Ready 执行完成, pending 阻塞 执行其他任务 reactor 检查任务是否变成 ready
simpleFuture 是一个trait, 属于struct MySleep
use std::thread; use std::time::Duration; trait SimpleFuture { type Output; // fn poll(&mut self, wake: fn()) -> Poll<Self::Output>; fn poll(&mut self, wake: u32) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending, } // 自定义循环技术 struct MySleeper { polls: u64, wake: u32, // 简化使用整数 替换函数 } impl MySleeper { fn new(wake: u32) -> Self { MySleeper { polls: 0, wake: wake, } } } static mut FINISHED: bool = false; impl SimpleFuture for MySleeper { type Output = (); fn poll(&mut self, wake: u32) -> Poll<Self::Output> { // 简化编程 使用unsafe 使用外部的变量 unsafe { if FINISHED { Poll::Ready(()) } else { self.wake = wake; self.polls += 1; println!("polls = {}", self.polls); Poll::Pending } } } } // 定义自定义Reactor struct MyReactor { wake: u32, // 通知exector handle: Option<thread::JoinHandle<()>>, // 线程句柄 } impl MyReactor { fn new() -> Self { MyReactor { wake: 0, handle: None, } } // 知道哪个wake 通知具体函数 fn add_wake(&mut self, wake: u32) { self.wake = wake; } // check status fn check_status(&mut self) { if self.handle.is_none() { let wake = self.wake; // 模拟 通过死循环进行监控状态 let handle = thread::spawn(|| loop { thread::sleep(Duration::from_secs(5)); unsafe { //模拟future就绪,然后调用wake FINISHED = true; } }); self.handle = Some(handle); } } } struct MyExecutor; impl MyExecutor { fn block_on<F: SimpleFuture>(mut myfuture: F, wake: u32) { //阻塞执行future loop { match myfuture.poll(wake) { Poll::Ready(_) => { println!("my future is ok"); break; } Poll::Pending => unsafe { while !FINISHED { // 循环 每隔一秒钟检测一下 thread::sleep(Duration::from_secs(1)); } } } } } } fn main() { let mut reactor = MyReactor::new(); let sleeper = MySleeper::new(5); let wake = sleeper.wake; reactor.add_wake(wake); reactor.check_status(); MyExecutor::block_on(sleeper, wake); }
在简化版本的Future 对象中 有定义MyReactor, 和 MyExecutor, MyReactor wake 函数进行标记后调用自定义的check_staus 模拟Future的就绪,调用wake 函数通知, 在MyExector 的block_on 函数中 通过wake函数匹配状态 判读任务是否已经Ready
理解Future的模型
运行时状态的框架: async-std, futures 中已经实现executor不需要自己在实现。
异步编程中,rust的编程语言中只给我们提供trait Future, async-std, tokio,futures 等异步编程库 对其进行扩展,并且提供相对应的函数
async fn hello() { println!("hello"); } 等价于下面函数 fn hello() -> impl Future<Output=()> { async { println!("hello"); } }
rust 标准库中 Future如下:
pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } // Pin 可以创建不可移动Future,通过不可移动对象在他们字段中存储指针 struct MyFut { a: i32, ptr: *const i32, // 指针指向字段a 所在的内存首地址 }
async/awit 使用
在闭包中使用async
use std::future::Future; fn my_function() -> impl Future<Output = u8> { let closure = async |x: u8| { x }; closure(5) } 闭包在稳定版本还不支持,后续的版本中会支持 异步闭包 ##### async lifetime ```rust 这个函数的生命周期 // fn foo_expand(x: &'a u8) -> impl Future<Output = u8> + 'a { // async { // *x // } // } async fn foo(x: & u8) -> u8 { *x } // async 函数返回一个Future的对象 fn good() -> impl Future<Output = u8>{ // 异步代码块中,定义 变量后 调用foo_expand async 函数后进行await async { // x 修饰后 x 的生命周期有多长 就有多长 let x = 5; foo_expand(&x).await } }
加载全部内容