RUST异步流处理方法详细讲解
上后左爱 人气:0Stream 特质
在同步Rust 中流的核心是Iterator
提供了一种在序列中产生项的方法,并在它们之间进行阻塞,通过迭代器传递给其他迭代器
在异步Rust中流的核心Stream, 允许其他任务在当前阻塞等待时允许
Read/Write, AsyncRead/AsyncWrite
fn main() { let f = file::create("E:\\foot.txt").await?; f.write_all(b"hello world").await?; let f = file::open("E:\\foot.txt").await?; let mut buffer = Vec::new(); f.read_to_end(&mut buffer).await?; }
Stream 经典子流
source: 可以生成数据流
Sink: 可以消费数据流
Through: 消费数据,对其进行操作生成新数据流
Duplex: 流可以生成数据,也可以独立消费数据(AsyncWrite/Read)
asyncread 和 Stream 区别
这两种对byte 进行操作,AsyncRead 只能对byte进行操作(生成未解析数据),Stream对任何类型的数据进行操作(生成解析数据)
使用for_each_concurrent, try_for_each_concurrent 进行并发的处理流,进行流的处理
yield 匿名流
在async 异步过程中使用yield 关键字, 类似于Python 迭代产生时候可以返回,下一次从上一次返回值在进行开始跌打
try_join
如果某个发生错误后会立即返回数据
使用try_join 需要函数返回结果,并且错误的类型,才能正常运行
use futures; use tokio::runtime::Runtime; use std::io::Result; async fn func1() -> Result<()> { tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await; println!("func1 finished!"); Ok(()) } async fn func2() -> Result<()> { println!("func2 finished!"); Ok(()) } async fn async_main() { let f1 = func1(); let f2 = func2(); if let Err(_) = futures::try_join!(f1, f2) { println!("Err!"); } } fn main() { let mut runtime = Runtime::new().unwrap(); runtime.block_on(async_main()); println!("Hello, world!"); }
select
使用场景 有三个运行任务 ,只要其中一个完成后立马返回,使用select
在使用select启动使用pin_mut!(f1, f2),
使用select! 进行匹配
use futures::{select, future::FutureExt, pin_mut}; use tokio::runtime::Runtime; use std::io::Result; async fn func1() -> Result<()> { tokio::time::delay_for(tokio::time::Duration::from_secs(2)).await; println!("func1 finished!"); Ok(()) } async fn func2() -> Result<()> { println!("func2 finished!"); Ok(()) } async fn async_main() { let f1 = func1().fuse(); let f2 = func2().fuse(); pin_mut!(f1, f2); // 使用select 进行匹配 select! { _ = f1 => println!("func1 finished++++++!"), _ = f2 => println!("func2 finished++++++!"), } } fn main() { // 使用tokio的runtime() let mut runtime = Runtime::new().unwrap(); runtime.block_on(async_main()); println!("Hello, world!"); }
select! y与default/complete 一起联合使用
complete :表示两个都已经就绪,default表示两个都没有就绪
use futures::{future, select, executor}; async fn count() { let mut a_fut = future::ready(4); let mut b_fut = future::ready(6); let mut total = 0; loop { select! { a = a_fut => total += a, b = b_fut => total += b, complete => break, //表示所有的分支都已经完成,并且不会再取得进展的情况 default => unreachable!(), //表示没有分支完成 } } assert_eq!(total, 10); } fn main() { executor::block_on(count()); println!("Hello, world!"); }
complete 表示所有分支都已经完成,并且不会取得进展的情况,如上所示,使用loop 第一次b分支准备好,下一次循环可能是a分支,最后两个分支都已经完成后 就break退出
complete 类似让所有分支都完成后直接退出
SELECT宏几个条件
- select中使用Future必须首先UnPinFuture trait, Fused trait
- 必须实现UnpinFuture原因在于select! 不是按照值获取,按照引用获取,这样能够在不获取future所有权条件下,未完成的future可以继续使用
- 必须实现FusedFuture: select 完成后不在轮询future,因此需要实现FusedFuture 跟踪Future是否完成
- 如果select使用stream,其stream 也是需要实现FusedStream
async 问号使用
如果返回类型有Result<T, E> 结果使用.await?
Send trait
在保证多线程安全时候 需要保证接口实现Send trait 、sync trait 才能保证多线程的安全
Send trait 表示数据能够在线程间安全的发送,sync trait 能够保证线程安全的引用
use std::rc::Rc; #[derive(Default)] struct NoSend(Rc<()>); async fn bar() {} async fn foo() { NoSend::default(); //{ // let x = NoSend::default(); // //to do : xxxxx //} let _ = NoSend::default(); bar().await; } //Send trait:如果所有的子类型都是实现Send trait的,那么它本身也是实现Send Trait的 // 如果内部没有定义 只是使用 是一个Send Trait 主要是在 生成 匿名结构体中 会进行解析 not let x: impl Send Trait //struct Foo { // f: Future, //} let x: Not impl Send Trait //struct Foo { // x: NoSend, //not impl Send Trait // f: Future, //impl Send Trait //} fn required_send(_: impl Send) {} fn main() { required_send(foo()); println!("Hello, world!"); }
加载全部内容