当前位置:嗨网首页>书籍在线阅读

14-通过消息传递进行通信

  
选择背景色: 黄橙 洋红 淡粉 水蓝 草绿 白色 选择字体: 宋体 黑体 微软雅黑 楷体 选择字体大小: 恢复默认

8.4.4 通过消息传递进行通信

线程还可以通过被称为消息传递的更高级抽象来互相通信。这种线程通信模型避免了需要用户显式锁定的要求。

标准库中的std::sync::mpsc模块提供了一个无锁定的多生产者、单订阅者(消费者)队列,以此作为希望彼此通信的线程的共享消息队列。mpsc模块标准库包含两种通道。

  • channel:这是一个异步的无限缓冲通道。
  • sync_channel:这是一个同步的有界缓冲通道。

通道可用于将数据从一个线程发送到另一个线程。我们先来看看异步通道。

异步通道

以下是一个简单的生产者-消费者系统的示例,其中主线程生成值0,1,…,9,然后在新生成的线程输出它们:

// async_channels.rs
use std::thread;
use std::sync::mpsc::channel;
fn main() {
    let (tx, rx) = channel();
    let join_handle = thread::spawn(move || {
        while let Ok(n) = rx.recv() {
            println!("Received {}", n);
        }
    });
    for i in 0..10 {
        tx.send(i).unwrap();
    }
    join_handle.join().unwrap();
}

我们首先调用了channel方法,这将返回两个值tx和rx。tx是包含Sender类型的发送端,rx是包含Receiver类型的接收端。它们的名字是约定俗成的,你也可以给它们设置其他任意名称。通常,你会看到代码库中采用这些名称,因为它们的构造非常简洁。

接下来,我们生成一个从rx端接收值的线程:

let join_handle = thread::spawn(move || {
    //在循环中持续接收值,直到tx失效
    while let Ok(n) = rx.recv() { // Note: `recv()` always blocks
        println!("Received {}", n);
    }
});

我们使用了while let循环。当tx被丢弃时,该循环将收到Err。上述丢弃操作一般发生在main函数返回时。

在上述代码中,首先为了创建mpsc队列,会调用channel函数,它会返回Sender和Receiver

Sender是一种复制类型,这意味着它可以切换到多个线程中,允许它们将消息发送到共享队列。

多个生产者,单个消费者(Multi Producer, Single Consumer,MPSC)方法提供了多个作者,但只提供了一个读者。这两个函数都返回一对泛型:发送者和接收者。发送者可用于将新事物推入通道,而接收者可用于从通道获取内容。发送者实现了复制特征,而接收者没有。

使用默认的异步通道时,send方法永远不会阻塞。这是因为通道缓冲区是无限的,所以总是会提供更多的空间。当然,它实际上并不是无限的,只是在概念上如此:如果你在没有收到任何数据的情况下向通道发送数千兆字节,那么系统可能会耗尽内存。

同步通道

同步通道有一个有界缓冲区,当它被填满时,send方法会被阻塞,直到通道中出现更多空间。其他用法和异步通道类似:

// sync_channels.rs
use std::thread;
use std::sync::mpsc;
fn main() {
    let (tx, rx) = mpsc::sync_channel(1);
    let tx_clone = tx.clone();
    let _ = tx.send(0);
    thread::spawn(move || {
        let _ = tx.send(1);
    });
    thread::spawn(move || {
        let _ = tx_clone.send(2);
    });
    println!("Received {} via the channel", rx.recv().unwrap());
    println!("Received {} via the channel", rx.recv().unwrap());
    println!("Received {} via the channel", rx.recv().unwrap());
    println!("Received {:?} via the channel", rx.recv());
}

由上述代码可知,当同步通道的大小为1时,这意味着通道中不可能存在多个元素。在这种情况下,第一次发送请求之后的任何请求都会被阻塞。但是,在前面的代码中,我们的请求不会被阻塞(至少对于那些长请求是如此),因为两个发送线程在后台工作,主线程会在不被send调用阻塞的情况下接收信息。对于这两种通道类型,如果通道是空的,那么recv调用会返回Err值。