跳到主要内容

Semaphore

搜索

结构体 Semaphore 

源代码
pub struct Semaphore { /* 私有字段 */ }
展开描述

执行异步许可获取的计数信号量。

信号量维护一组许可。许可用于同步对共享资源的访问。信号量与互斥锁的区别在于,它允许一个以上的并发调用者同时访问共享资源。

当调用 acquire 并且信号量有剩余许可时,函数会立即返回一个许可。但是,如果没有剩余许可可用,acquire 会(异步地)等待,直到某个未完成的许可被 drop。此时,释放的许可将分配给调用者。

Semaphore 是公平的,这意味着许可按请求顺序发放。这种公平性也适用于涉及 acquire_many 的情况,因此如果队列前端的 acquire_many 调用请求的许可数超过当前可用的数量,即使信号量有足够的许可完成该 acquire 调用,也会阻止其完成。

要在 poll 函数中使用 Semaphore,可以使用 PollSemaphore 工具。

§示例

基本用法:

use tokio::sync::{Semaphore, TryAcquireError};

let semaphore = Semaphore::new(3);

let a_permit = semaphore.acquire().await.unwrap();
let two_permits = semaphore.acquire_many(2).await.unwrap();

assert_eq!(semaphore.available_permits(), 0);

let permit_attempt = semaphore.try_acquire();
assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));

§Limit the number of simultaneously opened files in your program

大多数操作系统对打开的文件句柄数量有限制。即使在没有显式限制的系统中,资源约束也会隐式设置打开文件数量的上限。如果你的程序尝试打开大量文件并超出此限制,则会导致错误。

此示例使用具有 100 个许可的 Semaphore。通过在访问文件之前从 Semaphore 获取许可,你可以确保程序一次最多打开 100 个文件。当尝试打开第 101 个文件时,程序将等待直到有许可可用,然后再继续打开另一个文件。

use std::io::Result;
use tokio::fs::File;
use tokio::sync::Semaphore;
use tokio::io::AsyncWriteExt;

static PERMITS: Semaphore = Semaphore::const_new(100);

async fn write_to_file(message: &[u8]) -> Result<()> {
    let _permit = PERMITS.acquire().await.unwrap();
    let mut buffer = File::create("example.txt").await?;
    buffer.write_all(message).await?;
    Ok(()) // Permit goes out of scope here, and is available again for acquisition
}

§Limit the number of outgoing requests being sent at the same time

在某些情况下,可能需要限制并行发送的传出请求的数量。这可能是由于所使用 API 的限制或应用程序所运行系统的网络资源限制。

此示例使用具有 10 个许可的 Arc<Semaphore>。通过克隆 Arc<Semaphore>,每个生成的任务都会获得对信号量的引用。在任务发送请求之前,它必须通过调用 Semaphore::acquire 从信号量获取许可。这确保了在任何给定时间最多并行发送 10 个请求。任务发送请求后,它会 drop 该许可以允许其他任务发送请求。

use std::sync::Arc;
use tokio::sync::Semaphore;

// Define maximum number of parallel requests.
let semaphore = Arc::new(Semaphore::new(5));
// Spawn many tasks that will send requests.
let mut jhs = Vec::new();
for task_id in 0..50 {
    let semaphore = semaphore.clone();
    let jh = tokio::spawn(async move {
        // Acquire permit before sending request.
        let _permit = semaphore.acquire().await.unwrap();
        // Send the request.
        let response = send_request(task_id).await;
        // Drop the permit after the request has been sent.
        drop(_permit);
        // Handle response.
        // ...

        response
    });
    jhs.push(jh);
}
// Collect responses from tasks.
let mut responses = Vec::new();
for jh in jhs {
    let response = jh.await.unwrap();
    responses.push(response);
}
// Process responses.
// ...

§Limit the number of incoming requests being handled at the same time

与限制同时打开的文件数量类似,网络句柄也是一种有限的资源。允许无限制地处理请求可能会导致拒绝服务,以及许多其他问题。

此示例使用 Arc<Semaphore> 而不是全局变量。为了限制可以同时处理的请求数,我们在生成每个任务之前为其获取一个许可。一旦获取,就会生成一个新任务;任务完成后,许可会在任务内部被 drop,以允许其他任务生成。许可必须通过 Semaphore::acquire_owned 获取,才能跨任务边界移动。(因为我们的信号量不是全局变量——如果是的话,那么 acquire 就足够了。)

