Skip to content

Commit

Permalink
cache consensus fetched data (crypto-power#603)
Browse files Browse the repository at this point in the history
* cache consensus fetched data

* refactor function name, return error on GetLastSyncedTimestamp function

* increase auto sync time, add manual sync button

* fix lint

---------

Co-authored-by: bugjt <@bugjt>
  • Loading branch information
bugjt authored Aug 5, 2024
1 parent 0ec2235 commit de0b634
Show file tree
Hide file tree
Showing 10 changed files with 516 additions and 130 deletions.
253 changes: 253 additions & 0 deletions libwallet/assets/dcr/agenda.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package dcr

import (
"context"
"fmt"
"net/http"
"sort"
"sync"
"time"

"decred.org/dcrwallet/v4/errors"
"github.com/asdine/storm"
"github.com/crypto-power/cryptopower/libwallet/utils"
"github.com/decred/dcrd/chaincfg/v3"
"github.com/decred/dcrd/wire"
)

const (
configDBBkt = "consensus_agenda_config"
LastSyncedTimestampConfigKey = "consensus_agenda_last_synced_timestamp"
)

type consensusAgendaSyncCallback func(status utils.AgendaSyncStatus)

type ConsensusAgenda struct {
db *storm.DB
chainParams *chaincfg.Params

mu *sync.RWMutex // Pointer required to avoid copying literal values.
ctx context.Context
cancelSync context.CancelFunc

syncCallbacksMtx *sync.RWMutex // Pointer required to avoid copying literal values.
syncCallbacks map[string]consensusAgendaSyncCallback
}

func NewConsensusAgenda(chainParams *chaincfg.Params, db *storm.DB) *ConsensusAgenda {
return &ConsensusAgenda{
chainParams: chainParams,
db: db,
mu: &sync.RWMutex{},
syncCallbacksMtx: &sync.RWMutex{},
syncCallbacks: make(map[string]consensusAgendaSyncCallback),
}
}

func (c *ConsensusAgenda) saveLastSyncedTimestamp(lastSyncedTimestamp int64) {
err := c.db.Set(configDBBkt, LastSyncedTimestampConfigKey, &lastSyncedTimestamp)
if err != nil {
log.Errorf("error setting config value for key: %s, error: %v", LastSyncedTimestampConfigKey, err)
}
}

func (c *ConsensusAgenda) GetLastSyncedTimestamp() (int64, error) {
var lastSyncedTimestamp int64
err := c.db.Get(configDBBkt, LastSyncedTimestampConfigKey, &lastSyncedTimestamp)
if err != nil {
return 0, err
}
return lastSyncedTimestamp, nil
}

func (c *ConsensusAgenda) saveOrOverwriteAgenda(dataAgenda *DcrdataAgenda) error {
var oldAgenda DcrdataAgenda
err := c.db.One("Name", dataAgenda.Name, &oldAgenda)
if err != nil && err != storm.ErrNotFound {
return errors.Errorf("error checking if agenda was already indexed: %s", err.Error())
}

if oldAgenda.Name != "" {
// delete old record before saving new (if it exists)
err = c.db.DeleteStruct(&oldAgenda)
if err != nil {
return err
}
}

return c.db.Save(dataAgenda)
}

// GetAgendaRaw fetches and returns a agenda from the db
func (c *ConsensusAgenda) getAgendaRaw(offset, limit int32, newestFirst bool) ([]DcrdataAgenda, error) {
query := c.db.Select()

if offset > 0 {
query = query.Skip(int(offset))
}

if limit > 0 {
query = query.Limit(int(limit))
}

if newestFirst {
query = query.OrderBy("StartTime").Reverse()
} else {
query = query.OrderBy("StartTime")
}

var agendas []DcrdataAgenda
err := query.Find(&agendas)
if err != nil && err != storm.ErrNotFound {
return nil, fmt.Errorf("error fetching agendas: %s", err.Error())
}

return agendas, nil
}

func (c *ConsensusAgenda) getAgendaDataFromURL() ([]*DcrdataAgenda, error) {
// Fetch high level agenda detail form dcrdata api.
var dcrdataAgenda []*DcrdataAgenda
host := dcrdataAgendasAPIMainnetURL
if c.chainParams.Net == wire.TestNet3 {
host = dcrdataAgendasAPITestnetURL
}

req := &utils.ReqConfig{
Method: http.MethodGet,
HTTPURL: host,
}

if _, err := utils.HTTPRequest(req, &dcrdataAgenda); err != nil {
return nil, err
}
return dcrdataAgenda, nil
}

func (c *ConsensusAgenda) IsSyncing() bool {
c.mu.RLock()
defer c.mu.RUnlock()
return c.cancelSync != nil
}

func (c *ConsensusAgenda) StopSync() {
c.mu.Lock()
if c.cancelSync != nil {
c.cancelSync()
c.cancelSync = nil
}
c.mu.Unlock()
log.Info("Consensus agenda sync: stopped")
}

func (c *ConsensusAgenda) AllVoteAgendas(chainParams *chaincfg.Params, newestFirst bool) ([]*Agenda, error) {
if chainParams.Deployments == nil {
return nil, nil // no agendas to return
}

// Check for all agendas from the intital stake version to the
// current stake version, in order to fetch legacy agendas.
deployments := make([]chaincfg.ConsensusDeployment, 0)
for _, v := range chainParams.Deployments {
deployments = append(deployments, v...)
}

dcrdataAgenda, err := c.getAgendaRaw(0, 0, newestFirst) // get all agenda from db
if err != nil {
return nil, err
}

agendaStatuses := make(map[string]string, len(dcrdataAgenda))
for _, agenda := range dcrdataAgenda {
agendaStatuses[agenda.Name] = AgendaStatusFromStr(agenda.Status).String()
}

agendas := make([]*Agenda, len(deployments))
for i := range deployments {
d := &deployments[i]

agendas[i] = &Agenda{
AgendaID: d.Vote.Id,
Description: d.Vote.Description,
Mask: uint32(d.Vote.Mask),
Choices: d.Vote.Choices,
VotingPreference: "", // this value can be updated after reading a selected wallet's preferences
StartTime: int64(d.StartTime),
ExpireTime: int64(d.ExpireTime),
Status: agendaStatuses[d.Vote.Id],
}
}
sort.Slice(agendas, func(i, j int) bool {
if newestFirst {
return agendas[i].StartTime > agendas[j].StartTime
}
return agendas[i].StartTime < agendas[j].StartTime
})
return agendas, nil
}

func (c *ConsensusAgenda) AddSyncCallback(syncCallback consensusAgendaSyncCallback, uniqueIdentifier string) error {
c.syncCallbacksMtx.Lock()
defer c.syncCallbacksMtx.Unlock()

if _, ok := c.syncCallbacks[uniqueIdentifier]; ok {
return errors.New(ErrListenerAlreadyExist)
}

c.syncCallbacks[uniqueIdentifier] = syncCallback
return nil
}

func (c *ConsensusAgenda) RemoveSyncCallback(uniqueIdentifier string) {
c.syncCallbacksMtx.Lock()
defer c.syncCallbacksMtx.Unlock()

delete(c.syncCallbacks, uniqueIdentifier)
}

// Sync fetches all agenda from the server saving them to the db
func (c *ConsensusAgenda) Sync(ctx context.Context) error {
c.mu.RLock()
if c.cancelSync != nil {
c.mu.RUnlock()
return errors.New(ErrSyncAlreadyInProgress)
}

c.ctx, c.cancelSync = context.WithCancel(ctx)
defer func() {
c.cancelSync = nil
}()
c.mu.RUnlock()

log.Info("Consensus Agenda sync: started")
// Check if agenda has been shutdown and exit if true.
if c.ctx.Err() != nil {
return c.ctx.Err()
}

agendaDatas, err := c.getAgendaDataFromURL()
if err != nil {
log.Errorf("Error fetching for agenda server policy: %v", err)
return err
}
for _, agendaData := range agendaDatas {
if err := c.saveOrOverwriteAgenda(agendaData); err != nil {
log.Errorf("Error saving agenda: %v", err)
return err
}
}

log.Info("Consensus Agenda sync: update complete")
c.saveLastSyncedTimestamp(time.Now().Unix())
c.publishSynced()
return nil
}

func (c *ConsensusAgenda) publishSynced() {
c.syncCallbacksMtx.Lock()
defer c.syncCallbacksMtx.Unlock()

for _, syncCallback := range c.syncCallbacks {
syncCallback(utils.AgendaStatusSynced)
}
}
6 changes: 6 additions & 0 deletions libwallet/assets/dcr/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package dcr

const (
ErrSyncAlreadyInProgress = "sync_already_in_progress"
ErrListenerAlreadyExist = "listener_already_exist"
)
19 changes: 10 additions & 9 deletions libwallet/assets/dcr/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,18 @@ type Agenda struct {
// dcrdata api https://dcrdata.decred.org/api/agendas for mainnet or
// https://testnet.decred.org/api/agendas for testnet.
type DcrdataAgenda struct {
ID int `storm:"id,increment"`
Name string `json:"name"`
Description string `json:"-"`
Description string `json:"description"`
Status string `json:"status"`
VotingStarted int64 `json:"-"`
VotingDone int64 `json:"-"`
Activated int64 `json:"-"`
HardForked int64 `json:"-"`
StartTime string `json:"-"`
ExpireTime string `json:"-"`
VoteVersion uint32 `json:"-"`
Mask uint16 `json:"-"`
VotingStarted int64 `json:"votingStarted"`
VotingDone int64 `json:"votingdone"`
Activated int64 `json:"activated"`
HardForked int64 `json:"hardforked"`
StartTime string `json:"starttime"`
ExpireTime string `json:"expiretime"`
VoteVersion uint32 `json:"voteversion"`
Mask uint16 `json:"mask"`
}

/** end agenda types */
Expand Down
5 changes: 4 additions & 1 deletion libwallet/assets_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type AssetsManager struct {
cancelFuncs []context.CancelFunc
chainsParams utils.ChainsParams

ConsensusAgenda *dcr.ConsensusAgenda
Politeia *politeia.Politeia
InstantSwap *instantswap.InstantSwap
ExternalService *ext.Service
Expand Down Expand Up @@ -178,6 +179,8 @@ func NewAssetsManager(rootDir, logDir string, netType utils.NetworkType, dexTest
return nil, err
}

mgr.ConsensusAgenda = dcr.NewConsensusAgenda(mgr.chainsParams.DCR, mwDB)

mgr.params.DB = mwDB
mgr.Politeia = politeia
mgr.InstantSwap = instantSwap
Expand Down Expand Up @@ -453,7 +456,7 @@ func (mgr *AssetsManager) PiKeys() [][]byte {
// AllVoteAgendas returns all agendas of all stake versions for the active
// network and this version of the software.
func (mgr *AssetsManager) AllVoteAgendas(newestFirst bool) ([]*dcr.Agenda, error) {
return dcr.AllVoteAgendas(mgr.chainsParams.DCR, newestFirst)
return mgr.ConsensusAgenda.AllVoteAgendas(mgr.chainsParams.DCR, newestFirst)
}

// sortWallets returns the watchonly wallets ordered last.
Expand Down
6 changes: 6 additions & 0 deletions libwallet/utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
)

type ProposalStatus int
type AgendaSyncStatus int

const (
AgendaStatusSynced AgendaSyncStatus = iota
AgendaStatusSyncing
)

const (
ProposalStatusSynced ProposalStatus = iota
Expand Down
Loading

0 comments on commit de0b634

Please sign in to comment.