e0abb7523994c58847b1ee6501fcbb6299aacae4
[fwx] / src / fwx / __init__.py
1 import collections
2 import http.cookies
3 import json
4 import urllib.parse
5
6 _Request = collections.namedtuple(
7     'Request',
8     (
9         'env',
10         'GET',
11         'POST',
12         'accept',
13         'accept_encoding',
14         'accept_language',
15         'content',
16         'content_length',
17         'content_type',
18         'cookie',
19         'method',
20         'path',
21         'parameters',
22         'query',
23         'user_agent',
24     )
25 )
26
27 class Request(_Request):
28     def __new__(cls, method, path, env=None):
29         if env is None:
30             env = {}
31
32         errors = []
33
34         accept = env.get('HTTP_ACCEPT')
35         accept_encoding = env.get('HTTP_ACCEPT_ENCODING')
36         accept_language = env.get('HTTP_ACCEPT_LANGUAGE')
37         content = env.get('CONTENT', '')
38         content_type = env.get('CONTENT_TYPE')
39         query = env.get('QUERY_STRING')
40         user_agent = env.get('HTTP_USER_AGENT')
41
42         content_length = env.get('CONTENT_LENGTH')
43
44         if content_length == '' or content_length is None:
45             content_length = 0
46         else:
47             try:
48                 content_length = int(content_length)
49             except ValueError:
50                 errors.append('Unable to parse Content-Length "{}"'.format(content_length))
51                 content_length = 0
52
53         try:
54             cookie = http.cookies.SimpleCookie(env.get('HTTP_COOKIE'))
55         except:
56             cookie = http.cookies.SimpleCookie()
57
58         try:
59             GET = urllib.parse.parse_qs(query)
60         except:
61             GET = {}
62             errors.append('Unable to parse GET parameters from query string "{}"'.format(query))
63
64         if method == 'POST':
65             try:
66                 if content_type == 'application/x-www-form-urlencoded':
67                     POST = urllib.parse.parse_qs(content)
68                 else:
69                     POST = {}
70                     errors.append('Unable to parse POST parameters from content string "{}"'.format(content))
71
72             except:
73                 POST = {}
74                 errors.append('Unable to parse POST parameters from content string "{}"'.format(content))
75
76         else:
77             POST = {}
78
79         if method == 'GET':
80             parameters = GET
81         elif method == 'POST':
82             parameters = POST
83         else:
84             parameters = None
85
86         result = super().__new__(
87             cls,
88             env=env,
89             GET=GET,
90             POST=POST,
91             accept=accept,
92             accept_encoding=accept_encoding,
93             accept_language=accept_language,
94             content = content,
95             content_length = content_length,
96             content_type = content_type,
97             cookie=cookie,
98             method=method,
99             parameters=parameters,
100             path=path,
101             query=query,
102             user_agent=user_agent,
103         )
104
105         if path.startswith('/'):
106             result.subpath = path[1:]
107         else:
108             result.subpath = path
109
110         return result
111
112 def _get_request_from_env(env):
113     method = env.get('REQUEST_METHOD')
114     path = env.get('PATH_INFO')
115     return Request(method, path, env)
116
117 _Response = collections.namedtuple(
118     'Response',
119     (
120         'status',
121         'content_type',
122         'extra_headers',
123         'content',
124     ),
125 )
126
127 class Response(_Response):
128     def __new__(cls, content, **kwargs):
129         status = kwargs.pop('status', 200)
130         assert isinstance(status, int)
131
132         content_type = kwargs.pop('content_type')
133         assert isinstance(content_type, str)
134
135         extra_headers = kwargs.pop('extra_headers', ())
136         assert isinstance(extra_headers, tuple)
137
138         assert len(kwargs) == 0
139
140         return super().__new__(
141             cls,
142             status=status,
143             content_type=content_type,
144             extra_headers=extra_headers,
145             content=content,
146         )
147
148     @property
149     def headers(self):
150         return (
151             ('Content-Type', self.content_type),
152         )
153
154 class HTMLResponse(Response):
155     def __new__(cls, content, **kwargs):
156         assert 'content_type' not in kwargs
157
158         return super().__new__(
159             cls,
160             content,
161             content_type='text/html',
162             **kwargs,
163         )
164
165 class JSONResponse(Response):
166     def __new__(cls, content_json, **kwargs):
167         assert 'content_type' not in kwargs
168         assert 'content' not in kwargs
169
170         self = super().__new__(
171             cls,
172             content=json.dumps(content_json),
173             content_type='application/json',
174             **kwargs,
175         )
176         self.content_json = content_json
177         return self
178
179 class TextResponse(Response):
180     def __new__(cls, content, **kwargs):
181         assert 'content_type' not in kwargs
182
183         return super().__new__(
184             cls,
185             content,
186             content_type='text/plain',
187             **kwargs,
188         )
189
190 _RedirectResponse = collections.namedtuple(
191     'RedirectResponse',
192     (
193         'location',
194         'permanent',
195     ),
196 )
197
198 class RedirectResponse(_RedirectResponse):
199     def __new__(cls, location, **kwargs):
200         assert isinstance(location, str)
201
202         permanent = kwargs.pop('permanent', True)
203         assert isinstance(permanent, bool)
204         assert len(kwargs) == 0
205
206         return super().__new__(
207             cls,
208             location=location,
209             permanent=permanent,
210         )
211
212     @property
213     def status(self):
214         return 308 if self.permanent else 307
215
216     @property
217     def headers(self):
218         return (('Location', self.location),)
219
220     @property
221     def content(self):
222         return (b'',)
223
224 def default_file_not_found_handler(request):
225     return TextResponse(
226         'Path "{}" with query "{}" not found'.format(request.path, request.query),
227         status=404,
228     )
229
230 def route_on_subpath(**kwargs):
231     routes = kwargs.pop('routes')
232     file_not_found_handler = kwargs.pop(
233         'file_not_found_handler',
234         default_file_not_found_handler,
235     )
236
237     if routes is None:
238         raise Exception('Keyword argument "routes" is required')
239
240     if len(kwargs) > 0:
241         raise Exception('Unexpected keyword argument')
242
243     def wrapped(request):
244         split_subpath = request.subpath.split('/', 1)
245         subpath = split_subpath[0]
246
247         if len(split_subpath) == 2:
248             request.subpath = split_subpath[1]
249         else:
250             request.subpath = ''
251
252         return routes.get(subpath, file_not_found_handler)(request)
253
254     return wrapped
255
256 REQUEST_METHODS = (
257     'GET',
258     'HEAD',
259     'POST',
260     'PUT',
261     'PATCH',
262     'DELETE',
263     'CONNECT',
264     'OPTIONS',
265     'TRACE',
266 )
267
268 def default_method_not_allowed_handler(request):
269     return Response('')
270
271 def default_options_handler(handlers):
272     def handler(request):
273         return Response(','.join(handlers.keys()))
274     return handler
275
276 def route_on_method(**kwargs):
277     handlers = {}
278     for method in REQUEST_METHODS:
279         if method in kwargs:
280             handlers[method] = kwargs.pop(method)
281
282     method_not_allowed_handler = kwargs.pop(
283         'method_not_allowed',
284         default_method_not_allowed_handler,
285     )
286
287     assert len(kwargs) == 0
288
289     if 'OPTIONS' not in handlers:
290         handlers['OPTIONS'] = default_options_handler(handlers)
291
292     def handler(request):
293         return handlers.get(
294             request.method.upper(),
295             method_not_allowed_handler,
296         )(request)
297
298     return handler
299
300 def _get_status(response):
301     return {
302         200: '200 OK',
303         307: '307 Temporary Redirect',
304         308: '308 Permanent Redirect',
305     }[response.status]
306
307 def _get_headers(response):
308     return list(response.headers)
309
310 def _get_content(response):
311     content = response.content
312
313     if isinstance(content, bytes):
314         return (content,)
315
316     if isinstance(content, str):
317         return (content.encode('utf-8'),)
318
319     return content
320
321 def App(handler):
322     def app(env, start_fn):
323         response = handler(_get_request_from_env(env))
324
325         start_fn(_get_status(response), _get_headers(response))
326         return _get_content(response)
327     return app