use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let semaphore = Arc::new(Semaphore::new(3));
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        // Acquire permit before accepting the next socket.
        //
        // We use `acquire_owned` so that we can move `permit` into
        // other tasks.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            // Do work using the socket.
            handle_connection(&mut socket).await;
            // Drop socket while the permit is still live.
            drop(socket);
            // Drop the permit, so more tasks can be created.
            drop(permit);
        });
    }
}

§Prevent tests from running in parallel

默认情况下,Rust 在同一文件中并行运行测试。但是,在某些情况下,并行运行两个测试可能会导致问题。例如,当测试使用同一数据库时,可能会发生这种情况。

考虑以下场景:

  1. test_insert: Inserts a key-value pair into the database, then retrieves the value using the same key to verify the insertion.
  2. test_update: Inserts a key, then updates the key to a new value and verifies that the value has been accurately updated.
  3. test_others: A third test that doesn’t modify the database state. It can run in parallel with the other tests.

在此示例中,test_inserttest_update 需要按顺序运行才能工作,但哪个测试先运行并不重要。我们可以利用具有单个许可的信号量来解决此挑战。

use tokio::sync::Semaphore;

// Initialize a static semaphore with only one permit, which is used to
// prevent test_insert and test_update from running in parallel.
static PERMIT: Semaphore = Semaphore::const_new(1);

// Initialize the database that will be used by the subsequent tests.
static DB: Database = Database::setup();

#[tokio::test]
async fn test_insert() {
    // Acquire permit before proceeding. Since the semaphore has only one permit,
    // the test will wait if the permit is already acquired by other tests.
    let permit = PERMIT.acquire().await.unwrap();

    // Do the actual test stuff with database

    // Insert a key-value pair to database
    let (key, value) = ("name", 0);
    DB.insert(key, value).await;

    // Verify that the value has been inserted correctly.
    assert_eq!(DB.get(key).await, value);

    // Undo the insertion, so the database is empty at the end of the test.
    DB.delete(key).await;

    // Drop permit. This allows the other test to start running.
    drop(permit);
}

#[tokio::test]
async fn test_update() {
    // Acquire permit before proceeding. Since the semaphore has only one permit,
    // the test will wait if the permit is already acquired by other tests.
    let permit = PERMIT.acquire().await.unwrap();

    // Do the same insert.
    let (key, value) = ("name", 0);
    DB.insert(key, value).await;

    // Update the existing value with a new one.
    let new_value = 1;
    DB.update(key, new_value).await;

    // Verify that the value has been updated correctly.
    assert_eq!(DB.get(key).await, new_value);

    // Undo any modificattion.
    DB.delete(key).await;

    // Drop permit. This allows the other test to start running.
    drop(permit);
}

#[tokio::test]
async fn test_others() {
    // This test can run in parallel with test_insert and test_update,
    // so it does not use PERMIT.
}

§Rate limiting using a token bucket

此示例展示了 add_permitsSemaphorePermit::forget 方法。

许多应用程序和系统对某些操作的执行速率有限制。超过此速率可能导致性能不佳甚至出错。

此示例使用令牌桶实现速率限制。令牌桶是一种速率限制形式,它不会立即生效,以允许同时到达的请求出现短时突发。

使用令牌桶时,每个传入的请求都会消耗一个令牌,并且令牌以定义速率限制的特定速率进行补充。当请求突发到达时,令牌会立即发放,直到桶空为止。一旦桶为空,请求将不得不等待新令牌的添加。

与限制可以同时处理的请求数量的示例不同,我们在处理完一个请求后不会将令牌添加回去。相反,令牌仅由计时器任务添加。

请注意,当持续时间很短时,此实现并非最优,因为它会不断循环和休眠,从而消耗大量 CPU。

use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{interval, Duration};

struct TokenBucket {
    sem: Arc<Semaphore>,
    jh: tokio::task::JoinHandle<()>,
}

impl TokenBucket {
    fn new(duration: Duration, capacity: usize) -> Self {
        let sem = Arc::new(Semaphore::new(capacity));

        // refills the tokens at the end of each interval
        let jh = tokio::spawn({
            let sem = sem.clone();
            let mut interval = interval(duration);
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            async move {
                loop {
                    interval.tick().await;

                    if sem.available_permits() < capacity {
                        sem.add_permits(1);
                    }
                }
            }
        });

        Self { jh, sem }
    }

    async fn acquire(&self) {
        // This can return an error if the semaphore is closed, but we
        // never close it, so this error can never happen.
        let permit = self.sem.acquire().await.unwrap();
        // To avoid releasing the permit back to the semaphore, we use
        // the `SemaphorePermit::forget` method.
        permit.forget();
    }
}

