Skip to content

Commit

Permalink
Add log comment for prom remote read queries
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Apr 5, 2024
1 parent 683712e commit e0e8146
Showing 1 changed file with 51 additions and 2 deletions.
53 changes: 51 additions & 2 deletions storage/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package clickhouse
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/url"
"os"
Expand Down Expand Up @@ -165,7 +166,8 @@ func (ch *clickHouse) runTimeSeriesReloader(ctx context.Context) {
err := func() error {
query := fmt.Sprintf(queryTmpl, ch.database, distributedTimeSeriesV4)
ch.l.Debug("Running reloader query:", query)
rows, err := ch.db.Query(query, ch.lastLoadedTimeStamp)
ctx = ch.addClickHouseSettings(ctx)
rows, err := ch.db.QueryContext(ctx, query, ch.lastLoadedTimeStamp)
if err != nil {
return err
}
Expand Down Expand Up @@ -218,6 +220,51 @@ func (ch *clickHouse) runTimeSeriesReloader(ctx context.Context) {
}
}

func (ch *clickHouse) getLogComment(ctx context.Context) string {
// Get the key-value pairs from context for log comment
kv := ctx.Value("log_comment")
if kv == nil {
return ""
}

logCommentKVs, ok := kv.(map[string]string)
if !ok {
return ""
}

logComment, _ := json.Marshal(logCommentKVs)

return string(logComment)
}

func (ch *clickHouse) addClickHouseSettings(ctx context.Context) context.Context {
settings := clickhouse.Settings{}

logComment := ch.getLogComment(ctx)
if logComment != "" {
settings["log_comment"] = logComment
}

if os.Getenv("ClickHouseMaxExecutionTime") != "" {
settings["max_execution_time"] = os.Getenv("ClickHouseMaxExecutionTime")
}

if os.Getenv("ClickHouseMaxExecutionTimeLeaf") != "" {
settings["max_execution_time_leaf"] = os.Getenv("ClickHouseMaxExecutionTimeLeaf")
}

if os.Getenv("ClickHouseTimeoutBeforeCheckingExecutionSpeed") != "" {
settings["timeout_before_checking_execution_speed"] = os.Getenv("ClickHouseTimeoutBeforeCheckingExecutionSpeed")
}

if os.Getenv("ClickHouseMaxBytesToRead") != "" {
settings["max_bytes_to_read"] = os.Getenv("ClickHouseMaxBytesToRead")
}

ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
return ctx
}

func (ch *clickHouse) scanSamples(rows *sql.Rows, fingerprints map[uint64][]prompb.Label) ([]*prompb.TimeSeries, error) {
// scan results
var res []*prompb.TimeSeries
Expand Down Expand Up @@ -285,7 +332,7 @@ func (ch *clickHouse) querySamples(
query := fmt.Sprintf(`
SELECT metric_name, fingerprint, unix_milli, value
FROM %s.%s
WHERE metric_name = $1 AND fingerprint GLOBAL IN (%s) AND unix_milli >= $%s AND unix_milli <= $%s;`,
WHERE metric_name = $1 AND fingerprint GLOBAL IN (%s) AND unix_milli >= $%s AND unix_milli <= $%s ORDER BY fingerprint, unix_milli;`,
ch.database, distributedSamplesV4, subQuery, strconv.Itoa(argCount+2), strconv.Itoa(argCount+3))
query = strings.TrimSpace(query)

Expand All @@ -295,6 +342,7 @@ func (ch *clickHouse) querySamples(
allArgs = append(allArgs, start, end)

// run query
ctx = ch.addClickHouseSettings(ctx)
rows, err := ch.db.QueryContext(ctx, query, allArgs...)
if err != nil {
return nil, errors.WithStack(err)
Expand Down Expand Up @@ -383,6 +431,7 @@ func (ch *clickHouse) prepareClickHouseQuery(query *prompb.Query, metricName str

func (ch *clickHouse) fingerprintsForQuery(ctx context.Context, query string, args []interface{}) (map[uint64][]prompb.Label, error) {
// run query
ctx = ch.addClickHouseSettings(ctx)
rows, err := ch.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
Expand Down

0 comments on commit e0e8146

Please sign in to comment.