Skip to content

Commit

Permalink
mailserver: build pruning into WMailServer (#1342)
Browse files Browse the repository at this point in the history
MailServer pruning was implemented as a separate command but that required stopping a mail server and executing the command manually. This change builds pruning into MailServer and can be set using MailServerDataRetention in WhisperConfig.
  • Loading branch information
adambabik authored Jan 23, 2019
1 parent 1a365a5 commit d20b5dc
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 109 deletions.
9 changes: 0 additions & 9 deletions cmd/statusd-prune/README.md

This file was deleted.

70 changes: 0 additions & 70 deletions cmd/statusd-prune/main.go

This file was deleted.

84 changes: 72 additions & 12 deletions mailserver/cleaner.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,100 @@
package mailserver

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/util"
)

const batchSize = 1000
const (
dbCleanerBatchSize = 1000
dbCleanerPeriod = time.Hour
)

// dbCleaner removes old messages from a db.
type dbCleaner struct {
sync.RWMutex

// Cleaner removes old messages from a db
type Cleaner struct {
db dbImpl
batchSize int
retention time.Duration

period time.Duration
cancel chan struct{}
}

// NewCleanerWithDB returns a new Cleaner for db
func NewCleanerWithDB(db dbImpl) *Cleaner {
return &Cleaner{
// newDBCleaner returns a new cleaner for db.
func newDBCleaner(db dbImpl, retention time.Duration) *dbCleaner {
return &dbCleaner{
db: db,
batchSize: batchSize,
retention: retention,

batchSize: dbCleanerBatchSize,
period: dbCleanerPeriod,
}
}

// Start starts a loop that cleans up old messages.
func (c *dbCleaner) Start() {
log.Info("Starting cleaning envelopes", "period", c.period, "retention", c.retention)

cancel := make(chan struct{})

c.Lock()
c.cancel = cancel
c.Unlock()

go c.schedule(c.period, cancel)
}

// Stops stops the cleaning loop.
func (c *dbCleaner) Stop() {
c.Lock()
defer c.Unlock()

if c.cancel == nil {
return
}
close(c.cancel)
c.cancel = nil
}

func (c *dbCleaner) schedule(period time.Duration, cancel <-chan struct{}) {
t := time.NewTicker(period)
defer t.Stop()

for {
select {
case <-t.C:
count, err := c.PruneEntriesOlderThan(time.Now().Add(-c.retention))
if err != nil {
log.Error("failed to prune data", "err", err)
}
log.Info("Prunned some some messages successfully", "count", count)
case <-cancel:
return
}
}
}

// Prune removes messages sent between lower and upper timestamps and returns how many has been removed
func (c *Cleaner) Prune(lower, upper uint32) (int, error) {
// PruneEntriesOlderThan removes messages sent between lower and upper timestamps
// and returns how many have been removed.
func (c *dbCleaner) PruneEntriesOlderThan(t time.Time) (int, error) {
var zero common.Hash
kl := NewDBKey(lower, zero)
ku := NewDBKey(upper, zero)
kl := NewDBKey(0, zero)
ku := NewDBKey(uint32(t.Unix()), zero)
i := c.db.NewIterator(&util.Range{Start: kl.Bytes(), Limit: ku.Bytes()}, nil)
defer i.Release()

return c.prune(i)
}

func (c *Cleaner) prune(i iterator.Iterator) (int, error) {
func (c *dbCleaner) prune(i iterator.Iterator) (int, error) {
batch := leveldb.Batch{}
removed := 0

Expand Down
38 changes: 28 additions & 10 deletions mailserver/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,39 @@ import (
func TestCleaner(t *testing.T) {
now := time.Now()
server := setupTestServer(t)
cleaner := NewCleanerWithDB(server.db)
defer server.Close()
cleaner := newDBCleaner(server.db, time.Hour)

archiveEnvelope(t, now.Add(-10*time.Second), server)
archiveEnvelope(t, now.Add(-3*time.Second), server)
archiveEnvelope(t, now.Add(-1*time.Second), server)

testMessagesCount(t, 3, server)

testPrune(t, now.Add(-5*time.Second), 2, cleaner, server)
testPrune(t, now.Add(-5*time.Second), 1, cleaner, server)
testPrune(t, now.Add(-2*time.Second), 1, cleaner, server)
testPrune(t, now, 0, cleaner, server)
testPrune(t, now, 1, cleaner, server)

testMessagesCount(t, 0, server)
}

func TestCleanerSchedule(t *testing.T) {
now := time.Now()
server := setupTestServer(t)
defer server.Close()

cleaner := newDBCleaner(server.db, time.Hour)
cleaner.period = time.Millisecond * 10
cleaner.Start()
defer cleaner.Stop()

archiveEnvelope(t, now.Add(-3*time.Hour), server)
archiveEnvelope(t, now.Add(-2*time.Hour), server)
archiveEnvelope(t, now.Add(-1*time.Minute), server)

time.Sleep(time.Millisecond * 50)

testMessagesCount(t, 1, server)
}

func benchmarkCleanerPrune(b *testing.B, messages int, batchSize int) {
Expand All @@ -38,7 +59,7 @@ func benchmarkCleanerPrune(b *testing.B, messages int, batchSize int) {
server := setupTestServer(t)
defer server.Close()

cleaner := NewCleanerWithDB(server.db)
cleaner := newDBCleaner(server.db, time.Hour)
cleaner.batchSize = batchSize

for i := 0; i < messages; i++ {
Expand Down Expand Up @@ -81,13 +102,10 @@ func archiveEnvelope(t *testing.T, sentTime time.Time, server *WMailServer) *whi
return env
}

func testPrune(t *testing.T, u time.Time, expected int, c *Cleaner, s *WMailServer) {
upper := uint32(u.Unix())
_, err := c.Prune(0, upper)
func testPrune(t *testing.T, u time.Time, expected int, c *dbCleaner, s *WMailServer) {
n, err := c.PruneEntriesOlderThan(u)
require.NoError(t, err)

count := countMessages(t, s.db)
require.Equal(t, expected, count)
require.Equal(t, expected, n)
}

func testMessagesCount(t *testing.T, expected int, s *WMailServer) {
Expand Down
26 changes: 21 additions & 5 deletions mailserver/mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type WMailServer struct {

muRateLimiter sync.RWMutex
rateLimiter *rateLimiter

cleaner *dbCleaner // removes old envelopes
}

// Init initializes mailServer.
Expand All @@ -97,7 +99,10 @@ func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) e
if err := s.setupRequestMessageDecryptor(config); err != nil {
return err
}
s.setupRateLimiter(time.Duration(config.MailServerRateLimit) * time.Second)

if config.MailServerRateLimit > 0 {
s.setupRateLimiter(time.Duration(config.MailServerRateLimit) * time.Second)
}

// Open database in the last step in order not to init with error
// and leave the database open by accident.
Expand All @@ -107,16 +112,24 @@ func (s *WMailServer) Init(shh *whisper.Whisper, config *params.WhisperConfig) e
}
s.db = database

if config.MailServerDataRetention > 0 {
// MailServerDataRetention is a number of days.
s.setupCleaner(time.Duration(config.MailServerDataRetention) * time.Hour * 24)
}

return nil
}

// setupRateLimiter in case limit is bigger than 0 it will setup an automated
// limit db cleanup.
func (s *WMailServer) setupRateLimiter(limit time.Duration) {
if limit > 0 {
s.rateLimiter = newRateLimiter(limit)
s.rateLimiter.Start()
}
s.rateLimiter = newRateLimiter(limit)
s.rateLimiter.Start()
}

func (s *WMailServer) setupCleaner(retention time.Duration) {
s.cleaner = newDBCleaner(s.db, retention)
s.cleaner.Start()
}

// setupRequestMessageDecryptor setup a Whisper filter to decrypt
Expand Down Expand Up @@ -160,6 +173,9 @@ func (s *WMailServer) Close() {
if s.rateLimiter != nil {
s.rateLimiter.Stop()
}
if s.cleaner != nil {
s.cleaner.Stop()
}
}

func recoverLevelDBPanics(calleMethodName string) {
Expand Down
6 changes: 3 additions & 3 deletions params/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ type WhisperConfig struct {
// MailServerAsymKey is an hex-encoded asymmetric key to decrypt messages sent to MailServer.
MailServerAsymKey string

// RateLimit minimum time between queries to mail server per peer
// MailServerRateLimit minimum time between queries to mail server per peer.
MailServerRateLimit int

// MailServerCleanupPeriod time in seconds to wait to run mail server cleanup
MailServerCleanupPeriod int
// MailServerDataRetention is a number of days data should be stored by MailServer.
MailServerDataRetention int

// TTL time to live for messages, in seconds
TTL int
Expand Down

0 comments on commit d20b5dc

Please sign in to comment.