跳到主要内容

Receiver

搜索

结构体 Receiver 

源代码
pub struct Receiver<T> { /* 私有字段 */ }
展开描述

从关联的 Sender 接收值。

实例由 channel 函数创建。

此接收器可以使用 ReceiverStream 转换为 Stream

实现§

源代码§

impl<T> Receiver<T>

源代码

pub async fn recv(&mut self) -> Option<T>

接收此接收器的下一个值。

如果通道已关闭且通道缓冲区中没有剩余消息,则此方法返回 None。这表明从此 Receiver 不再可能接收到任何值。当所有 sender 都被 drop 或调用了 close 时,通道会被关闭。

如果 channel 的缓冲区中没有消息,但 channel 尚未关闭,则此方法将休眠,直到有消息被发送或 channel 被关闭。请注意,如果调用了 close,但仍有之前未完成的 Permits,则 recv 不会认为 channel 已关闭,直到这些 permit 被释放。

§取消安全性

此方法可安全取消。如果 recvtokio::select! 语句中作为事件使用,并且其他分支先完成,则可以保证此通道没有接收到任何消息。

§示例
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(100);

tokio::spawn(async move {
    tx.send("hello").await.unwrap();
});

assert_eq!(Some("hello"), rx.recv().await);
assert_eq!(None, rx.recv().await);

值被缓冲:

use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(100);

tx.send("hello").await.unwrap();
tx.send("world").await.unwrap();

assert_eq!(Some("hello"), rx.recv().await);
assert_eq!(Some("world"), rx.recv().await);
源代码

pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize

为此 receiver 接收下一个值并扩展 buffer

此方法以不超过 limit 指定的固定值数量扩展 buffer。如果 limit 为零,则函数立即返回 0。返回值是添加到 buffer 的值的数量。

对于 limit > 0,如果 channel 队列中没有消息,但 channel 尚未关闭,则此方法将休眠,直到有消息被发送或 channel 被关闭。请注意,如果调用了 close,但仍有之前未完成的 Permits,则 recv_many 不会认为 channel 已关闭,直到这些 permit 被释放。

对于 limit 的非零值,此方法永远不会返回 0,除非 channel 已关闭并且 channel 队列中没有剩余消息。这表明永远无法从此 Receiver 接收更多值。当所有 sender 都已被 drop 或调用 close 时,channel 关闭。

buffer 的容量会按需增加。

§取消安全性

此方法可安全取消。如果 recv_manytokio::select! 语句中作为事件使用,并且其他分支先完成,则可以保证此通道没有接收到任何消息。

§示例
use tokio::sync::mpsc;

let mut buffer: Vec<&str> = Vec::with_capacity(2);
let limit = 2;
let (tx, mut rx) = mpsc::channel(100);
let tx2 = tx.clone();
tx2.send("first").await.unwrap();
tx2.send("second").await.unwrap();
tx2.send("third").await.unwrap();

// Call `recv_many` to receive up to `limit` (2) values.
assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
assert_eq!(vec!["first", "second"], buffer);

// If the buffer is full, the next call to `recv_many`
// reserves additional capacity.
assert_eq!(1, rx.recv_many(&mut buffer, 1).await);

tokio::spawn(async move {
    tx.send("fourth").await.unwrap();
});

// 'tx' is dropped, but `recv_many`
// is guaranteed not to return 0 as the channel
// is not yet closed.
assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
assert_eq!(vec!["first", "second", "third", "fourth"], buffer);

// Once the last sender is dropped, the channel is
// closed and `recv_many` returns 0, capacity unchanged.
drop(tx2);
assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
源代码

pub fn try_recv(&mut self) -> Result<T, TryRecvError>

尝试为该 receiver 接收下一个值。

如果通道当前为空,但仍存在未释放的 senderpermit,则此方法返回 Empty 错误。

如果通道当前为空,且没有未释放的 senderpermit,则此方法返回 Disconnected 错误。

poll_recv 方法不同,此方法绝不会虚假地返回 Empty 错误。

§示例
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;

let (tx, mut rx) = mpsc::channel(100);

tx.send("hello").await.unwrap();

assert_eq!(Ok("hello"), rx.try_recv());
assert_eq!(Err(TryRecvError::Empty), rx.try_recv());

tx.send("hello").await.unwrap();
// Drop the last sender, closing the channel.
drop(tx);

assert_eq!(Ok("hello"), rx.try_recv());
assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
源代码

pub fn blocking_recv(&mut self) -> Option<T>

阻塞接收,可在异步上下文之外调用。

如果通道已关闭且通道缓冲区中没有剩余消息,则此方法返回 None。这表明从此 Receiver 不再可能接收到任何值。当所有 sender 都被 drop 或调用了 close 时,通道会被关闭。