impl Drop for TokenBucket {
    fn drop(&mut self) {
        // Kill the background task so it stops taking up resources when we
        // don't need it anymore.
        self.jh.abort();
    }
}

let capacity = 5;
let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
let bucket = TokenBucket::new(update_interval, capacity);

for _ in 0..5 {
    bucket.acquire().await;

    // do the operation
}

实现§

源代码§

impl Semaphore

源代码

pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS

信号量可以持有的最大许可数。它是 usize::MAX >> 3

超出此限制通常会导致 panic。

源代码

pub fn new(permits: usize) -> Self

使用初始许可数创建一个新的信号量。

如果 permits 超过 Semaphore::MAX_PERMITS,则会发生 panic。

源代码

pub const fn const_new(permits: usize) -> Self

使用初始许可数创建一个新的信号量。

当使用 tracing 不稳定特性 时,使用 const_new 创建的 Semaphore 将不会被检测。因此,它将不会出现在 tokio-console 中。如果需要,应改用 Semaphore::new 创建一个已检测的对象。

§示例
use tokio::sync::Semaphore;

static SEM: Semaphore = Semaphore::const_new(10);
源代码

pub fn available_permits(&self) -> usize

返回当前可用的许可数。

源代码

pub fn add_permits(&self, n: usize)

向信号量添加 n 个新许可。

最大许可数为 Semaphore::MAX_PERMITS,如果超出此限制,此函数将 panic。

源代码

pub fn forget_permits(&self, n: usize) -> usize

将信号量的许可最多减少 n 个。

如果许可不足且无法减少 n 个,则返回实际减少的许可数。

源代码

pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError>

从信号量获取一个许可。

如果信号量已关闭,则返回 AcquireError。否则,返回一个 SemaphorePermit,表示已获取的许可。

§取消安全性

此方法使用一个队列来按请求顺序公平地分发许可。取消对 acquire 的调用会使你失去在队列中的位置。

§示例
use tokio::sync::Semaphore;

let semaphore = Semaphore::new(2);

let permit_1 = semaphore.acquire().await.unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = semaphore.acquire().await.unwrap();
assert_eq!(semaphore.available_permits(), 0);

drop(permit_1);
assert_eq!(semaphore.available_permits(), 1);
源代码

pub async fn acquire_many( &self, n: u32, ) -> Result<SemaphorePermit<'_>, AcquireError>

从信号量获取 n 个许可。

如果信号量已关闭,则返回 AcquireError。否则,返回一个 SemaphorePermit,表示已获取的许可。

§取消安全性

此方法使用一个队列来按请求顺序公平地分发许可。取消对 acquire_many 的调用会使你失去在队列中的位置。

§示例
use tokio::sync::Semaphore;

let semaphore = Semaphore::new(5);

let permit = semaphore.acquire_many(3).await.unwrap();
assert_eq!(semaphore.available_permits(), 2);
源代码

pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError>

尝试从信号量获取一个许可。

如果信号量已关闭,则返回 TryAcquireError::Closed;如果没有剩余许可,则返回 TryAcquireError::NoPermits。否则,返回一个 SemaphorePermit,表示已获取的许可。

§示例
use tokio::sync::{Semaphore, TryAcquireError};

let semaphore = Semaphore::new(2);

let permit_1 = semaphore.try_acquire().unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = semaphore.try_acquire().unwrap();
assert_eq!(semaphore.available_permits(), 0);

let permit_3 = semaphore.try_acquire();
assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
源代码

pub fn try_acquire_many( &self, n: u32, ) -> Result<SemaphorePermit<'_>, TryAcquireError>

尝试从信号量获取 n 个许可。

如果信号量已关闭,则返回 TryAcquireError::Closed;如果没有足够的剩余许可,则返回 TryAcquireError::NoPermits。否则,返回一个 SemaphorePermit,表示已获取的许可。

§示例
use tokio::sync::{Semaphore, TryAcquireError};

let semaphore = Semaphore::new(4);

let permit_1 = semaphore.try_acquire_many(3).unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = semaphore.try_acquire_many(2);
assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
源代码

pub async fn acquire_owned( self: Arc<Self>, ) -> Result<OwnedSemaphorePermit, AcquireError>

从信号量获取一个许可。

调用此方法时,信号量必须包装在 Arc 中。如果信号量已关闭,则返回 AcquireError。否则返回表示已获取许可的 OwnedSemaphorePermit

§取消安全性

此方法使用一个队列来按请求顺序公平地分发许可。取消对 acquire_owned 的调用会使你失去在队列中的位置。

§示例
use std::sync::Arc;
use tokio::sync::Semaphore;

