Skip to content

Commit

Permalink
Support Mail Server data synchronization (#1302)
Browse files Browse the repository at this point in the history
These changes add a support for syncing data between two Mail Servers. It does not give an easy way to start syncing. This will be solved in the next PR.
  • Loading branch information
adambabik authored Dec 6, 2018
1 parent d51761f commit 913dbfc
Show file tree
Hide file tree
Showing 10 changed files with 482 additions and 132 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

[[constraint]]
name = "github.com/status-im/whisper"
version = "=v1.4.1"
version = "=v1.4.2"

[[override]]
name = "github.com/golang/protobuf"
Expand Down
24 changes: 23 additions & 1 deletion cmd/node-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ func verifyMailserverBehavior(mailserverNode *enode.Node) {
}
requestID := common.BytesToHash(requestIDBytes)

// wait for mailserver request sent event
err = waitForMailServerRequestSent(mailServerResponseWatcher, requestID, time.Duration(*timeout)*time.Second)
if err != nil {
logger.Error("Error waiting for mailserver request sent event", "error", err)
os.Exit(3)
}

// wait for mailserver response
resp, err := waitForMailServerResponse(mailServerResponseWatcher, requestID, time.Duration(*timeout)*time.Second)
if err != nil {
Expand Down Expand Up @@ -313,6 +320,21 @@ func joinPublicChat(w *whisper.Whisper, rpcClient *rpc.Client, name string) (str
return keyID, topic, filterID, err
}

func waitForMailServerRequestSent(events chan whisper.EnvelopeEvent, requestID common.Hash, timeout time.Duration) error {
timeoutTimer := time.NewTimer(timeout)
for {
select {
case event := <-events:
if event.Hash == requestID && event.Event == whisper.EventMailServerRequestSent {
timeoutTimer.Stop()
return nil
}
case <-timeoutTimer.C:
return errors.New("timed out waiting for mailserver request sent")
}
}
}

func waitForMailServerResponse(events chan whisper.EnvelopeEvent, requestID common.Hash, timeout time.Duration) (*whisper.MailServerResponse, error) {
timeoutTimer := time.NewTimer(timeout)
for {
Expand Down Expand Up @@ -345,7 +367,7 @@ func decodeMailServerResponse(event whisper.EnvelopeEvent) (*whisper.MailServerR
case whisper.EventMailServerRequestExpired:
return nil, errors.New("no messages available from mailserver")
default:
return nil, errors.New("unexpected event type")
return nil, fmt.Errorf("unexpected event type: %v", event.Event)
}
}

Expand Down
230 changes: 148 additions & 82 deletions mailserver/mailserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var (
requestValidationErrorsCounter = metrics.NewRegisteredCounter("mailserver/requestValidationErrors", nil)
processRequestErrorsCounter = metrics.NewRegisteredCounter("mailserver/processRequestErrors", nil)
historicResponseErrorsCounter = metrics.NewRegisteredCounter("mailserver/historicResponseErrors", nil)
syncRequestsMeter = metrics.NewRegisteredMeter("mailserver/syncRequests", nil)
)

const (
Expand Down Expand Up @@ -242,7 +243,10 @@ func (s *WMailServer) Archive(env *whisper.Envelope) {

// DeliverMail sends mail to specified whisper peer.
func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) {
defer recoverLevelDBPanics("DeliverMail")

log.Info("Delivering mail", "peerID", peerIDString(peer))

requestsMeter.Mark(1)

if peer == nil {
Expand All @@ -257,8 +261,6 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
return
}

defer recoverLevelDBPanics("DeliverMail")

var (
lower, upper uint32
bloom []byte
Expand Down Expand Up @@ -288,24 +290,56 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
}

log.Debug("Processing request",
"lower", lower, "upper", upper,
"lower", lower,
"upper", upper,
"bloom", bloom,
"limit", limit,
"cursor", cursor,
"batch", batch)
"batch", batch,
)

if batch {
requestsBatchedCounter.Inc(1)
}

_, lastEnvelopeHash, nextPageCursor, err := s.processRequest(
peer,
lower, upper,
iter := s.createIterator(lower, upper, cursor)
defer iter.Release()

bundles := make(chan []*whisper.Envelope, 5)
errCh := make(chan error)

go func() {
for bundle := range bundles {
if err := s.sendEnvelopes(peer, bundle, batch); err != nil {
errCh <- err
break
}
}
close(errCh)
}()

start := time.Now()
nextPageCursor, lastEnvelopeHash := s.processRequestInBundles(
iter,
bloom,
limit,
cursor,
batch)
if err != nil {
int(limit),
bundles,
)
requestProcessTimer.UpdateSince(start)

// bundles channel can be closed now because the processing finished.
close(bundles)

// Wait for the goroutine to finish the work. It may return an error.
if err := <-errCh; err != nil {
processRequestErrorsCounter.Inc(1)
log.Error("Error while processing mail server request", "err", err, "peerID", peerIDString(peer))
s.trySendHistoricMessageErrorResponse(peer, request, err)
return
}

// Processing of the request could be finished earlier due to iterator error.
if err := iter.Error(); err != nil {
processRequestErrorsCounter.Inc(1)
log.Error("Error while processing mail server request", "err", err, "peerID", peerIDString(peer))
s.trySendHistoricMessageErrorResponse(peer, request, err)
Expand All @@ -322,6 +356,80 @@ func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope)
}
}

// SyncMail syncs mail servers between two Mail Servers.
func (s *WMailServer) SyncMail(peer *whisper.Peer, request whisper.SyncMailRequest) error {
log.Info("Started syncing envelopes", "peer", peerIDString(peer), "req", request)

defer recoverLevelDBPanics("SyncMail")

syncRequestsMeter.Mark(1)

// Check rate limiting for a requesting peer.
if s.exceedsPeerRequests(peer.ID()) {
requestErrorsCounter.Inc(1)
log.Error("Peer exceeded request per seconds limit", "peerID", peerIDString(peer))
return fmt.Errorf("requests per seconds limit exceeded")
}

iter := s.createIterator(request.Lower, request.Upper, request.Cursor)
defer iter.Release()

bundles := make(chan []*whisper.Envelope, 5)
errCh := make(chan error)

go func() {
for bundle := range bundles {
resp := whisper.SyncResponse{Envelopes: bundle}
if err := s.w.SendSyncResponse(peer, resp); err != nil {
errCh <- fmt.Errorf("failed to send sync response: %v", err)
break
}
}
close(errCh)
}()

start := time.Now()
nextCursor, _ := s.processRequestInBundles(
iter,
request.Bloom,
int(request.Limit),
bundles,
)
requestProcessTimer.UpdateSince(start)

// bundles channel can be closed now because the processing finished.
close(bundles)

// Wait for the goroutine to finish the work. It may return an error.
if err := <-errCh; err != nil {
_ = s.w.SendSyncResponse(
peer,
whisper.SyncResponse{Error: "failed to send a response"},
)
return err
}

// Processing of the request could be finished earlier due to iterator error.
if err := iter.Error(); err != nil {
_ = s.w.SendSyncResponse(
peer,
whisper.SyncResponse{Error: "failed to process all envelopes"},
)
return fmt.Errorf("levelDB iterator failed: %v", err)
}

log.Info("Finished syncing envelopes", "peer", peerIDString(peer))

if err := s.w.SendSyncResponse(peer, whisper.SyncResponse{
Cursor: nextCursor,
Final: true,
}); err != nil {
return fmt.Errorf("failed to send the final sync response: %v", err)
}

return nil
}

// exceedsPeerRequests in case limit its been setup on the current server and limit
// allows the query, it will store/update new query time for the current peer.
func (s *WMailServer) exceedsPeerRequests(peer []byte) bool {
Expand Down Expand Up @@ -360,45 +468,24 @@ func (s *WMailServer) createIterator(lower, upper uint32, cursor cursorType) ite
return i
}

// processRequest processes the current request and re-sends all stored messages
// accomplishing lower and upper limits. The limit parameter determines the maximum number of
// messages to be sent back for the current request.
// The cursor parameter is used for pagination.
// After sending all the messages, a message of type p2pRequestCompleteCode is sent by the mailserver to
// the peer.
func (s *WMailServer) processRequest(
peer *whisper.Peer,
lower, upper uint32,
bloom []byte,
limit uint32,
cursor cursorType,
batch bool,
) (ret []*whisper.Envelope, lastEnvelopeHash common.Hash, nextPageCursor cursorType, err error) {
// Recover from possible goleveldb panics
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("recovered from panic in processRequest: %v", r)
}
}()

var (
sentEnvelopes uint32
sentEnvelopesSize int64
)

i := s.createIterator(lower, upper, cursor)
defer i.Release()

// processRequestInBundles processes envelopes using an iterator and passes them
// to the output channel in bundles.
func (s *WMailServer) processRequestInBundles(
iter iterator.Iterator, bloom []byte, limit int, output chan<- []*whisper.Envelope,
) (cursorType, common.Hash) {
var (
bundle []*whisper.Envelope
bundleSize uint32
bundle []*whisper.Envelope
bundleSize uint32
processedEnvelopes int
processedEnvelopesSize int64
nextCursor cursorType
lastEnvelopeHash common.Hash
)

start := time.Now()

for i.Prev() {
for iter.Prev() {
var envelope whisper.Envelope
decodeErr := rlp.DecodeBytes(i.Value(), &envelope)

decodeErr := rlp.DecodeBytes(iter.Value(), &envelope)
if decodeErr != nil {
log.Error("failed to decode RLP", "err", decodeErr)
continue
Expand All @@ -409,38 +496,30 @@ func (s *WMailServer) processRequest(
}

newSize := bundleSize + whisper.EnvelopeHeaderLength + uint32(len(envelope.Data))
limitReached := limit != noLimits && (int(sentEnvelopes)+len(bundle)) == int(limit)
limitReached := limit != noLimits && (processedEnvelopes+len(bundle)) == limit
if !limitReached && newSize < s.w.MaxMessageSize() {
bundle = append(bundle, &envelope)
bundleSize = newSize
lastEnvelopeHash = envelope.Hash()
continue
}

if peer == nil {
// used for test purposes
ret = append(ret, bundle...)
} else {
err = s.sendEnvelopes(peer, bundle, batch)
if err != nil {
return
}
}
output <- bundle
bundle = bundle[:0]
bundleSize = 0

sentEnvelopes += uint32(len(bundle))
sentEnvelopesSize += int64(bundleSize)
processedEnvelopes += len(bundle)
processedEnvelopesSize += int64(bundleSize)

if limitReached {
bundle = nil
bundleSize = 0

// When the limit is reached, the current retrieved envelope
// is not included in the response.
// The nextPageCursor is a key used as a limit in a range and
// The nextCursor is a key used as a limit in a range and
// is not included in the range, hence, we need to get
// the previous iterator key.
i.Next()
nextPageCursor = i.Key()
// Because we iterate backwards, we use Next().
iter.Next()
nextCursor = iter.Key()
break
} else {
// Reset bundle information and add the last read envelope
Expand All @@ -452,28 +531,15 @@ func (s *WMailServer) processRequest(
lastEnvelopeHash = envelope.Hash()
}

// Send any outstanding envelopes.
// There might be some outstanding elements in the bundle.
if len(bundle) > 0 && bundleSize > 0 {
if peer == nil {
ret = append(ret, bundle...)
} else {
err = s.sendEnvelopes(peer, bundle, batch)
if err != nil {
return
}
}
output <- bundle
}

requestProcessTimer.UpdateSince(start)
sentEnvelopesMeter.Mark(int64(sentEnvelopes))
sentEnvelopesSizeMeter.Mark(sentEnvelopesSize)

err = i.Error()
if err != nil {
err = fmt.Errorf("levelDB iterator error: %v", err)
}
sentEnvelopesMeter.Mark(int64(processedEnvelopes))
sentEnvelopesSizeMeter.Mark(processedEnvelopesSize)

return
return nextCursor, lastEnvelopeHash
}

func (s *WMailServer) sendEnvelopes(peer *whisper.Peer, envelopes []*whisper.Envelope, batch bool) error {
Expand Down
4 changes: 1 addition & 3 deletions mailserver/mailserver_db_panic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ func (s *MailServerDBPanicSuite) TestArchive() {

func (s *MailServerDBPanicSuite) TestDeliverMail() {
defer s.testPanicRecover("DeliverMail")
_, _, _, err := s.server.processRequest(nil, 10, 20, []byte{}, 0, nil, false)
s.Error(err)
s.Equal("recovered from panic in processRequest: panicDB panic on NewIterator", err.Error())
s.server.DeliverMail(&whisper.Peer{}, &whisper.Envelope{})
}

func (s *MailServerDBPanicSuite) testPanicRecover(method string) {
Expand Down
Loading

0 comments on commit 913dbfc

Please sign in to comment.