Skip to content

Commit

Permalink
postgres: db interfaces wip (sqlite passes) (#1419)
Browse files Browse the repository at this point in the history
  • Loading branch information
spaced4ndy authored Dec 12, 2024
1 parent 07be469 commit 6414959
Show file tree
Hide file tree
Showing 26 changed files with 3,623 additions and 3,287 deletions.
99 changes: 58 additions & 41 deletions simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -96,47 +96,11 @@ library
Simplex.Messaging.Agent.RetryInterval
Simplex.Messaging.Agent.Stats
Simplex.Messaging.Agent.Store
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20241210_initial
Simplex.Messaging.Agent.Store.SQLite
Simplex.Messaging.Agent.Store.SQLite.Common
Simplex.Messaging.Agent.Store.SQLite.DB
Simplex.Messaging.Agent.Store.SQLite.Migrations
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220101_initial
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220301_snd_queue_keys
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220322_notifications
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220608_v2
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220625_v2_ntf_mode
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220811_onion_hosts
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220817_connection_ntfs
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220905_commands
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220915_connection_queues
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230110_users
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230320_retry_state
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230510_files_pending_replicas_indexes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230516_encrypted_rcv_message_hashes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230531_switch_status
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230615_ratchet_sync
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230701_delivery_receipts
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230720_delete_expired_messages
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230722_indexes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230814_indexes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240121_message_delivery_indexes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240124_file_redirect
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240223_connections_wait_delivery
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240225_ratchet_kem
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240417_rcv_files_approved_relays
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240624_snd_secure
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240702_servers_stats
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240930_ntf_tokens_to_delete
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20241007_rcv_queues_last_broker_ts
Simplex.Messaging.Agent.Store.AgentStore
Simplex.Messaging.Agent.Store.Common
Simplex.Messaging.Agent.Store.DB
Simplex.Messaging.Agent.Store.Migrations
Simplex.Messaging.Agent.Store.Shared
Simplex.Messaging.Agent.TRcvQueues
Simplex.Messaging.Client
Simplex.Messaging.Client.Agent
Expand Down Expand Up @@ -183,6 +147,55 @@ library
Simplex.RemoteControl.Discovery.Multicast
Simplex.RemoteControl.Invitation
Simplex.RemoteControl.Types
if flag(client_postgres)
exposed-modules:
Simplex.Messaging.Agent.Store.Postgres
Simplex.Messaging.Agent.Store.Postgres.Common
Simplex.Messaging.Agent.Store.Postgres.DB
Simplex.Messaging.Agent.Store.Postgres.Migrations
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20241210_initial
else
exposed-modules:
Simplex.Messaging.Agent.Store.SQLite
Simplex.Messaging.Agent.Store.SQLite.Common
Simplex.Messaging.Agent.Store.SQLite.DB
Simplex.Messaging.Agent.Store.SQLite.Migrations
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220101_initial
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220301_snd_queue_keys
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220322_notifications
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220608_v2
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220625_v2_ntf_mode
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220811_onion_hosts
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220817_connection_ntfs
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220905_commands
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220915_connection_queues
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230110_users
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230117_fkey_indexes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230120_delete_errors
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230217_server_key_hash
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230223_files
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230320_retry_state
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230510_files_pending_replicas_indexes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230516_encrypted_rcv_message_hashes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230531_switch_status
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230615_ratchet_sync
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230701_delivery_receipts
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230720_delete_expired_messages
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230722_indexes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230814_indexes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240121_message_delivery_indexes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240124_file_redirect
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240223_connections_wait_delivery
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240225_ratchet_kem
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240417_rcv_files_approved_relays
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240624_snd_secure
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240702_servers_stats
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240930_ntf_tokens_to_delete
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20241007_rcv_queues_last_broker_ts
if !flag(client_library)
exposed-modules:
Simplex.FileTransfer.Server
Expand Down Expand Up @@ -289,6 +302,8 @@ library
if flag(client_postgres)
build-depends:
postgresql-simple ==0.6.*
, raw-strings-qq ==1.1.*
cpp-options: -DdbPostgres
if impl(ghc >= 9.6.2)
build-depends:
bytestring ==0.11.*
Expand Down Expand Up @@ -481,3 +496,5 @@ test-suite simplexmq-test
if flag(client_postgres)
build-depends:
postgresql-simple ==0.6.*
, raw-strings-qq ==1.1.*
cpp-options: -DdbPostgres
4 changes: 2 additions & 2 deletions src/Simplex/FileTransfer/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import Simplex.Messaging.Agent.Store.AgentStore
import qualified Simplex.Messaging.Agent.Store.DB as DB
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs)
import qualified Simplex.Messaging.Crypto.File as CF
Expand Down
16 changes: 9 additions & 7 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,11 @@ import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Agent.Store.AgentStore
import Simplex.Messaging.Agent.Store.Common (DBStore)
import qualified Simplex.Messaging.Agent.Store.DB as DB
import qualified Simplex.Messaging.Agent.Store.Migrations as Migrations
import Simplex.Messaging.Agent.Store.Shared (UpMigration (..), upMigration)
import Simplex.Messaging.Client (SMPClientError, ServerTransmission (..), ServerTransmissionBatch, temporaryClientError, unexpectedResponse)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs)
Expand Down Expand Up @@ -200,11 +202,11 @@ import UnliftIO.STM
type AE a = ExceptT AgentErrorType IO a

