main.go
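// Command flare-ftso-indexer connects to a chain RPC node and a database,
// optionally prunes expired history, and then indexes blocks: first the
// historical range, then continuously as new blocks arrive.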
package main

import (
	"context"
	"flag"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/pkg/errors"
	"gorm.io/gorm"

	"flare-ftso-indexer/chain"
	"flare-ftso-indexer/config"
	"flare-ftso-indexer/database"
	"flare-ftso-indexer/indexer"
	"flare-ftso-indexer/logger"
)
func main() {
	// Flush any buffered log output before the process exits.
	defer logger.SyncFileLogger()

	if err := run(context.Background()); err != nil {
		logger.Fatal("Fatal error: %s", err)
	}
}
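// run parses flags, builds the configuration, installs a signal handler that
// flushes logs on shutdown, connects to the RPC node and the database,
// optionally performs an initial history-drop pass, and finally starts the
// indexer.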
func run(ctx context.Context) error {
	flag.Parse()

	cfg, err := config.BuildConfig()
	if err != nil {
		return errors.Wrap(err, "config error")
	}

	config.GlobalConfigCallback.Call(cfg)

	// Sync the logger when the docker container stops or Ctrl+C is pressed.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		sig := <-signalChan
		logger.Info("Received signal: %v", sig)
		logger.SyncFileLogger()
		os.Exit(0)
	}()

	ethClient, err := chain.DialRPCNode(cfg)
	if err != nil {
		return errors.Wrap(err, "could not connect to the RPC nodes")
	}

	db, err := database.ConnectAndInitialize(ctx, &cfg.DB)
	if err != nil {
		return errors.Wrap(err, "database connect and initialize error")
	}

	if cfg.DB.HistoryDrop > 0 {
		// Run an initial iteration of the history drop. This could take some
		// time if it has not been run in a while after an outage - running it
		// separately avoids database clashes with the indexer.
		logger.Info("running initial DropHistory iteration")
		startTime := time.Now()

		var firstBlockNumber uint64
		err = backoff.RetryNotify(
			func() (err error) {
				firstBlockNumber, err = database.DropHistoryIteration(ctx, db, cfg.DB.HistoryDrop, ethClient)
				if errors.Is(err, gorm.ErrRecordNotFound) {
					// An empty database is not an error - there is simply no
					// history to drop yet.
					return nil
				}
				return err
			},
			backoff.NewExponentialBackOff(),
			func(err error, d time.Duration) {
				logger.Error("DropHistory error: %s. Will retry after %s", err, d)
			},
		)
		if err != nil {
			return errors.Wrap(err, "startup DropHistory error")
		}

		logger.Info("initial DropHistory iteration finished in %s", time.Since(startTime))

		// Blocks below firstBlockNumber have been dropped, so indexing from
		// before that point would be wasted work.
		if firstBlockNumber > cfg.Indexer.StartIndex {
			logger.Info("Setting new startIndex due to history drop: %d", firstBlockNumber)
			cfg.Indexer.StartIndex = firstBlockNumber
		}
	}

	return runIndexer(ctx, cfg, db, ethClient)
}
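// runIndexer first indexes the configured historical block range, then starts
// the periodic history-drop job (if enabled) and switches to continuous
// indexing of new blocks. Both indexing phases are retried with exponential
// backoff on transient errors.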
func runIndexer(ctx context.Context, cfg *config.Config, db *gorm.DB, ethClient *chain.Client) error {
	cIndexer, err := indexer.CreateBlockIndexer(cfg, db, ethClient)
	if err != nil {
		return err
	}

	bOff := backoff.NewExponentialBackOff()

	err = backoff.RetryNotify(
		func() error {
			return cIndexer.IndexHistory(ctx)
		},
		bOff,
		func(err error, d time.Duration) {
			logger.Error("Index history error: %s. Will retry after %s", err, d)
		},
	)
	if err != nil {
		return errors.Wrap(err, "Index history fatal error")
	}

	// Once history indexing is done, keep dropping expired history in the
	// background at regular intervals.
	if cfg.DB.HistoryDrop > 0 {
		go database.DropHistory(
			ctx, db, cfg.DB.HistoryDrop, database.HistoryDropIntervalCheck, ethClient,
		)
	}

	err = backoff.RetryNotify(
		func() error {
			return cIndexer.IndexContinuous(ctx)
		},
		bOff,
		func(err error, d time.Duration) {
			logger.Error("Index continuous error: %s. Will retry after %s", err, d)
		},
	)
	if err != nil {
		return errors.Wrap(err, "Index continuous fatal error")
	}

	logger.Info("Finished indexing")
	return nil
}