1# Copyright (c) 2021, Oracle and/or its affiliates.
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License, version 2.0, as
5# published by the Free Software Foundation.
6#
7# This program is also distributed with certain software (including
8# but not limited to OpenSSL) that is licensed under separate terms,
9# as designated in a particular file or component or in included license
10# documentation.  The authors of MySQL hereby grant you an
11# additional permission to link the program and your derivative works
12# with the separately licensed software that they have included with
13# MySQL.
14#
15# Without limiting anything contained in the foregoing, this file,
16# which is part of MySQL Connector/Python, is also subject to the
17# Universal FOSS Exception, version 1.0, a copy of which can be found at
18# http://oss.oracle.com/licenses/universal-foss-exception.
19#
20# This program is distributed in the hope that it will be useful, but
21# WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
23# See the GNU General Public License, version 2.0, for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program; if not, write to the Free Software Foundation, Inc.,
27# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
28
29import os
30import threading
31import time
32import unittest
33
34import mysqlx
35import tests
36
37
38@unittest.skipIf(tests.MYSQL_VERSION < (8, 0, 25), "XPlugin not compatible")
39class CollectionAddTests(tests.MySQLxTests):
40    """Tests for collection.find()."""
41
42    def _drop_collection_if_exists(self, name):
43        collection = self.schema.get_collection(name)
44        if collection.exists_in_database():
45            self.schema.drop_collection(name)
46
47    @tests.foreach_session()
48    def test_collection_find1(self):
49        """Test collection.find.fields."""
50        self._drop_collection_if_exists("mycoll5")
51        collection = self.schema.create_collection("mycoll5")
52        collection.add(
53            {"_id": 1, "name": "a", "age": 21},
54            {"_id": 2, "name": "b"},
55            {"_id": 3, "name": "c"},
56        ).execute()
57        result = (
58            collection.find().fields("sum($.age)").group_by("$.age").execute()
59        ).fetch_all()
60        self.assertEqual(len(result), 2)
61        self.schema.drop_collection("mycoll5")
62
63    @unittest.skip("TODO: Fix me")
64    @tests.foreach_session()
65    def test_collection_find2(self):
66        """Test the collection.find.groupby and having."""
67        self._drop_collection_if_exists("mycoll6")
68        collection = self.schema.create_collection("mycoll6")
69        collection.add(
70            {"_id": 1, "a": 1, "b": 2, "c": 100},
71            {"_id": 2, "a": 2, "b": 1, "c": 200},
72            {"_id": 3, "a": 3, "b": 2, "c": 300},
73        ).execute()
74        result = (
75            collection.find()
76            .fields("$.a, $.b")
77            .group_by("$.b")
78            .having("$.a > 1")
79            .execute()
80        ).fetch_all()
81        self.assertEqual(len(result), 2)
82        self.schema.drop_collection("mycoll6")
83
84    @tests.foreach_session()
85    def test_collection_find3(self):
86        """Test collection.find with sort."""
87        self._drop_collection_if_exists("mycoll9")
88        collection = self.schema.create_collection("mycoll9")
89        collection.add(
90            {"_id": 1, "a": 1, "b": 2, "c": 100},
91            {"_id": 2, "a": 2, "b": 1, "c": 200},
92            {"_id": 3, "a": 3, "b": 2, "c": 300},
93        ).execute()
94        result = collection.find().fields("$.a, $b").sort("a DESC").execute()
95        row = result.fetch_all()
96        self.assertEqual(row[0]["$.a"], 3)
97        self.schema.drop_collection("mycoll9")
98
99    @tests.foreach_session()
100    def test_collection_find4(self):
101        """Test collection.find with limit with offset."""
102        self._drop_collection_if_exists("mycoll10")
103        collection = self.schema.create_collection("mycoll10")
104        collection.add(
105            {"_id": 1, "a": 1, "b": 2, "c": 100},
106            {"_id": 2, "a": 2, "b": 1, "c": 200},
107            {"_id": 3, "a": 3, "b": 2, "c": 300},
108        ).execute()
109        result = (
110            collection.find("$.a > 1")
111            .fields("$.a")
112            .limit(2)
113            .offset(1)
114            .execute()
115        )
116        row = result.fetch_all()
117        self.schema.drop_collection("mycoll10")
118
119    @unittest.skip("TODO: Fix me")
120    @tests.foreach_session()
121    def test_collection_find5(self):
122        """Test collection.find with like."""
123        self._drop_collection_if_exists("mycoll11")
124        collection = self.schema.create_collection("mycoll11")
125        collection.add(
126            {"_id": 1, "name": "Sana"},
127            {"_id": 2, "name": "Sam"},
128            {"_id": 3, "name": "amr"},
129        ).execute()
130        result = collection.find("$.name like S*").execute()
131        row = result.fetch_all()
132        self.assertEqual(len(row), 2)
133        self.assertEqual(row[1]["name"], "Sam")
134        self.schema.drop_collection("mycoll11")
135
136    @tests.foreach_session()
137    def test_collection_find6(self):
138        """Test collection.find with bind."""
139        self._drop_collection_if_exists("mycoll11")
140        collection = self.schema.create_collection("mycoll11")
141        collection.add(
142            {"_id": 1, "name": "Sana"},
143            {"_id": 2, "name": "Sam"},
144            {"_id": 3, "name": "amr"},
145        ).execute()
146        result = (
147            collection.find("$.name  == :name").bind("name", "Sana").execute()
148        )
149        row = result.fetch_all()[0]
150        self.assertEqual(row["_id"], 1)
151        self.schema.drop_collection("mycoll11")
152
153    @tests.foreach_session()
154    def test_collection_find7(self):
155        """Test collection.find with parameter list."""
156        self._drop_collection_if_exists("mycoll19")
157        collection = self.schema.create_collection("mycoll19")
158        collection.add(
159            {"_id": 1, "name": "Sana"},
160            {"_id": 2, "name": "Sam"},
161            {"_id": 3, "name": "amr"},
162        ).execute()
163        result = (
164            collection.find("$._id > 1").fields("$._id", "$.name").execute()
165        ).fetch_all()
166        self.assertEqual(len(result), 2)
167        self.schema.drop_collection("mycoll19")
168
169    @unittest.skipIf(
170        tests.MYSQL_EXTERNAL_SERVER,
171        "Test not available for external MySQL servers",
172    )
173    @tests.foreach_session()
174    def test_collection_find8(self):
175        """Test collection.find.groupby with parameter list."""
176        self._drop_collection_if_exists("mycoll20")
177        collection = self.schema.create_collection(
178            "mycol20",
179        )
180        collection.add(
181            {"_id": 1, "a": 1, "b": 2, "c": 100},
182            {"_id": 2, "a": 2, "b": 2, "c": 300},
183            {"_id": 3, "a": 3, "b": 2, "c": 100},
184        ).execute()
185        result = (
186            collection.find()
187            .fields("$a,$.b,$.c")
188            .group_by("$.b", "$.c")
189            .execute()
190        ).fetch_all()
191        self.assertEqual(len(result), 2)
192        self.schema.drop_collection("mycol20")
193
194    @tests.foreach_session()
195    def test_collection_find9(self):
196        """Test collection.find.sort with param list."""
197        self._drop_collection_if_exists("mycoll21")
198        collection = self.schema.create_collection("mycoll21")
199        collection.add(
200            {"_id": 1, "a": 1, "b": 10, "c": 100},
201            {"_id": 2, "a": 1, "b": 11, "c": 200},
202            {"_id": 3, "a": 2, "b": 10, "c": 300},
203        ).execute()
204        result = (
205            collection.find()
206            .fields("$.a, $.b")
207            .sort("a ASC", "b DESC")
208            .execute()
209        ).fetch_all()
210        self.assertEqual(result[0]["$.b"], 11)
211        self.schema.drop_collection("mycoll21")
212
213    @tests.foreach_session()
214    def test_collection_find10(self):
215        """Test collection.find using where() condition."""
216        self._drop_collection_if_exists("newcoll1")
217        collection = self.schema.create_collection("newcoll1")
218        collection.add(
219            {"_id": 1, "a": 1, "b": 10, "c": 100},
220            {"_id": 2, "a": 1, "b": 11, "c": 200},
221            {"_id": 3, "a": 2, "b": 10, "c": 300},
222        ).execute()
223        result = collection.find().where("$.c  >= 200").execute()
224        self.assertEqual(len(result.fetch_all()), 2)
225        self.schema.drop_collection("newcoll1")
226
227    @unittest.skipUnless(
228        tests.ARCH_64BIT, "Test available only for 64 bit platforms"
229    )
230    @unittest.skipIf(os.name == "nt", "Test not available for Windows")
231    @tests.foreach_session()
232    def test_collection_find11(self):
233        """Test collection.find with offset as large positive number."""
234        self._drop_collection_if_exists("newcoll2")
235        collection = self.schema.create_collection(
236            "newcoll2",
237        )
238        collection.add(
239            {"_id": 1, "a": 1, "b": 10, "c": 100},
240            {"_id": 2, "a": 1, "b": 11, "c": 200},
241            {"_id": 3, "a": 2, "b": 10, "c": 300},
242        ).execute()
243        result = collection.find().limit(2).offset(92898832378723).execute()
244        self.assertEqual(len(result.fetch_all()), 0)
245        self.schema.drop_collection("newcoll2")
246
247    @tests.foreach_session()
248    def test_collection_find12(self):
249        """Test collection.find with offset as negative number."""
250        self._drop_collection_if_exists("mycoll3")
251        collection = self.schema.create_collection("newcoll3")
252        collection.add(
253            {"_id": 1, "a": 1, "b": 10, "c": 100},
254            {"_id": 2, "a": 1, "b": 11, "c": 200},
255            {"_id": 3, "a": 2, "b": 10, "c": 300},
256        ).execute()
257        self.assertRaises(
258            ValueError,
259            collection.find().limit(2).offset,
260            -2378723,
261        )
262        self.schema.drop_collection("newcoll3")
263
264    @tests.foreach_session()
265    def test_operator1(self):
266        """Test binary operator and."""
267        self._drop_collection_if_exists("mycoll1")
268        collection = self.schema.create_collection("mycoll1")
269        collection.add(
270            {"_id": 1, "name": "Sana"},
271            {"_id": 2, "name": "Sam"},
272            {"_id": 3, "name": "amr"},
273        ).execute()
274        result = (
275            collection.find("$.name  == :name and $._id == :id")
276            .bind('{"name":"Sana" ,"id":1}')
277            .execute()
278        )
279        row = result.fetch_all()[0]
280        self.assertEqual(row["name"], "Sana")
281        self.schema.drop_collection("mycoll1")
282
283    @tests.foreach_session()
284    def test_operator4(self):
285        """Test 'between' operator."""
286        self._drop_collection_if_exists("mycoll2")
287        collection = self.schema.create_collection("mycoll2")
288        collection.add(
289            {"_id": 1, "name": "Sana"},
290            {"_id": 2, "name": "Sam"},
291            {"_id": 3, "name": "amr"},
292            {"_id": 4, "name": "abc"},
293            {"_id": 5, "name": "def"},
294        ).execute()
295        result = collection.find("$._id between 2 and 4").execute()
296        self.assertEqual(len(result.fetch_all()), 3)
297        self.schema.drop_collection("mycoll2")
298
299    # Testing the contains operator with single operand on both sides
300
301    @tests.foreach_session()
302    def test_contains_operator_test1(self):
303        """Test IN operator with string on both sides - With LHS in RHS."""
304        self._drop_collection_if_exists("mycoll1")
305        collection = self.schema.create_collection("mycoll1")
306        collection.add({"name": "a"}, {"name": "b"}).execute()
307        result = collection.find("'a' IN $.name").execute()
308        self.assertEqual(len(result.fetch_all()), 1)
309        self.schema.drop_collection("mycoll1")
310
311    @tests.foreach_session()
312    def test_contains_operator2(self):
313        """Test IN operator with int as operand - With LHS in RHS."""
314        self._drop_collection_if_exists("mycoll2")
315        collection = self.schema.create_collection("mycoll2")
316        collection.add(
317            {"name": "a", "age": 21}, {"name": "b", "age": 21}
318        ).execute()
319        result = collection.find("21 IN $.age").execute()
320        self.assertEqual(len(result.fetch_all()), 2)
321        self.schema.drop_collection("mycoll2")
322
323    @unittest.skip("TODO: Fix me")
324    @tests.foreach_session()
325    def test_contains_operator3(self):
326        """Test IN operator with boolean as operand - With LHS in RHS."""
327        self._drop_collection_if_exists("mycoll3")
328        collection = self.schema.create_collection("mycoll3")
329        collection.add(
330            {"name": "a", "age": 21, "ARR": [1, 4]},
331            {"name": "b", "age": 21, "ARR": 2},
332        ).execute()
333        result = collection.find("(!false && true) IN [true]").execute()
334        self.assertEqual(len(result.fetch_all()), 2)
335        self.schema.drop_collection("mycoll3")
336
337    @tests.foreach_session()
338    def test_contains_operator4(self):
339        """Test NOT IN operator with string operand - With LHS not in RHS."""
340        self._drop_collection_if_exists("mycoll4")
341        collection = self.schema.create_collection("mycoll4")
342        collection.add({"name": "a"}, {"name": "b"}, {"name": "c"}).execute()
343        result = collection.find("$.name NOT IN 'a'").execute()
344        self.assertEqual(len(result.fetch_all()), 2)
345        self.schema.drop_collection("mycoll4")
346
347    @tests.foreach_session()
348    def test_contains_operator5(self):
349        """Test NOT IN operator with int as operand - With LHS not in RHS."""
350        self._drop_collection_if_exists("mycoll5")
351        collection = self.schema.create_collection("mycoll5")
352        collection.add(
353            {"name": "a", "age": 21}, {"name": "b", "age": 22}
354        ).execute()
355        result = collection.find("21 NOT IN $.age").execute()
356        self.assertEqual(len(result.fetch_all()), 1)
357        self.schema.drop_collection("mycoll5")
358
359    @tests.foreach_session()
360    def test_contains_operator6(self):
361        self._drop_collection_if_exists("mycoll6")
362        collection = self.schema.create_collection("mycoll6")
363        collection.add(
364            {"name": "a", "age": 21}, {"name": "b", "age": 21}
365        ).execute()
366        result = collection.find("'b' NOT IN $.name").execute()
367        self.assertEqual(len(result.fetch_all()), 1)
368        self.schema.drop_collection("mycoll6")
369
370    @tests.foreach_session()
371    def test_contains_operator7(self):
372        """Test IN operator with different datatypes as operands."""
373        self._drop_collection_if_exists("mycoll7")
374        collection = self.schema.create_collection("mycoll7")
375        collection.add(
376            {"name": "a", "age": 21}, {"name": "b", "age": 22}
377        ).execute()
378        result = collection.find("21 IN $.name").execute()
379        result1 = collection.find("'b' IN $.age").limit(1).execute()
380        self.assertEqual(len(result.fetch_all()), 0)
381        self.assertEqual(len(result1.fetch_all()), 0),
382        self.schema.drop_collection("mycoll7")
383
384    @tests.foreach_session()
385    def test_contains_operator8(self):
386        """Test IN operator with single element on LHS and array/list on
387        RHS and vice versa."""
388        self._drop_collection_if_exists("mycoll8")
389        collection = self.schema.create_collection("mycoll8")
390        collection.add(
391            {"_id": 1, "name": "a", "age": 21, "prof": ["x", "y"]},
392            {"_id": 2, "name": "b", "age": 24, "prof": ["p", "q"]},
393            {"_id": 3, "name": "c", "age": 26},
394        ).execute()
395        result = collection.find("$.age IN [21,23,24,28]").execute()
396        result1 = collection.find("$.name IN ['a','b','c','d','e']").execute()
397        result2 = collection.find("$.age IN (21,23)").execute()
398        result3 = (
399            collection.find()
400            .fields("21 IN (22,23) as test")
401            .limit(1)
402            .execute()
403        )
404        result4 = collection.find('["p","q"] IN $.prof').execute()
405        self.assertEqual(len(result.fetch_all()), 2)
406        self.assertEqual(len(result1.fetch_all()), 3)
407        self.assertEqual(len(result2.fetch_all()), 1)
408        self.assertEqual(result3.fetch_all()[0].test, False)
409        self.assertEqual(len(result4.fetch_all()), 1)
410        self.schema.drop_collection("mycoll8")
411
412    @tests.foreach_session()
413    def test_contains_operator9(self):
414        """Test IN operator with single element on LHS and dict on RHS and
415        vice versa."""
416        self._drop_collection_if_exists("mycoll9")
417        collection = self.schema.create_collection("mycoll9")
418        collection.add(
419            {
420                "_id": 1,
421                "name": "joy",
422                "age": 21,
423                "additionalinfo": {
424                    "company": "xyz",
425                    "vehicle": "bike",
426                    "hobbies": [
427                        "reading",
428                        "music",
429                        "playing",
430                        {"a1": "x", "b1": "y", "c1": "z"},
431                    ],
432                },
433            }
434        ).execute()
435        result = collection.find(
436            "'reading' IN $.additionalinfo.hobbies"
437        ).execute()
438        result1 = (
439            collection.find()
440            .fields("'music' IN $.age as test")
441            .limit(1)
442            .execute()
443        )
444        result2 = (
445            collection.find()
446            .fields("'boxing' IN $.additionalinfo.hobbies as test1")
447            .limit(1)
448            .execute()
449        )
450        result3 = collection.find(
451            '{"a1":"x","b1":"y","c1":"z"} IN $.additionalinfo.hobbies'
452        ).execute()
453        self.assertEqual(len(result.fetch_all()), 1)
454        self.assertFalse(result1.fetch_all()[0].test)
455        self.assertFalse(result2.fetch_all()[0].test1)
456        self.assertEqual(len(result3.fetch_all()), 1)
457        self.schema.drop_collection("mycoll9")
458
459    @tests.foreach_session()
460    def test_contains_operator10(self):
461        """Test IN operator with array/list operand on LHS and array/list on
462        RHS."""
463        self._drop_collection_if_exists("mycoll10")
464        collection = self.schema.create_collection("mycoll10")
465        collection.add(
466            {
467                "_id": 1,
468                "name": "joy",
469                "age": 21,
470                "additionalinfo": {
471                    "company": "xyz",
472                    "vehicle": "bike",
473                    "hobbies": ["reading", "music", "playing"],
474                },
475            },
476            {
477                "_id": 2,
478                "name": "happy",
479                "age": 24,
480                "additionalinfo": {
481                    "company": "abc",
482                    "vehicle": "car",
483                    "hobbies": ["playing", "painting", "boxing"],
484                },
485            },
486        ).execute()
487        result = collection.find(
488            '["playing","painting","boxing"] IN $.additionalinfo.hobbies'
489        ).execute()
490        result1 = (
491            collection.find()
492            .fields('["happy","joy"] IN $.name as test')
493            .limit(1)
494            .execute()
495        )
496        result2 = (
497            collection.find()
498            .fields('["car","bike"] NOT IN $.additionalinfo.vehicle as test1')
499            .limit(1)
500            .execute()
501        )
502        self.assertEqual(len(result.fetch_all()), 1)
503        self.assertFalse(result1.fetch_all()[0].test)
504        self.assertTrue(result2.fetch_all()[0].test1)
505        self.schema.drop_collection("mycoll10")
506
507    @tests.foreach_session()
508    def test_contains_operator11(self):
509        """Test IN operator with dict on LHS and dict on RHS."""
510        self._drop_collection_if_exists("mycoll11")
511        collection = self.schema.create_collection("mycoll11")
512        collection.add(
513            {
514                "_id": 1,
515                "name": "joy",
516                "age": 21,
517                "additionalinfo": [
518                    {"company": "xyz", "vehicle": "bike"},
519                    {"company": "abc", "vehicle": "car"},
520                    {"company": "mno", "vehicle": "zeep"},
521                ],
522            },
523            {
524                "_id": 2,
525                "name": "happy",
526                "age": 24,
527                "additionalinfo": [
528                    {"company": "abc", "vehicle": "car"},
529                    {"company": "pqr", "vehicle": "bicycle"},
530                ],
531            },
532            {
533                "_id": 3,
534                "name": "nice",
535                "age": 25,
536                "additionalinfo": {"company": "def", "vehicle": "none"},
537            },
538        ).execute()
539        result = collection.find(
540            '{"company":"abc","vehicle":"car"} IN $.additionalinfo'
541        ).execute()
542        result1 = (
543            collection.find()
544            .fields('{"vehicle":"car"} NOT IN $.additionalinfo as test')
545            .limit(1)
546            .execute()
547        )
548        result2 = (
549            collection.find()
550            .fields('{"company":"mno"} IN $.additionalinfo as test1')
551            .limit(1)
552            .execute()
553        )
554        result3 = collection.find(
555            '{"company":"abc","vehicle":"car"} NOT IN $.additionalinfo'
556        ).execute()
557        self.assertEqual(len(result.fetch_all()), 2)
558        self.assertFalse(result1.fetch_all()[0].test)
559        self.assertTrue(result2.fetch_all()[0].test1)
560        self.assertEqual(len(result3.fetch_all()), 1)
561        self.schema.drop_collection("mycoll11")
562
563    @tests.foreach_session()
564    def test_contains_operator12(self):
565        """Test IN operator with operands having expressions."""
566        self._drop_collection_if_exists("mycoll12")
567        collection = self.schema.create_collection("mycoll12")
568        collection.add(
569            {"_id": 1, "name": "a", "age": 21},
570            {"_id": 2, "name": "b"},
571            {"_id": 3, "name": "c"},
572        ).execute()
573        result = (
574            collection.find()
575            .fields("(1>5) IN (true, false) as test")
576            .limit(1)
577            .execute()
578        )
579        result1 = (
580            collection.find()
581            .fields("('a'>'b') in (true, false) as test1")
582            .limit(1)
583            .execute()
584        )
585        result2 = (
586            collection.find()
587            .fields(
588                "true IN [(1>5), !(false), (true || false), (false && true)] as test2"
589            )
590            .limit(1)
591            .execute()
592        )
593        self.assertTrue(result.fetch_all()[0].test)
594        self.assertTrue(result1.fetch_all()[0].test1)
595        self.assertTrue(result2.fetch_all()[0].test2)
596        self.schema.drop_collection("mycoll12")
597
598    @tests.foreach_session()
599    def test_contains_operator13(self):
600        """Test IN operator with operands having expressions."""
601        self._drop_collection_if_exists("mycoll13")
602        collection = self.schema.create_collection("mycoll13")
603        collection.add(
604            {"_id": 1, "name": "a", "age": 21},
605            {"_id": 2, "name": "b"},
606            {"_id": 3, "name": "c"},
607        ).execute()
608        result = collection.find("(1+5) IN (1,2,3,4,5,6)").execute()
609        result1 = collection.find("(2+3) IN (1,2,3,4)").limit(1).execute()
610        result2 = collection.find("(1+5) IN (1,2,3,4,5,6)").execute()
611        self.assertEqual(len(result.fetch_all()), 3)
612        self.assertEqual(len(result1.fetch_all()), 0)
613        self.assertEqual(len(result2.fetch_all()), 3)
614        self.schema.drop_collection("mycoll13")
615
616    @tests.foreach_session()
617    def test_contains_operator14(self):
618        """Test IN operator: search for empty string in a field and field in
619        empty string."""
620        self._drop_collection_if_exists("mycoll14")
621        collection = self.schema.create_collection("mycoll14")
622        collection.add(
623            {"_id": 1, "name": "a", "age": 21},
624            {"_id": 2, "name": "b"},
625            {"_id": 3, "name": "c"},
626        ).execute()
627        result = collection.find("'' IN $.name").execute()
628        result1 = collection.find("$.name IN ['', ' ']").execute()
629        result2 = collection.find("$.name IN ('', ' ')").execute()
630        self.assertEqual(len(result.fetch_all()), 0)
631        self.assertEqual(len(result1.fetch_all()), 0)
632        self.assertEqual(len(result2.fetch_all()), 0)
633        self.schema.drop_collection("mycoll14")
634
635    @tests.foreach_session()
636    def test_collection_s_s_lock(self):
637        """Test shared-shared lock."""
638        config = tests.get_mysqlx_config()
639        self._drop_collection_if_exists("mycoll11")
640        collection = self.schema.create_collection("mycoll1")
641        collection.add(
642            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
643        ).execute()
644
645        locking = threading.Event()
646        waiting = threading.Event()
647
648        def thread_a(locking, waiting):
649            session1 = mysqlx.get_session(config)
650            schema1 = session1.get_schema(config["schema"])
651            collection = schema1.get_collection("mycoll1")
652
653            session1.start_transaction()
654            collection.find("name = 'James'").lock_shared().execute()
655            locking.set()
656            time.sleep(2)
657            locking.clear()
658            if waiting.is_set():
659                session1.commit()
660                self.fail(
661                    "Collection_S_S_Lock_test IS NOT OK. Other thread is "
662                    "waiting while it is not expected to!"
663                )
664            session1.commit()
665
666        def thread_b(locking, waiting):
667            session2 = mysqlx.get_session(config)
668            schema2 = session2.get_schema(config["schema"])
669            collection = schema2.get_collection("mycoll1")
670
671            if not locking.wait(2):
672                self.fail(
673                    "Collection_S_S_Lock_test IS NOT OK. Other thread has not "
674                    "set the lock!"
675                )
676            session2.start_transaction()
677
678            waiting.set()
679            collection.find("name = 'James'").lock_shared().execute()
680            waiting.clear()
681
682            session2.commit()
683
684        client1 = threading.Thread(
685            target=thread_a,
686            args=(
687                locking,
688                waiting,
689            ),
690        )
691        client2 = threading.Thread(
692            target=thread_b,
693            args=(
694                locking,
695                waiting,
696            ),
697        )
698
699        client1.start()
700        client2.start()
701
702        client1.join()
703        client2.join()
704        self.schema.drop_collection("mycoll1")
705
706    @tests.foreach_session()
707    def test_collection_s_x_lock(self):
708        config = tests.get_mysqlx_config()
709        """Test shared-exclusive lock."""
710        self._drop_collection_if_exists("mycoll2")
711        collection = self.schema.create_collection("mycoll2")
712        collection.add(
713            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
714        ).execute()
715
716        locking = threading.Event()
717        waiting = threading.Event()
718
719        def thread_a(locking, waiting):
720            session1 = mysqlx.get_session(config)
721            schema1 = session1.get_schema(config["schema"])
722            collection = schema1.get_collection("mycoll2")
723
724            session1.start_transaction()
725            collection.find("name = 'James'").lock_shared().execute()
726            locking.set()
727            time.sleep(2)
728            locking.clear()
729            if not waiting.is_set():
730                session1.commit()
731                self.fail(
732                    "Collection_S_X_Lock_test IS NOT OK. Other thread is not "
733                    "waiting while it is expected to!"
734                )
735            session1.commit()
736
737        def thread_b(locking, waiting):
738            session1 = mysqlx.get_session(config)
739            schema1 = session1.get_schema(config["schema"])
740            collection = schema1.get_collection("mycoll2")
741
742            if not locking.wait(2):
743                self.fail(
744                    "Collection_S_X_Lock_test IS NOT OK. Other thread has not "
745                    "set the lock!"
746                )
747            session1.start_transaction()
748            waiting.set()
749            collection.find("name = 'James'").lock_exclusive().execute()
750            waiting.clear()
751            session1.commit()
752
753        client1 = threading.Thread(
754            target=thread_a,
755            args=(
756                locking,
757                waiting,
758            ),
759        )
760        client2 = threading.Thread(
761            target=thread_b,
762            args=(
763                locking,
764                waiting,
765            ),
766        )
767
768        client1.start()
769        client2.start()
770
771        client1.join()
772        client2.join()
773        self.schema.drop_collection("mycoll2")
774
775    @tests.foreach_session()
776    def test_collection_x_x_lock(self):
777        """Test exclusive-exclusive lock."""
778        config = tests.get_mysqlx_config()
779        self._drop_collection_if_exists("mycoll3")
780        collection = self.schema.create_collection("mycoll3")
781        collection.add(
782            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
783        ).execute()
784
785        locking = threading.Event()
786        waiting = threading.Event()
787
788        def thread_a(locking, waiting):
789            session1 = mysqlx.get_session(config)
790            schema1 = session1.get_schema(config["schema"])
791            collection = schema1.get_collection("mycoll3")
792
793            session1.start_transaction()
794            collection.find("name = 'James'").lock_exclusive().execute()
795            locking.set()
796            time.sleep(2)
797            locking.clear()
798            if not waiting.is_set():
799                session1.commit()
800                self.fail(
801                    "Collection_X_X_Lock_test IS NOT OK. Other thread is not "
802                    "waiting while it is expected to!"
803                )
804            session1.commit()
805
806        def thread_b(locking, waiting):
807            session2 = mysqlx.get_session(config)
808            schema2 = session2.get_schema(config["schema"])
809            collection = schema2.get_collection("mycoll3")
810
811            if not locking.wait(2):
812                self.fail(
813                    "Collection_X_X_Lock_test IS NOT OK. Other thread has not "
814                    "set the lock!"
815                )
816            session2.start_transaction()
817
818            waiting.set()
819            collection.find("name = 'James'").lock_exclusive().execute()
820            waiting.clear()
821
822            session2.commit()
823
824        client1 = threading.Thread(
825            target=thread_a,
826            args=(
827                locking,
828                waiting,
829            ),
830        )
831        client2 = threading.Thread(
832            target=thread_b,
833            args=(
834                locking,
835                waiting,
836            ),
837        )
838
839        client1.start()
840        client2.start()
841
842        client1.join()
843        client2.join()
844        self.schema.drop_collection("mycoll3")
845
846    @tests.foreach_session()
847    def test_collection_x_s_lock(self):
848        """Test exclusive-exclusive lock."""
849        config = tests.get_mysqlx_config()
850        self._drop_collection_if_exists("mycoll4")
851        collection = self.schema.create_collection("mycoll4")
852        collection.add(
853            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
854        ).execute()
855
856        locking = threading.Event()
857        waiting = threading.Event()
858
859        def thread_a(locking, waiting):
860            session1 = mysqlx.get_session(config)
861            schema1 = session1.get_schema(config["schema"])
862            collection = schema1.get_collection("mycoll4")
863
864            session1.start_transaction()
865            collection.find("name = 'James'").lock_exclusive().execute()
866            locking.set()
867            time.sleep(2)
868            locking.clear()
869            if not waiting.is_set():
870                session1.commit()
871                self.fail(
872                    "Collection_X_S_Lock_test IS NOT OK. Other thread is not "
873                    "waiting while it is expected to!"
874                )
875            session1.commit()
876
877        def thread_b(locking, waiting):
878            session2 = mysqlx.get_session(config)
879            schema2 = session2.get_schema(config["schema"])
880            collection = schema2.get_collection("mycoll4")
881
882            if not locking.wait(2):
883                self.fail(
884                    "Collection_X_S_Lock_test IS NOT OK. Other thread has not "
885                    "set the lock!"
886                )
887            session2.start_transaction()
888
889            waiting.set()
890            collection.find("name = 'James'").lock_shared().execute()
891            waiting.clear()
892
893            session2.commit()
894
895        client1 = threading.Thread(
896            target=thread_a,
897            args=(
898                locking,
899                waiting,
900            ),
901        )
902        client2 = threading.Thread(
903            target=thread_b,
904            args=(
905                locking,
906                waiting,
907            ),
908        )
909
910        client1.start()
911        client2.start()
912
913        client1.join()
914        client2.join()
915        self.schema.drop_collection("mycoll4")
916
917    @tests.foreach_session()
918    def test_collection_multiple_lock_calls(self):
919        """Test multiple lock calls."""
920        config = tests.get_mysqlx_config()
921        self._drop_collection_if_exists("mycoll5")
922        collection = self.schema.create_collection("mycoll5")
923        collection.add(
924            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
925        ).execute()
926
927        locking = threading.Event()
928        waiting = threading.Event()
929
930        def thread_a(locking, waiting):
931            session1 = mysqlx.get_session(config)
932            schema1 = session1.get_schema(config["schema"])
933            collection = schema1.get_collection("mycoll5")
934
935            session1.start_transaction()
936            collection.find(
937                "name = 'James'"
938            ).lock_exclusive().lock_shared().lock_exclusive().execute()
939            locking.set()
940            time.sleep(2)
941            locking.clear()
942            if not waiting.is_set():
943                session1.commit()
944                self.fail(
945                    "Collection_Multiple_Lock_calls_test IS NOT OK. Other "
946                    "thread is not waiting while it is expected to!"
947                )
948            session1.commit()
949
950        def thread_b(locking, waiting):
951            session2 = mysqlx.get_session(config)
952            schema2 = session2.get_schema(config["schema"])
953            collection = schema2.get_collection("mycoll5")
954
955            if not locking.wait(2):
956                self.fail(
957                    "Collection_Multiple_Lock_calls_test IS NOT OK. Other "
958                    "thread has not set the lock!"
959                )
960            session2.start_transaction()
961
962            waiting.set()
963            collection.find(
964                "name = 'James'"
965            ).lock_shared().lock_exclusive().lock_exclusive().lock_shared().execute()
966            waiting.clear()
967
968            session2.commit()
969
970        client1 = threading.Thread(
971            target=thread_a,
972            args=(
973                locking,
974                waiting,
975            ),
976        )
977        client2 = threading.Thread(
978            target=thread_b,
979            args=(
980                locking,
981                waiting,
982            ),
983        )
984
985        client1.start()
986        client2.start()
987
988        client1.join()
989        client2.join()
990        self.schema.drop_collection("mycoll5")
991
992    @tests.foreach_session()
993    def test_collection_x_lock_modify(self):
994        """Test lock exclusive and modify - modify will be blocked until the
995        lock is released."""
996        config = tests.get_mysqlx_config()
997        self._drop_collection_if_exists("mycoll6")
998        collection = self.schema.create_collection("mycoll6")
999        collection.add(
1000            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1001        ).execute()
1002
1003        locking = threading.Event()
1004        waiting = threading.Event()
1005
1006        def thread_a(locking, waiting):
1007            session1 = mysqlx.get_session(config)
1008            schema1 = session1.get_schema(config["schema"])
1009            collection = schema1.get_collection("mycoll6")
1010
1011            session1.start_transaction()
1012            collection.find(
1013                "$.name = 'James'"
1014            ).lock_exclusive().lock_shared().lock_exclusive().execute()
1015            locking.set()
1016            time.sleep(2)
1017            locking.clear()
1018            if not waiting.is_set():
1019                session1.commit()
1020                self.fail(
1021                    "Collection_X_Lock_Modify_test IS NOT OK. Other thread is "
1022                    "not waiting while it is expected to!"
1023                )
1024            session1.commit()
1025
1026        def thread_b(locking, waiting):
1027            session2 = mysqlx.get_session(config)
1028            schema2 = session2.get_schema(config["schema"])
1029            collection = schema2.get_collection("mycoll6")
1030
1031            if not locking.wait(2):
1032                self.fail(
1033                    "Collection_X_Lock_Modify_test IS NOT OK. Other thread has "
1034                    "not set the lock!"
1035                )
1036            session2.start_transaction()
1037
1038            waiting.set()
1039            collection.modify("$.name == 'James'").set("$.age", 30).execute()
1040            waiting.clear()
1041
1042            session2.commit()
1043
1044        client1 = threading.Thread(
1045            target=thread_a,
1046            args=(
1047                locking,
1048                waiting,
1049            ),
1050        )
1051        client2 = threading.Thread(
1052            target=thread_b,
1053            args=(
1054                locking,
1055                waiting,
1056            ),
1057        )
1058
1059        client1.start()
1060        client2.start()
1061
1062        client1.join()
1063        client2.join()
1064        self.schema.drop_collection("mycoll6")
1065
1066    @tests.foreach_session()
1067    def test_collection_s_lock_modifyt(self):
1068        """Test lock shared and modify - modify will be blocked until the lock
1069        is released, but will be able to read."""
1070        config = tests.get_mysqlx_config()
1071        self._drop_collection_if_exists("mycoll7")
1072        collection = self.schema.create_collection("mycoll7")
1073        collection.add(
1074            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1075        ).execute()
1076
1077        locking = threading.Event()
1078        waiting = threading.Event()
1079
1080        def thread_a(locking, waiting):
1081            session1 = mysqlx.get_session(config)
1082            schema1 = session1.get_schema(config["schema"])
1083            collection = schema1.get_collection("mycoll7")
1084
1085            session1.start_transaction()
1086            collection.find(
1087                "$.name = 'James'"
1088            ).lock_exclusive().lock_shared().execute()
1089            locking.set()
1090            time.sleep(2)
1091            locking.clear()
1092            if not waiting.is_set():
1093                session1.commit()
1094                #                return
1095                raise Exception(
1096                    "Collection_S_Lock_Modify_test IS NOT OK. Other thread is "
1097                    "not waiting while it is expected to!"
1098                )
1099            session1.commit()
1100
1101        def thread_b(locking, waiting):
1102            session2 = mysqlx.get_session(config)
1103            schema2 = session2.get_schema(config["schema"])
1104            collection = schema2.get_collection("mycoll7")
1105
1106            if not locking.wait(2):
1107                raise Exception(
1108                    "Collection_S_Lock_Modify_test IS NOT OK. Other thread has "
1109                    "not set the lock!"
1110                )
1111            session2.start_transaction()
1112
1113            result = collection.find("$.name == 'James'").execute()
1114            assert result.fetch_all()[0]["age"] == 23
1115            waiting.set()
1116            collection.modify("$.name == 'James'").set("$.age", 30).execute()
1117            waiting.clear()
1118
1119            session2.commit()
1120
1121        client1 = threading.Thread(
1122            target=thread_a,
1123            args=(
1124                locking,
1125                waiting,
1126            ),
1127        )
1128        client2 = threading.Thread(
1129            target=thread_b,
1130            args=(
1131                locking,
1132                waiting,
1133            ),
1134        )
1135
1136        client1.start()
1137        client2.start()
1138
1139        client1.join()
1140        client2.join()
1141        self.schema.drop_collection("mycoll7")
1142
1143    @tests.foreach_session()
1144    def _lock_contention_test(self, lock_type_1, lock_type_2, lock_contention):
1145        """Test reading an exclusively locked document using lock_shared and
1146        the 'NOWAIT' waiting option."""
1147        config = tests.get_mysqlx_config()
1148        self._drop_collection_if_exists("mycoll")
1149        collection = self.schema.create_collection("mycoll")
1150        collection.add(
1151            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1152        ).execute()
1153
1154        locking = threading.Event()
1155        waiting = threading.Event()
1156
1157        errors = []
1158
1159        def thread_a(locking, waiting):
1160            session = mysqlx.get_session(config)
1161            schema = session.get_schema(config["schema"])
1162            collection = schema.get_collection("mycoll")
1163
1164            session.start_transaction()
1165            result = collection.find("name = 'James'")
1166            if lock_type_1 == "S":
1167                result.lock_shared().execute()
1168            else:
1169                result.lock_exclusive().execute()
1170
1171            locking.set()
1172            time.sleep(2)
1173            locking.clear()
1174
1175            if not waiting.is_set():
1176                errors.append(
1177                    "{0}-{1} lock test failure."
1178                    "".format(lock_type_1, lock_type_2)
1179                )
1180                session.commit()
1181                return
1182
1183        def thread_b(locking, waiting):
1184            session = mysqlx.get_session(config)
1185            schema = session.get_schema(config["schema"])
1186            collection = schema.get_collection("mycoll")
1187
1188            if not locking.wait(2):
1189                errors.append(
1190                    "{0}-{0} lock test failure."
1191                    "".format(lock_type_1, lock_type_2)
1192                )
1193                session.commit()
1194                return
1195
1196            session.start_transaction()
1197            if lock_type_2 == "S":
1198                result = collection.find("name = 'Fred'").lock_shared(
1199                    lock_contention
1200                )
1201            else:
1202                result = collection.find("name = 'Fred'").lock_exclusive(
1203                    lock_contention
1204                )
1205
1206            if lock_contention == mysqlx.LockContention.NOWAIT and (
1207                lock_type_1 == "X" or lock_type_2 == "X"
1208            ):
1209                try:
1210                    result.execute()
1211                except Exception:
1212                    pass
1213                session.rollback()
1214
1215            waiting.set()
1216
1217            session.start_transaction()
1218            result.execute()
1219            session.commit()
1220            waiting.clear()
1221
1222        client1 = threading.Thread(
1223            target=thread_a,
1224            args=(
1225                locking,
1226                waiting,
1227            ),
1228        )
1229        client2 = threading.Thread(
1230            target=thread_b,
1231            args=(
1232                locking,
1233                waiting,
1234            ),
1235        )
1236
1237        client1.start()
1238        client2.start()
1239
1240        client1.join()
1241        client2.join()
1242        self.schema.drop_collection("mycoll")
1243
1244    @unittest.skip("TODO: Fix me")
1245    @unittest.skipIf(
1246        tests.MYSQL_VERSION < (8, 0, 5), "Lock contention unavailable."
1247    )
1248    def test_lock_shared_with_nowait(self):
1249        self._lock_contention_test("S", "S", mysqlx.LockContention.NOWAIT)
1250        self._lock_contention_test("S", "X", mysqlx.LockContention.NOWAIT)
1251
1252    @tests.foreach_session()
1253    def test_collection_x_s_nowait(self):
1254        config = tests.get_mysqlx_config()
1255        self._drop_collection_if_exists("mycoll8")
1256        collection = self.schema.create_collection("mycoll8")
1257        collection.add(
1258            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1259        ).execute()
1260
1261        locking = threading.Event()
1262        waiting = threading.Event()
1263
1264        def thread_a(locking, waiting):
1265            session1 = mysqlx.get_session(config)
1266            schema1 = session1.get_schema(config["schema"])
1267            collection = schema1.get_collection("mycoll8")
1268
1269            session1.start_transaction()
1270            collection.find("name = 'James'").lock_exclusive().execute()
1271            locking.set()
1272            time.sleep(2)
1273            locking.clear()
1274            if waiting.is_set():
1275                session1.commit()
1276                self.fail(
1277                    "Collection_X_S_NOWAIT_test IS NOT OK. Other thread is "
1278                    "waiting while it is not expected to!"
1279                )
1280            session1.commit()
1281
1282        def thread_b(locking, waiting):
1283            session2 = mysqlx.get_session(config)
1284            schema2 = session2.get_schema(config["schema"])
1285            collection = schema2.get_collection("mycoll8")
1286
1287            if not locking.wait(2):
1288                self.fail(
1289                    "Collection_X_S_NOWAIT_test IS NOT OK. Other thread has "
1290                    "not set the lock!"
1291                )
1292
1293            session2.start_transaction()
1294            waiting.set()
1295            try:
1296                collection.find("name = 'James'").lock_exclusive(
1297                    mysqlx.LockContention.NOWAIT
1298                ).execute()
1299            except Exception:
1300                pass
1301
1302            waiting.clear()
1303
1304            session2.commit()
1305
1306        client1 = threading.Thread(
1307            target=thread_a,
1308            args=(
1309                locking,
1310                waiting,
1311            ),
1312        )
1313        client2 = threading.Thread(
1314            target=thread_b,
1315            args=(
1316                locking,
1317                waiting,
1318            ),
1319        )
1320
1321        client1.start()
1322        client2.start()
1323
1324        client1.join()
1325        client2.join()
1326        self.schema.drop_collection("mycoll8")
1327
1328    @tests.foreach_session()
1329    def test_collection_x_x_nowait(self):
1330        """Test exclusive-exclusive with NOWAIT lockcontention."""
1331        config = tests.get_mysqlx_config()
1332        self._drop_collection_if_exists("mycoll")
1333        collection = self.schema.create_collection("mycoll")
1334        collection.add(
1335            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1336        ).execute()
1337
1338        session1 = mysqlx.get_session(config)
1339        schema1 = session1.get_schema(config["schema"])
1340        collection1 = schema1.get_collection("mycoll")
1341        session1.start_transaction()
1342        collection1.find("name = 'James'").lock_exclusive().execute()
1343
1344        session2 = mysqlx.get_session(config)
1345        schema2 = session2.get_schema(config["schema"])
1346        collection2 = schema2.get_collection("mycoll")
1347        session2.start_transaction()
1348        self.assertRaises(
1349            mysqlx.OperationalError,
1350            collection2.find("name = 'James'")
1351            .lock_exclusive(mysqlx.LockContention.NOWAIT)
1352            .execute,
1353        )
1354        session2.rollback()
1355        session1.rollback()
1356        self.schema.drop_collection("mycoll")
1357        session2.close()
1358        session1.close()
1359
1360    @unittest.skip("TODO: Fix me")
1361    @tests.foreach_session()
1362    def test_collection_s_s_nowait(self):
1363        """Test shared-shared with NOWAIT lockcontention."""
1364        config = tests.get_mysqlx_config()
1365        self._drop_collection_if_exists("mycoll9")
1366        collection = self.schema.create_collection("mycoll9")
1367        collection.add(
1368            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1369        ).execute()
1370
1371        locking = threading.Event()
1372        waiting = threading.Event()
1373
1374        def thread_a(locking, waiting):
1375            session1 = mysqlx.get_session(config)
1376            schema1 = session1.get_schema(config["schema"])
1377            collection = schema1.get_collection("mycoll9")
1378
1379            session1.start_transaction()
1380            collection.find("name = 'James'").lock_shared().execute()
1381            locking.set()
1382            time.sleep(2)
1383            locking.clear()
1384            if waiting.is_set():
1385                session1.commit()
1386                self.fail(
1387                    "Collection_S_S_NOWAIT_test IS NOT OK. Other thread "
1388                    "is waiting while it is not expected to!"
1389                )
1390            session1.commit()
1391
1392        def thread_b(locking, waiting):
1393            session2 = mysqlx.get_session(config)
1394            schema2 = session2.get_schema(config["schema"])
1395            collection = schema2.get_collection("mycoll9")
1396
1397            if not locking.wait(2):
1398                self.fail(
1399                    "Collection_S_S_NOWAIT_test IS NOT OK. Other thread "
1400                    "has not set the lock!"
1401                )
1402
1403            session2.start_transaction()
1404            waiting.set()
1405            collection.find("name = 'James'").lock_shared(
1406                mysqlx.LockContention.NOWAIT
1407            ).execute()
1408            res = result.fetch_all()
1409            self.assertEqual(len(res), 1)
1410
1411            waiting.clear()
1412
1413            session2.commit()
1414
1415        client1 = threading.Thread(
1416            target=thread_a,
1417            args=(
1418                locking,
1419                waiting,
1420            ),
1421        )
1422        client2 = threading.Thread(
1423            target=thread_b,
1424            args=(
1425                locking,
1426                waiting,
1427            ),
1428        )
1429
1430        client1.start()
1431        client2.start()
1432
1433        client1.join()
1434        client2.join()
1435        self.schema.drop_collection("mycoll9")
1436
1437    @tests.foreach_session()
1438    def test_collection_s_x_nowait(self):
1439        """Test shared-exclusive with NOWAIT lockcontention."""
1440        config = tests.get_mysqlx_config()
1441        self._drop_collection_if_exists("mycoll10")
1442        collection = self.schema.create_collection("mycoll10")
1443        collection.add(
1444            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1445        ).execute()
1446
1447        session1 = mysqlx.get_session(config)
1448        schema1 = session1.get_schema(config["schema"])
1449        collection1 = schema1.get_collection("mycoll10")
1450        session1.start_transaction()
1451        collection1.find("name = 'James'").lock_shared().execute()
1452
1453        session2 = mysqlx.get_session(config)
1454        schema2 = session2.get_schema("test")
1455        collection2 = schema2.get_collection("mycoll10")
1456        session2.start_transaction()
1457        self.assertRaises(
1458            mysqlx.OperationalError,
1459            collection2.find("name = 'James'")
1460            .lock_exclusive(mysqlx.LockContention.NOWAIT)
1461            .execute,
1462        )
1463        session2.rollback()
1464        session1.rollback()
1465
1466        self.schema.drop_collection("mycoll10")
1467        session2.close()
1468        session1.close()
1469
1470    @tests.foreach_session()
1471    def test_collection_x_s_skip_locked(self):
1472        """Test exclusive-shared with SKIP LOCKED lockcontention."""
1473        config = tests.get_mysqlx_config()
1474        self._drop_collection_if_exists("mycoll")
1475        collection = self.schema.create_collection("mycoll")
1476        collection.add(
1477            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1478        ).execute()
1479
1480        # `session2.lock_exclusive(SKIP_LOCKED) returns data immediately.
1481        session1 = mysqlx.get_session(config)
1482        schema1 = session1.get_schema(config["schema"])
1483        collection1 = schema1.get_collection("mycoll")
1484        session1.start_transaction()
1485        result = collection1.find("name = 'James'").lock_exclusive().execute()
1486
1487        session2 = mysqlx.get_session(config)
1488        schema2 = session2.get_schema(config["schema"])
1489        collection2 = schema2.get_collection("mycoll")
1490        session2.start_transaction()
1491        result = (
1492            collection2.find("name = 'James'")
1493            .lock_shared(mysqlx.LockContention.SKIP_LOCKED)
1494            .execute()
1495        ).fetch_all()
1496        self.assertEqual(len(result), 0)
1497        session2.rollback()
1498
1499        session1.rollback()
1500        self.schema.drop_collection("mycoll")
1501        session2.close()
1502        session1.close()
1503
1504    @tests.foreach_session()
1505    def test_collection_x_x_skip_locked(self):
1506        """Test exclusive-exclusive with SKIP LOCKED lockcontention."""
1507        config = tests.get_mysqlx_config()
1508        self._drop_collection_if_exists("mycoll")
1509        collection = self.schema.create_collection("mycoll")
1510        collection.add(
1511            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1512        ).execute()
1513
1514        # `session2.lock_exclusive(SKIP_LOCKED) returns data immediately.
1515        session1 = mysqlx.get_session(config)
1516        schema1 = session1.get_schema(config["schema"])
1517        collection1 = schema1.get_collection("mycoll")
1518        session1.start_transaction()
1519        result = collection1.find("name = 'James'").lock_exclusive().execute()
1520
1521        session2 = mysqlx.get_session(config)
1522        schema2 = session2.get_schema(config["schema"])
1523        collection2 = schema2.get_collection("mycoll")
1524        session2.start_transaction()
1525        result = (
1526            collection2.find("name = 'James'")
1527            .lock_exclusive(mysqlx.LockContention.SKIP_LOCKED)
1528            .execute()
1529        )
1530        res = result.fetch_all()
1531        assert len(res) == 0
1532        session2.rollback()
1533
1534        session1.rollback()
1535        self.schema.drop_collection("mycoll")
1536        session2.close()
1537        session1.close()
1538
1539    @tests.foreach_session()
1540    def test_collection_s_s_skip_locked(self):
1541        """Test shared-shared with SKIP LOCKED lockcontention."""
1542        config = tests.get_mysqlx_config()
1543        self._drop_collection_if_exists("mycoll")
1544        collection = self.schema.create_collection("mycoll")
1545        collection.add(
1546            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1547        ).execute()
1548
1549        session1 = mysqlx.get_session(config)
1550        schema1 = session1.get_schema(config["schema"])
1551        collection1 = schema1.get_collection("mycoll")
1552        session1.start_transaction()
1553        result = collection1.find("name = 'James'").lock_shared().execute()
1554
1555        session2 = mysqlx.get_session(config)
1556        schema2 = session2.get_schema(config["schema"])
1557        collection2 = schema2.get_collection("mycoll")
1558        session2.start_transaction()
1559        result = (
1560            collection2.find("name = 'James'")
1561            .lock_shared(mysqlx.LockContention.SKIP_LOCKED)
1562            .execute()
1563        )
1564        res = result.fetch_all()
1565        assert len(res) == 1
1566        session2.rollback()
1567
1568        session1.rollback()
1569        self.schema.drop_collection("mycoll")
1570        session2.close()
1571        session1.close()
1572
1573    @tests.foreach_session()
1574    def test_collection_s_x_skip_locked(self):
1575        """Test shared-exclusive with SKIP LOCKED lockcontention."""
1576        config = tests.get_mysqlx_config()
1577        self._drop_collection_if_exists("mycoll")
1578        collection = self.schema.create_collection("mycoll")
1579        collection.add(
1580            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1581        ).execute()
1582
1583        # `session2.lock_exclusive(SKIP_LOCKED) returns data immediately.
1584        session1 = mysqlx.get_session(config)
1585        schema1 = session1.get_schema(config["schema"])
1586        collection1 = schema1.get_collection("mycoll")
1587        session1.start_transaction()
1588        result = collection1.find("name = 'James'").lock_shared().execute()
1589
1590        session2 = mysqlx.get_session(config)
1591        schema2 = session2.get_schema(config["schema"])
1592        collection2 = schema2.get_collection("mycoll")
1593        session2.start_transaction()
1594        result = (
1595            collection2.find("name = 'James'")
1596            .lock_exclusive(mysqlx.LockContention.SKIP_LOCKED)
1597            .execute()
1598        )
1599        res = result.fetch_all()
1600        assert len(res) == 0
1601        session2.rollback()
1602
1603        session1.rollback()
1604        self.schema.drop_collection("mycoll")
1605        session2.close()
1606        session1.close()
1607
1608    @tests.foreach_session()
1609    def test_collection_s_s_default(self):
1610        """Test exclusive-shared with DEFAULT lockcontention."""
1611        config = tests.get_mysqlx_config()
1612        self._drop_collection_if_exists("mycoll1")
1613        collection = self.schema.create_collection("mycoll1")
1614        collection.add(
1615            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1616        ).execute()
1617
1618        locking = threading.Event()
1619        waiting = threading.Event()
1620
1621        def thread_a(locking, waiting):
1622            session1 = mysqlx.get_session(config)
1623            schema1 = session1.get_schema(config["schema"])
1624            collection = schema1.get_collection("mycoll1")
1625
1626            session1.start_transaction()
1627            collection.find("name = 'James'").lock_shared().execute()
1628            locking.set()
1629            time.sleep(2)
1630            locking.clear()
1631            if waiting.is_set():
1632                session1.commit()
1633                self.fail(
1634                    "Collection_S_S_DEFAULT_test IS NOT OK. Other thread is "
1635                    "waiting while it is not expected to!"
1636                )
1637            session1.commit()
1638
1639        def thread_b(locking, waiting):
1640            session2 = mysqlx.get_session(config)
1641            schema2 = session2.get_schema(config["schema"])
1642            collection = schema2.get_collection("mycoll1")
1643
1644            if not locking.wait(2):
1645                self.fail(
1646                    "Collection_S_S_DEFAULT_test IS NOT OK. Other thread has "
1647                    "not set the lock!"
1648                )
1649            session2.start_transaction()
1650
1651            waiting.set()
1652            collection.find("name = 'James'").lock_shared(
1653                mysqlx.LockContention.DEFAULT
1654            ).execute()
1655            waiting.clear()
1656
1657            session2.commit()
1658
1659        client1 = threading.Thread(
1660            target=thread_a,
1661            args=(
1662                locking,
1663                waiting,
1664            ),
1665        )
1666        client2 = threading.Thread(
1667            target=thread_b,
1668            args=(
1669                locking,
1670                waiting,
1671            ),
1672        )
1673
1674        client1.start()
1675        client2.start()
1676
1677        client1.join()
1678        client2.join()
1679        self.schema.drop_collection("mycoll1")
1680
1681    @tests.foreach_session()
1682    def test_collection_s_x_default(self):
1683        """Test shared-exclusive lock."""
1684        config = tests.get_mysqlx_config()
1685        session1 = mysqlx.get_session(config)
1686        self._drop_collection_if_exists("mycoll12")
1687        collection = self.schema.create_collection("mycoll2")
1688        collection.add(
1689            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1690        ).execute()
1691
1692        locking = threading.Event()
1693        waiting = threading.Event()
1694
1695        def thread_a(locking, waiting):
1696            session1 = mysqlx.get_session(config)
1697            schema1 = session1.get_schema(config["schema"])
1698            collection = schema1.get_collection("mycoll2")
1699
1700            session1.start_transaction()
1701            collection.find("name = 'James'").lock_shared().execute()
1702            locking.set()
1703            time.sleep(2)
1704            locking.clear()
1705            if not waiting.is_set():
1706                session1.commit()
1707                self.fail(
1708                    "Collection_S_X_DEFAULT_test IS NOT OK. Other thread is "
1709                    "not waiting while it is expected to!"
1710                )
1711            session1.commit()
1712
1713        def thread_b(locking, waiting):
1714            session1 = mysqlx.get_session(config)
1715            schema1 = session1.get_schema(config["schema"])
1716            collection = schema1.get_collection("mycoll2")
1717
1718            if not locking.wait(2):
1719                self.fail(
1720                    "Collection_S_X_DEFAULT_test IS NOT OK. Other thread has "
1721                    "not set the lock!"
1722                )
1723            session1.start_transaction()
1724            waiting.set()
1725            collection.find("name = 'James'").lock_exclusive(
1726                mysqlx.LockContention.DEFAULT
1727            ).execute()
1728            waiting.clear()
1729            session1.commit()
1730
1731        client1 = threading.Thread(
1732            target=thread_a,
1733            args=(
1734                locking,
1735                waiting,
1736            ),
1737        )
1738        client2 = threading.Thread(
1739            target=thread_b,
1740            args=(
1741                locking,
1742                waiting,
1743            ),
1744        )
1745
1746        client1.start()
1747        client2.start()
1748
1749        client1.join()
1750        client2.join()
1751        self.schema.drop_collection("mycoll2")
1752
1753    @tests.foreach_session()
1754    def test_collection_x_x_default(self):
1755        """Test exclusive-exclusive lock."""
1756        config = tests.get_mysqlx_config()
1757        self._drop_collection_if_exists("mycoll13")
1758        collection = self.schema.create_collection("mycoll3")
1759        collection.add(
1760            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1761        ).execute()
1762
1763        locking = threading.Event()
1764        waiting = threading.Event()
1765
1766        def thread_a(locking, waiting):
1767            session1 = mysqlx.get_session(config)
1768            schema1 = session1.get_schema(config["schema"])
1769            collection = schema1.get_collection("mycoll3")
1770
1771            session1.start_transaction()
1772            collection.find("name = 'James'").lock_exclusive().execute()
1773            locking.set()
1774            time.sleep(2)
1775            locking.clear()
1776            if not waiting.is_set():
1777                session1.commit()
1778                self.fail(
1779                    "Collection_X_X_DEFAULT_test IS NOT OK. Other thread is "
1780                    "not waiting while it is expected to!"
1781                )
1782            session1.commit()
1783
1784        def thread_b(locking, waiting):
1785            session2 = mysqlx.get_session(config)
1786            schema2 = session2.get_schema(config["schema"])
1787            collection = schema2.get_collection("mycoll3")
1788
1789            if not locking.wait(2):
1790                self.fail(
1791                    "Collection_X_X_DEFAULT_test IS NOT OK. Other thread has "
1792                    "not set the lock!"
1793                )
1794            session2.start_transaction()
1795
1796            waiting.set()
1797            collection.find("name = 'James'").lock_exclusive(
1798                mysqlx.LockContention.DEFAULT
1799            ).execute()
1800            waiting.clear()
1801
1802            session2.commit()
1803
1804        client1 = threading.Thread(
1805            target=thread_a,
1806            args=(
1807                locking,
1808                waiting,
1809            ),
1810        )
1811        client2 = threading.Thread(
1812            target=thread_b,
1813            args=(
1814                locking,
1815                waiting,
1816            ),
1817        )
1818
1819        client1.start()
1820        client2.start()
1821
1822        client1.join()
1823        client2.join()
1824        self.schema.drop_collection("mycoll3")
1825
1826    @tests.foreach_session()
1827    def test_collection_x_s_default(self):
1828        """Test exclusive-exclusive lock."""
1829        config = tests.get_mysqlx_config()
1830        self._drop_collection_if_exists("mycoll4")
1831        collection = self.schema.create_collection("mycoll4")
1832        collection.add(
1833            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1834        ).execute()
1835
1836        locking = threading.Event()
1837        waiting = threading.Event()
1838
1839        def thread_a(locking, waiting):
1840            session1 = mysqlx.get_session(config)
1841            schema1 = session1.get_schema(config["schema"])
1842            collection = schema1.get_collection("mycoll4")
1843
1844            session1.start_transaction()
1845            collection.find("name = 'James'").lock_exclusive().execute()
1846            locking.set()
1847            time.sleep(2)
1848            locking.clear()
1849            if not waiting.is_set():
1850                session1.commit()
1851                self.fail(
1852                    "Collection_X_S_DEFAULT_test IS NOT OK. Other thread is "
1853                    "not waiting while it is expected to!"
1854                )
1855            session1.commit()
1856
1857        def thread_b(locking, waiting):
1858            session2 = mysqlx.get_session(config)
1859            schema2 = session2.get_schema(config["schema"])
1860            collection = schema2.get_collection("mycoll4")
1861
1862            if not locking.wait(2):
1863                self.fail(
1864                    "Collection_X_S_DEFAULT_test IS NOT OK. Other thread has "
1865                    "not set the lock!"
1866                )
1867            session2.start_transaction()
1868
1869            waiting.set()
1870            collection.find("name = 'James'").lock_shared(
1871                mysqlx.LockContention.DEFAULT
1872            ).execute()
1873            waiting.clear()
1874
1875            session2.commit()
1876
1877        client1 = threading.Thread(
1878            target=thread_a,
1879            args=(
1880                locking,
1881                waiting,
1882            ),
1883        )
1884        client2 = threading.Thread(
1885            target=thread_b,
1886            args=(
1887                locking,
1888                waiting,
1889            ),
1890        )
1891
1892        client1.start()
1893        client2.start()
1894
1895        client1.join()
1896        client2.join()
1897        self.schema.drop_collection("mycoll4")
1898
1899    @tests.foreach_session()
1900    def test_collection_multiple_lock_contention_calls(self):
1901        """Test multiple lock calls."""
1902        config = tests.get_mysqlx_config()
1903        self._drop_collection_if_exists("mycoll5")
1904
1905        collection = self.schema.create_collection("mycoll5")
1906        collection.add(
1907            {"name": "Joe", "age": 21}, {"name": "James", "age": 23}
1908        ).execute()
1909
1910        locking = threading.Event()
1911        waiting = threading.Event()
1912
1913        errors = []
1914
1915        def thread_a(locking, waiting):
1916            session1 = mysqlx.get_session(config)
1917            schema1 = session1.get_schema(config["schema"])
1918            collection = schema1.get_collection("mycoll5")
1919
1920            session1.start_transaction()
1921            collection.find(
1922                "name = 'James'"
1923            ).lock_exclusive().lock_shared().lock_exclusive().execute()
1924            locking.set()
1925            time.sleep(2)
1926            locking.clear()
1927            if waiting.is_set():
1928                session1.commit()
1929                self.fail(
1930                    "Collection_Multiple_LockContention_calls_test IS NOT OK. "
1931                    "Other thread is not waiting while it is expected to!"
1932                )
1933            session1.commit()
1934
1935        def thread_b(locking, waiting):
1936            session2 = mysqlx.get_session(config)
1937            schema2 = session2.get_schema(config["schema"])
1938            collection = schema2.get_collection("mycoll5")
1939
1940            if not locking.wait(2):
1941                self.fail(
1942                    "Collection_Multiple_LockContention_calls_test IS NOT OK. "
1943                    "Other thread has not set the lock!"
1944                )
1945            session2.start_transaction()
1946
1947            waiting.set()
1948            result = (
1949                collection.find("name = 'James'")
1950                .lock_shared(mysqlx.LockContention.DEFAULT)
1951                .lock_exclusive(mysqlx.LockContention.SKIP_LOCKED)
1952                .lock_exclusive(mysqlx.LockContention.NOWAIT)
1953                .lock_shared(mysqlx.LockContention.SKIP_LOCKED)
1954                .execute()
1955            )
1956            res = result.fetch_all()
1957            self.assertEqual(len(res), 0)
1958            waiting.clear()
1959
1960            session2.commit()
1961
1962        client1 = threading.Thread(
1963            target=thread_a,
1964            args=(
1965                locking,
1966                waiting,
1967            ),
1968        )
1969        client2 = threading.Thread(
1970            target=thread_b,
1971            args=(
1972                locking,
1973                waiting,
1974            ),
1975        )
1976
1977        client1.start()
1978        client2.start()
1979        client1.join()
1980        client2.join()
1981        self.schema.drop_collection("mycoll5")
1982
1983    @tests.foreach_session()
1984    def test_parameter_binding(self):
1985        """Test the MCPY-354 issue ( parameter_binding) with parameter list."""
1986        self._drop_collection_if_exists("mycoll1")
1987        collection = self.schema.create_collection("mycoll1")
1988        collection.add(
1989            {"_id": 1, "name": "a"}, {"_id": 2, "name": "b"}
1990        ).execute()
1991        result = (
1992            collection.find("name == :name").bind('{"name":"b"}').execute()
1993        )
1994        row = result.fetch_all()
1995        self.assertEqual(row[0]["_id"], 2)
1996        self.schema.drop_collection("mycoll1")
1997
1998    @tests.foreach_session()
1999    def test_parameter_binding(self):
2000        """Test the MCPY-354 issue ( parameter_binding) with dict and date
2001        datatype."""
2002        self._drop_collection_if_exists("mycoll2")
2003        collection = self.schema.create_collection("mycoll2")
2004        collection.add(
2005            {"_id": 1, "bday": "2000-10-10"},
2006            {"_id": 2, "bday": "2000-10-11"},
2007        ).execute()
2008        result = (
2009            collection.find("bday == :bday")
2010            .bind("bday", "2000-10-11")
2011            .execute()
2012        )
2013        row = result.fetch_all()
2014        self.assertEqual(row[0]["_id"], 2)
2015        self.schema.drop_collection("mycoll2")
2016
2017    @tests.foreach_session()
2018    def Csbufferring_test1():
2019        """Test client-side buffering."""
2020        self._drop_collection_if_exists("mycoll3")
2021        collection = self.schema.create_collection("mycoll3")
2022        collection.add(
2023            {"_id": 1, "name": "a"}, {"_id": 2, "name": "b"}
2024        ).execute()
2025        result1 = (
2026            collection.find("name == :name").bind('{"name":"b"}').execute()
2027        )
2028        result2 = (
2029            collection.find("name == :name").bind('{"name":"a"}').execute()
2030        )
2031        row2 = result2.fetch_all()
2032        self.assertEqual(row2[0]["_id"], 1)
2033        row1 = result1.fetch_all()
2034        self.assertEqual(row1[0]["_id"], 2)
2035        self.schema.drop_collection("mycoll3")
2036