1# nghttp2 - HTTP/2 C Library
2
3# Copyright (c) 2013 Tatsuhiro Tsujikawa
4
5# Permission is hereby granted, free of charge, to any person obtaining
6# a copy of this software and associated documentation files (the
7# "Software"), to deal in the Software without restriction, including
8# without limitation the rights to use, copy, modify, merge, publish,
9# distribute, sublicense, and/or sell copies of the Software, and to
10# permit persons to whom the Software is furnished to do so, subject to
11# the following conditions:
12
13# The above copyright notice and this permission notice shall be
14# included in all copies or substantial portions of the Software.
15
16# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23cimport cnghttp2
24
25from libc.stdlib cimport malloc, free
26from libc.string cimport memcpy, memset
27from libc.stdint cimport uint8_t, uint16_t, uint32_t, int32_t
28import logging
29
30
# Re-export of the C library's NGHTTP2_DEFAULT_HEADER_TABLE_SIZE constant.
DEFAULT_HEADER_TABLE_SIZE = cnghttp2.NGHTTP2_DEFAULT_HEADER_TABLE_SIZE
# Default value of HDDeflater's |hd_table_bufsize_max| argument, in bytes.
DEFLATE_MAX_HEADER_TABLE_SIZE = 4096
33
# Fixed number of bytes added to every entry by HDTableEntry.space().
HD_ENTRY_OVERHEAD = 32

class HDTableEntry:
    '''One name/value pair of a header table together with the byte
    lengths of the name and of the value.

    '''

    def __init__(self, name, namelen, value, valuelen):
        self.name, self.namelen = name, namelen
        self.value, self.valuelen = value, valuelen

    def space(self):
        '''Returns the size of this entry: length of the name plus
        length of the value plus HD_ENTRY_OVERHEAD.

        '''
        payload = self.namelen + self.valuelen
        return payload + HD_ENTRY_OVERHEAD
46
cdef _get_pybytes(uint8_t *b, size_t blen):
    # Copies the first |blen| bytes starting at |b| into a new Python
    # bytes object.
    #
    # |blen| is a size_t because the callers in this file pass the
    # size_t length fields of nghttp2_nv; a 16-bit parameter would
    # silently truncate any length above 65535.
    return b[:blen]
49
cdef class HDDeflater:
    '''Performs header compression. The constructor takes
    |hd_table_bufsize_max| parameter, which limits the usage of header
    table in the given amount of bytes. This is necessary because the
    header compressor and decompressor share the same amount of
    header table and the decompressor decides that number. The
    compressor may not want to use all header table size because of
    limited memory availability. In that case, the
    |hd_table_bufsize_max| can be used to cap the upper limit of table
    size whatever the header table size is chosen by the decompressor.
    The default value of |hd_table_bufsize_max| is 4096 bytes.

    The following example shows how to compress request header sets:

        import binascii, nghttp2

        deflater = nghttp2.HDDeflater()
        res = deflater.deflate([(b'foo', b'bar'),
                              (b'baz', b'buz')])
        print(binascii.b2a_hex(res))

    '''

    cdef cnghttp2.nghttp2_hd_deflater *_deflater

    def __cinit__(self, hd_table_bufsize_max = DEFLATE_MAX_HEADER_TABLE_SIZE):
        rv = cnghttp2.nghttp2_hd_deflate_new(&self._deflater,
                                             hd_table_bufsize_max)
        if rv != 0:
            raise Exception(_strerror(rv))

    def __dealloc__(self):
        # __dealloc__ also runs when __cinit__ raised; in that case the
        # pointer may never have been set, so do not hand NULL to the C
        # destructor.
        if self._deflater != NULL:
            cnghttp2.nghttp2_hd_deflate_del(self._deflater)
            self._deflater = NULL

    def deflate(self, headers):
        '''Compresses the |headers|. The |headers| must be sequence of tuple
        of name/value pair, which are sequence of bytes (not unicode
        string).

        This function returns the encoded header block in byte string.
        An exception will be raised on error. MemoryError is raised
        if a temporary buffer cannot be allocated.

        '''
        cdef size_t nvlen = len(headers)
        cdef cnghttp2.nghttp2_nv *nva = NULL
        cdef uint8_t *out = NULL
        cdef size_t outlen
        cdef ssize_t rv
        cdef bytes res

        # Ask for at least one element: malloc(0) is allowed to return
        # NULL, which must not be mistaken for an allocation failure.
        nva = <cnghttp2.nghttp2_nv*>malloc(
            sizeof(cnghttp2.nghttp2_nv) * (nvlen if nvlen > 0 else 1))
        if nva == NULL:
            raise MemoryError()

        try:
            # The name/value pointers borrow the buffers of the bytes
            # objects held by |headers|; nothing is copied here.
            for i, (k, v) in enumerate(headers):
                nva[i].name = k
                nva[i].namelen = len(k)
                nva[i].value = v
                nva[i].valuelen = len(v)
                nva[i].flags = cnghttp2.NGHTTP2_NV_FLAG_NONE

            outlen = cnghttp2.nghttp2_hd_deflate_bound(self._deflater,
                                                       nva, nvlen)

            out = <uint8_t*>malloc(outlen)
            if out == NULL:
                raise MemoryError()

            rv = cnghttp2.nghttp2_hd_deflate_hd(self._deflater, out, outlen,
                                                nva, nvlen)
            if rv < 0:
                raise Exception(_strerror(rv))

            res = out[:rv]
        finally:
            # Runs on every exit path, so neither buffer can leak even
            # when a header pair is malformed. free(NULL) is a no-op.
            free(out)
            free(nva)

        return res

    def change_table_size(self, hd_table_bufsize_max):
        '''Changes header table size to |hd_table_bufsize_max| byte.

        An exception will be raised on error.

        '''
        cdef int rv
        rv = cnghttp2.nghttp2_hd_deflate_change_table_size(self._deflater,
                                                           hd_table_bufsize_max)
        if rv != 0:
            raise Exception(_strerror(rv))

    def get_hd_table(self):
        '''Returns copy of current dynamic header table as a list of
        HDTableEntry objects.'''
        cdef size_t length = cnghttp2.nghttp2_hd_deflate_get_num_table_entries(
            self._deflater)
        cdef const cnghttp2.nghttp2_nv *nv
        res = []
        # Entries are fetched by index, starting at 62 and ending at
        # |length| inclusive.
        for i in range(62, length + 1):
            nv = cnghttp2.nghttp2_hd_deflate_get_table_entry(self._deflater, i)
            k = _get_pybytes(nv.name, nv.namelen)
            v = _get_pybytes(nv.value, nv.valuelen)
            res.append(HDTableEntry(k, nv.namelen, v, nv.valuelen))
        return res
158
cdef class HDInflater:
    '''Performs header decompression.

    The following example shows how to decompress request header sets:

        import binascii, nghttp2

        data = binascii.a2b_hex(b'0082c5ad82bd0f000362617a0362757a')
        inflater = nghttp2.HDInflater()
        hdrs = inflater.inflate(data)
        print(hdrs)

    '''

    cdef cnghttp2.nghttp2_hd_inflater *_inflater

    def __cinit__(self):
        rv = cnghttp2.nghttp2_hd_inflate_new(&self._inflater)
        if rv != 0:
            raise Exception(_strerror(rv))

    def __dealloc__(self):
        # __dealloc__ also runs when __cinit__ raised; in that case the
        # pointer may never have been set, so do not hand NULL to the C
        # destructor.
        if self._inflater != NULL:
            cnghttp2.nghttp2_hd_inflate_del(self._inflater)
            self._inflater = NULL

    def inflate(self, data):
        '''Decompresses the compressed header block |data|. The |data| must be
        byte string (not unicode string).

        This function returns a list of (name, value) tuples of byte
        strings. An exception will be raised on error.

        '''
        cdef cnghttp2.nghttp2_nv nv
        cdef int inflate_flags
        cdef ssize_t rv
        cdef uint8_t *buf = data
        cdef size_t buflen = len(data)
        res = []
        while True:
            inflate_flags = 0
            # The last argument (1) tells the library that |data| is
            # the complete header block.
            rv = cnghttp2.nghttp2_hd_inflate_hd2(self._inflater, &nv,
                                                 &inflate_flags,
                                                 buf, buflen, 1)
            if rv < 0:
                raise Exception(_strerror(rv))
            # |rv| is the number of input bytes consumed by this call.
            buf += rv
            buflen -= rv
            if inflate_flags & cnghttp2.NGHTTP2_HD_INFLATE_EMIT:
                # may throw
                res.append((nv.name[:nv.namelen], nv.value[:nv.valuelen]))
            if inflate_flags & cnghttp2.NGHTTP2_HD_INFLATE_FINAL:
                break

        cnghttp2.nghttp2_hd_inflate_end_headers(self._inflater)
        return res

    def change_table_size(self, hd_table_bufsize_max):
        '''Changes header table size to |hd_table_bufsize_max| byte.

        An exception will be raised on error.

        '''
        cdef int rv
        rv = cnghttp2.nghttp2_hd_inflate_change_table_size(self._inflater,
                                                           hd_table_bufsize_max)
        if rv != 0:
            raise Exception(_strerror(rv))

    def get_hd_table(self):
        '''Returns copy of current dynamic header table as a list of
        HDTableEntry objects.'''
        cdef size_t length = cnghttp2.nghttp2_hd_inflate_get_num_table_entries(
            self._inflater)
        cdef const cnghttp2.nghttp2_nv *nv
        res = []
        # Entries are fetched by index, starting at 62 and ending at
        # |length| inclusive.
        for i in range(62, length + 1):
            nv = cnghttp2.nghttp2_hd_inflate_get_table_entry(self._inflater, i)
            k = _get_pybytes(nv.name, nv.namelen)
            v = _get_pybytes(nv.value, nv.valuelen)
            res.append(HDTableEntry(k, nv.namelen, v, nv.valuelen))
        return res
