Skip to content

Commit

Permalink
SyncV2 with Share server
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisPenner committed Jan 15, 2025
1 parent d95114b commit 33c5981
Show file tree
Hide file tree
Showing 17 changed files with 336 additions and 68 deletions.
92 changes: 60 additions & 32 deletions codebase2/codebase-sqlite/U/Codebase/Sqlite/Queries.hs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ module U.Codebase.Sqlite.Queries
expectEntity,
syncToTempEntity,
insertTempEntity,
insertTempEntityV2,
saveTempEntityInMain,
expectTempEntity,
deleteTempEntity,
Expand Down Expand Up @@ -315,6 +316,7 @@ import Data.Map.NonEmpty qualified as NEMap
import Data.Maybe qualified as Maybe
import Data.Sequence qualified as Seq
import Data.Set qualified as Set
import Data.Set.NonEmpty (NESet)
import Data.Text qualified as Text
import Data.Text.Encoding qualified as Text
import Data.Text.Lazy qualified as Text.Lazy
Expand Down Expand Up @@ -532,23 +534,18 @@ countWatches = queryOneCol [sql| SELECT COUNT(*) FROM watch |]

saveHash :: Hash32 -> Transaction HashId
saveHash hash = do
execute
[sql|
INSERT INTO hash (base32) VALUES (:hash)
ON CONFLICT DO NOTHING
|]
expectHashId hash
loadHashId hash >>= \case
Just h -> pure h
Nothing -> do
queryOneCol
[sql|
INSERT INTO hash (base32) VALUES (:hash)
RETURNING id
|]

saveHashes :: Traversable f => f Hash32 -> Transaction (f HashId)
saveHashes hashes = do
for_ hashes \hash ->
execute
[sql|
INSERT INTO hash (base32)
VALUES (:hash)
ON CONFLICT DO NOTHING
|]
traverse expectHashId hashes
for hashes saveHash

saveHashHash :: Hash -> Transaction HashId
saveHashHash = saveHash . Hash32.fromHash
Expand Down Expand Up @@ -623,13 +620,15 @@ expectBranchHashForCausalHash ch = do

saveText :: Text -> Transaction TextId
saveText t = do
execute
[sql|
INSERT INTO text (text)
VALUES (:t)
ON CONFLICT DO NOTHING
|]
expectTextId t
loadTextId t >>= \case
Just h -> pure h
Nothing -> do
queryOneCol
[sql|
INSERT INTO text (text)
VALUES (:t)
RETURNING id
|]

