1import datetime
2import base64
3
4import pytest
5from django.db import models
6from django.db.models import Q
7from django.utils.functional import SimpleLazyObject
8from graphql_relay import to_global_id
9from py.test import raises
10
11import graphene
12from graphene.relay import Node
13
14from ..compat import JSONField, MissingType
15from ..fields import DjangoConnectionField
16from ..types import DjangoObjectType
17from ..utils import DJANGO_FILTER_INSTALLED
18from .models import Article, CNNReporter, Film, FilmDetails, Reporter
19
20
def test_should_query_only_fields():
    """Expect an exception for a type limited to the ``articles`` field.

    The type definition, schema construction and query execution all live
    inside the ``raises`` block, so an exception from any of those steps
    satisfies the test.
    """
    with raises(Exception):

        class ReporterType(DjangoObjectType):
            class Meta:
                fields = ("articles",)
                model = Reporter

        document = """
            query ReporterQuery {
              articles
            }
        """
        outcome = graphene.Schema(query=ReporterType).execute(document)
        assert not outcome.errors
37
38
def test_should_query_simplelazy_objects():
    """Resolve a field whose resolver returns a ``SimpleLazyObject``."""

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            fields = ("id",)

    def build_reporter():
        # Factory handed to SimpleLazyObject; only called on first access.
        return Reporter(id=1)

    class Query(graphene.ObjectType):
        reporter = graphene.Field(ReporterType)

        def resolve_reporter(root, info):
            return SimpleLazyObject(build_reporter)

    document = """
        query {
          reporter {
            id
          }
        }
    """
    outcome = graphene.Schema(query=Query).execute(document)
    assert not outcome.errors
    assert outcome.data == {"reporter": {"id": "1"}}
62
63
def test_should_query_wrapped_simplelazy_objects():
    """Resolve a field whose resolver returns a doubly wrapped lazy object."""

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            fields = ("id",)

    class Query(graphene.ObjectType):
        reporter = graphene.Field(ReporterType)

        def resolve_reporter(root, info):
            # Inner lazy object wraps the model; outer one wraps the inner.
            inner = SimpleLazyObject(lambda: Reporter(id=1))
            return SimpleLazyObject(lambda: inner)

    document = """
        query {
          reporter {
            id
          }
        }
    """
    outcome = graphene.Schema(query=Query).execute(document)
    assert not outcome.errors
    assert outcome.data == {"reporter": {"id": "1"}}
87
88
def test_should_query_well():
    """Query the name and email fields of a resolver-provided ``Reporter``."""

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter

    class Query(graphene.ObjectType):
        reporter = graphene.Field(ReporterType)

        def resolve_reporter(root, info):
            return Reporter(first_name="ABA", last_name="X")

    schema = graphene.Schema(query=Query)
    document = """
        query ReporterQuery {
          reporter {
            firstName,
            lastName,
            email
          }
        }
    """
    outcome = schema.execute(document)
    assert not outcome.errors
    # email was never set on the instance, so it comes back as "".
    assert outcome.data == {
        "reporter": {"firstName": "ABA", "lastName": "X", "email": ""}
    }
114
115
@pytest.mark.skipif(JSONField is MissingType, reason="RangeField should exist")
def test_should_query_postgres_fields():
    """Query range, array, JSON and hstore fields of an ad-hoc model."""
    from django.contrib.postgres.fields import (
        IntegerRangeField,
        ArrayField,
        JSONField,
        HStoreField,
    )

    class Event(models.Model):
        ages = IntegerRangeField(help_text="The age ranges")
        data = JSONField(help_text="Data")
        store = HStoreField()
        tags = ArrayField(models.CharField(max_length=50))

    class EventType(DjangoObjectType):
        class Meta:
            model = Event

    class Query(graphene.ObjectType):
        event = graphene.Field(EventType)

        def resolve_event(root, info):
            payload = {
                "ages": (0, 10),
                "data": {"angry_babies": True},
                "store": {"h": "store"},
                "tags": ["child", "angry", "babies"],
            }
            return Event(**payload)

    document = """
        query myQuery {
          event {
            ages
            tags
            data
            store
          }
        }
    """
    outcome = graphene.Schema(query=Query).execute(document)
    assert not outcome.errors
    # The JSON and hstore values are expected back as JSON-encoded strings.
    assert outcome.data == {
        "event": {
            "ages": [0, 10],
            "tags": ["child", "angry", "babies"],
            "data": '{"angry_babies": true}',
            "store": '{"h": "store"}',
        }
    }
