1# 2# Authors: Sergey Satskiy 3# 4# $Id: netschedule_tests_pack_4_18.py 591938 2019-08-22 17:35:24Z satskyse $ 5# 6 7""" 8Netschedule server tests pack for the features appeared in NS-4.20.0 and up 9""" 10 11import time 12import socket 13from netschedule_tests_pack import TestBase 14from netschedule_tests_pack_4_10 import execAny 15from netschedule_tests_pack_4_10 import getClientInfo 16from netschedule_tests_pack_4_10 import getAffinityInfo 17from netschedule_tests_pack_4_10 import getNotificationInfo 18# Works for python 2.5. Python 2.7 has it in urlparse module 19from cgi import parse_qs 20 21 22class Scenario1500( TestBase ): 23 " scenario 1500 " 24 25 def __init__( self, netschedule ): 26 TestBase.__init__( self, netschedule ) 27 return 28 29 @staticmethod 30 def getScenario(): 31 " Provides the scenario " 32 return "Submit a job without affinity, " \ 33 "submit a job with an affinity, " \ 34 "get and commit the job with affinity, read the job with affinity" 35 36 def execute( self ): 37 " Returns True if successfull " 38 self.fromScratch() 39 jobID1 = self.ns.submitJob( 'TEST', 'bla' ) 40 jobID1 = jobID1 # pylint is happy 41 jobID2 = self.ns.submitJob( 'TEST', 'bla', 'aff0' ) 42 43 receivedJobID = self.ns.getJob( 'TEST', -1, 'aff0' )[ 0 ] 44 if jobID2 != receivedJobID: 45 raise Exception( "Unexpected job for execution" ) 46 47 ns_client = self.getNetScheduleService( 'TEST', 'scenario1500' ) 48 ns_client.set_client_identification( 'readnode', 'readsession' ) 49 50 execAny( ns_client, "PUT " + jobID2 + " 0 nooutput" ) 51 output = execAny( ns_client, "READ2 reader_aff=0 any_aff=0 aff=aff0" ) 52 values = parse_qs( output, True, True ) 53 jobKey = values[ 'job_key' ][ 0 ] 54 if jobKey != jobID2: 55 raise Exception( "Unexpected job for reading" ) 56 return True 57 58 59class Scenario1501( TestBase ): 60 " scenario 1501 " 61 62 def __init__( self, netschedule ): 63 TestBase.__init__( self, netschedule ) 64 return 65 66 @staticmethod 67 def getScenario(): 68 " Provides the scenario " 69 return "submit a job with an affinity, " \ 70 "get and commit the job with affinity, read the job with another affinity" 71 72 def execute( self ): 73 " Returns True if successfull " 74 self.fromScratch() 75 jobID = self.ns.submitJob( 'TEST', 'bla', 'aff0' ) 76 77 receivedJobID = self.ns.getJob( 'TEST', -1, 'aff0' )[ 0 ] 78 if jobID != receivedJobID: 79 raise Exception( "Unexpected job for execution" ) 80 81 ns_client = self.getNetScheduleService( 'TEST', 'scenario1501' ) 82 ns_client.set_client_identification( 'readnode', 'readsession' ) 83 84 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 85 output = execAny( ns_client, "READ2 reader_aff=0 any_aff=0 aff=other_affinity" ) 86 values = parse_qs( output, True, True ) 87 if 'job_key' in values: 88 raise Exception( "Expected no job for reading, received one" ) 89 return True 90 91 92class Scenario1502( TestBase ): 93 " Scenario 1502 " 94 95 def __init__( self, netschedule ): 96 TestBase.__init__( self, netschedule ) 97 return 98 99 @staticmethod 100 def getScenario(): 101 " Provides the scenario " 102 return "CHRAFF as anon" 103 104 def execute( self ): 105 " Should return True if the execution completed successfully " 106 self.fromScratch() 107 108 ns_client = self.getNetScheduleService( 'TEST', 'scenario1502' ) 109 try: 110 ns_client.set_client_identification( '', '' ) 111 except: 112 pass 113 114 try: 115 execAny( ns_client, "CHRAFF add=a1" ) 116 except Exception as excp: 117 if "cannot use CHRAFF command" in str( excp ): 118 return True 119 raise 120 return False 121 122 123class Scenario1503( TestBase ): 124 " Scenario 119 " 125 126 def __init__( self, netschedule ): 127 TestBase.__init__( self, netschedule ) 128 return 129 130 @staticmethod 131 def getScenario(): 132 " Provides the scenario " 133 return "CHRAFF as identified, STAT CLIENTS" 134 135 def execute( self ): 136 " Should return True if the execution completed successfully " 137 self.fromScratch() 138 139 ns_client = self.getNetScheduleService( 'TEST', 'scenario1503' ) 140 ns_client.set_client_identification( 'mynode', 'mysession' ) 141 execAny( ns_client, "CHRAFF add=a1,a2" ) 142 143 client = getClientInfo( ns_client, 'mynode', verbose = False ) 144 if client[ 'number_of_reader_preferred_affinities' ] != 2: 145 raise Exception( 'Unexpected length of reader_preferred_affinities' ) 146 if client[ 'type' ] not in [ 'unknown', 'reader', 'reader | admin' ]: 147 raise Exception( 'Unexpected client type: ' + client[ 'type' ] ) 148 149 return True 150 151 152class Scenario1504( TestBase ): 153 " Scenario 1504 " 154 155 def __init__( self, netschedule ): 156 TestBase.__init__( self, netschedule ) 157 self.warning = "" 158 159 @staticmethod 160 def getScenario(): 161 " Provides the scenario " 162 return "CHRAFF as identified (rm), STAT CLIENTS" 163 164 def report_warning( self, msg, server ): 165 self.warning = msg 166 167 def execute( self ): 168 " Should return True if the execution completed successfully " 169 self.fromScratch() 170 171 ns_client = self.getNetScheduleService( 'TEST', 'scenario1504' ) 172 ns_client.set_client_identification( 'node', 'session' ) 173 ns_client.on_warning = self.report_warning 174 execAny( ns_client, "CHRAFF del=a1,a2" ) 175 176 getClientInfo( ns_client, 'node', 1, 1 ) 177 if "unknown affinity to delete" in self.warning: 178 return True 179 raise Exception( "The expected warning has not received" ) 180 181 182class Scenario1505( TestBase ): 183 " Scenario 1505 " 184 185 def __init__( self, netschedule ): 186 TestBase.__init__( self, netschedule ) 187 self.warning = "" 188 189 @staticmethod 190 def getScenario(): 191 " Provides the scenario " 192 return "CHRAFF as identified (add, rm), STAT CLIENTS" 193 194 def report_warning( self, msg, server ): 195 self.warning = msg 196 197 def execute( self ): 198 " Should return True if the execution completed successfully " 199 self.fromScratch() 200 201 ns_client = self.getNetScheduleService( 'TEST', 'scenario1505' ) 202 ns_client.set_client_identification( 'node', 'session' ) 203 ns_client.on_warning = self.report_warning 204 205 execAny( ns_client, "CHRAFF add=a1,a2,a3" ) 206 execAny( ns_client, "CHRAFF add=a2,a4 del=a1" ) 207 client = getClientInfo( ns_client, 'node' ) 208 if "already registered affinity to add" not in self.warning: 209 raise Exception( "The expected warning has not received" ) 210 211 if len( client[ 'reader_preferred_affinities' ] ) != 3 or \ 212 'a2' not in client[ 'reader_preferred_affinities' ] or \ 213 'a3' not in client[ 'reader_preferred_affinities' ] or \ 214 'a4' not in client[ 'reader_preferred_affinities' ]: 215 raise Exception( "Unexpected reader preferred affinities" ) 216 return True 217 218class Scenario1506( TestBase ): 219 " Scenario 1506 " 220 221 def __init__( self, netschedule ): 222 TestBase.__init__( self, netschedule ) 223 224 @staticmethod 225 def getScenario(): 226 " Provides the scenario " 227 return "CHRAFF as identified (add), CLRN, STAT CLIENTS" 228 229 def execute( self ): 230 " Should return True if the execution completed successfully " 231 self.fromScratch() 232 233 ns_client = self.getNetScheduleService( 'TEST', 'scenario1506' ) 234 ns_client.set_client_identification( 'node', 'session' ) 235 execAny( ns_client, "CHRAFF add=a1,a2" ) 236 237 execAny( ns_client, 'CLRN' ) 238 client = getClientInfo( ns_client, 'node' ) 239 if 'number_of_reader_preferred_affinities' in client: 240 if client[ 'number_of_reader_preferred_affinities' ] != 0: 241 raise Exception( "Expected no reader preferred affinities, got some." ) 242 return True 243 244class Scenario1507( TestBase ): 245 " Scenario 1507 " 246 247 def __init__( self, netschedule ): 248 TestBase.__init__( self, netschedule ) 249 250 @staticmethod 251 def getScenario(): 252 " Provides the scenario " 253 return "CHRAFF as identified (add), connect with another session, " \ 254 "STAT CLIENTS" 255 256 def execute( self ): 257 " Should return True if the execution completed successfully " 258 self.fromScratch() 259 260 ns_client = self.getNetScheduleService( 'TEST', 'scenario1507' ) 261 ns_client.set_client_identification( 'node', 'session' ) 262 execAny( ns_client, "CHRAFF add=a1,a2" ) 263 264 self.ns.submitJob( 'TEST', 'bla', '', '', 'node', 'other_session' ) 265 266 client = getClientInfo( ns_client, 'node' ) 267 if 'number_of_reader_preferred_affinities' in client: 268 if client[ 'number_of_reader_preferred_affinities' ] != 0: 269 raise Exception( "Expected no reader preferred affinities, got some." ) 270 return True 271 272class Scenario1508( TestBase ): 273 " Scenario 1508 " 274 275 def __init__( self, netschedule ): 276 TestBase.__init__( self, netschedule ) 277 278 @staticmethod 279 def getScenario(): 280 " Provides the scenario " 281 return "READ2 as anon" 282 283 def execute( self ): 284 " Should return True if the execution completed successfully " 285 self.fromScratch() 286 287 try: 288 ns_client = self.getNetScheduleService( 'TEST', 'scenario1508' ) 289 try: 290 ns_client.set_client_identification( '', '' ) 291 except: 292 pass 293 294 execAny( ns_client, 'READ2 reader_aff=1 any_aff=1' ) 295 except Exception as exc: 296 if "Anonymous client" in str( exc ): 297 return True 298 raise 299 return False 300 301class Scenario1509( TestBase ): 302 " Scenario 1509 " 303 304 def __init__( self, netschedule ): 305 TestBase.__init__( self, netschedule ) 306 307 @staticmethod 308 def getScenario(): 309 " Provides the scenario " 310 return "SUBMIT with a1, CHRAFF as identified (add a0, a1, a2), " \ 311 "READ2 reader_aff = 1" 312 313 def execute( self ): 314 " Should return True if the execution completed successfully " 315 self.fromScratch() 316 317 jobID = self.ns.submitJob( 'TEST', 'bla', 'a1' ) 318 319 ns_client = self.getNetScheduleService( 'TEST', 'scenario1509' ) 320 ns_client.set_client_identification( 'node', 'session' ) 321 execAny( ns_client, "CHRAFF add=a0,a1,a2" ) 322 323 receivedJobID = self.ns.getJob( 'TEST', -1, 'a1' )[ 0 ] 324 if jobID != receivedJobID: 325 raise Exception( "Unexpected job for execution" ) 326 327 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 328 output = execAny( ns_client, 329 'READ2 reader_aff=1 any_aff=0' ) 330 values = parse_qs( output, True, True ) 331 receivedJobID = values[ 'job_key' ][ 0 ] 332 333 if jobID != receivedJobID: 334 raise Exception( "Received job ID does not match. Expected: " + \ 335 jobID + " Received: " + receivedJobID ) 336 return True 337 338class Scenario1510( TestBase ): 339 " Scenario 1510 " 340 341 def __init__( self, netschedule ): 342 TestBase.__init__( self, netschedule ) 343 344 @staticmethod 345 def getScenario(): 346 " Provides the scenario " 347 return "SUBMIT with a1, CHRAFF as identified (add a0, a2), " \ 348 "READ2 reader_aff = 1" 349 350 def execute( self ): 351 " Should return True if the execution completed successfully " 352 self.fromScratch() 353 354 jobID = self.ns.submitJob( 'TEST', 'bla', 'a1' ) 355 356 ns_client = self.getNetScheduleService( 'TEST', 'scenario1510' ) 357 ns_client.set_client_identification( 'node', 'session' ) 358 execAny( ns_client, "CHRAFF add=a0,a2" ) 359 360 receivedJobID = self.ns.getJob( 'TEST', -1, 'a1' )[ 0 ] 361 if jobID != receivedJobID: 362 raise Exception( "Unexpected job for execution" ) 363 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 364 365 output = execAny( ns_client, 366 'READ2 reader_aff=1 any_aff=0' ) 367 if output != "no_more_jobs=true": 368 raise Exception( "Expect no jobs, but received one." ) 369 return True 370 371 372class Scenario1511( TestBase ): 373 " Scenario 1511 " 374 375 def __init__( self, netschedule ): 376 TestBase.__init__( self, netschedule ) 377 378 @staticmethod 379 def getScenario(): 380 " Provides the scenario " 381 return "CHRAFF as identified (add a0, a1, a2), SUBMIT with a1, " \ 382 "READ2 reader_aff = 1" 383 384 def execute( self ): 385 " Should return True if the execution completed successfully " 386 self.fromScratch() 387 388 ns_client = self.getNetScheduleService( 'TEST', 'scenario1511' ) 389 ns_client.set_client_identification( 'node', 'session' ) 390 execAny( ns_client, "CHRAFF add=a0,a1,a2" ) 391 392 jobID = self.ns.submitJob( 'TEST', 'bla', 'a1' ) 393 receivedJobID = self.ns.getJob( 'TEST', -1, 'a1' )[ 0 ] 394 if jobID != receivedJobID: 395 raise Exception( "Unexpected job for execution" ) 396 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 397 398 output = execAny( ns_client, 399 'READ2 reader_aff=1 any_aff=0' ) 400 values = parse_qs( output, True, True ) 401 receivedJobID = values[ 'job_key' ][ 0 ] 402 403 if jobID != receivedJobID: 404 raise Exception( "Received job ID does not match. Expected: " + \ 405 jobID + " Received: " + receivedJobID ) 406 return True 407 408class Scenario1512( TestBase ): 409 " Scenario 1512 " 410 411 def __init__( self, netschedule ): 412 TestBase.__init__( self, netschedule ) 413 414 @staticmethod 415 def getScenario(): 416 " Provides the scenario " 417 return "CHRAFF as identified (add a0, a1, a2), SUBMIT with a5, " \ 418 "READ2 reader_aff = 1" 419 420 def execute( self ): 421 " Should return True if the execution completed successfully " 422 self.fromScratch() 423 424 ns_client = self.getNetScheduleService( 'TEST', 'scenario1512' ) 425 ns_client.set_client_identification( 'node', 'session' ) 426 execAny( ns_client, "CHRAFF add=a0,a1,a2" ) 427 428 jobID = self.ns.submitJob( 'TEST', 'bla', 'a5' ) 429 receivedJobID = self.ns.getJob( 'TEST', -1, 'a5' )[ 0 ] 430 if jobID != receivedJobID: 431 raise Exception( "Unexpected job for execution" ) 432 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 433 434 output = execAny( ns_client, 'READ2 reader_aff=1 any_aff=0' ) 435 436 if output != "no_more_jobs=true": 437 raise Exception( "Expect no jobs, but received one." ) 438 return True 439 440class Scenario1513( TestBase ): 441 " Scenario 1513 " 442 443 def __init__( self, netschedule ): 444 TestBase.__init__( self, netschedule ) 445 446 @staticmethod 447 def getScenario(): 448 " Provides the scenario " 449 return "SUBMIT with a1, CHRAFF add=a2, READ2 reader_aff = 1 any_aff=1" 450 451 def execute( self ): 452 " Should return True if the execution completed successfully " 453 self.fromScratch() 454 455 jobID = self.ns.submitJob( 'TEST', 'bla', 'a1' ) 456 457 ns_client = self.getNetScheduleService( 'TEST', 'scenario1513' ) 458 ns_client.set_client_identification( 'node', 'session' ) 459 execAny( ns_client, "CHRAFF add=a2" ) 460 461 receivedJobID = self.ns.getJob( 'TEST', -1, 'a1' )[ 0 ] 462 if jobID != receivedJobID: 463 raise Exception( "Unexpected job for execution" ) 464 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 465 466 output = execAny( ns_client, 467 'READ2 reader_aff=1 any_aff=1' ) 468 values = parse_qs( output, True, True ) 469 receivedJobID = values[ 'job_key' ][ 0 ] 470 471 if jobID != receivedJobID: 472 raise Exception( "Received job ID does not match. Expected: " + 473 jobID + " Received: " + receivedJobID ) 474 return True 475 476class Scenario1514( TestBase ): 477 " Scenario 1514 " 478 479 def __init__( self, netschedule ): 480 TestBase.__init__( self, netschedule ) 481 482 @staticmethod 483 def getScenario(): 484 " Provides the scenario " 485 return "SUBMIT with a0, SUBMIT with a1, SUBMIT with no aff, " \ 486 "CHRAFF add=a2, READ2 aff=a1 reader_aff = 1 any_aff=1" 487 488 def execute( self ): 489 " Should return True if the execution completed successfully " 490 self.fromScratch() 491 492 jobID1 = self.ns.submitJob( 'TEST', 'bla', 'a0' ) 493 jobID2 = self.ns.submitJob( 'TEST', 'bla', 'a1' ) 494 jobID3 = self.ns.submitJob( 'TEST', 'bla', '' ) 495 496 ns_client = self.getNetScheduleService( 'TEST', 'scenario1514' ) 497 ns_client.set_client_identification( 'node', 'session' ) 498 499 receivedJobID = self.ns.getJob( 'TEST', -1, 'a0' )[ 0 ] 500 if jobID1 != receivedJobID: 501 raise Exception( "Unexpected job for execution" ) 502 execAny( ns_client, "PUT " + jobID1 + " 0 nooutput" ) 503 receivedJobID = self.ns.getJob( 'TEST', -1, 'a1' )[ 0 ] 504 if jobID2 != receivedJobID: 505 raise Exception( "Unexpected job for execution" ) 506 execAny( ns_client, "PUT " + jobID2 + " 0 nooutput" ) 507 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 508 if jobID3 != receivedJobID: 509 raise Exception( "Unexpected job for execution" ) 510 execAny( ns_client, "PUT " + jobID3 + " 0 nooutput" ) 511 512 513 execAny( ns_client, "CHRAFF add=a2" ) 514 output = execAny( ns_client, 515 'READ2 reader_aff=1 any_aff=1 aff=a1' ) 516 values = parse_qs( output, True, True ) 517 receivedJobID = values[ 'job_key' ][ 0 ] 518 519 if jobID2 != receivedJobID: 520 raise Exception( "Received job ID does not match. Expected: " + \ 521 jobID2 + " Received: " + receivedJobID ) 522 return True 523 524class Scenario1515( TestBase ): 525 " Scenario 1515 " 526 527 def __init__( self, netschedule ): 528 TestBase.__init__( self, netschedule ) 529 530 @staticmethod 531 def getScenario(): 532 " Provides the scenario " 533 return "SUBMIT with a1, SUBMIT with a2, SUBMIT with no aff, " \ 534 "CHRAFF add=a2, READ2 aff=a5 reader_aff = 1 any_aff=1" 535 536 def execute( self ): 537 " Should return True if the execution completed successfully " 538 self.fromScratch() 539 540 jobID1 = self.ns.submitJob( 'TEST', 'bla', 'a1' ) 541 jobID2 = self.ns.submitJob( 'TEST', 'bla', 'a2' ) 542 jobID3 = self.ns.submitJob( 'TEST', 'bla', '' ) 543 544 ns_client = self.getNetScheduleService( 'TEST', 'scenario1515' ) 545 ns_client.set_client_identification( 'node', 'session' ) 546 547 receivedJobID = self.ns.getJob( 'TEST', -1, 'a1' )[ 0 ] 548 if jobID1 != receivedJobID: 549 raise Exception( "Unexpected job for execution" ) 550 execAny( ns_client, "PUT " + jobID1 + " 0 nooutput" ) 551 receivedJobID = self.ns.getJob( 'TEST', -1, 'a2' )[ 0 ] 552 if jobID2 != receivedJobID: 553 raise Exception( "Unexpected job for execution" ) 554 execAny( ns_client, "PUT " + jobID2 + " 0 nooutput" ) 555 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 556 if jobID3 != receivedJobID: 557 raise Exception( "Unexpected job for execution" ) 558 execAny( ns_client, "PUT " + jobID3 + " 0 nooutput" ) 559 560 execAny( ns_client, "CHRAFF add=a2" ) 561 output = execAny( ns_client, 562 'READ2 reader_aff=1 any_aff=1 aff=a5' ) 563 values = parse_qs( output, True, True ) 564 receivedJobID = values[ 'job_key' ][ 0 ] 565 566 if jobID2 != receivedJobID: 567 raise Exception( "Received job ID does not match. Expected: " + \ 568 jobID2 + " Received: " + receivedJobID ) 569 return True 570 571class Scenario1516( TestBase ): 572 " Scenario 1516 " 573 574 def __init__( self, netschedule ): 575 TestBase.__init__( self, netschedule ) 576 577 @staticmethod 578 def getScenario(): 579 " Provides the scenario " 580 return "SUBMIT with a1, SUBMIT with a2, SUBMIT with no aff, " \ 581 "CHRAFF add=a2, READ2 aff=a5 reader_aff=0 any_aff=1" 582 583 def execute( self ): 584 " Should return True if the execution completed successfully " 585 self.fromScratch() 586 587 jobID1 = self.ns.submitJob( 'TEST', 'bla', 'a1' ) 588 jobID2 = self.ns.submitJob( 'TEST', 'bla', 'a2' ) 589 jobID3 = self.ns.submitJob( 'TEST', 'bla', '' ) 590 591 ns_client = self.getNetScheduleService( 'TEST', 'scenario1516' ) 592 ns_client.set_client_identification( 'node', 'session' ) 593 594 receivedJobID = self.ns.getJob( 'TEST', -1, 'a1' )[ 0 ] 595 if jobID1 != receivedJobID: 596 raise Exception( "Unexpected job for execution" ) 597 execAny( ns_client, "PUT " + jobID1 + " 0 nooutput" ) 598 receivedJobID = self.ns.getJob( 'TEST', -1, 'a2' )[ 0 ] 599 if jobID2 != receivedJobID: 600 raise Exception( "Unexpected job for execution" ) 601 execAny( ns_client, "PUT " + jobID2 + " 0 nooutput" ) 602 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 603 if jobID3 != receivedJobID: 604 raise Exception( "Unexpected job for execution" ) 605 execAny( ns_client, "PUT " + jobID3 + " 0 nooutput" ) 606 607 execAny( ns_client, "CHRAFF add=a2" ) 608 output = execAny( ns_client, 609 'READ2 reader_aff=0 any_aff=1 aff=a5' ) 610 values = parse_qs( output, True, True ) 611 receivedJobID = values[ 'job_key' ][ 0 ] 612 613 if jobID1 != receivedJobID: 614 raise Exception( "Received job ID does not match. Expected: " + \ 615 jobID1 + " Received: " + receivedJobID ) 616 return True 617 618class Scenario1517( TestBase ): 619 " Scenario 1517 " 620 621 def __init__( self, netschedule ): 622 TestBase.__init__( self, netschedule ) 623 624 @staticmethod 625 def getScenario(): 626 " Provides the scenario " 627 return "SUBMIT with a1, SUBMIT with a2, SUBMIT with no aff, " \ 628 "CHRAFF add=a7, READ2 aff=a5 reader_aff=1 any_aff=0" 629 630 def execute( self ): 631 " Should return True if the execution completed successfully " 632 self.fromScratch() 633 634 jobID1 = self.ns.submitJob( 'TEST', 'bla', 'a1' ) 635 jobID2 = self.ns.submitJob( 'TEST', 'bla', 'a2' ) 636 jobID3 = self.ns.submitJob( 'TEST', 'bla', '' ) 637 638 ns_client = self.getNetScheduleService( 'TEST', 'scenario1517' ) 639 ns_client.set_client_identification( 'node', 'session' ) 640 641 receivedJobID = self.ns.getJob( 'TEST', -1, 'a1' )[ 0 ] 642 if jobID1 != receivedJobID: 643 raise Exception( "Unexpected job for execution" ) 644 execAny( ns_client, "PUT " + jobID1 + " 0 nooutput" ) 645 receivedJobID = self.ns.getJob( 'TEST', -1, 'a2' )[ 0 ] 646 if jobID2 != receivedJobID: 647 raise Exception( "Unexpected job for execution" ) 648 execAny( ns_client, "PUT " + jobID2 + " 0 nooutput" ) 649 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 650 if jobID3 != receivedJobID: 651 raise Exception( "Unexpected job for execution" ) 652 execAny( ns_client, "PUT " + jobID3 + " 0 nooutput" ) 653 654 execAny( ns_client, "CHRAFF add=a7" ) 655 output = execAny( ns_client, 656 'READ2 reader_aff=1 any_aff=0 aff=a5' ) 657 if output != "no_more_jobs=true": 658 raise Exception( "Expect no jobs, but received one." ) 659 return True 660 661class Scenario1518( TestBase ): 662 " Scenario 11518 " 663 664 def __init__( self, netschedule ): 665 TestBase.__init__( self, netschedule ) 666 667 @staticmethod 668 def getScenario(): 669 " Provides the scenario " 670 return "SUBMIT with a1, SUBMIT with a2, SUBMIT with no aff, " \ 671 "CHRAFF add=a7, READ2 aff=a5 reader_aff=1 any_aff=1" 672 673 def execute( self ): 674 " Should return True if the execution completed successfully " 675 self.fromScratch() 676 677 jobID1 = self.ns.submitJob( 'TEST', 'bla', 'a1' ) 678 jobID2 = self.ns.submitJob( 'TEST', 'bla', 'a2' ) 679 jobID3 = self.ns.submitJob( 'TEST', 'bla', '' ) 680 681 ns_client = self.getNetScheduleService( 'TEST', 'scenario1518' ) 682 ns_client.set_client_identification( 'node', 'session' ) 683 684 receivedJobID = self.ns.getJob( 'TEST', -1, 'a1' )[ 0 ] 685 if jobID1 != receivedJobID: 686 raise Exception( "Unexpected job for execution" ) 687 execAny( ns_client, "PUT " + jobID1 + " 0 nooutput" ) 688 receivedJobID = self.ns.getJob( 'TEST', -1, 'a2' )[ 0 ] 689 if jobID2 != receivedJobID: 690 raise Exception( "Unexpected job for execution" ) 691 execAny( ns_client, "PUT " + jobID2 + " 0 nooutput" ) 692 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 693 if jobID3 != receivedJobID: 694 raise Exception( "Unexpected job for execution" ) 695 execAny( ns_client, "PUT " + jobID3 + " 0 nooutput" ) 696 697 execAny( ns_client, "CHRAFF add=a7" ) 698 output = execAny( ns_client, 699 'READ2 reader_aff=1 any_aff=1 aff=a5' ) 700 values = parse_qs( output, True, True ) 701 receivedJobID = values[ 'job_key' ][ 0 ] 702 703 if jobID1 != receivedJobID: 704 raise Exception( "Received job ID does not match. Expected: " + 705 jobID1 + " Received: " + receivedJobID ) 706 return True 707 708class Scenario1519( TestBase ): 709 " Scenario 1519 " 710 711 def __init__( self, netschedule ): 712 TestBase.__init__( self, netschedule ) 713 714 @staticmethod 715 def getScenario(): 716 " Provides the scenario " 717 return "SUBMIT with a1, restart netschedule, CHRAFF add=a1, READ2 reader_aff=1" 718 719 def execute( self ): 720 " Should return True if the execution completed successfully " 721 self.fromScratch() 722 723 jobID1 = self.ns.submitJob( 'TEST', 'bla', 'a1' ) 724 725 self.ns.shutdown() 726 self.ns.resetPID() 727 time.sleep( 5 ) 728 if self.ns.isRunning(): 729 raise Exception( "Cannot shutdown netschedule" ) 730 731 self.ns.start() 732 if not self.ns.isRunning(): 733 raise Exception( "Cannot start netschedule" ) 734 735 ns_client = self.getNetScheduleService( 'TEST', 'scenario1519' ) 736 ns_client.set_client_identification( 'node', 'session' ) 737 738 receivedJobs = self.ns.getJob( 'TEST', -1, 'a1' ) 739 if receivedJobs is None: 740 raise Exception( "Expected job was not provided" ) 741 receivedJobID = receivedJobs[ 0 ] 742 if jobID1 != receivedJobID: 743 raise Exception( "Unexpected job for execution" ) 744 execAny( ns_client, "PUT " + jobID1 + " 0 nooutput" ) 745 746 execAny( ns_client, "CHRAFF add=a1" ) 747 output = execAny( ns_client, 'READ2 reader_aff=1 any_aff=0' ) 748 values = parse_qs( output, True, True ) 749 receivedJobID = values[ 'job_key' ][ 0 ] 750 751 if jobID1 != receivedJobID: 752 raise Exception( "Received job ID does not match. Expected: " + 753 jobID1 + " Received: " + receivedJobID ) 754 return True 755 756 757class Scenario1520( TestBase ): 758 " Scenario 1520 " 759 760 def __init__( self, netschedule ): 761 TestBase.__init__( self, netschedule ) 762 763 @staticmethod 764 def getScenario(): 765 " Provides the scenario " 766 return "READ2 with timeout and port and affinity a1" 767 768 def execute( self ): 769 " Should return True if the execution completed successfully " 770 self.fromScratch() 771 772 ns_client = self.getNetScheduleService( 'TEST', 'scenario1520' ) 773 ns_client.set_client_identification( 'node', 'session' ) 774 775 notifSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 776 notifSocket.bind( ( "", 0 ) ) 777 notifPort = notifSocket.getsockname()[ 1 ] 778 779 execAny( ns_client, 780 'READ2 reader_aff=0 any_aff=0 exclusive_new_aff=0 aff=a1 port=' + str( notifPort ) + ' timeout=3' ) 781 782 # Submit a job 783 jobID = self.ns.submitJob( 'TEST', 'input' ) 784 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 785 if jobID != receivedJobID: 786 raise Exception( "Unexpected job for execution" ) 787 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 788 789 time.sleep( 3 ) 790 result = self.getNotif( notifSocket ) 791 if result != 0: 792 notifSocket.close() 793 raise Exception( "Expect no notifications but received some" ) 794 notifSocket.close() 795 return True 796 797 def getNotif( self, s ): 798 " Retrieves notifications " 799 try: 800 data = s.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 801 if "queue=TEST" not in data: 802 raise Exception( "Unexpected notification in socket" ) 803 return 1 804 except Exception as ex: 805 if "Unexpected notification in socket" in str( ex ): 806 raise 807 pass 808 return 0 809 810class Scenario1521( TestBase ): 811 " Scenario 1521 " 812 813 def __init__( self, netschedule ): 814 TestBase.__init__( self, netschedule ) 815 816 @staticmethod 817 def getScenario(): 818 " Provides the scenario " 819 return "READ2 with timeout and port, and explicit aff a1" 820 821 def execute( self ): 822 " Should return True if the execution completed successfully " 823 self.fromScratch() 824 825 ns_client = self.getNetScheduleService( 'TEST', 'scenario1521' ) 826 ns_client.set_client_identification( 'node', 'session' ) 827 828 notifSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 829 notifSocket.bind( ( "", 0 ) ) 830 notifPort = notifSocket.getsockname()[ 1 ] 831 832 execAny( ns_client, 833 'READ2 reader_aff=0 any_aff=0 exclusive_new_aff=0 aff=a1 port=' + str( notifPort ) + ' timeout=5' ) 834 835 # Submit a job 836 jobID = self.ns.submitJob( 'TEST', 'input', 'a1' ) 837 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 838 if jobID != receivedJobID: 839 raise Exception( "Unexpected job for execution" ) 840 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 841 842 time.sleep( 3 ) 843 result = self.getNotif( notifSocket ) 844 notifSocket.close() 845 if result == 0: 846 raise Exception( "Expected notification(s), received nothing" ) 847 return True 848 849 def getNotif( self, s ): 850 " Retrieves notifications " 851 try: 852 data = s.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 853 if "queue=TEST" not in data: 854 raise Exception( "Unexpected notification in socket" ) 855 return 1 856 except Exception as ex: 857 if "Unexpected notification in socket" in str( ex ): 858 raise 859 pass 860 return 0 861 862class Scenario1522( TestBase ): 863 " Scenario 1522 " 864 865 def __init__( self, netschedule ): 866 TestBase.__init__( self, netschedule ) 867 868 @staticmethod 869 def getScenario(): 870 " Provides the scenario " 871 return "READ2 with timeout and port, and explicit aff a1, and any_aff=1" 872 873 def execute( self ): 874 " Should return True if the execution completed successfully " 875 self.fromScratch() 876 877 ns_client = self.getNetScheduleService( 'TEST', 'scenario1522' ) 878 ns_client.set_client_identification( 'node', 'session' ) 879 880 notifSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 881 notifSocket.bind( ( "", 0 ) ) 882 notifPort = notifSocket.getsockname()[ 1 ] 883 884 execAny( ns_client, 885 'READ2 reader_aff=0 any_aff=1 exclusive_new_aff=0 aff=a1 port=' + str( notifPort ) + ' timeout=3' ) 886 887 # Submit a job 888 jobID = self.ns.submitJob( 'TEST', 'input', 'a2' ) 889 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 890 if jobID != receivedJobID: 891 raise Exception( "Unexpected job for execution" ) 892 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 893 894 time.sleep( 3 ) 895 result = self.getNotif( notifSocket ) 896 notifSocket.close() 897 if result == 0: 898 raise Exception( "Expected notification(s), received nothing" ) 899 return True 900 901 def getNotif( self, s ): 902 " Retrieves notifications " 903 try: 904 data = s.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 905 if "queue=TEST" not in data: 906 raise Exception( "Unexpected notification in socket" ) 907 return 1 908 except Exception as ex: 909 if "Unexpected notification in socket" in str( ex ): 910 raise 911 pass 912 return 0 913 914 915class Scenario1523( TestBase ): 916 " Scenario 1523 " 917 918 def __init__( self, netschedule ): 919 TestBase.__init__( self, netschedule ) 920 921 @staticmethod 922 def getScenario(): 923 " Provides the scenario " 924 return "CHRAFF add=a3; READ2 with timeout and port, wnode_aff=1" 925 926 def execute( self ): 927 " Should return True if the execution completed successfully " 928 self.fromScratch() 929 930 ns_client = self.getNetScheduleService( 'TEST', 'scenario1523' ) 931 ns_client.set_client_identification( 'node', 'session' ) 932 execAny( ns_client, "CHRAFF add=a3" ) 933 934 notifSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 935 notifSocket.bind( ( "", 0 ) ) 936 notifPort = notifSocket.getsockname()[ 1 ] 937 938 execAny( ns_client, 939 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1 port=' + str( notifPort ) + ' timeout=3' ) 940 941 # Submit a job 942 jobID = self.ns.submitJob( 'TEST', 'input', 'a3' ) 943 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 944 if jobID != receivedJobID: 945 raise Exception( "Unexpected job for execution" ) 946 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 947 948 time.sleep( 3 ) 949 result = self.getNotif( notifSocket ) 950 notifSocket.close() 951 if result == 0: 952 raise Exception( "Expected one notification, received nothing" ) 953 return True 954 955 def getNotif( self, s ): 956 " Retrieves notifications " 957 try: 958 data = s.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 959 if "queue=TEST" not in data: 960 raise Exception( "Unexpected notification in socket" ) 961 return 1 962 except Exception as ex: 963 if "Unexpected notification in socket" in str( ex ): 964 raise 965 pass 966 return 0 967 968 969class Scenario1524( TestBase ): 970 " Scenario 1524 " 971 972 def __init__( self, netschedule ): 973 TestBase.__init__( self, netschedule ) 974 975 @staticmethod 976 def getScenario(): 977 " Provides the scenario " 978 return "CHRAFF add=a3; READ2 with timeout and port, wnode_aff=1" 979 980 def execute( self ): 981 " Should return True if the execution completed successfully " 982 self.fromScratch() 983 984 ns_client = self.getNetScheduleService( 'TEST', 'scenario1524' ) 985 ns_client.set_client_identification( 'node', 'session' ) 986 execAny( ns_client, "CHRAFF add=a3" ) 987 988 notifSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 989 notifSocket.bind( ( "", 0 ) ) 990 notifPort = notifSocket.getsockname()[ 1 ] 991 992 execAny( ns_client, 993 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=0 port=' + str( notifPort ) + ' timeout=3' ) 994 995 # Submit a job 996 jobID = self.ns.submitJob( 'TEST', 'input', 'a4' ) 997 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 998 if jobID != receivedJobID: 999 raise Exception( "Unexpected job for execution" ) 1000 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 1001 1002 time.sleep( 3 ) 1003 result = self.getNotif( notifSocket ) 1004 notifSocket.close() 1005 if result != 0: 1006 raise Exception( "Received notifications when not expected" ) 1007 return True 1008 1009 def getNotif( self, s ): 1010 " Retrieves notifications " 1011 try: 1012 data = s.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 1013 if "queue=TEST" not in data: 1014 raise Exception( "Unexpected notification in socket" ) 1015 return 1 1016 except Exception as ex: 1017 if "Unexpected notification in socket" in str( ex ): 1018 raise 1019 pass 1020 return 0 1021 1022class Scenario1525( TestBase ): 1023 " Scenario 1525 " 1024 1025 def __init__( self, netschedule ): 1026 TestBase.__init__( self, netschedule ) 1027 1028 @staticmethod 1029 def getScenario(): 1030 " Provides the scenario " 1031 return "CWREAD as anonymous" 1032 1033 def execute( self ): 1034 " Should return True if the execution completed successfully " 1035 self.fromScratch() 1036 1037 ns_client = self.getNetScheduleService( 'TEST', 'scenario1525' ) 1038 try: 1039 ns_client.set_client_identification( '', '' ) 1040 except: 1041 pass 1042 1043 try: 1044 execAny( ns_client, 'CWREAD' ) 1045 except Exception as exc: 1046 if "no client_node and client_session at handshake" in str( exc ): 1047 return True 1048 raise 1049 raise Exception( "Expected auth exception, got nothing" ) 1050 1051class Scenario1526( TestBase ): 1052 " Scenario 1526 " 1053 1054 def __init__( self, netschedule ): 1055 TestBase.__init__( self, netschedule ) 1056 1057 @staticmethod 1058 def getScenario(): 1059 " Provides the scenario " 1060 return "CWREAD as identified" 1061 1062 def execute( self ): 1063 " Should return True if the execution completed successfully " 1064 self.fromScratch() 1065 1066 ns_client = self.getNetScheduleService( 'TEST', 'scenario1526' ) 1067 ns_client.set_client_identification( 'mynode', 'mysession' ) 1068 execAny( ns_client, 'CWREAD' ) 1069 return True 1070 1071class Scenario1527( TestBase ): 1072 " Scenario 1527 " 1073 1074 def __init__( self, netschedule ): 1075 TestBase.__init__( self, netschedule ) 1076 1077 @staticmethod 1078 def getScenario(): 1079 " Provides the scenario " 1080 return "READ2, CWREAD, SUBMIT -> no notifications" 1081 1082 def execute( self ): 1083 " Should return True if the execution completed successfully " 1084 self.fromScratch() 1085 1086 ns_client = self.getNetScheduleService( 'TEST', 'scenario1527' ) 1087 ns_client.set_client_identification( 'node', 'session' ) 1088 1089 notifSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 1090 notifSocket.bind( ( "", 0 ) ) 1091 notifPort = notifSocket.getsockname()[ 1 ] 1092 1093 execAny( ns_client, 1094 'READ2 reader_aff=0 any_aff=1 exclusive_new_aff=0 port=' + str( notifPort ) + ' timeout=3' ) 1095 execAny( ns_client, 'CWREAD' ) 1096 1097 # Submit a job 1098 jobID = self.ns.submitJob( 'TEST', 'input', 'a4' ) 1099 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 1100 if jobID != receivedJobID: 1101 raise Exception( "Unexpected job for execution" ) 1102 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 1103 1104 time.sleep( 3 ) 1105 result = self.getNotif( notifSocket ) 1106 notifSocket.close() 1107 if result != 0: 1108 raise Exception( "Received notifications when not expected" ) 1109 return True 1110 1111 def getNotif( self, s ): 1112 " Retrieves notifications " 1113 try: 1114 data = s.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 1115 if "queue=TEST" not in data: 1116 raise Exception( "Unexpected notification in socket" ) 1117 return 1 1118 except Exception as ex: 1119 if "Unexpected notification in socket" in str( ex ): 1120 raise 1121 pass 1122 return 0 1123 1124class Scenario1528( TestBase ): 1125 " Scenario 1528 " 1126 1127 def __init__( self, netschedule ): 1128 TestBase.__init__( self, netschedule ) 1129 1130 @staticmethod 1131 def getScenario(): 1132 " Provides the scenario " 1133 return "CHRAFF add=a2, STAT AFFINITIES VERBOSE" 1134 1135 def execute( self ): 1136 " Should return True if the execution completed successfully " 1137 self.fromScratch() 1138 1139 ns_client = self.getNetScheduleService( 'TEST', 'scenario241' ) 1140 ns_client.set_client_identification( 'mynode', 'mysession' ) 1141 execAny( ns_client, "CHRAFF add=a2" ) 1142 1143 aff = getAffinityInfo( ns_client, True, 1, 0 ) 1144 if aff[ 'affinity_token' ] != 'a2': 1145 raise Exception( "Expected aff token: a2, received: " + 1146 aff[ 'affinity_token' ] ) 1147 if aff[ 'clients__explicit_wget' ] is not None: 1148 raise Exception( "Expected 0 WGET clients, received: " + 1149 str( len( aff[ 'clients__explicit_wget' ] ) ) ) 1150 if aff[ 'clients__explicit_read' ] is not None: 1151 raise Exception( "Expected 0 wait read clients, received: " + 1152 str( len( aff[ 'clients__explicit_read' ] ) ) ) 1153 if aff[ 'wn_clients__preferred' ] is not None: 1154 raise Exception( "Expected 0 preferred clients, received: " + 1155 str( len( aff[ 'wn_clients__preferred' ] ) ) ) 1156 if len( aff[ 'reader_clients__preferred' ] ) != 1: 1157 raise Exception( "Expected 1 preferred read client, received: " + 1158 str( len( aff[ 'reader_clients__preferred' ] ) ) ) 1159 if aff[ 'reader_clients__preferred' ][ 0 ] != 'mynode': 1160 raise Exception( "Unexpected reader preferred client. " 1161 "Expected 'mynode', received: '" + 1162 aff[ 'reader_clients__preferred' ][ 0 ] + "'" ) 1163 return True 1164 1165class Scenario1529( TestBase ): 1166 " Scenario 1529 " 1167 1168 def __init__( self, netschedule ): 1169 TestBase.__init__( self, netschedule ) 1170 1171 @staticmethod 1172 def getScenario(): 1173 " Provides the scenario " 1174 return "CHRAFF del=a3, STAT AFFINITIES VERBOSE" 1175 1176 def execute( self ): 1177 " Should return True if the execution completed successfully " 1178 self.fromScratch() 1179 1180 ns_client = self.getNetScheduleService( 'TEST', 'scenario242' ) 1181 ns_client.set_client_identification( 'mynode', 'mysession' ) 1182 ns_client.on_warning = None 1183 1184 execAny( ns_client, "CHRAFF del=a3" ) 1185 1186 getAffinityInfo( ns_client, True, 0, 0 ) 1187 return True 1188 1189class Scenario1530( TestBase ): 1190 " Scenario 1530 " 1191 1192 def __init__( self, netschedule ): 1193 TestBase.__init__( self, netschedule ) 1194 1195 @staticmethod 1196 def getScenario(): 1197 " Provides the scenario " 1198 return "CHRAFF add=a4, CHRAFF del=a4, STAT AFFINITIES VERBOSE" 1199 1200 def execute( self ): 1201 " Should return True if the execution completed successfully " 1202 self.fromScratch() 1203 1204 ns_client = self.getNetScheduleService( 'TEST', 'scenario1530' ) 1205 ns_client.set_client_identification( 'mynode', 'mysession' ) 1206 execAny( ns_client, "CHRAFF add=a4" ) 1207 execAny( ns_client, "CHRAFF del=a4" ) 1208 1209 aff = getAffinityInfo( ns_client, True, 1, 0 ) 1210 if aff[ 'jobs' ] is not None: 1211 raise Exception( "Expected no jobs, received: " + 1212 str( len( aff[ 'jobs' ] ) ) ) 1213 1214 if aff[ 'affinity_token' ] != 'a4': 1215 raise Exception( "Expected aff token: a4, received: " + 1216 aff[ 'affinity_token' ] ) 1217 if aff[ 'clients__explicit_wget' ] is not None: 1218 raise Exception( "Expected 0 WGET clients, received: " + 1219 str( len( aff[ 'clients__explicit_wget' ] ) ) ) 1220 if aff[ 'clients__explicit_read' ] is not None: 1221 raise Exception( "Expected 0 wait read clients, received: " + 1222 str( len( aff[ 'clients__explicit_read' ] ) ) ) 1223 if aff[ 'wn_clients__preferred' ] is not None: 1224 raise Exception( "Expected 0 preferred clients, received: " + 1225 str( len( aff[ 'wn_clients__preferred' ] ) ) ) 1226 if aff[ 'reader_clients__preferred' ] is not None: 1227 raise Exception( "Expected 0 read preferred clients, received: " + 1228 str( len( aff[ 'reader_clients__preferred' ] ) ) ) 1229 return True 1230 1231class Scenario1531( TestBase ): 1232 " Scenario 1531 " 1233 1234 def __init__( self, netschedule ): 1235 TestBase.__init__( self, netschedule ) 1236 1237 @staticmethod 1238 def getScenario(): 1239 " Provides the scenario " 1240 return "READ2 as anonymous" 1241 1242 def execute( self ): 1243 " Should return True if the execution completed successfully " 1244 self.fromScratch() 1245 ns_client = self.getNetScheduleService( 'TEST', 'scenario1531' ) 1246 try: 1247 ns_client.set_client_identification( '', '' ) 1248 except: 1249 pass 1250 1251 try: 1252 execAny( ns_client, 1253 'READ2 reader_aff=0 any_aff=1' ) 1254 except Exception as exc: 1255 if "Anonymous client" in str( exc ): 1256 return True 1257 raise 1258 return False 1259 1260class Scenario1532( TestBase ): 1261 " Scenario 1532 " 1262 1263 def __init__( self, netschedule ): 1264 TestBase.__init__( self, netschedule ) 1265 1266 @staticmethod 1267 def getScenario(): 1268 " Provides the scenario " 1269 return "READ2, STAT NOTIFICATIONS" 1270 1271 def execute( self ): 1272 " Should return True if the execution completed successfully " 1273 self.fromScratch() 1274 1275 ns_client = self.getNetScheduleService( 'TEST', 'scenario1532' ) 1276 ns_client.set_client_identification( 'node', 'session' ) 1277 1278 execAny( ns_client, 1279 'READ2 reader_aff=0 any_aff=1 exclusive_new_aff=0 port=9007 timeout=5' ) 1280 time.sleep( 2 ) 1281 # STAT NOTIFICATIONS 1282 ns_client = self.getNetScheduleService( 'TEST', 'scenario1532' ) 1283 info = getNotificationInfo( ns_client, True, 1, 0 ) 1284 1285 if info[ "use_preferred_affinities" ] != False: 1286 raise Exception( "Unexpected use_preferred_affinities" ) 1287 if info[ "any_job" ] != True: 1288 raise Exception( "Unexpected any_job" ) 1289 if info[ "slow_rate_active" ] != False: 1290 raise Exception( "Unexpected slow_rate_active" ) 1291 if info[ 'active' ] != False: 1292 raise Exception( "Unexpected active" ) 1293 if info[ 'reason' ] != 'READ': 1294 raise Exception( "Unexpected reason" ) 1295 1296 return True 1297 1298class Scenario1533( TestBase ): 1299 " Scenario 1533 " 1300 1301 def __init__( self, netschedule ): 1302 TestBase.__init__( self, netschedule ) 1303 1304 @staticmethod 1305 def getScenario(): 1306 " Provides the scenario " 1307 return "READ2 any_aff = 1 exclusive_new_aff=1" 1308 1309 def execute( self ): 1310 " Should return True if the execution completed successfully " 1311 self.fromScratch() 1312 1313 ns_client = self.getNetScheduleService( 'TEST', 'scenario1533' ) 1314 ns_client.set_client_identification( 'node', 'session' ) 1315 try: 1316 execAny( ns_client, 1317 'READ2 reader_aff=1 any_aff=1 exclusive_new_aff=1' ) 1318 except Exception as exc: 1319 if "forbidden" in str( exc ): 1320 return True 1321 raise 1322 raise Exception( "Expected exception, got nothing" ) 1323 1324class Scenario1534( TestBase ): 1325 " Scenario 1534 " 1326 1327 def __init__( self, netschedule ): 1328 TestBase.__init__( self, netschedule ) 1329 1330 @staticmethod 1331 def getScenario(): 1332 " Provides the scenario " 1333 return "SUBMIT without aff, CHRAFF as identified (add a0, a1, a2), " \ 1334 "READ2 reader_aff = 1 exclusive_new_aff=1" 1335 1336 def report_warning( self, msg, server ): 1337 " Callback to report a warning " 1338 self.warning = msg 1339 1340 def execute( self ): 1341 " Should return True if the execution completed successfully " 1342 self.fromScratch() 1343 1344 jobID = self.ns.submitJob( 'TEST', 'bla' ) 1345 1346 ns_client = self.getNetScheduleService( 'TEST', 'scenario1534' ) 1347 ns_client.set_client_identification( 'node', 'session' ) 1348 ns_client.on_warning = self.report_warning 1349 execAny( ns_client, "CHRAFF add=a0,a1,a2 del=a3,a4,a5" ) 1350 1351 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 1352 if jobID != receivedJobID: 1353 raise Exception( "Unexpected job for execution" ) 1354 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 1355 1356 output = execAny( ns_client, 1357 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1' ) 1358 values = parse_qs( output, True, True ) 1359 receivedJobID = values[ 'job_key' ][ 0 ] 1360 passport = values[ 'auth_token' ][ 0 ] 1361 1362 if jobID != receivedJobID: 1363 raise Exception( "Received job ID does not match. Expected: " + 1364 jobID + " Received: " + receivedJobID ) 1365 1366 execAny( ns_client, 'RDRB ' + jobID + ' ' + passport ) 1367 output = execAny( ns_client, 1368 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1' ) 1369 if output != "no_more_jobs=false": 1370 raise Exception( "Expect no job (it's in the blacklist), " 1371 "but received one: " + output ) 1372 return True 1373 1374class Scenario1535( TestBase ): 1375 " Scenario 1535 " 1376 1377 def __init__( self, netschedule ): 1378 TestBase.__init__( self, netschedule ) 1379 1380 @staticmethod 1381 def getScenario(): 1382 " Provides the scenario " 1383 return "SUBMIT with a7, CHRAFF as identified (add a0, a1, a2), " \ 1384 "READ2 reader_aff = 1 exclusive_new_aff=1" 1385 1386 def report_warning( self, msg, server ): 1387 " Callback to report a warning " 1388 self.warning = msg 1389 1390 def execute( self ): 1391 " Should return True if the execution completed successfully " 1392 self.fromScratch() 1393 1394 jobID = self.ns.submitJob( 'TEST', 'bla', 'a7' ) 1395 1396 ns_client = self.getNetScheduleService( 'TEST', 'scenario1535' ) 1397 ns_client.set_client_identification( 'node', 'session' ) 1398 ns_client.on_warning = self.report_warning 1399 execAny( ns_client, "CHRAFF add=a0,a1,a2 del=a3,a4,a5" ) 1400 1401 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 1402 if jobID != receivedJobID: 1403 raise Exception( "Unexpected job for execution" ) 1404 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 1405 1406 output = execAny( ns_client, 1407 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1' ) 1408 values = parse_qs( output, True, True ) 1409 receivedJobID = values[ 'job_key' ][ 0 ] 1410 passport = values[ 'auth_token' ][ 0 ] 1411 1412 if jobID != receivedJobID: 1413 raise Exception( "Received job ID does not match. Expected: " + 1414 jobID + " Received: " + receivedJobID ) 1415 1416 execAny( ns_client, 'RDRB ' + jobID + ' ' + passport ) 1417 output = execAny( ns_client, 1418 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1' ) 1419 if output != "no_more_jobs=false": 1420 raise Exception( "Expect no job (it's in the blacklist), " 1421 "but received one: " + output ) 1422 return True 1423 1424class Scenario1536( TestBase ): 1425 " Scenario 1536 " 1426 1427 def __init__( self, netschedule ): 1428 TestBase.__init__( self, netschedule ) 1429 1430 @staticmethod 1431 def getScenario(): 1432 " Provides the scenario " 1433 return "SUBMIT with a7, CHRAFF as identified (add a0, a1, a2), " \ 1434 "READ2 reader_aff = 1 exclusive_new_aff=1" 1435 1436 def execute( self ): 1437 " Should return True if the execution completed successfully " 1438 self.fromScratch() 1439 1440 jobID = self.ns.submitJob( 'TEST', 'bla', 'a7' ) 1441 1442 ns_client = self.getNetScheduleService( 'TEST', 'scenario1536' ) 1443 ns_client.set_client_identification( 'node', 'session' ) 1444 execAny( ns_client, "CHRAFF add=a0,a1,a2" ) 1445 1446 ns_client2 = self.getNetScheduleService( 'TEST', 'scenario1536' ) 1447 ns_client2.set_client_identification( 'node2', 'session2' ) 1448 execAny( ns_client2, "CHRAFF add=a7" ) 1449 1450 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 1451 if jobID != receivedJobID: 1452 raise Exception( "Unexpected job for execution" ) 1453 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 1454 1455 output = execAny( ns_client, 1456 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1' ) 1457 if output != "no_more_jobs=true": 1458 raise Exception( "Expect no job (non-unique affinity), " 1459 "but received one: " + output ) 1460 1461 execAny( ns_client2, "CHRAFF del=a7" ) 1462 output = execAny( ns_client, 1463 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1' ) 1464 values = parse_qs( output, True, True ) 1465 receivedJobID = values[ 'job_key' ][ 0 ] 1466 passport = values[ 'auth_token' ][ 0 ] 1467 1468 if jobID != receivedJobID: 1469 raise Exception( "Received job ID does not match. Expected: " + 1470 jobID + " Received: " + receivedJobID ) 1471 1472 execAny( ns_client, 'RDRB ' + jobID + ' ' + passport ) 1473 output = execAny( ns_client, 1474 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1' ) 1475 if output != "no_more_jobs=false": 1476 raise Exception( "Expect no job (it's in the blacklist), " 1477 "but received one: " + output ) 1478 return True 1479 1480class Scenario1537( TestBase ): 1481 " Scenario 1537 " 1482 1483 def __init__( self, netschedule ): 1484 TestBase.__init__( self, netschedule ) 1485 1486 @staticmethod 1487 def getScenario(): 1488 " Provides the scenario " 1489 return "Read notifications and blacklists" 1490 1491 def getNotif( self, s ): 1492 " Retrieves notifications " 1493 try: 1494 data = s.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 1495 if "queue=TEST" not in data: 1496 raise Exception( "Unexpected notification in socket" ) 1497 return 1 1498 except Exception as ex: 1499 if "Unexpected notification in socket" in str( ex ): 1500 raise 1501 pass 1502 return 0 1503 1504 def execute( self ): 1505 " Should return True if the execution completed successfully " 1506 self.fromScratch() 1507 1508 jobID = self.ns.submitJob( 'TEST', 'bla', 'a0' ) 1509 1510 ns_client = self.getNetScheduleService( 'TEST', 'scenario1537' ) 1511 ns_client.set_client_identification( 'node', 'session' ) 1512 execAny( ns_client, "CHRAFF add=a0" ) 1513 1514 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 1515 if jobID != receivedJobID: 1516 raise Exception( "Unexpected job for execution" ) 1517 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 1518 1519 output = execAny( ns_client, 1520 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1' ) 1521 values = parse_qs( output, True, True ) 1522 receivedJobID = values[ 'job_key' ][ 0 ] 1523 passport = values[ 'auth_token' ][ 0 ] 1524 1525 if jobID != receivedJobID: 1526 raise Exception( "Received job ID does not match. Expected: " + 1527 jobID + " Received: " + receivedJobID ) 1528 1529 execAny( ns_client, 'RDRB ' + jobID + ' ' + passport ) 1530 # Here: pending job and it is in the client black list 1531 1532 ns_client2 = self.getNetScheduleService( 'TEST', 'scenario1537' ) 1533 ns_client2.set_client_identification( 'node2', 'session2' ) 1534 output = execAny( ns_client2, 1535 'READ2 reader_aff=1 any_aff=1 exclusive_new_aff=0' ) 1536 values = parse_qs( output, True, True ) 1537 receivedJobID = values[ 'job_key' ][ 0 ] 1538 passport = values[ 'auth_token' ][ 0 ] 1539 1540 notifSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 1541 notifSocket.bind( ( "", 0 ) ) 1542 notifPort = notifSocket.getsockname()[ 1 ] 1543 1544 # The first client waits for a job 1545 execAny( ns_client, "READ2 reader_aff=0 any_aff=0 exclusive_new_aff=0 " 1546 "aff=a0 port=" + str( notifPort ) + " timeout=3" ) 1547 1548 # Sometimes it takes so long to spawn grid_cli that the next 1549 # command is sent before GET2 is sent. So, we have a sleep here 1550 time.sleep( 2 ) 1551 1552 # Return the job 1553 execAny( ns_client2, 'RDRB ' + jobID + ' ' + passport ) 1554 1555 result = self.getNotif( notifSocket ) 1556 notifSocket.close() 1557 if result != 0: 1558 raise Exception( "Received notifications when not expected" ) 1559 return True 1560 1561class Scenario1538( TestBase ): 1562 " Scenario 1538 " 1563 1564 def __init__( self, netschedule ): 1565 TestBase.__init__( self, netschedule ) 1566 1567 @staticmethod 1568 def getScenario(): 1569 " Provides the scenario " 1570 return "Notifications and exclusive affinities" 1571 1572 def execute( self ): 1573 " Should return True if the execution completed successfully " 1574 self.fromScratch() 1575 1576 jobID = self.ns.submitJob( 'TEST', 'bla', 'a0' ) 1577 1578 # First client holds a0 affinity 1579 ns_client1 = self.getNetScheduleService( 'TEST', 'scenario1538' ) 1580 ns_client1.set_client_identification( 'node1', 'session1' ) 1581 execAny( ns_client1, "CHRAFF add=a0" ) 1582 1583 # Second client holds a100 affinity 1584 ns_client2 = self.getNetScheduleService( 'TEST', 'scenario1538' ) 1585 ns_client2.set_client_identification( 'node2', 'session2' ) 1586 execAny( ns_client2, "CHRAFF add=a100" ) 1587 1588 # Socket to receive notifications 1589 notifSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 1590 notifSocket.bind( ( "", 0 ) ) 1591 notifPort = notifSocket.getsockname()[ 1 ] 1592 1593 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 1594 if jobID != receivedJobID: 1595 raise Exception( "Unexpected job for execution" ) 1596 execAny( ns_client1, "PUT " + jobID + " 0 nooutput" ) 1597 1598 # Second client tries to get the - should get nothing 1599 output = execAny( ns_client2, 1600 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1 ' 1601 'port=' + str( notifPort ) + ' timeout=3' ) 1602 if output != "no_more_jobs=true": 1603 notifSocket.close() 1604 raise Exception( "Expect no jobs, received: " + output ) 1605 1606 time.sleep( 4 ) 1607 try: 1608 # Exception is expected 1609 data = notifSocket.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 1610 notifSocket.close() 1611 raise Exception( "Expected no notifications, received one: " + 1612 data ) 1613 except Exception as exc: 1614 if "Resource temporarily unavailable" not in str( exc ): 1615 notifSocket.close() 1616 raise 1617 1618 # Second client tries to get another pending job 1619 output = execAny( ns_client2, 1620 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1 ' 1621 'port=' + str( notifPort ) + ' timeout=3' ) 1622 1623 # Should get notifications after this submit 1624 jobID = self.ns.submitJob( 'TEST', 'bla', 'a5' ) # analysis:ignore 1625 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 1626 if jobID != receivedJobID: 1627 raise Exception( "Unexpected job for execution" ) 1628 execAny( ns_client1, "PUT " + jobID + " 0 nooutput" ) 1629 1630 time.sleep( 4 ) 1631 data = notifSocket.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 1632 notifSocket.close() 1633 1634 if "queue=TEST" not in data: 1635 raise Exception( "Expected notification, received garbage: " + 1636 data ) 1637 return True 1638 1639class Scenario1539( TestBase ): 1640 " Scenario 1539 " 1641 1642 def __init__( self, netschedule ): 1643 TestBase.__init__( self, netschedule ) 1644 1645 @staticmethod 1646 def getScenario(): 1647 " Provides the scenario " 1648 return "Read notifications and exclusive affinities" 1649 1650 def execute( self ): 1651 " Should return True if the execution completed successfully " 1652 self.fromScratch() 1653 1654 jobID = self.ns.submitJob( 'TEST', 'bla', 'a0' ) 1655 1656 # First client holds a0 affinity 1657 ns_client1 = self.getNetScheduleService( 'TEST', 'scenario1539' ) 1658 ns_client1.set_client_identification( 'node1', 'session1' ) 1659 execAny( ns_client1, "CHRAFF add=a0" ) 1660 1661 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 1662 if jobID != receivedJobID: 1663 raise Exception( "Unexpected job for execution" ) 1664 execAny( ns_client1, "PUT " + jobID + " 0 nooutput" ) 1665 1666 output = execAny( ns_client1, 1667 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1' ) 1668 values = parse_qs( output, True, True ) 1669 receivedJobID = values[ 'job_key' ][ 0 ] 1670 passport = values[ 'auth_token' ][ 0 ] # analysis:ignore 1671 1672 if jobID != receivedJobID: 1673 raise Exception( "Unexpected received job ID" ) 1674 1675 1676 # Second client holds a100 affinity 1677 ns_client2 = self.getNetScheduleService( 'TEST', 'scenario1539' ) 1678 ns_client2.set_client_identification( 'node2', 'session2' ) 1679 execAny( ns_client2, "CHRAFF add=a100" ) 1680 1681 # Socket to receive notifications 1682 notifSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 1683 notifSocket.bind( ( "", 0 ) ) 1684 notifPort = notifSocket.getsockname()[ 1 ] 1685 1686 # Second client tries to get the pending job - should get nothing 1687 output = execAny( ns_client2, 1688 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1 ' 1689 'port=' + str( notifPort ) + ' timeout=3' ) 1690 if output != "no_more_jobs=true": 1691 notifSocket.close() 1692 raise Exception( "Expect no jobs, received: " + output ) 1693 1694 time.sleep( 4 ) 1695 try: 1696 # Exception is expected 1697 data = notifSocket.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 1698 notifSocket.close() 1699 raise Exception( "Expected no notifications, received one: " + 1700 data ) 1701 except Exception as exc: 1702 if "Resource temporarily unavailable" not in str( exc ): 1703 notifSocket.close() 1704 raise 1705 1706 # Second client tries to get another job 1707 output = execAny( ns_client2, 1708 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1 ' 1709 'port=' + str( notifPort ) + ' timeout=3' ) 1710 1711 # Should get notifications after this clear because 1712 # the a0 affinity becomes available 1713 execAny( ns_client1, "CLRN" ) 1714 1715 time.sleep( 4 ) 1716 data = notifSocket.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 1717 notifSocket.close() 1718 1719 if "queue=TEST" not in data: 1720 raise Exception( "Expected notification, received garbage: " + 1721 data ) 1722 return True 1723 1724class Scenario1540( TestBase ): 1725 " Scenario 1540 " 1726 1727 def __init__( self, netschedule ): 1728 TestBase.__init__( self, netschedule ) 1729 1730 @staticmethod 1731 def getScenario(): 1732 " Provides the scenario " 1733 return "Reset client affinity" 1734 1735 def execute( self ): 1736 " Should return True if the execution completed successfully " 1737 self.fromScratch() 1738 1739 # First client holds a0 affinity 1740 ns_client = self.getNetScheduleService( 'TEST', 'scenario310' ) 1741 ns_client.set_client_identification( 'node1', 'session1' ) 1742 execAny( ns_client, "CHRAFF add=a0" ) 1743 1744 # Reader timeout is 5 sec 1745 time.sleep( 7 ) 1746 1747 ns_admin = self.getNetScheduleService( 'TEST', 'scenario310' ) 1748 info = getClientInfo( ns_admin, 'node1' ) 1749 if info[ 'reader_preferred_affinities_reset' ] != True: 1750 raise Exception( "Expected to have preferred affinities reset, " 1751 "received: " + 1752 str( info[ 'reader_preferred_affinities_reset' ] ) ) 1753 return True 1754 1755class Scenario1541( TestBase ): 1756 " Scenario 1541 " 1757 1758 def __init__( self, netschedule ): 1759 TestBase.__init__( self, netschedule ) 1760 1761 @staticmethod 1762 def getScenario(): 1763 " Provides the scenario " 1764 return "Reset client affinity" 1765 1766 def execute( self ): 1767 " Should return True if the execution completed successfully " 1768 self.fromScratch() 1769 1770 # First client holds a0 affinity 1771 ns_client = self.getNetScheduleService( 'TEST', 'scenario1541' ) 1772 ns_client.set_client_identification( 'node1', 'session1' ) 1773 execAny( ns_client, "CHRAFF add=a0" ) 1774 1775 ns_admin = self.getNetScheduleService( 'TEST', 'scenario310' ) 1776 1777 affInfo = getAffinityInfo( ns_admin ) 1778 if affInfo[ 'affinity_token' ] != 'a0': 1779 raise Exception( "Unexpected affinity registry content " 1780 "after adding 1 reader preferred affinity (token)" ) 1781 if affInfo[ 'reader_clients__preferred' ] != [ 'node1' ]: 1782 raise Exception( "Unexpected affinity registry content " 1783 "after adding 1 reader preferred affinity (node)" ) 1784 info = getClientInfo( ns_admin, 'node1' ) 1785 if info[ 'reader_preferred_affinities_reset' ] != False: 1786 raise Exception( "Expected to have reader preferred affinities non reset, " 1787 "received: " + 1788 str( info[ 'reader_preferred_affinities_reset' ] ) ) 1789 1790 # Reader timeout is 5 sec 1791 time.sleep( 7 ) 1792 1793 info = getClientInfo( ns_admin, 'node1' ) 1794 if info[ 'reader_preferred_affinities_reset' ] != True: 1795 raise Exception( "Expected to have reader preferred affinities reset, " 1796 "received: " + 1797 str( info[ 'reader_preferred_affinities_reset' ] ) ) 1798 1799 affInfo = getAffinityInfo( ns_admin ) 1800 if affInfo[ 'affinity_token' ] != 'a0': 1801 raise Exception( "Unexpected affinity registry content " 1802 "after reader is expired (token)" ) 1803 if affInfo[ 'reader_clients__preferred' ] != None: 1804 raise Exception( "Unexpected affinity registry content " 1805 "after reader is expired (node)" ) 1806 1807 try: 1808 output = execAny( ns_client, 1809 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1' ) 1810 except Exception as excpt: 1811 if "ePrefAffExpired" in str( excpt ) or "expired" in str( excpt ): 1812 return True 1813 raise 1814 1815 raise Exception( "Expected exception in READ2 and did not get it: " + 1816 output ) 1817 1818class Scenario1542( TestBase ): 1819 " Scenario 1542 " 1820 1821 def __init__( self, netschedule ): 1822 TestBase.__init__( self, netschedule ) 1823 1824 @staticmethod 1825 def getScenario(): 1826 " Provides the scenario " 1827 return "Reset client affinity" 1828 1829 def execute( self ): 1830 " Should return True if the execution completed successfully " 1831 self.fromScratch() 1832 1833 # First client holds a0 affinity 1834 ns_client = self.getNetScheduleService( 'TEST', 'scenario1542' ) 1835 ns_client.set_client_identification( 'node1', 'session1' ) 1836 execAny( ns_client, "CHRAFF add=a0" ) 1837 1838 ns_admin = self.getNetScheduleService( 'TEST', 'scenario1542' ) 1839 1840 execAny( ns_client, 1841 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=0 ' 1842 'port=9007 timeout=4' ) 1843 1844 info = getNotificationInfo( ns_admin ) 1845 if info[ 'client_node' ] != 'node1': 1846 raise Exception( "Unexpected client in the notifications list: " + 1847 info[ 'client_node' ] ) 1848 1849 # Reader timeout is 5 sec 1850 time.sleep( 7 ) 1851 1852 info = getClientInfo( ns_admin, 'node1' ) 1853 if info[ 'reader_preferred_affinities_reset' ] != True: 1854 raise Exception( "Expected to have reader preferred affinities reset, " 1855 "received: " + 1856 str( info[ 'reader_preferred_affinities_reset' ] ) ) 1857 1858 affInfo = getAffinityInfo( ns_admin ) 1859 if affInfo[ 'affinity_token' ] != 'a0': 1860 raise Exception( "Unexpected affinity registry content " 1861 "after reader is expired (token)" ) 1862 if 'reader_clients__preferred' in affInfo: 1863 if affInfo[ 'reader_clients__preferred' ] != None: 1864 raise Exception( "Unexpected affinity registry content " 1865 "after reader is expired (node)" ) 1866 return True 1867 1868class Scenario1543( TestBase ): 1869 " Scenario 1543 " 1870 1871 def __init__( self, netschedule ): 1872 TestBase.__init__( self, netschedule ) 1873 1874 @staticmethod 1875 def getScenario(): 1876 " Provides the scenario " 1877 return "Change affinity, wait till reader expired, set affinity" 1878 1879 def execute( self ): 1880 " Should return True if the execution completed successfully " 1881 self.fromScratch( 7 ) 1882 1883 ns_client = self.getNetScheduleService( 'TEST', 'scenario1543' ) 1884 ns_client.set_client_identification( 'mynode', 'mysession' ) 1885 execAny( ns_client, "CHRAFF add=a1,a2" ) 1886 1887 client = getClientInfo( ns_client, 'mynode', verbose = False ) 1888 if client[ 'reader_preferred_affinities_reset' ] != False: 1889 raise Exception( "Expected non-resetted reader preferred affinities" ) 1890 1891 # wait till the reader is expired 1892 time.sleep( 12 ) 1893 1894 client = getClientInfo( ns_client, 'mynode', verbose = False ) 1895 if client[ 'reader_preferred_affinities_reset' ] != True: 1896 raise Exception( "Expected resetted reader preferred affinities" ) 1897 if client[ 'number_of_reader_preferred_affinities' ] != 0: 1898 raise Exception( 'Unexpected length of reader_preferred_affinities' ) 1899 1900 execAny( ns_client, 'SETRAFF' ) 1901 client = getClientInfo( ns_client, 'mynode', verbose = False ) 1902 1903 if client[ 'reader_preferred_affinities_reset' ] != False: 1904 raise Exception( "Expected non-resetted reader preferred affinities" \ 1905 " after SETRAFF" ) 1906 if client[ 'number_of_reader_preferred_affinities' ] != 0: 1907 raise Exception( 'Unexpected length of reader_preferred_affinities' ) 1908 1909 execAny( ns_client, 'SETRAFF a4,a7' ) 1910 client = getClientInfo( ns_client, 'mynode', verbose = False ) 1911 if client[ 'reader_preferred_affinities_reset' ] != False: 1912 raise Exception( "Expected non-resetted reader preferred affinities" \ 1913 " after SETRAFF" ) 1914 if client[ 'number_of_reader_preferred_affinities' ] != 2: 1915 raise Exception( 'Unexpected length of reader_preferred_affinities' ) 1916 1917 return True 1918 1919class Scenario1544( TestBase ): 1920 " Scenario 1544 " 1921 1922 def __init__( self, netschedule ): 1923 TestBase.__init__( self, netschedule ) 1924 return 1925 1926 @staticmethod 1927 def getScenario(): 1928 " Provides the scenario " 1929 return "READ2 for outdated job" 1930 1931 def execute( self ): 1932 " Should return True if the execution completed successfully " 1933 self.fromScratch( 4 ) 1934 1935 # Client #2 plays a passive role of holding an affinity (a2) 1936 ns_client2 = self.getNetScheduleService( 'TEST', 'scenario1544' ) 1937 ns_client2.set_client_identification( 'node2', 'session2' ) 1938 execAny( ns_client2, "CHRAFF add=a2" ) 1939 1940 jobID = self.ns.submitJob( 'TEST', 'bla', 'a2' ) 1941 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 1942 if jobID != receivedJobID: 1943 raise Exception( "Unexpected job for execution" ) 1944 execAny( ns_client2, "PUT " + jobID + " 0 nooutput" ) 1945 1946 ns_client1 = self.getNetScheduleService( 'TEST', 'scenario1544' ) 1947 ns_client1.set_client_identification( 'node1', 'session1' ) 1948 execAny( ns_client1, "CHRAFF add=a1" ) 1949 1950 output = execAny( ns_client1, 1951 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1' ) 1952 if output != 'no_more_jobs=true': 1953 raise Exception( "Expected no job, received: '" + output + "'" ) 1954 1955 # 10 seconds till the job becomes obsolete 1956 time.sleep( 12 ) 1957 1958 output = execAny( ns_client1, 1959 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1' ) 1960 values = parse_qs( output, True, True ) 1961 jobKey = values[ 'job_key' ][ 0 ] 1962 if jobKey != jobID: 1963 raise Exception( "Expected a job, received: '" + output + "'" ) 1964 1965 return True 1966 1967class Scenario1545( TestBase ): 1968 " Scenario 1545 " 1969 1970 def __init__( self, netschedule ): 1971 TestBase.__init__( self, netschedule ) 1972 return 1973 1974 @staticmethod 1975 def getScenario(): 1976 " Provides the scenario " 1977 return "Notifications for outdated READ job" 1978 1979 def execute( self ): 1980 " Should return True if the execution completed successfully " 1981 self.fromScratch( 4 ) 1982 1983 # Client #2 plays a passive role of holding an affinity (a2) 1984 ns_client2 = self.getNetScheduleService( 'TEST', 'scenario1545' ) 1985 ns_client2.set_client_identification( 'node2', 'session2' ) 1986 execAny( ns_client2, "CHRAFF add=a2" ) 1987 1988 jobID = self.ns.submitJob( 'TEST', 'bla', 'a2' ) 1989 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 1990 if jobID != receivedJobID: 1991 raise Exception( "Unexpected job for execution" ) 1992 execAny( ns_client2, "PUT " + jobID + " 0 nooutput" ) 1993 1994 ns_client1 = self.getNetScheduleService( 'TEST', 'scenario1545' ) 1995 ns_client1.set_client_identification( 'node1', 'session1' ) 1996 execAny( ns_client1, "CHRAFF add=a1" ) 1997 1998 # Socket to receive notifications 1999 notifSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 2000 notifSocket.bind( ( "", 0 ) ) 2001 notifPort = notifSocket.getsockname()[ 1 ] 2002 2003 # Second client tries to get the pending job - should get nothing 2004 output = execAny( ns_client1, 2005 'READ2 reader_aff=1 any_aff=0 exclusive_new_aff=1 port=' + str( notifPort ) + ' timeout=15' ) 2006 if output != "no_more_jobs=true": 2007 notifSocket.close() 2008 raise Exception( "Expect no jobs, received: " + output ) 2009 2010 # 10 seconds till the job becomes outdated 2011 time.sleep( 12 ) 2012 2013 data = notifSocket.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 2014 notifSocket.close() 2015 if "queue=TEST" not in data: 2016 raise Exception( "Expected notification, received garbage: " + data ) 2017 return True 2018 2019 2020class Scenario1600( TestBase ): 2021 " Scenario 1600 " 2022 2023 def __init__( self, netschedule ): 2024 TestBase.__init__( self, netschedule ) 2025 2026 @staticmethod 2027 def getScenario(): 2028 " Provides the scenario " 2029 return "SUBMIT with a1, CHRAFF as identified (add a0, a2), " \ 2030 "READ2 reader_aff = 1 affinity_may_change=1/0" 2031 2032 def execute( self ): 2033 " Should return True if the execution completed successfully " 2034 self.fromScratch() 2035 2036 jobID = self.ns.submitJob( 'TEST', 'bla', 'a1' ) 2037 2038 ns_client = self.getNetScheduleService( 'TEST', 'scenario1600' ) 2039 ns_client.set_client_identification( 'node', 'session' ) 2040 execAny( ns_client, "CHRAFF add=a0,a2" ) 2041 2042 receivedJobID = self.ns.getJob( 'TEST', -1, 'a1' )[ 0 ] 2043 if jobID != receivedJobID: 2044 raise Exception( "Unexpected job for execution" ) 2045 2046 output = execAny( ns_client, 2047 'READ2 reader_aff=1 any_aff=0 affinity_may_change=1' ) 2048 if output != "no_more_jobs=false": 2049 raise Exception( "Expect no more jobs == false, but received: " + output ) 2050 output = execAny( ns_client, 2051 'READ2 reader_aff=1 any_aff=0 affinity_may_change=0' ) 2052 if output != "no_more_jobs=true": 2053 raise Exception( "Expect no more jobs == true, but received: " + output ) 2054 return True 2055 2056class Scenario1601( TestBase ): 2057 " Scenario 1601 " 2058 2059 def __init__( self, netschedule ): 2060 TestBase.__init__( self, netschedule ) 2061 2062 @staticmethod 2063 def getScenario(): 2064 " Provides the scenario " 2065 return "SUBMIT with g1 " \ 2066 "READ2 g='g2' group_may_change=1/0" 2067 2068 def execute( self ): 2069 " Should return True if the execution completed successfully " 2070 self.fromScratch() 2071 2072 jobID = self.ns.submitJob( 'TEST', 'bla', group='g1' ) 2073 2074 ns_client = self.getNetScheduleService( 'TEST', 'scenario1601' ) 2075 ns_client.set_client_identification( 'node', 'session' ) 2076 2077 receivedJobID = self.ns.getJob( 'TEST' )[ 0 ] 2078 if jobID != receivedJobID: 2079 raise Exception( "Unexpected job for execution" ) 2080 2081 output = execAny( ns_client, 2082 'READ2 reader_aff=0 any_aff=1 group=g2 group_may_change=1' ) 2083 if output != "no_more_jobs=false": 2084 raise Exception( "Expect no more jobs == false, but received: " + output ) 2085 output = execAny( ns_client, 2086 'READ2 reader_aff=0 any_aff=1 group=g2 group_may_change=0' ) 2087 if output != "no_more_jobs=true": 2088 raise Exception( "Expect no more jobs == true, but received: " + output ) 2089 return True 2090 2091 2092class Scenario1602( TestBase ): 2093 " Scenario 1602 " 2094 2095 def __init__( self, netschedule ): 2096 TestBase.__init__( self, netschedule ) 2097 2098 @staticmethod 2099 def getScenario(): 2100 " Provides the scenario " 2101 return "SUBMIT with a2, SUBMIT with a1 " \ 2102 "GET2 a=a1,a2 prioritized_aff=0, " \ 2103 "GET2 a=a1,a2 prioritized_aff=1" 2104 2105 def execute( self ): 2106 " Should return True if the execution completed successfully " 2107 self.fromScratch() 2108 2109 jobIDa2 = self.ns.submitJob( 'TEST', 'bla', affinity='a2' ) 2110 jobIDa1 = self.ns.submitJob( 'TEST', 'bla', affinity='a1' ) 2111 2112 ns_client = self.getNetScheduleService( 'TEST', 'scenario1602' ) 2113 ns_client.set_client_identification( 'node', 'session' ) 2114 2115 output = execAny( ns_client, 2116 'GET2 wnode_aff=0 any_aff=0 ' 2117 'aff=a1,a2 prioritized_aff=0' ) 2118 values = parse_qs( output, True, True ) 2119 jobKey = values[ 'job_key' ][ 0 ] 2120 if jobKey != jobIDa2: 2121 raise Exception( "Unexpected job for executing. " 2122 "Expected with affinity a2" ) 2123 2124 execAny( ns_client, "RETURN2 job_key=" + jobKey + 2125 " auth_token=" + values[ 'auth_token' ][ 0 ] + 2126 " blacklist=0" ) 2127 output = execAny( ns_client, 2128 'GET2 wnode_aff=0 any_aff=0 ' 2129 'aff=a1,a2 prioritized_aff=1' ) 2130 values = parse_qs( output, True, True ) 2131 jobKey = values[ 'job_key' ][ 0 ] 2132 if jobKey != jobIDa1: 2133 raise Exception( "Unexpected job for executing. " 2134 "Expected with affinity a1" ) 2135 output = execAny( ns_client, 2136 'GET2 wnode_aff=0 any_aff=0 ' 2137 'aff=a1,a2 prioritized_aff=0' ) 2138 values = parse_qs( output, True, True ) 2139 jobKey = values[ 'job_key' ][ 0 ] 2140 if jobKey != jobIDa2: 2141 raise Exception( "Unexpected job for executing. " 2142 "Expected with affinity a2 (second time)" ) 2143 return True 2144 2145 2146class Scenario1603( TestBase ): 2147 " Scenario 1603 " 2148 2149 def __init__( self, netschedule ): 2150 TestBase.__init__( self, netschedule ) 2151 2152 @staticmethod 2153 def getScenario(): 2154 " Provides the scenario " 2155 return "SUBMIT with a2, SUBMIT with a1 " \ 2156 "GET2 a=a1,a2 wnode_aff=1 prioritized_aff=1 => ERR, " \ 2157 "GET2 a=a1,a2 any_aff=1 prioritized_aff=1 => ERR, " \ 2158 "GET2 a=a1,a2 exclusive_new_aff=1 prioritized_aff=1 => ERR" 2159 2160 def execute( self ): 2161 " Should return True if the execution completed successfully " 2162 self.fromScratch() 2163 2164 self.ns.submitJob( 'TEST', 'bla', affinity='a2' ) 2165 self.ns.submitJob( 'TEST', 'bla', affinity='a1' ) 2166 2167 ns_client = self.getNetScheduleService( 'TEST', 'scenario1603' ) 2168 ns_client.set_client_identification( 'node', 'session' ) 2169 2170 ex = False 2171 try: 2172 execAny( ns_client, 'GET2 wnode_aff=1 any_aff=0 ' 2173 'aff=a1,a2 prioritized_aff=1' ) 2174 #except Exception, exc: 2175 except Exception: 2176 ex = True 2177 #print str( exc ) 2178 2179 if ex == False: 2180 raise Exception( "Expected exception, got none (case 1)" ) 2181 2182 ex = False 2183 try: 2184 execAny( ns_client, 'GET2 wnode_aff=0 any_aff=1 ' 2185 'aff=a1,a2 prioritized_aff=1' ) 2186 #except Exception, exc: 2187 except Exception: 2188 ex = True 2189 #print str( exc ) 2190 2191 if ex == False: 2192 raise Exception( "Expected exception, got none (case 2)" ) 2193 2194 ex = False 2195 try: 2196 execAny( ns_client, 'GET2 wnode_aff=0 any_aff=0 exclusive_new_aff=1 ' 2197 'aff=a1,a2 prioritized_aff=1' ) 2198 #except Exception, exc: 2199 except Exception: 2200 ex = True 2201 #print str( exc ) 2202 2203 if ex == False: 2204 raise Exception( "Expected exception, got none (case 3)" ) 2205 2206 return True 2207 2208class Scenario1604( TestBase ): 2209 " Scenario 1604 " 2210 2211 def __init__( self, netschedule ): 2212 TestBase.__init__( self, netschedule ) 2213 2214 @staticmethod 2215 def getScenario(): 2216 " Provides the scenario " 2217 return "GET2 with prioritized_aff=1" 2218 2219 def execute( self ): 2220 " Should return True if the execution completed successfully " 2221 self.fromScratch() 2222 2223 ns_client = self.getNetScheduleService( 'TEST', 'scenario1604' ) 2224 ns_client.set_client_identification( 'node', 'session' ) 2225 2226 notifSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 2227 notifSocket.bind( ( "", 0 ) ) 2228 notifPort = notifSocket.getsockname()[ 1 ] 2229 2230 execAny( ns_client, 2231 'GET2 wnode_aff=0 any_aff=0 exclusive_new_aff=0 aff=a1,a2 port=' + str( notifPort ) + ' timeout=3 prioritized_aff=1' ) 2232 2233 # Submit a job 2234 self.ns.submitJob( 'TEST', 'input', affinity='a2' ) 2235 2236 time.sleep( 3 ) 2237 result = self.getNotif( notifSocket ) 2238 notifSocket.close() 2239 if result != 1: 2240 raise Exception( "Expect notifications but received none" ) 2241 return True 2242 2243 def getNotif( self, s ): 2244 " Retrieves notifications " 2245 try: 2246 data = s.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 2247 if "queue=TEST" not in data: 2248 raise Exception( "Unexpected notification in socket" ) 2249 return 1 2250 except Exception as ex: 2251 if "Unexpected notification in socket" in str( ex ): 2252 raise 2253 pass 2254 return 0 2255 2256 2257class Scenario1605( TestBase ): 2258 " Scenario 1605 " 2259 2260 def __init__( self, netschedule ): 2261 TestBase.__init__( self, netschedule ) 2262 2263 @staticmethod 2264 def getScenario(): 2265 " Provides the scenario " 2266 return "SUBMIT with a2, SUBMIT with a1 " \ 2267 "READ2 a=a1,a2 prioritized_aff=0, " \ 2268 "READ2 a=a1,a2 prioritized_aff=1" 2269 2270 def execute( self ): 2271 " Should return True if the execution completed successfully " 2272 self.fromScratch() 2273 2274 jobIDa2 = self.ns.submitJob( 'TEST', 'bla', affinity='a2' ) 2275 jobIDa1 = self.ns.submitJob( 'TEST', 'bla', affinity='a1' ) 2276 2277 ns_client = self.getNetScheduleService( 'TEST', 'scenario1602' ) 2278 ns_client.set_client_identification( 'node', 'session' ) 2279 2280 jobID = self.ns.getJob( 'TEST' )[ 0 ] 2281 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 2282 jobID = self.ns.getJob( 'TEST' )[ 0 ] 2283 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 2284 2285 # READ2 test 2286 output = execAny( ns_client, 2287 'READ2 reader_aff=0 any_aff=0 ' 2288 'aff=a1,a2 prioritized_aff=0' ) 2289 values = parse_qs( output, True, True ) 2290 jobKey = values[ 'job_key' ][ 0 ] 2291 if jobKey != jobIDa2: 2292 raise Exception( "Unexpected job for executing. " 2293 "Expected with affinity a2" ) 2294 2295 execAny( ns_client, "RDRB job_key=" + jobKey + 2296 " auth_token=" + values[ 'auth_token' ][ 0 ] ) 2297 2298 ns_client1 = self.getNetScheduleService( 'TEST', 'scenario1602' ) 2299 ns_client1.set_client_identification( 'node1', 'session' ) 2300 2301 output = execAny( ns_client1, 2302 'READ2 reader_aff=0 any_aff=0 ' 2303 'aff=a1,a2 prioritized_aff=1' ) 2304 values = parse_qs( output, True, True ) 2305 jobKey = values[ 'job_key' ][ 0 ] 2306 if jobKey != jobIDa1: 2307 raise Exception( "Unexpected job for executing. " 2308 "Expected with affinity a1" ) 2309 output = execAny( ns_client1, 2310 'READ2 reader_aff=0 any_aff=0 ' 2311 'aff=a1,a2 prioritized_aff=0' ) 2312 values = parse_qs( output, True, True ) 2313 jobKey = values[ 'job_key' ][ 0 ] 2314 if jobKey != jobIDa2: 2315 raise Exception( "Unexpected job for executing. " 2316 "Expected with affinity a2 (second time)" ) 2317 return True 2318 2319 2320class Scenario1606( TestBase ): 2321 " Scenario 1606 " 2322 2323 def __init__( self, netschedule ): 2324 TestBase.__init__( self, netschedule ) 2325 2326 @staticmethod 2327 def getScenario(): 2328 " Provides the scenario " 2329 return "SUBMIT with a2, SUBMIT with a1 " \ 2330 "READ2 a=a1,a2 reader_aff=1 prioritized_aff=1 => ERR, " \ 2331 "READ2 a=a1,a2 any_aff=1 prioritized_aff=1 => ERR, " \ 2332 "READ2 a=a1,a2 exclusive_new_aff=1 prioritized_aff=1 => ERR" 2333 2334 def execute( self ): 2335 " Should return True if the execution completed successfully " 2336 self.fromScratch() 2337 2338 self.ns.submitJob( 'TEST', 'bla', affinity='a2' ) 2339 self.ns.submitJob( 'TEST', 'bla', affinity='a1' ) 2340 2341 ns_client = self.getNetScheduleService( 'TEST', 'scenario1606' ) 2342 ns_client.set_client_identification( 'node', 'session' ) 2343 2344 ex = False 2345 try: 2346 execAny( ns_client, 'READ2 reader_aff=1 any_aff=0 ' 2347 'aff=a1,a2 prioritized_aff=1' ) 2348 #except Exception, exc: 2349 except Exception: 2350 ex = True 2351 #print str( exc ) 2352 2353 if ex == False: 2354 raise Exception( "Expected exception, got none (case 1)" ) 2355 2356 ex = False 2357 try: 2358 execAny( ns_client, 'READ2 reader_aff=0 any_aff=1 ' 2359 'aff=a1,a2 prioritized_aff=1' ) 2360 #except Exception, exc: 2361 except Exception: 2362 ex = True 2363 #print str( exc ) 2364 2365 if ex == False: 2366 raise Exception( "Expected exception, got none (case 2)" ) 2367 2368 ex = False 2369 try: 2370 execAny( ns_client, 'READ2 reader_aff=0 any_aff=0 exclusive_new_aff=1 ' 2371 'aff=a1,a2 prioritized_aff=1' ) 2372 #except Exception, exc: 2373 except Exception: 2374 ex = True 2375 #print str( exc ) 2376 2377 if ex == False: 2378 raise Exception( "Expected exception, got none (case 3)" ) 2379 2380 return True 2381 2382 2383class Scenario1607( TestBase ): 2384 " Scenario 1607 " 2385 2386 def __init__( self, netschedule ): 2387 TestBase.__init__( self, netschedule ) 2388 2389 @staticmethod 2390 def getScenario(): 2391 " Provides the scenario " 2392 return "READ2 with prioritized_aff=1" 2393 2394 def execute( self ): 2395 " Should return True if the execution completed successfully " 2396 self.fromScratch() 2397 2398 ns_client = self.getNetScheduleService( 'TEST', 'scenario1607' ) 2399 ns_client.set_client_identification( 'node', 'session' ) 2400 2401 notifSocket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM ) 2402 notifSocket.bind( ( "", 0 ) ) 2403 notifPort = notifSocket.getsockname()[ 1 ] 2404 2405 execAny( ns_client, 2406 'READ2 reader_aff=0 any_aff=0 exclusive_new_aff=0 aff=a1,a2 port=' + str( notifPort ) + ' timeout=3 prioritized_aff=1' ) 2407 2408 # Submit a job 2409 jobID = self.ns.submitJob( 'TEST', 'input', affinity='a2' ) 2410 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 2411 2412 time.sleep( 3 ) 2413 result = self.getNotif( notifSocket ) 2414 notifSocket.close() 2415 if result != 1: 2416 raise Exception( "Expect notifications but received none" ) 2417 return True 2418 2419 def getNotif( self, s ): 2420 " Retrieves notifications " 2421 try: 2422 data = s.recv( 8192, socket.MSG_DONTWAIT ).decode('utf-8') 2423 if "queue=TEST" not in data: 2424 raise Exception( "Unexpected notification in socket" ) 2425 return 1 2426 except Exception as ex: 2427 if "Unexpected notification in socket" in str( ex ): 2428 raise 2429 pass 2430 return 0 2431 2432class Scenario1608( TestBase ): 2433 " Scenario1608 " 2434 2435 def __init__( self, netschedule ): 2436 TestBase.__init__( self, netschedule ) 2437 return 2438 2439 def report_warning( self, msg, server ): 2440 " Just ignore it " 2441 return 2442 2443 @staticmethod 2444 def getScenario(): 2445 " Provides the scenario " 2446 return "Checks RDRB blacklist=0|1" 2447 2448 def execute( self ): 2449 " Should return True if the execution completed successfully " 2450 self.fromScratch( 5 ) 2451 2452 ns_client = self.getNetScheduleService( 'TEST', 'scenario1608' ) 2453 ns_client.set_client_identification( 'node', 'session' ) 2454 ns_client.on_warning = self.report_warning 2455 2456 jobID1 = self.ns.submitJob( 'TEST', 'bla' ) 2457 jobID2 = self.ns.submitJob( 'TEST', 'bla' ) 2458 2459 jobID = self.ns.getJob( 'TEST' )[ 0 ] 2460 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 2461 jobID = self.ns.getJob( 'TEST' )[ 0 ] 2462 execAny( ns_client, "PUT " + jobID + " 0 nooutput" ) 2463 2464 2465 output = execAny( ns_client, 'READ2 reader_aff=0 any_aff=1' ) 2466 values = parse_qs( output, True, True ) 2467 jobKey = values[ 'job_key' ][ 0 ] 2468 if jobKey != jobID1: 2469 raise Exception( "Unexpected job for executing. " 2470 "Expected the first job" ) 2471 2472 execAny( ns_client, "RDRB job_key=" + jobKey + 2473 " auth_token=" + values[ 'auth_token' ][ 0 ] + 2474 " blacklist=0" ) 2475 output = execAny( ns_client, 'READ2 reader_aff=0 any_aff=1' ) 2476 values = parse_qs( output, True, True ) 2477 jobKey = values[ 'job_key' ][ 0 ] 2478 if jobKey != jobID1: 2479 raise Exception( "Unexpected job for executing. " 2480 "Expected the first job after RDRB blacklist=0" ) 2481 2482 execAny( ns_client, "RDRB job_key=" + jobKey + 2483 " auth_token=" + values[ 'auth_token' ][ 0 ] + 2484 " blacklist=1" ) 2485 output = execAny( ns_client, 'READ2 reader_aff=0 any_aff=1' ) 2486 values = parse_qs( output, True, True ) 2487 jobKey = values[ 'job_key' ][ 0 ] 2488 if jobKey != jobID2: 2489 raise Exception( "Unexpected job for executing. " 2490 "Expected the second job after RDRB blacklist=1" ) 2491 2492 return True 2493 2494