saveTexts :: Traversable f => f Text -> Transaction (f TextId)
saveTexts =
Expand Down Expand Up @@ -686,7 +685,7 @@ saveObject ::
ObjectType ->
ByteString ->
Transaction ObjectId
saveObject hh h t blob = do
saveObject _hh h t blob = do
execute
[sql|
INSERT INTO object (primary_hash_id, type_id, bytes)
Expand All @@ -697,9 +696,9 @@ saveObject hh h t blob = do
saveHashObject h oId 2 -- todo: remove this from here, and add it to other relevant places once there are v1 and v2 hashes
rowsModified >>= \case
0 -> pure ()
_ -> do
hash <- expectHash32 h
tryMoveTempEntityDependents hh hash
_ -> pure ()
-- hash <- expectHash32 h
-- tryMoveTempEntityDependents hh hash
pure oId

expectObject :: SqliteExceptionReason e => ObjectId -> (ByteString -> Either e a) -> Transaction a
Expand Down Expand Up @@ -957,7 +956,7 @@ saveCausal ::
BranchHashId ->
[CausalHashId] ->
Transaction ()
saveCausal hh self value parents = do
saveCausal _hh self value parents = do
execute
[sql|
INSERT INTO causal (self_hash_id, value_hash_id)
Expand All @@ -973,27 +972,27 @@ saveCausal hh self value parents = do
INSERT INTO causal_parent (causal_id, parent_id)
VALUES (:self, :parent)
|]
flushCausalDependents hh self
-- flushCausalDependents hh self

flushCausalDependents ::
_flushCausalDependents ::
HashHandle ->
CausalHashId ->
Transaction ()
flushCausalDependents hh chId = do
_flushCausalDependents hh chId = do
hash <- expectHash32 (unCausalHashId chId)
tryMoveTempEntityDependents hh hash
_tryMoveTempEntityDependents hh hash

-- | `tryMoveTempEntityDependents #foo` does this:
-- 0. Precondition: We just inserted object #foo.
-- 1. Look up the dependents of #foo
-- 2. Delete #foo as dependency from temp_entity_missing_dependency. e.g. (#bar, #foo), (#baz, #foo)
-- 3. For each like #bar and #baz with no more rows in temp_entity_missing_dependency,
-- insert_entity them.
tryMoveTempEntityDependents ::
_tryMoveTempEntityDependents ::
HashHandle ->
Hash32 ->
Transaction ()
tryMoveTempEntityDependents hh dependency = do
_tryMoveTempEntityDependents hh dependency = do
dependents <-
queryListCol
[sql|
Expand Down Expand Up @@ -2993,6 +2992,35 @@ insertTempEntity entityHash entity missingDependencies = do
entityType =
Entity.entityType entity

-- | Insert a new `temp_entity` row, and its associated 1+ `temp_entity_missing_dependency` rows.
--
-- Preconditions:
-- 1. The entity does not already exist in "main" storage (`object` / `causal`)
-- 2. The entity does not already exist in `temp_entity`.
insertTempEntityV2 :: Hash32 -> TempEntity -> NESet Hash32 -> Transaction ()
insertTempEntityV2 entityHash entity missingDependencies = do
execute
[sql|
INSERT INTO temp_entity (hash, blob, type_id)
VALUES (:entityHash, :entityBlob, :entityType)
ON CONFLICT DO NOTHING
|]

for_ missingDependencies \depHash ->
execute
[sql|
INSERT INTO temp_entity_missing_dependency (dependent, dependency)
VALUES (:entityHash, :depHash)
|]
where
entityBlob :: ByteString
entityBlob =
runPutS (Serialization.putTempEntity entity)

entityType :: TempEntityType
entityType =
Entity.entityType entity

-- | Delete a row from the `temp_entity` table, if it exists.
deleteTempEntity :: Hash32 -> Transaction ()
deleteTempEntity hash =
Expand Down
3 changes: 2 additions & 1 deletion codebase2/codebase-sqlite/sql/001-temp-entity-tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ create table if not exists temp_entity (
create table if not exists temp_entity_missing_dependency (
dependent text not null references temp_entity(hash),
dependency text not null,
dependencyJwt text not null,
-- TODO: this is just for testing
dependencyJwt text null,
unique (dependent, dependency)
);
create index if not exists temp_entity_missing_dependency_ix_dependent on temp_entity_missing_dependency (dependent);
Expand Down
1 change: 1 addition & 0 deletions lib/unison-sqlite/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ library:

dependencies:
- base
- containers
- direct-sqlite
- megaparsec
- pretty-simple
Expand Down
62 changes: 47 additions & 15 deletions lib/unison-sqlite/src/Unison/Sqlite/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ module Unison.Sqlite.Connection
)
where

import Data.Map qualified as Map
import Database.SQLite.Simple qualified as Sqlite
import Database.SQLite.Simple.FromField qualified as Sqlite
import Database.SQLite3 qualified as Direct.Sqlite
Expand All @@ -71,7 +72,10 @@ import Unison.Sqlite.Connection.Internal (Connection (..))
import Unison.Sqlite.Exception
import Unison.Sqlite.Sql (Sql (..))
import Unison.Sqlite.Sql qualified as Sql
import UnliftIO (atomically)
import UnliftIO.Exception
import UnliftIO.STM (readTVar)
import UnliftIO.STM qualified as STM

-- | Perform an action with a connection to a SQLite database.
--
Expand Down Expand Up @@ -103,19 +107,47 @@ openConnection name file = do
Just "" -> file
_ -> "file:" <> file <> "?mode=ro"
conn0 <- Sqlite.open sqliteURI `catch` rethrowAsSqliteConnectException name file
let conn = Connection {conn = conn0, file, name}
statementCache <- STM.newTVarIO Map.empty
let conn = Connection {conn = conn0, file, name, statementCache}
execute conn [Sql.sql| PRAGMA foreign_keys = ON |]
execute conn [Sql.sql| PRAGMA busy_timeout = 60000 |]
execute conn [Sql.sql| PRAGMA synchronous = normal |]
execute conn [Sql.sql| PRAGMA journal_size_limit = 6144000 |]
execute conn [Sql.sql| PRAGMA cache_size = -64000 |]
execute conn [Sql.sql| PRAGMA temp_store = 2 |]

pure conn

-- Close a connection opened with 'openConnection'.
closeConnection :: Connection -> IO ()
closeConnection (Connection _ _ conn) =
closeConnection conn@(Connection {conn = conn0}) = do
-- FIXME if this throws an exception, it won't be under `SomeSqliteException`
-- Possible fixes:
-- 1. Add close exception to the hierarchy, e.g. `SqliteCloseException`
-- 2. Always ignore exceptions thrown by `close` (Mitchell prefers this one)
Sqlite.close conn
closeAllStatements conn
Sqlite.close conn0

withStatement :: Connection -> Text -> (Sqlite.Statement -> IO a) -> IO a
withStatement conn sql action = do
bracket (prepareStatement conn sql) Sqlite.reset action
where
prepareStatement :: Connection -> Text -> IO Sqlite.Statement
prepareStatement Connection {conn, statementCache} sql = do
cached <- atomically $ do
cache <- STM.readTVar statementCache
pure $ Map.lookup sql cache
case cached of
Just stmt -> pure stmt
Nothing -> do
stmt <- Sqlite.openStatement conn (coerce @Text @Sqlite.Query sql)
atomically $ STM.modifyTVar statementCache (Map.insert sql stmt)
pure stmt

closeAllStatements :: Connection -> IO ()
closeAllStatements Connection {statementCache} = do
cache <- atomically $ readTVar statementCache
for_ cache Sqlite.closeStatement

-- An internal type, for making prettier debug logs

Expand Down Expand Up @@ -152,7 +184,7 @@ logQuery (Sql sql params) result =
-- Without results

execute :: (HasCallStack) => Connection -> Sql -> IO ()
execute conn@(Connection _ _ conn0) sql@(Sql s params) = do
execute conn sql@(Sql s params) = do
logQuery sql Nothing
doExecute `catch` \(exception :: Sqlite.SQLError) ->
throwSqliteQueryException
Expand All @@ -163,16 +195,16 @@ execute conn@(Connection _ _ conn0) sql@(Sql s params) = do
}
where
doExecute :: IO ()
doExecute =
Sqlite.withStatement conn0 (coerce s) \(Sqlite.Statement statement) -> do
bindParameters statement params
void (Direct.Sqlite.step statement)
doExecute = do
withStatement conn s \statement -> do
bindParameters (coerce statement) params
void (Direct.Sqlite.step $ coerce statement)

-- | Execute one or more semicolon-delimited statements.
--
-- This function does not support parameters, and is mostly useful for executing DDL and migrations.
executeStatements :: (HasCallStack) => Connection -> Text -> IO ()
executeStatements conn@(Connection _ _ (Sqlite.Connection database _tempNameCounter)) sql = do
executeStatements conn@(Connection {conn = Sqlite.Connection database _tempNameCounter}) sql = do
logQuery (Sql sql []) Nothing
Direct.Sqlite.exec database sql `catch` \(exception :: Sqlite.SQLError) ->
throwSqliteQueryException
Expand All @@ -185,7 +217,7 @@ executeStatements conn@(Connection _ _ (Sqlite.Connection database _tempNameCoun
-- With results, without checks

queryStreamRow :: (HasCallStack, Sqlite.FromRow a) => Connection -> Sql -> (IO (Maybe a) -> IO r) -> IO r
queryStreamRow conn@(Connection _ _ conn0) sql@(Sql s params) callback =
queryStreamRow conn sql@(Sql s params) callback =
run `catch` \(exception :: Sqlite.SQLError) ->
throwSqliteQueryException
SqliteQueryExceptionInfo
Expand All @@ -194,8 +226,8 @@ queryStreamRow conn@(Connection _ _ conn0) sql@(Sql s params) callback =
sql
}
where
run =
bracket (Sqlite.openStatement conn0 (coerce s)) Sqlite.closeStatement \statement -> do
run = do
withStatement conn s \statement -> do
Sqlite.bind statement params
callback (Sqlite.nextRow statement)

Expand All @@ -213,7 +245,7 @@ queryStreamCol =
queryStreamRow

queryListRow :: forall a. (Sqlite.FromRow a, HasCallStack) => Connection -> Sql -> IO [a]
queryListRow conn@(Connection _ _ conn0) sql@(Sql s params) = do
queryListRow conn sql@(Sql s params) = do
result <-
doQuery
`catch` \(exception :: Sqlite.SQLError) ->
Expand All @@ -228,7 +260,7 @@ queryListRow conn@(Connection _ _ conn0) sql@(Sql s params) = do
where
doQuery :: IO [a]
doQuery =
Sqlite.withStatement conn0 (coerce s) \statement -> do
withStatement conn (coerce s) \statement -> do
bindParameters (coerce statement) params
let loop :: [a] -> IO [a]
loop rows =
Expand Down Expand Up @@ -347,7 +379,7 @@ queryOneColCheck conn s check =
-- Rows modified

rowsModified :: Connection -> IO Int
rowsModified (Connection _ _ conn) =
rowsModified (Connection {conn}) =
Sqlite.changes conn

-- Vacuum
Expand Down
8 changes: 6 additions & 2 deletions lib/unison-sqlite/src/Unison/Sqlite/Connection/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ module Unison.Sqlite.Connection.Internal
)
where

import Data.Map (Map)
import Data.Text (Text)
import Database.SQLite.Simple qualified as Sqlite
import UnliftIO.STM (TVar)

-- | A /non-thread safe/ connection to a SQLite database.
data Connection = Connection
{ name :: String,
file :: FilePath,
conn :: Sqlite.Connection
conn :: Sqlite.Connection,
statementCache :: TVar (Map Text Sqlite.Statement)
}

instance Show Connection where
show (Connection name file _conn) =
show (Connection name file _conn _statementCache) =
"Connection { name = " ++ show name ++ ", file = " ++ show file ++ " }"
3 changes: 2 additions & 1 deletion lib/unison-sqlite/unison-sqlite.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cabal-version: 1.12

-- This file has been generated from package.yaml by hpack version 0.36.0.
-- This file has been generated from package.yaml by hpack version 0.37.0.
--
-- see: https://github.com/sol/hpack

Expand Down Expand Up @@ -64,6 +64,7 @@ library
ghc-options: -Wall
build-depends:
base
, containers
, direct-sqlite
, megaparsec
, pretty-simple
Expand Down
Loading

0 comments on commit 33c5981

Please sign in to comment.