pub struct Receiver<T> { /* 私有字段 */ }展开描述
broadcast channel 的接收端。
不能并发使用。可以使用 recv 检索消息。
要将此接收器转换为 Stream,可以使用 BroadcastStream 包装器。
§示例
use tokio::sync::broadcast;
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.send(20).unwrap();实现§
源代码§impl<T> Receiver<T>
impl<T> Receiver<T>
源代码pub fn len(&self) -> usize
pub fn len(&self) -> usize
返回已发送到 channel 中但此 Receiver 尚未接收的消息数。
如果 len 返回的值大于 channel 容量的下一个最大 2 的幂,则对 recv 的任何调用都将返回 Err(RecvError::Lagged),并且对 try_recv 的任何调用都将返回 Err(TryRecvError::Lagged)。例如,如果 channel 的容量为 10,则一旦 len 返回大于 16 的值,recv 将开始返回 Err(RecvError::Lagged)。
§示例
use tokio::sync::broadcast;
let (tx, mut rx1) = broadcast::channel(16);
tx.send(10).unwrap();
tx.send(20).unwrap();
assert_eq!(rx1.len(), 2);
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.len(), 1);
assert_eq!(rx1.recv().await.unwrap(), 20);
assert_eq!(rx1.len(), 0);源代码pub fn is_empty(&self) -> bool
pub fn is_empty(&self) -> bool
如果 channel 中没有任何 Receiver 尚未接收的消息,则返回 true。
§示例
use tokio::sync::broadcast;
let (tx, mut rx1) = broadcast::channel(16);
assert!(rx1.is_empty());
tx.send(10).unwrap();
tx.send(20).unwrap();
assert!(!rx1.is_empty());
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
assert!(rx1.is_empty());源代码pub fn same_channel(&self, other: &Self) -> bool
pub fn same_channel(&self, other: &Self) -> bool
如果这些 receiver 属于同一个通道,则返回 true。
§示例
use tokio::sync::broadcast;
let (tx, rx) = broadcast::channel::<()>(16);
let rx2 = tx.subscribe();
assert!(rx.same_channel(&rx2));
let (_tx3, rx3) = broadcast::channel::<()>(16);
assert!(!rx3.same_channel(&rx2));源代码pub fn sender_strong_count(&self) -> usize
pub fn sender_strong_count(&self) -> usize
返回 Sender 句柄的数量。
源代码pub fn sender_weak_count(&self) -> usize
pub fn sender_weak_count(&self) -> usize
返回 WeakSender 句柄的数量。
源代码§impl<T: Clone> Receiver<T>
impl<T: Clone> Receiver<T>
源代码pub fn resubscribe(&self) -> Self
pub fn resubscribe(&self) -> Self
从当前尾部元素开始重新订阅 channel。
此 Receiver 句柄将接收在重新订阅之后发送的所有值的克隆。这不包括当前接收器队列中的元素。请考虑以下示例。
§示例
use tokio::sync::broadcast;
let (tx, mut rx) = broadcast::channel(2);
tx.send(1).unwrap();
let mut rx2 = rx.resubscribe();
tx.send(2).unwrap();
assert_eq!(rx2.recv().await.unwrap(), 2);
assert_eq!(rx.recv().await.unwrap(), 1);源代码pub async fn recv(&mut self) -> Result<T, RecvError>
pub async fn recv(&mut self) -> Result<T, RecvError>
接收此接收器的下一个值。
每个 Receiver 句柄将接收在订阅之后发送的所有值的克隆。
当所有 Sender 端都已被 drop 时,返回 Err(RecvError::Closed),表示无法再在 channel 上发送任何值。
如果 Receiver 句柄滞后,一旦 channel 已满,新发送的值将覆盖旧值。此时,对 recv 的调用将返回 Err(RecvError::Lagged),并且 Receiver 的内部游标将更新为指向 channel 中仍保存的最旧值。随后对 recv 的调用将返回此值,除非它此后已被覆盖。
§取消安全性
此方法可安全取消。如果 recv 在 tokio::select! 语句中作为事件使用,并且其他分支先完成,则可以保证此通道没有接收到任何消息。
§示例
use tokio::sync::broadcast;
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.send(20).unwrap();处理延迟
use tokio::sync::broadcast;
let (tx, mut rx) = broadcast::channel(2);
tx.send(10).unwrap();
tx.send(20).unwrap();
tx.send(30).unwrap();
// The receiver lagged behind
assert!(rx.recv().await.is_err());
// At this point, we can abort or continue with lost messages
assert_eq!(20, rx.recv().await.unwrap());
assert_eq!(30, rx.recv().await.unwrap());源代码pub fn try_recv(&mut self) -> Result<T, TryRecvError>
pub fn try_recv(&mut self) -> Result<T, TryRecvError>
尝试在不等待的情况下返回此接收器上的待处理值。
这对于在决定等待接收器之前的"乐观检查"很有用。
与 recv 相比,此函数有三种失败情况而不是两种(关闭、空缓冲区和滞后接收器各一种)。
当所有 Sender 端都已被 drop 时,返回 Err(TryRecvError::Closed),表示无法再在 channel 上发送任何值。
如果 Receiver 句柄滞后,一旦 channel 已满,新发送的值将覆盖旧值。此时,对 recv 的调用将返回 Err(TryRecvError::Lagged),并且 Receiver 的内部游标将更新为指向 channel 中仍保存的最旧值。随后对 try_recv 的调用将返回此值,除非它此后已被覆盖。如果没有可接收的值,则返回 Err(TryRecvError::Empty)。
§示例
use tokio::sync::broadcast;
let (tx, mut rx) = broadcast::channel(16);
assert!(rx.try_recv().is_err());
tx.send(10).unwrap();
let value = rx.try_recv().unwrap();
assert_eq!(10, value);源代码pub fn blocking_recv(&mut self) -> Result<T, RecvError>
pub fn blocking_recv(&mut self) -> Result<T, RecvError>
阻塞接收,可在异步上下文之外调用。
§恐慌
如果在异步执行上下文中调用,此函数会发生 panic。
§示例
use std::thread;
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx) = broadcast::channel(16);
let sync_code = thread::spawn(move || {
assert_eq!(rx.blocking_recv(), Ok(10));
});
let _ = tx.send(10);
sync_code.join().unwrap();
}