234
cdef _strerror(int liberror_code):
    # Looks up the library's message for |liberror_code| and returns it
    # as a str decoded from UTF-8.
    cdef const char *message = cnghttp2.nghttp2_strerror(liberror_code)
    return message.decode('utf-8')
237
def print_hd_table(hdtable):
    '''Convenient function to print |hdtable| to the standard output,
    one numbered line per entry (numbering starts at 1). This function
    does not work if header name/value cannot be decoded using UTF-8
    encoding.

    s=N means the entry occupies N bytes in header table.

    '''
    for position, item in enumerate(hdtable, 1):
        line = '[{}] (s={}) {}: {}'.format(position,
                                           item.space(),
                                           item.name.decode('utf-8'),
                                           item.value.decode('utf-8'))
        print(line)
253
254try:
255    import socket
256    import io
257    import asyncio
258    import traceback
259    import sys
260    import email.utils
261    import datetime
262    import time
263    import ssl as tls
264    from urllib.parse import urlparse
265except ImportError:
266    asyncio = None
267
# body generator flags
DATA_OK = 0
DATA_EOF = 1
DATA_DEFERRED = 2

class _ByteIOWrapper:
    '''Adapts a readable binary stream to the body generator form
    f(n), which returns a (data, flag) tuple.

    '''

    def __init__(self, b):
        self.b = b

    def generate(self, n):
        '''Reads up to |n| bytes from the wrapped stream.

        Returns (data, DATA_OK) while the stream yields data, and
        (None, DATA_EOF) once a read returns nothing.

        '''
        # Prefer read1() when the stream has it. Raw streams do not
        # provide read1(), so fall back to read() instead of failing
        # with AttributeError.
        read = getattr(self.b, 'read1', None)
        if read is None:
            read = self.b.read
        data = read(n)
        if not data:
            return None, DATA_EOF
        return data, DATA_OK
283
def wrap_body(body):
    '''Turns |body| into a body generator.

    None is returned unchanged. A str is encoded as UTF-8 and, like
    bytes, served from an in-memory buffer. An io.IOBase object is
    read through _ByteIOWrapper. Anything else is returned unchanged
    and is assumed to already be a callable in the form f(n) returning
    tuple of byte string and flag.

    '''
    if body is None:
        return body

    if isinstance(body, str):
        body = body.encode('utf-8')

    if isinstance(body, bytes):
        stream = io.BytesIO(body)
    elif isinstance(body, io.IOBase):
        stream = body
    else:
        return body

    return _ByteIOWrapper(stream).generate
297
def negotiated_protocol(ssl_obj):
    '''Returns the application protocol negotiated on |ssl_obj|, or
    None if none was negotiated.

    ALPN is consulted first. NPN is consulted only as a fallback and
    only when |ssl_obj| offers selected_npn_protocol(); NPN is
    deprecated in the ssl module, so the method is looked up
    defensively rather than assumed to exist.

    '''
    protocol = ssl_obj.selected_alpn_protocol()
    if protocol:
        logging.info('alpn, protocol:%s', protocol)
        return protocol

    select_npn = getattr(ssl_obj, 'selected_npn_protocol', None)
    if select_npn is not None:
        protocol = select_npn()
        if protocol:
            logging.info('npn, protocol:%s', protocol)
            return protocol

    return None
310
def set_application_protocol(ssl_ctx):
    '''Advertises the library's HTTP/2 protocol identifier on
    |ssl_ctx| through every negotiation mechanism the ssl module
    supports.

    '''
    app_protos = [cnghttp2.NGHTTP2_PROTO_VERSION_ID.decode('utf-8')]
    # set_npn_protocols() raises NotImplementedError when the ssl
    # module was built without NPN, so only call it when available.
    if getattr(tls, 'HAS_NPN', False):
        ssl_ctx.set_npn_protocols(app_protos)
    if tls.HAS_ALPN:
        ssl_ctx.set_alpn_protocols(app_protos)
316
cdef _get_stream_user_data(cnghttp2.nghttp2_session *session,
                           int32_t stream_id):
    # Returns the Python object stored as user data of |stream_id|, or
    # None when the library reports no user data for that stream.
    cdef void *raw = cnghttp2.nghttp2_session_get_stream_user_data(
        session, stream_id)

    if raw != NULL:
        return <object>raw

    return None
327
cdef size_t _make_nva(cnghttp2.nghttp2_nv **nva_ptr, headers):
    # Builds a malloc'ed nghttp2_nv array from |headers|, a sequence of
    # (name, value) pairs, stores it in |*nva_ptr| and returns the
    # number of elements.
    #
    # The name/value pointers borrow the buffers of the objects in
    # |headers|, so |headers| has to outlive the array.
    #
    # |*nva_ptr| is always written: it is NULL when |headers| is empty
    # or when building the array failed, so the array is not leaked on
    # failure and |*nva_ptr| is never left uninitialized.
    cdef cnghttp2.nghttp2_nv *nva
    cdef size_t nvlen

    nva_ptr[0] = NULL

    nvlen = len(headers)
    if nvlen == 0:
        return 0

    nva = <cnghttp2.nghttp2_nv*>malloc(sizeof(cnghttp2.nghttp2_nv) * nvlen)
    if nva == NULL:
        raise MemoryError()

    try:
        for i, (k, v) in enumerate(headers):
            nva[i].name = k
            nva[i].namelen = len(k)
            nva[i].value = v
            nva[i].valuelen = len(v)
            nva[i].flags = cnghttp2.NGHTTP2_NV_FLAG_NONE
    except:
        # A malformed pair must not leak the half-filled array.
        free(nva)
        raise

    nva_ptr[0] = nva

    return nvlen
344
cdef int server_on_header(cnghttp2.nghttp2_session *session,
                          const cnghttp2.nghttp2_frame *frame,
                          const uint8_t *name, size_t namelen,
                          const uint8_t *value, size_t valuelen,
                          uint8_t flags,
                          void *user_data):
    # Header callback of the server session: forwards one name/value
    # pair to on_header() together with the handler stored as user data
    # of the frame's stream.
    cdef http2 = <_HTTP2SessionCoreBase>user_data
    cdef int32_t stream_id = frame.hd.stream_id
    logging.debug('server_on_header, type:%s, stream_id:%s', frame.hd.type, stream_id)

    return on_header(name, namelen, value, valuelen, flags,
                     _get_stream_user_data(session, stream_id))
