pub struct Semaphore { /* 私有字段 */ }展开描述
执行异步许可获取的计数信号量。
信号量维护一组许可。许可用于同步对共享资源的访问。信号量与互斥锁的区别在于,它允许一个以上的并发调用者同时访问共享资源。
当调用 acquire 并且信号量有剩余许可时,函数会立即返回一个许可。但是,如果没有剩余许可可用,acquire 会(异步地)等待,直到某个未完成的许可被 drop。此时,释放的许可将分配给调用者。
此 Semaphore 是公平的,这意味着许可按请求顺序发放。这种公平性也适用于涉及 acquire_many 的情况,因此如果队列前端的 acquire_many 调用请求的许可数超过当前可用的数量,即使信号量有足够的许可完成该 acquire 调用,也会阻止其完成。
要在 poll 函数中使用 Semaphore,可以使用 PollSemaphore 工具。
§示例
基本用法:
use tokio::sync::{Semaphore, TryAcquireError};
let semaphore = Semaphore::new(3);
let a_permit = semaphore.acquire().await.unwrap();
let two_permits = semaphore.acquire_many(2).await.unwrap();
assert_eq!(semaphore.available_permits(), 0);
let permit_attempt = semaphore.try_acquire();
assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));§Limit the number of simultaneously opened files in your program
大多数操作系统对打开的文件句柄数量有限制。即使在没有显式限制的系统中,资源约束也会隐式设置打开文件数量的上限。如果你的程序尝试打开大量文件并超出此限制,则会导致错误。
此示例使用具有 100 个许可的 Semaphore。通过在访问文件之前从 Semaphore 获取许可,你可以确保程序一次最多打开 100 个文件。当尝试打开第 101 个文件时,程序将等待直到有许可可用,然后再继续打开另一个文件。
use std::io::Result;
use tokio::fs::File;
use tokio::sync::Semaphore;
use tokio::io::AsyncWriteExt;
static PERMITS: Semaphore = Semaphore::const_new(100);
async fn write_to_file(message: &[u8]) -> Result<()> {
let _permit = PERMITS.acquire().await.unwrap();
let mut buffer = File::create("example.txt").await?;
buffer.write_all(message).await?;
Ok(()) // Permit goes out of scope here, and is available again for acquisition
}§Limit the number of outgoing requests being sent at the same time
在某些情况下,可能需要限制并行发送的传出请求的数量。这可能是由于所使用 API 的限制或应用程序所运行系统的网络资源限制。
此示例使用具有 10 个许可的 Arc<Semaphore>。通过克隆 Arc<Semaphore>,每个生成的任务都会获得对信号量的引用。在任务发送请求之前,它必须通过调用 Semaphore::acquire 从信号量获取许可。这确保了在任何给定时间最多并行发送 10 个请求。任务发送请求后,它会 drop 该许可以允许其他任务发送请求。
use std::sync::Arc;
use tokio::sync::Semaphore;
// Define maximum number of parallel requests.
let semaphore = Arc::new(Semaphore::new(5));
// Spawn many tasks that will send requests.
let mut jhs = Vec::new();
for task_id in 0..50 {
let semaphore = semaphore.clone();
let jh = tokio::spawn(async move {
// Acquire permit before sending request.
let _permit = semaphore.acquire().await.unwrap();
// Send the request.
let response = send_request(task_id).await;
// Drop the permit after the request has been sent.
drop(_permit);
// Handle response.
// ...
response
});
jhs.push(jh);
}
// Collect responses from tasks.
let mut responses = Vec::new();
for jh in jhs {
let response = jh.await.unwrap();
responses.push(response);
}
// Process responses.
// ...§Limit the number of incoming requests being handled at the same time
与限制同时打开的文件数量类似,网络句柄也是一种有限的资源。允许无限制地处理请求可能会导致拒绝服务,以及许多其他问题。
此示例使用 Arc<Semaphore> 而不是全局变量。为了限制可以同时处理的请求数,我们在生成每个任务之前为其获取一个许可。一旦获取,就会生成一个新任务;任务完成后,许可会在任务内部被 drop,以允许其他任务生成。许可必须通过 Semaphore::acquire_owned 获取,才能跨任务边界移动。(因为我们的信号量不是全局变量——如果是的话,那么 acquire 就足够了。)
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> std::io::Result<()> {
let semaphore = Arc::new(Semaphore::new(3));
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
// Acquire permit before accepting the next socket.
//
// We use `acquire_owned` so that we can move `permit` into
// other tasks.
let permit = semaphore.clone().acquire_owned().await.unwrap();
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// Do work using the socket.
handle_connection(&mut socket).await;
// Drop socket while the permit is still live.
drop(socket);
// Drop the permit, so more tasks can be created.
drop(permit);
});
}
}§Prevent tests from running in parallel
默认情况下,Rust 在同一文件中并行运行测试。但是,在某些情况下,并行运行两个测试可能会导致问题。例如,当测试使用同一数据库时,可能会发生这种情况。
考虑以下场景:
test_insert: Inserts a key-value pair into the database, then retrieves the value using the same key to verify the insertion.test_update: Inserts a key, then updates the key to a new value and verifies that the value has been accurately updated.test_others: A third test that doesn’t modify the database state. It can run in parallel with the other tests.
在此示例中,test_insert 和 test_update 需要按顺序运行才能工作,但哪个测试先运行并不重要。我们可以利用具有单个许可的信号量来解决此挑战。
use tokio::sync::Semaphore;
// Initialize a static semaphore with only one permit, which is used to
// prevent test_insert and test_update from running in parallel.
static PERMIT: Semaphore = Semaphore::const_new(1);
// Initialize the database that will be used by the subsequent tests.
static DB: Database = Database::setup();
#[tokio::test]
async fn test_insert() {
// Acquire permit before proceeding. Since the semaphore has only one permit,
// the test will wait if the permit is already acquired by other tests.
let permit = PERMIT.acquire().await.unwrap();
// Do the actual test stuff with database
// Insert a key-value pair to database
let (key, value) = ("name", 0);
DB.insert(key, value).await;
// Verify that the value has been inserted correctly.
assert_eq!(DB.get(key).await, value);
// Undo the insertion, so the database is empty at the end of the test.
DB.delete(key).await;
// Drop permit. This allows the other test to start running.
drop(permit);
}
#[tokio::test]
async fn test_update() {
// Acquire permit before proceeding. Since the semaphore has only one permit,
// the test will wait if the permit is already acquired by other tests.
let permit = PERMIT.acquire().await.unwrap();
// Do the same insert.
let (key, value) = ("name", 0);
DB.insert(key, value).await;
// Update the existing value with a new one.
let new_value = 1;
DB.update(key, new_value).await;
// Verify that the value has been updated correctly.
assert_eq!(DB.get(key).await, new_value);
// Undo any modificattion.
DB.delete(key).await;
// Drop permit. This allows the other test to start running.
drop(permit);
}
#[tokio::test]
async fn test_others() {
// This test can run in parallel with test_insert and test_update,
// so it does not use PERMIT.
}§Rate limiting using a token bucket
此示例展示了 add_permits 和 SemaphorePermit::forget 方法。
许多应用程序和系统对某些操作的执行速率有限制。超过此速率可能导致性能不佳甚至出错。
此示例使用令牌桶实现速率限制。令牌桶是一种速率限制形式,它不会立即生效,以允许同时到达的请求出现短时突发。
使用令牌桶时,每个传入的请求都会消耗一个令牌,并且令牌以定义速率限制的特定速率进行补充。当请求突发到达时,令牌会立即发放,直到桶空为止。一旦桶为空,请求将不得不等待新令牌的添加。
与限制可以同时处理的请求数量的示例不同,我们在处理完一个请求后不会将令牌添加回去。相反,令牌仅由计时器任务添加。
请注意,当持续时间很短时,此实现并非最优,因为它会不断循环和休眠,从而消耗大量 CPU。
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{interval, Duration};
struct TokenBucket {
sem: Arc<Semaphore>,
jh: tokio::task::JoinHandle<()>,
}
impl TokenBucket {
fn new(duration: Duration, capacity: usize) -> Self {
let sem = Arc::new(Semaphore::new(capacity));
// refills the tokens at the end of each interval
let jh = tokio::spawn({
let sem = sem.clone();
let mut interval = interval(duration);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
async move {
loop {
interval.tick().await;
if sem.available_permits() < capacity {
sem.add_permits(1);
}
}
}
});
Self { jh, sem }
}
async fn acquire(&self) {
// This can return an error if the semaphore is closed, but we
// never close it, so this error can never happen.
let permit = self.sem.acquire().await.unwrap();
// To avoid releasing the permit back to the semaphore, we use
// the `SemaphorePermit::forget` method.
permit.forget();
}
}
impl Drop for TokenBucket {
fn drop(&mut self) {
// Kill the background task so it stops taking up resources when we
// don't need it anymore.
self.jh.abort();
}
}
let capacity = 5;
let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
let bucket = TokenBucket::new(update_interval, capacity);
for _ in 0..5 {
bucket.acquire().await;
// do the operation
}实现§
源代码§impl Semaphore
impl Semaphore
源代码pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS
pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS
信号量可以持有的最大许可数。它是 usize::MAX >> 3。
超出此限制通常会导致 panic。
源代码pub fn new(permits: usize) -> Self
pub fn new(permits: usize) -> Self
使用初始许可数创建一个新的信号量。
如果 permits 超过 Semaphore::MAX_PERMITS,则会发生 panic。
源代码pub const fn const_new(permits: usize) -> Self
pub const fn const_new(permits: usize) -> Self
使用初始许可数创建一个新的信号量。
当使用 tracing 不稳定特性 时,使用 const_new 创建的 Semaphore 将不会被检测。因此,它将不会出现在 tokio-console 中。如果需要,应改用 Semaphore::new 创建一个已检测的对象。
§示例
use tokio::sync::Semaphore;
static SEM: Semaphore = Semaphore::const_new(10);源代码pub fn available_permits(&self) -> usize
pub fn available_permits(&self) -> usize
返回当前可用的许可数。
源代码pub fn add_permits(&self, n: usize)
pub fn add_permits(&self, n: usize)
向信号量添加 n 个新许可。
最大许可数为 Semaphore::MAX_PERMITS,如果超出此限制,此函数将 panic。
源代码pub fn forget_permits(&self, n: usize) -> usize
pub fn forget_permits(&self, n: usize) -> usize
将信号量的许可最多减少 n 个。
如果许可不足且无法减少 n 个,则返回实际减少的许可数。
源代码pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError>
pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError>
从信号量获取一个许可。
如果信号量已关闭,则返回 AcquireError。否则,返回一个 SemaphorePermit,表示已获取的许可。
§取消安全性
此方法使用一个队列来按请求顺序公平地分发许可。取消对 acquire 的调用会使你失去在队列中的位置。
§示例
use tokio::sync::Semaphore;
let semaphore = Semaphore::new(2);
let permit_1 = semaphore.acquire().await.unwrap();
assert_eq!(semaphore.available_permits(), 1);
let permit_2 = semaphore.acquire().await.unwrap();
assert_eq!(semaphore.available_permits(), 0);
drop(permit_1);
assert_eq!(semaphore.available_permits(), 1);源代码pub async fn acquire_many(
&self,
n: u32,
) -> Result<SemaphorePermit<'_>, AcquireError>
pub async fn acquire_many( &self, n: u32, ) -> Result<SemaphorePermit<'_>, AcquireError>
从信号量获取 n 个许可。
如果信号量已关闭,则返回 AcquireError。否则,返回一个 SemaphorePermit,表示已获取的许可。
§取消安全性
此方法使用一个队列来按请求顺序公平地分发许可。取消对 acquire_many 的调用会使你失去在队列中的位置。
§示例
use tokio::sync::Semaphore;
let semaphore = Semaphore::new(5);
let permit = semaphore.acquire_many(3).await.unwrap();
assert_eq!(semaphore.available_permits(), 2);源代码pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError>
pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError>
尝试从信号量获取一个许可。
如果信号量已关闭,则返回 TryAcquireError::Closed;如果没有剩余许可,则返回 TryAcquireError::NoPermits。否则,返回一个 SemaphorePermit,表示已获取的许可。
§示例
use tokio::sync::{Semaphore, TryAcquireError};
let semaphore = Semaphore::new(2);
let permit_1 = semaphore.try_acquire().unwrap();
assert_eq!(semaphore.available_permits(), 1);
let permit_2 = semaphore.try_acquire().unwrap();
assert_eq!(semaphore.available_permits(), 0);
let permit_3 = semaphore.try_acquire();
assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));源代码pub fn try_acquire_many(
&self,
n: u32,
) -> Result<SemaphorePermit<'_>, TryAcquireError>
pub fn try_acquire_many( &self, n: u32, ) -> Result<SemaphorePermit<'_>, TryAcquireError>
尝试从信号量获取 n 个许可。
如果信号量已关闭,则返回 TryAcquireError::Closed;如果没有足够的剩余许可,则返回 TryAcquireError::NoPermits。否则,返回一个 SemaphorePermit,表示已获取的许可。
§示例
use tokio::sync::{Semaphore, TryAcquireError};
let semaphore = Semaphore::new(4);
let permit_1 = semaphore.try_acquire_many(3).unwrap();
assert_eq!(semaphore.available_permits(), 1);
let permit_2 = semaphore.try_acquire_many(2);
assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));源代码pub async fn acquire_owned(
self: Arc<Self>,
) -> Result<OwnedSemaphorePermit, AcquireError>
pub async fn acquire_owned( self: Arc<Self>, ) -> Result<OwnedSemaphorePermit, AcquireError>
从信号量获取一个许可。
调用此方法时,信号量必须包装在 Arc 中。如果信号量已关闭,则返回 AcquireError。否则返回表示已获取许可的 OwnedSemaphorePermit。
§取消安全性
此方法使用一个队列来按请求顺序公平地分发许可。取消对 acquire_owned 的调用会使你失去在队列中的位置。
§示例
use std::sync::Arc;
use tokio::sync::Semaphore;
let semaphore = Arc::new(Semaphore::new(3));
let mut join_handles = Vec::new();
for _ in 0..5 {
let permit = semaphore.clone().acquire_owned().await.unwrap();
join_handles.push(tokio::spawn(async move {
// perform task...
// explicitly own `permit` in the task
drop(permit);
}));
}
for handle in join_handles {
handle.await.unwrap();
}源代码pub async fn acquire_many_owned(
self: Arc<Self>,
n: u32,
) -> Result<OwnedSemaphorePermit, AcquireError>
pub async fn acquire_many_owned( self: Arc<Self>, n: u32, ) -> Result<OwnedSemaphorePermit, AcquireError>
从信号量获取 n 个许可。
调用此方法时,信号量必须包装在 Arc 中。如果信号量已关闭,则返回 AcquireError。否则返回表示已获取许可的 OwnedSemaphorePermit。
§取消安全性
此方法使用一个队列来按请求顺序公平地分发许可。取消对 acquire_many_owned 的调用会使你失去在队列中的位置。
§示例
use std::sync::Arc;
use tokio::sync::Semaphore;
let semaphore = Arc::new(Semaphore::new(10));
let mut join_handles = Vec::new();
for _ in 0..5 {
let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
join_handles.push(tokio::spawn(async move {
// perform task...
// explicitly own `permit` in the task
drop(permit);
}));
}
for handle in join_handles {
handle.await.unwrap();
}源代码pub fn try_acquire_owned(
self: Arc<Self>,
) -> Result<OwnedSemaphorePermit, TryAcquireError>
pub fn try_acquire_owned( self: Arc<Self>, ) -> Result<OwnedSemaphorePermit, TryAcquireError>
尝试从信号量获取一个许可。
调用此方法时,信号量必须包装在 Arc 中。如果信号量已关闭,则返回 TryAcquireError::Closed;如果没有任何许可,则返回 TryAcquireError::NoPermits。否则返回表示已获取许可的 OwnedSemaphorePermit。
§示例
use std::sync::Arc;
use tokio::sync::{Semaphore, TryAcquireError};
let semaphore = Arc::new(Semaphore::new(2));
let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
assert_eq!(semaphore.available_permits(), 1);
let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
assert_eq!(semaphore.available_permits(), 0);
let permit_3 = semaphore.try_acquire_owned();
assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));源代码pub fn try_acquire_many_owned(
self: Arc<Self>,
n: u32,
) -> Result<OwnedSemaphorePermit, TryAcquireError>
pub fn try_acquire_many_owned( self: Arc<Self>, n: u32, ) -> Result<OwnedSemaphorePermit, TryAcquireError>
尝试从信号量获取 n 个许可。
调用此方法时,信号量必须包装在 Arc 中。如果信号量已关闭,则返回 TryAcquireError::Closed;如果没有任何许可,则返回 TryAcquireError::NoPermits。否则返回表示已获取许可的 OwnedSemaphorePermit。
§示例
use std::sync::Arc;
use tokio::sync::{Semaphore, TryAcquireError};
let semaphore = Arc::new(Semaphore::new(4));
let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
assert_eq!(semaphore.available_permits(), 1);
let permit_2 = semaphore.try_acquire_many_owned(2);
assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));源代码pub fn close(&self)
pub fn close(&self)
关闭信号量。
这会阻止信号量发放新的许可,并通知所有挂起的等待者。
§示例
use tokio::sync::Semaphore;
use std::sync::Arc;
use tokio::sync::TryAcquireError;
let semaphore = Arc::new(Semaphore::new(1));
let semaphore2 = semaphore.clone();
tokio::spawn(async move {
let permit = semaphore.acquire_many(2).await;
assert!(permit.is_err());
println!("waiter received error");
});
println!("closing semaphore");
semaphore2.close();
// Cannot obtain more permits
assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))