如果 channel 的缓冲区中没有消息,但 channel 尚未关闭,则此方法将阻塞,直到有消息被发送或 channel 被关闭。

此方法用于从异步代码向同步代码发送的用例,即使发送方没有使用 blocking_send 发送消息也能工作。

请注意,如果调用了 close,但仍有之前未完成的 Permits,则 blocking_recv 不会认为 channel 已关闭,直到这些 permit 被释放。

§恐慌

如果在异步执行上下文中调用,此函数会发生 panic。

§示例
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

fn main() {
    let (tx, mut rx) = mpsc::channel::<u8>(10);

    let sync_code = thread::spawn(move || {
        assert_eq!(Some(10), rx.blocking_recv());
    });

    Runtime::new()
        .unwrap()
        .block_on(async move {
            let _ = tx.send(10).await;
        });
    sync_code.join().unwrap()
}
源代码

pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize

用于阻塞上下文的 Self::recv_many 变体。

Self::blocking_recv 中的相同条件适用。

源代码

pub fn close(&mut self)

关闭 channel 的接收端,但不 drop 它。

这可以防止任何进一步的消息在 channel 上发送,同时仍然允许接收器排出已缓冲的消息。任何未完成的 Permit 值仍然能够发送消息。

为了保证不会丢失消息,在调用 close() 之后,必须调用 recv() 直到返回 None。如果有未完成的 PermitOwnedPermit 值,则 recv 方法在这些值被释放之前不会返回 None

§示例
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(20);

tokio::spawn(async move {
    let mut i = 0;
    while let Ok(permit) = tx.reserve().await {
        permit.send(i);
        i += 1;
    }
});

rx.close();

while let Some(msg) = rx.recv().await {
    println!("got {}", msg);
}

// Channel closed and no messages are lost.
源代码

pub fn is_closed(&self) -> bool

检查通道是否已关闭。

如果 channel 已关闭,则此方法返回 true。当所有 Sender 都已被 drop 或调用 Receiver::close 时,channel 关闭。

§示例
use tokio::sync::mpsc;

let (_tx, mut rx) = mpsc::channel::<()>(10);
assert!(!rx.is_closed());

rx.close();

assert!(rx.is_closed());
源代码

pub fn is_empty(&self) -> bool

检查通道是否为空。

如果通道中没有消息,此方法返回 true

§示例
use tokio::sync::mpsc;

let (tx, rx) = mpsc::channel(10);
assert!(rx.is_empty());

tx.send(0).await.unwrap();
assert!(!rx.is_empty());
源代码

pub fn len(&self) -> usize

返回通道中的消息数。

§示例
use tokio::sync::mpsc;

let (tx, rx) = mpsc::channel(10);
assert_eq!(0, rx.len());

tx.send(0).await.unwrap();
assert_eq!(1, rx.len());
源代码

pub fn capacity(&self) -> usize

返回通道的当前容量。

当 sender 通过调用 Sender::send 或通过 Sender::reserve 预留容量发送值时,容量会减少。当接收值时,容量会增加。这与 max_capacity 不同,后者始终返回调用 channel 时最初指定的缓冲区容量。

§示例
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel::<()>(5);

assert_eq!(rx.capacity(), 5);

// Making a reservation drops the capacity by one.
let permit = tx.reserve().await.unwrap();
assert_eq!(rx.capacity(), 4);
assert_eq!(rx.len(), 0);

// Sending and receiving a value increases the capacity by one.
permit.send(());
assert_eq!(rx.len(), 1);
rx.recv().await.unwrap();
assert_eq!(rx.capacity(), 5);

// Directly sending a message drops the capacity by one.
tx.send(()).await.unwrap();
assert_eq!(rx.capacity(), 4);
assert_eq!(rx.len(), 1);

// Receiving the message increases the capacity by one.
rx.recv().await.unwrap();
assert_eq!(rx.capacity(), 5);
assert_eq!(rx.len(), 0);
源代码

pub fn max_capacity(&self) -> usize

返回通道的最大缓冲区容量。

最大容量是调用 channel 时最初指定的缓冲区容量。这与 capacity 不同,后者返回当前可用的缓冲区容量:随着消息的发送和接收,capacity 返回的值会上下波动,而 max_capacity 返回的值保持不变。

§示例
use tokio::sync::mpsc;

let (tx, rx) = mpsc::channel::<()>(5);

// both max capacity and capacity are the same at first
assert_eq!(rx.max_capacity(), 5);
assert_eq!(rx.capacity(), 5);

// Making a reservation doesn't change the max capacity.
let permit = tx.reserve().await.unwrap();
assert_eq!(rx.max_capacity(), 5);
// but drops the capacity by one
assert_eq!(rx.capacity(), 4);
源代码

pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>

轮询以接收此通道上的下一条消息。