356
cdef int client_on_header(cnghttp2.nghttp2_session *session,
                          const cnghttp2.nghttp2_frame *frame,
                          const uint8_t *name, size_t namelen,
                          const uint8_t *value, size_t valuelen,
                          uint8_t flags,
                          void *user_data):
    # Header callback of the client session: forwards one name/value
    # pair to on_header() together with the handler it belongs to.
    #
    # For HEADERS the handler is the one stored on the frame's own
    # stream; for PUSH_PROMISE it is the one stored on the promised
    # stream.
    logging.debug('client_on_header, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)

    # Start from None so that any other frame type is ignored by
    # on_header() instead of tripping over an unbound local.
    handler = None
    if frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
        handler = _get_stream_user_data(session, frame.hd.stream_id)
    elif frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
        handler = _get_stream_user_data(session, frame.push_promise.promised_stream_id)

    return on_header(name, namelen, value, valuelen, flags, handler)
372
373
cdef int on_header(const uint8_t *name, size_t namelen,
                          const uint8_t *value, size_t valuelen,
                          uint8_t flags,
                          object handler):
    # Records one received header on |handler| and returns 0.
    #
    # The value is split on NUL bytes. The first piece of a pseudo
    # header (and of 'host') is copied to the matching handler
    # attribute. 'cookie' pieces are collected in handler.cookies;
    # every piece of any other header is appended to handler.headers as
    # a (name, piece) tuple.
    if not handler:
        return 0

    header_name = name[:namelen]
    pieces = value[:valuelen].split(b'\x00')
    # split() always yields at least one element.
    first = pieces[0]

    if header_name == b':scheme':
        handler.scheme = first
    elif header_name == b':method':
        handler.method = first
    elif header_name == b':authority' or header_name == b'host':
        handler.host = first
    elif header_name == b':path':
        handler.path = first
    elif header_name == b':status':
        handler.status = first

    if header_name != b'cookie':
        for piece in pieces:
            handler.headers.append((header_name, piece))
    else:
        handler.cookies.extend(pieces)

    return 0
401
cdef int server_on_begin_request_headers(cnghttp2.nghttp2_session *session,
                                         const cnghttp2.nghttp2_frame *frame,
                                         void *user_data):
    # Creates the handler for a newly opened request stream and stores
    # it as the stream's user data so later callbacks can find it.
    cdef http2 = <_HTTP2SessionCore>user_data
    cdef int32_t stream_id = frame.hd.stream_id

    new_handler = http2._make_handler(stream_id)
    cnghttp2.nghttp2_session_set_stream_user_data(session, stream_id,
                                                  <void*>new_handler)

    return 0
412
cdef int server_on_begin_headers(cnghttp2.nghttp2_session *session,
                                 const cnghttp2.nghttp2_frame *frame,
                                 void *user_data):
    # Begin-headers callback of the server session: only HEADERS frames
    # of the request category need any work; everything else returns 0.
    if frame.hd.type != cnghttp2.NGHTTP2_HEADERS:
        return 0

    if frame.headers.cat != cnghttp2.NGHTTP2_HCAT_REQUEST:
        return 0

    return server_on_begin_request_headers(session, frame, user_data)
421
cdef int server_on_frame_recv(cnghttp2.nghttp2_session *session,
                              const cnghttp2.nghttp2_frame *frame,
                              void *user_data):
    # Frame-received callback of the server session.
    #
    # DATA with END_STREAM: tells the stream's handler the request is
    # done.
    # HEADERS of the request category: folds collected cookies into one
    # 'cookie' header, then calls on_headers() and, with END_STREAM,
    # on_request_done().
    # SETTINGS with ACK: stops the settings timer.
    #
    # If a handler hook raises, the traceback goes to stderr and the
    # result of resetting the stream is returned.
    cdef http2 = <_HTTP2SessionCore>user_data
    cdef int32_t stream_id = frame.hd.stream_id
    logging.debug('server_on_frame_recv, type:%s, stream_id:%s', frame.hd.type, stream_id)

    if frame.hd.type == cnghttp2.NGHTTP2_DATA:
        if not (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM):
            return 0
        request = _get_stream_user_data(session, stream_id)
        if not request:
            return 0
        try:
            request.on_request_done()
        except:
            sys.stderr.write(traceback.format_exc())
            return http2._rst_stream(stream_id)
        return 0

    if frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
        if frame.headers.cat != cnghttp2.NGHTTP2_HCAT_REQUEST:
            return 0
        request = _get_stream_user_data(session, stream_id)
        if not request:
            return 0
        if request.cookies:
            joined = b'; '.join(request.cookies)
            request.headers.append((b'cookie', joined))
            request.cookies = None
        try:
            request.on_headers()
            if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM:
                request.on_request_done()
        except:
            sys.stderr.write(traceback.format_exc())
            return http2._rst_stream(stream_id)
        return 0

    if frame.hd.type == cnghttp2.NGHTTP2_SETTINGS:
        if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK):
            http2._stop_settings_timer()

    return 0
459
cdef int on_data_chunk_recv(cnghttp2.nghttp2_session *session,
                                   uint8_t flags,
                                   int32_t stream_id, const uint8_t *data,
                                   size_t length, void *user_data):
    # Passes a received chunk of body data, copied into a bytes object,
    # to the on_data() hook of the stream's handler. If the hook
    # raises, the traceback goes to stderr and the result of resetting
    # the stream is returned.
    cdef http2 = <_HTTP2SessionCoreBase>user_data

    receiver = _get_stream_user_data(session, stream_id)
    if receiver:
        try:
            receiver.on_data(data[:length])
        except:
            sys.stderr.write(traceback.format_exc())
            return http2._rst_stream(stream_id)

    return 0
477
cdef int server_on_frame_send(cnghttp2.nghttp2_session *session,
                              const cnghttp2.nghttp2_frame *frame,
                              void *user_data):
    # Frame-sent callback of the server session.
    #
    # PUSH_PROMISE: sends the push response of the promised stream.
    # SETTINGS without ACK: starts the settings timer.
    # HEADERS with END_STREAM on a server session: resets the stream
    # with NO_ERROR when the remote side has not closed it yet.
    cdef http2 = <_HTTP2SessionCore>user_data
    cdef int32_t stream_id = frame.hd.stream_id
    logging.debug('server_on_frame_send, type:%s, stream_id:%s', frame.hd.type, stream_id)

    if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
        # For PUSH_PROMISE, send push response immediately
        promised = _get_stream_user_data(
            session, frame.push_promise.promised_stream_id)
        if promised:
            http2.send_response(promised)
    elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS:
        if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK) == 0:
            http2._start_settings_timer()
    elif frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
        if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM) and \
           cnghttp2.nghttp2_session_check_server_session(session):
            # Send RST_STREAM if remote is not closed yet
            if cnghttp2.nghttp2_session_get_stream_remote_close(
                    session, stream_id) == 0:
                http2._rst_stream(stream_id, cnghttp2.NGHTTP2_NO_ERROR)

    return 0
