Skip to content

Commit

Permalink
Finalized db table changes. And service updates accordingly.
Browse files Browse the repository at this point in the history
Coses #21
Closes #9
Closes #6
  • Loading branch information
DoctressWasTaken committed Apr 10, 2021
1 parent 1d8b0b2 commit e11eb68
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 58 deletions.
40 changes: 24 additions & 16 deletions postgres/match.sql
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
CREATE TABLE IF NOT EXISTS euw1.match
(
match_id BIGINT PRIMARY KEY,
queue SMALLINT,
timestamp TIMESTAMP,
duration SMALLINT DEFAULT NULL,
win BOOLEAN DEFAULT NULL, -- False: Blue | True: Red
details JSON,
timeline JSON,
roleml JSON
match_id BIGINT PRIMARY KEY,
queue SMALLINT,
timestamp TIMESTAMP,
details_pulled BOOLEAN,
timeline_pulled BOOLEAN
);
-- General lookups
CREATE INDEX ON euw1.match ((timestamp::date), queue);
-- Lookup for undone tasks
-- This will only be used if most data in the table is not null,
-- so to use it data older than your minimum match_details age should be removed
-- see https://stackoverflow.com/questions/5203755/why-does-postgresql-perform-sequential-scan-on-indexed-column
CREATE INDEX ON euw1.match ((details_pulled IS NULL));
CREATE INDEX ON euw1.match ((timeline_pulled IS NULL));


/*
CREATE TABLE IF NOT EXISTS kr.match
CREATE TABLE IF NOT EXISTS euw1.match_data
(
match_id BIGINT PRIMARY KEY,
queue SMALLINT,
timestamp TIMESTAMP,
duration SMALLINT DEFAULT NULL,
win BOOLEAN DEFAULT NULL, -- False: Blue | True: Red
details_pulled BOOLEAN,
timeline_pulled BOOLEAN
win BOOLEAN DEFAULT NULL,
details JSONB, -- If you want to reduce space required use JSON instead of JSONB
timeline JSONB,
roleml JSONB
);
*/
-- General lookups
CREATE INDEX ON euw1.match_data ((timestamp::date), queue, duration, win);
-- Ready for roleml index
CREATE INDEX ON euw1.match_data ((details IS NOT NULL AND timeline IS NOT NULL AND roleml IS NULL));
-- Find finished roleml
CREATE INDEX ON euw1.match_data ((roleml IS NOT NULL));
5 changes: 4 additions & 1 deletion postgres/summoner.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE IF NOT EXISTS kr.summoner
CREATE TABLE IF NOT EXISTS euw1.summoner
(

summoner_id VARCHAR(63) PRIMARY KEY,
Expand All @@ -18,3 +18,6 @@ CREATE TABLE IF NOT EXISTS kr.summoner

last_updated DATE DEFAULT CURRENT_DATE
);
-- Improved lookup speed for summoner_id service
-- and match_history service (full refresh tasks)
CREATE INDEX ON euw1.summoner ((account_id IS NULL), (wins_last_updated IS NULL));
16 changes: 9 additions & 7 deletions services/league_ranking/standalone/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def __init__(self):
self.db = PostgresConnector(user=self.server.lower())