-- | Creates an SMP agent client instance
getSMPAgentClient :: AgentConfig -> InitialAgentServers -> SQLiteStore -> Bool -> IO AgentClient
getSMPAgentClient :: AgentConfig -> InitialAgentServers -> DBStore -> Bool -> IO AgentClient
getSMPAgentClient = getSMPAgentClient_ 1
{-# INLINE getSMPAgentClient #-}

getSMPAgentClient_ :: Int -> AgentConfig -> InitialAgentServers -> SQLiteStore -> Bool -> IO AgentClient
getSMPAgentClient_ :: Int -> AgentConfig -> InitialAgentServers -> DBStore -> Bool -> IO AgentClient
getSMPAgentClient_ clientId cfg initServers@InitialAgentServers {smp, xftp} store backgroundMode =
newSMPAgentEnv cfg store >>= runReaderT runAgent
where
Expand Down Expand Up @@ -277,7 +279,7 @@ disposeAgentClient c@AgentClient {acThread, agentEnv = Env {store}} = do
t_ <- atomically (swapTVar acThread Nothing) $>>= (liftIO . deRefWeak)
disconnectAgentClient c
mapM_ killThread t_
liftIO $ closeSQLiteStore store
liftIO $ closeStore store

resumeAgentClient :: AgentClient -> IO ()
resumeAgentClient c = atomically $ writeTVar (active c) True
Expand Down Expand Up @@ -2154,7 +2156,7 @@ execAgentStoreSQL :: AgentClient -> Text -> AE [Text]
execAgentStoreSQL c sql = withAgentEnv c $ withStore' c (`execSQL` sql)

getAgentMigrations :: AgentClient -> AE [UpMigration]
getAgentMigrations c = withAgentEnv c $ map upMigration <$> withStore' c (Migrations.getCurrent . DB.conn)
getAgentMigrations c = withAgentEnv c $ map upMigration <$> withStore' c Migrations.getCurrent

debugAgentLocks :: AgentClient -> IO AgentLocks
debugAgentLocks AgentClient {connLocks = cs, invLocks = is, deleteLock = d} = do
Expand Down
12 changes: 6 additions & 6 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction)
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction)
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues (getRcvQueues))
import qualified Simplex.Messaging.Agent.TRcvQueues as RQ
import Simplex.Messaging.Client
Expand Down Expand Up @@ -555,7 +555,7 @@ slowNetworkConfig cfg@NetworkConfig {tcpConnectTimeout, tcpTimeout, tcpTimeoutPe
slow :: Integral a => a -> a
slow t = (t * 3) `div` 2

agentClientStore :: AgentClient -> SQLiteStore
agentClientStore :: AgentClient -> DBStore
agentClientStore AgentClient {agentEnv = Env {store}} = store
{-# INLINE agentClientStore #-}

Expand Down Expand Up @@ -1649,7 +1649,7 @@ disableQueuesNtfs = sendTSessionBatches "NDEL" snd disableQueues_
sendAck :: AgentClient -> RcvQueue -> MsgId -> AM ()
sendAck c rq@RcvQueue {rcvId, rcvPrivateKey} msgId =
withSMPClient c rq ("ACK:" <> logSecret' msgId) $ \smp ->
ackSMPMessage smp rcvPrivateKey rcvId msgId
ackSMPMessage smp rcvPrivateKey rcvId msgId

hasGetLock :: AgentClient -> RcvQueue -> IO Bool
hasGetLock c RcvQueue {server, rcvId} =
Expand Down Expand Up @@ -2044,7 +2044,7 @@ pickServer = \case
getNextServer ::
(ProtocolTypeI p, UserProtocol p) =>
AgentClient ->
UserId ->
UserId ->
(UserServers p -> NonEmpty (Maybe OperatorId, ProtoServerWithAuth p)) ->
[ProtocolServer p] ->
AM (ProtoServerWithAuth p)
Expand Down Expand Up @@ -2097,7 +2097,7 @@ withNextSrv ::
UserId ->
(UserServers p -> NonEmpty (Maybe OperatorId, ProtoServerWithAuth p)) ->
TVar (Set TransportHost) ->
[ProtocolServer p] ->
[ProtocolServer p] ->
(ProtoServerWithAuth p -> AM a) ->
AM a
withNextSrv c userId srvsSel triedHosts usedSrvs action = do
Expand Down
25 changes: 18 additions & 7 deletions src/Simplex/Messaging/Agent/Env/SQLite.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
Expand Down Expand Up @@ -52,7 +53,6 @@ import Control.Monad.Reader
import Crypto.Random
import Data.Aeson (FromJSON (..), ToJSON (..))
import qualified Data.Aeson.TH as JQ
import Data.ByteArray (ScrubbedBytes)
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
Expand All @@ -68,8 +68,9 @@ import Numeric.Natural
import Simplex.FileTransfer.Client (XFTPClientConfig (..), defaultXFTPClientConfig)
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Agent.Store (createStore)
import Simplex.Messaging.Agent.Store.Common (DBStore)
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..), MigrationError (..))
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (VersionRangeE2E, supportedE2EEncryptVRange)
Expand All @@ -86,6 +87,10 @@ import Simplex.Messaging.Util (allFinally, catchAllErrors, catchAllErrors', tryA
import System.Mem.Weak (Weak)
import System.Random (StdGen, newStdGen)
import UnliftIO.STM
#if defined(dbPostgres)
#else
import Data.ByteArray (ScrubbedBytes)
#endif

type AM' a = ReaderT Env IO a

Expand Down Expand Up @@ -254,15 +259,15 @@ defaultAgentConfig =

data Env = Env
{ config :: AgentConfig,
store :: SQLiteStore,
store :: DBStore,
random :: TVar ChaChaDRG,
randomServer :: TVar StdGen,
ntfSupervisor :: NtfSupervisor,
xftpAgent :: XFTPAgent,
multicastSubscribers :: TMVar Int
}

newSMPAgentEnv :: AgentConfig -> SQLiteStore -> IO Env
newSMPAgentEnv :: AgentConfig -> DBStore -> IO Env
newSMPAgentEnv config store = do
random <- C.newRandom
randomServer <- newTVarIO =<< liftIO newStdGen
Expand All @@ -271,8 +276,14 @@ newSMPAgentEnv config store = do
multicastSubscribers <- newTMVarIO 0
pure Env {config, store, random, randomServer, ntfSupervisor, xftpAgent, multicastSubscribers}

createAgentStore :: FilePath -> ScrubbedBytes -> Bool -> MigrationConfirmation -> IO (Either MigrationError SQLiteStore)
createAgentStore dbFilePath dbKey keepKey = createSQLiteStore dbFilePath dbKey keepKey Migrations.app
#if defined(dbPostgres)
-- TODO [postgres] pass db name / ConnectInfo?
createAgentStore :: MigrationConfirmation -> IO (Either MigrationError DBStore)
createAgentStore = createStore
#else
createAgentStore :: FilePath -> ScrubbedBytes -> Bool -> MigrationConfirmation -> IO (Either MigrationError DBStore)
createAgentStore = createStore
#endif

data NtfSupervisor = NtfSupervisor
{ ntfTkn :: TVar (Maybe NtfToken),
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Agent/NtfSubSupervisor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import Simplex.Messaging.Agent.Store.AgentStore
import qualified Simplex.Messaging.Agent.Store.DB as DB
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Types
Expand Down
28 changes: 27 additions & 1 deletion src/Simplex/Messaging/Agent/Store.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
Expand Down Expand Up @@ -25,10 +26,15 @@ import Data.List (find)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Maybe (isJust)
import Data.Text (Text)
import Data.Time (UTCTime)
import Data.Type.Equality
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval (RI2State)
import Simplex.Messaging.Agent.Store.Common
import qualified Simplex.Messaging.Agent.Store.DB as DB
import qualified Simplex.Messaging.Agent.Store.Migrations as Migrations
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..), MigrationError (..))
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport, RatchetX448)
import Simplex.Messaging.Encoding.String
Expand All @@ -42,12 +48,32 @@ import Simplex.Messaging.Protocol
RcvDhSecret,
RcvNtfDhSecret,
RcvPrivateAuthKey,
SenderCanSecure,
SndPrivateAuthKey,
SndPublicAuthKey,
SenderCanSecure,
VersionSMPC,
)
import qualified Simplex.Messaging.Protocol as SMP
#if defined(dbPostgres)
import qualified Simplex.Messaging.Agent.Store.Postgres as StoreFunctions
#else
import qualified Simplex.Messaging.Agent.Store.SQLite as StoreFunctions
import Data.ByteArray (ScrubbedBytes)
#endif

#if defined(dbPostgres)
createStore :: MigrationConfirmation -> IO (Either MigrationError DBStore)
createStore = StoreFunctions.createDBStore Migrations.app
#else
createStore :: FilePath -> ScrubbedBytes -> Bool -> MigrationConfirmation -> IO (Either MigrationError DBStore)
createStore dbFilePath dbKey keepKey = StoreFunctions.createDBStore dbFilePath dbKey keepKey Migrations.app
#endif

closeStore :: DBStore -> IO ()
closeStore = StoreFunctions.closeDBStore

execSQL :: DB.Connection -> Text -> IO [Text]
execSQL = StoreFunctions.execSQL

-- * Queue types

Expand Down
Loading

0 comments on commit 6414959

Please sign in to comment.