Skip to content

Commit

Permalink
Merge pull request #17 from nlnwa/log-service
Browse files Browse the repository at this point in the history
Log service
  • Loading branch information
johnerikhalse authored Apr 21, 2021
2 parents 5b9e052 + cc3b222 commit 298d2a5
Show file tree
Hide file tree
Showing 13 changed files with 589 additions and 255 deletions.
7 changes: 2 additions & 5 deletions Corefile.docker
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@
maxSizeMb 2048
contentWriterHost {$CONTENT_WRITER_HOST}
contentWriterPort {$CONTENT_WRITER_PORT}
dbHost {$DB_HOST}
dbPort {$DB_PORT}
dbUser {$DB_USER}
dbPassword {$DB_PASSWORD}
dbName {$DB_NAME}
logHost {$LOG_WRITER_HOST}
logPort {$LOG_WRITER_PORT}
}
forward . {$DNS_SERVER} {
# except IGNORED_NAMES...
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ require (
github.com/coredns/caddy v1.1.0
github.com/coredns/coredns v1.8.3
github.com/dnstap/golang-dnstap v0.4.0
github.com/golang/protobuf v1.4.3
github.com/golang/protobuf v1.5.0
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/miekg/dns v1.1.38
github.com/nlnwa/veidemann-api/go v0.0.0-20201211114104-e5b947faa1a3
github.com/nlnwa/veidemann-api/go v0.0.0-20210413093311-7ff38e848604
github.com/nlnwa/veidemann-log-service v0.1.0
github.com/opentracing/opentracing-go v1.2.0
github.com/prometheus/client_golang v1.9.0
golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d // indirect
Expand All @@ -19,5 +20,5 @@ require (
golang.org/x/text v0.3.5 // indirect
google.golang.org/genproto v0.0.0-20210219173056-d891e3cb3b5b // indirect
google.golang.org/grpc v1.35.0
gopkg.in/rethinkdb/rethinkdb-go.v6 v6.2.1
google.golang.org/protobuf v1.26.0
)
382 changes: 363 additions & 19 deletions go.sum

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ var directives = []string{
"forward",
}


func init() {
dnsserver.Directives = directives
}
Expand Down
77 changes: 51 additions & 26 deletions plugin/archivingcache/archivingcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (
"github.com/coredns/coredns/plugin/metrics"
clog "github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/plugin/pkg/nonwriter"
"github.com/coredns/coredns/plugin/pkg/rcode"
"github.com/coredns/coredns/plugin/pkg/response"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
contentwriterV1 "github.com/nlnwa/veidemann-api/go/contentwriter/v1"
logV1 "github.com/nlnwa/veidemann-api/go/log/v1"
"github.com/nlnwa/veidemann-dns-resolver/plugin/forward"
"github.com/nlnwa/veidemann-dns-resolver/plugin/resolve"
"github.com/nlnwa/veidemann-log-service/pkg/logclient"
"google.golang.org/grpc/connectivity"
"google.golang.org/protobuf/types/known/timestamppb"
"strings"
"time"
)
Expand All @@ -27,27 +30,29 @@ var (
log = clog.NewWithPlugin("archivingcache")
)

// ArchivingCache is a cache plugin.
// ArchivingCache is a CoreDNS plugin.
type ArchivingCache struct {
Next plugin.Handler
Next plugin.Handler

cache *Cache
contentWriter *ContentWriterClient
db *database
now time.Time
logClient *logclient.LogClient

now time.Time
}

// NewArchivingCache returns a new instance of ArchivingCache
func NewArchivingCache(cache *Cache, db *database, cw *ContentWriterClient) *ArchivingCache {
func NewArchivingCache(cache *Cache, logClient *logclient.LogClient, cw *ContentWriterClient) *ArchivingCache {
return &ArchivingCache{
cache: cache,
db: db,
logClient: logClient,
contentWriter: cw,
now: time.Now().UTC(),
}
}

func (a *ArchivingCache) Ready() bool {
return a.db.Session.IsConnected() && a.contentWriter.conn.Conn.GetState() != connectivity.Shutdown
return a.contentWriter.GetState() != connectivity.Shutdown
}

// ServeDNS implements the plugin.Handler interface.
Expand All @@ -60,6 +65,7 @@ func (a *ArchivingCache) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *
server := metrics.WithServer(ctx)
fetchStart := time.Now().UTC()
collectionId, hasCollectionId := ctx.Value(resolve.CollectionIdKey{}).(string)
executionId, _ := ctx.Value(resolve.ExecutionIdKey{}).(string)
key := state.Name() + state.Class() + state.Type()
var msg *dns.Msg

Expand All @@ -70,11 +76,8 @@ func (a *ArchivingCache) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *
ctx = context.WithValue(ctx, forward.ProxyKey{}, &proxyAddr)
nw := nonwriter.New(w)
rc, err := plugin.NextOrFailure(a.Name(), a.Next, ctx, nw, r)
if err != nil || rc != dns.RcodeSuccess ||
nw.Msg == nil ||
(len(nw.Msg.Answer) == 0 && len(nw.Msg.Question) > 0 && nw.Msg.Question[0].Qtype == dns.TypeA) {
log.Debugf("%s: err: %v, archive: %v\n\n", rcode.ToString(rc), err, nw.Msg)
return rc, fmt.Errorf("rc: %v, err: %v, archive: %v\n\n", rc, err, nw.Msg)
if rc != dns.RcodeSuccess {
return rc, err
}
if proxyAddr == "" {
return dns.RcodeServerFailure, errors.New("failed to get proxy address")
Expand All @@ -92,10 +95,10 @@ func (a *ArchivingCache) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *
// cache the response
err := a.set(key, msg, collectionId, proxyAddr, server)
if err != nil {
log.Warningf("Failed to cache new response: %v, %v, %v", state.Name(), entry, err)
log.Warningf("Failed to cache: %v: %v", key, err)
}
// archive the response
a.archive(state, msg, collectionId, proxyAddr, fetchStart)
a.archive(state, msg, executionId, collectionId, proxyAddr, fetchStart)
break out
}
}
Expand All @@ -109,18 +112,15 @@ func (a *ArchivingCache) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *
entry.CollectionIds = append(entry.CollectionIds, collectionId)
err := a.update(key, entry)
if err != nil {
log.Warningf("failed to update cache entry: %v", err)
log.Warningf("Failed to update existing cache entry: %v, %v", state.Name(), err)
}
a.archive(state, msg, collectionId, entry.ProxyAddr, fetchStart)
a.archive(state, msg, executionId, collectionId, entry.ProxyAddr, fetchStart)
}
}
}

