From ee4f49777a0c00e2f661d77fe51f4817c0f4eba2 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Mon, 27 Jan 2025 17:25:00 +0000 Subject: [PATCH] syn2mas: Migrate threepids to MAS (#3878) * Add a table to hold unsupported threepids * Migrate threepids from Synapse to MAS --- crates/cli/src/commands/syn2mas.rs | 34 +-- ...0124151529_unsupported_threepids_table.sql | 30 +++ ...b5b37ab50db3505712c35610b822cda322b5b.json | 17 ++ ...a853a8a7efccdc20b968d99d8c18deda8dd00.json | 17 ++ crates/syn2mas/Cargo.toml | 4 +- crates/syn2mas/src/lib.rs | 16 +- crates/syn2mas/src/mas_writer/checks.rs | 9 +- crates/syn2mas/src/mas_writer/locking.rs | 10 +- crates/syn2mas/src/mas_writer/mod.rs | 210 ++++++++++++++++-- .../syn2mas_revert_temporary_tables.sql | 2 + .../mas_writer/syn2mas_temporary_tables.sql | 2 + crates/syn2mas/src/migration.rs | 85 ++++++- crates/syn2mas/src/synapse_reader/checks.rs | 31 ++- crates/syn2mas/src/synapse_reader/config.rs | 36 +-- crates/syn2mas/src/synapse_reader/mod.rs | 95 ++++++-- 15 files changed, 502 insertions(+), 96 deletions(-) create mode 100644 crates/storage-pg/migrations/20250124151529_unsupported_threepids_table.sql create mode 100644 crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json create mode 100644 crates/syn2mas/.sqlx/query-dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00.json diff --git a/crates/cli/src/commands/syn2mas.rs b/crates/cli/src/commands/syn2mas.rs index 1133dd18a..05ceefbbb 100644 --- a/crates/cli/src/commands/syn2mas.rs +++ b/crates/cli/src/commands/syn2mas.rs @@ -12,10 +12,12 @@ use tracing::{error, warn}; use crate::util::database_connection_from_config; -/// The exit code used by `syn2mas check` and `syn2mas migrate` when there are errors preventing migration. +/// The exit code used by `syn2mas check` and `syn2mas migrate` when there are +/// errors preventing migration. const EXIT_CODE_CHECK_ERRORS: u8 = 10; -/// The exit code used by `syn2mas check` when there are warnings which should be considered prior to migration. +/// The exit code used by `syn2mas check` when there are warnings which should +/// be considered prior to migration. const EXIT_CODE_CHECK_WARNINGS: u8 = 11; #[derive(Parser, Debug)] @@ -23,32 +25,38 @@ pub(super) struct Options { #[command(subcommand)] subcommand: Subcommand, - /// This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. It is only suitable for TESTING. - /// If you want to use this tool anyway, please pass this argument. + /// This version of the syn2mas tool is EXPERIMENTAL and INCOMPLETE. It is + /// only suitable for TESTING. If you want to use this tool anyway, + /// please pass this argument. /// - /// If you want to migrate from Synapse to MAS today, please use the Node.js-based tool in the MAS repository. + /// If you want to migrate from Synapse to MAS today, please use the + /// Node.js-based tool in the MAS repository. #[clap(long = "i-swear-i-am-just-testing-in-a-staging-environment")] experimental_accepted: bool, /// Path to the Synapse configuration (in YAML format). - /// May be specified multiple times if multiple Synapse configuration files are in use. + /// May be specified multiple times if multiple Synapse configuration files + /// are in use. #[clap(long = "synapse-config")] synapse_configuration_files: Vec, /// Override the Synapse database URI. - /// syn2mas normally loads the Synapse database connection details from the Synapse configuration. 
- /// However, it may sometimes be necessary to override the database URI and in that case this flag can be used. + /// syn2mas normally loads the Synapse database connection details from the + /// Synapse configuration. However, it may sometimes be necessary to + /// override the database URI and in that case this flag can be used. /// /// Should be a connection URI of the following general form: /// ```text /// postgresql://[user[:password]@][host][:port][/dbname][?param1=value1&...] /// ``` - /// To use a UNIX socket at a custom path, the host should be a path to a socket, but in the URI string - /// it must be URI-encoded by replacing `/` with `%2F`. + /// To use a UNIX socket at a custom path, the host should be a path to a + /// socket, but in the URI string it must be URI-encoded by replacing + /// `/` with `%2F`. /// - /// Finally, any missing values will be loaded from the libpq-compatible environment variables - /// `PGHOST`, `PGPORT`, `PGUSER`, `PGDATABASE`, `PGPASSWORD`, etc. - /// It is valid to specify the URL `postgresql:` and configure all values through those environment variables. + /// Finally, any missing values will be loaded from the libpq-compatible + /// environment variables `PGHOST`, `PGPORT`, `PGUSER`, `PGDATABASE`, + /// `PGPASSWORD`, etc. It is valid to specify the URL `postgresql:` and + /// configure all values through those environment variables. #[clap(long = "synapse-database-uri")] synapse_database_uri: Option, } diff --git a/crates/storage-pg/migrations/20250124151529_unsupported_threepids_table.sql b/crates/storage-pg/migrations/20250124151529_unsupported_threepids_table.sql new file mode 100644 index 000000000..f00cb3247 --- /dev/null +++ b/crates/storage-pg/migrations/20250124151529_unsupported_threepids_table.sql @@ -0,0 +1,30 @@ +-- Copyright 2025 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + + + +-- Tracks third-party ID associations that have been verified but are +-- not currently supported by MAS. +-- This is currently used when importing third-party IDs from Synapse, +-- which historically could verify at least phone numbers. +-- E-mail associations will not be stored in this table because those are natively +-- supported by MAS; see the `user_emails` table. + +CREATE TABLE user_unsupported_third_party_ids( + -- The owner of the third-party ID assocation + user_id UUID NOT NULL + REFERENCES users(user_id) ON DELETE CASCADE, + + -- What type of association is this? + medium TEXT NOT NULL, + + -- The address of the associated ID, e.g. a phone number or other identifier. 
+ address TEXT NOT NULL, + + -- When the association was created + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + + PRIMARY KEY (user_id, medium, address) +); diff --git a/crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json b/crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json new file mode 100644 index 000000000..b44dfc605 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__user_unsupported_third_party_ids\n (user_id, medium, address, created_at)\n SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "TextArray", + "TextArray", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "b11590549fdd4cdcd36c937a353b5b37ab50db3505712c35610b822cda322b5b" +} diff --git a/crates/syn2mas/.sqlx/query-dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00.json b/crates/syn2mas/.sqlx/query-dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00.json new file mode 100644 index 000000000..cf89130f9 --- /dev/null +++ b/crates/syn2mas/.sqlx/query-dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO syn2mas__user_emails\n (user_email_id, user_id, email, created_at, confirmed_at)\n SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[])\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "UuidArray", + "UuidArray", + "TextArray", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "dfbd462f7874d3dae551f2a0328a853a8a7efccdc20b968d99d8c18deda8dd00" +} diff --git a/crates/syn2mas/Cargo.toml b/crates/syn2mas/Cargo.toml index 55c51d4d4..a7075c7f0 100644 --- a/crates/syn2mas/Cargo.toml +++ b/crates/syn2mas/Cargo.toml @@ -27,6 +27,8 @@ rand.workspace = true uuid = "1.10.0" ulid = { workspace = true, features = ["uuid"] } +mas-config.workspace = true + [dev-dependencies] mas-storage-pg.workspace = true @@ -34,7 +36,5 @@ anyhow.workspace = true insta.workspace = true serde.workspace = true -mas-config.workspace = true - [lints] workspace = true diff --git a/crates/syn2mas/src/lib.rs b/crates/syn2mas/src/lib.rs index f5ae181cf..723ebc869 100644 --- a/crates/syn2mas/src/lib.rs +++ b/crates/syn2mas/src/lib.rs @@ -8,11 +8,13 @@ mod synapse_reader; mod migration; -pub use self::mas_writer::locking::LockedMasDatabase; -pub use self::mas_writer::{checks::mas_pre_migration_checks, MasWriter}; -pub use self::migration::migrate; -pub use self::synapse_reader::checks::{ - synapse_config_check, synapse_config_check_against_mas_config, synapse_database_check, +pub use self::{ + mas_writer::{checks::mas_pre_migration_checks, locking::LockedMasDatabase, MasWriter}, + migration::migrate, + synapse_reader::{ + checks::{ + synapse_config_check, synapse_config_check_against_mas_config, synapse_database_check, + }, + config as synapse_config, SynapseReader, + }, }; -pub use self::synapse_reader::config as synapse_config; -pub use self::synapse_reader::SynapseReader; diff --git a/crates/syn2mas/src/mas_writer/checks.rs b/crates/syn2mas/src/mas_writer/checks.rs index 85a6e74a9..a8ea1a18a 100644 --- a/crates/syn2mas/src/mas_writer/checks.rs +++ 
b/crates/syn2mas/src/mas_writer/checks.rs @@ -5,7 +5,8 @@ //! # MAS Database Checks //! -//! This module provides safety checks to run against a MAS database before running the Synapse-to-MAS migration. +//! This module provides safety checks to run against a MAS database before +//! running the Synapse-to-MAS migration. use thiserror::Error; use thiserror_ext::ContextInto; @@ -43,7 +44,8 @@ pub enum Error { /// /// - If any database access error occurs. /// - If any MAS tables involved in the migration are not empty. -/// - If we can't check whether syn2mas is already in progress on this database or not. +/// - If we can't check whether syn2mas is already in progress on this database +/// or not. #[tracing::instrument(skip_all)] pub async fn mas_pre_migration_checks<'a>( mas_connection: &mut LockedMasDatabase<'a>, @@ -56,7 +58,8 @@ pub async fn mas_pre_migration_checks<'a>( return Ok(()); } - // Check that the database looks like a MAS database and that it is also an empty database. + // Check that the database looks like a MAS database and that it is also an + // empty database. for &table in MAS_TABLES_AFFECTED_BY_MIGRATION { let row_present = sqlx::query(&format!("SELECT 1 AS dummy FROM {table} LIMIT 1")) diff --git a/crates/syn2mas/src/mas_writer/locking.rs b/crates/syn2mas/src/mas_writer/locking.rs index 147bad4a6..f034025bf 100644 --- a/crates/syn2mas/src/mas_writer/locking.rs +++ b/crates/syn2mas/src/mas_writer/locking.rs @@ -13,17 +13,19 @@ use sqlx::{ static SYN2MAS_ADVISORY_LOCK: LazyLock = LazyLock::new(|| PgAdvisoryLock::new("syn2mas-maswriter")); -/// A wrapper around a Postgres connection which holds a session-wide advisory lock -/// preventing concurrent access by other syn2mas instances. +/// A wrapper around a Postgres connection which holds a session-wide advisory +/// lock preventing concurrent access by other syn2mas instances. pub struct LockedMasDatabase<'conn> { inner: PgAdvisoryLockGuard<'static, &'conn mut PgConnection>, } impl<'conn> LockedMasDatabase<'conn> { - /// Attempts to lock the MAS database against concurrent access by other syn2mas instances. + /// Attempts to lock the MAS database against concurrent access by other + /// syn2mas instances. /// /// If the lock can be acquired, returns a `LockedMasDatabase`. - /// If the lock cannot be acquired, returns the connection back to the caller wrapped in `Either::Right`. + /// If the lock cannot be acquired, returns the connection back to the + /// caller wrapped in `Either::Right`. /// /// # Errors /// diff --git a/crates/syn2mas/src/mas_writer/mod.rs b/crates/syn2mas/src/mas_writer/mod.rs index 1dfed0df4..0d0528aff 100644 --- a/crates/syn2mas/src/mas_writer/mod.rs +++ b/crates/syn2mas/src/mas_writer/mod.rs @@ -146,7 +146,8 @@ impl WriterConnectionPool { /// /// # Panics /// - /// - If connections were not returned to the pool. (This indicates a serious bug.) + /// - If connections were not returned to the pool. (This indicates a + /// serious bug.) pub async fn finish(self) -> Result<(), Vec> { let mut errors = Vec::new(); @@ -207,14 +208,33 @@ pub struct MasNewUserPassword { pub created_at: DateTime, } -/// The 'version' of the password hashing scheme used for passwords when they are -/// migrated from Synapse to MAS. 
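The `LockedMasDatabase` wrapper above is a thin layer over sqlx's session-level Postgres advisory locks. A minimal sketch of the acquire-or-bail-out pattern it relies on, assuming sqlx with the `postgres` feature; the helper function and its error handling are illustrative, not the crate's actual API:

```rust
use sqlx::{postgres::PgAdvisoryLock, Either, PgConnection};

// Hypothetical helper demonstrating the try_acquire/Either flow.
async fn with_syn2mas_lock(conn: &mut PgConnection) -> Result<(), sqlx::Error> {
    let lock = PgAdvisoryLock::new("syn2mas-maswriter");
    match lock.try_acquire(conn).await? {
        Either::Left(guard) => {
            // The lock is tied to this database session; do the migration
            // work here, then release it explicitly.
            let _conn = guard.release_now().await?;
            Ok(())
        }
        Either::Right(_conn) => {
            // Another syn2mas instance already holds the lock; the
            // connection is handed back untouched.
            Ok(())
        }
    }
}
```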
+pub struct MasNewEmailThreepid { + pub user_email_id: Uuid, + pub user_id: Uuid, + pub email: String, + pub created_at: DateTime, +} + +pub struct MasNewUnsupportedThreepid { + pub user_id: Uuid, + pub medium: String, + pub address: String, + pub created_at: DateTime, +} + +/// The 'version' of the password hashing scheme used for passwords when they +/// are migrated from Synapse to MAS. /// This is version 1, as in the previous syn2mas script. // TODO hardcoding version to `1` may not be correct long-term? pub const MIGRATED_PASSWORD_VERSION: u16 = 1; /// List of all MAS tables that are written to by syn2mas. -pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &["users", "user_passwords"]; +pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &[ + "users", + "user_passwords", + "user_emails", + "user_unsupported_third_party_ids", +]; /// Detect whether a syn2mas migration has started on the given database. /// @@ -227,8 +247,8 @@ pub const MAS_TABLES_AFFECTED_BY_MIGRATION: &[&str] = &["users", "user_passwords /// Errors are returned under the following circumstances: /// /// - If any database error occurs whilst querying the database. -/// - If some, but not all, syn2mas restoration tables are present. -/// (This shouldn't be possible without syn2mas having been sabotaged!) +/// - If some, but not all, syn2mas restoration tables are present. (This +/// shouldn't be possible without syn2mas having been sabotaged!) pub async fn is_syn2mas_in_progress(conn: &mut PgConnection) -> Result { // Names of tables used for syn2mas resumption // Must be `String`s, not just `&str`, for the query. @@ -457,7 +477,8 @@ impl<'conn> MasWriter<'conn> { .await .map_err(|errors| Error::Multiple(MultipleErrors::from(errors)))?; - // Now all the data has been migrated, finish off by restoring indices and constraints! + // Now all the data has been migrated, finish off by restoring indices and + // constraints! 
query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;") .execute(self.conn.as_mut()) @@ -563,11 +584,11 @@ impl<'conn> MasWriter<'conn> { &mut self, passwords: Vec, ) -> Result<(), Error> { - self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move { - if passwords.is_empty() { - return Ok(()); - } + if passwords.is_empty() { + return Ok(()); + } + self.writer_pool.spawn_with_connection(move |conn| Box::pin(async move { let mut user_password_ids: Vec = Vec::with_capacity(passwords.len()); let mut user_ids: Vec = Vec::with_capacity(passwords.len()); let mut hashed_passwords: Vec = Vec::with_capacity(passwords.len()); @@ -603,12 +624,107 @@ impl<'conn> MasWriter<'conn> { Ok(()) })).await } + + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub async fn write_email_threepids( + &mut self, + threepids: Vec, + ) -> Result<(), Error> { + if threepids.is_empty() { + return Ok(()); + } + self.writer_pool.spawn_with_connection(move |conn| { + Box::pin(async move { + let mut user_email_ids: Vec = Vec::with_capacity(threepids.len()); + let mut user_ids: Vec = Vec::with_capacity(threepids.len()); + let mut emails: Vec = Vec::with_capacity(threepids.len()); + let mut created_ats: Vec> = Vec::with_capacity(threepids.len()); + + for MasNewEmailThreepid { + user_email_id, + user_id, + email, + created_at, + } in threepids + { + user_email_ids.push(user_email_id); + user_ids.push(user_id); + emails.push(email); + created_ats.push(created_at); + } + + // `confirmed_at` is going to get removed in a future MAS release, + // so just populate with `created_at` + sqlx::query!( + r#" + INSERT INTO syn2mas__user_emails + (user_email_id, user_id, email, created_at, confirmed_at) + SELECT * FROM UNNEST($1::UUID[], $2::UUID[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[], $4::TIMESTAMP WITH TIME ZONE[]) + "#, + &user_email_ids[..], + &user_ids[..], + &emails[..], + &created_ats[..], + ).execute(&mut *conn).await.into_database("writing emails to MAS")?; + + Ok(()) + }) + }).await + } + + #[tracing::instrument(skip_all, level = Level::DEBUG)] + pub async fn write_unsupported_threepids( + &mut self, + threepids: Vec, + ) -> Result<(), Error> { + if threepids.is_empty() { + return Ok(()); + } + self.writer_pool.spawn_with_connection(move |conn| { + Box::pin(async move { + let mut user_ids: Vec = Vec::with_capacity(threepids.len()); + let mut mediums: Vec = Vec::with_capacity(threepids.len()); + let mut addresses: Vec = Vec::with_capacity(threepids.len()); + let mut created_ats: Vec> = Vec::with_capacity(threepids.len()); + + for MasNewUnsupportedThreepid { + user_id, + medium, + address, + created_at, + } in threepids + { + user_ids.push(user_id); + mediums.push(medium); + addresses.push(address); + created_ats.push(created_at); + } + + // `confirmed_at` is going to get removed in a future MAS release, + // so just populate with `created_at` + sqlx::query!( + r#" + INSERT INTO syn2mas__user_unsupported_third_party_ids + (user_id, medium, address, created_at) + SELECT * FROM UNNEST($1::UUID[], $2::TEXT[], $3::TEXT[], $4::TIMESTAMP WITH TIME ZONE[]) + "#, + &user_ids[..], + &mediums[..], + &addresses[..], + &created_ats[..], + ).execute(&mut *conn).await.into_database("writing unsupported threepids to MAS")?; + + Ok(()) + }) + }).await + } } -// How many entries to buffer at once, before writing a batch of rows to the database. 
-// TODO tune: didn't see that much difference between 4k and 64k -// (4k: 13.5~14, 64k: 12.5~13s — streaming the whole way would be better, especially for DB latency, but probably fiiine -// and also we won't be able to stream to two tables at once...) +// How many entries to buffer at once, before writing a batch of rows to the +// database. TODO tune: didn't see that much difference between 4k and 64k +// (4k: 13.5~14, 64k: 12.5~13s — streaming the whole way would be better, +// especially for DB latency, but probably fiiine and also we won't be able to +// stream to two tables at once...) const WRITE_BUFFER_BATCH_SIZE: usize = 4096; pub struct MasUserWriteBuffer<'writer, 'conn> { @@ -670,13 +786,69 @@ impl<'writer, 'conn> MasUserWriteBuffer<'writer, 'conn> { } } +pub struct MasThreepidWriteBuffer<'writer, 'conn> { + email: Vec, + unsupported: Vec, + writer: &'writer mut MasWriter<'conn>, +} + +impl<'writer, 'conn> MasThreepidWriteBuffer<'writer, 'conn> { + pub fn new(writer: &'writer mut MasWriter<'conn>) -> Self { + MasThreepidWriteBuffer { + email: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE), + unsupported: Vec::with_capacity(WRITE_BUFFER_BATCH_SIZE), + writer, + } + } + + pub async fn finish(mut self) -> Result<(), Error> { + self.flush_emails().await?; + self.flush_unsupported().await?; + Ok(()) + } + + pub async fn flush_emails(&mut self) -> Result<(), Error> { + self.writer + .write_email_threepids(std::mem::take(&mut self.email)) + .await?; + self.email.reserve_exact(WRITE_BUFFER_BATCH_SIZE); + Ok(()) + } + + pub async fn flush_unsupported(&mut self) -> Result<(), Error> { + self.writer + .write_unsupported_threepids(std::mem::take(&mut self.unsupported)) + .await?; + self.unsupported.reserve_exact(WRITE_BUFFER_BATCH_SIZE); + Ok(()) + } + + pub async fn write_email(&mut self, user: MasNewEmailThreepid) -> Result<(), Error> { + self.email.push(user); + if self.email.len() >= WRITE_BUFFER_BATCH_SIZE { + self.flush_emails().await?; + } + Ok(()) + } + + pub async fn write_password( + &mut self, + unsupported: MasNewUnsupportedThreepid, + ) -> Result<(), Error> { + self.unsupported.push(unsupported); + if self.unsupported.len() >= WRITE_BUFFER_BATCH_SIZE { + self.flush_unsupported().await?; + } + Ok(()) + } +} + #[cfg(test)] mod test { use std::collections::{BTreeMap, BTreeSet}; use chrono::DateTime; use futures_util::TryStreamExt; - use serde::Serialize; use sqlx::{Column, PgConnection, PgPool, Row}; use uuid::Uuid; @@ -707,9 +879,11 @@ mod test { const SKIPPED_TABLES: &[&str] = &["_sqlx_migrations"]; - /// Produces a serialisable snapshot of a database, usable for snapshot testing + /// Produces a serialisable snapshot of a database, usable for snapshot + /// testing /// - /// For brevity, empty tables, as well as [`SKIPPED_TABLES`], will not be included in the snapshot. + /// For brevity, empty tables, as well as [`SKIPPED_TABLES`], will not be + /// included in the snapshot. 
async fn snapshot_database(conn: &mut PgConnection) -> DatabaseSnapshot { let mut out = DatabaseSnapshot::default(); let table_names: Vec = sqlx::query_scalar( diff --git a/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql b/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql index d82be82c8..e8fa90c12 100644 --- a/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql +++ b/crates/syn2mas/src/mas_writer/syn2mas_revert_temporary_tables.sql @@ -10,3 +10,5 @@ DROP TABLE syn2mas_restore_indices; ALTER TABLE syn2mas__users RENAME TO users; ALTER TABLE syn2mas__user_passwords RENAME TO user_passwords; +ALTER TABLE syn2mas__user_emails RENAME TO user_emails; +ALTER TABLE syn2mas__user_unsupported_third_party_ids RENAME TO user_unsupported_third_party_ids; diff --git a/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql b/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql index 3e5b86e40..36986d917 100644 --- a/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql +++ b/crates/syn2mas/src/mas_writer/syn2mas_temporary_tables.sql @@ -39,3 +39,5 @@ CREATE TABLE syn2mas_restore_indices ( -- Now we rename all tables that we touch during the migration. ALTER TABLE users RENAME TO syn2mas__users; ALTER TABLE user_passwords RENAME TO syn2mas__user_passwords; +ALTER TABLE user_emails RENAME TO syn2mas__user_emails; +ALTER TABLE user_unsupported_third_party_ids RENAME TO syn2mas__user_unsupported_third_party_ids; diff --git a/crates/syn2mas/src/migration.rs b/crates/syn2mas/src/migration.rs index 59afa73d9..6494215a8 100644 --- a/crates/syn2mas/src/migration.rs +++ b/crates/syn2mas/src/migration.rs @@ -5,9 +5,11 @@ //! # Migration //! -//! This module provides the high-level logic for performing the Synapse-to-MAS database migration. +//! This module provides the high-level logic for performing the Synapse-to-MAS +//! database migration. //! -//! This module does not implement any of the safety checks that should be run *before* the migration. +//! This module does not implement any of the safety checks that should be run +//! *before* the migration. 
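While a migration is underway (or was interrupted), the tables touched by it only exist under their `syn2mas__` names, alongside the `syn2mas_restore_*` bookkeeping tables created above. If a database ever needs to be inspected by hand, a probe along these lines (assuming everything lives in the default schema) shows whether that state was left behind:

```sql
SELECT table_name
FROM information_schema.tables
WHERE table_name LIKE 'syn2mas%'
ORDER BY table_name;
```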
use std::{collections::HashMap, pin::pin}; @@ -17,12 +19,16 @@ use futures_util::StreamExt as _; use rand::RngCore; use thiserror::Error; use thiserror_ext::ContextInto; +use tracing::Level; use ulid::Ulid; use uuid::Uuid; use crate::{ - mas_writer::{self, MasNewUser, MasNewUserPassword, MasUserWriteBuffer, MasWriter}, - synapse_reader::{self, ExtractLocalpartError, FullUserId, SynapseUser}, + mas_writer::{ + self, MasNewEmailThreepid, MasNewUnsupportedThreepid, MasNewUser, MasNewUserPassword, + MasThreepidWriteBuffer, MasUserWriteBuffer, MasWriter, + }, + synapse_reader::{self, ExtractLocalpartError, FullUserId, SynapseThreepid, SynapseUser}, SynapseReader, }; @@ -70,7 +76,7 @@ pub async fn migrate( ) -> Result<(), Error> { let counts = synapse.count_rows().await.into_synapse("counting users")?; - migrate_users( + let migrated_users = migrate_users( synapse, mas, counts @@ -82,9 +88,19 @@ pub async fn migrate( ) .await?; + migrate_threepids( + synapse, + mas, + server_name, + rng, + &migrated_users.user_localparts_to_uuid, + ) + .await?; + Ok(()) } +#[tracing::instrument(skip_all, level = Level::INFO)] async fn migrate_users( synapse: &mut SynapseReader<'_>, mas: &mut MasWriter<'_>, @@ -126,6 +142,65 @@ async fn migrate_users( }) } +#[tracing::instrument(skip_all, level = Level::INFO)] +async fn migrate_threepids( + synapse: &mut SynapseReader<'_>, + mas: &mut MasWriter<'_>, + server_name: &str, + rng: &mut impl RngCore, + user_localparts_to_uuid: &HashMap, +) -> Result<(), Error> { + let mut write_buffer = MasThreepidWriteBuffer::new(mas); + let mut users_stream = pin!(synapse.read_threepids()); + + while let Some(threepid_res) = users_stream.next().await { + let SynapseThreepid { + user_id: synapse_user_id, + medium, + address, + added_at, + } = threepid_res.into_synapse("reading threepid")?; + let created_at: DateTime = added_at.into(); + + let username = synapse_user_id + .extract_localpart(server_name) + .into_extract_localpart(synapse_user_id.clone())? + .to_owned(); + let Some(user_id) = user_localparts_to_uuid.get(username.as_str()).copied() else { + todo!() + }; + + if medium == "email" { + write_buffer + .write_email(MasNewEmailThreepid { + user_id, + user_email_id: Uuid::from(Ulid::from_datetime_with_source( + created_at.into(), + rng, + )), + email: address, + created_at, + }) + .await + .into_mas("writing email")?; + } else { + write_buffer + .write_password(MasNewUnsupportedThreepid { + user_id, + medium, + address, + created_at, + }) + .await + .into_mas("writing unsupported threepid")?; + } + } + + write_buffer.finish().await.into_mas("writing threepids")?; + + Ok(()) +} + fn transform_user( user: &SynapseUser, server_name: &str, diff --git a/crates/syn2mas/src/synapse_reader/checks.rs b/crates/syn2mas/src/synapse_reader/checks.rs index 8e9fc9e5a..71d4375b7 100644 --- a/crates/syn2mas/src/synapse_reader/checks.rs +++ b/crates/syn2mas/src/synapse_reader/checks.rs @@ -5,7 +5,8 @@ //! # Synapse Checks //! -//! This module provides safety checks to run against a Synapse database before running the Synapse-to-MAS migration. +//! This module provides safety checks to run against a Synapse database before +//! running the Synapse-to-MAS migration. 
use figment::Figment; use mas_config::{ @@ -15,9 +16,8 @@ use mas_config::{ use sqlx::{prelude::FromRow, query_as, query_scalar, PgConnection}; use thiserror::Error; -use crate::mas_writer::MIGRATED_PASSWORD_VERSION; - use super::config::Config; +use crate::mas_writer::MIGRATED_PASSWORD_VERSION; #[derive(Debug, Error)] pub enum Error { @@ -31,7 +31,8 @@ pub enum Error { MasPasswordConfig(#[source] anyhow::Error), } -/// An error found whilst checking the Synapse database, that should block a migration. +/// An error found whilst checking the Synapse database, that should block a +/// migration. #[derive(Debug, Error)] pub enum CheckError { #[error("MAS config is missing a password hashing scheme with version '1'")] @@ -74,8 +75,9 @@ pub enum CheckError { }, } -/// A potential hazard found whilst checking the Synapse database, that should be presented -/// to the operator to check they are aware of a caveat before proceeding with the migration. +/// A potential hazard found whilst checking the Synapse database, that should +/// be presented to the operator to check they are aware of a caveat before +/// proceeding with the migration. #[derive(Debug, Error)] pub enum CheckWarning { #[error("Synapse config contains OIDC auth configuration (issuer: {issuer:?}) which will need to be manually mapped to an upstream OpenID Connect Provider during migration.")] @@ -148,12 +150,14 @@ pub fn synapse_config_check(synapse_config: &Config) -> (Vec, Vec< (warnings, errors) } -/// Check that the given Synapse configuration is sane for migration to a MAS with the given MAS configuration. +/// Check that the given Synapse configuration is sane for migration to a MAS +/// with the given MAS configuration. /// /// # Errors /// /// - If any necessary section of MAS config cannot be parsed. -/// - If the MAS password configuration (including any necessary secrets) can't be loaded. +/// - If the MAS password configuration (including any necessary secrets) can't +/// be loaded. pub async fn synapse_config_check_against_mas_config( synapse: &Config, mas: &Figment, @@ -169,8 +173,9 @@ pub async fn synapse_config_check_against_mas_config( let mas_matrix = MatrixConfig::extract(mas)?; - // Look for the MAS password hashing scheme that will be used for imported Synapse passwords, - // then check the configuration matches so that Synapse passwords will be compatible with MAS. + // Look for the MAS password hashing scheme that will be used for imported + // Synapse passwords, then check the configuration matches so that Synapse + // passwords will be compatible with MAS. if let Some((_, algorithm, _, secret)) = mas_password_schemes .iter() .find(|(version, _, _, _)| *version == MIGRATED_PASSWORD_VERSION) @@ -215,11 +220,13 @@ pub async fn synapse_config_check_against_mas_config( Ok((warnings, errors)) } -/// Check that the Synapse database is sane for migration. Returns a list of warnings and errors. +/// Check that the Synapse database is sane for migration. Returns a list of +/// warnings and errors. /// /// # Errors /// -/// - If there is some database connection error, or the given database is not a Synapse database. +/// - If there is some database connection error, or the given database is not a +/// Synapse database. /// - If the OAuth2 section of the MAS configuration could not be parsed. 
#[tracing::instrument(skip_all)] pub async fn synapse_database_check( diff --git a/crates/syn2mas/src/synapse_reader/config.rs b/crates/syn2mas/src/synapse_reader/config.rs index 1c99b52ce..0dca5b6e7 100644 --- a/crates/syn2mas/src/synapse_reader/config.rs +++ b/crates/syn2mas/src/synapse_reader/config.rs @@ -11,7 +11,8 @@ use serde::Deserialize; use sqlx::postgres::PgConnectOptions; /// The root of a Synapse configuration. -/// This struct only includes fields which the Synapse-to-MAS migration is interested in. +/// This struct only includes fields which the Synapse-to-MAS migration is +/// interested in. /// /// See: #[derive(Deserialize)] @@ -29,7 +30,8 @@ pub struct Config { #[serde(default)] pub enable_registration_captcha: bool, - /// Normally this defaults to true, but when MAS integration is enabled in Synapse it defaults to false. + /// Normally this defaults to true, but when MAS integration is enabled in + /// Synapse it defaults to false. #[serde(default)] pub enable_3pid_changes: bool, @@ -118,8 +120,9 @@ impl Config { /// See: #[derive(Deserialize)] pub struct DatabaseSection { - /// Expecting `psycopg2` for Postgres or `sqlite3` for `SQLite3`, but may be an arbitrary string and future versions - /// of Synapse may support other database drivers, e.g. psycopg3. + /// Expecting `psycopg2` for Postgres or `sqlite3` for `SQLite3`, but may be + /// an arbitrary string and future versions of Synapse may support other + /// database drivers, e.g. psycopg3. pub name: String, #[serde(default)] pub args: DatabaseArgsSuboption, @@ -133,12 +136,14 @@ pub const SYNAPSE_DATABASE_DRIVER_NAME_SQLITE3: &str = "sqlite3"; impl DatabaseSection { /// Process the configuration into Postgres connection options. /// - /// Environment variables and libpq defaults will be used as fallback for any missing values; - /// this should match what Synapse does. - /// But note that if syn2mas is not run in the same context (host, user, environment variables) - /// as Synapse normally runs, then the connection options may not be valid. + /// Environment variables and libpq defaults will be used as fallback for + /// any missing values; this should match what Synapse does. + /// But note that if syn2mas is not run in the same context (host, user, + /// environment variables) as Synapse normally runs, then the connection + /// options may not be valid. /// - /// Returns `None` if this database configuration is not configured for Postgres. + /// Returns `None` if this database configuration is not configured for + /// Postgres. #[must_use] pub fn to_sqlx_postgres(&self) -> Option { if self.name != SYNAPSE_DATABASE_DRIVER_NAME_PSYCOPG2 { @@ -167,7 +172,8 @@ impl DatabaseSection { } /// The `args` suboption of the `database` section of the Synapse configuration. -/// This struct assumes Postgres is in use and does not represent fields used by SQLite. +/// This struct assumes Postgres is in use and does not represent fields used by +/// SQLite. #[derive(Deserialize, Default)] pub struct DatabaseArgsSuboption { pub user: Option, @@ -199,7 +205,8 @@ impl Default for PasswordSection { } } -/// A section that we only care about whether it's enabled or not, but is not enabled by default. +/// A section that we only care about whether it's enabled or not, but is not +/// enabled by default. 
#[derive(Default, Deserialize)] pub struct EnableableSection { #[serde(default)] @@ -208,11 +215,12 @@ pub struct EnableableSection { #[derive(Clone, Deserialize)] pub struct OidcProvider { - /// At least for `oidc_config`, if the dict is present but left empty then the config should be ignored, - /// so this field must be optional. + /// At least for `oidc_config`, if the dict is present but left empty then + /// the config should be ignored, so this field must be optional. pub issuer: Option, - /// Required, except for the old `oidc_config` where this is implied to be "oidc". + /// Required, except for the old `oidc_config` where this is implied to be + /// "oidc". pub idp_id: Option, } diff --git a/crates/syn2mas/src/synapse_reader/mod.rs b/crates/syn2mas/src/synapse_reader/mod.rs index d25d822da..4f1ed0af2 100644 --- a/crates/syn2mas/src/synapse_reader/mod.rs +++ b/crates/syn2mas/src/synapse_reader/mod.rs @@ -5,7 +5,8 @@ //! # Synapse Database Reader //! -//! This module provides facilities for streaming relevant types of database records from a Synapse database. +//! This module provides facilities for streaming relevant types of database +//! records from a Synapse database. use chrono::{DateTime, Utc}; use futures_util::{Stream, TryStreamExt}; @@ -46,13 +47,13 @@ pub enum ExtractLocalpartError { } impl FullUserId { - /// Extract the localpart from the User ID, asserting that the User ID has the correct - /// server name. + /// Extract the localpart from the User ID, asserting that the User ID has + /// the correct server name. /// /// # Errors /// - /// A handful of basic validity checks are performed and an error may be returned - /// if the User ID is not valid. + /// A handful of basic validity checks are performed and an error may be + /// returned if the User ID is not valid. /// However, the User ID grammar is not checked fully. /// /// If the wrong server name is asserted, returns an error. @@ -80,8 +81,8 @@ impl FullUserId { } /// A Synapse boolean. -/// Synapse stores booleans as 0 or 1, due to compatibility with old SQLite versions -/// that did not have native boolean support. +/// Synapse stores booleans as 0 or 1, due to compatibility with old SQLite +/// versions that did not have native boolean support. #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct SynapseBool(bool); @@ -107,8 +108,8 @@ impl From for bool { } /// A timestamp stored as the number of seconds since the Unix epoch. -/// Note that Synapse stores MOST timestamps as numbers of **milliseconds** since the Unix epoch. -/// But some timestamps are still stored in seconds. +/// Note that Synapse stores MOST timestamps as numbers of **milliseconds** +/// since the Unix epoch. But some timestamps are still stored in seconds. #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct SecondsTimestamp(DateTime); @@ -122,9 +123,9 @@ impl<'r> sqlx::Decode<'r, Postgres> for SecondsTimestamp { fn decode( value: ::ValueRef<'r>, ) -> Result { - >::decode(value).map(|milliseconds_since_epoch| { + >::decode(value).map(|seconds_since_epoch| { SecondsTimestamp(DateTime::from_timestamp_nanos( - milliseconds_since_epoch * 1_000_000_000, + seconds_since_epoch * 1_000_000_000, )) }) } @@ -136,11 +137,41 @@ impl sqlx::Type for SecondsTimestamp { } } +/// A timestamp stored as the number of milliseconds since the Unix epoch. +/// Note that Synapse stores some timestamps in seconds. 
+#[derive(Copy, Clone, Debug)] +pub struct MillisecondsTimestamp(DateTime); + +impl From for DateTime { + fn from(MillisecondsTimestamp(value): MillisecondsTimestamp) -> Self { + value + } +} + +impl<'r> sqlx::Decode<'r, Postgres> for MillisecondsTimestamp { + fn decode( + value: ::ValueRef<'r>, + ) -> Result { + >::decode(value).map(|milliseconds_since_epoch| { + MillisecondsTimestamp(DateTime::from_timestamp_nanos( + milliseconds_since_epoch * 1_000_000, + )) + }) + } +} + +impl sqlx::Type for MillisecondsTimestamp { + fn type_info() -> ::TypeInfo { + >::type_info() + } +} + #[derive(Clone, Debug, FromRow, PartialEq, Eq, PartialOrd, Ord)] pub struct SynapseUser { /// Full User ID of the user pub name: FullUserId, - /// Password hash string for the user. Optional (null if no password is set). + /// Password hash string for the user. Optional (null if no password is + /// set). pub password_hash: Option, /// Whether the user is a Synapse Admin pub admin: SynapseBool, @@ -153,10 +184,20 @@ pub struct SynapseUser { // TODO do we care about upgrade_ts (users who upgraded from guest accounts to real accounts) } +/// Row of the `user_threepids` table in Synapse. +#[derive(Clone, Debug, FromRow)] +pub struct SynapseThreepid { + pub user_id: FullUserId, + pub medium: String, + pub address: String, + pub added_at: MillisecondsTimestamp, +} + /// List of Synapse tables that we should acquire an `EXCLUSIVE` lock on. /// -/// This is a safety measure against other processes changing the data underneath our feet. -/// It's still not a good idea to run Synapse at the same time as the migration. +/// This is a safety measure against other processes changing the data +/// underneath our feet. It's still not a good idea to run Synapse at the same +/// time as the migration. // TODO not complete! const TABLES_TO_LOCK: &[&str] = &["users"]; @@ -172,14 +213,16 @@ pub struct SynapseReader<'c> { } impl<'conn> SynapseReader<'conn> { - /// Create a new Synapse reader, which entails creating a transaction and locking Synapse tables. + /// Create a new Synapse reader, which entails creating a transaction and + /// locking Synapse tables. /// /// # Errors /// /// Errors are returned under the following circumstances: /// /// - An underlying database error - /// - If we can't lock the Synapse tables (pointing to the fact that Synapse may still be running) + /// - If we can't lock the Synapse tables (pointing to the fact that Synapse + /// may still be running) pub async fn new( synapse_connection: &'conn mut PgConnection, dry_run: bool, @@ -224,7 +267,8 @@ impl<'conn> SynapseReader<'conn> { Ok(()) } - /// Counts the rows in the Synapse database to get an estimate of how large the migration is going to be. + /// Counts the rows in the Synapse database to get an estimate of how large + /// the migration is going to be. /// /// # Errors /// @@ -247,7 +291,8 @@ impl<'conn> SynapseReader<'conn> { Ok(SynapseRowCounts { users }) } - /// Reads Synapse users, excluding application service users (which do not need to be migrated), from the database. + /// Reads Synapse users, excluding application service users (which do not + /// need to be migrated), from the database. pub fn read_users(&mut self) -> impl Stream> + '_ { sqlx::query_as( " @@ -260,6 +305,20 @@ impl<'conn> SynapseReader<'conn> { .fetch(&mut *self.txn) .map_err(|err| err.into_database("reading Synapse users")) } + + /// Reads threepids (such as e-mail and phone number associations) from + /// Synapse. 
+    pub fn read_threepids(&mut self) -> impl Stream<Item = Result<SynapseThreepid, Error>> + '_ {
+        sqlx::query_as(
+            "
+            SELECT
+              user_id, medium, address, added_at
+            FROM user_threepids
+            ",
+        )
+        .fetch(&mut *self.txn)
+        .map_err(|err| err.into_database("reading Synapse threepids"))
+    }
 }
 
 #[cfg(test)]
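The two timestamp wrappers added to this file differ only in the scale factor applied before the `DateTime<Utc>` is built: `SecondsTimestamp` multiplies by 10^9 and `MillisecondsTimestamp` by 10^6, reflecting that Synapse stores most timestamps in milliseconds but a few (such as `users.creation_ts`) in seconds. A tiny check of that arithmetic, using the same chrono constructor as the decoders:

```rust
use chrono::DateTime;

fn main() {
    let creation_ts_seconds: i64 = 1_700_000_000; // a seconds-based column
    let added_at_millis: i64 = 1_700_000_000_000; // a milliseconds-based column
    // Both decoders normalise to nanoseconds before constructing the DateTime.
    assert_eq!(
        DateTime::from_timestamp_nanos(creation_ts_seconds * 1_000_000_000),
        DateTime::from_timestamp_nanos(added_at_millis * 1_000_000),
    );
}
```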