c7d644200ef7dc2ad9b528e8b78215a56ea9ff71
[fwx] / src / fwx / __init__.py
1 import collections
2 import http.cookies
3 import json
4 import urllib.parse
5
6 _Request = collections.namedtuple(
7     'Request',
8     (
9         'env',
10         'GET',
11         'accept',
12         'accept_encoding',
13         'accept_language',
14         'content',
15         'content_length',
16         'content_type',
17         'cookie',
18         'method',
19         'path',
20         'parameters',
21         'query',
22         'user_agent',
23     )
24 )
25
26 class Request(_Request):
27     def __new__(cls, env, **kwargs):
28         errors = []
29
30         accept = env.get('HTTP_ACCEPT')
31         accept_encoding = env.get('HTTP_ACCEPT_ENCODING')
32         accept_language = env.get('HTTP_ACCEPT_LANGUAGE')
33         content = env.get('CONTENT', '')
34         content_type = env.get('CONTENT_TYPE')
35         method = env.get('REQUEST_METHOD')
36         path = env.get('PATH_INFO')
37         query = env.get('QUERY_STRING')
38         user_agent = env.get('HTTP_USER_AGENT')
39
40         content_length = env.get('CONTENT_LENGTH')
41
42         if content_length == '' or content_length is None:
43             content_length = 0
44         else:
45             try:
46                 content_length = int(content_length)
47             except ValueError:
48                 errors.append('Unable to parse Content-Length "{}"'.format(content_length))
49                 content_length = 0
50
51         try:
52             cookie = http.cookies.SimpleCookie(env.get('HTTP_COOKIE'))
53         except:
54             cookie = http.cookies.SimpleCookie()
55
56
57         try:
58             GET = urllib.parse.parse_qs(query)
59         except:
60             GET = {}
61             errors.append('Unable to parse GET parameters from query string "{}"'.format(query))
62
63         if method == 'GET':
64             parameters = GET
65         elif method == 'POST':
66             raise Exception('not yet implemented')
67         else:
68             parameters = None
69
70         result = super().__new__(
71             cls,
72             env=env,
73             GET=GET,
74             accept=accept,
75             accept_encoding=accept_encoding,
76             accept_language=accept_language,
77             content = content,
78             content_length = content_length,
79             content_type = content_type,
80             cookie=cookie,
81             method=method,
82             parameters=parameters,
83             path=path,
84             query=query,
85             user_agent=user_agent,
86         )
87
88         if path.startswith('/'):
89             result.subpath = path[1:]
90         else:
91             result.subpath = path
92
93         return result
94
95 def _get_request_from_env(env):
96     return Request(env)
97
98 _Response = collections.namedtuple(
99     'Response',
100     (
101         'status',
102         'content_type',
103         'extra_headers',
104         'content',
105     ),
106 )
107
108 class Response(_Response):
109     def __new__(cls, content, **kwargs):
110         status = kwargs.pop('status', 200)
111         assert isinstance(status, int)
112
113         content_type = kwargs.pop('content_type')
114         assert isinstance(content_type, str)
115
116         extra_headers = kwargs.pop('extra_headers', ())
117         assert isinstance(extra_headers, tuple)
118
119         assert len(kwargs) == 0
120
121         return super().__new__(
122             cls,
123             status=status,
124             content_type=content_type,
125             extra_headers=extra_headers,
126             content=content,
127         )
128
129     @property
130     def headers(self):
131         return (
132             ('Content-Type', self.content_type),
133         )
134
135 class HTMLResponse(Response):
136     def __new__(cls, content, **kwargs):
137         assert 'content_type' not in kwargs
138
139         return super().__new__(
140             cls,
141             content,
142             content_type='text/html',
143             **kwargs,
144         )
145
146 class JSONResponse(Response):
147     def __new__(cls, content_json, **kwargs):
148         assert 'content_type' not in kwargs
149         assert 'content' not in kwargs
150
151         self = super().__new__(
152             cls,
153             content=json.dumps(content_json),
154             content_type='application/json',
155             **kwargs,
156         )
157         self.content_json = content_json
158         return self
159
160 class TextResponse(Response):
161     def __new__(cls, content, **kwargs):
162         assert 'content_type' not in kwargs
163
164         return super().__new__(
165             cls,
166             content,
167             content_type='text/plain',
168             **kwargs,
169         )
170
171 _RedirectResponse = collections.namedtuple(
172     'RedirectResponse',
173     (
174         'location',
175         'permanent',
176     ),
177 )
178
179 class RedirectResponse(_RedirectResponse):
180     def __new__(cls, location, **kwargs):
181         assert isinstance(location, str)
182
183         permanent = kwargs.pop('permanent', True)
184         assert isinstance(permanent, bool)
185         assert len(kwargs) == 0
186
187         return super().__new__(
188             cls,
189             location=location,
190             permanent=permanent,
191         )
192
193     @property
194     def status(self):
195         return 308 if self.permanent else 307
196
197     @property
198     def headers(self):
199         return (('Location', self.location),)
200
201     @property
202     def content(self):
203         return (b'',)
204
205 def default_file_not_found_handler(request):
206     return Response('', status=404)
207
208 def route_on_subpath(**kwargs):
209     routes = kwargs.pop('routes')
210     file_not_found_handler = kwargs.pop(
211         'file_not_found_hanlder',
212         default_file_not_found_handler,
213     )
214
215     if routes is None:
216         raise Exception('Keyword argument "routes" is required')
217
218     if len(kwargs) > 0:
219         raise Exception('Unexpected keyword argument')
220
221     def wrapped(request):
222         split_subpath = request.subpath.split('/', 1)
223         subpath = split_subpath[0]
224
225         if len(split_subpath) == 2:
226             request.subpath = split_subpath[1]
227         else:
228             request.subpath = ''
229
230         return routes.get(subpath, file_not_found_handler)(request)
231
232     return wrapped
233
234 REQUEST_METHODS = (
235     'GET',
236     'HEAD',
237     'POST',
238     'PUT',
239     'PATCH',
240     'DELETE',
241     'CONNECT',
242     'OPTIONS',
243     'TRACE',
244 )
245
246 def default_method_not_allowed_handler(request):
247     return Response('')
248
249 def default_options_handler(handlers):
250     def handler(request):
251         return Response(','.join(handlers.keys()))
252     return handler
253
254 def route_on_method(**kwargs):
255     handlers = {}
256     for method in REQUEST_METHODS:
257         if method in kwargs:
258             handlers[method] = kwargs.pop(method)
259
260     method_not_allowed_handler = kwargs.pop(
261         'method_not_allowed',
262         default_method_not_allowed_handler,
263     )
264
265     assert len(kwargs) == 0
266
267     if 'OPTIONS' not in handlers:
268         handlers['OPTIONS'] = default_options_handler(handlers)
269
270     def handler(request):
271         return handlers.get(
272             request.method.upper(),
273             method_not_allowed_handler,
274         )(request)
275
276     return handler
277
278 def _get_status(response):
279     return {
280         200: '200 OK',
281         307: '307 Temporary Redirect',
282         308: '308 Permanent Redirect',
283     }[response.status]
284
285 def _get_headers(response):
286     return list(response.headers)
287
288 def _get_content(response):
289     content = response.content
290
291     if isinstance(content, bytes):
292         return (content,)
293
294     if isinstance(content, str):
295         return (content.encode('utf-8'),)
296
297     return content
298
299 def App(handler):
300     def app(env, start_fn):
301         response = handler(_get_request_from_env(env))
302
303         start_fn(_get_status(response), _get_headers(response))
304         return _get_content(response)
305     return app