503
cdef int server_on_frame_not_send(cnghttp2.nghttp2_session *session,
                                  const cnghttp2.nghttp2_frame *frame,
                                  int lib_error_code,
                                  void *user_data):
    # Frame-not-sent callback of the server session: drops the handler
    # of a promised stream whose PUSH_PROMISE could not be sent.
    cdef http2 = <_HTTP2SessionCore>user_data
    logging.debug('server_on_frame_not_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)

    if frame.hd.type != cnghttp2.NGHTTP2_PUSH_PROMISE:
        return 0

    # We have to remove handler here. Without this, it is not
    # removed until session is terminated.
    orphan = _get_stream_user_data(
        session, frame.push_promise.promised_stream_id)
    if orphan:
        http2._remove_handler(orphan)

    return 0
519
cdef int on_stream_close(cnghttp2.nghttp2_session *session,
                                int32_t stream_id,
                                uint32_t error_code,
                                void *user_data):
    # Stream-closed callback: calls the on_close() hook of the stream's
    # handler with |error_code| and then removes the handler from the
    # session. An exception raised by the hook is written to stderr and
    # does not prevent the removal.
    cdef http2 = <_HTTP2SessionCoreBase>user_data
    logging.debug('on_stream_close, stream_id:%s', stream_id)

    closing = _get_stream_user_data(session, stream_id)
    if closing:
        try:
            closing.on_close(error_code)
        except:
            sys.stderr.write(traceback.format_exc())

        http2._remove_handler(closing)

    return 0
539
cdef ssize_t data_source_read(cnghttp2.nghttp2_session *session,
                              int32_t stream_id,
                              uint8_t *buf, size_t length,
                              uint32_t *data_flags,
                              cnghttp2.nghttp2_data_source *source,
                              void *user_data):
    # Data-source read callback: asks the body generator stored in
    # |source| for up to |length| bytes and copies them into |buf|.
    #
    # The generator is called as generator(length) and must return a
    # (data, flag) tuple where flag is DATA_OK, DATA_EOF or
    # DATA_DEFERRED.
    #
    # Returns the number of bytes copied, NGHTTP2_ERR_DEFERRED when the
    # generator defers, or NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE when
    # the generator raises, returns an unknown flag, or returns more
    # than |length| bytes.
    cdef http2 = <_HTTP2SessionCoreBase>user_data
    cdef size_t nread = 0
    generator = <object>source.ptr

    # The session is flagged as being inside a callback while the
    # generator runs, so a resume() made from it does not send data.
    http2.enter_callback()
    try:
        data, flag = generator(length)
    except:
        sys.stderr.write(traceback.format_exc())
        return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
    finally:
        http2.leave_callback()

    if flag == DATA_DEFERRED:
        return cnghttp2.NGHTTP2_ERR_DEFERRED

    if flag != DATA_OK and flag != DATA_EOF:
        return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE

    if data:
        nread = len(data)
        if nread > length:
            # |buf| only has room for |length| bytes; copying more
            # would write past the end of the library's buffer.
            logging.error('body generator returned %s bytes but at most '
                          '%s were requested, stream_id:%s',
                          nread, length, stream_id)
            return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
        memcpy(buf, <uint8_t*>data, nread)

    if flag == DATA_EOF:
        data_flags[0] = cnghttp2.NGHTTP2_DATA_FLAG_EOF
        if cnghttp2.nghttp2_session_check_server_session(session):
            # Send RST_STREAM if remote is not closed yet
            if cnghttp2.nghttp2_session_get_stream_remote_close(
                    session, stream_id) == 0:
                http2._rst_stream(stream_id, cnghttp2.NGHTTP2_NO_ERROR)

    return nread
578
cdef int client_on_begin_headers(cnghttp2.nghttp2_session *session,
                                 const cnghttp2.nghttp2_frame *frame,
                                 void *user_data):
    # Begin-headers callback of the client session: for PUSH_PROMISE,
    # registers a placeholder handler on the promised stream so the
    # promised request headers have somewhere to be collected.
    cdef http2 = <_HTTP2ClientSessionCore>user_data
    cdef int32_t promised_id

    if frame.hd.type != cnghttp2.NGHTTP2_PUSH_PROMISE:
        return 0

    promised_id = frame.push_promise.promised_stream_id
    # Generate a temporary handler until the headers are all received
    placeholder = BaseResponseHandler()
    http2._add_handler(placeholder, promised_id)
    cnghttp2.nghttp2_session_set_stream_user_data(session, promised_id,
                                                  <void*>placeholder)

    return 0
592
cdef int client_on_frame_recv(cnghttp2.nghttp2_session *session,
                              const cnghttp2.nghttp2_frame *frame,
                              void *user_data):
    # Frame-received callback of the client session.
    #
    # DATA with END_STREAM: tells the stream's handler the response is
    # done.
    # HEADERS of the response or push-response category: folds
    # collected cookies into one 'cookie' header, then calls
    # on_headers() and, with END_STREAM, on_response_done().
    # SETTINGS with ACK: stops the settings timer.
    # PUSH_PROMISE: detaches the temporary handler of the promised
    # stream and passes it to on_push_promise() of the handler of the
    # stream the promise arrived on.
    #
    # If a handler hook raises, the traceback goes to stderr and the
    # result of resetting the frame's stream is returned.
    cdef http2 = <_HTTP2ClientSessionCore>user_data
    cdef int32_t stream_id = frame.hd.stream_id
    cdef int32_t promised_id
    logging.debug('client_on_frame_recv, type:%s, stream_id:%s', frame.hd.type, stream_id)

    if frame.hd.type == cnghttp2.NGHTTP2_DATA:
        if not (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM):
            return 0
        response = _get_stream_user_data(session, stream_id)
        if not response:
            return 0
        try:
            response.on_response_done()
        except:
            sys.stderr.write(traceback.format_exc())
            return http2._rst_stream(stream_id)
        return 0

    if frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
        if frame.headers.cat != cnghttp2.NGHTTP2_HCAT_RESPONSE and \
           frame.headers.cat != cnghttp2.NGHTTP2_HCAT_PUSH_RESPONSE:
            return 0
        response = _get_stream_user_data(session, stream_id)
        if not response:
            return 0
        # TODO handle 1xx non-final response
        if response.cookies:
            joined = b'; '.join(response.cookies)
            response.headers.append((b'cookie', joined))
            response.cookies = None
        try:
            response.on_headers()
            if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM:
                response.on_response_done()
        except:
            sys.stderr.write(traceback.format_exc())
            return http2._rst_stream(stream_id)
        return 0

    if frame.hd.type == cnghttp2.NGHTTP2_SETTINGS:
        if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK):
            http2._stop_settings_timer()
        return 0

    if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
        parent = _get_stream_user_data(session, stream_id)
        if not parent:
            return 0
        promised_id = frame.push_promise.promised_stream_id
        # Get the temporary push_handler which now should have all of the header data
        promised = _get_stream_user_data(session, promised_id)
        if not promised:
            return 0
        # Remove the temporary handler
        http2._remove_handler(promised)
        cnghttp2.nghttp2_session_set_stream_user_data(session, promised_id,
                                                      <void*>NULL)

        try:
            parent.on_push_promise(promised)
        except:
            sys.stderr.write(traceback.format_exc())
            return http2._rst_stream(stream_id)

    return 0
650
cdef int client_on_frame_send(cnghttp2.nghttp2_session *session,
                              const cnghttp2.nghttp2_frame *frame,
                              void *user_data):
    # Frame-sent callback of the client session: starts the settings
    # timer whenever a SETTINGS frame without the ACK flag was sent.
    cdef http2 = <_HTTP2ClientSessionCore>user_data
    logging.debug('client_on_frame_send, type:%s, stream_id:%s', frame.hd.type, frame.hd.stream_id)

    if frame.hd.type == cnghttp2.NGHTTP2_SETTINGS and \
       (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK) == 0:
        http2._start_settings_timer()

    return 0
