5 _Request = collections.namedtuple(
25 class Request(_Request):
26 def __new__(cls, env):
27 accept = env.get('HTTP_ACCEPT')
28 accept_encoding = env.get('HTTP_ACCEPT_ENCODING')
29 accept_language = env.get('HTTP_ACCEPT_LANGUAGE')
30 content = env.get('CONTENT', '')
31 content_length = env.get('CONTENT_LENGTH')
32 content_type = env.get('CONTENT_TYPE')
33 cookies = env.get('HTTP_COOKIE')
34 method = env.get('REQUEST_METHOD')
35 path = env.get('PATH_INFO')
36 query = env.get('QUERY_STRING')
37 user_agent = env.get('HTTP_USER_AGENT')
39 GET = urllib.parse.parse_qs(query)
44 result = super().__new__(
49 accept_encoding=accept_encoding,
50 accept_language=accept_language,
52 content_length = content_length,
53 content_type = content_type,
56 parameters=parameters,
59 user_agent=user_agent,
65 _Response = collections.namedtuple(
75 class Response(_Response):
76 def __new__(cls, content, **kwargs):
77 status = kwargs.pop('status', 200)
78 assert isinstance(status, int)
80 content_type = kwargs.pop('content_type')
81 assert isinstance(content_type, str)
83 extra_headers = kwargs.pop('extra_headers', ())
84 assert isinstance(extra_headers, tuple)
86 assert len(kwargs) == 0
88 return super().__new__(
91 content_type=content_type,
92 extra_headers=extra_headers,
99 ('Content-Type', self.content_type),
102 class HTMLResponse(Response):
103 def __new__(cls, content, **kwargs):
104 assert 'content_type' not in kwargs
106 return super().__new__(
109 content_type='text/html',
113 class JSONResponse(Response):
114 def __new__(cls, content_json, **kwargs):
115 assert 'content_type' not in kwargs
116 assert 'content' not in kwargs
118 self = super().__new__(
120 content=json.dumps(content_json),
121 content_type='application/json',
124 self.content_json = content_json
127 class TextResponse(Response):
128 def __new__(cls, content, **kwargs):
129 assert 'content_type' not in kwargs
131 return super().__new__(
134 content_type='text/plain',
138 _RedirectResponse = collections.namedtuple(
146 class RedirectResponse(_RedirectResponse):
147 def __new__(cls, location, **kwargs):
148 assert isinstance(location, str)
150 permanent = kwargs.pop('permanent', True)
151 assert isinstance(permanent, bool)
152 assert len(kwargs) == 0
154 return super().__new__(
162 return 308 if self.permanent else 307
166 return (('Location', self.location),)
172 def _get_status(response):
175 307: '307 Temporary Redirect',
176 308: '308 Permanent Redirect',
179 def _get_headers(response):
180 return list(response.headers)
182 def _get_content(response):
183 content = response.content
185 if isinstance(content, bytes):
188 if isinstance(content, str):
189 return (content.encode('utf-8'),)
194 def app(env, start_fn):
195 response = handler(Request(env))
197 start_fn(_get_status(response), _get_headers(response))
198 return _get_content(response)