1"""Test pagination feature"""
2
3from itertools import product
4from collections import namedtuple
5import json
6
7import pytest
8
9from flask.views import MethodView
10
11from flask_smorest import Api, Blueprint, Page
12from flask_smorest.pagination import PaginationParameters
13
14from .utils import get_schemas
15
# Custom (page, page_size, max_page_size) values handed to ``paginate`` by
# the blueprint factories below when custom parameters are requested
CUSTOM_PAGINATION_PARAMS = (2, 5, 10)
17
18
def pagination_blueprint(collection, schemas, as_method_view, custom_params):
    """Build a sample blueprint whose view paginates inside the view itself

    The view receives ``pagination_parameters``, stores the collection size
    in its ``item_count`` attribute and returns the requested slice of items.

    :param collection: Object exposing the items to serve as ``items``
    :param schemas: Object exposing the ``DocSchema`` response schema
    :param as_method_view: Register a ``MethodView`` class if truthy,
        a plain view function otherwise
    :param custom_params: Pass ``CUSTOM_PAGINATION_PARAMS`` to ``paginate``
        if truthy, ``None`` for every pagination parameter otherwise
    """
    page, page_size, max_page_size = (
        CUSTOM_PAGINATION_PARAMS if custom_params else (None, None, None))
    paginate_kwargs = {
        'page': page,
        'page_size': page_size,
        'max_page_size': max_page_size,
    }

    blp = Blueprint('test', __name__, url_prefix='/test')

    def select_page(pagination_parameters):
        """Store the item count, then return the requested slice of items"""
        pagination_parameters.item_count = len(collection.items)
        start = pagination_parameters.first_item
        # last_item is inclusive, slice bounds are exclusive
        stop = pagination_parameters.last_item + 1
        return collection.items[start:stop]

    # View callables get no docstring on purpose: the originals had none
    if not as_method_view:
        @blp.route('/')
        @blp.response(200, schemas.DocSchema(many=True))
        @blp.paginate(**paginate_kwargs)
        def get_resources(pagination_parameters):
            return select_page(pagination_parameters)

        return blp

    @blp.route('/')
    class Resource(MethodView):
        @blp.response(200, schemas.DocSchema(many=True))
        @blp.paginate(**paginate_kwargs)
        def get(self, pagination_parameters):
            return select_page(pagination_parameters)

    return blp
54
55
def post_pagination_blueprint(
        collection, schemas, as_method_view, custom_params):
    """Build a sample blueprint relying on post-pagination

    The view returns every item of the collection; ``Page`` is passed to
    ``paginate`` as the pager.

    :param collection: Object exposing the items to serve as ``items``
    :param schemas: Object exposing the ``DocSchema`` response schema
    :param as_method_view: Register a ``MethodView`` class if truthy,
        a plain view function otherwise
    :param custom_params: Pass ``CUSTOM_PAGINATION_PARAMS`` to ``paginate``
        if truthy, ``None`` for every pagination parameter otherwise
    """
    page, page_size, max_page_size = (
        CUSTOM_PAGINATION_PARAMS if custom_params else (None, None, None))
    paginate_kwargs = {
        'page': page,
        'page_size': page_size,
        'max_page_size': max_page_size,
    }

    blp = Blueprint('test', __name__, url_prefix='/test')

    # View callables get no docstring on purpose: the originals had none
    if not as_method_view:
        @blp.route('/')
        @blp.response(200, schemas.DocSchema(many=True))
        @blp.paginate(Page, **paginate_kwargs)
        def get_resources():
            return collection.items

        return blp

    @blp.route('/')
    class Resource(MethodView):
        @blp.response(200, schemas.DocSchema(many=True))
        @blp.paginate(Page, **paginate_kwargs)
        def get(self):
            return collection.items

    return blp
84
85
@pytest.fixture(params=product(
    # Blueprint factory: pagination in view function / post-pagination
    (pagination_blueprint, post_pagination_blueprint),
    # Register the view as a MethodView or not
    (True, False),
    # Use custom pagination parameters or not
    (True, False),
))
def app_fixture(request, collection, schemas, app):
    """Provide a test client for every combination of configurations

    The combinations cover:

    - pagination in function / post-pagination
    - function / method view
    - default / custom pagination parameters

    The returned named tuple carries the test client as ``client`` and the
    custom parameters flag as ``custom_params``.
    """
    factory, as_method_view, custom_params = request.param
    blp = factory(collection, schemas, as_method_view, custom_params)
    Api(app).register_blueprint(blp)

    AppFixture = namedtuple('AppFixture', ('client', 'custom_params'))
    return AppFixture(client=app.test_client(), custom_params=custom_params)
