1"""Test flask-smorest on more or less realistic examples"""
2
3import json
4from contextlib import contextmanager
5
6import pytest
7
8import marshmallow as ma
9from flask.views import MethodView
10
11from flask_smorest import Api, Blueprint, abort, Page
12from flask_smorest.pagination import PaginationMetadataSchema
13from flask_smorest.utils import get_appcontext
14from .mocks import ItemNotFound
15from .utils import build_ref, get_schemas
16
17
18def implicit_data_and_schema_etag_blueprint(collection, schemas):
19    """Blueprint with implicit data and schema ETag computation
20
21    ETag computed automatically from result data with same schema as data
22    Post-pagination is used to reduce boilerplate even more
23    """
24
25    DocSchema = schemas.DocSchema
26
27    blp = Blueprint('test', __name__, url_prefix='/test')
28
29    @blp.route('/')
30    class Resource(MethodView):
31
32        @blp.etag
33        @blp.response(200, DocSchema(many=True))
34        @blp.paginate(Page)
35        def get(self):
36            return collection.items
37
38        @blp.etag
39        @blp.arguments(DocSchema)
40        @blp.response(201, DocSchema)
41        def post(self, new_item):
42            return collection.post(new_item)
43
44    @blp.route('/<int:item_id>')
45    class ResourceById(MethodView):
46
47        def _get_item(self, item_id):
48            try:
49                return collection.get_by_id(item_id)
50            except ItemNotFound:
51                abort(404)
52
53        @blp.etag
54        @blp.response(200, DocSchema)
55        def get(self, item_id):
56            return self._get_item(item_id)
57
58        @blp.etag
59        @blp.arguments(DocSchema)
60        @blp.response(200, DocSchema)
61        def put(self, new_item, item_id):
62            item = self._get_item(item_id)
63            # Check ETag is a manual action and schema must be provided
64            blp.check_etag(item, DocSchema)
65            return collection.put(item_id, new_item)
66
67        @blp.etag
68        @blp.response(204)
69        def delete(self, item_id):
70            item = self._get_item(item_id)
71            # Check ETag is a manual action and schema must be provided
72            blp.check_etag(item, DocSchema)
73            collection.delete(item_id)
74
75    return blp
76
77
78def implicit_data_explicit_schema_etag_blueprint(collection, schemas):
79    """Blueprint with implicit ETag computation, explicit schema
80
81    ETag computed automatically with specific ETag schema
82    """
83
84    DocSchema = schemas.DocSchema
85    DocEtagSchema = schemas.DocEtagSchema
86
87    blp = Blueprint('test', __name__, url_prefix='/test')
88
89    @blp.route('/')
90    class Resource(MethodView):
91
92        @blp.etag(DocEtagSchema(many=True))
93        @blp.response(200, DocSchema(many=True))
94        @blp.paginate()
95        def get(self, pagination_parameters):
96            pagination_parameters.item_count = len(collection.items)
97            return collection.items[
98                pagination_parameters.first_item:
99                pagination_parameters.last_item + 1
100            ]
101
102        @blp.etag(DocEtagSchema)
103        @blp.arguments(DocSchema)
104        @blp.response(201, DocSchema)
105        def post(self, new_item):
106            return collection.post(new_item)
107
108    @blp.route('/<int:item_id>')
109    class ResourceById(MethodView):
110
111        def _get_item(self, item_id):
112            try:
113                return collection.get_by_id(item_id)
114            except ItemNotFound:
115                abort(404)
116
117        @blp.etag(DocEtagSchema)
118        @blp.response(200, DocSchema)
119        def get(self, item_id):
120            item = self._get_item(item_id)
121            return item
122
123        @blp.etag(DocEtagSchema)
124        @blp.arguments(DocSchema)
125        @blp.response(200, DocSchema)
126        def put(self, new_item, item_id):
127            item = self._get_item(item_id)
128            # Check ETag is a manual action, ETag schema is used
129            blp.check_etag(item)
130            new_item = collection.put(item_id, new_item)
131            return new_item
132
133        @blp.etag(DocEtagSchema)
134        @blp.response(204)
135        def delete(self, item_id):
136            item = self._get_item(item_id)
137            # Check ETag is a manual action, ETag schema is used
138            blp.check_etag(item)
139            collection.delete(item_id)
140
141    return blp
142
143
144def explicit_data_no_schema_etag_blueprint(collection, schemas):
145    """Blueprint with explicit ETag computation, no schema
146
147    ETag computed without schema from arbitrary data
148
149    We're using item['db_field'] for ETag data as a dummy example.
150    """
151
152    DocSchema = schemas.DocSchema
153
154    blp = Blueprint('test', __name__, url_prefix='/test')
155
156    @blp.route('/')
157    class Resource(MethodView):
158
159        @blp.etag
160        @blp.response(200, DocSchema(many=True))
161        @blp.paginate()
162        def get(self, pagination_parameters):
163            pagination_parameters.item_count = len(collection.items)
164            # It is better to rely on automatic ETag here, as it includes
165            # pagination metadata.
166            return collection.items[
167                pagination_parameters.first_item:
168                pagination_parameters.last_item + 1
169            ]
170
171        @blp.etag
172        @blp.arguments(DocSchema)
173        @blp.response(201, DocSchema)
174        def post(self, new_item):
175            # Compute ETag using arbitrary data and no schema
176            blp.set_etag(new_item['db_field'])
177            return collection.post(new_item)
178
179    @blp.route('/<int:item_id>')
180    class ResourceById(MethodView):
181
182        def _get_item(self, item_id):
183            try:
184                return collection.get_by_id(item_id)
185            except ItemNotFound:
186                abort(404)
187
188        @blp.etag
189        @blp.response(200, DocSchema)
190        def get(self, item_id):
191            item = self._get_item(item_id)
192            # Compute ETag using arbitrary data and no schema
193            blp.set_etag(item['db_field'])
194            return item
195
196        @blp.etag
197        @blp.arguments(DocSchema)
198        @blp.response(200, DocSchema)
199        def put(self, new_item, item_id):
200            item = self._get_item(item_id)
201            # Check ETag is a manual action, no shema used
202            blp.check_etag(item['db_field'])
203            new_item = collection.put(item_id, new_item)
204            # Compute ETag using arbitrary data and no schema
205            blp.set_etag(new_item['db_field'])
206            return new_item
207
208        @blp.etag
209        @blp.response(204)
210        def delete(self, item_id):
211            item = self._get_item(item_id)
212            # Check ETag is a manual action, no shema used
213            blp.check_etag(item['db_field'])
214            collection.delete(item_id)
215
216    return blp
217
218
219@pytest.fixture(params=[
220    (implicit_data_and_schema_etag_blueprint, 'Schema'),
221    (implicit_data_explicit_schema_etag_blueprint, 'ETag schema'),
222    (explicit_data_no_schema_etag_blueprint, 'No schema'),
223])
224def blueprint_fixture(request, collection, schemas):
225    blp_factory = request.param[0]
226    return blp_factory(collection, schemas), request.param[1]
227
228
229class TestFullExample:
230
231    def test_examples(self, app, blueprint_fixture, schemas):
232
233        blueprint, bp_schema = blueprint_fixture
234
235        api = Api(app)
236        api.register_blueprint(blueprint)
237
238        client = app.test_client()
239
240        @contextmanager
241        def assert_counters(
242                schema_load, schema_dump, etag_schema_load, etag_schema_dump):
243            """Check number of calls to dump/load methods of schemas"""
244            schemas.DocSchema.reset_load_count()
245            schemas.DocSchema.reset_dump_count()
246            schemas.DocEtagSchema.reset_load_count()
247            schemas.DocEtagSchema.reset_dump_count()
248            yield
249            assert schemas.DocSchema.load_count == schema_load
250            assert schemas.DocSchema.dump_count == schema_dump
251            assert schemas.DocEtagSchema.load_count == etag_schema_load
252            assert schemas.DocEtagSchema.dump_count == etag_schema_dump
253
254        # GET collection without ETag: OK
255        with assert_counters(0, 1, 0, 1 if bp_schema == 'ETag schema' else 0):
256            response = client.get('/test/')
257            assert response.status_code == 200
258            list_etag = response.headers['ETag']
259            assert len(response.json) == 0
260            assert json.loads(response.headers['X-Pagination']) == {
261                'total': 0, 'total_pages': 0}
262
263        # GET collection with correct ETag: Not modified
264        with assert_counters(0, 1, 0, 1 if bp_schema == 'ETag schema' else 0):
265            response = client.get(
266                '/test/',
267                headers={'If-None-Match': list_etag}
268            )
269        assert response.status_code == 304
270
271        # POST item_1
272        item_1_data = {'field': 0}
273        with assert_counters(1, 1, 0, 1 if bp_schema == 'ETag schema' else 0):
274            response = client.post(
275                '/test/',
276                data=json.dumps(item_1_data),
277                content_type='application/json'
278            )
279        assert response.status_code == 201
280        item_1_id = response.json['item_id']
281
282        # GET collection with wrong/outdated ETag: OK
283        with assert_counters(0, 1, 0, 1 if bp_schema == 'ETag schema' else 0):
284            response = client.get(
285                '/test/',
286                headers={'If-None-Match': list_etag}
287            )
288        assert response.status_code == 200
289        list_etag = response.headers['ETag']
290        assert len(response.json) == 1
291        assert response.json[0] == {'field': 0, 'item_id': 1}
292        assert json.loads(response.headers['X-Pagination']) == {
293            'total': 1, 'total_pages': 1, 'page': 1,
294            'first_page': 1, 'last_page': 1}
295
296        # GET by ID without ETag: OK
297        with assert_counters(0, 1, 0, 1 if bp_schema == 'ETag schema' else 0):
298            response = client.get('/test/{}'.format(item_1_id))
299        assert response.status_code == 200
300        item_etag = response.headers['ETag']
301
302        # GET by ID with correct ETag: Not modified
303        with assert_counters(0, 0 if bp_schema == 'No schema' else 1,
304                             0, 1 if bp_schema == 'ETag schema' else 0):
305            response = client.get(
306                '/test/{}'.format(item_1_id),
307                headers={'If-None-Match': item_etag}
308            )
309        assert response.status_code == 304
310
311        # PUT without ETag: Precondition required error
312        item_1_data['field'] = 1
313        with assert_counters(0, 0, 0, 0):
314            response = client.put(
315                '/test/{}'.format(item_1_id),
316                data=json.dumps(item_1_data),
317                content_type='application/json'
318            )
319        assert response.status_code == 428
320
321        # PUT with correct ETag: OK
322        with assert_counters(1, 2 if bp_schema == 'Schema' else 1,
323                             0, 2 if bp_schema == 'ETag schema' else 0):
324            response = client.put(
325                '/test/{}'.format(item_1_id),
326                data=json.dumps(item_1_data),
327                content_type='application/json',
328                headers={'If-Match': item_etag}
329            )
330        assert response.status_code == 200
331        new_item_etag = response.headers['ETag']
332
333        # PUT with wrong/outdated ETag: Precondition failed error
334        item_1_data['field'] = 2
335        with assert_counters(1, 1 if bp_schema == 'Schema' else 0,
336                             0, 1 if bp_schema == 'ETag schema' else 0):
337            response = client.put(
338                '/test/{}'.format(item_1_id),
339                data=json.dumps(item_1_data),
340                content_type='application/json',
341                headers={'If-Match': item_etag}
342            )
343        assert response.status_code == 412
344
345        # GET by ID with wrong/outdated ETag: OK
346        with assert_counters(0, 1, 0, 1 if bp_schema == 'ETag schema' else 0):
347            response = client.get(
348                '/test/{}'.format(item_1_id),
349                headers={'If-None-Match': item_etag}
350            )
351        assert response.status_code == 200
352
353        # GET collection with pagination set to 1 element per page
354        with assert_counters(0, 1, 0, 1 if bp_schema == 'ETag schema' else 0):
355            response = client.get(
356                '/test/',
357                headers={'If-None-Match': list_etag},
358                query_string={'page': 1, 'page_size': 1}
359            )
360        assert response.status_code == 200
361        list_etag = response.headers['ETag']
362        assert len(response.json) == 1
363        assert response.json[0] == {'field': 1, 'item_id': 1}
364        assert json.loads(response.headers['X-Pagination']) == {
365            'total': 1, 'total_pages': 1, 'page': 1,
366            'first_page': 1, 'last_page': 1}
367
368        # POST item_2
369        item_2_data = {'field': 1}
370        with assert_counters(1, 1, 0, 1 if bp_schema == 'ETag schema' else 0):
371            response = client.post(
372                '/test/',
373                data=json.dumps(item_2_data),
374                content_type='application/json'
375            )
376        assert response.status_code == 201
377
378        # GET collection with pagination set to 1 element per page
379        # Content is the same (item_1) but pagination metadata has changed
380        # so we don't get a 304 and the data is returned again
381        with assert_counters(0, 1, 0, 1 if bp_schema == 'ETag schema' else 0):
382            response = client.get(
383                '/test/',
384                headers={'If-None-Match': list_etag},
385                query_string={'page': 1, 'page_size': 1}
386            )
387        assert response.status_code == 200
388        list_etag = response.headers['ETag']
389        assert len(response.json) == 1
390        assert response.json[0] == {'field': 1, 'item_id': 1}
391        assert json.loads(response.headers['X-Pagination']) == {
392            'total': 2, 'total_pages': 2, 'page': 1,
393            'first_page': 1, 'last_page': 2, 'next_page': 2}
394
395        # DELETE without ETag: Precondition required error
396        with assert_counters(0, 0, 0, 0):
397            response = client.delete('/test/{}'.format(item_1_id))
398        assert response.status_code == 428
399
400        # DELETE with wrong/outdated ETag: Precondition failed error
401        with assert_counters(0, 1 if bp_schema == 'Schema' else 0,
402                             0, 1 if bp_schema == 'ETag schema' else 0):
403            response = client.delete(
404                '/test/{}'.format(item_1_id),
405                headers={'If-Match': item_etag}
406            )
407        assert response.status_code == 412
408
409        # DELETE with correct ETag: No Content
410        with assert_counters(0, 1 if bp_schema == 'Schema' else 0,
411                             0, 1 if bp_schema == 'ETag schema' else 0):
412            response = client.delete(
413                '/test/{}'.format(item_1_id),
414                headers={'If-Match': new_item_etag}
415            )
416        assert response.status_code == 204
417
418
419class TestCustomExamples:
420
421    @pytest.mark.parametrize('openapi_version', ('2.0', '3.0.2'))
422    def test_response_payload_wrapping(self, app, schemas, openapi_version):
423        """Demonstrates how to wrap response payload in a data field"""
424
425        class WrapperBlueprint(Blueprint):
426
427            # Wrap payload data
428            @staticmethod
429            def _prepare_response_content(data):
430                if data is not None:
431                    return {'data': data}
432                return None
433
434            # Document data wrapper
435            # The schema is not used to dump the payload, only to generate doc
436            @staticmethod
437            def _make_doc_response_schema(schema):
438                if schema:
439                    return type(
440                        'Wrap' + schema.__class__.__name__,
441                        (ma.Schema, ),
442                        {'data': ma.fields.Nested(schema)},
443                    )
444                return None
445
446        app.config['OPENAPI_VERSION'] = openapi_version
447        api = Api(app)
448        client = app.test_client()
449        blp = WrapperBlueprint('test', __name__, url_prefix='/test')
450
451        @blp.route('/')
452        @blp.response(200, schemas.DocSchema)
453        def func():
454            return {'item_id': 1, 'db_field': 42}
455
456        api.register_blueprint(blp)
457        spec = api.spec.to_dict()
458
459        # Test data is wrapped
460        resp = client.get('/test/')
461        assert resp.json == {'data': {'item_id': 1, 'field': 42}}
462
463        # Test wrapping is correctly documented
464        if openapi_version == '3.0.2':
465            content = spec['paths']['/test/']['get']['responses']['200'][
466                'content']['application/json']
467        else:
468            content = spec['paths']['/test/']['get']['responses']['200']
469        assert content['schema'] == build_ref(api.spec, 'schema', 'WrapDoc')
470        assert get_schemas(api.spec)['WrapDoc'] == {
471            'type': 'object',
472            'properties': {'data': build_ref(api.spec, 'schema', 'Doc')}
473        }
474        assert 'Doc' in get_schemas(api.spec)
475
476    @pytest.mark.parametrize('openapi_version', ('2.0', '3.0.2'))
477    def test_pagination_in_response_payload(
478            self, app, schemas, openapi_version
479    ):
480        """Demonstrates how to add pagination metadata in response payload"""
481
482        class WrapperBlueprint(Blueprint):
483
484            # Set pagination metadata in app context
485            def _set_pagination_metadata(self, page_params, result, headers):
486                page_meta = self._make_pagination_metadata(
487                    page_params.page, page_params.page_size,
488                    page_params.item_count)
489                get_appcontext()['pagination_metadata'] = page_meta
490                return result, headers
491
492            # Wrap payload data and add pagination metadata if any
493            @staticmethod
494            def _prepare_response_content(data):
495                if data is not None:
496                    ret = {'data': data}
497                    page_meta = get_appcontext().get('pagination_metadata')
498                    if page_meta is not None:
499                        ret['pagination'] = page_meta
500                    return ret
501                return None
502
503            # Document data wrapper and pagination in payload
504            @staticmethod
505            def _prepare_response_doc(doc, doc_info, spec, **kwargs):
506                operation = doc_info.get('response', {})
507                if operation:
508                    success_code = doc_info['success_status_code']
509                    response = operation.get('responses', {}).get(success_code)
510                    if response is not None:
511                        if 'schema' in response:
512                            schema = response['schema']
513                            response['schema'] = type(
514                                'Wrap' + schema.__class__.__name__,
515                                (ma.Schema, ),
516                                {'data': ma.fields.Nested(schema)},
517                            )
518                            if 'pagination' in doc_info:
519                                schema = response['schema']
520                                response['schema'] = type(
521                                    'Pagination' + schema.__name__,
522                                    (schema, ),
523                                    {
524                                        'pagination': ma.fields.Nested(
525                                            PaginationMetadataSchema)
526                                    },
527                                )
528                return super(
529                    WrapperBlueprint, WrapperBlueprint
530                )._prepare_response_doc(doc, doc_info, spec=spec, **kwargs)
531
532        app.config['OPENAPI_VERSION'] = openapi_version
533        api = Api(app)
534        client = app.test_client()
535        blp = WrapperBlueprint('test', __name__, url_prefix='/test')
536
537        @blp.route('/')
538        @blp.response(200, schemas.DocSchema(many=True))
539        @blp.paginate(Page)
540        def func():
541            return [
542                {'item_id': 1, 'db_field': 42},
543                {'item_id': 2, 'db_field': 69},
544            ]
545
546        api.register_blueprint(blp)
547        spec = api.spec.to_dict()
548
549        # Test data is wrapped and pagination metadata added
550        resp = client.get('/test/')
551        assert resp.json == {
552            'data': [{'field': 42, 'item_id': 1}, {'field': 69, 'item_id': 2}],
553            'pagination': {
554                'page': 1, 'first_page': 1, 'last_page': 1,
555                'total': 2, 'total_pages': 1,
556            }
557        }
558
559        # Test pagination is correctly documented
560        if openapi_version == '3.0.2':
561            content = spec['paths']['/test/']['get']['responses']['200'][
562                'content']['application/json']
563        else:
564            content = spec['paths']['/test/']['get']['responses']['200']
565        assert content['schema'] == build_ref(
566            api.spec, 'schema', 'PaginationWrapDoc')
567        assert get_schemas(api.spec)['PaginationWrapDoc'] == {
568            'type': 'object',
569            'properties': {
570                'data': {
571                    'items': build_ref(api.spec, 'schema', 'Doc'),
572                    'type': 'array',
573                },
574                'pagination': build_ref(
575                    api.spec, 'schema', 'PaginationMetadata'),
576            }
577        }
578        assert 'Doc' in get_schemas(api.spec)
579