self.url = (
f"http://{self.server.lower()}.api.riotgames.com/lol/"
+ "league-exp/v4/entries/RANKED_SOLO_5x5/%s/%s?page=%s"
f"http://{self.server.lower()}.api.riotgames.com/lol/"
+ "league-exp/v4/entries/RANKED_SOLO_5x5/%s/%s?page=%s"
)
self.rankmanager = RankManager()
self.retry_after = datetime.now()
Expand Down Expand Up @@ -99,10 +99,10 @@ async def process_rank(self, tier, division, worker=5):
if line["summoner_id"] in tasks:
task = tasks[line["summoner_id"]]
if task == (
line["summoner_id"],
int(line["rank"]),
int(line["wins"]),
int(line["losses"]),
line["summoner_id"],
int(line["rank"]),
int(line["wins"]),
int(line["losses"]),
):
del tasks[line["summoner_id"]]
self.logging.info("Upserting %s changed user.", len(tasks))
Expand Down Expand Up @@ -183,7 +183,9 @@ async def fetch(self, session, url):
raise Non200Exception()
if response.status in [429, 430]:
if "Retry-At" in response.headers:
self.retry_after = datetime.strptime(response.headers["Retry-At"], "%Y-%m-%d %H:%M:%S.%f")
self.retry_after = datetime.strptime(
response.headers["Retry-At"], "%Y-%m-%d %H:%M:%S.%f"
)
raise RatelimitException()
if response.status == 404:
raise NotFoundException()
Expand Down
4 changes: 2 additions & 2 deletions services/match_details/manager/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ async def get_tasks(self):
"""
SELECT match_id
FROM %s.match
WHERE details IS NULL
AND DATE(timestamp) >= %s
WHERE details_pulled IS NULL
AND timestamp::date >= %s
LIMIT $1;
"""
% (self.server.lower(), self.details_cutoff),
Expand Down
43 changes: 27 additions & 16 deletions services/match_details/worker/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def __init__(self):
self.stopped = False
self.retry_after = datetime.now()
self.url = (
f"http://{self.server.lower()}.api.riotgames.com/lol/"
+ "match/v4/matches/%s"
f"http://{self.server.lower()}.api.riotgames.com/lol/"
+ "match/v4/matches/%s"
)

