-
-
Notifications
You must be signed in to change notification settings - Fork 628
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor non-API routes into separate files. (#2082)
- Loading branch information
1 parent
3b6ed15
commit a4ed15f
Showing
16 changed files
with
1,663 additions
and
1,546 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import uuid | ||
import yaml | ||
from datetime import datetime | ||
from flask import render_template | ||
from os import getenv, path | ||
|
||
from lib import assets_helper, db | ||
from lib.github import is_up_to_date | ||
from lib.utils import get_video_duration | ||
from settings import settings | ||
|
||
|
||
def template(template_name, **context): | ||
""" | ||
This is a template response wrapper that shares the | ||
same function signature as Flask's render_template() method | ||
but also injects some global context.""" | ||
|
||
# Add global contexts | ||
context['date_format'] = settings['date_format'] | ||
context['default_duration'] = settings['default_duration'] | ||
context['default_streaming_duration'] = ( | ||
settings['default_streaming_duration']) | ||
context['template_settings'] = { | ||
'imports': ['from lib.utils import template_handle_unicode'], | ||
'default_filters': ['template_handle_unicode'], | ||
} | ||
context['up_to_date'] = is_up_to_date() | ||
context['use_24_hour_clock'] = settings['use_24_hour_clock'] | ||
|
||
return render_template(template_name, context=context) | ||
|
||
|
||
def prepare_default_asset(**kwargs): | ||
if kwargs['mimetype'] not in ['image', 'video', 'webpage']: | ||
return | ||
|
||
asset_id = 'default_{}'.format(uuid.uuid4().hex) | ||
duration = ( | ||
int(get_video_duration(kwargs['uri']).total_seconds()) | ||
if "video" == kwargs['mimetype'] | ||
else kwargs['duration'] | ||
) | ||
|
||
return { | ||
'asset_id': asset_id, | ||
'duration': duration, | ||
'end_date': kwargs['end_date'], | ||
'is_active': 1, | ||
'is_enabled': True, | ||
'is_processing': 0, | ||
'mimetype': kwargs['mimetype'], | ||
'name': kwargs['name'], | ||
'nocache': 0, | ||
'play_order': 0, | ||
'skip_asset_check': 0, | ||
'start_date': kwargs['start_date'], | ||
'uri': kwargs['uri'] | ||
} | ||
|
||
|
||
def add_default_assets(): | ||
settings.load() | ||
|
||
datetime_now = datetime.now() | ||
default_asset_settings = { | ||
'start_date': datetime_now, | ||
'end_date': datetime_now.replace(year=datetime_now.year + 6), | ||
'duration': settings['default_duration'] | ||
} | ||
|
||
default_assets_yaml = path.join( | ||
getenv('HOME'), '.screenly/default_assets.yml') | ||
|
||
with open(default_assets_yaml, 'r') as yaml_file: | ||
default_assets = yaml.safe_load(yaml_file).get('assets') | ||
with db.conn(settings['database']) as conn: | ||
for default_asset in default_assets: | ||
default_asset_settings.update({ | ||
'name': default_asset.get('name'), | ||
'uri': default_asset.get('uri'), | ||
'mimetype': default_asset.get('mimetype') | ||
}) | ||
asset = prepare_default_asset(**default_asset_settings) | ||
if asset: | ||
assets_helper.create(conn, asset) | ||
|
||
|
||
def remove_default_assets(): | ||
settings.load() | ||
with db.conn(settings['database']) as conn: | ||
for asset in assets_helper.read(conn): | ||
if asset['asset_id'].startswith('default_'): | ||
assets_helper.delete(conn, asset['asset_id']) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,266 @@ | ||
import ipaddress | ||
import logging | ||
import psutil | ||
from datetime import timedelta | ||
from flask import Blueprint, request | ||
from hurry.filesize import size | ||
from os import getenv, statvfs | ||
from platform import machine | ||
from urllib.parse import urlparse | ||
|
||
from anthias_app.helpers import ( | ||
add_default_assets, | ||
remove_default_assets, | ||
template, | ||
) | ||
from lib import ( | ||
diagnostics, | ||
device_helper, | ||
) | ||
from lib.auth import authorized | ||
from lib.utils import ( | ||
connect_to_redis, | ||
get_balena_supervisor_version, | ||
get_node_ip, | ||
get_node_mac_address, | ||
is_balena_app, | ||
is_demo_node, | ||
is_docker, | ||
) | ||
from settings import ( | ||
CONFIGURABLE_SETTINGS, | ||
DEFAULTS, | ||
settings, | ||
ZmqPublisher, | ||
) | ||
|
||
r = connect_to_redis() | ||
anthias_app_bp = Blueprint('anthias_app', __name__) | ||
|
||
|
||
@anthias_app_bp.route('/') | ||
@authorized | ||
def index(): | ||
player_name = settings['player_name'] | ||
my_ip = urlparse(request.host_url).hostname | ||
is_demo = is_demo_node() | ||
balena_uuid = getenv("BALENA_APP_UUID", None) | ||
|
||
ws_addresses = [] | ||
|
||
if settings['use_ssl']: | ||
ws_addresses.append('wss://' + my_ip + '/ws/') | ||
else: | ||
ws_addresses.append('ws://' + my_ip + '/ws/') | ||
|
||
if balena_uuid: | ||
ws_addresses.append( | ||
'wss://{}.balena-devices.com/ws/'.format(balena_uuid)) | ||
|
||
return template( | ||
'index.html', | ||
ws_addresses=ws_addresses, | ||
player_name=player_name, | ||
is_demo=is_demo, | ||
is_balena=is_balena_app(), | ||
) | ||
|
||
|
||
@anthias_app_bp.route('/settings', methods=["GET", "POST"]) | ||
@authorized | ||
def settings_page(): | ||
context = {'flash': None} | ||
|
||
if request.method == "POST": | ||
try: | ||
# Put some request variables in local variables to make them | ||
# easier to read. | ||
current_pass = request.form.get('current-password', '') | ||
auth_backend = request.form.get('auth_backend', '') | ||
|
||
if ( | ||
auth_backend != settings['auth_backend'] | ||
and settings['auth_backend'] | ||
): | ||
if not current_pass: | ||
raise ValueError( | ||
"Must supply current password to change " | ||
"authentication method" | ||
) | ||
if not settings.auth.check_password(current_pass): | ||
raise ValueError("Incorrect current password.") | ||
|
||
prev_auth_backend = settings['auth_backend'] | ||
if not current_pass and prev_auth_backend: | ||
current_pass_correct = None | ||
else: | ||
current_pass_correct = ( | ||
settings | ||
.auth_backends[prev_auth_backend] | ||
.check_password(current_pass) | ||
) | ||
next_auth_backend = settings.auth_backends[auth_backend] | ||
next_auth_backend.update_settings(current_pass_correct) | ||
settings['auth_backend'] = auth_backend | ||
|
||
for field, default in list(CONFIGURABLE_SETTINGS.items()): | ||
value = request.form.get(field, default) | ||
|
||
if not value and field in [ | ||
'default_duration', | ||
'default_streaming_duration', | ||
]: | ||
value = str(0) | ||
if isinstance(default, bool): | ||
value = value == 'on' | ||
|
||
if field == 'default_assets' and settings[field] != value: | ||
if value: | ||
add_default_assets() | ||
else: | ||
remove_default_assets() | ||
|
||
settings[field] = value | ||
|
||
settings.save() | ||
publisher = ZmqPublisher.get_instance() | ||
publisher.send_to_viewer('reload') | ||
context['flash'] = { | ||
'class': "success", | ||
'message': "Settings were successfully saved.", | ||
} | ||
except ValueError as e: | ||
context['flash'] = {'class': "danger", 'message': e} | ||
except IOError as e: | ||
context['flash'] = {'class': "danger", 'message': e} | ||
except OSError as e: | ||
context['flash'] = {'class': "danger", 'message': e} | ||
else: | ||
settings.load() | ||
for field, default in list(DEFAULTS['viewer'].items()): | ||
context[field] = settings[field] | ||
|
||
auth_backends = [] | ||
for backend in settings.auth_backends_list: | ||
if backend.template: | ||
html, ctx = backend.template | ||
context.update(ctx) | ||
else: | ||
html = None | ||
auth_backends.append({ | ||
'name': backend.name, | ||
'text': backend.display_name, | ||
'template': html, | ||
'selected': ( | ||
'selected' | ||
if settings['auth_backend'] == backend.name | ||
else '' | ||
) | ||
}) | ||
|
||
try: | ||
ip_addresses = get_node_ip().split() | ||
except Exception as error: | ||
logging.warning(f"Error getting IP addresses: {error}") | ||
ip_addresses = ['IP_ADDRESS'] | ||
|
||
context.update({ | ||
'user': settings['user'], | ||
'need_current_password': bool(settings['auth_backend']), | ||
'is_balena': is_balena_app(), | ||
'is_docker': is_docker(), | ||
'auth_backend': settings['auth_backend'], | ||
'auth_backends': auth_backends, | ||
'ip_addresses': ip_addresses, | ||
'host_user': getenv('HOST_USER') | ||
}) | ||
|
||
return template('settings.html', **context) | ||
|
||
|
||
@anthias_app_bp.route('/system-info') | ||
@authorized | ||
def system_info(): | ||
loadavg = diagnostics.get_load_avg()['15 min'] | ||
display_power = r.get('display_power') | ||
|
||
# Calculate disk space | ||
slash = statvfs("/") | ||
free_space = size(slash.f_bavail * slash.f_frsize) | ||
|
||
# Memory | ||
virtual_memory = psutil.virtual_memory() | ||
memory = { | ||
'total': virtual_memory.total >> 20, | ||
'used': virtual_memory.used >> 20, | ||
'free': virtual_memory.free >> 20, | ||
'shared': virtual_memory.shared >> 20, | ||
'buff': virtual_memory.buffers >> 20, | ||
'available': virtual_memory.available >> 20 | ||
} | ||
|
||
# Get uptime | ||
system_uptime = timedelta(seconds=diagnostics.get_uptime()) | ||
|
||
# Player name for title | ||
player_name = settings['player_name'] | ||
|
||
device_model = device_helper.parse_cpu_info().get('model') | ||
|
||
if device_model is None and machine() == 'x86_64': | ||
device_model = 'Generic x86_64 Device' | ||
|
||
version = '{}@{}'.format( | ||
diagnostics.get_git_branch(), | ||
diagnostics.get_git_short_hash() | ||
) | ||
|
||
return template( | ||
'system-info.html', | ||
player_name=player_name, | ||
loadavg=loadavg, | ||
free_space=free_space, | ||
uptime=system_uptime, | ||
memory=memory, | ||
display_power=display_power, | ||
device_model=device_model, | ||
version=version, | ||
mac_address=get_node_mac_address(), | ||
is_balena=is_balena_app(), | ||
) | ||
|
||
|
||
@anthias_app_bp.route('/integrations') | ||
@authorized | ||
def integrations(): | ||
|
||
context = { | ||
'player_name': settings['player_name'], | ||
'is_balena': is_balena_app(), | ||
} | ||
|
||
if context['is_balena']: | ||
context['balena_device_id'] = getenv('BALENA_DEVICE_UUID') | ||
context['balena_app_id'] = getenv('BALENA_APP_ID') | ||
context['balena_app_name'] = getenv('BALENA_APP_NAME') | ||
context['balena_supervisor_version'] = get_balena_supervisor_version() | ||
context['balena_host_os_version'] = getenv('BALENA_HOST_OS_VERSION') | ||
context['balena_device_name_at_init'] = getenv( | ||
'BALENA_DEVICE_NAME_AT_INIT') | ||
|
||
return template('integrations.html', **context) | ||
|
||
|
||
@anthias_app_bp.route('/splash-page') | ||
def splash_page(): | ||
ip_addresses = [] | ||
|
||
for ip_address in get_node_ip().split(): | ||
ip_address_object = ipaddress.ip_address(ip_address) | ||
|
||
if isinstance(ip_address_object, ipaddress.IPv6Address): | ||
ip_addresses.append(f'http://[{ip_address}]') | ||
else: | ||
ip_addresses.append(f'http://{ip_address}') | ||
|
||
return template('splash-page.html', ip_addresses=ip_addresses) |
Empty file.
Oops, something went wrong.