Skip to content

Commit

Permalink
Move app resolution into parse_cli; deal with multiple apps more cleanly
Browse files Browse the repository at this point in the history
  • Loading branch information
kgaughan committed Dec 1, 2024
1 parent d70c0d6 commit 5cceabc
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 65 deletions.
29 changes: 25 additions & 4 deletions src/waitress/adjustments.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""Adjustments are tunable parameters.
"""
import getopt
import pkgutil
import socket
import warnings

Expand Down Expand Up @@ -95,6 +96,10 @@ class _int_marker(int):
pass


class AppResolutionError(Exception):
"""The named WSGI application could not be resolved."""


class Adjustments:
"""This class contains tunable parameters."""

Expand Down Expand Up @@ -467,6 +472,7 @@ def parse_args(cls, argv):
"app": None,
}

app = None
opts, args = getopt.getopt(argv, "", long_opts)
for opt, value in opts:
param = opt.lstrip("-").replace("-", "_")
Expand All @@ -481,16 +487,31 @@ def parse_args(cls, argv):
elif param in ("help", "call"):
kw[param] = True
elif param == "app":
kw[param] = value
app = value
elif cls._param_map[param] is asbool:
kw[param] = "true"
else:
kw[param] = value

if kw["app"] is None and len(args) > 0:
kw["app"] = args.pop(0)
if not kw["help"]:
if app is None and len(args) > 0:
app = args.pop(0)
if app is None:
raise AppResolutionError("Specify an application")
if len(args) > 0:
raise AppResolutionError("Provide only one WSGI app")

# Get the WSGI function.
try:
kw["app"] = pkgutil.resolve_name(app)
except (ValueError, ImportError, AttributeError) as exc:
raise AppResolutionError(f"Cannot import WSGI application: {exc}")
if kw["call"]:
kw["app"] = kw["app"]()

del kw["call"]

return kw, args
return kw

@classmethod
def check_sockets(cls, sockets):
Expand Down
32 changes: 9 additions & 23 deletions src/waitress/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,16 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Command line runner.
"""

"""Command line runner."""

import getopt
import logging
import os
import os.path
import pkgutil
import sys

from waitress import serve
from waitress.adjustments import Adjustments
from waitress.adjustments import Adjustments, AppResolutionError
from waitress.utilities import logger