661
cdef class _HTTP2SessionCoreBase:
    '''Common part of the HTTP/2 session wrappers. It owns the
    nghttp2 session pointer, moves bytes between the session and the
    |transport|, keeps the set of live stream handlers and manages the
    SETTINGS acknowledgement timer.

    This class leaves the session pointer NULL; a subclass is expected
    to create the session.

    '''
    cdef cnghttp2.nghttp2_session *session
    cdef transport
    cdef handler_class
    cdef handlers
    cdef settings_timer
    cdef inside_callback

    def __cinit__(self, transport, handler_class=None):
        self.session = NULL
        self.transport = transport
        self.handler_class = handler_class
        self.handlers = set()
        self.settings_timer = None
        self.inside_callback = False

    def __dealloc__(self):
        cnghttp2.nghttp2_session_del(self.session)

    def data_received(self, data):
        '''Feeds |data| received from the peer into the session and
        then sends whatever the session wants to send in response.

        An exception will be raised if the session rejects the data.

        '''
        cdef ssize_t rv

        rv = cnghttp2.nghttp2_session_mem_recv(self.session, data, len(data))
        if rv < 0:
            raise Exception('nghttp2_session_mem_recv failed: {}'.format\
                            (_strerror(rv)))
        self.send_data()

    # send_data() stops pulling data from the session once the
    # transport's write buffer holds more than this many bytes.
    OUTBUF_MAX = 65535
    # Seconds to wait for the acknowledgement of a sent SETTINGS frame.
    SETTINGS_TIMEOUT = 5.0

    def send_data(self):
        '''Writes pending session output to the transport until the
        session has nothing more to send or the transport's write
        buffer exceeds OUTBUF_MAX.

        The transport is closed once its buffer is empty and the
        session wants neither to read nor to write.

        An exception will be raised if serialization fails.

        '''
        cdef ssize_t outbuflen
        cdef const uint8_t *outbuf

        while True:
            if self.transport.get_write_buffer_size() > self.OUTBUF_MAX:
                break
            outbuflen = cnghttp2.nghttp2_session_mem_send(self.session, &outbuf)
            if outbuflen == 0:
                break
            if outbuflen < 0:
                raise Exception('nghttp2_session_mem_send failed: {}'.format\
                                (_strerror(outbuflen)))
            self.transport.write(outbuf[:outbuflen])

        if self.transport.get_write_buffer_size() == 0 and \
           cnghttp2.nghttp2_session_want_read(self.session) == 0 and \
           cnghttp2.nghttp2_session_want_write(self.session) == 0:
            self.transport.close()

    def resume(self, stream_id):
        '''Resumes the deferred data of |stream_id|. Output is sent
        right away unless this is called from inside a body generator
        callback.

        '''
        cnghttp2.nghttp2_session_resume_data(self.session, stream_id)
        if not self.inside_callback:
            self.send_data()

    def enter_callback(self):
        self.inside_callback = True

    def leave_callback(self):
        self.inside_callback = False

    def _make_handler(self, stream_id):
        '''Creates a handler_class instance for |stream_id|, registers
        it and returns it.'''
        logging.debug('_make_handler, stream_id:%s', stream_id)
        handler = self.handler_class(self, stream_id)
        self.handlers.add(handler)
        return handler

    def _remove_handler(self, handler):
        logging.debug('_remove_handler, stream_id:%s', handler.stream_id)
        self.handlers.remove(handler)

    def _add_handler(self, handler, stream_id):
        '''Binds an already constructed |handler| to |stream_id| and
        to this session, then registers it.'''
        logging.debug('_add_handler, stream_id:%s', stream_id)
        handler.stream_id = stream_id
        handler.http2 = self
        handler.remote_address = self._get_remote_address()
        handler.client_certificate = self._get_client_certificate()
        self.handlers.add(handler)

    def _rst_stream(self, stream_id,
                   error_code=cnghttp2.NGHTTP2_INTERNAL_ERROR):
        '''Submits RST_STREAM for |stream_id| and returns the
        library's return code.'''
        cdef int rv

        rv = cnghttp2.nghttp2_submit_rst_stream\
             (self.session, cnghttp2.NGHTTP2_FLAG_NONE,
              stream_id, error_code)

        return rv

    def _get_remote_address(self):
        return self.transport.get_extra_info('peername')

    def _get_client_certificate(self):
        '''Returns the peer certificate of the transport's socket, or
        None if the socket has no getpeercert().'''
        sock = self.transport.get_extra_info('socket')
        try:
            return sock.getpeercert()
        except AttributeError:
            return None

    def _start_settings_timer(self):
        # Cancel a timer that is still pending; otherwise it would be
        # orphaned and fire even after _stop_settings_timer().
        self._stop_settings_timer()
        loop = asyncio.get_event_loop()
        self.settings_timer = loop.call_later(self.SETTINGS_TIMEOUT,
                                              self._settings_timeout)

    def _stop_settings_timer(self):
        if self.settings_timer:
            self.settings_timer.cancel()
            self.settings_timer = None

    def _terminate_and_flush(self, error_code):
        '''Terminates the session with |error_code| and sends the
        resulting output. If sending fails, the traceback is written
        to stderr and the transport is closed.

        '''
        cdef int rv

        rv = cnghttp2.nghttp2_session_terminate_session\
             (self.session, error_code)
        if rv != 0:
            # Still try to send below; just do not hide the failure.
            logging.error('nghttp2_session_terminate_session failed: %s',
                          _strerror(rv))
        try:
            self.send_data()
        except Exception:
            sys.stderr.write(traceback.format_exc())
            self.transport.close()

    def _settings_timeout(self):
        '''Timer callback: the peer did not acknowledge SETTINGS in
        time, so terminate the session with SETTINGS_TIMEOUT.'''
        logging.debug('_settings_timeout')

        self.settings_timer = None

        self._terminate_and_flush(cnghttp2.NGHTTP2_SETTINGS_TIMEOUT)

    def _log_request(self, handler):
        '''Logs one access log line for the request handled by
        |handler|.'''
        # time.time() yields the same timestamp as converting the local
        # time through mktime(), without the ambiguity of that
        # conversion around DST changes.
        datestr = email.utils.formatdate(timeval=time.time(), localtime=False,
                                        usegmt=True)
        # method/path are logged as text when they are decodable bytes
        # and as-is otherwise (e.g. None or already str).
        try:
            method = handler.method.decode('utf-8')
        except (AttributeError, UnicodeDecodeError):
            method = handler.method
        try:
            path = handler.path.decode('utf-8')
        except (AttributeError, UnicodeDecodeError):
            path = handler.path
        # The peer address may be unavailable; log '-' in that case
        # instead of failing on None.
        remote = handler.remote_address
        client = remote[0] if remote else '-'
        logging.info('%s - - [%s] "%s %s HTTP/2" %s - %s', client,
                          datestr, method, path, handler.status,
                          'P' if handler.pushed else '-')

    def close(self):
        '''Terminates the session with NO_ERROR and sends the
        resulting output.'''
        self._terminate_and_flush(cnghttp2.NGHTTP2_NO_ERROR)
