展开描述
多生产者、多消费者的广播队列。每个发送的值会被所有消费者看到。
Sender 用于将值广播给所有连接的 Receiver。Sender 句柄是可克隆的,允许并发发送和接收操作。只要 T 是 Send,Sender 和 Receiver 都实现 Send 和 Sync。
当发送一个值时,所有 Receiver 句柄都将收到通知并接收该值。该值在 channel 内存储一次,并根据需要为每个接收器克隆。一旦所有接收器都收到了该值的克隆,该值就会从 channel 中释放。
通过调用 channel 创建 channel,指定 channel 在任何给定时间可以保留的最大消息数。
通过调用 Sender::subscribe 创建新的 Receiver 句柄。返回的 Receiver 将接收在调用 subscribe 之后发送的值。
此 channel 也适用于单生产者多消费者用例,其中单个 sender 向许多 receiver 广播值。
§Lagging
由于发送的消息必须保留到所有 Receiver 句柄都接收到一个克隆,因此 broadcast channel 容易出现"慢接收者"问题。在这种情况下,除一个接收者外,所有接收者都能够以发送速率接收值。因为一个接收者停滞不前,channel 开始填满。
此 broadcast channel 实现通过设置 channel 在任何给定时间可以保留的值的硬上限来处理这种情况。此上限作为参数传递给 channel 函数。
如果 channel 达到容量时发送一个值,则会释放 channel 当前持有的最旧值。这为新值腾出了空间。任何尚未看到已释放值的接收器在下一次调用 recv 时将返回 RecvError::Lagged。
一旦返回 RecvError::Lagged,滞后接收器的位置将更新为 channel 中包含的最旧值。下一次调用 recv 将返回此值。
此行为使接收器能够检测到何时已经滞后到数据已被丢弃的程度。调用者可以决定如何响应:要么中止其任务,要么容忍丢失的消息并恢复 channel 的消费。
§Closing
当所有 Sender 句柄都已被 drop 时,不能再发送新值。此时,channel 已"关闭"。一旦接收器已接收 channel 保留的所有值,则下次调用 recv 将返回 RecvError::Closed。
当 Receiver 句柄被 drop 时,接收器尚未读取的任何消息都将标记为已读。如果该接收器是唯一尚未读取该消息的接收器,则此时该消息将被丢弃。
§示例
基本用法
use tokio::sync::broadcast;
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.send(20).unwrap();处理延迟
use tokio::sync::broadcast;
let (tx, mut rx) = broadcast::channel(2);
tx.send(10).unwrap();
tx.send(20).unwrap();
tx.send(30).unwrap();
// The receiver lagged behind
assert!(rx.recv().await.is_err());
// At this point, we can abort or continue with lost messages
assert_eq!(20, rx.recv().await.unwrap());
assert_eq!(30, rx.recv().await.unwrap());模块§
- error
- Broadcast error types
结构体§
- Receiver
- Receiving-half of the
broadcastchannel. - Sender
- Sending-half of the
broadcastchannel. - Weak
Sender - A sender that does not prevent the channel from being closed.
函数§
- channel
- Create a bounded, multi-producer, multi-consumer channel where each sent value is broadcasted to all active receivers.