Skip to content

Commit

Permalink
implement metrics for fast_forward and cache
Browse files Browse the repository at this point in the history
  • Loading branch information
IrineSistiana committed Jun 24, 2022
1 parent 561508c commit 82f1496
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
26 changes: 24 additions & 2 deletions plugin/executable/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/IrineSistiana/mosdns/v4/pkg/cache/redis_cache"
"github.com/IrineSistiana/mosdns/v4/pkg/dnsutils"
"github.com/IrineSistiana/mosdns/v4/pkg/executable_seq"
"github.com/IrineSistiana/mosdns/v4/pkg/metrics"
"github.com/IrineSistiana/mosdns/v4/pkg/query_context"
"github.com/IrineSistiana/mosdns/v4/pkg/utils"
"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -73,6 +74,14 @@ type cachePlugin struct {
whenHit executable_seq.Executable
backend cache.Backend
lazyUpdateSF singleflight.Group

m *cacheMetrics
}

type cacheMetrics struct {
query *metrics.Counter
hit *metrics.Counter
lazyHit *metrics.Counter
}

func Init(bp *coremain.BP, args interface{}) (p coremain.Plugin, err error) {
Expand Down Expand Up @@ -109,12 +118,22 @@ func newCachePlugin(bp *coremain.BP, args *Args) (*cachePlugin, error) {
}
}

return &cachePlugin{
p := &cachePlugin{
BP: bp,
args: args,
whenHit: whenHit,
backend: c,
}, nil
}
m := &cacheMetrics{
query: metrics.NewCounter(),
hit: metrics.NewCounter(),
lazyHit: metrics.NewCounter(),
}
bp.GetMetricsReg().Set("query", m.query)
bp.GetMetricsReg().Set("hit", m.hit)
bp.GetMetricsReg().Set("lazy_hit", m.lazyHit)
p.m = m
return p, nil
}

func (c *cachePlugin) skip(q *dns.Msg) bool {
Expand All @@ -126,6 +145,7 @@ func (c *cachePlugin) skip(q *dns.Msg) bool {
}

func (c *cachePlugin) Exec(ctx context.Context, qCtx *query_context.Context, next executable_seq.ExecutableChainNode) error {
c.m.query.Inc(1)
q := qCtx.Q()
if c.skip(q) {
c.L().Debug("skipped", qCtx.InfoField())
Expand All @@ -142,6 +162,7 @@ func (c *cachePlugin) Exec(ctx context.Context, qCtx *query_context.Context, nex

// cache hit
if v != nil {
c.m.hit.Inc(1)
r := new(dns.Msg)
if err := r.Unpack(v); err != nil {
return fmt.Errorf("failed to unpack cached data, %w", err)
Expand All @@ -168,6 +189,7 @@ func (c *cachePlugin) Exec(ctx context.Context, qCtx *query_context.Context, nex

// expired but lazy update enabled
if c.args.LazyCacheTTL > 0 {
c.m.lazyHit.Inc(1)
c.L().Debug("expired cache hit", qCtx.InfoField())
// prepare a response with 1 ttl
dnsutils.SetTTL(r, uint32(c.args.LazyCacheReplyTTL))
Expand Down
30 changes: 29 additions & 1 deletion plugin/executable/fast_forward/fast_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import (
"github.com/IrineSistiana/mosdns/v4/coremain"
"github.com/IrineSistiana/mosdns/v4/pkg/bundled_upstream"
"github.com/IrineSistiana/mosdns/v4/pkg/executable_seq"
"github.com/IrineSistiana/mosdns/v4/pkg/metrics"
"github.com/IrineSistiana/mosdns/v4/pkg/query_context"
"github.com/IrineSistiana/mosdns/v4/pkg/upstream"
"github.com/IrineSistiana/mosdns/v4/pkg/utils"
"github.com/miekg/dns"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -137,7 +139,17 @@ func newFastForward(bp *coremain.BP, args *Args) (*fastForward, error) {
address: c.Addr,
trusted: c.Trusted,
u: u,
m: upstreamMetrics{
query: metrics.NewCounter(),
err: metrics.NewCounter(),
latency: metrics.NewHistogram(128),
},
}
upstreamReg := metrics.NewRegistry()
upstreamReg.Set("query", wu.m.query)
upstreamReg.Set("err", wu.m.err)
upstreamReg.Set("latency", wu.m.latency)
bp.GetMetricsReg().Set(strconv.Itoa(i), upstreamReg)

if i == 0 { // Set first upstream as trusted upstream.
wu.trusted = true
Expand All @@ -155,10 +167,26 @@ type upstreamWrapper struct {
address string
trusted bool
u upstream.Upstream

m upstreamMetrics
}

type upstreamMetrics struct {
query *metrics.Counter
err *metrics.Counter
latency *metrics.Histogram
}

func (u *upstreamWrapper) Exchange(ctx context.Context, q *dns.Msg) (*dns.Msg, error) {
return u.u.ExchangeContext(ctx, q)
u.m.query.Inc(1)
start := time.Now()
r, err := u.u.ExchangeContext(ctx, q)
if err != nil {
u.m.err.Inc(1)
} else {
u.m.latency.Update(time.Since(start).Milliseconds())
}
return r, err
}

func (u *upstreamWrapper) Address() string {
Expand Down

0 comments on commit 82f1496

Please sign in to comment.