diff --git a/api_tools/check_for_new_lectures.py b/api_tools/check_for_new_lectures.py new file mode 100644 index 00000000..445b32c5 --- /dev/null +++ b/api_tools/check_for_new_lectures.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# +# api_tools/sample_requests.py +# +# This file is part of MUESLI. +# +# Copyright (C) 2021, Christian Heusel +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import threading +import logging +import requests +import coloredlogs + +from header import authenticate, STATIC_HEADERS, MUESLI_URL + +# This could be replaced by cmd input vars +WAIT_TIME_SECONDS = 60 * 60 # 1h +USERNAME = "test@example.com" +SEMESTER = "2021 SS" + + +def get(endpoint, header): + endpoint = MUESLI_URL + endpoint + request = requests.get(endpoint, headers=header) + return request.json() + + +def get_lectures(): + headers = authenticate(username=USERNAME, save=True) + endpoint = "/api/v1/lectures" + return get(endpoint, headers) + + +def extract_lecture_titles(l): + return [r["name"] for r in l if r["term"] == SEMESTER] + + +def run_event_loop(): + ticker = threading.Event() + old = set(extract_lecture_titles(get_lectures())) + + while not ticker.wait(WAIT_TIME_SECONDS): + lectures = get_lectures() + current = set(extract_lecture_titles(lectures)) + if current.difference(old): + logging.info("New lecture found: %s", current.difference(old)) + else: + logging.info("No new lecture found!") + old = current + + +def main(): + coloredlogs.install(level=logging.DEBUG, fmt='%(asctime)s → %(message)s') + logging.info("Will check every %s seconds for new lectures on \"%s\"!", + WAIT_TIME_SECONDS, MUESLI_URL) + run_event_loop() + + +if __name__ == "__main__": + main() diff --git a/api_tools/header.py b/api_tools/header.py index b1fec2b3..d7be1123 100644 --- a/api_tools/header.py +++ b/api_tools/header.py @@ -19,20 +19,22 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . - """ This module provides tooling to authenticate ageinst the Muesli API v1 """ -STATIC_HEADERS = {'Accept': 'application/json'} -MUESLI_URL = "http://127.0.0.1:8080" - import json +import logging from os import path, stat from ast import literal_eval import requests +import getpass + +STATIC_HEADERS = {'Accept': 'application/json'} +MUESLI_URL = "https://muesli.mathi.uni-heidelberg.de" -def authenticate(username, password, save=False, url=MUESLI_URL) -> dict: + +def authenticate(username="", password="", save=False, url=MUESLI_URL) -> dict: """A function to authenticate as a user and get the valid headers for this user. Args: @@ -51,28 +53,37 @@ def authenticate(username, password, save=False, url=MUESLI_URL) -> dict: 'Accept': 'application/json' } """ + if not username: + username = input('Please enter your username: ') + logging.debug("Username: %s", username) + header_name = username if path.isfile(header_name) and stat(header_name).st_size != 0: with open(header_name, 'r') as header: header_content = literal_eval(header.read()) - print('Read header from "{}"!'.format(header_name)) + logging.debug('Read header from file "%s"!', header_name) else: - r = requests.post( - url+'/api/v1/login', - data={"email": username, "password": password} - ) + if not password: + password = getpass.getpass(prompt='Password: ') + + r = requests.post(url + '/api/v1/login', + data={ + "email": username, + "password": password + }) token = r.json().get("token", "") - print(json.dumps(r.json())) - header_content = {'Authorization': 'Bearer '+token} + logging.debug(json.dumps(r.json())) + header_content = {'Authorization': 'Bearer ' + token} header_content.update(STATIC_HEADERS) if save: - savePassword(username, header_content) + save_password(username, header_content) return header_content -def savePassword(username, header_content): + +def save_password(username, header_content): """ saves a header generated by authenticate() to a file. """ header_name = username with open(header_name, 'w') as header: header.write("# This file was generated by: {}!\n".format(__file__)) header.write(json.dumps(header_content)) - print('Created header file for user "{}"!'.format(header_name)) + logging.info('Created header file for user "%s"!', header_name) diff --git a/api_tools/requirements.txt b/api_tools/requirements.txt new file mode 100644 index 00000000..daba4a59 --- /dev/null +++ b/api_tools/requirements.txt @@ -0,0 +1,2 @@ +requests +coloredlogs diff --git a/api_tools/sample_requests.py b/api_tools/sample_requests.py index a7d29e5c..3284098d 100644 --- a/api_tools/sample_requests.py +++ b/api_tools/sample_requests.py @@ -28,7 +28,7 @@ from header import authenticate STATIC_HEADERS = {'Accept': 'application/json'} -MUESLI_URL = "http://localhost:8080" +MUESLI_URL = "https://muesli.mathi.uni-heidelberg.de" def get(endpoint, header):