168
169
def test_should_node():
    """Query a reporter field and a relay ``node`` lookup in one document."""

    class ReporterNode(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

        @classmethod
        def get_node(cls, info, id):
            return Reporter(id=2, first_name="Cookie Monster")

        def resolve_articles(root, info, **kwargs):
            return [Article(headline="Hi!")]

    class ArticleNode(DjangoObjectType):
        class Meta:
            model = Article
            interfaces = (Node,)

        @classmethod
        def get_node(cls, info, id):
            published = datetime.date(2002, 3, 11)
            return Article(id=1, headline="Article node", pub_date=published)

    class Query(graphene.ObjectType):
        node = Node.Field()
        reporter = graphene.Field(ReporterNode)
        article = graphene.Field(ArticleNode)

        def resolve_reporter(root, info):
            return Reporter(id=1, first_name="ABA", last_name="X")

    schema = graphene.Schema(query=Query)
    document = """
        query ReporterQuery {
          reporter {
            id,
            firstName,
            articles {
              edges {
                node {
                  headline
                }
              }
            }
            lastName,
            email
          }
          myArticle: node(id:"QXJ0aWNsZU5vZGU6MQ==") {
            id
            ... on ReporterNode {
                firstName
            }
            ... on ArticleNode {
                headline
                pubDate
            }
          }
        }
    """
    wanted_reporter = {
        "id": "UmVwb3J0ZXJOb2RlOjE=",
        "firstName": "ABA",
        "lastName": "X",
        "email": "",
        "articles": {"edges": [{"node": {"headline": "Hi!"}}]},
    }
    wanted_article = {
        "id": "QXJ0aWNsZU5vZGU6MQ==",
        "headline": "Article node",
        "pubDate": "2002-03-11",
    }

    outcome = schema.execute(document)
    assert not outcome.errors
    assert outcome.data == {"reporter": wanted_reporter, "myArticle": wanted_article}
247
248
def test_should_query_onetoone_fields():
    """Traverse a one-to-one relation from both of its ends."""
    the_film = Film(id=1)
    the_details = FilmDetails(id=1, film=the_film)

    class FilmNode(DjangoObjectType):
        class Meta:
            model = Film
            interfaces = (Node,)

    class FilmDetailsNode(DjangoObjectType):
        class Meta:
            model = FilmDetails
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        film = graphene.Field(FilmNode)
        film_details = graphene.Field(FilmDetailsNode)

        def resolve_film(parent, info):
            return the_film

        def resolve_film_details(parent, info):
            return the_details

    schema = graphene.Schema(query=Query)
    document = """
        query FilmQuery {
          filmDetails {
            id
            film {
              id
            }
          }
          film {
            id
            details {
              id
            }
          }
        }
    """
    outcome = schema.execute(document)
    assert not outcome.errors
    assert outcome.data == {
        "filmDetails": {
            "id": "RmlsbURldGFpbHNOb2RlOjE=",
            "film": {"id": "RmlsbU5vZGU6MQ=="},
        },
        "film": {
            "id": "RmlsbU5vZGU6MQ==",
            "details": {"id": "RmlsbURldGFpbHNOb2RlOjE="},
        },
    }
303
304
def test_should_query_connectionfields():
    """Query a connection field whose resolver returns a plain list."""

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)
            fields = ("articles",)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

        def resolve_all_reporters(root, info, **kwargs):
            return [Reporter(id=1)]

    document = """
        query ReporterConnectionQuery {
          allReporters {
            pageInfo {
              hasNextPage
            }
            edges {
              node {
                id
              }
            }
          }
        }
    """
    wanted = {
        "allReporters": {
            "pageInfo": {"hasNextPage": False},
            "edges": [{"node": {"id": "UmVwb3J0ZXJUeXBlOjE="}}],
        }
    }

    outcome = graphene.Schema(query=Query).execute(document)
    assert not outcome.errors
    assert outcome.data == wanted
341
342
def test_should_keep_annotations():
    """Connections over annotated, annotation-ordered querysets run cleanly."""
    from django.db.models import Count, Avg

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)
            fields = ("articles",)

    class ArticleType(DjangoObjectType):
        class Meta:
            model = Article
            interfaces = (Node,)
            filter_fields = ("lang",)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)
        all_articles = DjangoConnectionField(ArticleType)

        def resolve_all_reporters(root, info, **kwargs):
            counted = Reporter.objects.annotate(articles_c=Count("articles"))
            return counted.order_by("articles_c")

        def resolve_all_articles(root, info, **kwargs):
            averaged = Article.objects.annotate(import_avg=Avg("importance"))
            return averaged.order_by("import_avg")

    document = """
        query ReporterConnectionQuery {
          allReporters {
            pageInfo {
              hasNextPage
            }
            edges {
              node {
                id
              }
            }
          }
          allArticles {
            pageInfo {
              hasNextPage
            }
            edges {
              node {
                id
              }
            }
          }
        }
    """
    # Only the absence of errors is checked; the returned data is not inspected.
    outcome = graphene.Schema(query=Query).execute(document)
    assert not outcome.errors
