跳到主要内容

Receiver

搜索

结构体 Receiver 

源代码
pub struct Receiver<T> { /* 私有字段 */ }
展开描述

从关联的 Sender 接收一个值。

一对 SenderReceiverchannel 函数创建。

此通道没有 recv 方法,因为接收者本身实现了 Future trait。要接收 Result<T, error::RecvError>,请直接 .await Receiver 对象。

即使消息已发送,Future trait 上的 poll 方法也可能伪返回 Poll::Pending。如果发生此类伪失败,则当伪失败已解决时,调用者将被唤醒,以便调用者可以再次尝试接收消息。请注意,收到此类唤醒并不能保证下一次调用会成功——它可能以另一个伪失败而失败。(伪失败并不意味着消息已丢失。它只是被延迟了。)

§Cancellation safety

Receiver 可安全取消。如果在 tokio::select! 语句中将其用作事件,并且其他某个分支先完成,则可以保证此通道上未接收到任何消息。

§示例

use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel();

tokio::spawn(async move {
    if let Err(_) = tx.send(3) {
        println!("the receiver dropped");
    }
});

match rx.await {
    Ok(v) => println!("got = {:?}", v),
    Err(_) => println!("the sender dropped"),
}

如果 sender 在未发送的情况下被 drop,则 receiver 将失败并返回 error::RecvError

use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel::<u32>();

tokio::spawn(async move {
    drop(tx);
});

match rx.await {
    Ok(_) => panic!("This doesn't happen"),
    Err(_) => println!("the sender dropped"),
}

要在 tokio::select! 循环中使用 Receiver,请在通道前面添加 &mut

use tokio::sync::oneshot;
use tokio::time::{interval, sleep, Duration};

let (send, mut recv) = oneshot::channel();
let mut interval = interval(Duration::from_millis(100));

tokio::spawn(async move {
    sleep(Duration::from_secs(1)).await;
    send.send("shut down").unwrap();
});

loop {
    tokio::select! {
        _ = interval.tick() => println!("Another 100ms"),
        msg = &mut recv => {
            println!("Got message: {}", msg.unwrap());
            break;
        }
    }
}

实现§

源代码§

impl<T> Receiver<T>

源代码

pub fn close(&mut self)

阻止关联的 Sender 句柄发送值。

调用 close 之后发生的任何 send 操作都保证会失败。调用 close 后,如果有一个值在调用 close 之前已发送,则应调用 try_recv 来接收该值。

此函数可用于执行优雅关闭,并确保不会将值发送到通道中而永远不会被接收。

如果已接收到消息或通道已关闭,则 close 是空操作。

§示例

防止值被发送

use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;

let (tx, mut rx) = oneshot::channel();

assert!(!tx.is_closed());

rx.close();

assert!(tx.is_closed());
assert!(tx.send("never received").is_err());

match rx.try_recv() {
    Err(TryRecvError::Closed) => {}
    _ => unreachable!(),
}

接收在调用 close 之前发送的值

use tokio::sync::oneshot;

let (tx, mut rx) = oneshot::channel();

assert!(tx.send("will receive").is_ok());

rx.close();

let msg = rx.try_recv().unwrap();
assert_eq!(msg, "will receive");
源代码

pub fn is_terminated(&self) -> bool

检查此接收者是否已终止。

如果此接收者已产生一个 Poll::Ready 结果,则此函数返回 true。如果是这样,则不应再对此接收者进行 poll。

§示例

发送一个值并对其轮询。

use tokio::sync::oneshot;

use std::task::Poll;

let (tx, mut rx) = oneshot::channel();

// A receiver is not terminated when it is initialized.
assert!(!rx.is_terminated());

// A receiver is not terminated it is polled and is still pending.
let poll = futures::poll!(&mut rx);
assert_eq!(poll, Poll::Pending);
assert!(!rx.is_terminated());

// A receiver is not terminated if a value has been sent, but not yet read.
tx.send(0).unwrap();
assert!(!rx.is_terminated());

// A receiver *is* terminated after it has been polled and yielded a value.
assert_eq!((&mut rx).await, Ok(0));
assert!(rx.is_terminated());

关闭 sender。

use tokio::sync::oneshot;

let (tx, mut rx) = oneshot::channel::<()>();

// A receiver is not immediately terminated when the sender is dropped.
drop(tx);
assert!(!rx.is_terminated());

// A receiver *is* terminated after it has been polled and yielded an error.
let _ = (&mut rx).await.unwrap_err();
assert!(rx.is_terminated());
源代码

pub fn is_empty(&self) -> bool

检查通道是否为空。

如果通道中没有消息,此方法返回 true

对一个空的接收者(可能已产生一个值)进行 poll 不一定是安全的。请改用 is_terminated() 来检查接收者是否可以安全地 poll。

§示例

发送一个值。

use tokio::sync::oneshot;

let (tx, mut rx) = oneshot::channel();
assert!(rx.is_empty());

