Fix the charset
[fwx] / src / fwx / __init__.py
1 import collections
2 import http.cookies
3 import json
4 import urllib.parse
5
6 _Request = collections.namedtuple(
7     'Request',
8     (
9         'env',
10         'GET',
11         'POST',
12         'accept',
13         'accept_encoding',
14         'accept_language',
15         'content',
16         'content_length',
17         'content_type',
18         'cookie',
19         'method',
20         'path',
21         'parameters',
22         'query',
23         'user_agent',
24     )
25 )
26
27 class Request(_Request):
28     def __new__(cls, method, path, env=None):
29         if env is None:
30             env = {}
31
32         errors = []
33
34         accept = env.get('HTTP_ACCEPT')
35         accept_encoding = env.get('HTTP_ACCEPT_ENCODING')
36         accept_language = env.get('HTTP_ACCEPT_LANGUAGE')
37         content = env.get('CONTENT', '')
38         content_type = env.get('CONTENT_TYPE')
39         query = env.get('QUERY_STRING')
40         user_agent = env.get('HTTP_USER_AGENT')
41
42         content_length = env.get('CONTENT_LENGTH')
43
44         if content_length == '' or content_length is None:
45             content_length = 0
46         else:
47             try:
48                 content_length = int(content_length)
49             except ValueError:
50                 errors.append('Unable to parse Content-Length "{}"'.format(content_length))
51                 content_length = 0
52
53         try:
54             cookie = http.cookies.SimpleCookie(env.get('HTTP_COOKIE'))
55         except:
56             cookie = http.cookies.SimpleCookie()
57
58         try:
59             GET = urllib.parse.parse_qs(query)
60         except:
61             GET = {}
62             errors.append('Unable to parse GET parameters from query string "{}"'.format(query))
63
64         if method == 'POST':
65             try:
66                 if content_type == 'application/x-www-form-urlencoded':
67                     POST = urllib.parse.parse_qs(content)
68                 else:
69                     POST = {}
70                     errors.append('Unable to parse POST parameters from content string "{}"'.format(content))
71
72             except:
73                 POST = {}
74                 errors.append('Unable to parse POST parameters from content string "{}"'.format(content))
75
76         else:
77             POST = {}
78
79         if method == 'GET':
80             parameters = GET
81         elif method == 'POST':
82             parameters = POST
83         else:
84             parameters = None
85
86         result = super().__new__(
87             cls,
88             env=env,
89             GET=GET,
90             POST=POST,
91             accept=accept,
92             accept_encoding=accept_encoding,
93             accept_language=accept_language,
94             content = content,
95             content_length = content_length,
96             content_type = content_type,
97             cookie=cookie,
98             method=method,
99             parameters=parameters,
100             path=path,
101             query=query,
102             user_agent=user_agent,
103         )
104
105         if path.startswith('/'):
106             result.subpath = path[1:]
107         else:
108             result.subpath = path
109
110         return result
111
112 def _get_request_from_env(env):
113     method = env.get('REQUEST_METHOD')
114     path = env.get('PATH_INFO')
115     return Request(method, path, env)
116
117 _Response = collections.namedtuple(
118     'Response',
119     (
120         'status',
121         'content_type',
122         'extra_headers',
123         'content',
124     ),
125 )
126
127 class Response(_Response):
128     def __new__(cls, content, **kwargs):
129         status = kwargs.pop('status', 200)
130         assert isinstance(status, int)
131
132         content_type = kwargs.pop('content_type')
133         assert isinstance(content_type, str)
134
135         extra_headers = kwargs.pop('extra_headers', {})
136         assert isinstance(extra_headers, dict)
137
138         assert len(kwargs) == 0
139
140         return super().__new__(
141             cls,
142             status=status,
143             content_type=content_type,
144             extra_headers=extra_headers,
145             content=content,
146         )
147
148     @property
149     def headers(self):
150         # Start with the defaults
151         result = {
152             'X-Content-Type-Options': 'nosniff',
153         }
154
155         result = {**result, **(self.extra_headers)}
156
157         builtin_headers = {
158             'Content-Type': self.content_type,
159         }
160
161         for key, value in builtin_headers.items():
162             if key in result:
163                 raise Exception('Header "{}" defined twice'.format(key))
164             else:
165                 result[key] = value
166
167         return tuple(sorted(result.items()))
168
169
170 class HTMLResponse(Response):
171     def __new__(cls, content, **kwargs):
172         assert 'content_type' not in kwargs
173
174         return super().__new__(
175             cls,
176             content,
177             content_type='text/html; charset=utf-8',
178             **kwargs,
179         )
180
181 class JSONResponse(Response):
182     def __new__(cls, content_json, **kwargs):
183         assert 'content_type' not in kwargs
184         assert 'content' not in kwargs
185
186         self = super().__new__(
187             cls,
188             content=json.dumps(content_json),
189             content_type='application/json; charset=utf-8',
190             **kwargs,
191         )
192         self.content_json = content_json
193         return self
194
195 class TextResponse(Response):
196     def __new__(cls, content, **kwargs):
197         assert 'content_type' not in kwargs
198
199         return super().__new__(
200             cls,
201             content,
202             content_type='text/plain; charset=utf-8',
203             **kwargs,
204         )
205
206 _RedirectResponse = collections.namedtuple(
207     'RedirectResponse',
208     (
209         'location',
210         'permanent',
211     ),
212 )
213
214 class RedirectResponse(_RedirectResponse):
215     def __new__(cls, location, **kwargs):
216         assert isinstance(location, str)
217
218         permanent = kwargs.pop('permanent', True)
219         assert isinstance(permanent, bool)
220         assert len(kwargs) == 0
221
222         return super().__new__(
223             cls,
224             location=location,
225             permanent=permanent,
226         )
227
228     @property
229     def status(self):
230         return 308 if self.permanent else 307
231
232     @property
233     def headers(self):
234         return (('Location', self.location),)
235
236     @property
237     def content(self):
238         return (b'',)
239
240 def default_file_not_found_handler(request):
241     return TextResponse(
242         'Path "{}" with query "{}" not found'.format(request.path, request.query),
243         status=404,
244     )
245
246 def route_on_subpath(**kwargs):
247     routes = kwargs.pop('routes')
248     file_not_found_handler = kwargs.pop(
249         'file_not_found_handler',
250         default_file_not_found_handler,
251     )
252
253     if routes is None:
254         raise Exception('Keyword argument "routes" is required')
255
256     if len(kwargs) > 0:
257         raise Exception('Unexpected keyword argument')
258
259     def wrapped(request):
260         split_subpath = request.subpath.split('/', 1)
261         subpath = split_subpath[0]
262
263         if len(split_subpath) == 2:
264             request.subpath = split_subpath[1]
265         else:
266             request.subpath = ''
267
268         return routes.get(subpath, file_not_found_handler)(request)
269
270     return wrapped
271
272 REQUEST_METHODS = (
273     'GET',
274     'HEAD',
275     'POST',
276     'PUT',
277     'PATCH',
278     'DELETE',
279     'CONNECT',
280     'OPTIONS',
281     'TRACE',
282 )
283
284 def default_method_not_allowed_handler(request):
285     return TextResponse('', status=405)
286
287 def default_options_handler(handlers):
288     def handler(request):
289         return Response(','.join(handlers.keys()))
290     return handler
291
292 def route_on_method(**kwargs):
293     handlers = {}
294     for method in REQUEST_METHODS:
295         if method in kwargs:
296             handlers[method] = kwargs.pop(method)
297
298     method_not_allowed_handler = kwargs.pop(
299         'method_not_allowed',
300         default_method_not_allowed_handler,
301     )
302
303     assert len(kwargs) == 0
304
305     if 'OPTIONS' not in handlers:
306         handlers['OPTIONS'] = default_options_handler(handlers)
307
308     def handler(request):
309         return handlers.get(
310             request.method.upper(),
311             method_not_allowed_handler,
312         )(request)
313
314     return handler
315
316 def _get_status(response):
317     return {
318         100: '100 Continue',
319         101: '101 Switching Protocols',
320         200: '200 OK',
321         201: '201 Created',
322         202: '202 Accepted',
323         203: '203 Non-Authoritative Information',
324         204: '204 No Content',
325         205: '205 Reset Content',
326         300: '300 Multiple Choices',
327         301: '301 Moved Permanently',
328         304: '304 Not Modified',
329         307: '307 Temporary Redirect',
330         308: '308 Permanent Redirect',
331         400: '400 Bad Request',
332         401: '401 Unauthorized',
333         402: '402 Payment Required',
334         403: '403 Forbidden',
335         404: '404 Not Found',
336         405: '405 Method Not Allowed',
337         406: '406 Not Acceptable',
338         409: '409 Conflict',
339         410: '410 Gone',
340         411: '411 Length Required',
341         412: '412 Precondition Failed',
342         413: '413 Payload Too Large',
343         414: '414 URI Too Long',
344         415: '415 Unsupported Media Type',
345         416: '416 Range Not Satisfiable',
346         417: '417 Expectation Failed',
347         418: "418 I'm a teapot",
348         429: '429 Too Many Requests',
349         431: '431 Request Header Fields Too Large',
350         451: '451 Unavailable For Legal Reasons',
351         500: '500 Internal Server Error',
352         501: '501 Not Implemented',
353         503: '503 Service Unavailable',
354     }[response.status]
355
356 def _get_headers(response):
357     return list(response.headers)
358
359 def _get_content(response):
360     content = response.content
361
362     if isinstance(content, bytes):
363         return (content,)
364
365     if isinstance(content, str):
366         return (content.encode('utf-8'),)
367
368     return content
369
370 def App(handler):
371     def app(env, start_fn):
372         response = handler(_get_request_from_env(env))
373
374         start_fn(_get_status(response), _get_headers(response))
375         return _get_content(response)
376     return app