Skip to content

Commit

Permalink
ingest: serve p.js
Browse files Browse the repository at this point in the history
ingest: repository data and p.js cached on disk for reliable startup
ingest: gzip responses
  • Loading branch information
absorbb committed Dec 18, 2023
1 parent 3cce144 commit 90601fc
Show file tree
Hide file tree
Showing 14 changed files with 514 additions and 143 deletions.
26 changes: 26 additions & 0 deletions admin/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
module github.com/jitsucom/bulker/admin

go 1.21

require (
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/hjson/hjson-go/v4 v4.3.1
)

require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
github.com/opencontainers/runc v1.1.7 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/testcontainers/testcontainers-go v0.25.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
google.golang.org/grpc v1.59.0 // indirect
)
69 changes: 69 additions & 0 deletions admin/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/hjson/hjson-go/v4"
"github.com/jitsucom/bulker/jitsubase/utils"
"os"
)

// main is an interactive admin utility that increases the number of
// partitions of a Kafka topic.
//
// Connection settings are read from the environment:
//   - KAFKA_BOOTSTRAP_SERVERS: passed to the client as "bootstrap.servers".
//   - KAFKA_SECURITY_PROTOCOL: passed as "security.protocol"; falls back to
//     "SASL_SSL" through utils.Nvl (presumably when the variable is empty —
//     confirm against utils.Nvl).
//   - KAFKA_SASL: optional HJSON object; each key is applied to the client
//     config with a "sasl." prefix.
//
// The topic name and the new partition count are read from stdin. Any failure
// aborts the program with a panic describing the failed step.
func main() {
	bootstrapServers := os.Getenv("KAFKA_BOOTSTRAP_SERVERS")
	securityProtocol := utils.Nvl(os.Getenv("KAFKA_SECURITY_PROTOCOL"), "SASL_SSL")
	kafkaSasl := os.Getenv("KAFKA_SASL")

	kafkaConfig := &kafka.ConfigMap{
		"client.id":                "bulkerapp_admin",
		"bootstrap.servers":        bootstrapServers,
		"reconnect.backoff.ms":     1000,
		"reconnect.backoff.max.ms": 10000,
	}
	// SetKey results are deliberately ignored below: the keys are plain
	// (non-"{topic}." prefixed) entries that are simply stored in the map.
	if securityProtocol != "" {
		_ = kafkaConfig.SetKey("security.protocol", securityProtocol)
	}
	// NOTE(review): broker certificate verification is disabled, which allows
	// man-in-the-middle attacks on TLS connections. Kept as-is to preserve
	// behavior for clusters with self-signed certificates; consider making it
	// configurable.
	_ = kafkaConfig.SetKey("enable.ssl.certificate.verification", false)
	if kafkaSasl != "" {
		sasl := map[string]any{}
		if err := hjson.Unmarshal([]byte(kafkaSasl), &sasl); err != nil {
			panic(fmt.Errorf("error parsing Kafka SASL config: %v", err))
		}
		for k, v := range sasl {
			_ = kafkaConfig.SetKey("sasl."+k, v)
		}
	}

	admin, err := kafka.NewAdminClient(kafkaConfig)
	if err != nil {
		panic(fmt.Errorf("error creating Kafka admin client: %v", err))
	}
	// Release the underlying client handle on every exit path, including the
	// panics below (deferred calls still run while a panic unwinds).
	defer admin.Close()

	m, err := admin.GetMetadata(nil, true, 10000)
	if err != nil {
		panic(fmt.Errorf("error getting Kafka metadata: %v", err))
	}
	fmt.Println(m.Brokers)

	fmt.Print("Enter topic name to increase partitions: ")
	var topic string
	if _, err = fmt.Scanln(&topic); err != nil {
		panic(fmt.Errorf("error reading topic name: %v", err))
	}

	fmt.Printf("Enter new number of partitions for topic '%s': ", topic)
	var partitions int
	if _, err = fmt.Scanln(&partitions); err != nil {
		panic(fmt.Errorf("error reading partitions number: %v", err))
	}
	if partitions <= 0 {
		panic(fmt.Errorf("error reading partitions number: must be positive, got %d", partitions))
	}

	res, err := admin.CreatePartitions(context.Background(), []kafka.PartitionsSpecification{
		{
			Topic:      topic,
			IncreaseTo: partitions,
		},
	})
	if err != nil {
		panic(fmt.Errorf("error creating partitions: %v", err))
	}
	fmt.Println(res)
	// err above only covers request-level failures; a per-topic rejection
	// (unknown topic, count not larger than the current one, ...) is reported
	// inside the individual results and must be checked separately.
	for _, topicResult := range res {
		if topicResult.Error.Code() != kafka.ErrNoError {
			panic(fmt.Errorf("error creating partitions for topic '%s': %v", topicResult.Topic, topicResult.Error))
		}
	}
}
5 changes: 4 additions & 1 deletion ingest/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Context struct {
kafkaConfig *kafka.ConfigMap
dbpool *pgxpool.Pool
repository *Repository
script *Script
producer *kafkabase.Producer
eventsLogService eventslog.EventsLogService
server *http.Server
Expand All @@ -37,7 +38,8 @@ func (a *Context) InitContext(settings *appbase.AppSettings) error {
if err != nil {
return fmt.Errorf("Unable to create postgres connection pool: %v\n", err)
}
a.repository = NewRepository(a.dbpool, a.config.RepositoryRefreshPeriodSec)
a.repository = NewRepository(a.dbpool, a.config.RepositoryRefreshPeriodSec, a.config.CacheDir)
a.script = NewScript(a.config.ScriptOrigin, a.config.CacheDir)
a.eventsLogService = &eventslog.DummyEventsLogService{}
eventsLogRedisUrl := a.config.RedisURL
if eventsLogRedisUrl != "" {
Expand Down Expand Up @@ -81,6 +83,7 @@ func (a *Context) Cleanup() error {
}
_ = a.metricsServer.Stop()
_ = a.eventsLogService.Close()
a.script.Close()
a.repository.Close()
a.dbpool.Close()
return nil
Expand Down
6 changes: 6 additions & 0 deletions ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ type Config struct {
// For ingest endpoint only
GlobalHashSecrets []string

// URL to fetch p.js javascript code from
ScriptOrigin string `mapstructure:"SCRIPT_ORIGIN" default:"https://cdn.jsdelivr.net/npm/@jitsu/js@latest/dist/web/p.js.txt"`

//Cache dir for repository and javascript data
CacheDir string `mapstructure:"CACHE_DIR"`

BackupLogDir string `mapstructure:"BACKUP_LOG_DIR"`
BackupLogTTL int `mapstructure:"BACKUP_LOG_TTL_DAYS" default:"7"`
BackupLogRotateHours int `mapstructure:"BACKUP_LOG_ROTATE_HOURS" default:"24"`
Expand Down
3 changes: 3 additions & 0 deletions ingest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/gin-gonic/gin v1.9.1
github.com/jackc/pgx/v5 v5.4.3
github.com/json-iterator/go v1.1.12
github.com/penglongli/gin-metrics v0.1.10
github.com/prometheus/client_golang v1.17.0
github.com/spf13/viper v1.17.0
github.com/vearne/gin-timeout v0.1.7
Expand All @@ -15,6 +16,7 @@ require (

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.2.0 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
Expand Down Expand Up @@ -44,6 +46,7 @@ require (
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
github.com/opencontainers/runc v1.1.7 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions ingest/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/hcsshim v0.11.0 h1:7EFNIY4igHEXUdj1zXgAyU3fLc7QfOKHbkldRVTBdiM=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand All @@ -18,6 +19,7 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
github.com/gin-contrib/gzip v0.0.6 h1:NjcunTcGAj5CO1gn4N8jHOSIeRFHIbn51z6K+xaN4d4=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
Expand Down Expand Up @@ -56,6 +58,7 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8
github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI=
github.com/opencontainers/runc v1.1.7 h1:y2EZDS8sNng4Ksf0GUYNhKbTShZJPJg1FiXJNH/uoCk=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/penglongli/gin-metrics v0.1.10 h1:mNNWCM3swMOVHwzrHeXsE4C/myu8P/HIFohtyMi9rN8=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
Expand Down
9 changes: 9 additions & 0 deletions ingest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ var (
return ingestedMessages.WithLabelValues(destinationId, status, errorType)
}

// repositoryErrors is a Prometheus counter declared with namespace "ingest",
// subsystem "repository" and name "error". It is incremented when a
// repository refresh fails.
repositoryErrors = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "ingest",
Subsystem: "repository",
Name: "error",
})
// RepositoryErrors returns the repositoryErrors counter.
RepositoryErrors = func() prometheus.Counter {
return repositoryErrors
}

panics = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "bulkerapp",
Subsystem: "safego",
Expand Down
108 changes: 98 additions & 10 deletions ingest/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package main

import (
"context"
"encoding/json"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/safego"
jsoniter "github.com/json-iterator/go"
"io"
"os"
"path"
"sync/atomic"
"time"
)
Expand Down Expand Up @@ -33,53 +37,133 @@ type Repository struct {
appbase.Service
dbpool *pgxpool.Pool
refreshPeriodSec int
inited atomic.Bool
cacheDir string
apiKeyBindings atomic.Pointer[map[string]*ApiKeyBinding]
streamsByIds atomic.Pointer[map[string]*StreamWithDestinations]
streamsByDomains atomic.Pointer[map[string][]*StreamWithDestinations]
closed chan struct{}
}

func NewRepository(dbpool *pgxpool.Pool, refreshPeriodSec int) *Repository {
// RepositoryCache is the JSON document used to persist a repository snapshot
// as "repository.json" in the cache directory. Its three maps mirror the
// lookup tables held by Repository.
type RepositoryCache struct {
// ApiKeyBindings is the snapshot of Repository.apiKeyBindings.
ApiKeyBindings map[string]*ApiKeyBinding `json:"apiKeyBindings"`
// StreamsByIds is the snapshot of Repository.streamsByIds.
StreamsByIds map[string]*StreamWithDestinations `json:"streamsByIds"`
// StreamsByDomains is the snapshot of Repository.streamsByDomains.
StreamsByDomains map[string][]*StreamWithDestinations `json:"streamsByDomains"`
}

func NewRepository(dbpool *pgxpool.Pool, refreshPeriodSec int, cacheDir string) *Repository {
base := appbase.NewServiceBase("repository")
r := &Repository{
Service: base,
dbpool: dbpool,
refreshPeriodSec: refreshPeriodSec,
cacheDir: cacheDir,
closed: make(chan struct{}),
}
r.refresh()
r.Start()
r.start()
return r
}

// loadCached restores the repository lookup tables from
// "<cacheDir>/repository.json" and marks the repository as initialized.
//
// Every failure (file missing, unreadable, empty, or not valid JSON) is
// reported through r.Fatalf. Fatalf is expected to terminate the process
// (NOTE(review): confirm in appbase); the explicit returns make sure no
// partially loaded state is stored if it does not.
func (r *Repository) loadCached() {
	file, err := os.Open(path.Join(r.cacheDir, "repository.json"))
	if err != nil {
		r.Fatalf("Error opening cached repository: %v\nCannot serve without repository. Exitting...", err)
		return
	}
	// The handle is read-only, so there is nothing to flush and the Close
	// error carries no useful information.
	defer file.Close()

	stat, err := file.Stat()
	if err != nil {
		r.Fatalf("Error getting cached repository info: %v\nCannot serve without repository. Exitting...", err)
		return
	}
	fileSize := stat.Size()
	if fileSize == 0 {
		r.Fatalf("Cached repository is empty\nCannot serve without repository. Exitting...")
		return
	}
	payload, err := io.ReadAll(file)
	if err != nil {
		r.Fatalf("Error reading cached repository: %v\nCannot serve without repository. Exitting...", err)
		return
	}
	repositoryCache := RepositoryCache{}
	if err = jsoniter.Unmarshal(payload, &repositoryCache); err != nil {
		r.Fatalf("Error unmarshalling cached repository: %v\nCannot serve without repository. Exitting...", err)
		return
	}
	r.apiKeyBindings.Store(&repositoryCache.ApiKeyBindings)
	r.streamsByIds.Store(&repositoryCache.StreamsByIds)
	r.streamsByDomains.Store(&repositoryCache.StreamsByDomains)
	r.inited.Store(true)
	r.Infof("Loaded cached repository data: %d bytes, last modified: %v", fileSize, stat.ModTime())
}

// storeCached persists payload as JSON to "<cacheDir>/repository.json",
// creating the cache directory if needed.
//
// The data is first written and synced to a sibling "repository.json.tmp"
// file, which is then renamed over the final path. An interrupted write
// therefore never leaves a truncated cache behind for loadCached to pick up.
//
// Failures are logged with r.Errorf and otherwise ignored: the cache is a
// best-effort fallback and must not break a successful refresh.
func (r *Repository) storeCached(payload RepositoryCache) {
	filePath := path.Join(r.cacheDir, "repository.json")
	if err := os.MkdirAll(r.cacheDir, 0755); err != nil {
		r.Errorf("Cannot write cached repository to %s: cannot make dir: %v", filePath, err)
		return
	}

	tmpPath := filePath + ".tmp"
	file, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
	if err != nil {
		r.Errorf("Cannot write cached repository to %s: %v", filePath, err)
		return
	}

	err = json.NewEncoder(file).Encode(payload)
	if err == nil {
		err = file.Sync()
	}
	// Always close the descriptor (this runs on every refresh); surface the
	// Close error only when nothing failed earlier.
	if closeErr := file.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		_ = os.Remove(tmpPath) // best-effort cleanup of the partial temp file
		r.Errorf("Cannot write cached repository to %s: %v", filePath, err)
		return
	}

	if err = os.Rename(tmpPath, filePath); err != nil {
		_ = os.Remove(tmpPath) // best-effort cleanup; the previous cache stays in place
		r.Errorf("Cannot write cached repository to %s: %v", filePath, err)
	}
}

func (r *Repository) refresh() {
start := time.Now()
apiKeyBindings := map[string]*ApiKeyBinding{}
streamsByIds := map[string]*StreamWithDestinations{}
streamsByDomains := map[string][]*StreamWithDestinations{}
var err error
defer func() {
r.Debugf("Refreshed in %v", time.Now().Sub(start))
if err != nil {
r.Errorf("Error refreshing repository: %v", err)
RepositoryErrors().Add(1)
if !r.inited.Load() {
if r.cacheDir != "" {
r.loadCached()
} else {
r.Fatalf("Cannot load cached repository. No CACHE_DIR is set. Cannot serve without repository. Exitting...")
}
}
} else {
r.Debugf("Refreshed in %v", time.Now().Sub(start))
}
}()
rows, err := r.dbpool.Query(context.Background(), SQLQuery)
if err != nil {
r.Errorf("Error querying streams: %v", err)
err = r.NewError("Error querying streams: %v", err)
return
}
defer rows.Close()
for rows.Next() {
var streamId string
var streamConfig string
err := rows.Scan(&streamId, &streamConfig)
err = rows.Scan(&streamId, &streamConfig)
if err != nil {
r.Errorf("Error scanning row: %v", err)
continue
err = r.NewError("Error scanning row: %v", err)
return
}
//r.Infof("Stream %s: %s", streamId, streamConfig)
s := StreamWithDestinations{}
err = jsoniter.UnmarshalFromString(streamConfig, &s)
if err != nil {
r.Errorf("Error unmarshalling stream config: %v", err)
continue
err = r.NewError("Error unmarshalling stream config: %v", err)
return
}
s.init()
streamsByIds[s.Stream.Id] = &s
Expand Down Expand Up @@ -108,9 +192,13 @@ func (r *Repository) refresh() {
r.apiKeyBindings.Store(&apiKeyBindings)
r.streamsByIds.Store(&streamsByIds)
r.streamsByDomains.Store(&streamsByDomains)
r.inited.Store(true)
if r.cacheDir != "" {
r.storeCached(RepositoryCache{ApiKeyBindings: apiKeyBindings, StreamsByIds: streamsByIds, StreamsByDomains: streamsByDomains})
}
}

func (r *Repository) Start() {
func (r *Repository) start() {
safego.RunWithRestart(func() {
ticker := time.NewTicker(time.Duration(r.refreshPeriodSec) * time.Second)
for {
Expand Down
Loading

0 comments on commit 90601fc

Please sign in to comment.