let semaphore = Arc::new(Semaphore::new(3));
let mut join_handles = Vec::new();

for _ in 0..5 {
    let permit = semaphore.clone().acquire_owned().await.unwrap();
    join_handles.push(tokio::spawn(async move {
        // perform task...
        // explicitly own `permit` in the task
        drop(permit);
    }));
}

for handle in join_handles {
    handle.await.unwrap();
}
源代码

pub async fn acquire_many_owned( self: Arc<Self>, n: u32, ) -> Result<OwnedSemaphorePermit, AcquireError>

从信号量获取 n 个许可。

调用此方法时,信号量必须包装在 Arc 中。如果信号量已关闭,则返回 AcquireError。否则返回表示已获取许可的 OwnedSemaphorePermit

§取消安全性

此方法使用一个队列来按请求顺序公平地分发许可。取消对 acquire_many_owned 的调用会使你失去在队列中的位置。

§示例
use std::sync::Arc;
use tokio::sync::Semaphore;

let semaphore = Arc::new(Semaphore::new(10));
let mut join_handles = Vec::new();

for _ in 0..5 {
    let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
    join_handles.push(tokio::spawn(async move {
        // perform task...
        // explicitly own `permit` in the task
        drop(permit);
    }));
}

for handle in join_handles {
    handle.await.unwrap();
}
源代码

pub fn try_acquire_owned( self: Arc<Self>, ) -> Result<OwnedSemaphorePermit, TryAcquireError>

尝试从信号量获取一个许可。

调用此方法时,信号量必须包装在 Arc 中。如果信号量已关闭,则返回 TryAcquireError::Closed;如果没有任何许可,则返回 TryAcquireError::NoPermits。否则返回表示已获取许可的 OwnedSemaphorePermit

§示例
use std::sync::Arc;
use tokio::sync::{Semaphore, TryAcquireError};

let semaphore = Arc::new(Semaphore::new(2));

let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
assert_eq!(semaphore.available_permits(), 0);

let permit_3 = semaphore.try_acquire_owned();
assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
源代码

pub fn try_acquire_many_owned( self: Arc<Self>, n: u32, ) -> Result<OwnedSemaphorePermit, TryAcquireError>

尝试从信号量获取 n 个许可。

调用此方法时,信号量必须包装在 Arc 中。如果信号量已关闭,则返回 TryAcquireError::Closed;如果没有任何许可,则返回 TryAcquireError::NoPermits。否则返回表示已获取许可的 OwnedSemaphorePermit

§示例
use std::sync::Arc;
use tokio::sync::{Semaphore, TryAcquireError};

let semaphore = Arc::new(Semaphore::new(4));

let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
assert_eq!(semaphore.available_permits(), 1);

let permit_2 = semaphore.try_acquire_many_owned(2);
assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
源代码

pub fn close(&self)

关闭信号量。

这会阻止信号量发放新的许可,并通知所有挂起的等待者。

§示例
use tokio::sync::Semaphore;
use std::sync::Arc;
use tokio::sync::TryAcquireError;

let semaphore = Arc::new(Semaphore::new(1));
let semaphore2 = semaphore.clone();

tokio::spawn(async move {
    let permit = semaphore.acquire_many(2).await;
    assert!(permit.is_err());
    println!("waiter received error");
});

println!("closing semaphore");
semaphore2.close();

// Cannot obtain more permits
assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
源代码

pub fn is_closed(&self) -> bool

如果信号量已关闭,则返回 true

trait 实现§

源代码§

impl Debug for Semaphore

源代码§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

使用给定的格式化器格式化此值。 更多信息

自动 trait 实现§

blanket 实现§

源代码§

impl<T> Any for T
where T: 'static + ?Sized,

源代码§

fn type_id(&self) -> TypeId

Gets the TypeId of self. 更多信息
源代码§

impl<T> Borrow<T> for T
where T: ?Sized,

源代码§

fn borrow(&self) -> &T

Immutably borrows from an owned value. 更多信息
源代码§

impl<T> BorrowMut<T> for T
where T: ?Sized,

源代码§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. 更多信息
源代码§

impl<T> From<T> for T

源代码§

fn from(t: T) -> T

原样返回参数。

源代码§

impl<T, U> Into<U> for T
where U: From<T>,

源代码§

fn into(self) -> U

调用 U::from(self)

也就是说,此转换是 From<T> for U 实现选择执行的操作。

源代码§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

源代码§

type Error = Infallible

转换出错时返回的类型。
源代码§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

执行转换。
源代码§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

源代码§

type Error = <U as TryFrom<T>>::Error

转换出错时返回的类型。
源代码§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

执行转换。