107
108
109class TestPagination:
110
111    def test_pagination_parameters_repr(self):
112        assert(repr(PaginationParameters(1, 10)) ==
113               "PaginationParameters(page=1,page_size=10)")
114
115    def test_page_repr(self):
116        page_params = PaginationParameters(1, 2)
117        assert (repr(Page([1, 2, 3, 4, 5], page_params)) ==
118                "Page(collection=[1, 2, 3, 4, 5],page_params={})"
119                .format(repr(page_params)))
120
121    @pytest.mark.parametrize('header_name', ('X-Dummy-Name', None))
122    def test_pagination_custom_header_field_name(self, app, header_name):
123        """Test PAGINATION_HEADER_FIELD_NAME overriding"""
124        api = Api(app)
125
126        class CustomBlueprint(Blueprint):
127            PAGINATION_HEADER_FIELD_NAME = header_name
128
129        blp = CustomBlueprint('test', __name__, url_prefix='/test')
130
131        @blp.route('/')
132        @blp.response(200)
133        @blp.paginate()
134        def func(pagination_parameters):
135            pagination_parameters.item_count = 2
136            return [1, 2]
137
138        api.register_blueprint(blp)
139        client = app.test_client()
140        response = client.get('/test/')
141        assert response.status_code == 200
142        assert 'X-Pagination' not in response.headers
143        if header_name is not None:
144            assert response.headers[header_name] == (
145                '{"total": 2, "total_pages": 1, '
146                '"first_page": 1, "last_page": 1, "page": 1}'
147            )
148            # Also check there is only one pagination header
149            assert len(response.headers.getlist(header_name)) == 1
150
151    def test_pagination_header_documentation(self, app):
152        """Test pagination header is documented"""
153        api = Api(app)
154
155        class CustomBlueprint(Blueprint):
156            PAGINATION_HEADER_FIELD_NAME = 'X-Custom-Pagination-Header'
157
158        blp = CustomBlueprint('test', __name__, url_prefix='/test')
159
160        @blp.route('/')
161        @blp.response(200)
162        @blp.paginate()
163        def func(pagination_parameters):
164            """Dummy view func"""
165
166        api.register_blueprint(blp)
167        spec = api.spec.to_dict()
168        get = spec['paths']['/test/']['get']
169        assert 'PaginationMetadata' in get_schemas(api.spec)
170        assert get['responses']['200']['headers'] == {
171            'X-Custom-Pagination-Header': {
172                'description': 'Pagination metadata',
173                'schema': {'$ref': '#/components/schemas/PaginationMetadata'},
174            }
175        }
176
177    @pytest.mark.parametrize('header_name', ('X-Pagination', None))
178    def test_pagination_item_count_missing(self, app, header_name):
179        """If item_count was not set, pass and warn"""
180        api = Api(app)
181
182        class CustomBlueprint(Blueprint):
183            PAGINATION_HEADER_FIELD_NAME = header_name
184
185        blp = CustomBlueprint('test', __name__, url_prefix='/test')
186
187        @blp.route('/')
188        @blp.response(200)
189        @blp.paginate()
190        def func(pagination_parameters):
191            # Here, we purposely forget to set item_count
192            # pagination_parameters.item_count = 2
193            return [1, 2]
194
195        api.register_blueprint(blp)
196        client = app.test_client()
197
198        with pytest.warns(None) as record:
199            response = client.get('/test/')
200        if header_name is None:
201            assert not record
202        else:
203            assert len(record) == 1
204            assert record[0].category == UserWarning
205            assert str(record[0].message) == (
206                'item_count not set in endpoint test.func.'
207            )
208        assert response.status_code == 200
209        assert 'X-Pagination' not in response.headers
210
211    @pytest.mark.parametrize('collection', [1000, ], indirect=True)
212    def test_pagination_parameters(self, app_fixture):
213        # page = 2, page_size = 5
214        response = app_fixture.client.get(
215            '/test/', query_string={'page': 2, 'page_size': 5})
216        assert response.status_code == 200
217        data = response.json
218        headers = response.headers
219        assert len(data) == 5
220        assert data[0] == {'field': 5, 'item_id': 6}
221        assert data[4] == {'field': 9, 'item_id': 10}
222        assert json.loads(headers['X-Pagination']) == {
223            'total': 1000, 'total_pages': 200,
224            'page': 2, 'first_page': 1, 'last_page': 200,
225            'previous_page': 1, 'next_page': 3,
226        }
227        # page = 334, page_size = 3
228        # last page is incomplete if total not multiple of page_size
229        response = app_fixture.client.get(
230            '/test/', query_string={'page': 334, 'page_size': 3})
231        assert response.status_code == 200
232        data = response.json
233        headers = response.headers
234        assert len(data) == 1
235        assert json.loads(headers['X-Pagination']) == {
236            'total': 1000, 'total_pages': 334,
237            'page': 334, 'first_page': 1, 'last_page': 334,
238            'previous_page': 333,
239        }
240
241    @pytest.mark.parametrize('collection', [1000, ], indirect=True)
242    def test_pagination_parameters_default_page_page_size(self, app_fixture):
243        # Default: page = 1, page_size = 10
244        # Custom: page = 2, page_size = 5
245        response = app_fixture.client.get('/test/')
246        assert response.status_code == 200
247        data = response.json
248        headers = response.headers
249        if app_fixture.custom_params is False:
250            assert len(data) == 10
251            assert data[0] == {'field': 0, 'item_id': 1}
252            assert data[9] == {'field': 9, 'item_id': 10}
253            assert json.loads(headers['X-Pagination']) == {
254                'total': 1000, 'total_pages': 100,
255                'page': 1, 'first_page': 1, 'last_page': 100,
256                'next_page': 2,
257            }
258        else:
259            assert len(data) == 5
260            assert data[0] == {'field': 5, 'item_id': 6}
261            assert data[4] == {'field': 9, 'item_id': 10}
262            assert json.loads(headers['X-Pagination']) == {
263                'total': 1000, 'total_pages': 200,
264                'page': 2, 'first_page': 1, 'last_page': 200,
265                'previous_page': 1, 'next_page': 3,
266            }
267
268    def test_pagination_empty_collection(self, app_fixture):
269        # empty collection -> 200 with empty list, partial pagination metadata
270        response = app_fixture.client.get('/test/')
271        assert response.status_code == 200
272        assert json.loads(response.headers['X-Pagination']) == {
273            'total': 0, 'total_pages': 0,
274        }
275        assert response.json == []
276
277    @pytest.mark.parametrize('collection', [1000, ], indirect=True)
278    def test_pagination_page_out_of_range(self, app_fixture):
279        # page = 120, page_size = 10
280        # page out of range -> 200 with empty list, partial pagination metadata
281        response = app_fixture.client.get(
282            '/test/', query_string={'page': 120, 'page_size': 10})
283        assert response.status_code == 200
284        assert json.loads(response.headers['X-Pagination']) == {
285            'total': 1000, 'total_pages': 100,
286            'first_page': 1, 'last_page': 100,
287        }
288        assert response.json == []
289
290    @pytest.mark.parametrize('collection', [1000, ], indirect=True)
291    def test_pagination_min_page_page_size(self, app_fixture):
292        client = app_fixture.client
293        # page < 1 => 422
294        response = client.get('/test/', query_string={'page': 0})
295        assert response.status_code == 422
296        response = client.get('/test/', query_string={'page': -42})
297        assert response.status_code == 422
298        # page_size < 1 => 422
299        response = client.get('/test/', query_string={'page_size': 0})
300        assert response.status_code == 422
301        response = client.get('/test/', query_string={'page_size': -42})
302        assert response.status_code == 422
303
304    @pytest.mark.parametrize('collection', [1000, ], indirect=True)
305    def test_pagination_max_page_size(self, app_fixture):
306        client = app_fixture.client
307        # default: page_size > 100 => 422
308        # custom: page_size > 10 => 422
309        response = client.get('/test/', query_string={'page_size': 101})
310        assert response.status_code == 422
311        response = client.get('/test/', query_string={'page_size': 11})
312        if app_fixture.custom_params is False:
313            assert response.status_code == 200
314        else:
315            assert response.status_code == 422
316
317    def test_pagination_parameters_not_in_query_string(self, app):
318        api = Api(app)
319        blp = Blueprint('test', __name__, url_prefix='/test')
320
321        @blp.route('/')
322        @blp.response(200)
323        @blp.paginate(Page)
324        def func():
325            return range(30)
326
327        api.register_blueprint(blp)
328        client = app.test_client()
329
330        # Pagination params in query string: OK
331        response = client.get(
332            '/test/',
333            query_string={'page': 2, 'page_size': 20}
334        )
335        assert response.json == list(range(20, 30))
336
337        # Pagination params in another location are ignored
338        response = client.get(
339            '/test/',
340            data=json.dumps({'page': 2, 'page_size': 20}),
341        )
342        assert response.json == list(range(0, 10))
343
344    def test_pagination_parameters_and_query_string_args(self, app, schemas):
345        api = Api(app)
346        blp = Blueprint('test', __name__, url_prefix='/test')
347
348        @blp.route('/')
349        @blp.arguments(schemas.QueryArgsSchema, location="query")
350        @blp.response(200)
351        @blp.paginate(Page)
352        def func(query_args):
353            assert query_args['arg1'] == 'Test'
354            assert query_args['arg2'] == 12
355            return range(30)
356
357        api.register_blueprint(blp)
358        client = app.test_client()
359
360        # Pagination params in query string: OK
361        response = client.get(
362            '/test/',
363            query_string={
364                'page': 2, 'page_size': 20, 'arg1': 'Test', 'arg2': 12}
365        )
366        assert response.json == list(range(20, 30))
367