814
cdef class _HTTP2SessionCore(_HTTP2SessionCoreBase):
    '''Server-side session core.

    Creates an nghttp2 server session, registers the server callbacks,
    submits the initial SETTINGS and lets request handlers submit
    responses (send_response) and push promises (push).

    '''
    def __cinit__(self, *args, **kwargs):
        cdef cnghttp2.nghttp2_session_callbacks *callbacks
        cdef cnghttp2.nghttp2_settings_entry iv[2]
        cdef int rv

        super(_HTTP2SessionCore, self).__init__(*args, **kwargs)

        rv = cnghttp2.nghttp2_session_callbacks_new(&callbacks)

        if rv != 0:
            raise Exception('nghttp2_session_callbacks_new failed: {}'.format(
                _strerror(rv)))

        cnghttp2.nghttp2_session_callbacks_set_on_header_callback(
            callbacks, server_on_header)
        cnghttp2.nghttp2_session_callbacks_set_on_begin_headers_callback(
            callbacks, server_on_begin_headers)
        cnghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback(
            callbacks, server_on_frame_recv)
        cnghttp2.nghttp2_session_callbacks_set_on_stream_close_callback(
            callbacks, on_stream_close)
        cnghttp2.nghttp2_session_callbacks_set_on_frame_send_callback(
            callbacks, server_on_frame_send)
        cnghttp2.nghttp2_session_callbacks_set_on_frame_not_send_callback(
            callbacks, server_on_frame_not_send)
        cnghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
            callbacks, on_data_chunk_recv)

        rv = cnghttp2.nghttp2_session_server_new(&self.session, callbacks,
                                                 <void*>self)

        # The callbacks object is released right after session creation,
        # whether or not the creation succeeded.
        cnghttp2.nghttp2_session_callbacks_del(callbacks)

        if rv != 0:
            raise Exception('nghttp2_session_server_new failed: {}'.format(
                _strerror(rv)))

        # Initial SETTINGS: at most 100 concurrent streams and the
        # library's default initial window size.
        iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS
        iv[0].value = 100
        iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE
        iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE

        rv = cnghttp2.nghttp2_submit_settings(self.session,
                                              cnghttp2.NGHTTP2_FLAG_NONE,
                                              iv, sizeof(iv) // sizeof(iv[0]))

        if rv != 0:
            raise Exception('nghttp2_submit_settings failed: {}'.format(
                _strerror(rv)))

    def send_response(self, handler):
        '''Submit the response stored on handler for handler.stream_id.

        handler.response_headers supplies the header fields and
        handler.response_body, when truthy, is handed to nghttp2 as the
        data source. On failure the stream is reset and Exception is
        raised; on success the request is logged.

        '''
        cdef cnghttp2.nghttp2_data_provider prd
        cdef cnghttp2.nghttp2_data_provider *prd_ptr
        cdef cnghttp2.nghttp2_nv *nva
        cdef size_t nvlen
        cdef int rv

        logging.debug('send_response, stream_id:%s', handler.stream_id)

        nva = NULL
        nvlen = _make_nva(&nva, handler.response_headers)

        if handler.response_body:
            # Raw (non-owning) pointer: the handler keeps the body object
            # referenced through its response_body attribute.
            prd.source.ptr = <void*>handler.response_body
            prd.read_callback = data_source_read
            prd_ptr = &prd
        else:
            prd_ptr = NULL

        rv = cnghttp2.nghttp2_submit_response(self.session, handler.stream_id,
                                              nva, nvlen, prd_ptr)

        free(nva)

        if rv != 0:
            # TODO Ignore return value
            self._rst_stream(handler.stream_id)
            raise Exception('nghttp2_submit_response failed: {}'.format(
                _strerror(rv)))

        self._log_request(handler)

    def push(self, handler, promised_handler):
        '''Submit a PUSH_PROMISE on handler's stream for promised_handler.

        promised_handler.headers supplies the promised request header
        fields. On success promised_handler.stream_id is set to the
        promised stream ID and promised_handler is returned. On failure
        promised_handler is removed from the tracked handlers again and
        Exception is raised.

        '''
        cdef cnghttp2.nghttp2_nv *nva
        cdef size_t nvlen
        cdef int32_t promised_stream_id

        self.handlers.add(promised_handler)

        nva = NULL
        nvlen = _make_nva(&nva, promised_handler.headers)

        promised_stream_id = cnghttp2.nghttp2_submit_push_promise(
            self.session,
            cnghttp2.NGHTTP2_FLAG_NONE,
            handler.stream_id,
            nva, nvlen,
            <void*>promised_handler)

        # nghttp2_submit_push_promise() copies the name/value pairs, so
        # the array must be released here exactly as send_response()
        # does; it was previously leaked on every push.
        free(nva)

        if promised_stream_id < 0:
            # No stream was created for this handler, so stop tracking it.
            self.handlers.discard(promised_handler)
            raise Exception('nghttp2_submit_push_promise failed: {}'.format(
                _strerror(promised_stream_id)))

        promised_handler.stream_id = promised_stream_id

        logging.debug('push, stream_id:%s', promised_stream_id)

        return promised_handler

    def connection_lost(self):
        '''Stop the settings timer and call on_close() with
        NGHTTP2_INTERNAL_ERROR on every tracked handler, then forget
        all handlers.

        '''
        self._stop_settings_timer()

        # Iterate over a snapshot so that an on_close() callback which
        # adds or removes handlers cannot break the iteration.
        for handler in list(self.handlers):
            handler.on_close(cnghttp2.NGHTTP2_INTERNAL_ERROR)
        self.handlers = set()
930
cdef class _HTTP2ClientSessionCore(_HTTP2SessionCoreBase):
    '''Client-side session core.

    Creates an nghttp2 client session, registers the client callbacks,
    submits the initial SETTINGS and lets the client submit requests
    (send_request) and accept or reject push promises (push).

    '''
    def __cinit__(self, *args, **kwargs):
        cdef cnghttp2.nghttp2_session_callbacks *callbacks
        cdef cnghttp2.nghttp2_settings_entry iv[2]
        cdef int rv

        super(_HTTP2ClientSessionCore, self).__init__(*args, **kwargs)

        rv = cnghttp2.nghttp2_session_callbacks_new(&callbacks)

        if rv != 0:
            raise Exception('nghttp2_session_callbacks_new failed: {}'.format(
                _strerror(rv)))

        cnghttp2.nghttp2_session_callbacks_set_on_header_callback(
            callbacks, client_on_header)
        cnghttp2.nghttp2_session_callbacks_set_on_begin_headers_callback(
            callbacks, client_on_begin_headers)
        cnghttp2.nghttp2_session_callbacks_set_on_frame_recv_callback(
            callbacks, client_on_frame_recv)
        cnghttp2.nghttp2_session_callbacks_set_on_stream_close_callback(
            callbacks, on_stream_close)
        cnghttp2.nghttp2_session_callbacks_set_on_frame_send_callback(
            callbacks, client_on_frame_send)
        cnghttp2.nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
            callbacks, on_data_chunk_recv)

        rv = cnghttp2.nghttp2_session_client_new(&self.session, callbacks,
                                                 <void*>self)

        # The callbacks object is released right after session creation,
        # whether or not the creation succeeded.
        cnghttp2.nghttp2_session_callbacks_del(callbacks)

        if rv != 0:
            raise Exception('nghttp2_session_client_new failed: {}'.format(
                _strerror(rv)))

        # Initial SETTINGS: at most 100 concurrent streams and the
        # library's default initial window size.
        iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS
        iv[0].value = 100
        iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE
        iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE

        rv = cnghttp2.nghttp2_submit_settings(self.session,
                                              cnghttp2.NGHTTP2_FLAG_NONE,
                                              iv, sizeof(iv) // sizeof(iv[0]))

        if rv != 0:
            raise Exception('nghttp2_submit_settings failed: {}'.format(
                _strerror(rv)))

    def send_request(self, method, scheme, host, path, headers, body, handler):
        '''Submit a request and bind handler to the new stream.

        method, scheme, host and path are str and are sent as the
        :method, :scheme, :authority and :path pseudo-header fields;
        headers holds additional (name, value) pairs. body is passed
        through wrap_body() and, when truthy, used as the request body
        source. Returns handler; raises Exception if nghttp2 rejects the
        request.

        '''
        cdef cnghttp2.nghttp2_data_provider prd
        cdef cnghttp2.nghttp2_data_provider *prd_ptr
        cdef cnghttp2.nghttp2_priority_spec *pri_ptr
        cdef cnghttp2.nghttp2_nv *nva
        cdef size_t nvlen
        cdef int32_t stream_id

        body = wrap_body(body)

        custom_headers = _encode_headers(headers)
        headers = [
            (b':method', method.encode('utf-8')),
            (b':scheme', scheme.encode('utf-8')),
            (b':authority', host.encode('utf-8')),
            (b':path', path.encode('utf-8'))
        ]
        headers.extend(custom_headers)

        nva = NULL
        nvlen = _make_nva(&nva, headers)

        if body:
            # Raw (non-owning) pointer to the body object; see the
            # keep-alive assignment below.
            prd.source.ptr = <void*>body
            prd.read_callback = data_source_read
            prd_ptr = &prd
        else:
            prd_ptr = NULL

        # TODO: Enable priorities
        pri_ptr = NULL

        stream_id = cnghttp2.nghttp2_submit_request(
            self.session, pri_ptr,
            nva, nvlen, prd_ptr,
            <void*>handler)
        free(nva)

        if stream_id < 0:
            raise Exception('nghttp2_submit_request failed: {}'.format(
                _strerror(stream_id)))

        logging.debug('request, stream_id:%s', stream_id)

        self._add_handler(handler, stream_id)

        if body and handler is not None:
            # The data provider only holds a borrowed pointer to body,
            # and body would otherwise go out of scope when this method
            # returns. Keep it referenced from the handler so it stays
            # alive while the stream may still read from it.
            # NOTE(review): assumes data_source_read dereferences
            # source.ptr after this call returns -- confirm.
            handler._request_body = body

        cnghttp2.nghttp2_session_set_stream_user_data(self.session, stream_id,
                                                      <void*>handler)

        return handler

    def push(self, push_promise, handler):
        '''Accept or reject a push promise.

        With a truthy handler the push is accepted: the request
        properties stored on push_promise are copied to handler, which
        is then registered for the promised stream. With a falsy handler
        the promised stream is reset with NGHTTP2_NO_ERROR.

        '''
        if handler:
            # push_promise accepted, fill in the handler with the stored
            # headers from the push_promise
            handler.status = push_promise.status
            handler.scheme = push_promise.scheme
            handler.method = push_promise.method
            handler.host = push_promise.host
            handler.path = push_promise.path
            handler.cookies = push_promise.cookies
            handler.stream_id = push_promise.stream_id
            handler.http2 = self
            handler.pushed = True

            self._add_handler(handler, handler.stream_id)

            cnghttp2.nghttp2_session_set_stream_user_data(
                self.session, handler.stream_id, <void*>handler)
        else:
            # push_promise rejected, reset the stream
            self._rst_stream(push_promise.stream_id,
                             error_code=cnghttp2.NGHTTP2_NO_ERROR)
1052
1053if asyncio:
1054
1055    class BaseRequestHandler:
1056
1057        """HTTP/2 request (stream) handler base class.
1058
1059        The class is used to handle the HTTP/2 stream. By default, it does
1060        not nothing. It must be subclassed to handle each event callback
1061        method.
1062
1063        The first callback method invoked is on_headers(). It is called
1064        when HEADERS frame, which includes request header fields, is
1065        arrived.
1066
1067        If request has request body, on_data(data) is invoked for each
1068        chunk of received data.
1069
1070        When whole request is received, on_request_done() is invoked.
1071
1072        When stream is closed, on_close(error_code) is called.
1073
1074        The application can send response using send_response() method. It
1075        can be used in on_headers(), on_data() or on_request_done().
1076
1077        The application can push resource using push() method. It must be
1078        used before send_response() call.
1079
1080        The following instance variables are available:
1081
1082        client_address
1083          Contains a tuple of the form (host, port) referring to the client's
1084          address.
1085
1086        client_certificate
1087          May contain the client certifcate in its non-binary form
1088
1089        stream_id
1090          Stream ID of this stream
1091
1092        scheme
1093          Scheme of the request URI. This is a value of :scheme header field.
1094
1095        method
1096          Method of this stream. This is a value of :method header field.
1097
1098        host
1099          This is a value of :authority or host header field.
1100
1101        path
1102          This is a value of :path header field.
1103
1104        headers
1105          Request header fields
1106
1107        """
1108
1109        def __init__(self, http2, stream_id):
1110            self.headers = []
1111            self.cookies = []
1112            # Stream ID. For promised stream, it is initially -1.
1113            self.stream_id = stream_id
1114            self.http2 = http2
1115            # address of the client
1116            self.remote_address = self.http2._get_remote_address()
1117            # certificate of the client
1118            self._client_certificate = self.http2._get_client_certificate()
1119            # :scheme header field in request
1120            self.scheme = None
1121            # :method header field in request
1122            self.method = None
1123            # :authority or host header field in request
1124            self.host = None
1125            # :path header field in request
1126            self.path = None
1127            # HTTP status
1128            self.status = None
1129            # True if this is a handler for pushed resource
1130            self.pushed = False
1131
1132        @property
1133        def client_address(self):
1134            return self.remote_address
1135
1136        @property
1137        def client_certificate(self):
1138            return self._client_certificate
1139
1140        def on_headers(self):
1141
1142            '''Called when request HEADERS is arrived.
1143
1144            '''
1145            pass
1146
1147        def on_data(self, data):
1148
1149            '''Called when a chunk of request body is arrived. This method
1150            will be called multiple times until all data are received.
1151
1152            '''
1153            pass
1154
1155        def on_request_done(self):
1156
1157            '''Called when whole request was received
1158
1159            '''
1160            pass
1161
1162        def on_close(self, error_code):
1163
1164            '''Called when stream is about to close.
1165
1166            '''
1167            pass
1168
1169        def send_response(self, status=200, headers=None, body=None):
1170
1171            '''Send response. The status is HTTP status code. The headers is
1172            additional response headers. The :status header field is
1173            appended by the library. The body is the response body. It
1174            could be None if response body is empty. Or it must be
1175            instance of either str, bytes, io.IOBase or callable,
1176            called body generator, which takes one parameter,
1177            size. The body generator generates response body. It can
1178            pause generation of response so that it can wait for slow
1179            backend data generation. When invoked, it should return
1180            tuple, byte string and flag. The flag is either DATA_OK,
1181            DATA_EOF and DATA_DEFERRED. For non-empty byte string and
1182            it is not the last chunk of response, DATA_OK is returned
1183            as flag.  If this is the last chunk of the response (byte
1184            string is possibly None), DATA_EOF must be returned as
1185            flag.  If there is no data available right now, but
1186            additional data are anticipated, return tuple (None,
1187            DATA_DEFERRD).  When data arrived, call resume() and
1188            restart response body transmission.
1189
1190            Only the body generator can pause response body
1191            generation; instance of io.IOBase must not block.
1192
1193            If instance of str is specified as body, it is encoded
1194            using UTF-8.
1195
1196            The headers is a list of tuple of the form (name,
1197            value). The name and value can be either unicode string or
1198            byte string.
1199
1200            On error, exception will be thrown.
1201
1202            '''
1203            if self.status is not None:
1204                raise Exception('response has already been sent')
1205
1206            if not status:
1207                raise Exception('status must not be empty')
1208
1209            body = wrap_body(body)
1210
1211            self._set_response_prop(status, headers, body)
1212            self.http2.send_response(self)
1213
1214        def push(self, path, method='GET', request_headers=None,
1215                 status=200, headers=None, body=None):
1216
1217            '''Push a resource. The path is a path portion of request URI
1218            for this
1219            resource. The method is a method to access this
1220            resource. The request_headers is additional request
1221            headers to access this resource. The :scheme, :method,
1222            :authority and :path are appended by the library. The
1223            :scheme and :authority are inherited from the request (not
1224            request_headers parameter).
1225
1226            The status is HTTP status code. The headers is additional
1227            response headers. The :status header field is appended by
1228            the library. The body is the response body. It has the
1229            same semantics of body parameter of send_response().
1230
1231            The headers and request_headers are a list of tuple of the
1232            form (name, value). The name and value can be either
1233            unicode string or byte string.
1234
1235            On error, exception will be thrown.
1236
1237            '''
1238            if not status:
1239                raise Exception('status must not be empty')
1240
1241            if not method:
1242                raise Exception('method must not be empty')
1243
1244            if not path:
1245                raise Exception('path must not be empty')
1246
1247            body = wrap_body(body)
1248
1249            promised_handler = self.http2._make_handler(-1)
1250            promised_handler.pushed = True
1251            promised_handler.scheme = self.scheme
1252            promised_handler.method = method.encode('utf-8')
1253            promised_handler.host = self.host
1254            promised_handler.path = path.encode('utf-8')
1255            promised_handler._set_response_prop(status, headers, body)
1256
1257            headers = [
1258                (b':method', promised_handler.method),
1259                (b':scheme', promised_handler.scheme),
1260                (b':authority', promised_handler.host),
1261                (b':path', promised_handler.path)
1262            ]
1263            headers.extend(_encode_headers(request_headers))
1264
1265            promised_handler.headers = headers
1266
1267            return self.http2.push(self, promised_handler)
1268
1269        def _set_response_prop(self, status, headers, body):
1270            self.status = status
1271
1272            if headers is None:
1273                headers = []
1274
1275            self.response_headers = [(b':status', str(status).encode('utf-8'))]
1276            self.response_headers.extend(_encode_headers(headers))
1277
1278            self.response_body = body
1279
1280        def resume(self):
1281            self.http2.resume(self.stream_id)
1282
1283    def _encode_headers(headers):
1284        if not headers:
1285            return []
1286        return [(k if isinstance(k, bytes) else k.encode('utf-8'),
1287                 v if isinstance(v, bytes) else v.encode('utf-8')) \
1288                for k, v in headers]
1289
1290    class _HTTP2Session(asyncio.Protocol):
1291
1292        def __init__(self, RequestHandlerClass):
1293            asyncio.Protocol.__init__(self)
1294            self.RequestHandlerClass = RequestHandlerClass
1295            self.http2 = None
1296
1297        def connection_made(self, transport):
1298            address = transport.get_extra_info('peername')
1299            logging.info('connection_made, address:%s, port:%s', address[0], address[1])
1300
1301            self.transport = transport
1302            sock = self.transport.get_extra_info('socket')
1303            try:
1304                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1305            except OSError as e:
1306                logging.info('failed to set tcp-nodelay: %s', str(e))
1307            ssl_ctx = self.transport.get_extra_info('sslcontext')
1308            if ssl_ctx:
1309                ssl_obj = self.transport.get_extra_info('ssl_object')
1310                protocol = negotiated_protocol(ssl_obj)
1311                if protocol is None or protocol.encode('utf-8') != \
1312                   cnghttp2.NGHTTP2_PROTO_VERSION_ID:
1313                    self.transport.abort()
1314                    return
1315            try:
1316                self.http2 = _HTTP2SessionCore\
1317                             (self.transport,
1318                              self.RequestHandlerClass)
1319            except Exception as err:
1320                sys.stderr.write(traceback.format_exc())
1321                self.transport.abort()
1322                return
1323
1324
1325        def connection_lost(self, exc):
1326            logging.info('connection_lost')
1327            if self.http2:
1328                self.http2.connection_lost()
1329                self.http2 = None
1330
1331        def data_received(self, data):
1332            try:
1333                self.http2.data_received(data)
1334            except Exception as err:
1335                sys.stderr.write(traceback.format_exc())
1336                self.transport.close()
1337                return
1338
1339        def resume_writing(self):
1340            try:
1341                self.http2.send_data()
1342            except Exception as err:
1343                sys.stderr.write(traceback.format_exc())
1344                self.transport.close()
1345                return
1346
1347    class HTTP2Server:
1348
1349        '''HTTP/2 server.
1350
1351        This class builds on top of the asyncio event loop. On
1352        construction, RequestHandlerClass must be given, which must be a
1353        subclass of BaseRequestHandler class.
1354
1355        '''
1356        def __init__(self, address, RequestHandlerClass, ssl=None):
1357
1358            '''address is a tuple of the listening address and port (e.g.,
1359            ('127.0.0.1', 8080)). RequestHandlerClass must be a subclass
1360            of BaseRequestHandler class to handle a HTTP/2 stream.  The
1361            ssl can be ssl.SSLContext instance. If it is not None, the
1362            resulting server is SSL/TLS capable.
1363
1364            '''
1365            def session_factory():
1366                return _HTTP2Session(RequestHandlerClass)
1367
1368            self.loop = asyncio.get_event_loop()
1369
1370            if ssl:
1371                set_application_protocol(ssl)
1372
1373            coro = self.loop.create_server(session_factory,
1374                                           host=address[0], port=address[1],
1375                                           ssl=ssl)
1376            self.server = self.loop.run_until_complete(coro)
1377            logging.info('listen, address:%s, port:%s', address[0], address[1])
1378
1379        def serve_forever(self):
1380            try:
1381                self.loop.run_forever()
1382            finally:
1383                self.server.close()
1384                self.loop.close()
1385
1386
1387
1388    class BaseResponseHandler:
1389
1390        """HTTP/2 response (stream) handler base class.
1391
1392        The class is used to handle the HTTP/2 stream. By default, it does
1393        not nothing. It must be subclassed to handle each event callback
1394        method.
1395
1396        The first callback method invoked is on_headers(). It is called
1397        when HEADERS frame, which includes response header fields, is
1398        arrived.
1399
1400        If response has a body, on_data(data) is invoked for each
1401        chunk of received data.
1402
1403        When whole response is received, on_response_done() is invoked.
1404
1405        When stream is closed or underlying connection is lost,
1406        on_close(error_code) is called.
1407
1408        The application can send follow up requests using HTTP2Client.send_request() method.
1409
1410        The application can handle push resource using on_push_promise() method.
1411
1412        The following instance variables are available:
1413
1414        server_address
1415          Contains a tuple of the form (host, port) referring to the server's
1416          address.
1417
1418        stream_id
1419          Stream ID of this stream
1420
1421        scheme
1422          Scheme of the request URI. This is a value of :scheme header field.
1423
1424        method
1425          Method of this stream. This is a value of :method header field.
1426
1427        host
1428          This is a value of :authority or host header field.
1429
1430        path
1431          This is a value of :path header field.
1432
1433        headers
1434          Response header fields.  There is a special exception.  If this
1435          object is passed to push_promise(), this instance variable contains
1436          pushed request header fields.
1437
1438        """
1439
1440        def __init__(self, http2=None, stream_id=-1):
1441            self.headers = []
1442            self.cookies = []
1443            # Stream ID. For promised stream, it is initially -1.
1444            self.stream_id = stream_id
1445            self.http2 = http2
1446            # address of the server
1447            self.remote_address = None
1448            # :scheme header field in request
1449            self.scheme = None
1450            # :method header field in request
1451            self.method = None
1452            # :authority or host header field in request
1453            self.host = None
1454            # :path header field in request
1455            self.path = None
1456            # HTTP status
1457            self.status = None
1458            # True if this is a handler for pushed resource
1459            self.pushed = False
1460
1461        @property
1462        def server_address(self):
1463            return self.remote_address
1464
1465        def on_headers(self):
1466
1467            '''Called when response HEADERS is arrived.
1468
1469            '''
1470            pass
1471
1472        def on_data(self, data):
1473
1474            '''Called when a chunk of response body is arrived. This method
1475            will be called multiple times until all data are received.
1476
1477            '''
1478            pass
1479
1480        def on_response_done(self):
1481
1482            '''Called when whole response was received
1483
1484            '''
1485            pass
1486
1487        def on_close(self, error_code):
1488
1489            '''Called when stream is about to close.
1490
1491            '''
1492            pass
1493
1494        def on_push_promise(self, push_promise):
1495
1496            '''Called when a push is promised. Default behavior is to
1497            cancel the push. If application overrides this method,
1498            it should call either accept_push or reject_push.
1499
1500            '''
1501            self.reject_push(push_promise)
1502
1503        def reject_push(self, push_promise):
1504
1505            '''Convenience method equivalent to calling accept_push
1506            with a falsy value.
1507
1508            '''
1509            self.http2.push(push_promise, None)
1510
1511        def accept_push(self, push_promise, handler=None):
1512
1513            '''Accept a push_promise and provider a handler for the
1514            new stream. If a falsy value is supplied for the handler,
1515            the push is rejected.
1516
1517            '''
1518            self.http2.push(push_promise, handler)
1519
1520        def resume(self):
1521            self.http2.resume(self.stream_id)
1522
1523    class _HTTP2ClientSession(asyncio.Protocol):
1524
1525        def __init__(self, client):
1526            asyncio.Protocol.__init__(self)
1527            self.http2 = None
1528            self.pending = []
1529            self.client = client
1530
1531        def connection_made(self, transport):
1532            address = transport.get_extra_info('peername')
1533            logging.info('connection_made, address:%s, port:%s', address[0], address[1])
1534
1535            self.transport = transport
1536            sock = self.transport.get_extra_info('socket')
1537            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1538            ssl_ctx = self.transport.get_extra_info('sslcontext')
1539            if ssl_ctx:
1540                ssl_obj = self.transport.get_extra_info('ssl_object')
1541                protocol = negotiated_protocol(ssl_obj)
1542                if protocol is None or protocol.encode('utf-8') != \
1543                   cnghttp2.NGHTTP2_PROTO_VERSION_ID:
1544                    self.transport.abort()
1545
1546            self.http2 = _HTTP2ClientSessionCore(self.transport)
1547
1548	    # Clear pending requests
1549            send_pending = self.pending
1550            self.pending = []
1551            for method,scheme,host,path,headers,body,handler in send_pending:
1552                self.send_request(method=method, scheme=scheme, host=host, path=path,\
1553                                  headers=headers, body=body, handler=handler)
1554            self.http2.send_data()
1555
1556        def connection_lost(self, exc):
1557            logging.info('connection_lost')
1558            if self.http2:
1559                self.http2 = None
1560            self.client.close()
1561
1562        def data_received(self, data):
1563            try:
1564                self.http2.data_received(data)
1565            except Exception as err:
1566                sys.stderr.write(traceback.format_exc())
1567                self.transport.close()
1568                return
1569
1570        def resume_writing(self):
1571            try:
1572                self.http2.send_data()
1573            except Exception as err:
1574                sys.stderr.write(traceback.format_exc())
1575                self.transport.close()
1576                return
1577
1578        def send_request(self, method, scheme, host, path, headers, body, handler):
1579            try:
1580		# Waiting until connection established
1581                if not self.http2:
1582                    self.pending.append([method, scheme, host, path, headers, body, handler])
1583                    return
1584
1585                self.http2.send_request(method=method, scheme=scheme, host=host, path=path,\
1586                                        headers=headers, body=body, handler=handler)
1587                self.http2.send_data()
1588            except Exception as err:
1589                sys.stderr.write(traceback.format_exc())
1590                self.transport.close()
1591                return
1592
1593        def close(self):
1594            if self.http2:
1595                self.http2.close()
1596
1597
1598    class HTTP2Client:
1599
1600        '''HTTP/2 client.
1601
1602        This class builds on top of the asyncio event loop.
1603
1604        '''
1605        def __init__(self, address, loop=None, ssl=None):
1606
1607            '''address is a tuple of the connect address and port (e.g.,
1608            ('127.0.0.1', 8080)). The ssl can be ssl.SSLContext instance.
1609            If it is not None, the resulting client is SSL/TLS capable.
1610            '''
1611
1612            self.address = address
1613            self.session = _HTTP2ClientSession(self)
1614            def session_factory():
1615                return self.session
1616
1617            if ssl:
1618                set_application_protocol(ssl)
1619
1620            self.loop = loop
1621            if not self.loop:
1622                self.loop = asyncio.get_event_loop()
1623
1624            coro = self.loop.create_connection(session_factory,
1625                                           host=address[0], port=address[1],
1626                                           ssl=ssl)
1627
1628            if ssl:
1629                self.scheme = 'https'
1630            else:
1631                self.scheme = 'http'
1632
1633            self.transport,_ = self.loop.run_until_complete(coro)
1634            logging.info('connect, address:%s, port:%s', self.address[0], self.address[1])
1635
1636        @property
1637        def io_loop(self):
1638            return self.loop
1639
1640        def close(self):
1641            self.session.close()
1642
1643        def send_request(self, method='GET', url='/', headers=None, body=None, handler=None):
1644            url = urlparse(url)
1645            scheme = url.scheme if url.scheme else self.scheme
1646            host = url.netloc if url.netloc else self.address[0]+':'+str(self.address[1])
1647            path = url.path
1648            if url.params:
1649                path += ';'+url.params
1650            if url.query:
1651                path += '?'+url.query
1652            if url.fragment:
1653                path += '#'+url.fragment
1654
1655            self.session.send_request(method=method, scheme=scheme, host=host, path=path,\
1656                                      headers=headers, body=body, handler=handler)
1657