self.buffered_elements = (
Expand All @@ -58,35 +58,44 @@ async def prepare(self, conn):
self.match_update = await conn.prepare(
"""
UPDATE %s.match
SET duration = $1,
win = $2,
details = $3
WHERE match_id = $4
SET details_pulled = TRUE
WHERE match_id = $1
"""
% self.server.lower()
)
self.match_data_update = await conn.prepare(
"""
INSERT INTO %s.match_data (match_id, duration, win, details)
VALUES ($1, $2, $3, $4)
ON CONFLICT (match_id) DO UPDATE
SET details = EXCLUDED.details
""" % self.server.lower()
)

async def flush_manager(self, match_details):
"""Update entries in postgres once enough tasks are done."""
try:
update_sets = []
update_match_sets = []
update_match_data_sets = []
for match in match_details:
if not match[1]:
continue
details = match[1]
# Team Details
update_sets.append(
update_match_sets.append((int(match[0]),))
update_match_data_sets.append(
(
int(match[0]),
details["gameDuration"],
details["teams"][0]["win"] == "Win",
json.dumps(details),
int(match[0]),
)
)
if update_sets:
if update_match_sets:
async with self.db.get_connection() as db:
await self.match_update.executemany(update_sets)
self.logging.info("Inserted %s match_details.", len(update_sets))
await self.match_data_update.executemany(update_match_data_sets)
await self.match_update.executemany(update_match_sets)
self.logging.info("Inserted %s match_details.", len(update_match_sets))

except Exception as err:
traceback.print_tb(err.__traceback__)
Expand All @@ -96,9 +105,9 @@ async def get_task(self):
"""Return tasks to the async worker."""
async with self.redis.get_connection() as buffer:
if not (
tasks := await buffer.spop(
"%s_match_details_tasks" % self.server, self.batch_size
)
tasks := await buffer.spop(
"%s_match_details_tasks" % self.server, self.batch_size
)
):
return tasks
if self.stopped:
Expand Down Expand Up @@ -177,7 +186,9 @@ async def fetch(self, session, url) -> dict:
if response.status in [429, 430]:
if response.status == 430:
if "Retry-At" in response.headers:
self.retry_after = datetime.strptime(response.headers["Retry-At"], "%Y-%m-%d %H:%M:%S.%f")
self.retry_after = datetime.strptime(
response.headers["Retry-At"], "%Y-%m-%d %H:%M:%S.%f"
)
elif response.status == 429:
self.logging.info(response.status)
delay = 1
Expand Down
4 changes: 3 additions & 1 deletion services/match_history/worker/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ async def fetch(self, session, url) -> dict:
if response.status in [429, 430]:
if response.status == 430:
if "Retry-At" in response.headers:
self.retry_after = datetime.strptime(response.headers["Retry-At"], "%Y-%m-%d %H:%M:%S.%f")
self.retry_after = datetime.strptime(
response.headers["Retry-At"], "%Y-%m-%d %H:%M:%S.%f"
)
elif response.status == 429:
self.logging.info(response.status)
delay = 1
Expand Down
2 changes: 1 addition & 1 deletion services/match_timeline/manager/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def get_tasks(self):
"""
SELECT match_id
FROM %s.match
WHERE timeline IS NULL
WHERE timeline_pulled IS NULL
AND DATE(timestamp) >= %s
LIMIT $1;
"""
Expand Down
31 changes: 22 additions & 9 deletions services/match_timeline/worker/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,31 +58,42 @@ async def prepare(self, conn):
self.match_update = await conn.prepare(
"""
UPDATE %s.match
SET timeline = $1
WHERE match_id = $2
SET timeline_pulled = TRUE
WHERE match_id = $1
"""
% self.server.lower()
)
self.match_data_update = await conn.prepare(
"""
INSERT INTO %s.match_data (match_id, timeline)
VALUES ($1, $2)
ON CONFLICT (match_id) DO UPDATE
SET timeline = EXCLUDED.timeline
""" % self.server.lower()
)

async def flush_manager(self, match_timelines):
"""Update entries in postgres once enough tasks are done."""
try:
update_sets = []
update_match_sets = []
update_match_data_sets = []
for match in match_timelines:
if not match[1]:
continue
timeline = match[1]
# Team Details
update_sets.append(
update_match_sets.append((int(match[0]),))
update_match_data_sets.append(
(
json.dumps(timeline),
int(match[0]),
json.dumps(timeline),
)
)
if update_sets:
if update_match_sets:
async with self.db.get_connection() as db:
await self.match_update.executemany(update_sets)
self.logging.info("Inserted %s match_timelines.", len(update_sets))
await self.match_data_update.executemany(update_match_data_sets)
await self.match_update.executemany(update_match_sets)
self.logging.info("Inserted %s match_timelines.", len(update_match_sets))

except Exception as err:
traceback.print_tb(err.__traceback__)
Expand Down Expand Up @@ -173,7 +184,9 @@ async def fetch(self, session, url) -> dict:
if response.status in [429, 430]:
if response.status == 430:
if "Retry-At" in response.headers:
self.retry_after = datetime.strptime(response.headers["Retry-At"], "%Y-%m-%d %H:%M:%S.%f")
self.retry_after = datetime.strptime(
response.headers["Retry-At"], "%Y-%m-%d %H:%M:%S.%f"
)
elif response.status == 429:
self.logging.info(response.status)
delay = 1
Expand Down
8 changes: 4 additions & 4 deletions services/roleML/standalone/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ async def get_tasks(self):
SELECT match_id,
details,
timeline
FROM %s.match
WHERE roleml IS NULL
FROM %s.match_data
WHERE details IS NOT NULL
AND timeline IS NOT NULL
AND details IS NOT NULL
AND roleml IS NULL
AND queue IN (%s)
AND duration >= 60 * 12
LIMIT $1;
Expand All @@ -59,7 +59,7 @@ async def update_db(self, results):
async with self.db.get_connection() as db:
await db.executemany(
"""
UPDATE %s.match
UPDATE %s.match_data
SET roleml = $1
WHERE match_id = $2
"""
Expand Down
4 changes: 3 additions & 1 deletion services/summoner_id/worker/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ async def fetch(self, session, url):
if response.status in [429, 430]:
if response.status == 430:
if "Retry-At" in response.headers:
self.retry_after = datetime.strptime(response.headers["Retry-At"], "%Y-%m-%d %H:%M:%S.%f")
self.retry_after = datetime.strptime(
response.headers["Retry-At"], "%Y-%m-%d %H:%M:%S.%f"
)
elif response.status == 429:
self.logging.info(response.status)
delay = 1
Expand Down

0 comments on commit e11eb68

Please sign in to comment.