399
400
@pytest.mark.skipif(
    not DJANGO_FILTER_INSTALLED, reason="django-filter should be installed"
)
def test_should_query_node_filtering():
    """Filter a reporter's nested ``articles`` connection by ``lang``."""

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class ArticleType(DjangoObjectType):
        class Meta:
            model = Article
            interfaces = (Node,)
            filter_fields = ("lang",)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    author = Reporter.objects.create(
        first_name="John", last_name="Doe", email="johndoe@example.com", a_choice=1
    )
    # Two articles by the same reporter, differing only in headline/language.
    for headline, lang in (("Article Node 1", "es"), ("Article Node 2", "en")):
        Article.objects.create(
            headline=headline,
            pub_date=datetime.date.today(),
            pub_date_time=datetime.datetime.now(),
            reporter=author,
            editor=author,
            lang=lang,
        )

    document = """
        query NodeFilteringQuery {
            allReporters {
                edges {
                    node {
                        id
                        articles(lang: "es") {
                            edges {
                                node {
                                    id
                                }
                            }
                        }
                    }
                }
            }
        }
    """

    # Only the "es" article is expected under the reporter.
    wanted_node = {
        "id": "UmVwb3J0ZXJUeXBlOjE=",
        "articles": {"edges": [{"node": {"id": "QXJ0aWNsZVR5cGU6MQ=="}}]},
    }
    wanted = {"allReporters": {"edges": [{"node": wanted_node}]}}

    outcome = graphene.Schema(query=Query).execute(document)
    assert not outcome.errors
    assert outcome.data == wanted
477
478
@pytest.mark.skipif(
    not DJANGO_FILTER_INSTALLED, reason="django-filter should be installed"
)
def test_should_query_node_filtering_with_distinct_queryset():
    """A connection resolver may return a queryset that used ``.distinct()``.

    One film with Berlin details is created; the resolver returns films
    matching either the Berlin location or the "ot" genre, de-duplicated,
    and exactly one edge with genre "OT" is expected.
    """

    class FilmType(DjangoObjectType):
        class Meta:
            model = Film
            interfaces = (Node,)
            filter_fields = ("genre",)

    class Query(graphene.ObjectType):
        films = DjangoConnectionField(FilmType)

        def resolve_films(self, info, **args):
            return Film.objects.filter(
                Q(details__location__contains="Berlin") | Q(genre__in=["ot"])
            ).distinct()

    film = Film.objects.create()
    # The details row only needs to exist; its instance is not used afterwards.
    FilmDetails.objects.create(location="Berlin", film=film)

    schema = graphene.Schema(query=Query)
    query = """
        query NodeFilteringQuery {
            films {
                edges {
                    node {
                        genre
                    }
                }
            }
        }
    """

    expected = {"films": {"edges": [{"node": {"genre": "OT"}}]}}

    result = schema.execute(query)
    assert not result.errors
    assert result.data == expected
521
522
@pytest.mark.skipif(
    not DJANGO_FILTER_INSTALLED, reason="django-filter should be installed"
)
def test_should_query_node_multiple_filtering():
    """Filter nested ``articles`` by both ``lang`` and ``headline`` at once."""

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class ArticleType(DjangoObjectType):
        class Meta:
            model = Article
            interfaces = (Node,)
            filter_fields = ("lang", "headline")

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    author = Reporter.objects.create(
        first_name="John", last_name="Doe", email="johndoe@example.com", a_choice=1
    )
    article_specs = (
        ("Article Node 1", "es"),
        ("Article Node 2", "es"),
        ("Article Node 3", "en"),
    )
    for headline, lang in article_specs:
        Article.objects.create(
            headline=headline,
            pub_date=datetime.date.today(),
            pub_date_time=datetime.datetime.now(),
            reporter=author,
            editor=author,
            lang=lang,
        )

    document = """
        query NodeFilteringQuery {
            allReporters {
                edges {
                    node {
                        id
                        articles(lang: "es", headline: "Article Node 1") {
                            edges {
                                node {
                                    id
                                }
                            }
                        }
                    }
                }
            }
        }
    """

    # Two articles are "es", but only one also has the requested headline.
    wanted_node = {
        "id": "UmVwb3J0ZXJUeXBlOjE=",
        "articles": {"edges": [{"node": {"id": "QXJ0aWNsZVR5cGU6MQ=="}}]},
    }
    wanted = {"allReporters": {"edges": [{"node": wanted_node}]}}

    outcome = graphene.Schema(query=Query).execute(document)
    assert not outcome.errors
    assert outcome.data == wanted
607
608
def test_should_enforce_first_or_last(graphene_settings):
    """Omitting both ``first`` and ``last`` must fail when enforcement is on.

    With ``RELAY_CONNECTION_ENFORCE_FIRST_OR_LAST`` enabled, the query is
    expected to produce exactly one error and a null ``allReporters`` value.
    """
    graphene_settings.RELAY_CONNECTION_ENFORCE_FIRST_OR_LAST = True

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    # Only the row needs to exist; the instance itself is never referenced.
    Reporter.objects.create(
        first_name="John", last_name="Doe", email="johndoe@example.com", a_choice=1
    )

    schema = graphene.Schema(query=Query)
    query = """
        query NodeFilteringQuery {
            allReporters {
                edges {
                    node {
                        id
                    }
                }
            }
        }
    """

    expected = {"allReporters": None}

    result = schema.execute(query)
    assert len(result.errors) == 1
    assert str(result.errors[0]) == (
        "You must provide a `first` or `last` value to properly "
        "paginate the `allReporters` connection."
    )
    assert result.data == expected
