mas_tasks/lib.rs

// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
// Copyright 2021-2024 The Matrix.org Foundation C.I.C.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.

use std::sync::{Arc, LazyLock};

use mas_data_model::{Clock, SiteConfig};
use mas_email::Mailer;
use mas_matrix::HomeserverConnection;
use mas_router::UrlBuilder;
use mas_storage::{BoxRepository, RepositoryError, RepositoryFactory};
use mas_storage_pg::PgRepositoryFactory;
use new_queue::QueueRunnerError;
use opentelemetry::metrics::Meter;
use rand::SeedableRng;
use sqlx::{Pool, Postgres};
use tokio_util::{sync::CancellationToken, task::TaskTracker};

pub use crate::new_queue::QueueWorker;

mod database;
mod email;
mod matrix;
mod new_queue;
mod recovery;
mod sessions;
mod user;

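/// OpenTelemetry meter for this crate, scoped to the package name and version.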
static METER: LazyLock<Meter> = LazyLock::new(|| {
    let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
        .with_version(env!("CARGO_PKG_VERSION"))
        .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
        .build();

    opentelemetry::global::meter_with_scope(scope)
});

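/// Dependencies shared by the queue worker and its job handlers.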
#[derive(Clone)]
struct State {
    repository_factory: PgRepositoryFactory,
    mailer: Mailer,
    clock: Arc<dyn Clock>,
    homeserver: Arc<dyn HomeserverConnection>,
    url_builder: UrlBuilder,
    site_config: SiteConfig,
}

impl State {
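    /// Construct a new [`State`], wrapping the clock and homeserver
    /// connection in [`Arc`]s.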
    pub fn new(
        repository_factory: PgRepositoryFactory,
        clock: impl Clock + 'static,
        mailer: Mailer,
        homeserver: impl HomeserverConnection + 'static,
        url_builder: UrlBuilder,
        site_config: SiteConfig,
    ) -> Self {
        Self {
            repository_factory,
            mailer,
            clock: Arc::new(clock),
            homeserver: Arc::new(homeserver),
            url_builder,
            site_config,
        }
    }

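    /// Get the underlying PostgreSQL connection pool.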
    pub fn pool(&self) -> Pool<Postgres> {
        self.repository_factory.pool()
    }

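    /// Get the clock used to tell the time.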
    pub fn clock(&self) -> &dyn Clock {
        &self.clock
    }

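    /// Get the mailer used to send emails.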
    pub fn mailer(&self) -> &Mailer {
        &self.mailer
    }

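    /// Get a fresh random number generator, seeded from the thread-local RNG.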
    // This is fine for now, we may move that to a trait at some point.
    #[allow(clippy::unused_self, clippy::disallowed_methods)]
    pub fn rng(&self) -> rand_chacha::ChaChaRng {
        rand_chacha::ChaChaRng::from_rng(rand::thread_rng()).expect("failed to seed rng")
    }

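    /// Acquire a new [`BoxRepository`] from the repository factory.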
    pub async fn repository(&self) -> Result<BoxRepository, RepositoryError> {
        self.repository_factory.create().await
    }

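    /// Get the connection to the Matrix homeserver.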
    pub fn matrix_connection(&self) -> &dyn HomeserverConnection {
        self.homeserver.as_ref()
    }

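    /// Get the URL builder.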
    pub fn url_builder(&self) -> &UrlBuilder {
        &self.url_builder
    }

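    /// Get the site configuration.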
    pub fn site_config(&self) -> &SiteConfig {
        &self.site_config
    }
}

/// Initialise the worker, without running it.
///
/// This is mostly useful for tests.
///
/// # Errors
///
/// This function can fail if the database connection fails.
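///
/// # Example
///
/// A rough sketch of how a caller might use this, assuming the dependencies
/// (repository factory, clock, mailer, homeserver connection, URL builder
/// and site config) have already been constructed:
///
/// ```ignore
/// let worker = init(
///     repository_factory,
///     clock,
///     &mailer,
///     homeserver,
///     url_builder,
///     &site_config,
///     CancellationToken::new(),
/// )
/// .await?;
///
/// // Drive the worker; in production this is spawned on a task tracker.
/// worker.run().await;
/// ```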
pub async fn init(
    repository_factory: PgRepositoryFactory,
    clock: impl Clock + 'static,
    mailer: &Mailer,
    homeserver: impl HomeserverConnection + 'static,
    url_builder: UrlBuilder,
    site_config: &SiteConfig,
    cancellation_token: CancellationToken,
) -> Result<QueueWorker, QueueRunnerError> {
    let state = State::new(
        repository_factory,
        clock,
        mailer.clone(),
        homeserver,
        url_builder,
        site_config.clone(),
    );
    let mut worker = QueueWorker::new(state, cancellation_token).await?;

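    // Register a handler for every job this worker knows how to process, and
    // schedule the recurring maintenance jobs.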
    worker
        .register_handler::<mas_storage::queue::CleanupRevokedOAuthAccessTokensJob>()
        .register_handler::<mas_storage::queue::CleanupExpiredOAuthAccessTokensJob>()
        .register_handler::<mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob>()
        .register_handler::<mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob>()
        .register_handler::<mas_storage::queue::DeactivateUserJob>()
        .register_handler::<mas_storage::queue::DeleteDeviceJob>()
        .register_handler::<mas_storage::queue::ProvisionDeviceJob>()
        .register_handler::<mas_storage::queue::ProvisionUserJob>()
        .register_handler::<mas_storage::queue::ReactivateUserJob>()
        .register_handler::<mas_storage::queue::SendAccountRecoveryEmailsJob>()
        .register_handler::<mas_storage::queue::SendEmailAuthenticationCodeJob>()
        .register_handler::<mas_storage::queue::SyncDevicesJob>()
        .register_handler::<mas_storage::queue::VerifyEmailJob>()
        .register_handler::<mas_storage::queue::ExpireInactiveSessionsJob>()
        .register_handler::<mas_storage::queue::ExpireInactiveCompatSessionsJob>()
        .register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
        .register_handler::<mas_storage::queue::ExpireInactiveUserSessionsJob>()
        .register_handler::<mas_storage::queue::PruneStalePolicyDataJob>()
        .register_deprecated_queue("cleanup-expired-tokens")
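        // Schedules are cron-style expressions (here with a leading seconds
        // field), offset from each other so the cleanup jobs don't all fire
        // at once.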
        .add_schedule(
            "cleanup-revoked-oauth-access-tokens",
            // Run this job every hour
            "0 0 * * * *".parse()?,
            mas_storage::queue::CleanupRevokedOAuthAccessTokensJob,
        )
        .add_schedule(
            "cleanup-revoked-oauth-refresh-tokens",
            // Run this job every hour
            "0 10 * * * *".parse()?,
            mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob,
        )
        .add_schedule(
            "cleanup-consumed-oauth-refresh-tokens",
            // Run this job every hour
            "0 20 * * * *".parse()?,
            mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob,
        )
        .add_schedule(
            "cleanup-expired-oauth-access-tokens",
            // Run this job every 4 hours
            "0 5 */4 * * *".parse()?,
            mas_storage::queue::CleanupExpiredOAuthAccessTokensJob,
        )
        .add_schedule(
            "expire-inactive-sessions",
            // Run this job every 15 minutes
            "30 */15 * * * *".parse()?,
            mas_storage::queue::ExpireInactiveSessionsJob,
        )
        .add_schedule(
            "prune-stale-policy-data",
            // Run once a day
            "0 0 2 * * *".parse()?,
            mas_storage::queue::PruneStalePolicyDataJob,
        );

    Ok(worker)
}

/// Initialise the worker and run it.
///
/// # Errors
///
/// This function can fail if the database connection fails.
#[expect(clippy::too_many_arguments, reason = "this is fine")]
pub async fn init_and_run(
    repository_factory: PgRepositoryFactory,
    clock: impl Clock + 'static,
    mailer: &Mailer,
    homeserver: impl HomeserverConnection + 'static,
    url_builder: UrlBuilder,
    site_config: &SiteConfig,
    cancellation_token: CancellationToken,
    task_tracker: &TaskTracker,
) -> Result<(), QueueRunnerError> {
    let worker = init(
        repository_factory,
        clock,
        mailer,
        homeserver,
        url_builder,
        site_config,
        cancellation_token,
    )
    .await?;

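    // Spawn the worker on the task tracker so it can be awaited on shutdown.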
    task_tracker.spawn(worker.run());

    Ok(())
}