Change the name of the project from phial to fwx, to avoid pypi conflict
[fwx] / fwx.py
diff --git a/fwx.py b/fwx.py
new file mode 100644 (file)
index 0000000..411c077
--- /dev/null
+++ b/fwx.py
@@ -0,0 +1,265 @@
+import collections
+import http.cookies
+import json
+import urllib.parse
+
+_Request = collections.namedtuple(
+    'Request',
+    (
+        'env',
+        'GET',
+        'accept',
+        'accept_encoding',
+        'accept_language',
+        'content',
+        'content_length',
+        'content_type',
+        'cookie',
+        'method',
+        'path',
+        'parameters',
+        'query',
+        'user_agent',
+    )
+)
+
+class Request(_Request):
+    def __new__(cls, env):
+        errors = []
+
+        accept = env.get('HTTP_ACCEPT')
+        accept_encoding = env.get('HTTP_ACCEPT_ENCODING')
+        accept_language = env.get('HTTP_ACCEPT_LANGUAGE')
+        content = env.get('CONTENT', '')
+        content_type = env.get('CONTENT_TYPE')
+        method = env.get('REQUEST_METHOD')
+        path = env.get('PATH_INFO')
+        query = env.get('QUERY_STRING')
+        user_agent = env.get('HTTP_USER_AGENT')
+
+        content_length = env.get('CONTENT_LENGTH')
+
+        if content_length == '' or content_length is None:
+            content_length = 0
+        else:
+            try:
+                content_length = int(content_length)
+            except ValueError:
+                errors.append('Unable to parse Content-Length "{}"'.format(content_length))
+                content_length = 0
+
+        try:
+            cookie = http.cookies.SimpleCookie(env.get('HTTP_COOKIE'))
+        except:
+            cookie = http.cookies.SimpleCookie()
+
+
+        try:
+            GET = urllib.parse.parse_qs(query)
+        except:
+            GET = {}
+            errors.append('Unable to parse GET parameters from query string "{}"'.format(query))
+
+        if method == 'GET':
+            parameters = GET
+
+        result = super().__new__(
+            cls,
+            env=env,
+            GET=GET,
+            accept=accept,
+            accept_encoding=accept_encoding,
+            accept_language=accept_language,
+            content = content,
+            content_length = content_length,
+            content_type = content_type,
+            cookie=cookie,
+            method=method,
+            parameters=parameters,
+            path=path,
+            query=query,
+            user_agent=user_agent,
+        )
+
+        result.subpath = path
+        return result
+
+_Response = collections.namedtuple(
+    'Response',
+    (
+        'status',
+        'content_type',
+        'extra_headers',
+        'content',
+    ),
+)
+
+class Response(_Response):
+    def __new__(cls, content, **kwargs):
+        status = kwargs.pop('status', 200)
+        assert isinstance(status, int)
+
+        content_type = kwargs.pop('content_type')
+        assert isinstance(content_type, str)
+
+        extra_headers = kwargs.pop('extra_headers', ())
+        assert isinstance(extra_headers, tuple)
+
+        assert len(kwargs) == 0
+
+        return super().__new__(
+            cls,
+            status=status,
+            content_type=content_type,
+            extra_headers=extra_headers,
+            content=content,
+        )
+
+    @property
+    def headers(self):
+        return (
+            ('Content-Type', self.content_type),
+        )
+
+class HTMLResponse(Response):
+    def __new__(cls, content, **kwargs):
+        assert 'content_type' not in kwargs
+
+        return super().__new__(
+            cls,
+            content,
+            content_type='text/html',
+            **kwargs,
+        )
+
+class JSONResponse(Response):
+    def __new__(cls, content_json, **kwargs):
+        assert 'content_type' not in kwargs
+        assert 'content' not in kwargs
+
+        self = super().__new__(
+            cls,
+            content=json.dumps(content_json),
+            content_type='application/json',
+            **kwargs,
+        )
+        self.content_json = content_json
+        return self
+
+class TextResponse(Response):
+    def __new__(cls, content, **kwargs):
+        assert 'content_type' not in kwargs
+
+        return super().__new__(
+            cls,
+            content,
+            content_type='text/plain',
+            **kwargs,
+        )
+
+_RedirectResponse = collections.namedtuple(
+    'RedirectResponse',
+    (
+        'location',
+        'permanent',
+    ),
+)
+
+class RedirectResponse(_RedirectResponse):
+    def __new__(cls, location, **kwargs):
+        assert isinstance(location, str)
+
+        permanent = kwargs.pop('permanent', True)
+        assert isinstance(permanent, bool)
+        assert len(kwargs) == 0
+
+        return super().__new__(
+            cls,
+            location=location,
+            permanent=permanent,
+        )
+
+    @property
+    def status(self):
+        return 308 if self.permanent else 307
+
+    @property
+    def headers(self):
+        return (('Location', self.location),)
+
+    @property
+    def content(self):
+        return (b'',)
+
+REQUEST_METHODS = (
+    'GET',
+    'HEAD',
+    'POST',
+    'PUT',
+    'PATCH',
+    'DELETE',
+    'CONNECT',
+    'OPTIONS',
+    'TRACE',
+)
+
+def default_method_not_allowed_handler(request):
+    return Response('')
+
+def default_options_handler(handlers):
+    def handler(request):
+        return Response(','.join(handlers.keys()))
+    return handler
+
+def route_on_method(**kwargs):
+    handlers = {}
+    for method in REQUEST_METHODS:
+        if method in kwargs:
+            handlers[method] = kwargs.pop(method)
+
+    method_not_allowed_handler = kwargs.pop(
+        'method_not_allowed',
+        default_method_not_allowed_handler,
+    )
+
+    assert len(kwargs) == 0
+
+    if 'OPTIONS' not in handlers:
+        handlers['OPTIONS'] = default_options_handler(handlers)
+
+    def handler(request):
+        return handlers.get(
+            request.method.upper(),
+            method_not_allowed_handler,
+        )(request)
+
+    return handler
+
+def _get_status(response):
+    return {
+        200: '200 OK',
+        307: '307 Temporary Redirect',
+        308: '308 Permanent Redirect',
+    }[response.status]
+
+def _get_headers(response):
+    return list(response.headers)
+
+def _get_content(response):
+    content = response.content
+
+    if isinstance(content, bytes):
+        return (content,)
+
+    if isinstance(content, str):
+        return (content.encode('utf-8'),)
+
+    return content
+
+def App(handler):
+    def app(env, start_fn):
+        response = handler(Request(env))
+
+        start_fn(_get_status(response), _get_headers(response))
+        return _get_content(response)
+    return app