pub struct Sender<T> { /* 私有字段 */ }实现§
源代码§impl<T> Sender<T>
impl<T> Sender<T>
源代码pub fn new(init: T) -> Self
pub fn new(init: T) -> Self
创建 watch 通道的发送端。
有关调用此函数时的错误,请参阅 watch::channel 的文档。请注意,当没有接收者时尝试发送值将返回错误。
§示例
let sender = tokio::sync::watch::Sender::new(0u8);
assert!(sender.send(3).is_err());
let _rec = sender.subscribe();
assert!(sender.send(4).is_ok());源代码pub fn send(&self, value: T) -> Result<(), SendError<T>>
pub fn send(&self, value: T) -> Result<(), SendError<T>>
通过通道发送一个新值,通知所有接收者。
如果通道已关闭(即所有接收者都已 drop),则此方法将失败。可以使用 subscribe 方法重新打开通道。但是,当 send 失败时,该值不会提供给未来的接收者(但会随 SendError 返回)。
为了始终为未来的接收者提供一个新值,即使当前没有接收者,也可以使用其他发送方法之一(send_if_modified、send_modify 或 send_replace)。
源代码pub fn send_modify<F>(&self, modify: F)
pub fn send_modify<F>(&self, modify: F)
在原地无条件修改被监视的值,通知所有接收者。
这对于修改被监视的值很有用,而无需分配新实例。此外,即使没有 receiver,此方法也允许发送值。
如果该值仅在可变借用期间被有条件地修改,则首选使用更通用的函数 Self::send_if_modified(),以防止对未修改的值发出不必要的更改通知。
§恐慌
当 modify 闭包被调用时发生 panic,此函数也会 panic。panic 时不会通知任何 receiver。在 panic 之前闭包对被监视的值所做的所有更改,都将在后续的 borrow 调用中可见。
§示例
use tokio::sync::watch;
struct State {
counter: usize,
}
let (state_tx, state_rx) = watch::channel(State { counter: 0 });
state_tx.send_modify(|state| state.counter += 1);
assert_eq!(state_rx.borrow().counter, 1);源代码pub fn send_if_modified<F>(&self, modify: F) -> bool
pub fn send_if_modified<F>(&self, modify: F) -> bool
在原地有条件修改被监视的值,仅在修改后通知所有接收者。
这对于修改被监视的值很有用,而无需分配新实例。此外,即使没有 receiver,此方法也允许发送值。
如果该值在可变借用期间确实已被修改,则 modify 闭包必须返回 true。仅当尽管进行了可变借用但保证该值未被修改时,它才应返回 false。
仅当闭包返回 true 时,接收者才会收到通知。如果闭包已修改该值但返回了 false,则会导致静默修改,即修改后的值在后续对 borrow 的调用中可见,但接收者不会收到更改通知。
返回闭包的结果,即如果值已被修改则为 true,否则为 false。
§恐慌
当 modify 闭包被调用时发生 panic,此函数也会 panic。panic 时不会通知任何 receiver。在 panic 之前闭包对被监视的值所做的所有更改,都将在后续的 borrow 调用中可见。
§示例
use tokio::sync::watch;
struct State {
counter: usize,
}
let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
let inc_counter_if_odd = |state: &mut State| {
if state.counter % 2 == 1 {
state.counter += 1;
return true;
}
false
};
assert_eq!(state_rx.borrow().counter, 1);
assert!(!state_rx.has_changed().unwrap());
assert!(state_tx.send_if_modified(inc_counter_if_odd));
assert!(state_rx.has_changed().unwrap());
assert_eq!(state_rx.borrow_and_update().counter, 2);
assert!(!state_rx.has_changed().unwrap());
assert!(!state_tx.send_if_modified(inc_counter_if_odd));
assert!(!state_rx.has_changed().unwrap());
assert_eq!(state_rx.borrow_and_update().counter, 2);源代码pub fn send_replace(&self, value: T) -> T
pub fn send_replace(&self, value: T) -> T
通过通道发送一个新值,通知所有接收者,并返回通道中的上一个值。
这对于重用被监视值内部的缓冲区非常有用。此外,即使没有接收者,此方法也允许发送值。
§示例
use tokio::sync::watch;
let (tx, _rx) = watch::channel(1);
assert_eq!(tx.send_replace(2), 1);
assert_eq!(tx.send_replace(3), 2);源代码pub fn borrow(&self) -> Ref<'_, T>
pub fn borrow(&self) -> Ref<'_, T>
返回对最近发送值的引用
未释放的借用会对内部值保持一个读锁。这意味着长时间存活的借用可能会导致生产者一半被阻塞。建议尽量缩短借用的生命周期。此外,如果运行在允许 !Send future 的环境中,必须确保返回的 Ref 类型不会跨越 .await 点存活,否则可能会导致死锁。
§示例
use tokio::sync::watch;
let (tx, _) = watch::channel("hello");
assert_eq!(*tx.borrow(), "hello");源代码pub fn is_closed(&self) -> bool
pub fn is_closed(&self) -> bool
检查通道是否已关闭。当所有接收者都已 drop 时会发生这种情况。
§示例
let (tx, rx) = tokio::sync::watch::channel(());
assert!(!tx.is_closed());
drop(rx);
assert!(tx.is_closed());源代码pub async fn closed(&self)
pub async fn closed(&self)
当所有接收者都已 drop 时完成。
这允许生产者在对所产生值的兴趣被取消时收到通知,并立即停止工作。通道关闭后,重新打开它的唯一方法是调用 Sender::subscribe 以获取新的接收者。
如果通道在短时间内关闭(例如,最后一个接收者被 drop,然后调用 subscribe),那么此次对 closed 的调用可能会返回,但也可能不会“注意到”通道在短时间内已关闭。
§取消安全性
此方法可安全取消。
§示例
use tokio::sync::watch;
let (tx, rx) = watch::channel("hello");
tokio::spawn(async move {
// use `rx`
drop(rx);
});
// Waits for `rx` to drop
tx.closed().await;
println!("the `rx` handles dropped")源代码pub fn subscribe(&self) -> Receiver<T>
pub fn subscribe(&self) -> Receiver<T>
创建一个连接到此 Sender 的新 Receiver。
在此次调用 subscribe 之前发送的所有消息最初都被新的 Receiver 标记为已读。
即使没有其他接收者,也可以调用此方法。在这种情况下,通道将被重新打开。
§示例
新通道将接收通过此 Sender 发送的消息。
use tokio::sync::watch;
let (tx, _rx) = watch::channel(0u64);
tx.send(5).unwrap();
let rx = tx.subscribe();
assert_eq!(5, *rx.borrow());
tx.send(10).unwrap();
assert_eq!(10, *rx.borrow());最近的消息被视为已被通道读取,因此保证此测试通过。
use tokio::sync::watch;
use tokio::time::Duration;
let (tx, _rx) = watch::channel(0u64);
tx.send(5).unwrap();
let mut rx = tx.subscribe();
tokio::spawn(async move {
// by spawning and sleeping, the message is sent after `main`
// hits the call to `changed`.
tokio::time::sleep(Duration::from_millis(10)).await;
tx.send(100).unwrap();
});
rx.changed().await.unwrap();
assert_eq!(100, *rx.borrow());源代码pub fn receiver_count(&self) -> usize
pub fn receiver_count(&self) -> usize
返回当前存在的接收者数量。
§示例
use tokio::sync::watch;
let (tx, rx1) = watch::channel("hello");
assert_eq!(1, tx.receiver_count());
let mut _rx2 = rx1.clone();
assert_eq!(2, tx.receiver_count());源代码pub fn sender_count(&self) -> usize
pub fn sender_count(&self) -> usize
返回当前存在的发送者数量。
§示例
use tokio::sync::watch;
let (tx1, rx) = watch::channel("hello");
assert_eq!(1, tx1.sender_count());
let tx2 = tx1.clone();
assert_eq!(2, tx1.sender_count());
assert_eq!(2, tx2.sender_count());源代码pub fn same_channel(&self, other: &Self) -> bool
pub fn same_channel(&self, other: &Self) -> bool
如果这些 sender 属于同一个通道,则返回 true。
§示例
let (tx, rx) = tokio::sync::watch::channel(true);
let tx2 = tx.clone();
assert!(tx.same_channel(&tx2));
let (tx3, rx3) = tokio::sync::watch::channel(true);
assert!(!tx3.same_channel(&tx2));