rust-async异步


运行时

运行时runtime:

  • 程序声明周期的一个阶段。编译时、运行时。
  • 运行时库。
  • 某门语言的宿主环境。例如:Nodejs是一个JavaScript的运行时。

来自知乎:doodlewind

事件驱动性能很好,最大的问题:回调地狱。

回调地狱

JS的例子:(保证执行顺序)

setTimeout(function () {  //第一层
    console.log('武林要以和为贵');
    setTimeout(function () {  //第二程
        console.log('要讲武德');
        setTimeout(function () {   //第三层
            console.log('不要搞窝里斗');
        }, 1000)
    }, 2000)
}, 3000)

async简介

OS线程:适用于少量任务,生成和切换较大,允许重用同步代码

Async:内存CPU开销小、可执行文件大(需要生成状态机,每个可执行文件捆绑一个异步运行时)

二者没有好坏之分,只是更适用场景之分。

Future原理

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

Future代表着可以校验是否完成的操作,调用poll函数获取当前进展。

wake函数被调用时,执行器将再次调用poll函数,以便Future取得更多进展。

Waker

使用Waker实现一个简单计时器Future

use std::task::Poll;
use std::thread;
use std::time::Duration;
use std::{task::Waker, sync::Arc};
use std::sync::Mutex;
use futures::{ Future};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

// 在Future和等待的线程间共享状态
struct SharedState {
    completed: bool, // 睡眠时间是否结束
    
    // 在设置completed=true之后,线程用它来告诉
    // TimerFuture的任务可以唤醒
    waker: Option<Waker>, 
}

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // 设置waker以便timer结束时线程可以唤醒当前任务
            // 保证future可以再次被poll
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(
            SharedState {
                completed: false,
                waker: None,
            }
        ));

        let thread_shared_state = shared_state.clone();

        thread::spawn(move|| {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();

            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

Executor

谁来执行顶层async函数返回的futureFuture executor

Executor会获取一系列顶层的Future,然后再Future可以有进展的时候调用poll,直至完成。

通常执行者将poll一个future一次,来开始。

Future通过调用wake函数表示已经准备好取得进展时,就会被放入到一个队列里,然后poll再次被调用,直至Future完成。

构建简单执行者demo


// 从通道里接收任务并执行
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

// 生成future到channel
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

struct Task {
    future: Mutex<Option<FutureObj<'static, ()>>>,
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor {ready_queue}, Spawner{task_sender})
}

impl Spawner {
    fn spawn(&self, future:impl Future<Output=()> + 'static + Send) {
        let future_obj = FutureObj::new(Box::new(future));
        let task = Arc::new(Task {
            future: Mutex::new(Some(future_obj)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

// 再次发送给任务通道,执行者接收
impl Wake for Task {
    fn wake(self: Arc<Self>) {
        let cloned = self.clone();
        self.task_sender.send(cloned).expect("too many tasks queued");
    }
}

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // 这里找不到,看个意思
                let lw = local_waker_from_nonlocal(task.clone());
                if let Poll::Pending = Pin::new(&mut future).poll(&lw) {
                    *future_slot = Some(future);
                }
            }
        }
    }
}

fn main() {
    let (executor, spawner) = new_executor_and_spawner();
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });
    executor.run();
}

async/.await

Rust的特殊语法,当发生阻塞时,它让放弃当前线程的控制权成为可能,允许在等待操作完成的时候,允许其它代码取得进展。

.await只能用在async里。

image-20230104153958517 image-20230104154605908

Pin

PinUnpin标记一起工作。

保证实现了!Unpin的对象永远不会被移动。

内存交换demo

demo

a为变量,b为指向变量的指针。

image-20230104160432128

所以内存交换后,a、b打印的值就不一样了。

Pin类型会包裹指针类型,保证指针指向的值不被移动。

Unpin trait

大多数类型如果被移动,不会造成问题,他们实现了Unpin

指向Unpin类型的指针,可以自由放入或者从Pin中取出,例如u8Unpin的,Pin<&mut u8>和普通的&mut u8一样。

如果类型拥有!Unpin标记,那么在Pin之后他们无法被移动。(!取反)

再交换内存会报错。

Stream trait

类似Future trait,但是可以在完成前产生多个值,有点像Iterator trait

image-20230104161948353

demo:

image-20230104162114871

同时执行多个future

方式:

  • join!,等待所有future完成
  • select!,等待多个future中的一个完成
  • Spawning,创建一个顶级任务,它会运行一个future直至完成
  • FuturesUnordered,一组future,他们会产生每个子future的结果

join!

image-20230104162633896

try_join!,如果返回的Result里面有错误,就立刻返回。

select!

image-20230104163117666

并发web server

单线程模式

use std::{net::{TcpListener, TcpStream}, io::{Read, Write}, fs};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];

    stream.read(&mut buffer).unwrap();
    
    let get = b"GET / HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{status_line}{contents}");
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

async-std模式

use std::time::Duration;

use async_std::{net::{TcpListener, TcpStream}, io::{ReadExt, WriteExt}, task, fs};
use futures::StreamExt;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();

    listener.incoming()
        .for_each_concurrent(None, |tcpstream| async move{
            let stream = tcpstream.unwrap();

            handle_connection(stream).await;
    }).await;
}

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];

    stream.read(&mut buffer).await.unwrap();
    
    let get = b"GET / HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).await.unwrap();
    let response = format!("{status_line}{contents}");
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

先有个感觉稍微了解下吧。