mas_storage/queue/
tasks.rs

1// Copyright 2025, 2026 Element Creations Ltd.
2// Copyright 2024, 2025 New Vector Ltd.
3//
4// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
5// Please see LICENSE files in the repository root for full details.
6
7use chrono::{DateTime, Utc};
8use mas_data_model::{
9    BrowserSession, CompatSession, Device, Session, User, UserEmailAuthentication,
10    UserRecoverySession,
11};
12use serde::{Deserialize, Serialize};
13use ulid::Ulid;
14
15use super::InsertableJob;
16use crate::{Page, Pagination};
17
18/// This is the previous iteration of the email verification job. It has been
19/// replaced by [`SendEmailAuthenticationCodeJob`]. This struct is kept to be
20/// able to consume jobs that are still in the queue.
21#[derive(Serialize, Deserialize, Debug, Clone)]
22pub struct VerifyEmailJob {
23    user_email_id: Ulid,
24    language: Option<String>,
25}
26
27impl VerifyEmailJob {
28    /// The ID of the email address to verify.
29    #[must_use]
30    pub fn user_email_id(&self) -> Ulid {
31        self.user_email_id
32    }
33}
34
35impl InsertableJob for VerifyEmailJob {
36    const QUEUE_NAME: &'static str = "verify-email";
37}
38
39/// A job to send an email authentication code to a user.
40#[derive(Serialize, Deserialize, Debug, Clone)]
41pub struct SendEmailAuthenticationCodeJob {
42    user_email_authentication_id: Ulid,
43    language: String,
44}
45
46impl SendEmailAuthenticationCodeJob {
47    /// Create a new job to send an email authentication code to a user.
48    #[must_use]
49    pub fn new(user_email_authentication: &UserEmailAuthentication, language: String) -> Self {
50        Self {
51            user_email_authentication_id: user_email_authentication.id,
52            language,
53        }
54    }
55
56    /// The language to use for the email.
57    #[must_use]
58    pub fn language(&self) -> &str {
59        &self.language
60    }
61
62    /// The ID of the email authentication to send the code for.
63    #[must_use]
64    pub fn user_email_authentication_id(&self) -> Ulid {
65        self.user_email_authentication_id
66    }
67}
68
69impl InsertableJob for SendEmailAuthenticationCodeJob {
70    const QUEUE_NAME: &'static str = "send-email-authentication-code";
71}
72
73/// A job to provision the user on the homeserver.
74#[derive(Serialize, Deserialize, Debug, Clone)]
75pub struct ProvisionUserJob {
76    user_id: Ulid,
77    set_display_name: Option<String>,
78}
79
80impl ProvisionUserJob {
81    /// Create a new job to provision the user on the homeserver.
82    #[must_use]
83    pub fn new(user: &User) -> Self {
84        Self {
85            user_id: user.id,
86            set_display_name: None,
87        }
88    }
89
90    #[doc(hidden)]
91    #[must_use]
92    pub fn new_for_id(user_id: Ulid) -> Self {
93        Self {
94            user_id,
95            set_display_name: None,
96        }
97    }
98
99    /// Set the display name of the user.
100    #[must_use]
101    pub fn set_display_name(mut self, display_name: String) -> Self {
102        self.set_display_name = Some(display_name);
103        self
104    }
105
106    /// Get the display name to be set.
107    #[must_use]
108    pub fn display_name_to_set(&self) -> Option<&str> {
109        self.set_display_name.as_deref()
110    }
111
112    /// The ID of the user to provision.
113    #[must_use]
114    pub fn user_id(&self) -> Ulid {
115        self.user_id
116    }
117}
118
119impl InsertableJob for ProvisionUserJob {
120    const QUEUE_NAME: &'static str = "provision-user";
121}
122
123/// A job to provision a device for a user on the homeserver.
124///
125/// This job is deprecated, use the `SyncDevicesJob` instead. It is kept to
126/// not break existing jobs in the database.
127#[derive(Serialize, Deserialize, Debug, Clone)]
128pub struct ProvisionDeviceJob {
129    user_id: Ulid,
130    device_id: String,
131}
132
133impl ProvisionDeviceJob {
134    /// The ID of the user to provision the device for.
135    #[must_use]
136    pub fn user_id(&self) -> Ulid {
137        self.user_id
138    }
139
140    /// The ID of the device to provision.
141    #[must_use]
142    pub fn device_id(&self) -> &str {
143        &self.device_id
144    }
145}
146
147impl InsertableJob for ProvisionDeviceJob {
148    const QUEUE_NAME: &'static str = "provision-device";
149}
150
151/// A job to delete a device for a user on the homeserver.
152///
153/// This job is deprecated, use the `SyncDevicesJob` instead. It is kept to
154/// not break existing jobs in the database.
155#[derive(Serialize, Deserialize, Debug, Clone)]
156pub struct DeleteDeviceJob {
157    user_id: Ulid,
158    device_id: String,
159}
160
161impl DeleteDeviceJob {
162    /// Create a new job to delete a device for a user on the homeserver.
163    #[must_use]
164    pub fn new(user: &User, device: &Device) -> Self {
165        Self {
166            user_id: user.id,
167            device_id: device.as_str().to_owned(),
168        }
169    }
170
171    /// The ID of the user to delete the device for.
172    #[must_use]
173    pub fn user_id(&self) -> Ulid {
174        self.user_id
175    }
176
177    /// The ID of the device to delete.
178    #[must_use]
179    pub fn device_id(&self) -> &str {
180        &self.device_id
181    }
182}
183
184impl InsertableJob for DeleteDeviceJob {
185    const QUEUE_NAME: &'static str = "delete-device";
186}
187
188/// A job which syncs the list of devices of a user with the homeserver
189#[derive(Serialize, Deserialize, Debug, Clone)]
190pub struct SyncDevicesJob {
191    user_id: Ulid,
192}
193
194impl SyncDevicesJob {
195    /// Create a new job to sync the list of devices of a user with the
196    /// homeserver
197    #[must_use]
198    pub fn new(user: &User) -> Self {
199        Self { user_id: user.id }
200    }
201
202    /// Create a new job to sync the list of devices of a user with the
203    /// homeserver for the given user ID
204    ///
205    /// This is useful to use in cases where the [`User`] object isn't loaded
206    #[must_use]
207    pub fn new_for_id(user_id: Ulid) -> Self {
208        Self { user_id }
209    }
210
211    /// The ID of the user to sync the devices for
212    #[must_use]
213    pub fn user_id(&self) -> Ulid {
214        self.user_id
215    }
216}
217
218impl InsertableJob for SyncDevicesJob {
219    const QUEUE_NAME: &'static str = "sync-devices";
220}
221
222/// A job to deactivate and lock a user
223#[derive(Serialize, Deserialize, Debug, Clone)]
224pub struct DeactivateUserJob {
225    user_id: Ulid,
226    hs_erase: bool,
227}
228
229impl DeactivateUserJob {
230    /// Create a new job to deactivate and lock a user
231    ///
232    /// # Parameters
233    ///
234    /// * `user` - The user to deactivate
235    /// * `hs_erase` - Whether to erase the user from the homeserver
236    #[must_use]
237    pub fn new(user: &User, hs_erase: bool) -> Self {
238        Self {
239            user_id: user.id,
240            hs_erase,
241        }
242    }
243
244    /// The ID of the user to deactivate
245    #[must_use]
246    pub fn user_id(&self) -> Ulid {
247        self.user_id
248    }
249
250    /// Whether to erase the user from the homeserver
251    #[must_use]
252    pub fn hs_erase(&self) -> bool {
253        self.hs_erase
254    }
255}
256
257impl InsertableJob for DeactivateUserJob {
258    const QUEUE_NAME: &'static str = "deactivate-user";
259}
260
261/// A job to reactivate a user
262#[derive(Serialize, Deserialize, Debug, Clone)]
263pub struct ReactivateUserJob {
264    user_id: Ulid,
265}
266
267impl ReactivateUserJob {
268    /// Create a new job to reactivate a user
269    ///
270    /// # Parameters
271    ///
272    /// * `user` - The user to reactivate
273    #[must_use]
274    pub fn new(user: &User) -> Self {
275        Self { user_id: user.id }
276    }
277
278    /// The ID of the user to reactivate
279    #[must_use]
280    pub fn user_id(&self) -> Ulid {
281        self.user_id
282    }
283}
284
285impl InsertableJob for ReactivateUserJob {
286    const QUEUE_NAME: &'static str = "reactivate-user";
287}
288
289/// Send account recovery emails
290#[derive(Serialize, Deserialize, Debug, Clone)]
291pub struct SendAccountRecoveryEmailsJob {
292    user_recovery_session_id: Ulid,
293}
294
295impl SendAccountRecoveryEmailsJob {
296    /// Create a new job to send account recovery emails
297    ///
298    /// # Parameters
299    ///
300    /// * `user_recovery_session` - The user recovery session to send the email
301    ///   for
302    /// * `language` - The locale to send the email in
303    #[must_use]
304    pub fn new(user_recovery_session: &UserRecoverySession) -> Self {
305        Self {
306            user_recovery_session_id: user_recovery_session.id,
307        }
308    }
309
310    /// The ID of the user recovery session to send the email for
311    #[must_use]
312    pub fn user_recovery_session_id(&self) -> Ulid {
313        self.user_recovery_session_id
314    }
315}
316
317impl InsertableJob for SendAccountRecoveryEmailsJob {
318    const QUEUE_NAME: &'static str = "send-account-recovery-email";
319}
320
321/// Cleanup revoked OAuth 2.0 access tokens
322#[derive(Serialize, Deserialize, Debug, Clone, Default)]
323pub struct CleanupRevokedOAuthAccessTokensJob;
324
325impl InsertableJob for CleanupRevokedOAuthAccessTokensJob {
326    const QUEUE_NAME: &'static str = "cleanup-revoked-oauth-access-tokens";
327}
328
329/// Cleanup expired OAuth 2.0 access tokens
330#[derive(Serialize, Deserialize, Debug, Clone, Default)]
331pub struct CleanupExpiredOAuthAccessTokensJob;
332
333impl InsertableJob for CleanupExpiredOAuthAccessTokensJob {
334    const QUEUE_NAME: &'static str = "cleanup-expired-oauth-access-tokens";
335}
336
337/// Cleanup revoked OAuth 2.0 refresh tokens
338#[derive(Serialize, Deserialize, Debug, Clone, Default)]
339pub struct CleanupRevokedOAuthRefreshTokensJob;
340
341impl InsertableJob for CleanupRevokedOAuthRefreshTokensJob {
342    const QUEUE_NAME: &'static str = "cleanup-revoked-oauth-refresh-tokens";
343}
344
345/// Cleanup consumed OAuth 2.0 refresh tokens
346#[derive(Serialize, Deserialize, Debug, Clone, Default)]
347pub struct CleanupConsumedOAuthRefreshTokensJob;
348
349impl InsertableJob for CleanupConsumedOAuthRefreshTokensJob {
350    const QUEUE_NAME: &'static str = "cleanup-consumed-oauth-refresh-tokens";
351}
352
353/// Scheduled job to expire inactive sessions
354///
355/// This job will trigger jobs to expire inactive compat, oauth and user
356/// sessions.
357#[derive(Serialize, Deserialize, Debug, Clone)]
358pub struct ExpireInactiveSessionsJob;
359
360impl InsertableJob for ExpireInactiveSessionsJob {
361    const QUEUE_NAME: &'static str = "expire-inactive-sessions";
362}
363
364/// Expire inactive OAuth 2.0 sessions
365#[derive(Serialize, Deserialize, Debug, Clone)]
366pub struct ExpireInactiveOAuthSessionsJob {
367    threshold: DateTime<Utc>,
368    after: Option<Ulid>,
369}
370
371impl ExpireInactiveOAuthSessionsJob {
372    /// Create a new job to expire inactive OAuth 2.0 sessions
373    ///
374    /// # Parameters
375    ///
376    /// * `threshold` - The threshold to expire sessions at
377    #[must_use]
378    pub fn new(threshold: DateTime<Utc>) -> Self {
379        Self {
380            threshold,
381            after: None,
382        }
383    }
384
385    /// Get the threshold to expire sessions at
386    #[must_use]
387    pub fn threshold(&self) -> DateTime<Utc> {
388        self.threshold
389    }
390
391    /// Get the pagination cursor
392    #[must_use]
393    pub fn pagination(&self, batch_size: usize) -> Pagination {
394        let pagination = Pagination::first(batch_size);
395        if let Some(after) = self.after {
396            pagination.after(after)
397        } else {
398            pagination
399        }
400    }
401
402    /// Get the next job given the page returned by the database
403    #[must_use]
404    pub fn next(&self, page: &Page<Session>) -> Option<Self> {
405        if !page.has_next_page {
406            return None;
407        }
408
409        let last_edge = page.edges.last()?;
410        Some(Self {
411            threshold: self.threshold,
412            after: Some(last_edge.cursor),
413        })
414    }
415}
416
417impl InsertableJob for ExpireInactiveOAuthSessionsJob {
418    const QUEUE_NAME: &'static str = "expire-inactive-oauth-sessions";
419}
420
421/// Expire inactive compatibility sessions
422#[derive(Serialize, Deserialize, Debug, Clone)]
423pub struct ExpireInactiveCompatSessionsJob {
424    threshold: DateTime<Utc>,
425    after: Option<Ulid>,
426}
427
428impl ExpireInactiveCompatSessionsJob {
429    /// Create a new job to expire inactive compatibility sessions
430    ///
431    /// # Parameters
432    ///
433    /// * `threshold` - The threshold to expire sessions at
434    #[must_use]
435    pub fn new(threshold: DateTime<Utc>) -> Self {
436        Self {
437            threshold,
438            after: None,
439        }
440    }
441
442    /// Get the threshold to expire sessions at
443    #[must_use]
444    pub fn threshold(&self) -> DateTime<Utc> {
445        self.threshold
446    }
447
448    /// Get the pagination cursor
449    #[must_use]
450    pub fn pagination(&self, batch_size: usize) -> Pagination {
451        let pagination = Pagination::first(batch_size);
452        if let Some(after) = self.after {
453            pagination.after(after)
454        } else {
455            pagination
456        }
457    }
458
459    /// Get the next job given the page returned by the database
460    #[must_use]
461    pub fn next(&self, page: &Page<CompatSession>) -> Option<Self> {
462        if !page.has_next_page {
463            return None;
464        }
465
466        let last_edge = page.edges.last()?;
467        Some(Self {
468            threshold: self.threshold,
469            after: Some(last_edge.cursor),
470        })
471    }
472}
473
474impl InsertableJob for ExpireInactiveCompatSessionsJob {
475    const QUEUE_NAME: &'static str = "expire-inactive-compat-sessions";
476}
477
478/// Expire inactive user sessions
479#[derive(Debug, Serialize, Deserialize)]
480pub struct ExpireInactiveUserSessionsJob {
481    threshold: DateTime<Utc>,
482    after: Option<Ulid>,
483}
484
485impl ExpireInactiveUserSessionsJob {
486    /// Create a new job to expire inactive user/browser sessions
487    ///
488    /// # Parameters
489    ///
490    /// * `threshold` - The threshold to expire sessions at
491    #[must_use]
492    pub fn new(threshold: DateTime<Utc>) -> Self {
493        Self {
494            threshold,
495            after: None,
496        }
497    }
498
499    /// Get the threshold to expire sessions at
500    #[must_use]
501    pub fn threshold(&self) -> DateTime<Utc> {
502        self.threshold
503    }
504
505    /// Get the pagination cursor
506    #[must_use]
507    pub fn pagination(&self, batch_size: usize) -> Pagination {
508        let pagination = Pagination::first(batch_size);
509        if let Some(after) = self.after {
510            pagination.after(after)
511        } else {
512            pagination
513        }
514    }
515
516    /// Get the next job given the page returned by the database
517    #[must_use]
518    pub fn next(&self, page: &Page<BrowserSession>) -> Option<Self> {
519        if !page.has_next_page {
520            return None;
521        }
522
523        let last_edge = page.edges.last()?;
524        Some(Self {
525            threshold: self.threshold,
526            after: Some(last_edge.cursor),
527        })
528    }
529}
530
531impl InsertableJob for ExpireInactiveUserSessionsJob {
532    const QUEUE_NAME: &'static str = "expire-inactive-user-sessions";
533}
534
535/// Prune stale policy data
536#[derive(Debug, Serialize, Deserialize)]
537pub struct PruneStalePolicyDataJob;
538
539impl InsertableJob for PruneStalePolicyDataJob {
540    const QUEUE_NAME: &'static str = "prune-stale-policy-data";
541}