Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/hijack all release queries #70

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
12 changes: 9 additions & 3 deletions cmd/alice/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,13 @@ func flagsInitialization(expertMode bool) []cli.Flag {
&cli.IntFlag{
Name: "limiter-max-req",
Category: "Limiter settings",
Hidden: true,
Hidden: expertMode,
Value: 200,
},
&cli.DurationFlag{
Name: "limiter-records-duration",
Category: "Limiter settings",
Hidden: true,
Hidden: expertMode,
Value: 5 * time.Minute,
},

Expand Down Expand Up @@ -346,7 +346,13 @@ func flagsInitialization(expertMode bool) []cli.Flag {
Name: "randomizer-update-frequency-bootstrap",
Category: "Release randomizer",
Value: 5 * time.Second,
Hidden: true,
Hidden: expertMode,
},
&cli.IntFlag{
Name: "randomizer-random-fetch-tries",
Usage: "possible errors (tries) for fetching random release",
Value: 10,
Hidden: expertMode,
},
&cli.IntFlag{
Name: "redis-client-maxretries",
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/allegro/bigcache/v3 v3.1.0
github.com/goccy/go-json v0.10.3
github.com/gofiber/fiber/v2 v2.52.5
github.com/gofiber/storage/bbolt/v2 v2.0.0
github.com/jedib0t/go-pretty/v6 v6.6.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofiber/fiber/v2 v2.52.5 h1:tWoP1MJQjGEe4GB5TUGOi7P2E0ZMMRx5ZTG4rT+yGMo=
github.com/gofiber/fiber/v2 v2.52.5/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ=
Expand Down
16 changes: 0 additions & 16 deletions internal/anilibria/anilibria.go

This file was deleted.

183 changes: 124 additions & 59 deletions internal/anilibria/randomizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ package anilibria

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"time"

"github.com/anilibria/alice/internal/utils"
"github.com/goccy/go-json"
futils "github.com/gofiber/fiber/v2/utils"
"github.com/klauspost/compress/zstd"
"github.com/redis/go-redis/v9"
"github.com/rs/zerolog"
"github.com/urfave/cli/v2"
"github.com/valyala/bytebufferpool"
)

type Randomizer struct {
Expand All @@ -31,17 +30,21 @@ type Randomizer struct {
relUpdFreqErr time.Duration
relUpdFreqBoot time.Duration

encoder *zstd.Encoder
decoder *zstd.Decoder

mu sync.RWMutex
releases []string
releases *Releases

rawbufpool *bytebufferpool.Pool
}

func New(c context.Context) *Randomizer {
cli := c.Value(utils.CKCliCtx).(*cli.Context)

var dec *zstd.Decoder
var enc *zstd.Encoder
if cli.Bool("randomizer-redis-zstd-enable") {
enc, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
dec, _ = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
}

Expand All @@ -68,10 +71,13 @@ func New(c context.Context) *Randomizer {
relUpdFreqErr: cli.Duration("randomizer-update-frequency-onerror"),
relUpdFreqBoot: cli.Duration("randomizer-update-frequency-bootstrap"),

encoder: enc,
decoder: dec,

releases: make([]string, 0),
releases: NewReleases(WithFetchTries(cli.Int("randomizer-random-fetch-tries"))),
releasesKey: cli.String("randomizer-releaseskey"),

rawbufpool: &bytebufferpool.Pool{},
}

return r
Expand All @@ -82,10 +88,42 @@ func (m *Randomizer) Bootstrap() {
m.destroy()
}

func (m *Randomizer) Randomize() string {
return m.randomRelease()
func (m *Randomizer) Randomize(region string) (_ string, e error) {
var release *Release
if release, e = m.releases.RandomRelease(region); e != nil {
return
}

return release.Code, e
}

func (m *Randomizer) RawRelease(ident []byte) (rawjsonbuf json.RawMessage, ok bool, e error) {
var release *Release
if release, ok = m.releases.Release(futils.UnsafeString(ident)); !ok {
return
}

rawbuf := m.rawbufpool.Get()
defer m.rawbufpool.Put(rawbuf)

if rawbuf.B, ok = release.Raw(); !ok {
return
}

// decompress chunk response from redis
// rawjsonbuf := m.rawbufpool.Get()
// defer m.rawbufpool.Put(rawjsonbuf)

if rawjsonbuf, e = m.decompressPayload(rawbuf.B); e != nil {
m.log.Warn().Msg("an error occurred while decompress redis response - " + e.Error())
return
}

return
}

//
//
//

func (m *Randomizer) loop() {
Expand All @@ -104,22 +142,30 @@ LOOP:
update.Stop()

var e error
var releases []string
if releases, e = m.lookupReleases(); e != nil {
var chunks, failed, banned int
started, releases := time.Now(), make(map[string]*Release, m.releases.Len()+10)
// m.releases.Len()+10 - avoiding mass alocs in lookupReleases()

if chunks, failed, banned, e = m.lookupReleases(releases); e != nil {
m.log.Error().Msg("could not updated releases for randomizer - " + e.Error())
update.Reset(m.relUpdFreqErr)
continue
}

m.rotateReleases(releases)
parsed := time.Now()
m.log.Info().Msgf("in %s from %d (of %d) chunks added %d releases and %d WW banned",
time.Since(started).String(), failed, chunks, len(releases), banned)

m.releases.Commit(releases)
m.log.Debug().Msgf("new releases commited for %s", time.Since(parsed).String())
update.Reset(m.relUpdFreq)
}
}
}

func (m *Randomizer) destroy() {
if e := m.rclient.Close(); e != nil {
m.log.Error().Msg("could not properly close http client - " + e.Error())
m.log.Error().Msg("could not properly close redis client - " + e.Error())
}
}

Expand All @@ -144,8 +190,7 @@ func (m *Randomizer) peekReleaseKeyChunks() (_ int, e error) {
return strconv.Atoi(futils.UnsafeString(dres))
}

func (m *Randomizer) lookupReleases() (_ []string, e error) { // skipcq: GO-R1005 needed to be kept as it is
var chunks int
func (m *Randomizer) lookupReleases(releases map[string]*Release) (chunks, failed, banned int, e error) {
if chunks, e = m.peekReleaseKeyChunks(); e != nil {
return
} else if chunks == 0 {
Expand All @@ -155,13 +200,7 @@ func (m *Randomizer) lookupReleases() (_ []string, e error) { // skipcq: GO-R100
m.log.Trace().Msgf("release key says about %d chunks", chunks)
m.log.Info().Msgf("staring release parsing from redis with %d chunks", chunks)

// avoid mass allocs
started := time.Now()
releases := make([]string, 0, len(m.releases))

var res string
var errs []string
var total, banned int
errs := make([]string, 0, chunks)

for i := 0; i < chunks; i++ {
select {
Expand All @@ -172,50 +211,80 @@ func (m *Randomizer) lookupReleases() (_ []string, e error) { // skipcq: GO-R100
m.log.Trace().Msgf("parsing chunk %d/%d...", i, chunks)
}

// get compressed chunk response from redis
if res, e = m.rclient.Get(m.rctx, m.releasesKey+strconv.Itoa(i)).Result(); e == redis.Nil {
e = fmt.Errorf("given chunk number %d is not exists", i)
m.log.Warn().Msg(e.Error())
chunk := bytebufferpool.Get()
bytebufferpool.Put(chunk)

// get decompressed chunk from redis
if chunk.B, e = m.chunkFetchFromRedis(m.releasesKey + strconv.Itoa(i)); e != nil {
m.log.Warn().Msg("an error occurred while peeking a releases chunk - " + e.Error())
errs = append(errs, e.Error())
continue
} else if e != nil {
m.log.Warn().Msg("an error occurred while peeking a releases chunk - " + e.Error())
} else if chunk.Len() == 0 {
e = fmt.Errorf("given chunk number %d is not exists", i)
m.log.Warn().Msg(e.Error())
errs = append(errs, e.Error())
continue
}

// decompress chunk response from redis
var dres []byte
if dres, e = m.decompressPayload(futils.UnsafeBytes(res)); e != nil {
m.log.Warn().Msg("an error occurred while decompress redis response - " + e.Error())
// store raw json objects for further query=release responding
var rawReleases RawReleasesChunk
if e = json.Unmarshal(chunk.B, &rawReleases); e != nil {
m.log.Warn().Msg("an error occurred while unmarshal raw release chunk - " + e.Error())
errs = append(errs, e.Error())
continue
}

// get json formated response from decompressed response
var releasesChunk Releases
if e = json.Unmarshal(dres, &releasesChunk); e != nil {
var releasesChunk ReleasesChunk
if e = json.Unmarshal(chunk.B, &releasesChunk); e != nil {
m.log.Warn().Msg("an error occurred while unmarshal release chunk - " + e.Error())
errs = append(errs, e.Error())
continue
}

// parse json chunk response
for _, release := range releasesChunk {
if release.BlockedInfo != nil && release.BlockedInfo.IsBlockedByCopyrights {
m.log.Debug().Msgf("release %d (%s) worldwide banned, skip it...", release.Id, release.Code)
banned++
for id, release := range releasesChunk {
if release == nil {
m.log.Error().Msg("BUG! found an empty release after json.Unmarshal")
continue
}

if zerolog.GlobalLevel() <= zerolog.DebugLevel {
m.log.Trace().Msgf("release %d with code %s found", release.Id, release.Code)
}

total++
releases = append(releases, release.Code)
}
if b, _ := release.IsOverworldBlocked(); b {
m.log.Debug().Msgf("release %d (%s) worldwide banned", release.Id, release.Code)
banned++

// patch raw json for android app
release.PatchBlockReason(BlockedByCopyrights)

var rawBlockedInfo json.RawMessage
if rawBlockedInfo, e = json.Marshal(&release.BlockedInfo); e != nil {
m.log.Warn().Msg("an error occurred in patching release - " + e.Error())
errs = append(errs, e.Error())
continue
}
rawReleases[id]["blockedInfo"] = &rawBlockedInfo

delete(rawReleases[id], "externalPlaylist")
delete(rawReleases[id], "playlist")
}

// save raw json for query=release
rawbuf := m.rawbufpool.Get()
defer m.rawbufpool.Put(rawbuf)

if rawbuf.B, e = json.Marshal(rawReleases[id]); e != nil {
m.log.Warn().Msg("an error occurred in patching release - " + e.Error())
errs = append(errs, e.Error())
continue
}
release.SetRaw(m.compressPayload(rawbuf.B))

releases[release.Code] = release
}
}

if errslen := len(errs); errslen != 0 {
Expand All @@ -227,34 +296,26 @@ func (m *Randomizer) lookupReleases() (_ []string, e error) { // skipcq: GO-R100
}
}

m.log.Info().Msgf("in %s from %d (of %d) chunks added %d releases and %d skipped because of WW ban",
time.Since(started).String(), chunks-len(errs), chunks, total, banned)
return releases, nil
return chunks, chunks - len(errs), banned, nil
}

func (m *Randomizer) rotateReleases(releases []string) {
m.mu.Lock()
defer m.mu.Unlock()
func (m *Randomizer) chunkFetchFromRedis(key string) (chunk []byte, e error) {
var compressed string

m.log.Debug().Msgf("update current %d releases with slice of %d releases",
len(m.releases), len(releases))
m.releases = releases
}

func (m *Randomizer) randomRelease() (_ string) {
if !m.mu.TryRLock() {
m.log.Warn().Msg("could not get randomized release, read lock is not available")
// get compressed chunk response from redis
if compressed, e = m.rclient.Get(m.rctx, key).Result(); e == redis.Nil {
e = nil
return
} else if e != nil {
return
}
defer m.mu.RUnlock()

if len(m.releases) == 0 {
m.log.Warn().Msg("randomizer is not ready yet")
// decompress chunk response from redis
if chunk, e = m.decompressPayload(futils.UnsafeBytes(compressed)); e != nil {
return
}

r := rand.Intn(len(m.releases)) // skipcq: GSC-G404 math/rand is enoght here
return m.releases[r]
return
}

func (m *Randomizer) decompressPayload(payload []byte) ([]byte, error) {
Expand All @@ -264,3 +325,7 @@ func (m *Randomizer) decompressPayload(payload []byte) ([]byte, error) {

return m.decoder.DecodeAll(payload, nil)
}

func (m *Randomizer) compressPayload(payload []byte) []byte {
return m.encoder.EncodeAll(payload, make([]byte, 0, len(payload)))
}
Loading