Skip to content

Commit

Permalink
Add postgres
Browse files Browse the repository at this point in the history
This commits adds support for postgres database.
Currently two fields are stored: the bloom filter and the topic.
Only the bloom filter is actually used to query, but potentially we will
use also the topic in the future, so easier to separate it now in order
to avoid a migration.
  • Loading branch information
cammellos committed May 15, 2019
1 parent 9e89efd commit 4ab0862
Show file tree
Hide file tree
Showing 52 changed files with 7,655 additions and 56 deletions.
3 changes: 2 additions & 1 deletion .codeclimate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ exclude_patterns:
- vendor/
- static/
- t/

- mailserver/migrations
- services/shhext/chat/migrations
17 changes: 16 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ setup-build: dep-install lint-install release-install gomobile-install ##@other
setup: setup-build setup-dev ##@other Prepare project for development and building

generate: ##@other Regenerate assets and other auto-generated stuff
go generate ./static ./static/migrations
go generate ./static ./static/encryption_migrations ./static/mailserver_db_migrations
$(shell cd ./services/shhext/chat && exec protoc --go_out=. ./*.proto)

prepare-release: clean-release
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.25.0-beta.1
0.25.0-beta.2
20 changes: 20 additions & 0 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ If you want your node to relay Whisper(SHH) protocol messages you'll want to inc
```
The `MailServerPassword` is used for symmetric encryption of history requests.

By default it will use `leveldb` embedded database. To use postgres instead you need to
pass a config of this kind:

```json
{
"WhisperConfig": {
"Enabled": true,
"EnableMailServer": true,
"LightClient": false,
"MailServerPassword": "status-offline-inbox"
"DatabaseConfig": {
"PGConfig": {
"Enabled": true,
"URI": "postgres://user:password@host:port?options"
}
}
}
}
```

__NOTE:__ The default password used by Status App and [our mailservers](https://fleets.status.im/) is `status-offline-inbox`.

## `ClusterConfig`
Expand Down
4 changes: 2 additions & 2 deletions mailserver/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
type dbCleaner struct {
sync.RWMutex

db dbImpl
db DB
batchSize int
retention time.Duration

Expand All @@ -25,7 +25,7 @@ type dbCleaner struct {
}

// newDBCleaner returns a new cleaner for db.
func newDBCleaner(db dbImpl, retention time.Duration) *dbCleaner {
func newDBCleaner(db DB, retention time.Duration) *dbCleaner {
return &dbCleaner{
db: db,
retention: retention,
Expand Down
13 changes: 9 additions & 4 deletions mailserver/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func setupTestServer(t *testing.T) *WMailServer {
var s WMailServer
db, _ := leveldb.Open(storage.NewMemStorage(), nil)

s.db = &LevelDBImpl{ldb: db}
s.db = &LevelDB{ldb: db}
s.pow = powRequirement
return &s
}
Expand All @@ -114,7 +114,7 @@ func testMessagesCount(t *testing.T, expected int, s *WMailServer) {
require.Equal(t, expected, count, fmt.Sprintf("expected %d message, got: %d", expected, count))
}

func countMessages(t *testing.T, db dbImpl) int {
func countMessages(t *testing.T, db DB) int {
var (
count int
zero common.Hash
Expand All @@ -130,12 +130,17 @@ func countMessages(t *testing.T, db dbImpl) int {
end: ku.raw,
}

i := db.BuildIterator(query)
i, _ := db.BuildIterator(query)
defer i.Release()

for i.Next() {
var env whisper.Envelope
err := rlp.DecodeBytes(i.Value(), &env)
value, err := i.GetEnvelope(query.bloom)
if err != nil {
t.Fatal(err)
}

err = rlp.DecodeBytes(value, &env)
if err != nil {
t.Fatal(err)
}
Expand Down
50 changes: 38 additions & 12 deletions mailserver/mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const (

// WMailServer whisper mailserver.
type WMailServer struct {
db dbImpl
db DB
w *whisper.Whisper
pow float64
symFilter *whisper.Filter
Expand All @@ -69,8 +69,6 @@ type WMailServer struct {

// Init initializes mailServer.
func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) error {
var err error

if len(config.DataDir) == 0 {
return errDirectoryNotProvided
}
Expand All @@ -92,11 +90,22 @@ func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) e

// Open database in the last step in order not to init with error
// and leave the database open by accident.
database, err := NewLevelDBImpl(config)
if err != nil {
return fmt.Errorf("open DB: %s", err)
if config.DatabaseConfig.PGConfig.Enabled {
log.Info("Connecting to postgres database")
database, err := NewPostgresDB(config)
if err != nil {
return fmt.Errorf("open DB: %s", err)
}
s.db = database
log.Info("Connected to postgres database")
} else {
// Defaults to LevelDB
database, err := NewLevelDB(config)
if err != nil {
return fmt.Errorf("open DB: %s", err)
}
s.db = database
}
s.db = database

if config.MailServerDataRetention > 0 {
// MailServerDataRetention is a number of days.
Expand Down Expand Up @@ -253,7 +262,15 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
requestsBatchedCounter.Inc(1)
}

iter := s.createIterator(lower, upper, cursor, bloom, limit)
iter, err := s.createIterator(lower, upper, cursor, bloom, limit)
if err != nil {
log.Error("[mailserver:DeliverMail] request failed",
"peerID", peerID,
"requestID", requestID)

return
}

defer iter.Release()

bundles := make(chan []rlp.RawValue, 5)
Expand Down Expand Up @@ -347,7 +364,10 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque
return fmt.Errorf("request is invalid: %v", err)
}

iter := s.createIterator(request.Lower, request.Upper, request.Cursor, nil, 0)
iter, err := s.createIterator(request.Lower, request.Upper, request.Cursor, nil, 0)
if err != nil {
return err
}
defer iter.Release()

bundles := make(chan []rlp.RawValue, 5)
Expand Down Expand Up @@ -393,7 +413,7 @@ func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailReque
peer,
whisper.SyncResponse{Error: "failed to process all envelopes"},
)
return fmt.Errorf("levelDB iterator failed: %v", err)
return fmt.Errorf("LevelDB iterator failed: %v", err)
}

log.Info("Finished syncing envelopes", "peer", peerIDString(peer))
Expand Down Expand Up @@ -428,7 +448,7 @@ func (s *WMailServer) exceedsPeerRequests(peer []byte) bool {
return true
}

func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte, bloom []byte, limit uint32) Iterator {
func (s *WMailServer) createIterator(lower, upper uint32, cursor []byte, bloom []byte, limit uint32) (Iterator, error) {
var (
emptyHash common.Hash
emptyTopic whisper.TopicType
Expand Down Expand Up @@ -495,7 +515,13 @@ func (s *WMailServer) processRequestInBundles(
continue
}

key := iter.DBKey()
key, err := iter.DBKey()
if err != nil {
log.Error("[mailserver:processRequestInBundles] failed getting key",
"requestID", requestID)
break

}

lastEnvelopeHash = key.EnvelopeHash()
processedEnvelopes++
Expand Down
10 changes: 4 additions & 6 deletions mailserver/mailserver_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"time"
)

// dbImpl is an interface to abstract interactions with the db so that the mailserver
// DB is an interface to abstract interactions with the db so that the mailserver
// is agnostic to the underlaying technology used
type dbImpl interface {
type DB interface {
Close() error
// SaveEnvelope stores an envelope
SaveEnvelope(*whisper.Envelope) error
Expand All @@ -16,14 +16,12 @@ type dbImpl interface {
// Prune removes envelopes older than time
Prune(time.Time, int) (int, error)
// BuildIterator returns an iterator over envelopes
BuildIterator(query CursorQuery) Iterator
BuildIterator(query CursorQuery) (Iterator, error)
}

type Iterator interface {
Next() bool
Prev() bool
DBKey() *DBKey
Value() []byte
DBKey() (*DBKey, error)
Release()
Error() error
GetEnvelope(bloom []byte) ([]byte, error)
Expand Down
Loading

0 comments on commit 4ab0862

Please sign in to comment.