展开描述
多生产者、多消费者 channel,仅保留最后发送的值。
此 channel 对于从代码库中的多个位置监视值的更改非常有用,例如配置值的更改。
§Usage
channel 返回一个 Sender / Receiver 对。它们是 channel 的生产者和消费者两半。该 channel 以初始值创建。
每个 Receiver 独立跟踪其调用者看到的最后一个值。
要访问 channel 中存储的当前值并将其标记为由给定 Receiver 看到的,请使用 Receiver::borrow_and_update()。
要访问当前值而不将其标记为已查看,请使用 Receiver::borrow()。(如果该值已被标记为已查看,则 Receiver::borrow() 等效于 Receiver::borrow_and_update()。)
有关何时使用这些方法的更多信息,请参阅此处。
§Change notifications
Receiver 一半提供了一个异步 changed 方法。当通过 Sender 一半发送新的未见过的值时,此方法就绪。
Receiver::changed()returns:Ok(())on receiving a new value.Err(RecvError)if the channel has been closed AND the current value is seen.
- If the current value is unseen when calling
changed, thenchangedwill return immediately. If the current value is seen, then it will sleep until either a new message is sent via theSenderhalf, or theSenderis dropped. - On completion, the
changedmethod marks the new value as seen. - At creation, the initial value is considered seen. In other words,
Receiver::changed()will not return until a subsequent value is sent. - New
Receiverinstances can be created withSender::subscribe(). The current value at the time theReceiveris created is considered seen.
§changed versus has_changed
Receiver 一半提供了两种方法来检查 channel 中的更改:has_changed 和 changed。
-
has_changed是一个同步方法,用于检查当前值是否已被查看,并返回一个布尔值。此方法不会将该值标记为已查看。 -
changed是一个异步方法,一旦 channel 中出现未见过的值,它就会返回。此方法确实会将该值标记为已查看。
请注意,这两种方法在何时返回错误方面存在两个行为差异。
has_changederrors if and only if the channel is closed.changederrors if the channel has been closed AND the current value is seen.
请参阅下面的示例,该示例展示了这些方法具有不同的可失败性。
§borrow_and_update versus borrow
如果接收者打算在循环中等待来自 changed 的通知,则应优先使用 Receiver::borrow_and_update() 而非 Receiver::borrow()。这避免了 changed 就绪和读取该值之间可能发送新值的竞争条件。(如果使用 Receiver::borrow(),则循环可能会使用相同的值运行两次。)
如果接收者只对当前值感兴趣,并且不打算等待更改,那么可以使用 Receiver::borrow()。使用 borrow 可能更方便,因为它是 &self 方法——borrow_and_update 需要 &mut self。
§示例
以下示例会打印 hello! world! 。
use tokio::sync::watch;
use tokio::time::{Duration, sleep};
let (tx, mut rx) = watch::channel("hello");
tokio::spawn(async move {
// Use the equivalent of a "do-while" loop so the initial value is
// processed before awaiting the `changed()` future.
loop {
println!("{}! ", *rx.borrow_and_update());
if rx.changed().await.is_err() {
break;
}
}
});
sleep(Duration::from_millis(100)).await;
tx.send("world")?;changed 与 has_changed 在可失败性方面的差异。
use tokio::sync::watch;
let (tx, mut rx) = watch::channel("hello");
tx.send("goodbye").unwrap();
drop(tx);
// `has_changed` does not mark the value as seen and errors
// since the channel is closed.
assert!(rx.has_changed().is_err());
// `changed` returns Ok since the value is not already marked as seen
// even if the channel is closed.
assert!(rx.changed().await.is_ok());
// The `changed` call above marks the value as seen.
// The next `changed` call now returns an error as the channel is closed
// AND the current value is seen.
assert!(rx.changed().await.is_err());§Closing
Sender::is_closed 和 Sender::closed 允许生产者检测何时所有 Receiver 句柄都已被 drop。这表明对正在生成的值不再有兴趣,可以停止工作。
在所有 sender 和 receiver 都 drop 之前,channel 中的值不会被 drop。
§Thread safety
Sender 和 Receiver 都是线程安全的。它们可以移动到其他线程,也可以在并发环境中使用。Receiver 句柄的克隆可以移动到单独的线程,也可以并发使用。
模块§
- error
- Watch error types.
结构体§
- Receiver
- Receives values from the associated
Sender. - Ref
- Returns a reference to the inner value.
- Sender
- Sends values to the associated
Receiver.
函数§
- channel
- Creates a new watch channel, returning the “send” and “receive” handles.