1use std::sync::{Arc, LazyLock};
9
10use mas_data_model::{Clock, SiteConfig};
11use mas_email::Mailer;
12use mas_matrix::HomeserverConnection;
13use mas_router::UrlBuilder;
14use mas_storage::{BoxRepository, RepositoryError, RepositoryFactory};
15use mas_storage_pg::PgRepositoryFactory;
16use new_queue::QueueRunnerError;
17use opentelemetry::metrics::Meter;
18use rand::SeedableRng;
19use sqlx::{Pool, Postgres};
20use tokio_util::{sync::CancellationToken, task::TaskTracker};
21
22pub use crate::new_queue::QueueWorker;
23
24mod database;
25mod email;
26mod matrix;
27mod new_queue;
28mod recovery;
29mod sessions;
30mod user;
31
32static METER: LazyLock<Meter> = LazyLock::new(|| {
33 let scope = opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
34 .with_version(env!("CARGO_PKG_VERSION"))
35 .with_schema_url(opentelemetry_semantic_conventions::SCHEMA_URL)
36 .build();
37
38 opentelemetry::global::meter_with_scope(scope)
39});
40
41#[derive(Clone)]
42struct State {
43 repository_factory: PgRepositoryFactory,
44 mailer: Mailer,
45 clock: Arc<dyn Clock>,
46 homeserver: Arc<dyn HomeserverConnection>,
47 url_builder: UrlBuilder,
48 site_config: SiteConfig,
49}
50
51impl State {
52 pub fn new(
53 repository_factory: PgRepositoryFactory,
54 clock: impl Clock + 'static,
55 mailer: Mailer,
56 homeserver: impl HomeserverConnection + 'static,
57 url_builder: UrlBuilder,
58 site_config: SiteConfig,
59 ) -> Self {
60 Self {
61 repository_factory,
62 mailer,
63 clock: Arc::new(clock),
64 homeserver: Arc::new(homeserver),
65 url_builder,
66 site_config,
67 }
68 }
69
70 pub fn pool(&self) -> Pool<Postgres> {
71 self.repository_factory.pool()
72 }
73
74 pub fn clock(&self) -> &dyn Clock {
75 &self.clock
76 }
77
78 pub fn mailer(&self) -> &Mailer {
79 &self.mailer
80 }
81
82 #[allow(clippy::unused_self, clippy::disallowed_methods)]
84 pub fn rng(&self) -> rand_chacha::ChaChaRng {
85 rand_chacha::ChaChaRng::from_rng(rand::thread_rng()).expect("failed to seed rng")
86 }
87
88 pub async fn repository(&self) -> Result<BoxRepository, RepositoryError> {
89 self.repository_factory.create().await
90 }
91
92 pub fn matrix_connection(&self) -> &dyn HomeserverConnection {
93 self.homeserver.as_ref()
94 }
95
96 pub fn url_builder(&self) -> &UrlBuilder {
97 &self.url_builder
98 }
99
100 pub fn site_config(&self) -> &SiteConfig {
101 &self.site_config
102 }
103}
104
105pub async fn init(
113 repository_factory: PgRepositoryFactory,
114 clock: impl Clock + 'static,
115 mailer: &Mailer,
116 homeserver: impl HomeserverConnection + 'static,
117 url_builder: UrlBuilder,
118 site_config: &SiteConfig,
119 cancellation_token: CancellationToken,
120) -> Result<QueueWorker, QueueRunnerError> {
121 let state = State::new(
122 repository_factory,
123 clock,
124 mailer.clone(),
125 homeserver,
126 url_builder,
127 site_config.clone(),
128 );
129 let mut worker = QueueWorker::new(state, cancellation_token).await?;
130
131 worker
132 .register_handler::<mas_storage::queue::CleanupRevokedOAuthAccessTokensJob>()
133 .register_handler::<mas_storage::queue::CleanupExpiredOAuthAccessTokensJob>()
134 .register_handler::<mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob>()
135 .register_handler::<mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob>()
136 .register_handler::<mas_storage::queue::DeactivateUserJob>()
137 .register_handler::<mas_storage::queue::DeleteDeviceJob>()
138 .register_handler::<mas_storage::queue::ProvisionDeviceJob>()
139 .register_handler::<mas_storage::queue::ProvisionUserJob>()
140 .register_handler::<mas_storage::queue::ReactivateUserJob>()
141 .register_handler::<mas_storage::queue::SendAccountRecoveryEmailsJob>()
142 .register_handler::<mas_storage::queue::SendEmailAuthenticationCodeJob>()
143 .register_handler::<mas_storage::queue::SyncDevicesJob>()
144 .register_handler::<mas_storage::queue::VerifyEmailJob>()
145 .register_handler::<mas_storage::queue::ExpireInactiveSessionsJob>()
146 .register_handler::<mas_storage::queue::ExpireInactiveCompatSessionsJob>()
147 .register_handler::<mas_storage::queue::ExpireInactiveOAuthSessionsJob>()
148 .register_handler::<mas_storage::queue::ExpireInactiveUserSessionsJob>()
149 .register_handler::<mas_storage::queue::PruneStalePolicyDataJob>()
150 .register_deprecated_queue("cleanup-expired-tokens")
151 .add_schedule(
152 "cleanup-revoked-oauth-access-tokens",
153 "0 0 * * * *".parse()?,
155 mas_storage::queue::CleanupRevokedOAuthAccessTokensJob,
156 )
157 .add_schedule(
158 "cleanup-revoked-oauth-refresh-tokens",
159 "0 10 * * * *".parse()?,
161 mas_storage::queue::CleanupRevokedOAuthRefreshTokensJob,
162 )
163 .add_schedule(
164 "cleanup-consumed-oauth-refresh-tokens",
165 "0 20 * * * *".parse()?,
167 mas_storage::queue::CleanupConsumedOAuthRefreshTokensJob,
168 )
169 .add_schedule(
170 "cleanup-expired-oauth-access-tokens",
171 "0 5 */4 * * *".parse()?,
173 mas_storage::queue::CleanupExpiredOAuthAccessTokensJob,
174 )
175 .add_schedule(
176 "expire-inactive-sessions",
177 "30 */15 * * * *".parse()?,
179 mas_storage::queue::ExpireInactiveSessionsJob,
180 )
181 .add_schedule(
182 "prune-stale-policy-data",
183 "0 0 2 * * *".parse()?,
185 mas_storage::queue::PruneStalePolicyDataJob,
186 );
187
188 Ok(worker)
189}
190
191#[expect(clippy::too_many_arguments, reason = "this is fine")]
197pub async fn init_and_run(
198 repository_factory: PgRepositoryFactory,
199 clock: impl Clock + 'static,
200 mailer: &Mailer,
201 homeserver: impl HomeserverConnection + 'static,
202 url_builder: UrlBuilder,
203 site_config: &SiteConfig,
204 cancellation_token: CancellationToken,
205 task_tracker: &TaskTracker,
206) -> Result<(), QueueRunnerError> {
207 let worker = init(
208 repository_factory,
209 clock,
210 mailer,
211 homeserver,
212 url_builder,
213 site_config,
214 cancellation_token,
215 )
216 .await?;
217
218 task_tracker.spawn(worker.run());
219
220 Ok(())
221}