if err := w.WriteMsg(msg.SetReply(r)); err != nil {
return dns.RcodeServerFailure, err
} else {
return dns.RcodeSuccess, nil
}
w.WriteMsg(msg.SetReply(r))
return 0, nil
}

func (a *ArchivingCache) update(key string, entry *CacheEntry) error {
Expand Down Expand Up @@ -172,19 +172,19 @@ func (a *ArchivingCache) get(key string, server string) *CacheEntry {
return entry
}

// archive stores a dns record as a WARC record and as an entry in the crawl log.
func (a *ArchivingCache) archive(state *request.Request, msg *dns.Msg, collectionId, proxyAddr string, fetchStart time.Time) {
// archive writes a WARC record and a crawl log.
func (a *ArchivingCache) archive(state *request.Request, msg *dns.Msg, executionId string, collectionId string, proxyAddr string, fetchStart time.Time) {
fetchDurationMs := (time.Now().Sub(fetchStart).Nanoseconds() + 500000) / 1000000
requestedHost := strings.Trim(state.Name(), ".")
payload := []byte(fmt.Sprintf("%d%02d%02d%02d%02d%02d\n%s\n",
fetchStart.Year(), fetchStart.Month(), fetchStart.Day(),
fetchStart.Hour(), fetchStart.Minute(), fetchStart.Second(), msg.Answer[0]))

payload, reply, err := a.contentWriter.writeRecord(payload, fetchStart, requestedHost, proxyAddr, collectionId)
payload, reply, err := a.contentWriter.writeRecord(payload, fetchStart, requestedHost, proxyAddr, executionId, collectionId)
if err != nil {
log.Errorf("Failed to write WARC record: %v", err)
} else {
err := a.db.WriteCrawlLog(payload, reply.GetMeta().GetRecordMeta()[0], requestedHost, fetchStart, fetchDurationMs, proxyAddr)
err := a.WriteCrawlLog(payload, reply.GetMeta().GetRecordMeta()[0], requestedHost, fetchStart, fetchDurationMs, proxyAddr, executionId)
if err != nil {
log.Error("Failed to write crawl log: %w", err)
}
Expand All @@ -193,3 +193,28 @@ func (a *ArchivingCache) archive(state *request.Request, msg *dns.Msg, collectio

// Name implements the Handler interface.
func (a *ArchivingCache) Name() string { return "archivingcache" }

// WriteCrawlLog stores a crawl log of a dns request/response.
func (a *ArchivingCache) WriteCrawlLog(payload []byte, record *contentwriterV1.WriteResponseMeta_RecordMeta, requestedHost string, fetchStart time.Time, fetchDurationMs int64, proxyAddr string, executionId string) error {
crawlLog := &logV1.CrawlLog{
ExecutionId: executionId,
RecordType: "resource",
RequestedUri: "dns:" + requestedHost,
DiscoveryPath: "P",
StatusCode: 1,
TimeStamp: timestamppb.New(time.Now().UTC()),
FetchTimeStamp: timestamppb.New(fetchStart),
FetchTimeMs: fetchDurationMs,
IpAddress: proxyAddr,
ContentType: "text/dns",
Size: int64(len(payload)),
WarcId: record.GetWarcId(),
BlockDigest: record.GetBlockDigest(),
PayloadDigest: record.GetPayloadDigest(),
CollectionFinalName: record.GetCollectionFinalName(),
StorageRef: record.GetStorageRef(),
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return a.logClient.WriteCrawlLogs(ctx, []*logV1.CrawlLog{crawlLog})
}
Loading

0 comments on commit 298d2a5

Please sign in to comment.