rust-async异步
运行时
运行时runtime:
- 程序声明周期的一个阶段。编译时、运行时。
- 运行时库。
- 某门语言的宿主环境。例如:Nodejs是一个JavaScript的运行时。
来自知乎:doodlewind
事件驱动性能很好,最大的问题:回调地狱。
回调地狱
JS的例子:(保证执行顺序)
setTimeout(function () { //第一层 console.log('武林要以和为贵'); setTimeout(function () { //第二程 console.log('要讲武德'); setTimeout(function () { //第三层 console.log('不要搞窝里斗'); }, 1000) }, 2000) }, 3000)
async简介
OS线程:适用于少量任务,生成和切换较大,允许重用同步代码
Async:内存CPU开销小、可执行文件大(需要生成状态机,每个可执行文件捆绑一个异步运行时)
二者没有好坏之分,只是更适用场景之分。
Future原理
trait SimpleFuture {
type Output;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
Future
代表着可以校验是否完成的操作,调用poll
函数获取当前进展。
wake
函数被调用时,执行器将再次调用poll
函数,以便Future
取得更多进展。
Waker
使用Waker
实现一个简单计时器Future
use std::task::Poll;
use std::thread;
use std::time::Duration;
use std::{task::Waker, sync::Arc};
use std::sync::Mutex;
use futures::{ Future};
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
// 在Future和等待的线程间共享状态
struct SharedState {
completed: bool, // 睡眠时间是否结束
// 在设置completed=true之后,线程用它来告诉
// TimerFuture的任务可以唤醒
waker: Option<Waker>,
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// 设置waker以便timer结束时线程可以唤醒当前任务
// 保证future可以再次被poll
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl TimerFuture {
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(
SharedState {
completed: false,
waker: None,
}
));
let thread_shared_state = shared_state.clone();
thread::spawn(move|| {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
Executor
谁来执行顶层async
函数返回的future
?Future executor
Executor
会获取一系列顶层的Future
,然后再Future
可以有进展的时候调用poll
,直至完成。
通常执行者将poll
一个future
一次,来开始。
当Future
通过调用wake
函数表示已经准备好取得进展时,就会被放入到一个队列里,然后poll
再次被调用,直至Future
完成。
构建简单执行者demo
:
// 从通道里接收任务并执行
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
// 生成future到channel
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
struct Task {
future: Mutex<Option<FutureObj<'static, ()>>>,
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor {ready_queue}, Spawner{task_sender})
}
impl Spawner {
fn spawn(&self, future:impl Future<Output=()> + 'static + Send) {
let future_obj = FutureObj::new(Box::new(future));
let task = Arc::new(Task {
future: Mutex::new(Some(future_obj)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("too many tasks queued");
}
}
// 再次发送给任务通道,执行者接收
impl Wake for Task {
fn wake(self: Arc<Self>) {
let cloned = self.clone();
self.task_sender.send(cloned).expect("too many tasks queued");
}
}
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// 这里找不到,看个意思
let lw = local_waker_from_nonlocal(task.clone());
if let Poll::Pending = Pin::new(&mut future).poll(&lw) {
*future_slot = Some(future);
}
}
}
}
}
fn main() {
let (executor, spawner) = new_executor_and_spawner();
spawner.spawn(async {
println!("howdy!");
// Wait for our timer future to complete after two seconds.
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
executor.run();
}
async/.await
Rust的特殊语法,当发生阻塞时,它让放弃当前线程的控制权成为可能,允许在等待操作完成的时候,允许其它代码取得进展。
.await
只能用在async
里。


Pin
Pin
与Unpin
标记一起工作。
保证实现了!Unpin
的对象永远不会被移动。
内存交换demo
demo
:
a为变量,b为指向变量的指针。

所以内存交换后,a、b打印的值就不一样了。
Pin
类型会包裹指针类型,保证指针指向的值不被移动。
Unpin trait
大多数类型如果被移动,不会造成问题,他们实现了Unpin
。
指向Unpin
类型的指针,可以自由放入或者从Pin
中取出,例如u8
是Unpin
的,Pin<&mut u8>
和普通的&mut u8
一样。
如果类型拥有!Unpin
标记,那么在Pin
之后他们无法被移动。(!取反)
再交换内存会报错。
Stream trait
类似Future trait
,但是可以在完成前产生多个值,有点像Iterator trait
。

demo:

同时执行多个future
方式:
- join!,等待所有future完成
- select!,等待多个future中的一个完成
- Spawning,创建一个顶级任务,它会运行一个future直至完成
- FuturesUnordered,一组future,他们会产生每个子future的结果
join!

try_join!
,如果返回的Result
里面有错误,就立刻返回。
select!

并发web server
单线程模式
use std::{net::{TcpListener, TcpStream}, io::{Read, Write}, fs};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!("{status_line}{contents}");
stream.write_all(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
async-std模式
use std::time::Duration;
use async_std::{net::{TcpListener, TcpStream}, io::{ReadExt, WriteExt}, task, fs};
use futures::StreamExt;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener.incoming()
.for_each_concurrent(None, |tcpstream| async move{
let stream = tcpstream.unwrap();
handle_connection(stream).await;
}).await;
}
async fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).await.unwrap();
let get = b"GET / HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
task::sleep(Duration::from_secs(5)).await;
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).await.unwrap();
let response = format!("{status_line}{contents}");
stream.write(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
}
先有个感觉稍微了解下吧。