-
Notifications
You must be signed in to change notification settings - Fork 769
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
3404652
commit de76b1f
Showing
11 changed files
with
1,356 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# agma Analytics | ||
|
||
In order to use the Agma Analytics Adapter, please adjust the accounts with the data provided by agma (https://www.agma-mmc.de). | ||
|
||
## Configuration | ||
|
||
```yaml | ||
analytics: | ||
agma: | ||
# Required: enable the module | ||
enabled: true | ||
# Required: set the accounts you want to track | ||
accounts: | ||
- code: "my-code" # Required: provied by agma | ||
publisher_id: "123" # Required: Exchange specific publisher_id | ||
site_app_id: "openrtb2-site.id-or-app.id" # optional: scope to the publisher with an openrtb2 Site object id or App object id | ||
# Optional properties (advanced configuration) | ||
endpoint: | ||
url: "https://pbs-go.agma-analytics.de/v1/prebid-server" | ||
timeout: "2s" | ||
gzip: true | ||
buffer: # Flush events when (first condition reached) | ||
# Size of the buffer in bytes | ||
size: "2MB" # greater than 2MB (size using SI standard eg. "44kB", "17MB") | ||
count : 100 # greater than 100 events | ||
timeout: "15m" # greater than 15 minutes (parsed as golang duration) | ||
|
||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,267 @@ | ||
package agma | ||
|
||
import ( | ||
"bytes" | ||
"errors" | ||
"net/http" | ||
"os" | ||
"os/signal" | ||
"sync" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/benbjohnson/clock" | ||
"github.com/docker/go-units" | ||
"github.com/golang/glog" | ||
"github.com/prebid/go-gdpr/vendorconsent" | ||
"github.com/prebid/prebid-server/v2/analytics" | ||
"github.com/prebid/prebid-server/v2/config" | ||
"github.com/prebid/prebid-server/v2/openrtb_ext" | ||
) | ||
|
||
type httpSender = func(payload []byte) error | ||
|
||
const ( | ||
agmaGVLID = 1122 | ||
analyticsPurpose = 7 | ||
) | ||
|
||
type AgamLogger struct { | ||
sender httpSender | ||
clock clock.Clock | ||
accounts []config.AgmaAnalyticsAccount | ||
eventCount int64 | ||
maxEventCount int64 | ||
maxBufferByteSize int64 | ||
maxDuration time.Duration | ||
mux sync.RWMutex | ||
sigTermCh chan os.Signal | ||
buffer bytes.Buffer | ||
bufferCh chan []byte | ||
} | ||
|
||
func newAgmaLogger(cfg config.AgmaAnalytics, sender httpSender, clock clock.Clock) (*AgamLogger, error) { | ||
pSize, err := units.FromHumanSize(cfg.Buffers.BufferSize) | ||
if err != nil { | ||
return nil, err | ||
} | ||
pDuration, err := time.ParseDuration(cfg.Buffers.Timeout) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if len(cfg.Accounts) == 0 { | ||
return nil, errors.New("Please configure at least one account for Agma Analytics") | ||
} | ||
|
||
buffer := bytes.Buffer{} | ||
buffer.Write([]byte("[")) | ||
|
||
return &AgamLogger{ | ||
sender: sender, | ||
clock: clock, | ||
accounts: cfg.Accounts, | ||
maxBufferByteSize: pSize, | ||
eventCount: 0, | ||
maxEventCount: int64(cfg.Buffers.EventCount), | ||
maxDuration: pDuration, | ||
buffer: buffer, | ||
bufferCh: make(chan []byte), | ||
sigTermCh: make(chan os.Signal, 1), | ||
}, nil | ||
} | ||
|
||
func NewModule(httpClient *http.Client, cfg config.AgmaAnalytics, clock clock.Clock) (analytics.Module, error) { | ||
sender, err := createHttpSender(httpClient, cfg.Endpoint) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
m, err := newAgmaLogger(cfg, sender, clock) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
signal.Notify(m.sigTermCh, os.Interrupt, syscall.SIGTERM) | ||
|
||
go m.start() | ||
|
||
return m, nil | ||
} | ||
|
||
func (l *AgamLogger) start() { | ||
ticker := l.clock.Ticker(l.maxDuration) | ||
for { | ||
select { | ||
case <-l.sigTermCh: | ||
glog.Infof("[AgmaAnalytics] Received Close, trying to flush buffer") | ||
l.flush() | ||
return | ||
case event := <-l.bufferCh: | ||
l.bufferEvent(event) | ||
if l.isFull() { | ||
l.flush() | ||
} | ||
case <-ticker.C: | ||
l.flush() | ||
} | ||
} | ||
} | ||
|
||
func (l *AgamLogger) bufferEvent(data []byte) { | ||
l.mux.Lock() | ||
defer l.mux.Unlock() | ||
|
||
l.buffer.Write(data) | ||
l.buffer.WriteByte(',') | ||
l.eventCount++ | ||
} | ||
|
||
func (l *AgamLogger) isFull() bool { | ||
l.mux.RLock() | ||
defer l.mux.RUnlock() | ||
return l.eventCount >= l.maxEventCount || int64(l.buffer.Len()) >= l.maxBufferByteSize | ||
} | ||
|
||
func (l *AgamLogger) flush() { | ||
l.mux.Lock() | ||
|
||
if l.eventCount == 0 || l.buffer.Len() == 0 { | ||
l.mux.Unlock() | ||
return | ||
} | ||
|
||
// Close the json array, remove last , | ||
l.buffer.Truncate(l.buffer.Len() - 1) | ||
_, err := l.buffer.Write([]byte("]")) | ||
if err != nil { | ||
l.reset() | ||
l.mux.Unlock() | ||
glog.Warning("[AgmaAnalytics] fail to close the json array") | ||
return | ||
} | ||
|
||
payload := make([]byte, l.buffer.Len()) | ||
_, err = l.buffer.Read(payload) | ||
if err != nil { | ||
l.reset() | ||
l.mux.Unlock() | ||
glog.Warning("[AgmaAnalytics] fail to copy the buffer") | ||
return | ||
} | ||
|
||
go l.sender(payload) | ||
|
||
l.reset() | ||
l.mux.Unlock() | ||
} | ||
|
||
func (l *AgamLogger) reset() { | ||
l.buffer.Reset() | ||
l.buffer.Write([]byte("[")) | ||
l.eventCount = 0 | ||
} | ||
|
||
func (l *AgamLogger) shouldTrackEvent(requestWrapper *openrtb_ext.RequestWrapper) (bool, string) { | ||
userExt, err := requestWrapper.GetUserExt() | ||
if err != nil || userExt == nil { | ||
return false, "" | ||
} | ||
consent := userExt.GetConsent() | ||
if consent == nil { | ||
return false, "" | ||
} | ||
consentStr := *consent | ||
parsedConsent, err := vendorconsent.ParseString(consentStr) | ||
if err != nil { | ||
return false, "" | ||
} | ||
|
||
analyticsAllowed := parsedConsent.PurposeAllowed(analyticsPurpose) | ||
agmaAllowed := parsedConsent.VendorConsent(agmaGVLID) | ||
if !analyticsAllowed || !agmaAllowed { | ||
return false, "" | ||
} | ||
publisherId := "" | ||
appSiteId := "" | ||
if requestWrapper.Site != nil { | ||
if requestWrapper.Site.Publisher != nil { | ||
publisherId = requestWrapper.Site.Publisher.ID | ||
} | ||
appSiteId = requestWrapper.Site.ID | ||
} | ||
if requestWrapper.App != nil { | ||
if requestWrapper.App.Publisher != nil { | ||
publisherId = requestWrapper.App.Publisher.ID | ||
} | ||
appSiteId = requestWrapper.App.ID | ||
} | ||
|
||
if publisherId == "" && appSiteId == "" { | ||
return false, "" | ||
} | ||
|
||
for _, account := range l.accounts { | ||
if account.PublisherId == publisherId { | ||
if account.SiteAppId == "" { | ||
return true, account.Code | ||
} | ||
if account.SiteAppId == appSiteId { | ||
return true, account.Code | ||
} | ||
} | ||
} | ||
|
||
return false, "" | ||
} | ||
|
||
func (l *AgamLogger) LogAuctionObject(event *analytics.AuctionObject) { | ||
if event == nil || event.Status != http.StatusOK || event.RequestWrapper == nil { | ||
return | ||
} | ||
shouldTrack, code := l.shouldTrackEvent(event.RequestWrapper) | ||
if !shouldTrack { | ||
return | ||
} | ||
data, err := serializeAnayltics(event.RequestWrapper, EventTypeAuction, code, event.StartTime) | ||
if err != nil { | ||
glog.Errorf("[AgmaAnalytics] Error serializing auction object: %v", err) | ||
return | ||
} | ||
l.bufferCh <- data | ||
} | ||
|
||
func (l *AgamLogger) LogAmpObject(event *analytics.AmpObject) { | ||
if event == nil || event.Status != http.StatusOK || event.RequestWrapper == nil { | ||
return | ||
} | ||
shouldTrack, code := l.shouldTrackEvent(event.RequestWrapper) | ||
if !shouldTrack { | ||
return | ||
} | ||
data, err := serializeAnayltics(event.RequestWrapper, EventTypeAmp, code, event.StartTime) | ||
if err != nil { | ||
glog.Errorf("[AgmaAnalytics] Error serializing amp object: %v", err) | ||
return | ||
} | ||
l.bufferCh <- data | ||
} | ||
|
||
func (l *AgamLogger) LogVideoObject(event *analytics.VideoObject) { | ||
if event == nil || event.Status != http.StatusOK || event.RequestWrapper == nil { | ||
return | ||
} | ||
shouldTrack, code := l.shouldTrackEvent(event.RequestWrapper) | ||
if !shouldTrack { | ||
return | ||
} | ||
data, err := serializeAnayltics(event.RequestWrapper, EventTypeVideo, code, event.StartTime) | ||
if err != nil { | ||
glog.Errorf("[AgmaAnalytics] Error serializing video object: %v", err) | ||
return | ||
} | ||
l.bufferCh <- data | ||
} | ||
|
||
func (l *AgamLogger) LogCookieSyncObject(event *analytics.CookieSyncObject) {} | ||
func (l *AgamLogger) LogNotificationEventObject(event *analytics.NotificationEvent) {} | ||
func (l *AgamLogger) LogSetUIDObject(event *analytics.SetUIDObject) {} |
Oops, something went wrong.