Skip to content

Commit

Permalink
add daily index option
Browse files Browse the repository at this point in the history
  • Loading branch information
pwillie committed May 28, 2019
1 parent 29ffb2a commit c17bdc2
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 32 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ A read and write adapter for prometheus persistent storage.
| ES_BATCH_MAX_DOCS | 1000 | Max items for bulk Elasticsearch insert operation |
| ES_BATCH_MAX_SIZE | 4096 | Max size in bytes for bulk Elasticsearch insert operation |
| ES_ALIAS | prom-metrics | Elasticsearch alias pointing to active write index |
| ES_INDEX_DAILY | false | Create daily indexes and disable index rollover |
| ES_INDEX_SHARDS | 5 | Number of Elasticsearch shards to create per index |
| ES_INDEX_REPLICAS | 1 | Number of Elasticsearch replicas to create per index |
| ES_INDEX_MAX_AGE | 7d | Max age of Elasticsearch index before rollover |
Expand Down
35 changes: 22 additions & 13 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,21 @@ import (
"fmt"
"net/http"

"go.uber.org/zap"

"github.com/TV4/graceful"
gorilla "github.com/gorilla/handlers"
"github.com/namsral/flag"
"github.com/pwillie/prometheus-es-adapter/pkg/elasticsearch"
"github.com/pwillie/prometheus-es-adapter/pkg/handlers"
"github.com/pwillie/prometheus-es-adapter/pkg/logger"
"go.uber.org/zap"
elastic "gopkg.in/olivere/elastic.v6"
)

var (
// Build number populated during build
Build string
// Git commit number populated during build
Commit string
// Build number populated during build
Build string
// Commit hash populated during build
Commit string
)

