Skip to content

Commit

Permalink
cleanup and a couple of bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Apr 6, 2023
1 parent 242bea7 commit 66e70c3
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 186 deletions.
2 changes: 1 addition & 1 deletion build-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ goarc=( amd64 arm64 amd64 amd64 )
buildCount=${#goplat[@]}

# Supported database tags
dbadapters=( mysql mongodb rethinkdb )
dbadapters=( mysql mongodb rethinkdb postgres )
dbtags=( ${dbadapters[@]} alldbs )

for line in $@; do
Expand Down
4 changes: 2 additions & 2 deletions docker-build.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

# Build Tinode docker linux/amd64 images.
# Build Tinode docker linux/amd64 images.
# You may have to install buildx https://docs.docker.com/buildx/working-with-buildx/
# if your build host and target architectures are different (e.g. building on a Mac
# with Apple silicon).
Expand Down Expand Up @@ -30,7 +30,7 @@ if [ `uname -m` != 'x86_64' ]; then
buildcmd='buildx build --platform=linux/amd64'
fi

dbtags=( mysql mongodb rethinkdb alldbs )
dbtags=( mysql mongodb rethinkdb postgres alldbs )

# Build an images for various DB backends
for dbtag in "${dbtags[@]}"
Expand Down
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ require (
github.com/google/go-cmp v0.5.9
github.com/gorilla/handlers v1.5.1
github.com/gorilla/websocket v1.5.0
github.com/jackc/pgconn v1.14.0
github.com/jackc/pgx/v4 v4.18.1
github.com/jmoiron/sqlx v1.3.5
github.com/nyaruka/phonenumbers v1.1.6
github.com/prometheus/client_golang v1.14.0
Expand Down Expand Up @@ -44,6 +46,13 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.3.1 // indirect
Expand Down
133 changes: 132 additions & 1 deletion go.sum

Large diffs are not rendered by default.

225 changes: 46 additions & 179 deletions server/db/postgres/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
defaultDSN = "postgresql://postgres:postgres@localhost:5432/tinode?sslmode=disable&connect_timeout=10"
defaultDatabase = "tinode"

adpVersion = 112
adpVersion = 113
adapterName = "postgres"

defaultMaxResults = 1024
Expand All @@ -64,11 +64,11 @@ const (
type configType struct {
// DB connection settings:
// Using fields
User string `json:"User,omitempty"`
Passwd string `json:"Passwd,omitempty"`
Host string `json:"Host,omitempty"`
Port string `json:"Port,omitempty"`
DBName string `json:"DBName,omitempty"`
User string `json:"user,omitempty"`
Passwd string `json:"passwd,omitempty"`
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
DBName string `json:"dbname,omitempty"`
// Deprecated.
DSN string `json:"dsn,omitempty"`
Database string `json:"database,omitempty"`
Expand Down Expand Up @@ -190,13 +190,12 @@ func (a *adapter) Open(jsonconfig json.RawMessage) error {

// Close closes the underlying database connection
func (a *adapter) Close() error {
var err error
if a.db != nil {
a.db.Close()
a.db = nil
a.version = -1
}
return err
return nil
}

// IsOpen returns true if connection to database has been established. It does not check if
Expand Down Expand Up @@ -354,7 +353,8 @@ func (a *adapter) CreateDb(reset bool) error {
tags JSON,
PRIMARY KEY(id)
);
CREATE INDEX users_state_stateat ON users(state, stateat);`); err != nil {
CREATE INDEX users_state_stateat ON users(state, stateat);
CREATE INDEX users_lastseen_updatedat ON users(lastseen, updatedat);`); err != nil {
return err
}

Expand Down Expand Up @@ -486,8 +486,8 @@ func (a *adapter) CreateDb(reset bool) error {
delid INT DEFAULT 0,
seqid INT NOT NULL,
topic VARCHAR(25) NOT NULL,
"from" BIGINT NOT NULL,
head JSON,
"from" BIGINT NOT NULL,
head JSON,
content JSON,
PRIMARY KEY(id),
FOREIGN KEY(topic) REFERENCES topics(name)
Expand Down Expand Up @@ -557,12 +557,12 @@ func (a *adapter) CreateDb(reset bool) error {
// Links between uploaded files and the topics, users or messages they are attached to.
if _, err = tx.Exec(ctx,
`CREATE TABLE filepgglinks(
id SERIAL NOT NULL,
createdat TIMESTAMP(3) NOT NULL,
fileid BIGINT NOT NULL,
pggid INT,
topic VARCHAR(25),
userid BIGINT,
id SERIAL NOT NULL,
createdat TIMESTAMP(3) NOT NULL,
fileid BIGINT NOT NULL,
pggid INT,
topic VARCHAR(25),
userid BIGINT,
PRIMARY KEY(id),
FOREIGN KEY(fileid) REFERENCES fileuploads(id) ON DELETE CASCADE,
FOREIGN KEY(pggid) REFERENCES messages(id) ON DELETE CASCADE,
Expand All @@ -574,10 +574,12 @@ func (a *adapter) CreateDb(reset bool) error {

if _, err = tx.Exec(ctx,
`CREATE TABLE kvmeta(
"key" VARCHAR(32),
"value" TEXT,
"key" VARCHAR(64) NOT NULL,
createdat TIMESTAMP(3),
"value" TEXT,
PRIMARY KEY("key")
);`); err != nil {
);
CREATE INDEX kvmeta_createdat_key ON kvmeta(createdat, "key");`); err != nil {
return err
}
if _, err = tx.Exec(ctx, `INSERT INTO kvmeta("key", "value") VALUES($1, $2)`, "version", strconv.Itoa(adpVersion)); err != nil {
Expand All @@ -597,172 +599,43 @@ func (a *adapter) UpgradeDb() error {
return err
}

ctx, cancel := a.getContext()
if cancel != nil {
defer cancel()
}

if _, err := a.GetDbVersion(); err != nil {
return err
}

if a.version == 106 {
// Perform database upgrade from version 106 to version 107.

if _, err := a.db.Exec(ctx, "CREATE UNIQUE INDEX usertags_userid_tag ON usertags(userid, tag)"); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "CREATE UNIQUE INDEX topictags_userid_tag ON topictags(topic, tag)"); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "ALTER TABLE credentials ADD deletedat TIMESTAMP(3) AFTER updatedat"); err != nil {
return err
}

if err := bumpVersion(a, 107); err != nil {
return err
}
}

if a.version == 107 {
// Perform database upgrade from version 107 to version 108.

// Replace default user access JRWPA with JRWPAS.
if _, err := a.db.Exec(ctx, `UPDATE users SET access=JSON_REPLACE(access, '$.Auth', 'JRWPAS')
WHERE CAST(JSON_EXTRACT(access, '$.Auth') AS CHAR) LIKE '"JRWPA"'`); err != nil {
return err
}

if err := bumpVersion(a, 108); err != nil {
return err
}
}

if a.version == 108 {
// Perform database upgrade from version 108 to version 109.

tx, err := a.db.Begin(ctx)
if err != nil {
return err
}
if err = createSystemTopic(tx); err != nil {
tx.Rollback(ctx)
return err
}
if err = tx.Commit(ctx); err != nil {
return err
}

if err = bumpVersion(a, 109); err != nil {
return err
}
}

if a.version == 109 {
// Perform database upgrade from version 109 to version 110.
if _, err := a.db.Exec(ctx, "UPDATE topics SET touchedat=updatedat WHERE touchedat IS NULL"); err != nil {
return err
}

if err := bumpVersion(a, 110); err != nil {
return err
}
}

if a.version == 110 {
// Users
if _, err := a.db.Exec(ctx, "ALTER TABLE users MODIFY state SMALLINT NOT NULL DEFAULT 0 AFTER updatedat"); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "ALTER TABLE users CHANGE deletedat stateat TIMESTAMP(3)"); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "ALTER TABLE users DROP INDEX users_deletedat"); err != nil {
return err
}

// Add status to formerly soft-deleted users.
if _, err := a.db.Exec(ctx, "UPDATE users SET state=$1 WHERE stateat IS NOT NULL", t.StateDeleted); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "ALTER TABLE users ADD INDEX users_state(state)"); err != nil {
return err
}

// Topics
if _, err := a.db.Exec(ctx, "ALTER TABLE topics ADD state SMALLINT NOT NULL DEFAULT 0 AFTER updatedat"); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "ALTER TABLE topics CHANGE deletedat stateat TIMESTAMP(3)"); err != nil {
return err
}

// Add status to formerly soft-deleted topics.
if _, err := a.db.Exec(ctx, "UPDATE topics SET state=$1 WHERE stateat IS NOT NULL", t.StateDeleted); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "ALTER TABLE topics ADD INDEX topics_state(state)"); err != nil {
return err
}

// Subscriptions
if _, err := a.db.Exec(ctx, "ALTER TABLE subscriptions ADD INDEX topics_deletedat(deletedat)"); err != nil {
return err
}

if err := bumpVersion(a, 111); err != nil {
return err
}
ctx, cancel := a.getContext()
if cancel != nil {
defer cancel()
}

if a.version == 111 {
// Perform database upgrade from version 111 to version 112.
if _, err := a.db.Exec(ctx, "ALTER TABLE users ADD trusted JSON AFTER public"); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "ALTER TABLE topics ADD trusted JSON AFTER public"); err != nil {
return err
}

// Remove NOT NULL constraint, so an avatar upload can be done at registration.
if _, err := a.db.Exec(ctx, "ALTER TABLE fileuploads MODIFY userid BIGINT"); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "ALTER TABLE fileuploads ADD INDEX fileuploads_status(status)"); err != nil {
return err
}
if a.version == 112 {
// Perform database upgrade from version 112 to version 113.

// Remove NOT NULL constraint to enable links to users and topics.
if _, err := a.db.Exec(ctx, "ALTER TABLE filepgglinks MODIFY pggid INT"); err != nil {
// Index for deleting unvalidated accounts.
if _, err := a.db.Exec(ctx, "CREATE INDEX users_lastseen_updatedat ON users(lastseen,updatedat)"); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "ALTER TABLE filepgglinks ADD topic CHAR(25)"); err != nil {
// Allow lnger kvmeta keys.
if _, err := a.db.Exec(ctx, `ALTER TABLE kvmeta ALTER COLUMN "key" TYPE VARCHAR(64)`); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "ALTER TABLE filepgglinks ADD userid BIGINT"); err != nil {
if _, err := a.db.Exec(ctx, `ALTER TABLE kvmeta ALTER COLUMN "key" SET NOT NULL`); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "ALTER TABLE filepgglinks ADD FOREIGN KEY(topic) REFERENCES topics(name) ON DELETE CASCADE"); err != nil {
// Add timestamp to kvmeta.
if _, err := a.db.Exec(ctx, `ALTER TABLE kvmeta ADD COLUMN createdat TIMESTAMP(3)`); err != nil {
return err
}

if _, err := a.db.Exec(ctx, "ALTER TABLE filepgglinks ADD FOREIGN KEY(userid) REFERENCES users(id) ON DELETE CASCADE"); err != nil {
// Add compound index on the new field and key (could be searched by key prefix).
if _, err := a.db.Exec(ctx, `CREATE INDEX kvmeta_createdat_key ON kvmeta(createdat, "key")`); err != nil {
return err
}

if err := bumpVersion(a, 112); err != nil {
if err := bumpVersion(a, 113); err != nil {
return err
}
}
Expand All @@ -783,19 +656,14 @@ func createSystemTopic(tx pgx.Tx) error {
}

func addTags(ctx context.Context, tx pgx.Tx, table, keyName string, keyVal interface{}, tags []string, ignoreDups bool) error {

if len(tags) == 0 {
return nil
}

var err error

sql := fmt.Sprintf("INSERT INTO %s (%s, tag) VALUES($1,$2)", table, keyName)

for _, tag := range tags {
_, err = tx.Exec(ctx, sql, keyVal, tag)

if err != nil {
if _, err := tx.Exec(ctx, sql, keyVal, tag); err != nil {
if isDupe(err) {
if ignoreDups {
continue
Expand Down Expand Up @@ -872,9 +740,9 @@ func (a *adapter) AuthAddRecord(uid t.Uid, scheme, unique string, authLvl auth.L
if cancel != nil {
defer cancel()
}
_, err := a.db.Exec(ctx, "INSERT INTO auth(uname,userid,scheme,authLvl,secret,expires) VALUES($1,$2,$3,$4,$5,$6)",
unique, store.DecodeUid(uid), scheme, authLvl, secret, exp)
if err != nil {

if _, err := a.db.Exec(ctx, "INSERT INTO auth(uname,userid,scheme,authLvl,secret,expires) VALUES($1,$2,$3,$4,$5,$6)",
unique, store.DecodeUid(uid), scheme, authLvl, secret, exp); err != nil {
if isDupe(err) {
return t.ErrDuplicate
}
Expand All @@ -899,6 +767,7 @@ func (a *adapter) AuthDelAllRecords(user t.Uid) (int, error) {
if cancel != nil {
defer cancel()
}

res, err := a.db.Exec(ctx, "DELETE FROM auth WHERE userid=$1", store.DecodeUid(user))
if err != nil {
return 0, err
Expand Down Expand Up @@ -1024,19 +893,17 @@ func (a *adapter) UserGet(uid t.Uid) (*t.User, error) {
}
defer row.Close()

row.Next()
if !row.Next() {
// Nothing found: user does not exist or marked as soft-deleted
return nil, nil
}

err = row.Scan(&id, &user.CreatedAt, &user.UpdatedAt, &user.State, &user.StateAt, &user.Access, &user.LastSeen, &user.UserAgent, &user.Public, &user.Trusted, &user.Tags)
if err == nil {
user.SetUid(uid)
return &user, nil
}

if err == pgx.ErrNoRows {
// Clear the error if user does not exist or marked as soft-deleted.
return nil, nil
}

return nil, err
}

Expand Down
Loading

0 comments on commit 66e70c3

Please sign in to comment.