646
647
def test_should_error_if_first_is_greater_than_max(graphene_settings):
    """Requesting ``first`` above ``RELAY_CONNECTION_MAX_LIMIT`` must fail.

    The limit is set to 100 and ``first: 101`` is requested; one error and a
    null ``allReporters`` value are expected.
    """
    graphene_settings.RELAY_CONNECTION_MAX_LIMIT = 100

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    # The field is expected to reflect the setting applied above.
    assert Query.all_reporters.max_limit == 100

    # Only the row needs to exist; the instance itself is never referenced.
    Reporter.objects.create(
        first_name="John", last_name="Doe", email="johndoe@example.com", a_choice=1
    )

    schema = graphene.Schema(query=Query)
    query = """
        query NodeFilteringQuery {
            allReporters(first: 101) {
                edges {
                    node {
                        id
                    }
                }
            }
        }
    """

    expected = {"allReporters": None}

    result = schema.execute(query)
    assert len(result.errors) == 1
    assert str(result.errors[0]) == (
        "Requesting 101 records on the `allReporters` connection "
        "exceeds the `first` limit of 100 records."
    )
    assert result.data == expected
687
688
def test_should_error_if_last_is_greater_than_max(graphene_settings):
    """Requesting ``last`` above ``RELAY_CONNECTION_MAX_LIMIT`` must fail.

    The limit is set to 100 and ``last: 101`` is requested; one error and a
    null ``allReporters`` value are expected.
    """
    graphene_settings.RELAY_CONNECTION_MAX_LIMIT = 100

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    # The field is expected to reflect the setting applied above.
    assert Query.all_reporters.max_limit == 100

    # Only the row needs to exist; the instance itself is never referenced.
    Reporter.objects.create(
        first_name="John", last_name="Doe", email="johndoe@example.com", a_choice=1
    )

    schema = graphene.Schema(query=Query)
    query = """
        query NodeFilteringQuery {
            allReporters(last: 101) {
                edges {
                    node {
                        id
                    }
                }
            }
        }
    """

    expected = {"allReporters": None}

    result = schema.execute(query)
    assert len(result.errors) == 1
    assert str(result.errors[0]) == (
        "Requesting 101 records on the `allReporters` connection "
        "exceeds the `last` limit of 100 records."
    )
    assert result.data == expected
728
729
def test_should_query_promise_connectionfields():
    """Query a connection field whose resolver returns a resolved ``Promise``."""
    from promise import Promise

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

        def resolve_all_reporters(root, info, **kwargs):
            reporters = [Reporter(id=1)]
            return Promise.resolve(reporters)

    document = """
        query ReporterPromiseConnectionQuery {
            allReporters(first: 1) {
                edges {
                    node {
                        id
                    }
                }
            }
        }
    """

    wanted_edges = [{"node": {"id": "UmVwb3J0ZXJUeXBlOjE="}}]

    outcome = graphene.Schema(query=Query).execute(document)
    assert not outcome.errors
    assert outcome.data == {"allReporters": {"edges": wanted_edges}}
762
763
def test_should_query_connectionfields_with_last():
    """Paginate a queryset-backed connection using the ``last`` argument."""
    # Only the row needs to exist; the instance itself is never referenced.
    Reporter.objects.create(
        first_name="John", last_name="Doe", email="johndoe@example.com", a_choice=1
    )

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

        def resolve_all_reporters(self, info, **args):
            return Reporter.objects.all()

    schema = graphene.Schema(query=Query)
    query = """
        query ReporterLastQuery {
            allReporters(last: 1) {
                edges {
                    node {
                        id
                    }
                }
            }
        }
    """

    expected = {"allReporters": {"edges": [{"node": {"id": "UmVwb3J0ZXJUeXBlOjE="}}]}}

    result = schema.execute(query)
    assert not result.errors
    assert result.data == expected
799
800
def test_should_query_connectionfields_with_manager():
    """Query a connection field bound to a named manager through ``on=``.

    Two reporters that differ only in ``last_name`` are created; with
    ``first: 1`` the first-created reporter is the expected single edge.
    """
    # Both rows only need to exist; neither instance is referenced afterwards.
    Reporter.objects.create(
        first_name="John", last_name="Doe", email="johndoe@example.com", a_choice=1
    )
    Reporter.objects.create(
        first_name="John", last_name="NotDoe", email="johndoe@example.com", a_choice=1
    )

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        # NOTE(review): `doe_objects` is presumably a custom manager on
        # Reporter restricting results to "Doe" rows; confirm in the models.
        all_reporters = DjangoConnectionField(ReporterType, on="doe_objects")

        def resolve_all_reporters(self, info, **args):
            return Reporter.objects.all()

    schema = graphene.Schema(query=Query)
    query = """
        query ReporterLastQuery {
            allReporters(first: 1) {
                edges {
                    node {
                        id
                    }
                }
            }
        }
    """

    expected = {"allReporters": {"edges": [{"node": {"id": "UmVwb3J0ZXJUeXBlOjE="}}]}}

    result = schema.execute(query)
    assert not result.errors
    assert result.data == expected
