Skip to content

Commit

Permalink
--raw and --reasonFilter options.
Browse files Browse the repository at this point in the history
  • Loading branch information
mihxil committed Nov 9, 2023
1 parent ec64ed7 commit 17d19a6
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 59 deletions.
145 changes: 87 additions & 58 deletions src/npoapi/bin/npo_media_follow_changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,72 +6,101 @@
import logging
import time
from datetime import datetime
from sys import stdout

import json_stream

from npoapi import Media
from io import TextIOWrapper
from sys import stdout


def media_follow_changes():
client = Media().command_line_client("Get changes feed from the NPO Frontend API", exclude_arguments={"accept"})
client.add_argument('profile', type=str, nargs='?', help='Profile')
client.add_argument("-s", "--since", type=str, default=None)
client.add_argument("--sleep", type=int, default=5)
client.add_argument("--deletes", type=str, default="ID_ONLY")
client.add_argument("--tail", action='store_true')
client.add_argument('-p', "--properties", type=str, default=None,
class FollowChanges:
def __init__(self):
    """Create the command line client, parse its arguments and set the start point.

    After construction the instance exposes:

    * ``client``: the command line client obtained from ``Media``.
    * ``args``: the parsed command line arguments.
    * ``since``: the ``--since`` value, or the current local time in ISO
      format when the option was not given.
    * ``sinceAsEpoch``: ``since`` converted to milliseconds since the epoch,
      minus 60000 ms.
    """
    self.client = Media().command_line_client("Get changes feed from the NPO Frontend API", exclude_arguments={"accept"})
    self.client.add_argument('profile', type=str, nargs='?', help='Profile')
    self.client.add_argument("-s", "--since", type=str, default=None)
    self.client.add_argument("--sleep", type=int, default=5)
    self.client.add_argument("--deletes", type=str, default="ID_ONLY")
    self.client.add_argument("--tail", action='store_true')
    self.client.add_argument('-p', "--properties", type=str, default=None,
                             help="properties filtering")
    # The explanatory text is passed as 'help': argparse's add_argument() has
    # no 'description' keyword and rejects it with a TypeError.
    # NOTE(review): assumes the client forwards these keywords to argparse - confirm.
    self.client.add_argument("--raw", action='store_true', help="No attempts to stream and handle big results. Everything should fit in memory. Simpler, but less efficient.")
    self.client.add_argument("--reasonFilter", type=str, default="")

    self.args = self.client.parse_args()

    self.since = self.args.since
    if self.since is None:
        self.since = datetime.now().isoformat()
        self.client.logger.info("No since given, using %s" % self.since)

    # Milliseconds since the epoch, moved back by 60000 ms.
    self.sinceAsEpoch = int(datetime.fromisoformat(self.since).timestamp() * 1000) - 60000

def one_call_raw(self):
    """Fetch one batch of changes in a single, non-streamed call and echo it.

    The complete response is parsed in memory to find the new start point,
    then written unmodified to standard output. Afterwards the method
    sleeps for ``args.sleep`` seconds so that the polling loop in the caller
    does not hit the API back to back.

    Raises:
        Exception: if the response contains no changes at all.
    """
    response = self.client.changes_raw(
        profile=self.args.profile,
        since=self.sinceAsEpoch,
        properties=self.args.properties,
        deletes=self.args.deletes,
        reason_filter=self.args.reasonFilter,
        stream=False)
    changes = json.loads(response)['changes']
    if not changes:
        # Same failure the streaming variant reports, instead of a bare IndexError.
        raise Exception("No changes received!")
    # Continue from the last change of the batch, like the streaming variant
    # does; taking the first one would replay nearly the whole batch.
    # NOTE(review): assumes changes arrive in ascending publishDate order - confirm.
    self.sinceAsEpoch = changes[-1]['publishDate']
    stdout.write(response)
    stdout.flush()
    time.sleep(self.args.sleep)

# Parse the command line, determine the start point and then poll the changes
# feed until interrupted, printing every change as one JSON document per line.
args = client.parse_args()

since = args.since
if since is None:
    since = datetime.now().isoformat()
    client.logger.info("No since given, using %s" % since)

# Milliseconds since the epoch, moved back by 60000 ms.
sinceAsEpoch = int(datetime.fromisoformat(since).timestamp() * 1000) - 60000

try:
    while True:
        client.logger.info("since: %s (%s)" % (sinceAsEpoch, datetime.fromtimestamp(sinceAsEpoch/1000).isoformat()))
        response = client.changes_raw(
            profile=args.profile,
            since=sinceAsEpoch,
            properties=args.properties,
            deletes=args.deletes,
            stream=True)

        try:
            if response.status != 200:
                logging.error("Error %d" % response.status)
                # Wait before retrying so a failing server is not polled in
                # a tight loop; the 'finally' below releases the response.
                time.sleep(args.sleep)
                continue
            data = json_stream.load(response)
            changes = data['changes']
            newsince = None
            for change in changes.persistent():
                c = json_stream.to_standard_types(change)
                newsince = c.get('publishDate')
                if not newsince:
                    logging.error("No publishDate in %s" % c)
                    break
                tail = c.get('tail', False)
                # Entries flagged as 'tail' are only printed when --tail was given.
                if not tail or args.tail:
                    stdout.write(json.dumps(c))
                    stdout.write("\n")
                    stdout.flush()
            if newsince is None:
                raise Exception("No tail received?")
            if newsince <= sinceAsEpoch:
                raise Exception("Since doesn't grow")
            sinceAsEpoch = newsince
            changes.read_all()
        finally:
            # Close the response on every path, including errors.
            response.close()
        time.sleep(args.sleep)
except KeyboardInterrupt:
    client.logger.info("interrupted")

client.exit()
def one_call(self):
    """Fetch one batch of changes as a stream, print them and advance the start point.

    Every change is converted to standard Python types and written to
    standard output as one JSON document per line. Entries flagged as
    ``tail`` are only printed when ``--tail`` was given. The ``publishDate``
    of the last change read becomes the new ``sinceAsEpoch``. The method
    sleeps for ``args.sleep`` seconds before returning, also after a
    non-200 response, and always closes the response.

    Raises:
        Exception: if no change was received, if the last change read has
            no ``publishDate``, or if the new start point is not larger
            than the current one.
    """
    response = self.client.changes_raw(
        profile=self.args.profile,
        since=self.sinceAsEpoch,
        properties=self.args.properties,
        deletes=self.args.deletes,
        reason_filter=self.args.reasonFilter,
        stream=True)

    try:
        if response.status != 200:
            logging.error("Error %d" % response.status)
            # Wait before handing control back: the caller retries at once,
            # which would otherwise hammer a failing server.
            time.sleep(self.args.sleep)
            return
        data = json_stream.load(response)
        changes = data['changes']
        newsince = None
        count = 0
        for change in changes:
            count += 1
            c = json_stream.to_standard_types(change)
            newsince = c.get('publishDate')
            if not newsince:
                logging.error("No publishDate in %s" % c)
                break
            tail = c.get('tail', False)
            if not tail or self.args.tail:
                stdout.write(json.dumps(c))
                stdout.write("\n")
                stdout.flush()
        if count == 0:
            raise Exception("No changes received!")
        if newsince is None:
            raise Exception("No tail received?")
        if newsince <= self.sinceAsEpoch:
            raise Exception("Since doesn't grow")
        self.sinceAsEpoch = newsince
        changes.read_all()
    finally:
        # Release the response on every path, including errors.
        response.close()
    time.sleep(self.args.sleep)

def follow_changes(self):
    """Poll the changes feed until interrupted, then shut the client down.

    Each round logs the current start point, both as epoch milliseconds and
    as an ISO timestamp, and then performs one raw or one streaming call
    depending on ``args.raw``. A ``KeyboardInterrupt`` ends the loop; the
    client's ``exit()`` is called afterwards.
    """
    log = self.client.logger
    try:
        while True:
            readable = datetime.fromtimestamp(self.sinceAsEpoch/1000).isoformat()
            log.info("since: %s (%s)" % (self.sinceAsEpoch, readable))
            fetch = self.one_call_raw if self.args.raw else self.one_call
            fetch()
    except KeyboardInterrupt:
        log.info("interrupted")

    self.client.exit()


def media_follow_changes():
    """Entry point: build a ``FollowChanges`` instance and run its polling loop."""
    follower = FollowChanges()
    follower.follow_changes()


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion src/npoapi/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def changes_raw(self, profile=None, order="ASC", stream=False, limit=10, since:U
sinceDate = None
if not since is None:
if isinstance(since, datetime.datetime):
since = str(since).replace(" ", "T")
sinceDate = str(since).replace(" ", "T")
elif type(since) == int:\
sinceDate= str(since)
elif not force_oldstyle and (not since.isdigit() or int(since) > 946681200000):
Expand Down

0 comments on commit 17d19a6

Please sign in to comment.