pub struct Receiver<T> { /* 私有字段 */ }展开描述
实现§
源代码§impl<T> Receiver<T>
impl<T> Receiver<T>
源代码pub async fn recv(&mut self) -> Option<T>
pub async fn recv(&mut self) -> Option<T>
接收此接收器的下一个值。
如果通道已关闭且通道缓冲区中没有剩余消息,则此方法返回 None。这表明从此 Receiver 不再可能接收到任何值。当所有 sender 都被 drop 或调用了 close 时,通道会被关闭。
如果 channel 的缓冲区中没有消息,但 channel 尚未关闭,则此方法将休眠,直到有消息被发送或 channel 被关闭。请注意,如果调用了 close,但仍有之前未完成的 Permits,则 recv 不会认为 channel 已关闭,直到这些 permit 被释放。
§取消安全性
此方法可安全取消。如果 recv 在 tokio::select! 语句中作为事件使用,并且其他分支先完成,则可以保证此通道没有接收到任何消息。
§示例
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(100);
tokio::spawn(async move {
tx.send("hello").await.unwrap();
});
assert_eq!(Some("hello"), rx.recv().await);
assert_eq!(None, rx.recv().await);值被缓冲:
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(100);
tx.send("hello").await.unwrap();
tx.send("world").await.unwrap();
assert_eq!(Some("hello"), rx.recv().await);
assert_eq!(Some("world"), rx.recv().await);源代码pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize
pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize
为此 receiver 接收下一个值并扩展 buffer。
此方法以不超过 limit 指定的固定值数量扩展 buffer。如果 limit 为零,则函数立即返回 0。返回值是添加到 buffer 的值的数量。
对于 limit > 0,如果 channel 队列中没有消息,但 channel 尚未关闭,则此方法将休眠,直到有消息被发送或 channel 被关闭。请注意,如果调用了 close,但仍有之前未完成的 Permits,则 recv_many 不会认为 channel 已关闭,直到这些 permit 被释放。
对于 limit 的非零值,此方法永远不会返回 0,除非 channel 已关闭并且 channel 队列中没有剩余消息。这表明永远无法从此 Receiver 接收更多值。当所有 sender 都已被 drop 或调用 close 时,channel 关闭。
buffer 的容量会按需增加。
§取消安全性
此方法可安全取消。如果 recv_many 在 tokio::select! 语句中作为事件使用,并且其他分支先完成,则可以保证此通道没有接收到任何消息。
§示例
use tokio::sync::mpsc;
let mut buffer: Vec<&str> = Vec::with_capacity(2);
let limit = 2;
let (tx, mut rx) = mpsc::channel(100);
let tx2 = tx.clone();
tx2.send("first").await.unwrap();
tx2.send("second").await.unwrap();
tx2.send("third").await.unwrap();
// Call `recv_many` to receive up to `limit` (2) values.
assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
assert_eq!(vec!["first", "second"], buffer);
// If the buffer is full, the next call to `recv_many`
// reserves additional capacity.
assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
tokio::spawn(async move {
tx.send("fourth").await.unwrap();
});
// 'tx' is dropped, but `recv_many`
// is guaranteed not to return 0 as the channel
// is not yet closed.
assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
// Once the last sender is dropped, the channel is
// closed and `recv_many` returns 0, capacity unchanged.
drop(tx2);
assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
assert_eq!(vec!["first", "second", "third", "fourth"], buffer);源代码pub fn try_recv(&mut self) -> Result<T, TryRecvError>
pub fn try_recv(&mut self) -> Result<T, TryRecvError>
尝试为该 receiver 接收下一个值。
如果通道当前为空,但仍存在未释放的 sender 或 permit,则此方法返回 Empty 错误。
如果通道当前为空,且没有未释放的 sender 或 permit,则此方法返回 Disconnected 错误。
与 poll_recv 方法不同,此方法绝不会虚假地返回 Empty 错误。
§示例
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
let (tx, mut rx) = mpsc::channel(100);
tx.send("hello").await.unwrap();
assert_eq!(Ok("hello"), rx.try_recv());
assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
tx.send("hello").await.unwrap();
// Drop the last sender, closing the channel.
drop(tx);
assert_eq!(Ok("hello"), rx.try_recv());
assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());源代码pub fn blocking_recv(&mut self) -> Option<T>
pub fn blocking_recv(&mut self) -> Option<T>
阻塞接收,可在异步上下文之外调用。
如果通道已关闭且通道缓冲区中没有剩余消息,则此方法返回 None。这表明从此 Receiver 不再可能接收到任何值。当所有 sender 都被 drop 或调用了 close 时,通道会被关闭。
如果 channel 的缓冲区中没有消息,但 channel 尚未关闭,则此方法将阻塞,直到有消息被发送或 channel 被关闭。
此方法用于从异步代码向同步代码发送的用例,即使发送方没有使用 blocking_send 发送消息也能工作。
请注意,如果调用了 close,但仍有之前未完成的 Permits,则 blocking_recv 不会认为 channel 已关闭,直到这些 permit 被释放。
§恐慌
如果在异步执行上下文中调用,此函数会发生 panic。
§示例
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
fn main() {
let (tx, mut rx) = mpsc::channel::<u8>(10);
let sync_code = thread::spawn(move || {
assert_eq!(Some(10), rx.blocking_recv());
});
Runtime::new()
.unwrap()
.block_on(async move {
let _ = tx.send(10).await;
});
sync_code.join().unwrap()
}源代码pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize
pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize
用于阻塞上下文的 Self::recv_many 变体。
Self::blocking_recv 中的相同条件适用。
源代码pub fn close(&mut self)
pub fn close(&mut self)
关闭 channel 的接收端,但不 drop 它。
这可以防止任何进一步的消息在 channel 上发送,同时仍然允许接收器排出已缓冲的消息。任何未完成的 Permit 值仍然能够发送消息。
为了保证不会丢失消息,在调用 close() 之后,必须调用 recv() 直到返回 None。如果有未完成的 Permit 或 OwnedPermit 值,则 recv 方法在这些值被释放之前不会返回 None。
§示例
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(20);
tokio::spawn(async move {
let mut i = 0;
while let Ok(permit) = tx.reserve().await {
permit.send(i);
i += 1;
}
});
rx.close();
while let Some(msg) = rx.recv().await {
println!("got {}", msg);
}
// Channel closed and no messages are lost.源代码pub fn is_closed(&self) -> bool
pub fn is_closed(&self) -> bool
检查通道是否已关闭。
如果 channel 已关闭,则此方法返回 true。当所有 Sender 都已被 drop 或调用 Receiver::close 时,channel 关闭。
§示例
use tokio::sync::mpsc;
let (_tx, mut rx) = mpsc::channel::<()>(10);
assert!(!rx.is_closed());
rx.close();
assert!(rx.is_closed());源代码pub fn is_empty(&self) -> bool
pub fn is_empty(&self) -> bool
检查通道是否为空。
如果通道中没有消息,此方法返回 true。
§示例
use tokio::sync::mpsc;
let (tx, rx) = mpsc::channel(10);
assert!(rx.is_empty());
tx.send(0).await.unwrap();
assert!(!rx.is_empty());
源代码pub fn len(&self) -> usize
pub fn len(&self) -> usize
返回通道中的消息数。
§示例
use tokio::sync::mpsc;
let (tx, rx) = mpsc::channel(10);
assert_eq!(0, rx.len());
tx.send(0).await.unwrap();
assert_eq!(1, rx.len());源代码pub fn capacity(&self) -> usize
pub fn capacity(&self) -> usize
返回通道的当前容量。
当 sender 通过调用 Sender::send 或通过 Sender::reserve 预留容量发送值时,容量会减少。当接收值时,容量会增加。这与 max_capacity 不同,后者始终返回调用 channel 时最初指定的缓冲区容量。
§示例
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel::<()>(5);
assert_eq!(rx.capacity(), 5);
// Making a reservation drops the capacity by one.
let permit = tx.reserve().await.unwrap();
assert_eq!(rx.capacity(), 4);
assert_eq!(rx.len(), 0);
// Sending and receiving a value increases the capacity by one.
permit.send(());
assert_eq!(rx.len(), 1);
rx.recv().await.unwrap();
assert_eq!(rx.capacity(), 5);
// Directly sending a message drops the capacity by one.
tx.send(()).await.unwrap();
assert_eq!(rx.capacity(), 4);
assert_eq!(rx.len(), 1);
// Receiving the message increases the capacity by one.
rx.recv().await.unwrap();
assert_eq!(rx.capacity(), 5);
assert_eq!(rx.len(), 0);源代码pub fn max_capacity(&self) -> usize
pub fn max_capacity(&self) -> usize
返回通道的最大缓冲区容量。
最大容量是调用 channel 时最初指定的缓冲区容量。这与 capacity 不同,后者返回当前可用的缓冲区容量:随着消息的发送和接收,capacity 返回的值会上下波动,而 max_capacity 返回的值保持不变。
§示例
use tokio::sync::mpsc;
let (tx, rx) = mpsc::channel::<()>(5);
// both max capacity and capacity are the same at first
assert_eq!(rx.max_capacity(), 5);
assert_eq!(rx.capacity(), 5);
// Making a reservation doesn't change the max capacity.
let permit = tx.reserve().await.unwrap();
assert_eq!(rx.max_capacity(), 5);
// but drops the capacity by one
assert_eq!(rx.capacity(), 4);源代码pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>
轮询以接收此通道上的下一条消息。
此方法返回:
Poll::Pendingif no messages are available but the channel is not closed, or if a spurious failure happens.Poll::Ready(Some(message))if a message is available.Poll::Ready(None)if the channel has been closed and all messages sent before it was closed have been received.
当该方法返回 Poll::Pending 时,所提供 Context 中的 Waker 会在任意接收器上发送消息或通道被关闭时被调度以接收唤醒。请注意,在多次调用 poll_recv 或 poll_recv_many 时,只有传递给最近一次调用的 Context 中的 Waker 会被调度以接收唤醒。
如果此方法由于虚假失败而返回 Poll::Pending,那么在导致虚假失败的情况得到解决后,会收到 Waker 通知。请注意,收到这样的唤醒并不能保证下一次调用一定会成功——它可能会因另一个虚假失败而失败。
源代码pub fn poll_recv_many(
&mut self,
cx: &mut Context<'_>,
buffer: &mut Vec<T>,
limit: usize,
) -> Poll<usize>
pub fn poll_recv_many( &mut self, cx: &mut Context<'_>, buffer: &mut Vec<T>, limit: usize, ) -> Poll<usize>
轮询以接收此通道上的多条消息,并扩展所提供的缓冲区。
此方法返回:
Poll::Pendingif no messages are available but the channel is not closed, or if a spurious failure happens.Poll::Ready(count)wherecountis the number of messages successfully received and stored inbuffer. This can be less than, or equal to,limit.Poll::Ready(0)iflimitis set to zero or when the channel is closed.
当该方法返回 Poll::Pending 时,所提供 Context 中的 Waker 会在任意接收器上发送消息或通道被关闭时被调度以接收唤醒。请注意,在多次调用 poll_recv 或 poll_recv_many 时,只有传递给最近一次调用的 Context 中的 Waker 会被调度以接收唤醒。
请注意,此方法不保证一定接收到恰好 limit 条消息。而是如果至少有一条消息可用,它会尽可能多地返回消息,直到达到给定的上限。仅当通道已关闭(或 limit 为 0)时,此方法才返回零。
§示例
use std::task::{Context, Poll};
use std::pin::Pin;
use tokio::sync::mpsc;
use futures::Future;
struct MyReceiverFuture<'a> {
receiver: mpsc::Receiver<i32>,
buffer: &'a mut Vec<i32>,
limit: usize,
}
impl<'a> Future for MyReceiverFuture<'a> {
type Output = usize; // Number of messages received
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
// Now `receiver` and `buffer` are mutable references, and `limit` is copied
match receiver.poll_recv_many(cx, *buffer, *limit) {
Poll::Pending => Poll::Pending,
Poll::Ready(count) => Poll::Ready(count),
}
}
}
let (tx, rx) = mpsc::channel(32);
let mut buffer = Vec::new();
let my_receiver_future = MyReceiverFuture {
receiver: rx,
buffer: &mut buffer,
limit: 3,
};
for i in 0..10 {
tx.send(i).await.unwrap();
}
let count = my_receiver_future.await;
assert_eq!(count, 3);
assert_eq!(buffer, vec![0,1,2])源代码pub fn sender_strong_count(&self) -> usize
pub fn sender_strong_count(&self) -> usize
返回 Sender 句柄的数量。
源代码pub fn sender_weak_count(&self) -> usize
pub fn sender_weak_count(&self) -> usize
返回 WeakSender 句柄的数量。