840
841
def test_should_query_dataloader_fields():
    """Resolve a nested connection through a promise-based ``DataLoader``.

    ``ReporterType.articles`` is resolved by loading the reporter's articles
    from a batch function; both created articles are expected back under
    the single reporter.
    """
    from collections import defaultdict

    from promise import Promise
    from promise.dataloader import DataLoader

    def article_batch_load_fn(keys):
        """Return, for each reporter id in ``keys``, the list of its articles."""
        # Group in one pass instead of rescanning the queryset for every key.
        articles_by_reporter = defaultdict(list)
        for article in Article.objects.filter(reporter_id__in=keys):
            articles_by_reporter[article.reporter_id].append(article)
        # One entry per key, in key order; reporters without articles get [].
        return Promise.resolve(
            [articles_by_reporter.get(reporter_id, []) for reporter_id in keys]
        )

    article_loader = DataLoader(article_batch_load_fn)

    class ArticleType(DjangoObjectType):
        class Meta:
            model = Article
            interfaces = (Node,)

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)
            use_connection = True

        articles = DjangoConnectionField(ArticleType)

        def resolve_articles(self, info, **args):
            return article_loader.load(self.id)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    r = Reporter.objects.create(
        first_name="John", last_name="Doe", email="johndoe@example.com", a_choice=1
    )

    Article.objects.create(
        headline="Article Node 1",
        pub_date=datetime.date.today(),
        pub_date_time=datetime.datetime.now(),
        reporter=r,
        editor=r,
        lang="es",
    )
    Article.objects.create(
        headline="Article Node 2",
        pub_date=datetime.date.today(),
        pub_date_time=datetime.datetime.now(),
        reporter=r,
        editor=r,
        lang="en",
    )

    schema = graphene.Schema(query=Query)
    query = """
        query ReporterPromiseConnectionQuery {
            allReporters(first: 1) {
                edges {
                    node {
                        id
                        articles(first: 2) {
                            edges {
                                node {
                                    headline
                                }
                            }
                        }
                    }
                }
            }
        }
    """

    expected = {
        "allReporters": {
            "edges": [
                {
                    "node": {
                        "id": "UmVwb3J0ZXJUeXBlOjE=",
                        "articles": {
                            "edges": [
                                {"node": {"headline": "Article Node 1"}},
                                {"node": {"headline": "Article Node 2"}},
                            ]
                        },
                    }
                }
            ]
        }
    }

    result = schema.execute(query)
    assert not result.errors
    assert result.data == expected
938
939
def test_should_handle_inherited_choices():
    """Types for a model and its proxy can share a ``choices`` field."""

    class BaseModel(models.Model):
        choice_field = models.IntegerField(choices=((0, "zero"), (1, "one")))

    class ChildModel(BaseModel):
        class Meta:
            proxy = True

    class BaseType(DjangoObjectType):
        class Meta:
            model = BaseModel

    class ChildType(DjangoObjectType):
        class Meta:
            model = ChildModel

    class Query(graphene.ObjectType):
        base = graphene.Field(BaseType)
        child = graphene.Field(ChildType)

    document = """
        query {
          child {
            choiceField
          }
        }
    """
    # Only the absence of errors is checked; no resolver supplies a child.
    outcome = graphene.Schema(query=Query).execute(document)
    assert not outcome.errors
970
971
def test_proxy_model_support():
    """
    Check that all Reporters and proxied (CNN) Reporters can both be queried.
    """

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)
            use_connection = True

    class CNNReporterType(DjangoObjectType):
        class Meta:
            model = CNNReporter
            interfaces = (Node,)
            use_connection = True

    plain = Reporter.objects.create(
        first_name="John", last_name="Doe", email="johndoe@example.com", a_choice=1
    )

    cnn = CNNReporter.objects.create(
        first_name="Some",
        last_name="Guy",
        email="someguy@cnn.com",
        a_choice=1,
        reporter_type=2,  # mark this reporter as CNN
    )

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)
        cnn_reporters = DjangoConnectionField(CNNReporterType)

    document = """
        query ProxyModelQuery {
            allReporters {
                edges {
                    node {
                        id
                    }
                }
            }
            cnnReporters {
                edges {
                    node {
                        id
                    }
                }
            }
        }
    """

    def edge(type_name, instance):
        # Build the expected edge payload for one model instance.
        return {"node": {"id": to_global_id(type_name, instance.id)}}

    wanted = {
        "allReporters": {
            "edges": [edge("ReporterType", plain), edge("ReporterType", cnn)]
        },
        "cnnReporters": {"edges": [edge("CNNReporterType", cnn)]},
    }

    outcome = graphene.Schema(query=Query).execute(document)
    assert not outcome.errors
    assert outcome.data == wanted
