Skip to content

Commit

Permalink
Rewrite using Selenium
Browse files Browse the repository at this point in the history
  • Loading branch information
trewq34 committed Sep 15, 2022
1 parent 9577f96 commit 87853bb
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 152 deletions.
311 changes: 165 additions & 146 deletions auther/providers/helpers/azuread.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
"""
Original code from David Poirier's aws_azuread_login project (https://github.com/david-poirier/aws_azuread_login) and licensed under Apache-2.0
Modified to work with one or many IAM roles
"""

from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import ElementNotVisibleException, ElementNotSelectableException, NoSuchElementException, TimeoutException
from selenium.webdriver.support import expected_conditions as EC
from seleniumwire import webdriver
import uuid
import zlib
import base64
import zlib
from datetime import datetime
import urllib
import pyppeteer
import asyncio
import time
import getpass
import os
import xml.etree.ElementTree
from datetime import datetime
import os

class AzureAdUnknownResponse(Exception):
pass
Expand All @@ -40,129 +41,134 @@ def create_login_url(app_id, tenant_id):

return f'https://login.microsoftonline.com/{tenant_id}/saml2?SAMLRequest={urllib.parse.quote(encoded)}'

async def _load_login(url):
launch_options = {"headless": bool(int(os.environ.get('AUTHER_HEADLESS', 1)))}

chromium_exe = os.environ.get('CHROME_BIN', '')

browser = await pyppeteer.launch(executablePath=chromium_exe , options=launch_options, args=[
'--no-sandbox',
'--single-process',
'--disable-dev-shm-usage',
'--disable-gpu',
'--no-zygote',
'--auth-server-whitelist="_"',
'--auth-negotiate-delegate-whitelist="_"'
])
page = await browser.newPage()
response = await page.goto(
url,
options={
"waitUntil": ["load", "domcontentloaded", "networkidle0", "networkidle2"]
},
)
if response.status != 200:
raise AzureAdUnknownResponse(
f"Invalid status code: {response.status} - check app id and tenant id and try again"
)
return browser, page

def samlHandler(request):
def _req_interceptor(request):
if request.url == 'https://signin.aws.amazon.com/saml':
global roles
roles = _get_roles(request.postData)
roles = _get_roles(request.body)

def _load_login(url):
options = Options()
options.headless = bool(int(os.environ.get('AUTHER_HEADLESS', 1)))
options.add_argument('--auth-server-whitelist="_"')
options.add_argument('--auth-negotiate-delegate-whitelist="_"')

chromium_exe = os.environ.get('AUTHER_CHROME_BIN', ChromeDriverManager().install())

service = Service(executable_path=chromium_exe)
driver = webdriver.Chrome(service=service, options=options)
driver.scopes = [
'.*signin.aws.amazon.com.*',
]
driver.request_interceptor = _req_interceptor

wait = WebDriverWait(driver, timeout=3, poll_frequency=0.25, ignored_exceptions=[ElementNotVisibleException, ElementNotSelectableException, NoSuchElementException, TimeoutException])

async def _check_for_visible_element(page, selector):
try:
element = await page.J(selector)
return element and await element.isIntersectingViewport()
except pyppeteer.errors.NetworkError:
return False
driver.get(url)

async def _input_username(page, username):
input_selector = 'input[type="email"][name="loginfmt"]'
submit_selector = 'input[type="submit"][value="Next"]'
error_selector = "#usernameError"
return driver, wait

def _input_username(driver, wait, username):
while True:
input_selector = driver.find_element(by=By.NAME, value='loginfmt')
submit_selector = driver.find_element(by=By.CSS_SELECTOR, value='input[type="submit"][value="Next"]')

while username in [None, ""]:
username = input("Username: ")

await page.type(input_selector, username)
await page.click(submit_selector)
input_selector.clear()
input_selector.send_keys(username)
submit_selector.click()

