diff --git a/functions/aws/control/distributor_queue.py b/functions/aws/control/distributor_queue.py
index 37f54e5..b83333c 100644
--- a/functions/aws/control/distributor_queue.py
+++ b/functions/aws/control/distributor_queue.py
@@ -41,18 +41,6 @@ def push(
         We must use a single shard - everything is serialized.
         """
         counter_val = counter.sum
-        print("Distributor push", event)
-        print(
-            "Distributor push",
-            {
-                "key": self._type_serializer.serialize("faaskeeper"),
-                "timestamp": self._type_serializer.serialize(counter_val),
-                "sourceIP": ip,
-                "sourcePort": port,
-                "user_timestamp": user_timestamp,
-                **event.serialize(self._type_serializer),
-            },
-        )
         self._queue.write(
             "",
             {
@@ -88,14 +76,12 @@ def push(
         """We must use a single shard - everything is serialized.
         """
         # FIXME: is it safe here to serialize the types?
-        print("Distributor push", event)
         payload: Dict[str, str] = {
            "sourceIP": ip,
            "sourcePort": port,
            "user_timestamp": user_timestamp,
            **event.serialize(self._type_serializer),
         }
-        print("Distributor push", payload)
         # if "data" in payload:
         #     binary_data = payload["data"]["B"]
         #     del payload["data"]
diff --git a/functions/aws/distributor.py b/functions/aws/distributor.py
index d699eed..c5da897 100644
--- a/functions/aws/distributor.py
+++ b/functions/aws/distributor.py
@@ -128,7 +128,6 @@ def handler(event: dict, context):
         else:
             raise NotImplementedError()
 
-        logging.info("Begin processing event", write_event)
         # FIXME: hide under abstraction, boto3 deserialize
         operation: DistributorEvent
 
@@ -153,13 +152,11 @@ def handler(event: dict, context):
         else:
             raise NotImplementedError()
         try:
-            logging.info(f"Prepared event", write_event)
             begin_write = time.time()
             # write new data
             for r in regions:
                 ret = operation.execute(config.user_storage, epoch_counters[r])
             end_write = time.time()
-            logging.info("Finished region operation")
             begin_watch = time.time()
             # start watch delivery
             for r in regions:
@@ -169,17 +166,13 @@ def handler(event: dict, context):
                 # )
                 # FIXME: other watchers
                 # FIXME: reenable submission
-                logging.info(
-                    region_watches[r].query_watches(
-                        operation.node.path, [WatchType.GET_DATA]
-                    )
+                region_watches[r].query_watches(
+                    operation.node.path, [WatchType.GET_DATA]
                 )
             end_watch = time.time()
-            logging.info("Finished watch dispatch")
 
             for r in regions:
                 epoch_counters[r].update(counters)
-            logging.info("Updated epoch counters")
             begin_notify = time.time()
             if ret:
                 # notify client about success
@@ -203,7 +196,6 @@ def handler(event: dict, context):
                     {"status": "failure", "reason": "distributor failured"},
                 )
             end_notify = time.time()
-            logging.info("Finished notifying the client")
         except Exception:
             print("Failure!")
             import traceback
@@ -218,13 +210,11 @@ def handler(event: dict, context):
                 write_event,
                 {"status": "failure", "reason": "distributor failured"},
             )
-        logging.info("Start waiting for watchers")
         begin_watch_wait = time.time()
         for f in watches_submitters:
             f.result()
         end_watch_wait = time.time()
         end = time.time()
-        logging.info("Finish waiting for watchers")
 
         global repetitions
         global sum_total
diff --git a/functions/aws/model/user_storage.py b/functions/aws/model/user_storage.py
index a4b8a00..2dc416c 100644
--- a/functions/aws/model/user_storage.py
+++ b/functions/aws/model/user_storage.py
@@ -132,13 +132,10 @@ def write(self, node: Node):
     def update(self, node: Node, updates: Set[NodeDataType] = set()):
         # we need to download the data from storage
         if not node.has_data or not node.has_children or not node.has_created:
-            logging.info("Start reading from S3")
             node_data = self._storage.read(node.path)
-            logging.info("Finish reading from S3")
             read_node = S3Reader.deserialize(
                 node.path, node_data, not node.has_data, not node.has_children
             )
-            logging.info("Finish deserialize from S3")
             if not node.has_data:
                 node.data = read_node.data
             if not node.has_children:
@@ -147,11 +144,8 @@ def update(self, node: Node, updates: Set[NodeDataType] = set()):
                 node.created = read_node.created
             if not node.has_modified:
                 node.modified = read_node.modified
-        logging.info("Start writing to S3")
         s3_data = S3Reader.serialize(node)
-        logging.info("Finish data conversion")
         self._storage.write(node.path, s3_data)  # S3Reader.serialize(node))
-        logging.info("Finish writing to S3")
         return OpResult.SUCCESS
 
     def delete(self, node: Node):
diff --git a/functions/aws/writer.py b/functions/aws/writer.py
index aeca373..529d994 100644
--- a/functions/aws/writer.py
+++ b/functions/aws/writer.py
@@ -428,8 +428,6 @@ def handler(event: dict, context):
         else:
             raise NotImplementedError()
 
-        logging.info(record)
-        logging.info(f"Begin processing event {write_event}")
         op = get_object(write_event["op"])
         if op not in ops:
             logging.error(
@@ -444,7 +442,6 @@ def handler(event: dict, context):
 
         ret = ops[op](event_id, write_event)
         if ret:
-            logging.info("Processing finished, result ", ret)
             if ret["status"] == "failure":
                 logging.error(f"Failed processing write event {event_id}: {ret}")
                 # Failure - notify client