1042
1043
def test_should_resolve_get_queryset_connectionfields():
    """``get_queryset`` on the type must restrict what a connection returns.

    Two reporters are created, only the second with ``reporter_type=2``.
    ``ReporterType.get_queryset`` filters on that value, so the second
    reporter is the expected single edge.
    """
    # Both rows only need to exist; neither instance is referenced afterwards.
    Reporter.objects.create(
        first_name="John", last_name="Doe", email="johndoe@example.com", a_choice=1
    )
    CNNReporter.objects.create(
        first_name="Some",
        last_name="Guy",
        email="someguy@cnn.com",
        a_choice=1,
        reporter_type=2,  # set this guy to be CNN
    )

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

        @classmethod
        def get_queryset(cls, queryset, info):
            return queryset.filter(reporter_type=2)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    schema = graphene.Schema(query=Query)
    query = """
        query ReporterPromiseConnectionQuery {
            allReporters(first: 1) {
                edges {
                    node {
                        id
                    }
                }
            }
        }
    """

    expected = {"allReporters": {"edges": [{"node": {"id": "UmVwb3J0ZXJUeXBlOjI="}}]}}

    result = schema.execute(query)
    assert not result.errors
    assert result.data == expected
1086
1087
def test_connection_should_limit_after_to_list_length():
    """An ``after`` cursor beyond the result set must yield no edges.

    Two reporters exist, while the cursor encodes offset 10; the query is
    expected to succeed with an empty edge list.
    """
    # Both rows only need to exist; neither instance is referenced afterwards.
    Reporter.objects.create(
        first_name="John", last_name="Doe", email="johndoe@example.com", a_choice=1
    )
    Reporter.objects.create(
        first_name="Some", last_name="Guy", email="someguy@cnn.com", a_choice=1
    )

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    schema = graphene.Schema(query=Query)
    query = """
        query ReporterPromiseConnectionQuery ($after: String) {
            allReporters(first: 1 after: $after) {
                edges {
                    node {
                        id
                    }
                }
            }
        }
    """

    # Cursor is the base64 form of "arrayconnection:10".
    after = base64.b64encode(b"arrayconnection:10").decode()
    result = schema.execute(query, variable_values={"after": after})
    expected = {"allReporters": {"edges": []}}
    assert not result.errors
    assert result.data == expected
1122
1123
# Keyword arguments for six sample reporters, numbered 0 through 5.
REPORTERS = [
    {
        "first_name": "First {}".format(index),
        "last_name": "Last {}".format(index),
        "email": "johndoe+{}@example.com".format(index),
        "a_choice": 1,
    }
    for index in range(6)
]
1133
1134
def test_should_return_max_limit(graphene_settings):
    """A connection queried without ``first``/``last`` is capped at the max limit.

    Six reporters are stored and ``RELAY_CONNECTION_MAX_LIMIT`` is set to 4,
    so exactly four edges are expected back.
    """
    graphene_settings.RELAY_CONNECTION_MAX_LIMIT = 4
    Reporter.objects.bulk_create([Reporter(**attrs) for attrs in REPORTERS])

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    schema = graphene.Schema(query=Query)
    query = """
        query AllReporters {
            allReporters {
                edges {
                    node {
                        id
                    }
                }
            }
        }
    """

    result = schema.execute(query)
    assert not result.errors

    edges = result.data["allReporters"]["edges"]
    assert len(edges) == 4
1164
1165
def test_should_have_next_page(graphene_settings):
    """``hasNextPage`` is reported correctly across two consecutive pages.

    With six reporters and a max limit of 4, the first page holds four edges
    and announces a next page; the page after its ``endCursor`` holds the
    remaining two and announces none. Together both pages must contain
    every stored reporter.
    """
    graphene_settings.RELAY_CONNECTION_MAX_LIMIT = 4
    Reporter.objects.bulk_create([Reporter(**attrs) for attrs in REPORTERS])
    db_reporters = Reporter.objects.all()

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    schema = graphene.Schema(query=Query)
    query = """
        query AllReporters($first: Int, $after: String) {
            allReporters(first: $first, after: $after) {
                pageInfo {
                    hasNextPage
                    endCursor
                }
                edges {
                    node {
                        id
                    }
                }
            }
        }
    """

    # First page: no variables at all, so only the max limit applies.
    result = schema.execute(query, variable_values={})
    assert not result.errors
    first_page = result.data["allReporters"]
    assert len(first_page["edges"]) == 4
    assert first_page["pageInfo"]["hasNextPage"]

    # Second page: continue right after the cursor the first page ended on.
    end_cursor = first_page["pageInfo"]["endCursor"]
    result2 = schema.execute(
        query, variable_values={"first": 4, "after": end_cursor}
    )
    assert not result2.errors
    second_page = result2.data["allReporters"]
    assert len(second_page["edges"]) == 2
    assert not second_page["pageInfo"]["hasNextPage"]

    stored_ids = {to_global_id("ReporterType", row.id) for row in db_reporters}
    returned_ids = {
        edge["node"]["id"] for edge in first_page["edges"] + second_page["edges"]
    }
    assert stored_ids == returned_ids
