跳到主要内容

UnboundedReceiver

搜索

结构体 UnboundedReceiver 

源代码
pub struct UnboundedReceiver<T> { /* 私有字段 */ }
展开描述

从关联的 UnboundedSender 接收值。

实例由 unbounded_channel 函数创建。

此接收器可以使用 UnboundedReceiverStream 转换为 Stream

实现§

源代码§

impl<T> UnboundedReceiver<T>

源代码

pub async fn recv(&mut self) -> Option<T>

接收此接收器的下一个值。

如果 channel 已关闭并且 channel 缓冲区中没有剩余消息,则此方法返回 None。这表明永远无法从此 Receiver 接收更多值。当所有 sender 都已被 drop 或调用 close 时,channel 关闭。

如果 channel 的缓冲区中没有消息,但 channel 尚未关闭,则此方法将休眠,直到有消息被发送或 channel 被关闭。

§取消安全性

此方法可安全取消。如果 recvtokio::select! 语句中作为事件使用,并且其他分支先完成,则可以保证此通道没有接收到任何消息。

§示例
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::unbounded_channel();

tokio::spawn(async move {
    tx.send("hello").unwrap();
});

assert_eq!(Some("hello"), rx.recv().await);
assert_eq!(None, rx.recv().await);

值被缓冲:

use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::unbounded_channel();

tx.send("hello").unwrap();
tx.send("world").unwrap();

assert_eq!(Some("hello"), rx.recv().await);
assert_eq!(Some("world"), rx.recv().await);
源代码

pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize

为此 receiver 接收下一个值并扩展 buffer

此方法以不超过 limit 指定的固定值数量扩展 buffer。如果 limit 为零,则函数立即返回 0。返回值是添加到 buffer 的值的数量。

对于 limit > 0,如果 channel 队列中没有消息,但 channel 尚未关闭,则此方法将休眠,直到有消息被发送或 channel 被关闭。

对于 limit 的非零值,此方法永远不会返回 0,除非 channel 已关闭并且 channel 队列中没有剩余消息。这表明永远无法从此 Receiver 接收更多值。当所有 sender 都已被 drop 或调用 close 时,channel 关闭。

buffer 的容量会按需增加。

§取消安全性

此方法可安全取消。如果 recv_manytokio::select! 语句中作为事件使用,并且其他分支先完成,则可以保证此通道没有接收到任何消息。

§示例
use tokio::sync::mpsc;

let mut buffer: Vec<&str> = Vec::with_capacity(2);
let limit = 2;
let (tx, mut rx) = mpsc::unbounded_channel();
let tx2 = tx.clone();
tx2.send("first").unwrap();
tx2.send("second").unwrap();
tx2.send("third").unwrap();

// Call `recv_many` to receive up to `limit` (2) values.
assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
assert_eq!(vec!["first", "second"], buffer);

// If the buffer is full, the next call to `recv_many`
// reserves additional capacity.
assert_eq!(1, rx.recv_many(&mut buffer, limit).await);

tokio::spawn(async move {
    tx.send("fourth").unwrap();
});

// 'tx' is dropped, but `recv_many`
// is guaranteed not to return 0 as the channel
// is not yet closed.
assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
assert_eq!(vec!["first", "second", "third", "fourth"], buffer);

// Once the last sender is dropped, the channel is
// closed and `recv_many` returns 0, capacity unchanged.
drop(tx2);
assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
源代码

pub fn try_recv(&mut self) -> Result<T, TryRecvError>

尝试为该 receiver 接收下一个值。

如果通道当前为空,但仍存在未释放的 senderpermit,则此方法返回 Empty 错误。

如果通道当前为空,且没有未释放的 senderpermit,则此方法返回 Disconnected 错误。

poll_recv 方法不同,此方法绝不会虚假地返回 Empty 错误。

§示例
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;

let (tx, mut rx) = mpsc::unbounded_channel();

tx.send("hello").unwrap();

assert_eq!(Ok("hello"), rx.try_recv());
assert_eq!(Err(TryRecvError::Empty), rx.try_recv());

tx.send("hello").unwrap();
// Drop the last sender, closing the channel.
drop(tx);

assert_eq!(Ok("hello"), rx.try_recv());
assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
源代码

pub fn blocking_recv(&mut self) -> Option<T>

阻塞接收,可在异步上下文之外调用。

§恐慌

如果在异步执行上下文中调用,此函数会发生 panic。

§示例
use std::thread;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<u8>();

    let sync_code = thread::spawn(move || {
        assert_eq!(Some(10), rx.blocking_recv());
    });

    let _ = tx.send(10);
    sync_code.join().unwrap();
}
源代码

pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize

用于阻塞上下文的 Self::recv_many 变体。

Self::blocking_recv 中的相同条件适用。

源代码

pub fn close(&mut self)

关闭 channel 的接收端,但不 drop 它。

这可以防止任何进一步的消息在 channel 上发送,同时仍然允许接收器排出已缓冲的消息。

为了保证不会丢失消息,在调用 close() 之后,必须调用 recv() 直到返回 None

源代码

pub fn is_closed(&self) -> bool

检查通道是否已关闭。

如果 channel 已关闭,则此方法返回 true。当所有 UnboundedSender 都已被 drop 或调用 UnboundedReceiver::close 时,channel 关闭。

§示例
use tokio::sync::mpsc;

let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
assert!(!rx.is_closed());

rx.close();

assert!(rx.is_closed());
源代码

pub fn is_empty(&self) -> bool

检查通道是否为空。

如果通道中没有消息,此方法返回 true