tx.send(0).unwrap();
assert!(!rx.is_empty());

let _ = (&mut rx).await;
assert!(rx.is_empty());

关闭 sender。

use tokio::sync::oneshot;

let (tx, mut rx) = oneshot::channel::<()>();

// A channel is empty if the sender is dropped.
drop(tx);
assert!(rx.is_empty());

// A closed channel still yields an error, however.
(&mut rx).await.expect_err("should yield an error");
assert!(rx.is_empty());

已终止的通道为空。

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = oneshot::channel();
    tx.send(0).unwrap();
    let _ = (&mut rx).await;

    // NB: an empty channel is not necessarily safe to poll!
    assert!(rx.is_empty());
    let _ = (&mut rx).await;
}
源代码

pub fn try_recv(&mut self) -> Result<T, TryRecvError>

尝试接收一个值。

如果通道中存在挂起的值,则返回该值。如果尚未发送任何值,则当前任务不会被注册以接收未来的通知。

此函数在异步任务上下文之外调用时非常有用。

请注意,与 poll 方法不同,try_recv 方法不会伪失败。在此次调用 try_recv 之前发生的任何发送或关闭事件都将正确返回给调用者。

§返回值
  • Ok(T) if a value is pending in the channel.
  • Err(TryRecvError::Empty) if no value has been sent yet.
  • Err(TryRecvError::Closed) if the sender has dropped without sending a value, or if the message has already been received.
§示例

在值发送之前调用 try_recv,然后在值发送之后调用。

use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;

let (tx, mut rx) = oneshot::channel();

match rx.try_recv() {
    // The channel is currently empty
    Err(TryRecvError::Empty) => {}
    _ => unreachable!(),
}

// Send a value
tx.send("hello").unwrap();

match rx.try_recv() {
     Ok(value) => assert_eq!(value, "hello"),
     _ => unreachable!(),
}

当 sender 在发送值之前被 drop 时调用 try_recv

use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;

let (tx, mut rx) = oneshot::channel::<()>();

drop(tx);

match rx.try_recv() {
    // The channel will never receive a value.
    Err(TryRecvError::Closed) => {}
    _ => unreachable!(),
}
源代码

pub fn blocking_recv(self) -> Result<T, RecvError>

阻塞接收,可在异步上下文之外调用。

§恐慌

如果在异步执行上下文中调用,此函数会发生 panic。

§示例
use std::thread;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<u8>();

    let sync_code = thread::spawn(move || {
        assert_eq!(Ok(10), rx.blocking_recv());
    });

    let _ = tx.send(10);
    sync_code.join().unwrap();
}

trait 实现§

源代码§

impl<T: Debug> Debug for Receiver<T>

源代码§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

使用给定的格式化器格式化此值。 更多信息
源代码§

impl<T> Drop for Receiver<T>

源代码§

fn drop(&mut self)

执行此类型的析构函数。 更多信息
源代码§

impl<T> Future for Receiver<T>

源代码§

type Output = Result<T, RecvError>

Future 完成时产生的值的类型。
源代码§

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>

Attempts to resolve the future to a final value, registering the current task for wakeup if the value is not yet available. 更多信息

自动 trait 实现§

§

impl<T> Freeze for Receiver<T>

§

impl<T> !RefUnwindSafe for Receiver<T>

§

impl<T> Send for Receiver<T>
where T: Send,

§

impl<T> Sync for Receiver<T>
where T: Send,

§

impl<T> Unpin for Receiver<T>

§

impl<T> UnsafeUnpin for Receiver<T>

§

impl<T> !UnwindSafe for Receiver<T>

blanket 实现§

源代码§

impl<T> Any for T
where T: 'static + ?Sized,

源代码§

fn type_id(&self) -> TypeId

Gets the TypeId of self. 更多信息
源代码§

impl<T> Borrow<T> for T
where T: ?Sized,

源代码§

fn borrow(&self) -> &T

Immutably borrows from an owned value. 更多信息
源代码§

impl<T> BorrowMut<T> for T
where T: ?Sized,

源代码§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. 更多信息
源代码§

impl<T> From<T> for T

源代码§

fn from(t: T) -> T

原样返回参数。

源代码§

impl<T, U> Into<U> for T
where U: From<T>,

源代码§

fn into(self) -> U

调用 U::from(self)

也就是说,此转换是 From<T> for U 实现选择执行的操作。

源代码§

impl<F> IntoFuture for F
where F: Future,

源代码§

type Output = <F as Future>::Output

Future 完成时产生的输出。
源代码§

type IntoFuture = F

我们将要把此值转变成哪种 future?
源代码§

fn into_future(self) -> <F as IntoFuture>::IntoFuture

Creates a future from a value. 更多信息
源代码§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

源代码§

type Error = Infallible

转换出错时返回的类型。
源代码§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

执行转换。
源代码§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

源代码§

type Error = <U as TryFrom<T>>::Error

转换出错时返回的类型。
源代码§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

执行转换。