此方法返回:

  • Poll::Pending if no messages are available but the channel is not closed, or if a spurious failure happens.
  • Poll::Ready(Some(message)) if a message is available.
  • Poll::Ready(None) if the channel has been closed and all messages sent before it was closed have been received.

当该方法返回 Poll::Pending 时,所提供 Context 中的 Waker 会在任意接收器上发送消息或通道被关闭时被调度以接收唤醒。请注意,在多次调用 poll_recvpoll_recv_many 时,只有传递给最近一次调用的 Context 中的 Waker 会被调度以接收唤醒。

如果此方法由于虚假失败而返回 Poll::Pending,那么在导致虚假失败的情况得到解决后,会收到 Waker 通知。请注意,收到这样的唤醒并不能保证下一次调用一定会成功——它可能会因另一个虚假失败而失败。

源代码

pub fn poll_recv_many( &mut self, cx: &mut Context<'_>, buffer: &mut Vec<T>, limit: usize, ) -> Poll<usize>

轮询以接收此通道上的多条消息,并扩展所提供的缓冲区。

此方法返回:

  • Poll::Pending if no messages are available but the channel is not closed, or if a spurious failure happens.
  • Poll::Ready(count) where count is the number of messages successfully received and stored in buffer. This can be less than, or equal to, limit.
  • Poll::Ready(0) if limit is set to zero or when the channel is closed.

当该方法返回 Poll::Pending 时,所提供 Context 中的 Waker 会在任意接收器上发送消息或通道被关闭时被调度以接收唤醒。请注意,在多次调用 poll_recvpoll_recv_many 时,只有传递给最近一次调用的 Context 中的 Waker 会被调度以接收唤醒。

请注意,此方法不保证一定接收到恰好 limit 条消息。而是如果至少有一条消息可用,它会尽可能多地返回消息,直到达到给定的上限。仅当通道已关闭(或 limit 为 0)时,此方法才返回零。

§示例
use std::task::{Context, Poll};
use std::pin::Pin;
use tokio::sync::mpsc;
use futures::Future;

struct MyReceiverFuture<'a> {
    receiver: mpsc::Receiver<i32>,
    buffer: &'a mut Vec<i32>,
    limit: usize,
}

impl<'a> Future for MyReceiverFuture<'a> {
    type Output = usize; // Number of messages received

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let MyReceiverFuture { receiver, buffer, limit } = &mut *self;

        // Now `receiver` and `buffer` are mutable references, and `limit` is copied
        match receiver.poll_recv_many(cx, *buffer, *limit) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(count) => Poll::Ready(count),
        }
    }
}

let (tx, rx) = mpsc::channel(32);
let mut buffer = Vec::new();

let my_receiver_future = MyReceiverFuture {
    receiver: rx,
    buffer: &mut buffer,
    limit: 3,
};

for i in 0..10 {
    tx.send(i).await.unwrap();
}

let count = my_receiver_future.await;
assert_eq!(count, 3);
assert_eq!(buffer, vec![0,1,2])
源代码

pub fn sender_strong_count(&self) -> usize

返回 Sender 句柄的数量。

源代码

pub fn sender_weak_count(&self) -> usize

返回 WeakSender 句柄的数量。

trait 实现§

源代码§

impl<T> Debug for Receiver<T>

源代码§

fn fmt(&self, fmt: &mut Formatter<'_>) -> Result

使用给定的格式化器格式化此值。 更多信息
源代码§

impl<T> Unpin for Receiver<T>

自动 trait 实现§

§

impl<T> Freeze for Receiver<T>

§

impl<T> RefUnwindSafe for Receiver<T>

§

impl<T> Send for Receiver<T>
where T: Send,

§

impl<T> Sync for Receiver<T>
where T: Send,

§

impl<T> UnsafeUnpin for Receiver<T>

§

impl<T> UnwindSafe for Receiver<T>

blanket 实现§

源代码§

impl<T> Any for T
where T: 'static + ?Sized,

源代码§

fn type_id(&self) -> TypeId

Gets the TypeId of self. 更多信息
源代码§

impl<T> Borrow<T> for T
where T: ?Sized,

源代码§

fn borrow(&self) -> &T

Immutably borrows from an owned value. 更多信息
源代码§

impl<T> BorrowMut<T> for T
where T: ?Sized,

源代码§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. 更多信息
源代码§

impl<T> From<T> for T

源代码§

fn from(t: T) -> T

原样返回参数。

源代码§

impl<T, U> Into<U> for T
where U: From<T>,

源代码§

fn into(self) -> U

调用 U::from(self)

也就是说,此转换是 From<T> for U 实现选择执行的操作。

源代码§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

源代码§

type Error = Infallible

转换出错时返回的类型。
源代码§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

执行转换。
源代码§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

源代码§

type Error = <U as TryFrom<T>>::Error

转换出错时返回的类型。
源代码§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

执行转换。