while True:
if not await _check_for_visible_element(page, input_selector):
return
elif await _check_for_visible_element(page, error_selector):
username = None
await page.evaluate(
f"() => document.querySelector('{input_selector}').value = ''"
)
print("Unknown username, try again")
break
# wait for one of the above to appear
await asyncio.sleep(0.25)
try:
if wait.until(EC.presence_of_element_located((By.ID, "usernameError"))):
username = None
print("Unknown username, try again")
break
except:
pass

try:
if wait.until(EC.none_of(EC.visibility_of_element_located((By.NAME, "loginfmt")))):
return
except:
pass

# wait for one of the above to appear
time.sleep(0.25)

async def _input_password(page, password):
input_selector = 'input[type="password"][name="passwd"],input[type="password"][name="Password"]'
submit_selector = 'input[type="submit"][value="Sign in"],span[class=submit]'
error_selector = "#passwordError"

def _input_password(driver, wait, password):
while True:
input_selector = driver.find_element(by=By.NAME, value='passwd') or driver.find_element(by=By.NAME, value='Password')
submit_selector = driver.find_element(by=By.CSS_SELECTOR, value='input[type="submit"][value="Sign in"]') or driver.find_element(by=By.CSS_SELECTOR, value='span[class=submit]')

while password in [None, ""]:
password = getpass.getpass("Password: ")

await page.focus(input_selector)
await page.keyboard.type(password)
await page.click(submit_selector)
await asyncio.sleep(1)
input_selector.clear()
input_selector.send_keys(password)
submit_selector.click()

while True:
if not await _check_for_visible_element(page, input_selector):
return
elif await _check_for_visible_element(page, error_selector):
password = None
await page.evaluate(
f"() => document.querySelector('{input_selector}').value = ''"
)
print("Incorrect password, try again")
break
# wait for one of the above to appear
await asyncio.sleep(0.25)
try:
if wait.until(EC.presence_of_element_located((By.ID, "passwordError"))):
password = None
print("Incorrect password, try again")
break
except:
pass

async def _input_code(page, code):
input_selector = 'input[name="otc"]'
submit_selector = 'input[type="submit"][value="Verify"]'
error_selector = "#idSpan_SAOTCC_Error_OTC"
try:
if wait.until(EC.none_of(EC.visibility_of_element_located((By.NAME, "passwd")), EC.visibility_of_element_located((By.NAME, "Password")))):
return
except:
pass

# wait for one of the above to appear
time.sleep(0.25)

def _input_code(driver, wait, code):
while True:
input_selector = driver.find_element(by=By.NAME, value='otc')
submit_selector = driver.find_element(by=By.CSS_SELECTOR, value='input[type="submit"][value="Verify"]')

while code is None:
code = input("One-time code: ")

await page.type(input_selector, code)
await page.click(submit_selector)
input_selector.clear()
input_selector.send_keys(code)
submit_selector.click()

while True:
if not await _check_for_visible_element(page, input_selector):
return
elif await _check_for_visible_element(page, error_selector):
code = None
await page.evaluate(
f"() => document.querySelector('{input_selector}').value = ''"
)
print("Incorrect code, try again")
break
try:
if wait.until(EC.presence_of_element_located((By.ID, "idSpan_SAOTCC_Error_OTC"))):
code = None
print("Incorrect code, try again")
break
except:
pass

try:
if wait.until(EC.none_of(EC.visibility_of_element_located((By.NAME, "otc")))):
return
except:
pass

# wait for one of the above to appear
await asyncio.sleep(0.25)
time.sleep(0.25)

async def _input_stay_signed_in(page, stay_signed_in):
def _input_stay_signed_in(driver, stay_signed_in=False):
if stay_signed_in:
await page.click('input[type="checkbox"][name="DontShowAgain"]')
await page.click('input[type="submit"][value="Yes"]')
DontShowAgain = driver.find_element(by=By.CSS_SELECTOR, value='input[type="checkbox"][name="DontShowAgain"]')
submit = driver.find_element(by=By.CSS_SELECTOR, value='input[type="submit"][value="Yes"]')
DontShowAgain.click()
submit.click()
return
else:
await page.click('input[type="button"][value="No"]')
submit = driver.find_element(by=By.CSS_SELECTOR, value='input[type="button"][value="No"]')
submit.click()
return

