diff --git a/Google Authenticator.alfredworkflow b/Google Authenticator.alfredworkflow index fddb4dd..9e32e70 100644 Binary files a/Google Authenticator.alfredworkflow and b/Google Authenticator.alfredworkflow differ diff --git a/src/otp.py b/src/otp.py index 3e0ff4e..ad1243c 100644 --- a/src/otp.py +++ b/src/otp.py @@ -47,11 +47,7 @@ def is_otp_secret_valid(secret): return True -def get_hotp_key(key=None, secret=None, hexkey=None): - if hexkey: - key = hexkey.decode('hex') - if secret: - secret = secret.replace(' ', '') - secret = pad_base32_str(secret, '=') - key = base64.b32decode(secret, casefold=True) - return key +def get_hotp_key(secret): + secret = secret.replace(' ', '') + secret = pad_base32_str(secret, '=') + return base64.b32decode(secret, casefold=True) diff --git a/src/storage/apple_keychain_storage.py b/src/storage/apple_keychain_storage.py new file mode 100644 index 0000000..bac0911 --- /dev/null +++ b/src/storage/apple_keychain_storage.py @@ -0,0 +1,123 @@ +import os +import subprocess +import re + +from storage.storage_interface import StorageInterface + +class AppleKeychainStorage(StorageInterface): + def __init__(self, name = 'alfred-totp'): + self._name = name + self._file = name + ".keychain" + self._data = {} + + if not self.keychain_exists(): + self.create_storage() + + self._data = self._parse(self._dump()) + + def get_file(self): + return "~/Library/Keychains/" + self._file + "-db" + + def validate_storage(self): + pass + + def is_empty(self): + return not self._data + + def keychain_exists(self): + # The "security list-keychains -d user" only shows keychains from "Keychain Access" app. + return os.path.isfile(self._get_resolved_file()) + + def _get_resolved_file(self): + return os.path.expanduser(self.get_file()) + + def create_storage(self): + if self.keychain_exists(): + return + + self._run_command(["security", "create-keychain", "-P", self._file]) + self._add_to_keychain_access() + + def _add_to_keychain_access(self): + user_keychains = self.get_user_keychains() + user_keychains.append(self._get_resolved_file()) + + # Removes duplicates. + user_keychains = list(set(user_keychains)) + + self._run_command(["security", "list-keychains", "-d", "user", "-s"] + user_keychains) + + def get_user_keychains(self): + results = [] + lines = self._run_command(["security", "list-keychains", "-d", "user"]) + + for line in lines: + line = line.strip('" ') + results.append(line) + + return results + + def get_secret(self, account): + if account in self._data: + return self._data[account] + else: + raise Exception("Service not found") + + def _dump(self): + return self._run_command(["security", "dump-keychain", "-d", self._file]) + + def _parse(self, raw_dump_data): + account = None + capture_secret = False + results = {} + + if not raw_dump_data: + return {} + + for line in raw_dump_data: + line = line.strip() + + regs = re.search(r'"svce"="([^"]+)"', line) + if regs: + account = regs.group(1) + continue + + if line == "data:" and account: + capture_secret = True + continue + + if capture_secret: + results[account] = line.strip('"') + + # Reset for the next entry + account = None + capture_secret = False + + return results + + def get_accounts(self): + return self._data.keys() + + def add_account(self, account, secret): + if account in self._data: + return False + + try: + subprocess.run( + ["security", "add-generic-password", "-a", self._name, "-s", account, "-w", secret, self._file], + check=True + ) + + self._data[account] = secret + + return True + except subprocess.CalledProcessError as e: + return False + + def _run_command(self, command): + try: + result = subprocess.run(command, capture_output=True, text=True, check=True) + return result.stdout.splitlines() + except subprocess.CalledProcessError as e: + print(f"Error executing security command: {e}") + return [] diff --git a/src/storage/exceptions.py b/src/storage/exceptions.py new file mode 100644 index 0000000..401cf37 --- /dev/null +++ b/src/storage/exceptions.py @@ -0,0 +1,3 @@ +class StorageException(Exception): + def __init__(self, file): + self.file = file diff --git a/src/storage/plain_text_storage.py b/src/storage/plain_text_storage.py new file mode 100644 index 0000000..89ee796 --- /dev/null +++ b/src/storage/plain_text_storage.py @@ -0,0 +1,69 @@ +import os +import configparser + +from storage.exceptions import StorageException + +from storage.storage_interface import StorageInterface + + +class PlainTextStorage(StorageInterface): + _config_file_initial_content = """ +#Examples of valid configurations: +#[google - bob@gmail.com] +#secret=xxxxxxxxxxxxxxxxxx +# +#[evernote - robert] +#secret=yyyyyyyyyyyyyyyyyy +""" + + def __init__(self, config_file='~/.gauth'): + self._config_file = config_file + self.config_file = os.path.expanduser(self._config_file) + + self.config = configparser.RawConfigParser() + + # If the configuration file doesn't exist, create an empty one + if not os.path.isfile(self.config_file): + self.create_storage() + + def get_file(self): + return self._config_file + + def create_storage(self): + if os.path.isfile(self.config_file): + return + + with open(self.config_file, 'w') as f: + f.write(self._config_file_initial_content) + f.close() + + def validate_storage(self): + try: + self.config.read(self.config_file) + except Exception as e: + raise StorageException(self._config_file) from e + + def is_empty(self): + return not self.config.sections() + + def get_secret(self, account): + try: + return self.config.get(account, 'secret') + except: + raise Exception("Service not found") + + def get_accounts(self): + return self.config.sections() + + def add_account(self, account, secret): + config_file = open(self.config_file, 'r+') + try: + self.config.add_section(account) + self.config.set(account, "secret", secret) + self.config.write(config_file) + except configparser.DuplicateSectionError: + return False + finally: + config_file.close() + + return True diff --git a/src/storage/storage_interface.py b/src/storage/storage_interface.py new file mode 100644 index 0000000..acf7139 --- /dev/null +++ b/src/storage/storage_interface.py @@ -0,0 +1,21 @@ +class StorageInterface: + def get_file(self): + pass + + def create_storage(self): + pass + + def validate_storage(self): + pass + + def is_empty(self): + pass + + def get_secret(self, account): + pass + + def get_accounts(self): + pass + + def add_account(self, account, secret): + pass diff --git a/src/workflow.py b/src/workflow.py index 6851470..173b651 100644 --- a/src/workflow.py +++ b/src/workflow.py @@ -1,80 +1,42 @@ # -*- coding: utf-8 -*- -import os -import configparser import time import alfred import otp +from storage.plain_text_storage import PlainTextStorage +from storage.apple_keychain_storage import AppleKeychainStorage +from storage.exceptions import StorageException class AlfredGAuth(alfred.AlfredWorkflow): - _config_file_initial_content = """ -#Examples of valid configurations: -#[google - bob@gmail.com] -#secret=xxxxxxxxxxxxxxxxxx -# -#[evernote - robert] -#secret=yyyyyyyyyyyyyyyyyy -""" - _reserved_words = ['add', 'update', 'remove'] - def __init__(self, config_file='~/.gauth', max_results=20): + def __init__(self, max_results=20): self.max_results = max_results - self._config_file = config_file - self.config_file = os.path.expanduser(self._config_file) - self.config = configparser.RawConfigParser() - self.config.read(self.config_file) + self._init_storage() - # If the configuration file doesn't exist, create an empty one - if not os.path.isfile(self.config_file): - self.create_config() + def _init_storage(self): + self._storage = AppleKeychainStorage() try: - if not self.config.sections(): - # If the configuration file is empty, - # tell the user to add secrets to it - self.write_item(self.config_file_is_empty_item()) - return - except Exception as e: + self._storage.validate_storage() + except StorageException as e: item = self.exception_item(title='{}: Invalid syntax' - .format(self._config_file), - exception=e) + .format(str(e)), + exception=e.__context__) self.write_item(item) - def create_config(self): - with open(self.config_file, 'w') as f: - f.write(self._config_file_initial_content) - f.close() - def config_get_account_token(self, account): try: - secret = self.config.get(account, 'secret') - except: - secret = None - - try: - key = self.config.get(account, 'key') - except: - key = None - - try: - hexkey = self.config.get(account, 'hexkey') - except: - hexkey = None - - try: - key = otp.get_hotp_key(secret=secret, key=key, hexkey=hexkey) + secret = self._storage.get_secret(account) + key = otp.get_hotp_key(secret) except: key = '' return otp.get_totp_token(key) - def config_list_accounts(self): - return self.config.sections() - def filter_by_account(self, account, query): return len(query.strip()) and not query.lower() in str(account).lower() @@ -85,30 +47,40 @@ def account_item(self, account, token, uid=None): def time_remaining_item(self): # The uid for the remaining time will be the current time, - # so it will appears always at the last position in the list + # so it will appear always at the last position in the list time_remaining = otp.get_totp_time_remaining() return alfred.Item({u'uid': time.time(), u'arg': '', u'ignore': 'yes'}, 'Time Remaining: {}s'.format(time_remaining), None, 'time.png') + def detect_empty_storage(self): + if self._storage.is_empty(): + # If the configuration file is empty, + # tell the user to add secrets to it + self.write_item(self.config_file_is_empty_item()) + def config_file_is_empty_item(self): return self.warning_item(title='GAuth is not yet configured', message='You must add your secrets to ' 'the {} file (see documentation)' - .format(self._config_file)) + .format(self._storage.get_file())) def search_by_account_iter(self, query): if self.is_command(query): return + i = 0 - for account in self.config_list_accounts(): + for account in self._storage.get_accounts(): if self.filter_by_account(account, query): continue + token = self.config_get_account_token(account) entry = self.account_item(uid=i, account=account, token=token) + if entry: yield entry i += 1 + if i > 0: yield self.time_remaining_item() else: @@ -116,28 +88,26 @@ def search_by_account_iter(self, query): 'There is no account matching "{}" ' 'on your configuration file ' '({})'.format(query, - self._config_file)) + self._storage.get_file())) - def add_account(self, account, secret): + def _add_account(self, account, secret): if not otp.is_otp_secret_valid(secret): - return "Invalid secret:\n[{0}]".format(secret) - - config_file = open(self.config_file, 'r+') - try: - self.config.add_section(account) - self.config.set(account, "secret", secret) - self.config.write(config_file) - except configparser.DuplicateSectionError: - return "Account already exists:\n[{0}]".format(account) - finally: - config_file.close() + raise Exception("Invalid secret:\n[{0}]".format(secret)) - return "A new account was added:\n[{0}]".format(account) + return self._storage.add_account(account, secret) def do_search_by_account(self, query): + self.detect_empty_storage() + self.write_items(self.search_by_account_iter(query)) def do_add_account(self, query): + if query == "migrate": + self._migrate_storage() + return + + self.detect_empty_storage() + try: account, secret = query.split(",", 1) account = account.strip() @@ -146,8 +116,46 @@ def do_add_account(self, query): return self.write_text('Invalid arguments!\n' 'Please enter: account, secret.') - return self.write_text(self.add_account(account, secret)) + try: + if self._add_account(account, secret): + return self.write_text("A new account was added:\n[{0}]".format(account)) + else: + return self.write_text("Account already exists:\n[{0}]".format(account)) + except Exception as e: + return self.write_text(str(e)) + + def _migrate_storage(self): + if self._storage.__class__.__name__ != 'AppleKeychainStorage': + self.write_text('Storage is already migrated') + return + source_storage = PlainTextStorage() + source_storage_file = source_storage.get_file() + + try: + source_storage.validate_storage() + except StorageException as e: + self.write_text(str(e)) + return + + if source_storage.is_empty(): + self.write_text('No accounts to migrate. Please delete "%s" file.' % source_storage_file) + return + + migrated_account_count = 0 + + try: + for account in source_storage.get_accounts(): + secret = source_storage.get_secret(account) + + if self._add_account(account, secret): + migrated_account_count += 1 + + self.write_text('Migrated %s accounts. Please delete "%s" file.' + % (migrated_account_count, source_storage_file)) + except Exception as e: + # Happens, when secret is invalid. + self.write_text(str(e)) def main(action, query): alfred_gauth = AlfredGAuth() @@ -155,5 +163,9 @@ def main(action, query): if __name__ == "__main__": - main(action=alfred.args()[0], query=alfred.args()[1]) + if len(alfred.args()) < 2: + query = "" + else: + query = alfred.args()[1] + main(action=alfred.args()[0], query=query)