Skip to content

Commit

Permalink
feat(example): added working_hours_gspread.py example script (#81)
Browse files Browse the repository at this point in the history
* feat(example): added working_hours_gspread.py example script, updated working_hours.py to be more reusable

* fix(example): improvements to working_hours.py and working_hours_gspread.py examples

* build: added gspread as a dev dependency (used in examples)

* fix: fixed examples

* fix: fixed load_dataframe example in CI

* fix: fixed working_hours example in CI
  • Loading branch information
ErikBjare authored Jan 8, 2024
1 parent b4598ff commit c2b051e
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ test-integration:
test-examples:
cd examples; pytest -v *.py
cd examples; yes | python3 load_dataframe.py
cd examples; python3 working_hours.py
cd examples; python3 working_hours.py 'activitywatch|aw-|github.com' fakedata

typecheck:
poetry run mypy
Expand Down
7 changes: 5 additions & 2 deletions examples/load_dataframe.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""
Load ActivityWatch data into a dataframe, and export as CSV.
"""
import os
import socket
from datetime import datetime, timedelta, timezone

import iso8601
Expand All @@ -11,10 +13,11 @@


def build_query() -> str:
hostname = "fakedata" if os.getenv("CI") else socket.gethostname()
canonicalQuery = canonicalEvents(
DesktopQueryParams(
bid_window="aw-watcher-window_",
bid_afk="aw-watcher-afk_",
bid_window=f"aw-watcher-window_{hostname}",
bid_afk=f"aw-watcher-afk_{hostname}",
classes=default_classes,
)
)
Expand Down
76 changes: 48 additions & 28 deletions examples/working_hours.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,25 @@
"""

import json
import re
import logging
import os
from datetime import datetime, timedelta, time
from typing import List, Tuple, Dict

from tabulate import tabulate
import re
import socket
import sys
from datetime import datetime, time, timedelta
from typing import Dict, List, Tuple

import aw_client
from aw_client import queries
from aw_core import Event
from aw_transform import flood
from tabulate import tabulate


EXAMPLE_REGEX = r"activitywatch|algobit|defiarb|github.com"
OUTPUT_HTML = os.environ.get("OUTPUT_HTML", "").lower() == "true"

td1d = timedelta(days=1)
day_offset = timedelta(hours=4)


def _pretty_timedelta(td: timedelta) -> str:
s = str(td)
Expand All @@ -47,19 +50,11 @@ def generous_approx(events: List[dict], max_break: float) -> timedelta:
)


def query(regex: str = EXAMPLE_REGEX, save=True):
def query(regex: str, timeperiods, hostname: str):
print("Querying events...")
td1d = timedelta(days=1)
day_offset = timedelta(hours=4)
print(f" Day offset: {day_offset}")
print("")

now = datetime.now().astimezone()
today = (datetime.combine(now.date(), time()) + day_offset).astimezone()

timeperiods = [(today - i * td1d, today - (i - 1) * td1d) for i in range(5)]
timeperiods.reverse()