def _get_roles(encoded_xml):
saml_roles = []
Expand Down Expand Up @@ -190,59 +196,74 @@ def _get_roles(encoded_xml):
saml_roles.append((assertion_encoded, role_arn, principal_arn))
return saml_roles

async def _auth(url, username=None, password=None, stay_signed_in=False):
browser, page = await _load_login(url)
def _auth(url, username=None, password=None, stay_signed_in=False):
driver, wait = _load_login(url)

global roles
roles = []

before = datetime.now()
during = datetime.now()

found = []

while not roles:
time_diff = (during-before).seconds
if (time_diff >= 60):
raise Exception('hit timeout')

try:
try:
await page.screenshot({'path': '/root/.aws/timeout.png'})
if 'loginfmt' not in found and wait.until(EC.visibility_of_element_located((By.NAME, "loginfmt"))):
_input_username(driver, wait, username)
found.append('loginfmt')
except:
await page.screenshot({'path': 'timeout.png'})
raise Exception('hit timeout')
pass

page.on("request", samlHandler)

if await _check_for_visible_element(
page, 'input[type="email"][name="loginfmt"]'
):
await _input_username(page, username)
elif await _check_for_visible_element(
page, 'input[type="password"][name="passwd"]'
):
await _input_password(page, password)
elif await _check_for_visible_element(
page, 'input[type="password"][name="Password"]'
):
await _input_password(page, password)
elif await _check_for_visible_element(page, 'input[name="otc"]'):
await _input_code(page, None)
elif await _check_for_visible_element(
page, 'input[type="checkbox"][name="DontShowAgain"]'
):
await _input_stay_signed_in(page, stay_signed_in)
elif await _check_for_visible_element(
page, 'div[data-bind="text: unsafe_exceptionMessage"]'
):
try:
await page.screenshot({'path': '/root/.aws/failure.png'})
if 'passwd' not in found and wait.until(EC.any_of(EC.visibility_of_element_located((By.NAME, "passwd")), EC.visibility_of_element_located((By.NAME, "Password")))):
_input_password(driver, wait, password)
found.append('passwd')
except:
await page.screenshot({'path': 'failure.png'})
print(
'Something went wrong - set "headless=False" in the do_login method and try again to debug.'
)
await browser.close()
pass

try:
if 'notify' not in found and 'otc' not in found and wait.until(EC.visibility_of_element_located((By.ID, "idDiv_SAOTCAS_Title"))):
print('MFA notification sent, approve on your device.')
found.append('notify')
except:
pass

try:
if 'otc' not in found and 'notify' not in found and wait.until(EC.visibility_of_element_located((By.NAME, "otc"))):
_input_code(driver, wait, None)
found.append('otc')
except:
pass

try:
if 'DontShowAgain' not in found and wait.until(EC.visibility_of_element_located((By.NAME, "DontShowAgain"))):
_input_stay_signed_in(driver, stay_signed_in)
found.append('DontShowAgain')
except:
pass

try:
if wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, 'div[data-bind="text: unsafe_exceptionMessage"]'))):
print(
'Something went wrong - set "headless=False" in the do_login method and try again to debug.'
)
driver.quit()
break

during = datetime.now()
except:
pass
except KeyboardInterrupt:
driver.quit()
break
else:
# wait for a known option to appear
await asyncio.sleep(0.25)
except:
print('in except, found nothing')
during = datetime.now()

if roles:
Expand All @@ -255,11 +276,9 @@ def do_login(
username=None,
password=None,
stay_signed_in=False):
return asyncio.get_event_loop().run_until_complete(
_auth(
url,
username=username,
password=password,
stay_signed_in=stay_signed_in,
)
return _auth(
url,
username=username,
password=password,
stay_signed_in=stay_signed_in,
)
Loading

0 comments on commit 87853bb

Please sign in to comment.