1# Copyright (c) 2021, Oracle and/or its affiliates. 2# 3# This program is free software; you can redistribute it and/or modify 4# it under the terms of the GNU General Public License, version 2.0, as 5# published by the Free Software Foundation. 6# 7# This program is also distributed with certain software (including 8# but not limited to OpenSSL) that is licensed under separate terms, 9# as designated in a particular file or component or in included license 10# documentation. The authors of MySQL hereby grant you an 11# additional permission to link the program and your derivative works 12# with the separately licensed software that they have included with 13# MySQL. 14# 15# Without limiting anything contained in the foregoing, this file, 16# which is part of MySQL Connector/Python, is also subject to the 17# Universal FOSS Exception, version 1.0, a copy of which can be found at 18# http://oss.oracle.com/licenses/universal-foss-exception. 19# 20# This program is distributed in the hope that it will be useful, but 21# WITHOUT ANY WARRANTY; without even the implied warranty of 22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 23# See the GNU General Public License, version 2.0, for more details. 24# 25# You should have received a copy of the GNU General Public License 26# along with this program; if not, write to the Free Software Foundation, Inc., 27# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 28 29import os 30import threading 31import time 32import unittest 33 34import mysqlx 35import tests 36 37 38@unittest.skipIf(tests.MYSQL_VERSION < (8, 0, 25), "XPlugin not compatible") 39class CollectionAddTests(tests.MySQLxTests): 40 """Tests for collection.find().""" 41 42 def _drop_collection_if_exists(self, name): 43 collection = self.schema.get_collection(name) 44 if collection.exists_in_database(): 45 self.schema.drop_collection(name) 46 47 @tests.foreach_session() 48 def test_collection_find1(self): 49 """Test collection.find.fields.""" 50 self._drop_collection_if_exists("mycoll5") 51 collection = self.schema.create_collection("mycoll5") 52 collection.add( 53 {"_id": 1, "name": "a", "age": 21}, 54 {"_id": 2, "name": "b"}, 55 {"_id": 3, "name": "c"}, 56 ).execute() 57 result = ( 58 collection.find().fields("sum($.age)").group_by("$.age").execute() 59 ).fetch_all() 60 self.assertEqual(len(result), 2) 61 self.schema.drop_collection("mycoll5") 62 63 @unittest.skip("TODO: Fix me") 64 @tests.foreach_session() 65 def test_collection_find2(self): 66 """Test the collection.find.groupby and having.""" 67 self._drop_collection_if_exists("mycoll6") 68 collection = self.schema.create_collection("mycoll6") 69 collection.add( 70 {"_id": 1, "a": 1, "b": 2, "c": 100}, 71 {"_id": 2, "a": 2, "b": 1, "c": 200}, 72 {"_id": 3, "a": 3, "b": 2, "c": 300}, 73 ).execute() 74 result = ( 75 collection.find() 76 .fields("$.a, $.b") 77 .group_by("$.b") 78 .having("$.a > 1") 79 .execute() 80 ).fetch_all() 81 self.assertEqual(len(result), 2) 82 self.schema.drop_collection("mycoll6") 83 84 @tests.foreach_session() 85 def test_collection_find3(self): 86 """Test collection.find with sort.""" 87 self._drop_collection_if_exists("mycoll9") 88 collection = self.schema.create_collection("mycoll9") 89 collection.add( 90 {"_id": 1, "a": 1, "b": 2, "c": 100}, 91 {"_id": 2, "a": 2, "b": 1, "c": 200}, 92 {"_id": 3, "a": 3, "b": 2, "c": 300}, 93 ).execute() 94 result = collection.find().fields("$.a, $b").sort("a DESC").execute() 95 row = result.fetch_all() 96 self.assertEqual(row[0]["$.a"], 3) 97 self.schema.drop_collection("mycoll9") 98 99 @tests.foreach_session() 100 def test_collection_find4(self): 101 """Test collection.find with limit with offset.""" 102 self._drop_collection_if_exists("mycoll10") 103 collection = self.schema.create_collection("mycoll10") 104 collection.add( 105 {"_id": 1, "a": 1, "b": 2, "c": 100}, 106 {"_id": 2, "a": 2, "b": 1, "c": 200}, 107 {"_id": 3, "a": 3, "b": 2, "c": 300}, 108 ).execute() 109 result = ( 110 collection.find("$.a > 1") 111 .fields("$.a") 112 .limit(2) 113 .offset(1) 114 .execute() 115 ) 116 row = result.fetch_all() 117 self.schema.drop_collection("mycoll10") 118 119 @unittest.skip("TODO: Fix me") 120 @tests.foreach_session() 121 def test_collection_find5(self): 122 """Test collection.find with like.""" 123 self._drop_collection_if_exists("mycoll11") 124 collection = self.schema.create_collection("mycoll11") 125 collection.add( 126 {"_id": 1, "name": "Sana"}, 127 {"_id": 2, "name": "Sam"}, 128 {"_id": 3, "name": "amr"}, 129 ).execute() 130 result = collection.find("$.name like S*").execute() 131 row = result.fetch_all() 132 self.assertEqual(len(row), 2) 133 self.assertEqual(row[1]["name"], "Sam") 134 self.schema.drop_collection("mycoll11") 135 136 @tests.foreach_session() 137 def test_collection_find6(self): 138 """Test collection.find with bind.""" 139 self._drop_collection_if_exists("mycoll11") 140 collection = self.schema.create_collection("mycoll11") 141 collection.add( 142 {"_id": 1, "name": "Sana"}, 143 {"_id": 2, "name": "Sam"}, 144 {"_id": 3, "name": "amr"}, 145 ).execute() 146 result = ( 147 collection.find("$.name == :name").bind("name", "Sana").execute() 148 ) 149 row = result.fetch_all()[0] 150 self.assertEqual(row["_id"], 1) 151 self.schema.drop_collection("mycoll11") 152 153 @tests.foreach_session() 154 def test_collection_find7(self): 155 """Test collection.find with parameter list.""" 156 self._drop_collection_if_exists("mycoll19") 157 collection = self.schema.create_collection("mycoll19") 158 collection.add( 159 {"_id": 1, "name": "Sana"}, 160 {"_id": 2, "name": "Sam"}, 161 {"_id": 3, "name": "amr"}, 162 ).execute() 163 result = ( 164 collection.find("$._id > 1").fields("$._id", "$.name").execute() 165 ).fetch_all() 166 self.assertEqual(len(result), 2) 167 self.schema.drop_collection("mycoll19") 168 169 @unittest.skipIf( 170 tests.MYSQL_EXTERNAL_SERVER, 171 "Test not available for external MySQL servers", 172 ) 173 @tests.foreach_session() 174 def test_collection_find8(self): 175 """Test collection.find.groupby with parameter list.""" 176 self._drop_collection_if_exists("mycoll20") 177 collection = self.schema.create_collection( 178 "mycol20", 179 ) 180 collection.add( 181 {"_id": 1, "a": 1, "b": 2, "c": 100}, 182 {"_id": 2, "a": 2, "b": 2, "c": 300}, 183 {"_id": 3, "a": 3, "b": 2, "c": 100}, 184 ).execute() 185 result = ( 186 collection.find() 187 .fields("$a,$.b,$.c") 188 .group_by("$.b", "$.c") 189 .execute() 190 ).fetch_all() 191 self.assertEqual(len(result), 2) 192 self.schema.drop_collection("mycol20") 193 194 @tests.foreach_session() 195 def test_collection_find9(self): 196 """Test collection.find.sort with param list.""" 197 self._drop_collection_if_exists("mycoll21") 198 collection = self.schema.create_collection("mycoll21") 199 collection.add( 200 {"_id": 1, "a": 1, "b": 10, "c": 100}, 201 {"_id": 2, "a": 1, "b": 11, "c": 200}, 202 {"_id": 3, "a": 2, "b": 10, "c": 300}, 203 ).execute() 204 result = ( 205 collection.find() 206 .fields("$.a, $.b") 207 .sort("a ASC", "b DESC") 208 .execute() 209 ).fetch_all() 210 self.assertEqual(result[0]["$.b"], 11) 211 self.schema.drop_collection("mycoll21") 212 213 @tests.foreach_session() 214 def test_collection_find10(self): 215 """Test collection.find using where() condition.""" 216 self._drop_collection_if_exists("newcoll1") 217 collection = self.schema.create_collection("newcoll1") 218 collection.add( 219 {"_id": 1, "a": 1, "b": 10, "c": 100}, 220 {"_id": 2, "a": 1, "b": 11, "c": 200}, 221 {"_id": 3, "a": 2, "b": 10, "c": 300}, 222 ).execute() 223 result = collection.find().where("$.c >= 200").execute() 224 self.assertEqual(len(result.fetch_all()), 2) 225 self.schema.drop_collection("newcoll1") 226 227 @unittest.skipUnless( 228 tests.ARCH_64BIT, "Test available only for 64 bit platforms" 229 ) 230 @unittest.skipIf(os.name == "nt", "Test not available for Windows") 231 @tests.foreach_session() 232 def test_collection_find11(self): 233 """Test collection.find with offset as large positive number.""" 234 self._drop_collection_if_exists("newcoll2") 235 collection = self.schema.create_collection( 236 "newcoll2", 237 ) 238 collection.add( 239 {"_id": 1, "a": 1, "b": 10, "c": 100}, 240 {"_id": 2, "a": 1, "b": 11, "c": 200}, 241 {"_id": 3, "a": 2, "b": 10, "c": 300}, 242 ).execute() 243 result = collection.find().limit(2).offset(92898832378723).execute() 244 self.assertEqual(len(result.fetch_all()), 0) 245 self.schema.drop_collection("newcoll2") 246 247 @tests.foreach_session() 248 def test_collection_find12(self): 249 """Test collection.find with offset as negative number.""" 250 self._drop_collection_if_exists("mycoll3") 251 collection = self.schema.create_collection("newcoll3") 252 collection.add( 253 {"_id": 1, "a": 1, "b": 10, "c": 100}, 254 {"_id": 2, "a": 1, "b": 11, "c": 200}, 255 {"_id": 3, "a": 2, "b": 10, "c": 300}, 256 ).execute() 257 self.assertRaises( 258 ValueError, 259 collection.find().limit(2).offset, 260 -2378723, 261 ) 262 self.schema.drop_collection("newcoll3") 263 264 @tests.foreach_session() 265 def test_operator1(self): 266 """Test binary operator and.""" 267 self._drop_collection_if_exists("mycoll1") 268 collection = self.schema.create_collection("mycoll1") 269 collection.add( 270 {"_id": 1, "name": "Sana"}, 271 {"_id": 2, "name": "Sam"}, 272 {"_id": 3, "name": "amr"}, 273 ).execute() 274 result = ( 275 collection.find("$.name == :name and $._id == :id") 276 .bind('{"name":"Sana" ,"id":1}') 277 .execute() 278 ) 279 row = result.fetch_all()[0] 280 self.assertEqual(row["name"], "Sana") 281 self.schema.drop_collection("mycoll1") 282 283 @tests.foreach_session() 284 def test_operator4(self): 285 """Test 'between' operator.""" 286 self._drop_collection_if_exists("mycoll2") 287 collection = self.schema.create_collection("mycoll2") 288 collection.add( 289 {"_id": 1, "name": "Sana"}, 290 {"_id": 2, "name": "Sam"}, 291 {"_id": 3, "name": "amr"}, 292 {"_id": 4, "name": "abc"}, 293 {"_id": 5, "name": "def"}, 294 ).execute() 295 result = collection.find("$._id between 2 and 4").execute() 296 self.assertEqual(len(result.fetch_all()), 3) 297 self.schema.drop_collection("mycoll2") 298 299 # Testing the contains operator with single operand on both sides 300 301 @tests.foreach_session() 302 def test_contains_operator_test1(self): 303 """Test IN operator with string on both sides - With LHS in RHS.""" 304 self._drop_collection_if_exists("mycoll1") 305 collection = self.schema.create_collection("mycoll1") 306 collection.add({"name": "a"}, {"name": "b"}).execute() 307 result = collection.find("'a' IN $.name").execute() 308 self.assertEqual(len(result.fetch_all()), 1) 309 self.schema.drop_collection("mycoll1") 310 311 @tests.foreach_session() 312 def test_contains_operator2(self): 313 """Test IN operator with int as operand - With LHS in RHS.""" 314 self._drop_collection_if_exists("mycoll2") 315 collection = self.schema.create_collection("mycoll2") 316 collection.add( 317 {"name": "a", "age": 21}, {"name": "b", "age": 21} 318 ).execute() 319 result = collection.find("21 IN $.age").execute() 320 self.assertEqual(len(result.fetch_all()), 2) 321 self.schema.drop_collection("mycoll2") 322 323 @unittest.skip("TODO: Fix me") 324 @tests.foreach_session() 325 def test_contains_operator3(self): 326 """Test IN operator with boolean as operand - With LHS in RHS.""" 327 self._drop_collection_if_exists("mycoll3") 328 collection = self.schema.create_collection("mycoll3") 329 collection.add( 330 {"name": "a", "age": 21, "ARR": [1, 4]}, 331 {"name": "b", "age": 21, "ARR": 2}, 332 ).execute() 333 result = collection.find("(!false && true) IN [true]").execute() 334 self.assertEqual(len(result.fetch_all()), 2) 335 self.schema.drop_collection("mycoll3") 336 337 @tests.foreach_session() 338 def test_contains_operator4(self): 339 """Test NOT IN operator with string operand - With LHS not in RHS.""" 340 self._drop_collection_if_exists("mycoll4") 341 collection = self.schema.create_collection("mycoll4") 342 collection.add({"name": "a"}, {"name": "b"}, {"name": "c"}).execute() 343 result = collection.find("$.name NOT IN 'a'").execute() 344 self.assertEqual(len(result.fetch_all()), 2) 345 self.schema.drop_collection("mycoll4") 346 347 @tests.foreach_session() 348 def test_contains_operator5(self): 349 """Test NOT IN operator with int as operand - With LHS not in RHS.""" 350 self._drop_collection_if_exists("mycoll5") 351 collection = self.schema.create_collection("mycoll5") 352 collection.add( 353 {"name": "a", "age": 21}, {"name": "b", "age": 22} 354 ).execute() 355 result = collection.find("21 NOT IN $.age").execute() 356 self.assertEqual(len(result.fetch_all()), 1) 357 self.schema.drop_collection("mycoll5") 358 359 @tests.foreach_session() 360 def test_contains_operator6(self): 361 self._drop_collection_if_exists("mycoll6") 362 collection = self.schema.create_collection("mycoll6") 363 collection.add( 364 {"name": "a", "age": 21}, {"name": "b", "age": 21} 365 ).execute() 366 result = collection.find("'b' NOT IN $.name").execute() 367 self.assertEqual(len(result.fetch_all()), 1) 368 self.schema.drop_collection("mycoll6") 369 370 @tests.foreach_session() 371 def test_contains_operator7(self): 372 """Test IN operator with different datatypes as operands.""" 373 self._drop_collection_if_exists("mycoll7") 374 collection = self.schema.create_collection("mycoll7") 375 collection.add( 376 {"name": "a", "age": 21}, {"name": "b", "age": 22} 377 ).execute() 378 result = collection.find("21 IN $.name").execute() 379 result1 = collection.find("'b' IN $.age").limit(1).execute() 380 self.assertEqual(len(result.fetch_all()), 0) 381 self.assertEqual(len(result1.fetch_all()), 0), 382 self.schema.drop_collection("mycoll7") 383 384 @tests.foreach_session() 385 def test_contains_operator8(self): 386 """Test IN operator with single element on LHS and array/list on 387 RHS and vice versa.""" 388 self._drop_collection_if_exists("mycoll8") 389 collection = self.schema.create_collection("mycoll8") 390 collection.add( 391 {"_id": 1, "name": "a", "age": 21, "prof": ["x", "y"]}, 392 {"_id": 2, "name": "b", "age": 24, "prof": ["p", "q"]}, 393 {"_id": 3, "name": "c", "age": 26}, 394 ).execute() 395 result = collection.find("$.age IN [21,23,24,28]").execute() 396 result1 = collection.find("$.name IN ['a','b','c','d','e']").execute() 397 result2 = collection.find("$.age IN (21,23)").execute() 398 result3 = ( 399 collection.find() 400 .fields("21 IN (22,23) as test") 401 .limit(1) 402 .execute() 403 ) 404 result4 = collection.find('["p","q"] IN $.prof').execute() 405 self.assertEqual(len(result.fetch_all()), 2) 406 self.assertEqual(len(result1.fetch_all()), 3) 407 self.assertEqual(len(result2.fetch_all()), 1) 408 self.assertEqual(result3.fetch_all()[0].test, False) 409 self.assertEqual(len(result4.fetch_all()), 1) 410 self.schema.drop_collection("mycoll8") 411 412 @tests.foreach_session() 413 def test_contains_operator9(self): 414 """Test IN operator with single element on LHS and dict on RHS and 415 vice versa.""" 416 self._drop_collection_if_exists("mycoll9") 417 collection = self.schema.create_collection("mycoll9") 418 collection.add( 419 { 420 "_id": 1, 421 "name": "joy", 422 "age": 21, 423 "additionalinfo": { 424 "company": "xyz", 425 "vehicle": "bike", 426 "hobbies": [ 427 "reading", 428 "music", 429 "playing", 430 {"a1": "x", "b1": "y", "c1": "z"}, 431 ], 432 }, 433 } 434 ).execute() 435 result = collection.find( 436 "'reading' IN $.additionalinfo.hobbies" 437 ).execute() 438 result1 = ( 439 collection.find() 440 .fields("'music' IN $.age as test") 441 .limit(1) 442 .execute() 443 ) 444 result2 = ( 445 collection.find() 446 .fields("'boxing' IN $.additionalinfo.hobbies as test1") 447 .limit(1) 448 .execute() 449 ) 450 result3 = collection.find( 451 '{"a1":"x","b1":"y","c1":"z"} IN $.additionalinfo.hobbies' 452 ).execute() 453 self.assertEqual(len(result.fetch_all()), 1) 454 self.assertFalse(result1.fetch_all()[0].test) 455 self.assertFalse(result2.fetch_all()[0].test1) 456 self.assertEqual(len(result3.fetch_all()), 1) 457 self.schema.drop_collection("mycoll9") 458 459 @tests.foreach_session() 460 def test_contains_operator10(self): 461 """Test IN operator with array/list operand on LHS and array/list on 462 RHS.""" 463 self._drop_collection_if_exists("mycoll10") 464 collection = self.schema.create_collection("mycoll10") 465 collection.add( 466 { 467 "_id": 1, 468 "name": "joy", 469 "age": 21, 470 "additionalinfo": { 471 "company": "xyz", 472 "vehicle": "bike", 473 "hobbies": ["reading", "music", "playing"], 474 }, 475 }, 476 { 477 "_id": 2, 478 "name": "happy", 479 "age": 24, 480 "additionalinfo": { 481 "company": "abc", 482 "vehicle": "car", 483 "hobbies": ["playing", "painting", "boxing"], 484 }, 485 }, 486 ).execute() 487 result = collection.find( 488 '["playing","painting","boxing"] IN $.additionalinfo.hobbies' 489 ).execute() 490 result1 = ( 491 collection.find() 492 .fields('["happy","joy"] IN $.name as test') 493 .limit(1) 494 .execute() 495 ) 496 result2 = ( 497 collection.find() 498 .fields('["car","bike"] NOT IN $.additionalinfo.vehicle as test1') 499 .limit(1) 500 .execute() 501 ) 502 self.assertEqual(len(result.fetch_all()), 1) 503 self.assertFalse(result1.fetch_all()[0].test) 504 self.assertTrue(result2.fetch_all()[0].test1) 505 self.schema.drop_collection("mycoll10") 506 507 @tests.foreach_session() 508 def test_contains_operator11(self): 509 """Test IN operator with dict on LHS and dict on RHS.""" 510 self._drop_collection_if_exists("mycoll11") 511 collection = self.schema.create_collection("mycoll11") 512 collection.add( 513 { 514 "_id": 1, 515 "name": "joy", 516 "age": 21, 517 "additionalinfo": [ 518 {"company": "xyz", "vehicle": "bike"}, 519 {"company": "abc", "vehicle": "car"}, 520 {"company": "mno", "vehicle": "zeep"}, 521 ], 522 }, 523 { 524 "_id": 2, 525 "name": "happy", 526 "age": 24, 527 "additionalinfo": [ 528 {"company": "abc", "vehicle": "car"}, 529 {"company": "pqr", "vehicle": "bicycle"}, 530 ], 531 }, 532 { 533 "_id": 3, 534 "name": "nice", 535 "age": 25, 536 "additionalinfo": {"company": "def", "vehicle": "none"}, 537 }, 538 ).execute() 539 result = collection.find( 540 '{"company":"abc","vehicle":"car"} IN $.additionalinfo' 541 ).execute() 542 result1 = ( 543 collection.find() 544 .fields('{"vehicle":"car"} NOT IN $.additionalinfo as test') 545 .limit(1) 546 .execute() 547 ) 548 result2 = ( 549 collection.find() 550 .fields('{"company":"mno"} IN $.additionalinfo as test1') 551 .limit(1) 552 .execute() 553 ) 554 result3 = collection.find( 555 '{"company":"abc","vehicle":"car"} NOT IN $.additionalinfo' 556 ).execute() 557 self.assertEqual(len(result.fetch_all()), 2) 558 self.assertFalse(result1.fetch_all()[0].test) 559 self.assertTrue(result2.fetch_all()[0].test1) 560 self.assertEqual(len(result3.fetch_all()), 1) 561 self.schema.drop_collection("mycoll11") 562 563 @tests.foreach_session() 564 def test_contains_operator12(self): 565 """Test IN operator with operands having expressions.""" 566 self._drop_collection_if_exists("mycoll12") 567 collection = self.schema.create_collection("mycoll12") 568 collection.add( 569 {"_id": 1, "name": "a", "age": 21}, 570 {"_id": 2, "name": "b"}, 571 {"_id": 3, "name": "c"}, 572 ).execute() 573 result = ( 574 collection.find() 575 .fields("(1>5) IN (true, false) as test") 576 .limit(1) 577 .execute() 578 ) 579 result1 = ( 580 collection.find() 581 .fields("('a'>'b') in (true, false) as test1") 582 .limit(1) 583 .execute() 584 ) 585 result2 = ( 586 collection.find() 587 .fields( 588 "true IN [(1>5), !(false), (true || false), (false && true)] as test2" 589 ) 590 .limit(1) 591 .execute() 592 ) 593 self.assertTrue(result.fetch_all()[0].test) 594 self.assertTrue(result1.fetch_all()[0].test1) 595 self.assertTrue(result2.fetch_all()[0].test2) 596 self.schema.drop_collection("mycoll12") 597 598 @tests.foreach_session() 599 def test_contains_operator13(self): 600 """Test IN operator with operands having expressions.""" 601 self._drop_collection_if_exists("mycoll13") 602 collection = self.schema.create_collection("mycoll13") 603 collection.add( 604 {"_id": 1, "name": "a", "age": 21}, 605 {"_id": 2, "name": "b"}, 606 {"_id": 3, "name": "c"}, 607 ).execute() 608 result = collection.find("(1+5) IN (1,2,3,4,5,6)").execute() 609 result1 = collection.find("(2+3) IN (1,2,3,4)").limit(1).execute() 610 result2 = collection.find("(1+5) IN (1,2,3,4,5,6)").execute() 611 self.assertEqual(len(result.fetch_all()), 3) 612 self.assertEqual(len(result1.fetch_all()), 0) 613 self.assertEqual(len(result2.fetch_all()), 3) 614 self.schema.drop_collection("mycoll13") 615 616 @tests.foreach_session() 617 def test_contains_operator14(self): 618 """Test IN operator: search for empty string in a field and field in 619 empty string.""" 620 self._drop_collection_if_exists("mycoll14") 621 collection = self.schema.create_collection("mycoll14") 622 collection.add( 623 {"_id": 1, "name": "a", "age": 21}, 624 {"_id": 2, "name": "b"}, 625 {"_id": 3, "name": "c"}, 626 ).execute() 627 result = collection.find("'' IN $.name").execute() 628 result1 = collection.find("$.name IN ['', ' ']").execute() 629 result2 = collection.find("$.name IN ('', ' ')").execute() 630 self.assertEqual(len(result.fetch_all()), 0) 631 self.assertEqual(len(result1.fetch_all()), 0) 632 self.assertEqual(len(result2.fetch_all()), 0) 633 self.schema.drop_collection("mycoll14") 634 635 @tests.foreach_session() 636 def test_collection_s_s_lock(self): 637 """Test shared-shared lock.""" 638 config = tests.get_mysqlx_config() 639 self._drop_collection_if_exists("mycoll11") 640 collection = self.schema.create_collection("mycoll1") 641 collection.add( 642 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 643 ).execute() 644 645 locking = threading.Event() 646 waiting = threading.Event() 647 648 def thread_a(locking, waiting): 649 session1 = mysqlx.get_session(config) 650 schema1 = session1.get_schema(config["schema"]) 651 collection = schema1.get_collection("mycoll1") 652 653 session1.start_transaction() 654 collection.find("name = 'James'").lock_shared().execute() 655 locking.set() 656 time.sleep(2) 657 locking.clear() 658 if waiting.is_set(): 659 session1.commit() 660 self.fail( 661 "Collection_S_S_Lock_test IS NOT OK. Other thread is " 662 "waiting while it is not expected to!" 663 ) 664 session1.commit() 665 666 def thread_b(locking, waiting): 667 session2 = mysqlx.get_session(config) 668 schema2 = session2.get_schema(config["schema"]) 669 collection = schema2.get_collection("mycoll1") 670 671 if not locking.wait(2): 672 self.fail( 673 "Collection_S_S_Lock_test IS NOT OK. Other thread has not " 674 "set the lock!" 675 ) 676 session2.start_transaction() 677 678 waiting.set() 679 collection.find("name = 'James'").lock_shared().execute() 680 waiting.clear() 681 682 session2.commit() 683 684 client1 = threading.Thread( 685 target=thread_a, 686 args=( 687 locking, 688 waiting, 689 ), 690 ) 691 client2 = threading.Thread( 692 target=thread_b, 693 args=( 694 locking, 695 waiting, 696 ), 697 ) 698 699 client1.start() 700 client2.start() 701 702 client1.join() 703 client2.join() 704 self.schema.drop_collection("mycoll1") 705 706 @tests.foreach_session() 707 def test_collection_s_x_lock(self): 708 config = tests.get_mysqlx_config() 709 """Test shared-exclusive lock.""" 710 self._drop_collection_if_exists("mycoll2") 711 collection = self.schema.create_collection("mycoll2") 712 collection.add( 713 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 714 ).execute() 715 716 locking = threading.Event() 717 waiting = threading.Event() 718 719 def thread_a(locking, waiting): 720 session1 = mysqlx.get_session(config) 721 schema1 = session1.get_schema(config["schema"]) 722 collection = schema1.get_collection("mycoll2") 723 724 session1.start_transaction() 725 collection.find("name = 'James'").lock_shared().execute() 726 locking.set() 727 time.sleep(2) 728 locking.clear() 729 if not waiting.is_set(): 730 session1.commit() 731 self.fail( 732 "Collection_S_X_Lock_test IS NOT OK. Other thread is not " 733 "waiting while it is expected to!" 734 ) 735 session1.commit() 736 737 def thread_b(locking, waiting): 738 session1 = mysqlx.get_session(config) 739 schema1 = session1.get_schema(config["schema"]) 740 collection = schema1.get_collection("mycoll2") 741 742 if not locking.wait(2): 743 self.fail( 744 "Collection_S_X_Lock_test IS NOT OK. Other thread has not " 745 "set the lock!" 746 ) 747 session1.start_transaction() 748 waiting.set() 749 collection.find("name = 'James'").lock_exclusive().execute() 750 waiting.clear() 751 session1.commit() 752 753 client1 = threading.Thread( 754 target=thread_a, 755 args=( 756 locking, 757 waiting, 758 ), 759 ) 760 client2 = threading.Thread( 761 target=thread_b, 762 args=( 763 locking, 764 waiting, 765 ), 766 ) 767 768 client1.start() 769 client2.start() 770 771 client1.join() 772 client2.join() 773 self.schema.drop_collection("mycoll2") 774 775 @tests.foreach_session() 776 def test_collection_x_x_lock(self): 777 """Test exclusive-exclusive lock.""" 778 config = tests.get_mysqlx_config() 779 self._drop_collection_if_exists("mycoll3") 780 collection = self.schema.create_collection("mycoll3") 781 collection.add( 782 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 783 ).execute() 784 785 locking = threading.Event() 786 waiting = threading.Event() 787 788 def thread_a(locking, waiting): 789 session1 = mysqlx.get_session(config) 790 schema1 = session1.get_schema(config["schema"]) 791 collection = schema1.get_collection("mycoll3") 792 793 session1.start_transaction() 794 collection.find("name = 'James'").lock_exclusive().execute() 795 locking.set() 796 time.sleep(2) 797 locking.clear() 798 if not waiting.is_set(): 799 session1.commit() 800 self.fail( 801 "Collection_X_X_Lock_test IS NOT OK. Other thread is not " 802 "waiting while it is expected to!" 803 ) 804 session1.commit() 805 806 def thread_b(locking, waiting): 807 session2 = mysqlx.get_session(config) 808 schema2 = session2.get_schema(config["schema"]) 809 collection = schema2.get_collection("mycoll3") 810 811 if not locking.wait(2): 812 self.fail( 813 "Collection_X_X_Lock_test IS NOT OK. Other thread has not " 814 "set the lock!" 815 ) 816 session2.start_transaction() 817 818 waiting.set() 819 collection.find("name = 'James'").lock_exclusive().execute() 820 waiting.clear() 821 822 session2.commit() 823 824 client1 = threading.Thread( 825 target=thread_a, 826 args=( 827 locking, 828 waiting, 829 ), 830 ) 831 client2 = threading.Thread( 832 target=thread_b, 833 args=( 834 locking, 835 waiting, 836 ), 837 ) 838 839 client1.start() 840 client2.start() 841 842 client1.join() 843 client2.join() 844 self.schema.drop_collection("mycoll3") 845 846 @tests.foreach_session() 847 def test_collection_x_s_lock(self): 848 """Test exclusive-exclusive lock.""" 849 config = tests.get_mysqlx_config() 850 self._drop_collection_if_exists("mycoll4") 851 collection = self.schema.create_collection("mycoll4") 852 collection.add( 853 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 854 ).execute() 855 856 locking = threading.Event() 857 waiting = threading.Event() 858 859 def thread_a(locking, waiting): 860 session1 = mysqlx.get_session(config) 861 schema1 = session1.get_schema(config["schema"]) 862 collection = schema1.get_collection("mycoll4") 863 864 session1.start_transaction() 865 collection.find("name = 'James'").lock_exclusive().execute() 866 locking.set() 867 time.sleep(2) 868 locking.clear() 869 if not waiting.is_set(): 870 session1.commit() 871 self.fail( 872 "Collection_X_S_Lock_test IS NOT OK. Other thread is not " 873 "waiting while it is expected to!" 874 ) 875 session1.commit() 876 877 def thread_b(locking, waiting): 878 session2 = mysqlx.get_session(config) 879 schema2 = session2.get_schema(config["schema"]) 880 collection = schema2.get_collection("mycoll4") 881 882 if not locking.wait(2): 883 self.fail( 884 "Collection_X_S_Lock_test IS NOT OK. Other thread has not " 885 "set the lock!" 886 ) 887 session2.start_transaction() 888 889 waiting.set() 890 collection.find("name = 'James'").lock_shared().execute() 891 waiting.clear() 892 893 session2.commit() 894 895 client1 = threading.Thread( 896 target=thread_a, 897 args=( 898 locking, 899 waiting, 900 ), 901 ) 902 client2 = threading.Thread( 903 target=thread_b, 904 args=( 905 locking, 906 waiting, 907 ), 908 ) 909 910 client1.start() 911 client2.start() 912 913 client1.join() 914 client2.join() 915 self.schema.drop_collection("mycoll4") 916 917 @tests.foreach_session() 918 def test_collection_multiple_lock_calls(self): 919 """Test multiple lock calls.""" 920 config = tests.get_mysqlx_config() 921 self._drop_collection_if_exists("mycoll5") 922 collection = self.schema.create_collection("mycoll5") 923 collection.add( 924 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 925 ).execute() 926 927 locking = threading.Event() 928 waiting = threading.Event() 929 930 def thread_a(locking, waiting): 931 session1 = mysqlx.get_session(config) 932 schema1 = session1.get_schema(config["schema"]) 933 collection = schema1.get_collection("mycoll5") 934 935 session1.start_transaction() 936 collection.find( 937 "name = 'James'" 938 ).lock_exclusive().lock_shared().lock_exclusive().execute() 939 locking.set() 940 time.sleep(2) 941 locking.clear() 942 if not waiting.is_set(): 943 session1.commit() 944 self.fail( 945 "Collection_Multiple_Lock_calls_test IS NOT OK. Other " 946 "thread is not waiting while it is expected to!" 947 ) 948 session1.commit() 949 950 def thread_b(locking, waiting): 951 session2 = mysqlx.get_session(config) 952 schema2 = session2.get_schema(config["schema"]) 953 collection = schema2.get_collection("mycoll5") 954 955 if not locking.wait(2): 956 self.fail( 957 "Collection_Multiple_Lock_calls_test IS NOT OK. Other " 958 "thread has not set the lock!" 959 ) 960 session2.start_transaction() 961 962 waiting.set() 963 collection.find( 964 "name = 'James'" 965 ).lock_shared().lock_exclusive().lock_exclusive().lock_shared().execute() 966 waiting.clear() 967 968 session2.commit() 969 970 client1 = threading.Thread( 971 target=thread_a, 972 args=( 973 locking, 974 waiting, 975 ), 976 ) 977 client2 = threading.Thread( 978 target=thread_b, 979 args=( 980 locking, 981 waiting, 982 ), 983 ) 984 985 client1.start() 986 client2.start() 987 988 client1.join() 989 client2.join() 990 self.schema.drop_collection("mycoll5") 991 992 @tests.foreach_session() 993 def test_collection_x_lock_modify(self): 994 """Test lock exclusive and modify - modify will be blocked until the 995 lock is released.""" 996 config = tests.get_mysqlx_config() 997 self._drop_collection_if_exists("mycoll6") 998 collection = self.schema.create_collection("mycoll6") 999 collection.add( 1000 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1001 ).execute() 1002 1003 locking = threading.Event() 1004 waiting = threading.Event() 1005 1006 def thread_a(locking, waiting): 1007 session1 = mysqlx.get_session(config) 1008 schema1 = session1.get_schema(config["schema"]) 1009 collection = schema1.get_collection("mycoll6") 1010 1011 session1.start_transaction() 1012 collection.find( 1013 "$.name = 'James'" 1014 ).lock_exclusive().lock_shared().lock_exclusive().execute() 1015 locking.set() 1016 time.sleep(2) 1017 locking.clear() 1018 if not waiting.is_set(): 1019 session1.commit() 1020 self.fail( 1021 "Collection_X_Lock_Modify_test IS NOT OK. Other thread is " 1022 "not waiting while it is expected to!" 1023 ) 1024 session1.commit() 1025 1026 def thread_b(locking, waiting): 1027 session2 = mysqlx.get_session(config) 1028 schema2 = session2.get_schema(config["schema"]) 1029 collection = schema2.get_collection("mycoll6") 1030 1031 if not locking.wait(2): 1032 self.fail( 1033 "Collection_X_Lock_Modify_test IS NOT OK. Other thread has " 1034 "not set the lock!" 1035 ) 1036 session2.start_transaction() 1037 1038 waiting.set() 1039 collection.modify("$.name == 'James'").set("$.age", 30).execute() 1040 waiting.clear() 1041 1042 session2.commit() 1043 1044 client1 = threading.Thread( 1045 target=thread_a, 1046 args=( 1047 locking, 1048 waiting, 1049 ), 1050 ) 1051 client2 = threading.Thread( 1052 target=thread_b, 1053 args=( 1054 locking, 1055 waiting, 1056 ), 1057 ) 1058 1059 client1.start() 1060 client2.start() 1061 1062 client1.join() 1063 client2.join() 1064 self.schema.drop_collection("mycoll6") 1065 1066 @tests.foreach_session() 1067 def test_collection_s_lock_modifyt(self): 1068 """Test lock shared and modify - modify will be blocked until the lock 1069 is released, but will be able to read.""" 1070 config = tests.get_mysqlx_config() 1071 self._drop_collection_if_exists("mycoll7") 1072 collection = self.schema.create_collection("mycoll7") 1073 collection.add( 1074 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1075 ).execute() 1076 1077 locking = threading.Event() 1078 waiting = threading.Event() 1079 1080 def thread_a(locking, waiting): 1081 session1 = mysqlx.get_session(config) 1082 schema1 = session1.get_schema(config["schema"]) 1083 collection = schema1.get_collection("mycoll7") 1084 1085 session1.start_transaction() 1086 collection.find( 1087 "$.name = 'James'" 1088 ).lock_exclusive().lock_shared().execute() 1089 locking.set() 1090 time.sleep(2) 1091 locking.clear() 1092 if not waiting.is_set(): 1093 session1.commit() 1094 # return 1095 raise Exception( 1096 "Collection_S_Lock_Modify_test IS NOT OK. Other thread is " 1097 "not waiting while it is expected to!" 1098 ) 1099 session1.commit() 1100 1101 def thread_b(locking, waiting): 1102 session2 = mysqlx.get_session(config) 1103 schema2 = session2.get_schema(config["schema"]) 1104 collection = schema2.get_collection("mycoll7") 1105 1106 if not locking.wait(2): 1107 raise Exception( 1108 "Collection_S_Lock_Modify_test IS NOT OK. Other thread has " 1109 "not set the lock!" 1110 ) 1111 session2.start_transaction() 1112 1113 result = collection.find("$.name == 'James'").execute() 1114 assert result.fetch_all()[0]["age"] == 23 1115 waiting.set() 1116 collection.modify("$.name == 'James'").set("$.age", 30).execute() 1117 waiting.clear() 1118 1119 session2.commit() 1120 1121 client1 = threading.Thread( 1122 target=thread_a, 1123 args=( 1124 locking, 1125 waiting, 1126 ), 1127 ) 1128 client2 = threading.Thread( 1129 target=thread_b, 1130 args=( 1131 locking, 1132 waiting, 1133 ), 1134 ) 1135 1136 client1.start() 1137 client2.start() 1138 1139 client1.join() 1140 client2.join() 1141 self.schema.drop_collection("mycoll7") 1142 1143 @tests.foreach_session() 1144 def _lock_contention_test(self, lock_type_1, lock_type_2, lock_contention): 1145 """Test reading an exclusively locked document using lock_shared and 1146 the 'NOWAIT' waiting option.""" 1147 config = tests.get_mysqlx_config() 1148 self._drop_collection_if_exists("mycoll") 1149 collection = self.schema.create_collection("mycoll") 1150 collection.add( 1151 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1152 ).execute() 1153 1154 locking = threading.Event() 1155 waiting = threading.Event() 1156 1157 errors = [] 1158 1159 def thread_a(locking, waiting): 1160 session = mysqlx.get_session(config) 1161 schema = session.get_schema(config["schema"]) 1162 collection = schema.get_collection("mycoll") 1163 1164 session.start_transaction() 1165 result = collection.find("name = 'James'") 1166 if lock_type_1 == "S": 1167 result.lock_shared().execute() 1168 else: 1169 result.lock_exclusive().execute() 1170 1171 locking.set() 1172 time.sleep(2) 1173 locking.clear() 1174 1175 if not waiting.is_set(): 1176 errors.append( 1177 "{0}-{1} lock test failure." 1178 "".format(lock_type_1, lock_type_2) 1179 ) 1180 session.commit() 1181 return 1182 1183 def thread_b(locking, waiting): 1184 session = mysqlx.get_session(config) 1185 schema = session.get_schema(config["schema"]) 1186 collection = schema.get_collection("mycoll") 1187 1188 if not locking.wait(2): 1189 errors.append( 1190 "{0}-{0} lock test failure." 1191 "".format(lock_type_1, lock_type_2) 1192 ) 1193 session.commit() 1194 return 1195 1196 session.start_transaction() 1197 if lock_type_2 == "S": 1198 result = collection.find("name = 'Fred'").lock_shared( 1199 lock_contention 1200 ) 1201 else: 1202 result = collection.find("name = 'Fred'").lock_exclusive( 1203 lock_contention 1204 ) 1205 1206 if lock_contention == mysqlx.LockContention.NOWAIT and ( 1207 lock_type_1 == "X" or lock_type_2 == "X" 1208 ): 1209 try: 1210 result.execute() 1211 except Exception: 1212 pass 1213 session.rollback() 1214 1215 waiting.set() 1216 1217 session.start_transaction() 1218 result.execute() 1219 session.commit() 1220 waiting.clear() 1221 1222 client1 = threading.Thread( 1223 target=thread_a, 1224 args=( 1225 locking, 1226 waiting, 1227 ), 1228 ) 1229 client2 = threading.Thread( 1230 target=thread_b, 1231 args=( 1232 locking, 1233 waiting, 1234 ), 1235 ) 1236 1237 client1.start() 1238 client2.start() 1239 1240 client1.join() 1241 client2.join() 1242 self.schema.drop_collection("mycoll") 1243 1244 @unittest.skip("TODO: Fix me") 1245 @unittest.skipIf( 1246 tests.MYSQL_VERSION < (8, 0, 5), "Lock contention unavailable." 1247 ) 1248 def test_lock_shared_with_nowait(self): 1249 self._lock_contention_test("S", "S", mysqlx.LockContention.NOWAIT) 1250 self._lock_contention_test("S", "X", mysqlx.LockContention.NOWAIT) 1251 1252 @tests.foreach_session() 1253 def test_collection_x_s_nowait(self): 1254 config = tests.get_mysqlx_config() 1255 self._drop_collection_if_exists("mycoll8") 1256 collection = self.schema.create_collection("mycoll8") 1257 collection.add( 1258 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1259 ).execute() 1260 1261 locking = threading.Event() 1262 waiting = threading.Event() 1263 1264 def thread_a(locking, waiting): 1265 session1 = mysqlx.get_session(config) 1266 schema1 = session1.get_schema(config["schema"]) 1267 collection = schema1.get_collection("mycoll8") 1268 1269 session1.start_transaction() 1270 collection.find("name = 'James'").lock_exclusive().execute() 1271 locking.set() 1272 time.sleep(2) 1273 locking.clear() 1274 if waiting.is_set(): 1275 session1.commit() 1276 self.fail( 1277 "Collection_X_S_NOWAIT_test IS NOT OK. Other thread is " 1278 "waiting while it is not expected to!" 1279 ) 1280 session1.commit() 1281 1282 def thread_b(locking, waiting): 1283 session2 = mysqlx.get_session(config) 1284 schema2 = session2.get_schema(config["schema"]) 1285 collection = schema2.get_collection("mycoll8") 1286 1287 if not locking.wait(2): 1288 self.fail( 1289 "Collection_X_S_NOWAIT_test IS NOT OK. Other thread has " 1290 "not set the lock!" 1291 ) 1292 1293 session2.start_transaction() 1294 waiting.set() 1295 try: 1296 collection.find("name = 'James'").lock_exclusive( 1297 mysqlx.LockContention.NOWAIT 1298 ).execute() 1299 except Exception: 1300 pass 1301 1302 waiting.clear() 1303 1304 session2.commit() 1305 1306 client1 = threading.Thread( 1307 target=thread_a, 1308 args=( 1309 locking, 1310 waiting, 1311 ), 1312 ) 1313 client2 = threading.Thread( 1314 target=thread_b, 1315 args=( 1316 locking, 1317 waiting, 1318 ), 1319 ) 1320 1321 client1.start() 1322 client2.start() 1323 1324 client1.join() 1325 client2.join() 1326 self.schema.drop_collection("mycoll8") 1327 1328 @tests.foreach_session() 1329 def test_collection_x_x_nowait(self): 1330 """Test exclusive-exclusive with NOWAIT lockcontention.""" 1331 config = tests.get_mysqlx_config() 1332 self._drop_collection_if_exists("mycoll") 1333 collection = self.schema.create_collection("mycoll") 1334 collection.add( 1335 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1336 ).execute() 1337 1338 session1 = mysqlx.get_session(config) 1339 schema1 = session1.get_schema(config["schema"]) 1340 collection1 = schema1.get_collection("mycoll") 1341 session1.start_transaction() 1342 collection1.find("name = 'James'").lock_exclusive().execute() 1343 1344 session2 = mysqlx.get_session(config) 1345 schema2 = session2.get_schema(config["schema"]) 1346 collection2 = schema2.get_collection("mycoll") 1347 session2.start_transaction() 1348 self.assertRaises( 1349 mysqlx.OperationalError, 1350 collection2.find("name = 'James'") 1351 .lock_exclusive(mysqlx.LockContention.NOWAIT) 1352 .execute, 1353 ) 1354 session2.rollback() 1355 session1.rollback() 1356 self.schema.drop_collection("mycoll") 1357 session2.close() 1358 session1.close() 1359 1360 @unittest.skip("TODO: Fix me") 1361 @tests.foreach_session() 1362 def test_collection_s_s_nowait(self): 1363 """Test shared-shared with NOWAIT lockcontention.""" 1364 config = tests.get_mysqlx_config() 1365 self._drop_collection_if_exists("mycoll9") 1366 collection = self.schema.create_collection("mycoll9") 1367 collection.add( 1368 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1369 ).execute() 1370 1371 locking = threading.Event() 1372 waiting = threading.Event() 1373 1374 def thread_a(locking, waiting): 1375 session1 = mysqlx.get_session(config) 1376 schema1 = session1.get_schema(config["schema"]) 1377 collection = schema1.get_collection("mycoll9") 1378 1379 session1.start_transaction() 1380 collection.find("name = 'James'").lock_shared().execute() 1381 locking.set() 1382 time.sleep(2) 1383 locking.clear() 1384 if waiting.is_set(): 1385 session1.commit() 1386 self.fail( 1387 "Collection_S_S_NOWAIT_test IS NOT OK. Other thread " 1388 "is waiting while it is not expected to!" 1389 ) 1390 session1.commit() 1391 1392 def thread_b(locking, waiting): 1393 session2 = mysqlx.get_session(config) 1394 schema2 = session2.get_schema(config["schema"]) 1395 collection = schema2.get_collection("mycoll9") 1396 1397 if not locking.wait(2): 1398 self.fail( 1399 "Collection_S_S_NOWAIT_test IS NOT OK. Other thread " 1400 "has not set the lock!" 1401 ) 1402 1403 session2.start_transaction() 1404 waiting.set() 1405 collection.find("name = 'James'").lock_shared( 1406 mysqlx.LockContention.NOWAIT 1407 ).execute() 1408 res = result.fetch_all() 1409 self.assertEqual(len(res), 1) 1410 1411 waiting.clear() 1412 1413 session2.commit() 1414 1415 client1 = threading.Thread( 1416 target=thread_a, 1417 args=( 1418 locking, 1419 waiting, 1420 ), 1421 ) 1422 client2 = threading.Thread( 1423 target=thread_b, 1424 args=( 1425 locking, 1426 waiting, 1427 ), 1428 ) 1429 1430 client1.start() 1431 client2.start() 1432 1433 client1.join() 1434 client2.join() 1435 self.schema.drop_collection("mycoll9") 1436 1437 @tests.foreach_session() 1438 def test_collection_s_x_nowait(self): 1439 """Test shared-exclusive with NOWAIT lockcontention.""" 1440 config = tests.get_mysqlx_config() 1441 self._drop_collection_if_exists("mycoll10") 1442 collection = self.schema.create_collection("mycoll10") 1443 collection.add( 1444 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1445 ).execute() 1446 1447 session1 = mysqlx.get_session(config) 1448 schema1 = session1.get_schema(config["schema"]) 1449 collection1 = schema1.get_collection("mycoll10") 1450 session1.start_transaction() 1451 collection1.find("name = 'James'").lock_shared().execute() 1452 1453 session2 = mysqlx.get_session(config) 1454 schema2 = session2.get_schema("test") 1455 collection2 = schema2.get_collection("mycoll10") 1456 session2.start_transaction() 1457 self.assertRaises( 1458 mysqlx.OperationalError, 1459 collection2.find("name = 'James'") 1460 .lock_exclusive(mysqlx.LockContention.NOWAIT) 1461 .execute, 1462 ) 1463 session2.rollback() 1464 session1.rollback() 1465 1466 self.schema.drop_collection("mycoll10") 1467 session2.close() 1468 session1.close() 1469 1470 @tests.foreach_session() 1471 def test_collection_x_s_skip_locked(self): 1472 """Test exclusive-shared with SKIP LOCKED lockcontention.""" 1473 config = tests.get_mysqlx_config() 1474 self._drop_collection_if_exists("mycoll") 1475 collection = self.schema.create_collection("mycoll") 1476 collection.add( 1477 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1478 ).execute() 1479 1480 # `session2.lock_exclusive(SKIP_LOCKED) returns data immediately. 1481 session1 = mysqlx.get_session(config) 1482 schema1 = session1.get_schema(config["schema"]) 1483 collection1 = schema1.get_collection("mycoll") 1484 session1.start_transaction() 1485 result = collection1.find("name = 'James'").lock_exclusive().execute() 1486 1487 session2 = mysqlx.get_session(config) 1488 schema2 = session2.get_schema(config["schema"]) 1489 collection2 = schema2.get_collection("mycoll") 1490 session2.start_transaction() 1491 result = ( 1492 collection2.find("name = 'James'") 1493 .lock_shared(mysqlx.LockContention.SKIP_LOCKED) 1494 .execute() 1495 ).fetch_all() 1496 self.assertEqual(len(result), 0) 1497 session2.rollback() 1498 1499 session1.rollback() 1500 self.schema.drop_collection("mycoll") 1501 session2.close() 1502 session1.close() 1503 1504 @tests.foreach_session() 1505 def test_collection_x_x_skip_locked(self): 1506 """Test exclusive-exclusive with SKIP LOCKED lockcontention.""" 1507 config = tests.get_mysqlx_config() 1508 self._drop_collection_if_exists("mycoll") 1509 collection = self.schema.create_collection("mycoll") 1510 collection.add( 1511 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1512 ).execute() 1513 1514 # `session2.lock_exclusive(SKIP_LOCKED) returns data immediately. 1515 session1 = mysqlx.get_session(config) 1516 schema1 = session1.get_schema(config["schema"]) 1517 collection1 = schema1.get_collection("mycoll") 1518 session1.start_transaction() 1519 result = collection1.find("name = 'James'").lock_exclusive().execute() 1520 1521 session2 = mysqlx.get_session(config) 1522 schema2 = session2.get_schema(config["schema"]) 1523 collection2 = schema2.get_collection("mycoll") 1524 session2.start_transaction() 1525 result = ( 1526 collection2.find("name = 'James'") 1527 .lock_exclusive(mysqlx.LockContention.SKIP_LOCKED) 1528 .execute() 1529 ) 1530 res = result.fetch_all() 1531 assert len(res) == 0 1532 session2.rollback() 1533 1534 session1.rollback() 1535 self.schema.drop_collection("mycoll") 1536 session2.close() 1537 session1.close() 1538 1539 @tests.foreach_session() 1540 def test_collection_s_s_skip_locked(self): 1541 """Test shared-shared with SKIP LOCKED lockcontention.""" 1542 config = tests.get_mysqlx_config() 1543 self._drop_collection_if_exists("mycoll") 1544 collection = self.schema.create_collection("mycoll") 1545 collection.add( 1546 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1547 ).execute() 1548 1549 session1 = mysqlx.get_session(config) 1550 schema1 = session1.get_schema(config["schema"]) 1551 collection1 = schema1.get_collection("mycoll") 1552 session1.start_transaction() 1553 result = collection1.find("name = 'James'").lock_shared().execute() 1554 1555 session2 = mysqlx.get_session(config) 1556 schema2 = session2.get_schema(config["schema"]) 1557 collection2 = schema2.get_collection("mycoll") 1558 session2.start_transaction() 1559 result = ( 1560 collection2.find("name = 'James'") 1561 .lock_shared(mysqlx.LockContention.SKIP_LOCKED) 1562 .execute() 1563 ) 1564 res = result.fetch_all() 1565 assert len(res) == 1 1566 session2.rollback() 1567 1568 session1.rollback() 1569 self.schema.drop_collection("mycoll") 1570 session2.close() 1571 session1.close() 1572 1573 @tests.foreach_session() 1574 def test_collection_s_x_skip_locked(self): 1575 """Test shared-exclusive with SKIP LOCKED lockcontention.""" 1576 config = tests.get_mysqlx_config() 1577 self._drop_collection_if_exists("mycoll") 1578 collection = self.schema.create_collection("mycoll") 1579 collection.add( 1580 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1581 ).execute() 1582 1583 # `session2.lock_exclusive(SKIP_LOCKED) returns data immediately. 1584 session1 = mysqlx.get_session(config) 1585 schema1 = session1.get_schema(config["schema"]) 1586 collection1 = schema1.get_collection("mycoll") 1587 session1.start_transaction() 1588 result = collection1.find("name = 'James'").lock_shared().execute() 1589 1590 session2 = mysqlx.get_session(config) 1591 schema2 = session2.get_schema(config["schema"]) 1592 collection2 = schema2.get_collection("mycoll") 1593 session2.start_transaction() 1594 result = ( 1595 collection2.find("name = 'James'") 1596 .lock_exclusive(mysqlx.LockContention.SKIP_LOCKED) 1597 .execute() 1598 ) 1599 res = result.fetch_all() 1600 assert len(res) == 0 1601 session2.rollback() 1602 1603 session1.rollback() 1604 self.schema.drop_collection("mycoll") 1605 session2.close() 1606 session1.close() 1607 1608 @tests.foreach_session() 1609 def test_collection_s_s_default(self): 1610 """Test exclusive-shared with DEFAULT lockcontention.""" 1611 config = tests.get_mysqlx_config() 1612 self._drop_collection_if_exists("mycoll1") 1613 collection = self.schema.create_collection("mycoll1") 1614 collection.add( 1615 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1616 ).execute() 1617 1618 locking = threading.Event() 1619 waiting = threading.Event() 1620 1621 def thread_a(locking, waiting): 1622 session1 = mysqlx.get_session(config) 1623 schema1 = session1.get_schema(config["schema"]) 1624 collection = schema1.get_collection("mycoll1") 1625 1626 session1.start_transaction() 1627 collection.find("name = 'James'").lock_shared().execute() 1628 locking.set() 1629 time.sleep(2) 1630 locking.clear() 1631 if waiting.is_set(): 1632 session1.commit() 1633 self.fail( 1634 "Collection_S_S_DEFAULT_test IS NOT OK. Other thread is " 1635 "waiting while it is not expected to!" 1636 ) 1637 session1.commit() 1638 1639 def thread_b(locking, waiting): 1640 session2 = mysqlx.get_session(config) 1641 schema2 = session2.get_schema(config["schema"]) 1642 collection = schema2.get_collection("mycoll1") 1643 1644 if not locking.wait(2): 1645 self.fail( 1646 "Collection_S_S_DEFAULT_test IS NOT OK. Other thread has " 1647 "not set the lock!" 1648 ) 1649 session2.start_transaction() 1650 1651 waiting.set() 1652 collection.find("name = 'James'").lock_shared( 1653 mysqlx.LockContention.DEFAULT 1654 ).execute() 1655 waiting.clear() 1656 1657 session2.commit() 1658 1659 client1 = threading.Thread( 1660 target=thread_a, 1661 args=( 1662 locking, 1663 waiting, 1664 ), 1665 ) 1666 client2 = threading.Thread( 1667 target=thread_b, 1668 args=( 1669 locking, 1670 waiting, 1671 ), 1672 ) 1673 1674 client1.start() 1675 client2.start() 1676 1677 client1.join() 1678 client2.join() 1679 self.schema.drop_collection("mycoll1") 1680 1681 @tests.foreach_session() 1682 def test_collection_s_x_default(self): 1683 """Test shared-exclusive lock.""" 1684 config = tests.get_mysqlx_config() 1685 session1 = mysqlx.get_session(config) 1686 self._drop_collection_if_exists("mycoll12") 1687 collection = self.schema.create_collection("mycoll2") 1688 collection.add( 1689 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1690 ).execute() 1691 1692 locking = threading.Event() 1693 waiting = threading.Event() 1694 1695 def thread_a(locking, waiting): 1696 session1 = mysqlx.get_session(config) 1697 schema1 = session1.get_schema(config["schema"]) 1698 collection = schema1.get_collection("mycoll2") 1699 1700 session1.start_transaction() 1701 collection.find("name = 'James'").lock_shared().execute() 1702 locking.set() 1703 time.sleep(2) 1704 locking.clear() 1705 if not waiting.is_set(): 1706 session1.commit() 1707 self.fail( 1708 "Collection_S_X_DEFAULT_test IS NOT OK. Other thread is " 1709 "not waiting while it is expected to!" 1710 ) 1711 session1.commit() 1712 1713 def thread_b(locking, waiting): 1714 session1 = mysqlx.get_session(config) 1715 schema1 = session1.get_schema(config["schema"]) 1716 collection = schema1.get_collection("mycoll2") 1717 1718 if not locking.wait(2): 1719 self.fail( 1720 "Collection_S_X_DEFAULT_test IS NOT OK. Other thread has " 1721 "not set the lock!" 1722 ) 1723 session1.start_transaction() 1724 waiting.set() 1725 collection.find("name = 'James'").lock_exclusive( 1726 mysqlx.LockContention.DEFAULT 1727 ).execute() 1728 waiting.clear() 1729 session1.commit() 1730 1731 client1 = threading.Thread( 1732 target=thread_a, 1733 args=( 1734 locking, 1735 waiting, 1736 ), 1737 ) 1738 client2 = threading.Thread( 1739 target=thread_b, 1740 args=( 1741 locking, 1742 waiting, 1743 ), 1744 ) 1745 1746 client1.start() 1747 client2.start() 1748 1749 client1.join() 1750 client2.join() 1751 self.schema.drop_collection("mycoll2") 1752 1753 @tests.foreach_session() 1754 def test_collection_x_x_default(self): 1755 """Test exclusive-exclusive lock.""" 1756 config = tests.get_mysqlx_config() 1757 self._drop_collection_if_exists("mycoll13") 1758 collection = self.schema.create_collection("mycoll3") 1759 collection.add( 1760 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1761 ).execute() 1762 1763 locking = threading.Event() 1764 waiting = threading.Event() 1765 1766 def thread_a(locking, waiting): 1767 session1 = mysqlx.get_session(config) 1768 schema1 = session1.get_schema(config["schema"]) 1769 collection = schema1.get_collection("mycoll3") 1770 1771 session1.start_transaction() 1772 collection.find("name = 'James'").lock_exclusive().execute() 1773 locking.set() 1774 time.sleep(2) 1775 locking.clear() 1776 if not waiting.is_set(): 1777 session1.commit() 1778 self.fail( 1779 "Collection_X_X_DEFAULT_test IS NOT OK. Other thread is " 1780 "not waiting while it is expected to!" 1781 ) 1782 session1.commit() 1783 1784 def thread_b(locking, waiting): 1785 session2 = mysqlx.get_session(config) 1786 schema2 = session2.get_schema(config["schema"]) 1787 collection = schema2.get_collection("mycoll3") 1788 1789 if not locking.wait(2): 1790 self.fail( 1791 "Collection_X_X_DEFAULT_test IS NOT OK. Other thread has " 1792 "not set the lock!" 1793 ) 1794 session2.start_transaction() 1795 1796 waiting.set() 1797 collection.find("name = 'James'").lock_exclusive( 1798 mysqlx.LockContention.DEFAULT 1799 ).execute() 1800 waiting.clear() 1801 1802 session2.commit() 1803 1804 client1 = threading.Thread( 1805 target=thread_a, 1806 args=( 1807 locking, 1808 waiting, 1809 ), 1810 ) 1811 client2 = threading.Thread( 1812 target=thread_b, 1813 args=( 1814 locking, 1815 waiting, 1816 ), 1817 ) 1818 1819 client1.start() 1820 client2.start() 1821 1822 client1.join() 1823 client2.join() 1824 self.schema.drop_collection("mycoll3") 1825 1826 @tests.foreach_session() 1827 def test_collection_x_s_default(self): 1828 """Test exclusive-exclusive lock.""" 1829 config = tests.get_mysqlx_config() 1830 self._drop_collection_if_exists("mycoll4") 1831 collection = self.schema.create_collection("mycoll4") 1832 collection.add( 1833 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1834 ).execute() 1835 1836 locking = threading.Event() 1837 waiting = threading.Event() 1838 1839 def thread_a(locking, waiting): 1840 session1 = mysqlx.get_session(config) 1841 schema1 = session1.get_schema(config["schema"]) 1842 collection = schema1.get_collection("mycoll4") 1843 1844 session1.start_transaction() 1845 collection.find("name = 'James'").lock_exclusive().execute() 1846 locking.set() 1847 time.sleep(2) 1848 locking.clear() 1849 if not waiting.is_set(): 1850 session1.commit() 1851 self.fail( 1852 "Collection_X_S_DEFAULT_test IS NOT OK. Other thread is " 1853 "not waiting while it is expected to!" 1854 ) 1855 session1.commit() 1856 1857 def thread_b(locking, waiting): 1858 session2 = mysqlx.get_session(config) 1859 schema2 = session2.get_schema(config["schema"]) 1860 collection = schema2.get_collection("mycoll4") 1861 1862 if not locking.wait(2): 1863 self.fail( 1864 "Collection_X_S_DEFAULT_test IS NOT OK. Other thread has " 1865 "not set the lock!" 1866 ) 1867 session2.start_transaction() 1868 1869 waiting.set() 1870 collection.find("name = 'James'").lock_shared( 1871 mysqlx.LockContention.DEFAULT 1872 ).execute() 1873 waiting.clear() 1874 1875 session2.commit() 1876 1877 client1 = threading.Thread( 1878 target=thread_a, 1879 args=( 1880 locking, 1881 waiting, 1882 ), 1883 ) 1884 client2 = threading.Thread( 1885 target=thread_b, 1886 args=( 1887 locking, 1888 waiting, 1889 ), 1890 ) 1891 1892 client1.start() 1893 client2.start() 1894 1895 client1.join() 1896 client2.join() 1897 self.schema.drop_collection("mycoll4") 1898 1899 @tests.foreach_session() 1900 def test_collection_multiple_lock_contention_calls(self): 1901 """Test multiple lock calls.""" 1902 config = tests.get_mysqlx_config() 1903 self._drop_collection_if_exists("mycoll5") 1904 1905 collection = self.schema.create_collection("mycoll5") 1906 collection.add( 1907 {"name": "Joe", "age": 21}, {"name": "James", "age": 23} 1908 ).execute() 1909 1910 locking = threading.Event() 1911 waiting = threading.Event() 1912 1913 errors = [] 1914 1915 def thread_a(locking, waiting): 1916 session1 = mysqlx.get_session(config) 1917 schema1 = session1.get_schema(config["schema"]) 1918 collection = schema1.get_collection("mycoll5") 1919 1920 session1.start_transaction() 1921 collection.find( 1922 "name = 'James'" 1923 ).lock_exclusive().lock_shared().lock_exclusive().execute() 1924 locking.set() 1925 time.sleep(2) 1926 locking.clear() 1927 if waiting.is_set(): 1928 session1.commit() 1929 self.fail( 1930 "Collection_Multiple_LockContention_calls_test IS NOT OK. " 1931 "Other thread is not waiting while it is expected to!" 1932 ) 1933 session1.commit() 1934 1935 def thread_b(locking, waiting): 1936 session2 = mysqlx.get_session(config) 1937 schema2 = session2.get_schema(config["schema"]) 1938 collection = schema2.get_collection("mycoll5") 1939 1940 if not locking.wait(2): 1941 self.fail( 1942 "Collection_Multiple_LockContention_calls_test IS NOT OK. " 1943 "Other thread has not set the lock!" 1944 ) 1945 session2.start_transaction() 1946 1947 waiting.set() 1948 result = ( 1949 collection.find("name = 'James'") 1950 .lock_shared(mysqlx.LockContention.DEFAULT) 1951 .lock_exclusive(mysqlx.LockContention.SKIP_LOCKED) 1952 .lock_exclusive(mysqlx.LockContention.NOWAIT) 1953 .lock_shared(mysqlx.LockContention.SKIP_LOCKED) 1954 .execute() 1955 ) 1956 res = result.fetch_all() 1957 self.assertEqual(len(res), 0) 1958 waiting.clear() 1959 1960 session2.commit() 1961 1962 client1 = threading.Thread( 1963 target=thread_a, 1964 args=( 1965 locking, 1966 waiting, 1967 ), 1968 ) 1969 client2 = threading.Thread( 1970 target=thread_b, 1971 args=( 1972 locking, 1973 waiting, 1974 ), 1975 ) 1976 1977 client1.start() 1978 client2.start() 1979 client1.join() 1980 client2.join() 1981 self.schema.drop_collection("mycoll5") 1982 1983 @tests.foreach_session() 1984 def test_parameter_binding(self): 1985 """Test the MCPY-354 issue ( parameter_binding) with parameter list.""" 1986 self._drop_collection_if_exists("mycoll1") 1987 collection = self.schema.create_collection("mycoll1") 1988 collection.add( 1989 {"_id": 1, "name": "a"}, {"_id": 2, "name": "b"} 1990 ).execute() 1991 result = ( 1992 collection.find("name == :name").bind('{"name":"b"}').execute() 1993 ) 1994 row = result.fetch_all() 1995 self.assertEqual(row[0]["_id"], 2) 1996 self.schema.drop_collection("mycoll1") 1997 1998 @tests.foreach_session() 1999 def test_parameter_binding(self): 2000 """Test the MCPY-354 issue ( parameter_binding) with dict and date 2001 datatype.""" 2002 self._drop_collection_if_exists("mycoll2") 2003 collection = self.schema.create_collection("mycoll2") 2004 collection.add( 2005 {"_id": 1, "bday": "2000-10-10"}, 2006 {"_id": 2, "bday": "2000-10-11"}, 2007 ).execute() 2008 result = ( 2009 collection.find("bday == :bday") 2010 .bind("bday", "2000-10-11") 2011 .execute() 2012 ) 2013 row = result.fetch_all() 2014 self.assertEqual(row[0]["_id"], 2) 2015 self.schema.drop_collection("mycoll2") 2016 2017 @tests.foreach_session() 2018 def Csbufferring_test1(): 2019 """Test client-side buffering.""" 2020 self._drop_collection_if_exists("mycoll3") 2021 collection = self.schema.create_collection("mycoll3") 2022 collection.add( 2023 {"_id": 1, "name": "a"}, {"_id": 2, "name": "b"} 2024 ).execute() 2025 result1 = ( 2026 collection.find("name == :name").bind('{"name":"b"}').execute() 2027 ) 2028 result2 = ( 2029 collection.find("name == :name").bind('{"name":"a"}').execute() 2030 ) 2031 row2 = result2.fetch_all() 2032 self.assertEqual(row2[0]["_id"], 1) 2033 row1 = result1.fetch_all() 2034 self.assertEqual(row1[0]["_id"], 2) 2035 self.schema.drop_collection("mycoll3") 2036