HELP = """\
Expand Down Expand Up @@ -285,42 +282,31 @@ def show_help(stream, name, error=None): # pragma: no cover

def run(argv=sys.argv, _serve=serve):
"""Command line runner."""
# Add the current directory onto sys.path
sys.path.append(os.getcwd())

name = os.path.basename(argv[0])

try:
kw, args = Adjustments.parse_args(argv[1:])
except getopt.GetoptError as exc:
kw = Adjustments.parse_args(argv[1:])
except (getopt.GetoptError, AppResolutionError) as exc:
show_help(sys.stderr, name, str(exc))
return 1

if kw["help"]:
show_help(sys.stdout, name)
return 0

if kw["app"] is None:
show_help(sys.stderr, name, "Specify an application")
return 1

# set a default level for the logger only if it hasn't been set explicitly
# note that this level does not override any parent logger levels,
# handlers, etc but without it no log messages are emitted by default
if logger.level == logging.NOTSET:
logger.setLevel(logging.INFO)

# Add the current directory onto sys.path
sys.path.append(os.getcwd())

# Get the WSGI function.
try:
app = pkgutil.resolve_name(kw["app"])
except (ValueError, ImportError, AttributeError) as exc:
show_help(sys.stderr, name, str(exc))
return 1
if kw["call"]:
app = app()
app = kw["app"]

# These arguments are specific to the runner, not waitress itself.
del kw["call"], kw["help"], kw["app"]
del kw["help"], kw["app"]

_serve(app, **kw)
return 0
74 changes: 40 additions & 34 deletions tests/test_adjustments.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,48 +395,57 @@ def assertDictContainsSubset(self, subset, dictionary):
self.assertTrue(set(subset.items()) <= set(dictionary.items()))

def test_noargs(self):
opts, args = self.parse([])
self.assertDictEqual(opts, {"call": False, "help": False, "app": None})
self.assertSequenceEqual(args, [])
from waitress.adjustments import AppResolutionError

self.assertRaises(AppResolutionError, self.parse, [])

def test_help(self):
opts, args = self.parse(["--help"])
self.assertDictEqual(opts, {"call": False, "help": True, "app": None})
self.assertSequenceEqual(args, [])
opts = self.parse(["--help"])
self.assertDictEqual(opts, {"help": True, "app": None})

def test_call(self):
opts, args = self.parse(["--call"])
self.assertDictEqual(opts, {"call": True, "help": False, "app": None})
self.assertSequenceEqual(args, [])
def test_app_flag(self):
from tests.fixtureapps import runner as _apps

def test_both(self):
opts, args = self.parse(["--call", "--help"])
self.assertDictEqual(opts, {"call": True, "help": True, "app": None})
self.assertSequenceEqual(args, [])
opts = self.parse(["--app=tests.fixtureapps.runner:app"])
self.assertEqual(opts["app"], _apps.app)

def test_app_flag(self):
opts, args = self.parse(["--app=fred:wilma", "barney:betty"])
self.assertEqual(opts["app"], "fred:wilma")
self.assertSequenceEqual(args, ["barney:betty"])
def test_call(self):
from tests.fixtureapps import runner as _apps

opts = self.parse(["--app=tests.fixtureapps.runner:returns_app", "--call"])
self.assertEqual(opts["app"], _apps.app)

def test_app_arg(self):
opts, args = self.parse(["barney:betty"])
self.assertEqual(opts["app"], "barney:betty")
self.assertSequenceEqual(args, [])
from tests.fixtureapps import runner as _apps

opts = self.parse(["tests.fixtureapps.runner:app"])
self.assertEqual(opts["app"], _apps.app)

def test_excess(self):
from waitress.adjustments import AppResolutionError

self.assertRaises(
AppResolutionError,
self.parse,
["tests.fixtureapps.runner:app", "tests.fixtureapps.runner:app"],
)

def test_positive_boolean(self):
opts, args = self.parse(["--expose-tracebacks"])
opts = self.parse(["--expose-tracebacks", "tests.fixtureapps.runner:app"])
self.assertDictContainsSubset({"expose_tracebacks": "true"}, opts)
self.assertSequenceEqual(args, [])

def test_negative_boolean(self):
opts, args = self.parse(["--no-expose-tracebacks"])
opts = self.parse(["--no-expose-tracebacks", "tests.fixtureapps.runner:app"])
self.assertDictContainsSubset({"expose_tracebacks": "false"}, opts)
self.assertSequenceEqual(args, [])

def test_cast_params(self):
opts, args = self.parse(
["--host=localhost", "--port=80", "--unix-socket-perms=777"]
opts = self.parse(
[
"--host=localhost",
"--port=80",
"--unix-socket-perms=777",
"tests.fixtureapps.runner:app",
]
)
self.assertDictContainsSubset(
{
Expand All @@ -446,28 +455,25 @@ def test_cast_params(self):
},
opts,
)
self.assertSequenceEqual(args, [])

def test_listen_params(self):
opts, args = self.parse(
opts = self.parse(
[
"--listen=test:80",
"tests.fixtureapps.runner:app",
]
)

self.assertDictContainsSubset({"listen": " test:80"}, opts)
self.assertSequenceEqual(args, [])

def test_multiple_listen_params(self):
opts, args = self.parse(
opts = self.parse(
[
"--listen=test:80",
"--listen=test:8080",
"tests.fixtureapps.runner:app",
]
)

self.assertDictContainsSubset({"listen": " test:80 test:8080"}, opts)
self.assertSequenceEqual(args, [])

def test_bad_param(self):
import getopt
Expand Down
9 changes: 5 additions & 4 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ def test_no_app(self):
self.match_output([], 1, "^Error: Specify an application")

def test_multiple_apps_app(self):
self.match_output(["a:a", "b:b"], 1, "^Error: No module named 'a'")
self.match_output(["a:a", "b:b"], 1, "^Error: Provide only one WSGI app")
self.match_output(["--app=a:a", "b:b"], 1, "^Error: Provide only one WSGI app")

def test_bad_apps_app(self):
self.match_output(["a"], 1, "^Error: No module named 'a'")
self.match_output(["a"], 1, "No module named 'a'")

def test_bad_app_module(self):
self.match_output(["nonexistent:a"], 1, "^Error: No module named 'nonexistent'")
self.match_output(["nonexistent:a"], 1, "No module named 'nonexistent'")

def test_cwd_added_to_path(self):
def null_serve(app, **kw):
Expand All @@ -53,7 +54,7 @@ def test_bad_app_object(self):
self.match_output(
["tests.fixtureapps.runner:a"],
1,
"^Error: module 'tests.fixtureapps.runner' has no attribute 'a'",
"module 'tests.fixtureapps.runner' has no attribute 'a'",
)

def test_simple_call(self):
Expand Down

0 comments on commit 5cceabc

Please sign in to comment.