1214
1215
class TestBackwardPagination:
    """Backward pagination (``last``) on a ``DjangoConnectionField``.

    The same set of queries is executed under two different values of
    ``RELAY_CONNECTION_MAX_LIMIT``: one above and one below the number of
    stored reporters.
    """

    def setup_schema(self, graphene_settings, max_limit):
        """Store the ``REPORTERS`` rows and return a schema exposing them.

        ``max_limit`` is assigned to ``RELAY_CONNECTION_MAX_LIMIT`` on the
        given ``graphene_settings`` before the rows and schema are created.
        """
        graphene_settings.RELAY_CONNECTION_MAX_LIMIT = max_limit
        reporters = [Reporter(**kwargs) for kwargs in REPORTERS]
        Reporter.objects.bulk_create(reporters)

        class ReporterType(DjangoObjectType):
            class Meta:
                model = Reporter
                interfaces = (Node,)

        class Query(graphene.ObjectType):
            all_reporters = DjangoConnectionField(ReporterType)

        schema = graphene.Schema(query=Query)
        return schema

    @staticmethod
    def _first_names(result):
        """Return the ``firstName`` of each ``allReporters`` edge, in order."""
        return [
            edge["node"]["firstName"] for edge in result.data["allReporters"]["edges"]
        ]

    def do_queries(self, schema):
        """Run the backward-pagination queries against ``schema`` and assert
        the exact reporters (by first name) each one returns.
        """
        # Simply last 3
        query_last = """
            query {
                allReporters(last: 3) {
                    edges {
                        node {
                            firstName
                        }
                    }
                }
            }
        """

        result = schema.execute(query_last)
        assert not result.errors
        names = self._first_names(result)
        assert len(names) == 3
        assert names == ["First 3", "First 4", "First 5"]

        # Use a combination of first and last
        query_first_and_last = """
            query {
                allReporters(first: 4, last: 3) {
                    edges {
                        node {
                            firstName
                        }
                    }
                }
            }
        """

        result = schema.execute(query_first_and_last)
        assert not result.errors
        names = self._first_names(result)
        assert len(names) == 3
        assert names == ["First 1", "First 2", "First 3"]

        # Use a combination of first and last and after
        query_first_last_and_after = """
            query queryAfter($after: String) {
                allReporters(first: 4, last: 3, after: $after) {
                    edges {
                        node {
                            firstName
                        }
                    }
                }
            }
        """

        # Cursor pointing at array index 0, i.e. the first stored reporter.
        after = base64.b64encode(b"arrayconnection:0").decode()
        result = schema.execute(
            query_first_last_and_after, variable_values=dict(after=after)
        )
        assert not result.errors
        names = self._first_names(result)
        assert len(names) == 3
        assert names == ["First 2", "First 3", "First 4"]

    def test_should_query(self, graphene_settings):
        """
        Backward pagination should work as expected
        """
        schema = self.setup_schema(graphene_settings, max_limit=100)
        self.do_queries(schema)

    def test_should_query_with_low_max_limit(self, graphene_settings):
        """
        When doing backward pagination (using last) in combination with a max limit lower than the number of objects
        we should really retrieve the last ones.
        """
        schema = self.setup_schema(graphene_settings, max_limit=4)
        self.do_queries(schema)
1311
1312
def test_should_preserve_prefetch_related(django_assert_num_queries):
    """A ``prefetch_related`` set by the resolver must survive nested connections.

    The ``films`` resolver prefetches ``reporters``; executing the nested
    query is asserted to issue exactly three database queries.
    """

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (graphene.relay.Node,)

    class FilmType(DjangoObjectType):
        reporters = DjangoConnectionField(ReporterType)

        class Meta:
            model = Film
            interfaces = (graphene.relay.Node,)

    class Query(graphene.ObjectType):
        films = DjangoConnectionField(FilmType)

        def resolve_films(root, info):
            return Film.objects.prefetch_related("reporters")

    dave = Reporter.objects.create(first_name="Dave", last_name="Smith")
    jane = Reporter.objects.create(first_name="Jane", last_name="Doe")

    # One film with both reporters, one with only the second.
    film_with_both = Film.objects.create()
    film_with_both.reporters.set([dave, jane])
    film_with_one = Film.objects.create()
    film_with_one.reporters.set([jane])

    query = """
        query {
            films {
                edges {
                    node {
                        reporters {
                            edges {
                                node {
                                    firstName
                                }
                            }
                        }
                    }
                }
            }
        }
    """
    schema = graphene.Schema(query=Query)
    with django_assert_num_queries(3):
        result = schema.execute(query)
    assert not result.errors
