diff --git a/src/npoapi/bin/npo_media_follow_changes.py b/src/npoapi/bin/npo_media_follow_changes.py index 305830a..6ddf638 100755 --- a/src/npoapi/bin/npo_media_follow_changes.py +++ b/src/npoapi/bin/npo_media_follow_changes.py @@ -6,72 +6,101 @@ import logging import time from datetime import datetime +from sys import stdout import json_stream from npoapi import Media -from io import TextIOWrapper -from sys import stdout -def media_follow_changes(): - client = Media().command_line_client("Get changes feed from the NPO Frontend API", exclude_arguments={"accept"}) - client.add_argument('profile', type=str, nargs='?', help='Profile') - client.add_argument("-s", "--since", type=str, default=None) - client.add_argument("--sleep", type=int, default=5) - client.add_argument("--deletes", type=str, default="ID_ONLY") - client.add_argument("--tail", action='store_true') - client.add_argument('-p', "--properties", type=str, default=None, +class FollowChanges: + def __init__(self): + self.client = Media().command_line_client("Get changes feed from the NPO Frontend API", exclude_arguments={"accept"}) + self.client.add_argument('profile', type=str, nargs='?', help='Profile') + self.client.add_argument("-s", "--since", type=str, default=None) + self.client.add_argument("--sleep", type=int, default=5) + self.client.add_argument("--deletes", type=str, default="ID_ONLY") + self.client.add_argument("--tail", action='store_true') + self.client.add_argument('-p', "--properties", type=str, default=None, help="properties filtering") + self.client.add_argument("--raw", action='store_true', description="No attempts to stream and handle big results. Everything should fit in memory. Simpler, but less efficient.") + self.client.add_argument("--reasonFilter", type=str, default="") + + self.args = self.client.parse_args() + + self.since = self.args.since + if self.since is None: + self.since = datetime.now().isoformat() + self.client.logger.info("No since given, using %s" % self.since) + + self.sinceAsEpoch = int(datetime.fromisoformat(self.since).timestamp() * 1000) - 60000 + + def one_call_raw(self): + response = self.client.changes_raw( + profile=self.args.profile, + since=self.sinceAsEpoch, + properties=self.args.properties, + deletes=self.args.deletes, + reason_filter=self.args.reasonFilter, + stream=False) + self.sinceAsEpoch = json.loads(response)['changes'][0]['publishDate'] + stdout.write(response) - args = client.parse_args() - - since = args.since - if since is None: - since = datetime.now().isoformat() - client.logger.info("No since given, using %s" % since) - - sinceAsEpoch = int(datetime.fromisoformat(since).timestamp() * 1000) - 60000 - - try: - while True: - client.logger.info("since: %s (%s)" % (sinceAsEpoch, datetime.fromtimestamp(sinceAsEpoch/1000).isoformat())) - response = client.changes_raw( - profile=args.profile, - since=sinceAsEpoch, - properties=args.properties, - deletes=args.deletes, - stream=True) - - if response.status != 200: - logging.error("Error %d" % response.status) - continue - data = json_stream.load(response) - changes = data['changes'] - newsince = None - for change in changes.persistent(): - c = json_stream.to_standard_types(change) - newsince = c.get('publishDate') - if not newsince: - logging.error("No publishDate in %s" % c) - break - tail = c.get('tail', False) - if not tail or args.tail: - stdout.write(json.dumps(c)) - stdout.write("\n") - stdout.flush() - if newsince is None: - raise Exception("No tail received?") - if newsince <= sinceAsEpoch: - raise Exception("Since doesn't grow") - sinceAsEpoch = newsince - changes.read_all() - response.close() - time.sleep(args.sleep) - except KeyboardInterrupt: - client.logger.info("interrupted") - - client.exit() + def one_call(self): + response = self.client.changes_raw( + profile=self.args.profile, + since=self.sinceAsEpoch, + properties=self.args.properties, + deletes=self.args.deletes, + reason_filter=self.args.reasonFilter, + stream=True) + + if response.status != 200: + logging.error("Error %d" % response.status) + return + data = json_stream.load(response) + changes = data['changes'] + newsince = None + count = 0 + for change in changes: + count += 1 + c = json_stream.to_standard_types(change) + newsince = c.get('publishDate') + if not newsince: + logging.error("No publishDate in %s" % c) + break + tail = c.get('tail', False) + if not tail or self.args.tail: + stdout.write(json.dumps(c)) + stdout.write("\n") + stdout.flush() + if count == 0: + raise Exception("No changes received!") + if newsince is None: + raise Exception("No tail received?") + if newsince <= self.sinceAsEpoch: + raise Exception("Since doesn't grow") + self.sinceAsEpoch = newsince + changes.read_all() + response.close() + time.sleep(self.args.sleep) + + def follow_changes(self): + try: + while True: + self.client.logger.info("since: %s (%s)" % (self.sinceAsEpoch, datetime.fromtimestamp(self.sinceAsEpoch/1000).isoformat())) + if self.args.raw: + self.one_call_raw() + else: + self.one_call() + except KeyboardInterrupt: + self.client.logger.info("interrupted") + + self.client.exit() + + +def media_follow_changes(): + FollowChanges().follow_changes() if __name__ == "__main__": diff --git a/src/npoapi/media.py b/src/npoapi/media.py index 3a5e5e8..a20d7e6 100644 --- a/src/npoapi/media.py +++ b/src/npoapi/media.py @@ -53,7 +53,7 @@ def changes_raw(self, profile=None, order="ASC", stream=False, limit=10, since:U sinceDate = None if not since is None: if isinstance(since, datetime.datetime): - since = str(since).replace(" ", "T") + sinceDate = str(since).replace(" ", "T") elif type(since) == int:\ sinceDate= str(since) elif not force_oldstyle and (not since.isdigit() or int(since) > 946681200000):