§示例
use tokio::sync::mpsc;

let (tx, rx) = mpsc::unbounded_channel();
assert!(rx.is_empty());

tx.send(0).unwrap();
assert!(!rx.is_empty());
源代码

pub fn len(&self) -> usize

返回通道中的消息数。

§示例
use tokio::sync::mpsc;

let (tx, rx) = mpsc::unbounded_channel();
assert_eq!(0, rx.len());

tx.send(0).unwrap();
assert_eq!(1, rx.len());
源代码

pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>

轮询以接收此通道上的下一条消息。

此方法返回:

  • Poll::Pending if no messages are available but the channel is not closed, or if a spurious failure happens.
  • Poll::Ready(Some(message)) if a message is available.
  • Poll::Ready(None) if the channel has been closed and all messages sent before it was closed have been received.

当该方法返回 Poll::Pending 时,所提供 Context 中的 Waker 会在任意接收器上发送消息或通道被关闭时被调度以接收唤醒。请注意,在多次调用 poll_recvpoll_recv_many 时,只有传递给最近一次调用的 Context 中的 Waker 会被调度以接收唤醒。

如果此方法由于虚假失败而返回 Poll::Pending,那么在导致虚假失败的情况得到解决后,会收到 Waker 通知。请注意,收到这样的唤醒并不能保证下一次调用一定会成功——它可能会因另一个虚假失败而失败。

源代码

pub fn poll_recv_many( &mut self, cx: &mut Context<'_>, buffer: &mut Vec<T>, limit: usize, ) -> Poll<usize>

轮询以接收此通道上的多条消息,并扩展所提供的缓冲区。

此方法返回:

  • Poll::Pending if no messages are available but the channel is not closed, or if a spurious failure happens.
  • Poll::Ready(count) where count is the number of messages successfully received and stored in buffer. This can be less than, or equal to, limit.
  • Poll::Ready(0) if limit is set to zero or when the channel is closed.

当该方法返回 Poll::Pending 时,所提供 Context 中的 Waker 会在任意接收器上发送消息或通道被关闭时被调度以接收唤醒。请注意,在多次调用 poll_recvpoll_recv_many 时,只有传递给最近一次调用的 Context 中的 Waker 会被调度以接收唤醒。

请注意,此方法不保证一定接收到恰好 limit 条消息。而是如果至少有一条消息可用,它会尽可能多地返回消息,直到达到给定的上限。仅当通道已关闭(或 limit 为 0)时,此方法才返回零。

§示例
use std::task::{Context, Poll};
use std::pin::Pin;
use tokio::sync::mpsc;
use futures::Future;

struct MyReceiverFuture<'a> {
    receiver: mpsc::UnboundedReceiver<i32>,
    buffer: &'a mut Vec<i32>,
    limit: usize,
}

impl<'a> Future for MyReceiverFuture<'a> {
    type Output = usize; // Number of messages received

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let MyReceiverFuture { receiver, buffer, limit } = &mut *self;

        // Now `receiver` and `buffer` are mutable references, and `limit` is copied
        match receiver.poll_recv_many(cx, *buffer, *limit) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(count) => Poll::Ready(count),
        }
    }
}

let (tx, rx) = mpsc::unbounded_channel::<i32>();
let mut buffer = Vec::new();

let my_receiver_future = MyReceiverFuture {
    receiver: rx,
    buffer: &mut buffer,
    limit: 3,
};

for i in 0..10 {
    tx.send(i).expect("Unable to send integer");
}

let count = my_receiver_future.await;
assert_eq!(count, 3);
assert_eq!(buffer, vec![0,1,2])
源代码

pub fn sender_strong_count(&self) -> usize

返回 UnboundedSender 句柄的数量。

源代码

pub fn sender_weak_count(&self) -> usize

返回 WeakUnboundedSender 句柄的数量。

trait 实现§

源代码§

impl<T> Debug for UnboundedReceiver<T>

源代码§

fn fmt(&self, fmt: &mut Formatter<'_>) -> Result

使用给定的格式化器格式化此值。 更多信息

自动 trait 实现§

§

impl<T> Freeze for UnboundedReceiver<T>

§

impl<T> RefUnwindSafe for UnboundedReceiver<T>

§

impl<T> Send for UnboundedReceiver<T>
where T: Send,

§

impl<T> Sync for UnboundedReceiver<T>
where T: Send,

§

impl<T> Unpin for UnboundedReceiver<T>

§

impl<T> UnsafeUnpin for UnboundedReceiver<T>

§

impl<T> UnwindSafe for UnboundedReceiver<T>

blanket 实现§

源代码§

impl<T> Any for T
where T: 'static + ?Sized,

源代码§

fn type_id(&self) -> TypeId

Gets the TypeId of self. 更多信息
源代码§

impl<T> Borrow<T> for T
where T: ?Sized,

源代码§

fn borrow(&self) -> &T

Immutably borrows from an owned value. 更多信息
源代码§

impl<T> BorrowMut<T> for T
where T: ?Sized,

源代码§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. 更多信息
源代码§

impl<T> From<T> for T

源代码§

fn from(t: T) -> T

原样返回参数。

源代码§

impl<T, U> Into<U> for T
where U: From<T>,

源代码§

fn into(self) -> U

调用 U::from(self)

也就是说,此转换是 From<T> for U 实现选择执行的操作。

源代码§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

源代码§

type Error = Infallible

转换出错时返回的类型。
源代码§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

执行转换。
源代码§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

源代码§

type Error = <U as TryFrom<T>>::Error

转换出错时返回的类型。
源代码§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

执行转换。