1362
1363
def test_should_preserve_annotations():
    """Queryset annotations added by the resolver must reach the object type.

    The ``films`` resolver annotates every film with ``reporters_count``;
    the value is exposed as ``reportersCount`` and checked per film.
    """

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (graphene.relay.Node,)

    class FilmType(DjangoObjectType):
        reporters = DjangoConnectionField(ReporterType)
        reporters_count = graphene.Int()

        class Meta:
            model = Film
            interfaces = (graphene.relay.Node,)

    class Query(graphene.ObjectType):
        films = DjangoConnectionField(FilmType)

        def resolve_films(root, info):
            prefetched = Film.objects.prefetch_related("reporters")
            return prefetched.annotate(reporters_count=models.Count("reporters"))

    dave = Reporter.objects.create(first_name="Dave", last_name="Smith")
    jane = Reporter.objects.create(first_name="Jane", last_name="Doe")

    # First film has two reporters, second film has one.
    film_with_both = Film.objects.create()
    film_with_both.reporters.set([dave, jane])
    film_with_one = Film.objects.create()
    film_with_one.reporters.set([jane])

    query = """
        query {
            films {
                edges {
                    node {
                        reportersCount
                    }
                }
            }
        }
    """
    schema = graphene.Schema(query=Query)
    result = schema.execute(query)
    assert not result.errors, str(result)

    expected_edges = [{"node": {"reportersCount": count}} for count in (2, 1)]
    expected = {"films": {"edges": expected_edges}}
    assert result.data == expected, str(result.data)
1414
1415
def test_connection_should_enable_offset_filtering():
    """``offset`` skips leading rows of a connection.

    With two reporters stored, ``first: 1, offset: 1`` is expected to return
    only the second one.
    """
    for first_name, last_name in (("John", "Doe"), ("Some", "Guy")):
        Reporter.objects.create(first_name=first_name, last_name=last_name)

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    schema = graphene.Schema(query=Query)
    query = """
        query {
            allReporters(first: 1, offset: 1) {
                edges {
                    node {
                        firstName
                        lastName
                    }
                }
            }
        }
    """

    result = schema.execute(query)
    assert not result.errors

    second_reporter = {"firstName": "Some", "lastName": "Guy"}
    assert result.data == {"allReporters": {"edges": [{"node": second_reporter}]}}
1448
1449
def test_connection_should_enable_offset_filtering_higher_than_max_limit(
    graphene_settings,
):
    """``offset`` may exceed ``RELAY_CONNECTION_MAX_LIMIT``.

    The max limit is 2 while the query asks for ``offset: 3``; the fourth
    stored reporter is still expected to be returned.
    """
    graphene_settings.RELAY_CONNECTION_MAX_LIMIT = 2
    for first_name, last_name in (
        ("John", "Doe"),
        ("Some", "Guy"),
        ("Jane", "Roe"),
        ("Some", "Lady"),
    ):
        Reporter.objects.create(first_name=first_name, last_name=last_name)

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    schema = graphene.Schema(query=Query)
    query = """
        query {
            allReporters(first: 1, offset: 3) {
                edges {
                    node {
                        firstName
                        lastName
                    }
                }
            }
        }
    """

    result = schema.execute(query)
    assert not result.errors

    fourth_reporter = {"firstName": "Some", "lastName": "Lady"}
    assert result.data == {"allReporters": {"edges": [{"node": fourth_reporter}]}}
1489
1490
def test_connection_should_forbid_offset_filtering_with_before():
    """Combining ``before`` with ``offset`` must be rejected.

    The query is expected to produce exactly one error carrying the message
    that names the offending connection.
    """

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    schema = graphene.Schema(query=Query)
    query = """
        query ReporterPromiseConnectionQuery ($before: String) {
            allReporters(first: 1, before: $before, offset: 1) {
                edges {
                    node {
                        firstName
                        lastName
                    }
                }
            }
        }
    """
    expected_error = "You can't provide a `before` value at the same time as an `offset` value to properly paginate the `allReporters` connection."

    cursor = base64.b64encode(b"arrayconnection:2").decode()
    result = schema.execute(query, variable_values={"before": cursor})

    errors = result.errors
    assert len(errors) == 1
    assert errors[0].message == expected_error
1518
1519
def test_connection_should_allow_offset_filtering_with_after():
    """``offset`` may be combined with an ``after`` cursor.

    With a cursor at array index 0 plus ``offset: 1``, the third of the four
    stored reporters is expected to be returned.
    """
    for first_name, last_name in (
        ("John", "Doe"),
        ("Some", "Guy"),
        ("Jane", "Roe"),
        ("Some", "Lady"),
    ):
        Reporter.objects.create(first_name=first_name, last_name=last_name)

    class ReporterType(DjangoObjectType):
        class Meta:
            model = Reporter
            interfaces = (Node,)

    class Query(graphene.ObjectType):
        all_reporters = DjangoConnectionField(ReporterType)

    schema = graphene.Schema(query=Query)
    query = """
        query ReporterPromiseConnectionQuery ($after: String) {
            allReporters(first: 1, after: $after, offset: 1) {
                edges {
                    node {
                        firstName
                        lastName
                    }
                }
            }
        }
    """

    cursor = base64.b64encode(b"arrayconnection:0").decode()
    result = schema.execute(query, variable_values={"after": cursor})
    assert not result.errors

    third_reporter = {"firstName": "Jane", "lastName": "Roe"}
    assert result.data == {"allReporters": {"edges": [{"node": third_reporter}]}}
1555