Skip to content

Commit

Permalink
Waiting for anisette server. Using fallback for SMS2FA
Browse files Browse the repository at this point in the history
  • Loading branch information
danny committed Feb 8, 2025
1 parent 2c847ce commit a8d27c1
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 34 deletions.
29 changes: 24 additions & 5 deletions endpoint/mh_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import ssl
import sys
import time
from collections import OrderedDict
from datetime import datetime, timezone
Expand All @@ -29,9 +30,9 @@ def addCORSHeaders(self):
self.send_header("Access-Control-Allow-Private-Network","true")

def authenticate(self):
user = config.getEndpointUser()
passw = config.getEndpointPass()
if (user is None or user == "") and (passw is None or passw == ""):
endpoint_user = config.getEndpointUser()
endpoint_pass = config.getEndpointPass()
if (endpoint_user is None or endpoint_user == "") and (endpoint_pass is None or endpoint_pass == ""):
return True

auth_header = self.headers.get('authorization')
Expand All @@ -40,7 +41,7 @@ def authenticate(self):
if auth_type.lower() == 'basic':
auth_decoded = base64.b64decode(auth_encoded).decode('utf-8')
username, password = auth_decoded.split(':', 1)
if username == user and password == passw:
if username == endpoint_user and password == endpoint_pass:
return True

return False
Expand Down Expand Up @@ -149,8 +150,26 @@ def getAuth(regenerate=False, second_factor='sms'):
return j['dsid'], j['searchPartyToken']



def check_if_anisette_is_reachable(max_retries=3, retry_delay=10):
server_url = config.getAnisetteServer()
logging.info(f'Checking if Anisette {server_url} is reachable')
for attempt in range(max_retries):
try:
response = requests.get(server_url, timeout=5)
response.raise_for_status()
return
except (requests.RequestException, requests.HTTPError) as e:
logger.error(f"Attempt {attempt + 1} failed: {str(e)}")
if attempt < max_retries - 1:
logger.error(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
logger.error(f"Max retries reached. Program will exit. Make sure your Anisette is reachable and start again with 'docker start -ai macless-haystack'")
sys.exit()

if __name__ == "__main__":
logging.debug(f'Searching for token at ' + config.getConfigFile())
check_if_anisette_is_reachable()
logging.info(f'Searching for token at ' + config.getConfigFile())
if not os.path.exists(config.getConfigFile()):
logging.info(f'No auth-token found.')
apple_cryptography.registerDevice()
Expand Down
82 changes: 53 additions & 29 deletions endpoint/register/pypush_gsa_icloud.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import time

import urllib3
from getpass import getpass
import plistlib as plist
Expand Down Expand Up @@ -37,6 +39,7 @@ def icloud_login_mobileme(username='', password=''):
username = input('Apple ID: ')
if not password:
password = getpass('Password: ')

g = gsa_authenticate(username, password)
pet = g["t"]["com.apple.gs.idms.pet"]["token"]
adsid = g["adsid"]
Expand Down Expand Up @@ -71,10 +74,10 @@ def icloud_login_mobileme(username='', password=''):
def gsa_authenticate(username, password):
# Password is None as we'll provide it later
usr = srp.User(username, bytes(), hash_alg=srp.SHA256, ng_type=srp.NG_2048)
_, A = usr.start_authentication()
_, a = usr.start_authentication()
logger.info("Authentication request initialization")
r = gsa_authenticated_request(
{"A2k": A, "ps": ["s2k", "s2k_fo"], "u": username, "o": "init"})
{"A2k": a, "ps": ["s2k", "s2k_fo"], "u": username, "o": "init"})

if r["sp"] not in ["s2k", "s2k_fo"]:
logger.warning(f"This implementation only supports s2k and sk2_fo. Server returned {r['sp']}")
Expand Down Expand Up @@ -219,15 +222,13 @@ def decrypt_cbc(usr, data):
return padder.update(data) + padder.finalize()


WAITING_TIME = 60


def sms_second_factor(dsid, idms_token):
identity_token = base64.b64encode(
(dsid + ":" + idms_token).encode()).decode()

# TODO: Actually do this request to get user prompt data
# a = requests.get("https://gsa.apple.com/auth", verify=False)
# This request isn't strictly necessary though,
# and most accounts should have their id 1 SMS, if not contribute ;)

headers = {
"User-Agent": "Xcode",
"Accept-Language": "en-us",
Expand All @@ -238,39 +239,46 @@ def sms_second_factor(dsid, idms_token):
}

headers.update(generate_anisette_headers())

# Extract the "boot_args" from the auth page to get the id of the trusted phone number
pattern = r'<script.*class="boot_args">\s*(.*?)\s*</script>'
auth = requests.get("https://gsa.apple.com/auth", headers=headers, verify=False)
sms_id = 1
match = re.search(pattern, auth.text, re.DOTALL)
if match:
boot_args = json.loads(match.group(1).strip())
try:
try:
sms_id = boot_args["direct"]["phoneNumberVerification"]["trustedPhoneNumber"]["id"]
except KeyError as e:
logger.debug(match.group(1).strip())
logger.error("Key for sms id not found. Using the first phone number")
except KeyError as e:
logger.debug(match.group(1).strip())
logger.error("Key for sms id not found. Using the first phone number")
else:
logger.debug(auth.text)
logger.error("Script for sms id not found. Using the first phone number")

logger.info(f"Using phone with id {sms_id} for SMS2FA")
body = {"phoneNumber": {"id": sms_id }, "mode": "sms"}

# This will send the 2FA code to the user's phone over SMS
# We don't care about the response, it's just some HTML with a form for entering the code
# Easier to just use a text prompt
requests.put(
"https://gsa.apple.com/auth/verify/phone/",
json=body,
headers=headers,
verify=False,
timeout=5
)
logger.debug(auth.text)
logger.error("Script for sms id not found. Using the first phone number")

logger.info(f"Using phone with id {sms_id} for SMS2FA")
body = {"phoneNumber": {"id": sms_id}, "mode": "sms"}
for handler in logger.handlers:
handler.flush()
# Prompt for the 2FA code. It's just a string like '123456', no dashes or spaces
code = input("Enter SMS 2FA code: ")
start_time = time.perf_counter()
code = input(
f"Enter SMS 2FA code (If you do not receive a code, wait {WAITING_TIME}s and press Enter. An attempt will be made to request the SMS in another way.): ")
end_time = time.perf_counter()

if code == "":
elapsed_time = int(end_time - start_time)
if elapsed_time < WAITING_TIME:
waiting_time = WAITING_TIME - elapsed_time
logger.info(
f"You only waited {elapsed_time} seconds. The next request will be started in {waiting_time} seconds")
time.sleep(waiting_time)
code = input(f"Enter SMS 2FA code if you have received it in the meantime, otherwise press Enter: ")

if code == "":
code = request_code(headers)
else:
code = request_code(headers)

body['securityCode'] = {'code': code}

Expand All @@ -294,3 +302,19 @@ def sms_second_factor(dsid, idms_token):
else:
raise Exception(
"2FA unsuccessful. Maybe wrong code or wrong number. Check your account details.")


def request_code(headers):
# This will send the 2FA code to the user's phone over SMS
# We don't care about the response, it's just some HTML with a form for entering the code
# Easier to just use a text prompt
body = {"phoneNumber": {"id": 1}, "mode": "sms"}
requests.put(
"https://gsa.apple.com/auth/verify/phone/",
json=body,
headers=headers,
verify=False,
timeout=5
)
code = input(f"Enter SMS 2FA code:")
return code

0 comments on commit a8d27c1

Please sign in to comment.