func main() {
Expand All @@ -33,6 +32,7 @@ func main() {
batchMaxDocs = flag.Int("es_batch_max_docs", 1000, "Max items for bulk Elasticsearch insert operation")
batchMaxSize = flag.Int("es_batch_max_size", 4096, "Max size in bytes for bulk Elasticsearch insert operation")
indexAlias = flag.String("es_alias", "prom-metrics", "Elasticsearch alias pointing to active write index")
indexDaily = flag.Bool("es_index_daily", false, "Create daily indexes and disable index management service")
indexShards = flag.Int("es_index_shards", 5, "Number of Elasticsearch shards to create per index")
indexReplicas = flag.Int("es_index_replicas", 1, "Number of Elasticsearch replicas to create per index")
indexMaxAge = flag.String("es_index_max_age", "7d", "Max age of Elasticsearch index before rollover")
Expand Down Expand Up @@ -65,17 +65,25 @@ func main() {
}
defer client.Stop()

indexCfg := &elasticsearch.IndexConfig{
err = elasticsearch.EnsureIndexTemplate(ctx, client, &elasticsearch.IndexTemplateConfig{
Alias: *indexAlias,
MaxAge: *indexMaxAge,
MaxDocs: *indexMaxDocs,
MaxSize: *indexMaxSize,
Shards: *indexShards,
Replicas: *indexReplicas,
}
_, err = elasticsearch.NewIndexService(ctx, log, client, indexCfg)
})
if err != nil {
log.Fatal("Failed to create indexer", zap.Error(err))
log.Fatal("Failed to create index template", zap.Error(err))
}

if !*indexDaily {
_, err = elasticsearch.NewIndexService(ctx, log, client, &elasticsearch.IndexConfig{
Alias: *indexAlias,
MaxAge: *indexMaxAge,
MaxDocs: *indexMaxDocs,
MaxSize: *indexMaxSize,
})
if err != nil {
log.Fatal("Failed to create indexer", zap.Error(err))
}
}

readCfg := &elasticsearch.ReadConfig{
Expand All @@ -86,6 +94,7 @@ func main() {

writeCfg := &elasticsearch.WriteConfig{
Alias: *indexAlias,
Daily: *indexDaily,
MaxAge: *batchMaxAge,
MaxDocs: *batchMaxDocs,
MaxSize: *batchMaxSize,
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ services:
- '8001:9000'
environment:
- ES_URL=http://elasticsearch:9200
- ES_INDEX_DAILY=true
- ES_INDEX_SHARDS=1
- ES_INDEX_REPLICAS=0
- ES_INDEX_MAX_DOCS=2000
Expand Down
21 changes: 11 additions & 10 deletions pkg/elasticsearch/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"go.uber.org/zap"

elastic "gopkg.in/olivere/elastic.v6"
)

Expand All @@ -22,10 +21,15 @@ type IndexService struct {

// IndexConfig is used to configure IndexService
type IndexConfig struct {
Alias string
MaxAge string
MaxDocs int64
MaxSize string
}

// IndexTemplateConfig is used to resolve template
type IndexTemplateConfig struct {
Alias string
MaxAge string
MaxDocs int64
MaxSize string
Shards int
Replicas int
}
Expand All @@ -39,26 +43,23 @@ func NewIndexService(ctx context.Context, logger *zap.Logger, client *elastic.Cl
config: config,
logger: logger,
}
if err := svc.createIndexTemplate(); err != nil {
return nil, err
}
if err := svc.createIndex(); err != nil {
return nil, err
}
go svc.rolloverIndex()
return svc, nil
}

func (svc *IndexService) createIndexTemplate() error {
func EnsureIndexTemplate(ctx context.Context, client *elastic.Client, config *IndexTemplateConfig) error {
var buf bytes.Buffer
t := template.Must(template.New("template").Parse(indexTemplate))
err := t.Execute(&buf, svc.config)
err := t.Execute(&buf, config)
if err != nil {
return fmt.Errorf("executing template: %s", err)
}
payload := buf.String()

_, err = svc.client.IndexPutTemplate(svc.config.Alias).BodyString(payload).Do(svc.ctx)
_, err = client.IndexPutTemplate(config.Alias).BodyString(payload).Do(ctx)
if err != nil {
return fmt.Errorf("Failed to create index template: %s", err)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/elasticsearch/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type WriteService struct {
// WriteConfig is used to configure WriteService
type WriteConfig struct {
Alias string
Daily bool
MaxAge int
MaxDocs int
MaxSize int
Expand Down Expand Up @@ -67,6 +68,7 @@ func (svc *WriteService) Close() error {

// Write will enqueue Prometheus sample data to be batch written to Elasticsearch
func (svc *WriteService) Write(req []*prompb.TimeSeries) {
index := svc.config.Alias
for _, ts := range req {
metric := make(model.Metric, len(ts.Labels))
for _, l := range ts.Labels {
Expand All @@ -83,9 +85,12 @@ func (svc *WriteService) Write(req []*prompb.TimeSeries) {
v,
s.Timestamp,
}
if svc.config.Daily {
index = svc.config.Alias + "-" + time.Unix(s.Timestamp/1000, 0).Format("2006-01-02")
}
r := elastic.
NewBulkIndexRequest().
Index(svc.config.Alias).
Index(index).
Type(sampleType).
Doc(sample)
svc.processor.Add(r)
Expand Down
17 changes: 13 additions & 4 deletions pkg/handlers/adapter.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package handlers

import (
"context"
"fmt"
"io/ioutil"
"net/http"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/pwillie/prometheus-es-adapter/pkg/elasticsearch"
)

func writeHandler(svc *elasticsearch.WriteService) http.HandlerFunc {
type writeService interface {
Write([]*prompb.TimeSeries)
}

func writeHandler(svc writeService) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -33,14 +38,18 @@ func writeHandler(svc *elasticsearch.WriteService) http.HandlerFunc {

svc.Write(req.Timeseries)
if err != nil {
// log.Println("msg", "Error sending samples to remote storage", "err", err, "storage", "num_samples", len(samples))
http.Error(w, "Error sending samples to remote storage", http.StatusInternalServerError)
}
}
}

func readHandler(svc *elasticsearch.ReadService) http.HandlerFunc {
type readService interface {
Read(context.Context, []*prompb.Query) ([]*prompb.QueryResult, error)
}

func readHandler(svc readService) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down
3 changes: 1 addition & 2 deletions pkg/handlers/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (

func healthzHandler(client *elastic.Client) http.Handler {
health := healthcheck.NewHandler()
health.AddReadinessCheck("elasticsearch",
esCheck(client))
health.AddReadinessCheck("elasticsearch", esCheck(client))
return health
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/handlers/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package handlers
import (
"net/http"

"gopkg.in/olivere/elastic.v6"

"github.com/prometheus/client_golang/prometheus"
"github.com/pwillie/prometheus-es-adapter/pkg/elasticsearch"
"gopkg.in/olivere/elastic.v6"
)

// NewRouter returns a configured http router
Expand Down

0 comments on commit c17bdc2

Please sign in to comment.