categories: List[Tuple[List[str], Dict]] = [
(
["Work"],
Expand All @@ -75,8 +70,8 @@ def query(regex: str = EXAMPLE_REGEX, save=True):

canonicalQuery = queries.canonicalEvents(
queries.DesktopQueryParams(
bid_window="aw-watcher-window_",
bid_afk="aw-watcher-afk_",
bid_window=f"aw-watcher-window_{hostname}",
bid_afk=f"aw-watcher-afk_{hostname}",
classes=categories,
filter_classes=[["Work"]],
)
Expand All @@ -89,16 +84,38 @@ def query(regex: str = EXAMPLE_REGEX, save=True):

res = aw.query(query, timeperiods)

for break_time in [0, 5 * 60, 10 * 60, 15 * 60]:
_print(
timeperiods, res, break_time, {"category_rule": categories[0][1]["regex"]}
)
return res

if save:
fn = "working_hours_events.json"
with open(fn, "w") as f:
print(f"Saving to {fn}...")
json.dump(res, f, indent=2)

def main():
if len(sys.argv) < 2:
print("Usage: python3 working_hours.py <regex> [hostname]")
exit(1)

regex = sys.argv[1]
print(f"Using regex: {regex}")

if len(sys.argv) > 2:
hostname = sys.argv[2]
print(f"Using hostname: {hostname}")
else:
hostname = socket.gethostname()

now = datetime.now().astimezone()
today = (datetime.combine(now.date(), time()) + day_offset).astimezone()

timeperiods = [(today - i * td1d, today - (i - 1) * td1d) for i in range(5)]
timeperiods.reverse()

res = query(regex, timeperiods, hostname)

for break_time in [0, 5 * 60, 15 * 60]:
_print(timeperiods, res, break_time, {"regex": regex})

fn = "working_hours_events.json"
with open(fn, "w") as f:
print(f"Saving to {fn}...")
json.dump(res, f, indent=2)


def _print(timeperiods, res, break_time, params: dict):
Expand Down Expand Up @@ -134,4 +151,7 @@ def _print(timeperiods, res, break_time, params: dict):


if __name__ == "__main__":
query()
# ignore log warnings in aw_transform
logging.getLogger("aw_transform").setLevel(logging.ERROR)

main()
122 changes: 122 additions & 0 deletions examples/working_hours_gspread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""
This script uses ActivityWatch events to updates a Google Sheet
with the any events matching a regex for the last `days_back` days.
It uses the `working_hours.py` example to calculate the working hours
and the `gspread` library to interact with Google Sheets.
The Google Sheet is identified by its key, which is hardcoded in the script.
The script uses a service account for authentication with the Google Sheets API.
The script assumes that the Google Sheet has a worksheet for each hostname, named "worked-{hostname}".
If such a worksheet does not exist, the script will fail.
The working hours are calculated generously, meaning that if the time between two consecutive
events is less than `break_time` (10 minutes by default), it is considered as working time.
Usage:
python3 working_hours_gspread.py <sheet_key> <regex>
"""
import socket
import sys
from datetime import datetime, time, timedelta

import gspread

import working_hours

td1d = timedelta(days=1)
break_time = 10 * 60


def update_sheet(sheet_key: str, regex: str):
"""
Update the Google Sheet with the working hours for the last `days_back` days.
1. Open the sheet and get the last entry
2. Query the working hours for the days since the last entry
3. Update the last entry in the Google Sheet (if any)
4. Append any new entries
"""

hostname = socket.gethostname()
hostname_display = hostname.replace(".localdomain", "").replace(".local", "")

try:
gc = gspread.service_account()
except Exception as e:
print(e)
print(
"Failed to authenticate with Google Sheets API.\n"
"Make sure you have a service account key in ~/.config/gspread/service_account.json\n"
"See https://gspread.readthedocs.io/en/latest/oauth2.html#for-bots-using-service-account"
)
exit(1)

# Open the sheet
sh = gc.open_by_key(sheet_key)
print(f"Updating document: {sh.title}")
worksheet = sh.worksheet(f"worked-{hostname_display}")
print(f"Updating worksheet: {worksheet.title}")

# Get the most recent entry from the Google Sheet
values = worksheet.get_all_values()
if values:
last_row = values[-1]
last_date = datetime.strptime(last_row[0], "%Y-%m-%d").date()
else:
last_date = None

last_datetime = (
(datetime.combine(last_date, time()) + working_hours.day_offset).astimezone()
if last_date
else None
)

if last_datetime:
print(f"Last entry: {last_datetime}")

now = datetime.now().astimezone()
today = (
datetime.combine(now.date(), time()) + working_hours.day_offset
).astimezone()

# Create a list of time periods to query, from last_date or days_back_on_new back if None
days_back_on_new = 30
days_back = (today - last_datetime).days + 1 if last_datetime else days_back_on_new
timeperiods = [(today - i * td1d, today - (i - 1) * td1d) for i in range(days_back)]
timeperiods.reverse()

# Run the query function from the original script and get the result
res = working_hours.query(regex, timeperiods, hostname)

# Iterate over the result and update or append the data to the Google Sheet
for tp, r in zip(timeperiods, res):
date = tp[0].date()
duration = (
working_hours.generous_approx(r["events"], break_time).total_seconds()
/ 3600
)
row = [str(date), duration]

# If the date is the same as the last entry, update it
if last_date and date == last_date:
print(f"Updating {row}")
worksheet.update_cell(len(worksheet.get_all_values()), 2, duration)
# If the date is later than the last entry, append it
elif not last_date or date > last_date:
print(f"Appending {row}")
worksheet.append_row(row, value_input_option="USER_ENTERED")
else:
print(f"Skipping {row}")


if __name__ == "__main__":
if len(sys.argv) == 3:
sheet_key = sys.argv[1]
regex = sys.argv[2]
else:
print("Usage: python3 working_hours_gspread.py <sheet_key> <regex>")
exit(1)

update_sheet(sheet_key, regex)
Loading

0 comments on commit c2b051e

Please sign in to comment.