1## 2# .test.test_protocol 3## 4import sys 5import unittest 6import struct 7import decimal 8import socket 9import time 10from threading import Thread 11 12from ..protocol import element3 as e3 13from ..protocol import xact3 as x3 14from ..protocol import client3 as c3 15from ..protocol import buffer as pq_buf 16from ..python.socket import find_available_port, SocketFactory 17 18def pair(msg): 19 return (msg.type, msg.serialize()) 20def pairs(*msgseq): 21 return list(map(pair, msgseq)) 22 23long = struct.Struct("!L") 24packl = long.pack 25unpackl = long.unpack 26 27class test_buffer(unittest.TestCase): 28 def setUp(self): 29 self.buffer = pq_buf.pq_message_stream() 30 31 def testMultiByteMessage(self): 32 b = self.buffer 33 b.write(b's') 34 self.assertTrue(b.next_message() is None) 35 b.write(b'\x00\x00') 36 self.assertTrue(b.next_message() is None) 37 b.write(b'\x00\x10') 38 self.assertTrue(b.next_message() is None) 39 data = b'twelve_chars' 40 b.write(data) 41 self.assertEqual(b.next_message(), (b's', data)) 42 43 def testSingleByteMessage(self): 44 b = self.buffer 45 b.write(b's') 46 self.assertTrue(b.next_message() is None) 47 b.write(b'\x00') 48 self.assertTrue(b.next_message() is None) 49 b.write(b'\x00\x00\x05') 50 self.assertTrue(b.next_message() is None) 51 b.write(b'b') 52 self.assertEqual(b.next_message(), (b's', b'b')) 53 54 def testEmptyMessage(self): 55 b = self.buffer 56 b.write(b'x') 57 self.assertTrue(b.next_message() is None) 58 b.write(b'\x00\x00\x00') 59 self.assertTrue(b.next_message() is None) 60 b.write(b'\x04') 61 self.assertEqual(b.next_message(), (b'x', b'')) 62 63 def testInvalidLength(self): 64 b = self.buffer 65 b.write(b'y\x00\x00\x00\x03') 66 self.assertRaises(ValueError, b.next_message,) 67 68 def testRemainder(self): 69 b = self.buffer 70 b.write(b'r\x00\x00\x00\x05Aremainder') 71 self.assertEqual(b.next_message(), (b'r', b'A')) 72 73 def testLarge(self): 74 b = self.buffer 75 factor = 1024 76 r = 10000 77 b.write(b'X' + packl(factor * r + 4)) 78 segment = b'\x00' * factor 79 for x in range(r-1): 80 b.write(segment) 81 b.write(segment) 82 msg = b.next_message() 83 self.assertTrue(msg is not None) 84 self.assertEqual(msg[0], b'X') 85 86 def test_getvalue(self): 87 # Make sure that getvalue() only applies to messages 88 # that have not been read. 89 b = self.buffer 90 # It should be empty. 91 self.assertEqual(b.getvalue(), b'') 92 d = b'F' + packl(28) 93 b.write(d) 94 self.assertEqual(b.getvalue(), d) 95 d1 = b'01'*12 # 24 96 b.write(d1) 97 self.assertEqual(b.getvalue(), d + d1) 98 out = b.read()[0] 99 self.assertEqual(out, (b'F', d1)) 100 nd = b'N' 101 b.write(nd) 102 self.assertEqual(b.getvalue(), nd) 103 b.write(packl(4)) 104 self.assertEqual(list(b.read()), [(b'N', b'')]) 105 self.assertEqual(b.getvalue(), b'') 106 # partial; read one message to exercise 107 # that the appropriate fragment of the first 108 # chunk in the buffer is picked up. 109 first_body = (b'1234' * 3) 110 first = b'v' + packl(len(first_body) + 4) + first_body 111 second_body = (b'4321' * 5) 112 second = b'z' + packl(len(second_body) + 4) + second_body 113 b.write(first + second) 114 self.assertEqual(b.getvalue(), first + second) 115 self.assertEqual(list(b.read(1)), [(b'v', first_body)]) 116 self.assertEqual(b.getvalue(), second) 117 self.assertEqual(list(b.read(1)), [(b'z', second_body)]) 118 # now, with a third full message in the next chunk 119 third_body = (b'9876' * 10) 120 third = b'3' + packl(len(third_body) + 4) + third_body 121 b.write(first + second) 122 b.write(third) 123 self.assertEqual(b.getvalue(), first + second + third) 124 self.assertEqual(list(b.read(1)), [(b'v', first_body)]) 125 self.assertEqual(b.getvalue(), second + third) 126 self.assertEqual(list(b.read(1)), [(b'z', second_body)]) 127 self.assertEqual(b.getvalue(), third) 128 self.assertEqual(list(b.read(1)), [(b'3', third_body)]) 129 self.assertEqual(b.getvalue(), b'') 130 131## 132# element3 tests 133## 134 135message_samples = [ 136 e3.VoidMessage, 137 e3.Startup([ 138 (b'user', b'jwp'), 139 (b'database', b'template1'), 140 (b'options', b'-f'), 141 ]), 142 e3.Notice(( 143 (b'S', b'FATAL'), 144 (b'M', b'a descriptive message'), 145 (b'C', b'FIVEC'), 146 (b'D', b'bleh'), 147 (b'H', b'dont spit into the fan'), 148 )), 149 e3.Notify(123, b'wood_table'), 150 e3.KillInformation(19320, 589483), 151 e3.ShowOption(b'foo', b'bar'), 152 e3.Authentication(4, b'salt'), 153 e3.Complete(b'SELECT'), 154 e3.Ready(b'I'), 155 e3.CancelRequest(4123, 14252), 156 e3.NegotiateSSL(), 157 e3.Password(b'ckr4t'), 158 e3.AttributeTypes(()), 159 e3.AttributeTypes( 160 (123,) * 1 161 ), 162 e3.AttributeTypes( 163 (123,0) * 1 164 ), 165 e3.AttributeTypes( 166 (123,0) * 2 167 ), 168 e3.AttributeTypes( 169 (123,0) * 4 170 ), 171 e3.TupleDescriptor(()), 172 e3.TupleDescriptor(( 173 (b'name', 123, 1, 1, 0, 0, 1,), 174 )), 175 e3.TupleDescriptor(( 176 (b'name', 123, 1, 2, 0, 0, 1,), 177 ) * 2), 178 e3.TupleDescriptor(( 179 (b'name', 123, 1, 2, 1, 0, 1,), 180 ) * 3), 181 e3.TupleDescriptor(( 182 (b'name', 123, 1, 1, 0, 0, 1,), 183 ) * 1000), 184 e3.Tuple([]), 185 e3.Tuple([b'foo',]), 186 e3.Tuple([None]), 187 e3.Tuple([b'foo',b'bar']), 188 e3.Tuple([None, None]), 189 e3.Tuple([None, b'foo', None]), 190 e3.Tuple([b'bar', None, b'foo', None, b'bleh']), 191 e3.Tuple([b'foo', b'bar'] * 100), 192 e3.Tuple([None] * 100), 193 e3.Query(b'select * from u'), 194 e3.Parse(b'statement_id', b'query', (123, 0)), 195 e3.Parse(b'statement_id', b'query', (123,)), 196 e3.Parse(b'statement_id', b'query', ()), 197 e3.Bind(b'portal_id', b'statement_id', 198 (b'tt',b'\x00\x00'), 199 [b'data',None], (b'ff',b'xx')), 200 e3.Bind(b'portal_id', b'statement_id', (b'tt',), [None], (b'xx',)), 201 e3.Bind(b'portal_id', b'statement_id', (b'ff',), [b'data'], ()), 202 e3.Bind(b'portal_id', b'statement_id', (), [], (b'xx',)), 203 e3.Bind(b'portal_id', b'statement_id', (), [], ()), 204 e3.Execute(b'portal_id', 500), 205 e3.Execute(b'portal_id', 0), 206 e3.DescribeStatement(b'statement_id'), 207 e3.DescribePortal(b'portal_id'), 208 e3.CloseStatement(b'statement_id'), 209 e3.ClosePortal(b'portal_id'), 210 e3.Function(123, (), [], b'xx'), 211 e3.Function(321, (b'tt',), [b'foo'], b'xx'), 212 e3.Function(321, (b'tt',), [None], b'xx'), 213 e3.Function(321, (b'aa', b'aa'), [None,b'a' * 200], b'xx'), 214 e3.FunctionResult(b''), 215 e3.FunctionResult(b'foobar'), 216 e3.FunctionResult(None), 217 e3.CopyToBegin(123, [321,123]), 218 e3.CopyToBegin(0, [10,]), 219 e3.CopyToBegin(123, []), 220 e3.CopyFromBegin(123, [321,123]), 221 e3.CopyFromBegin(0, [10]), 222 e3.CopyFromBegin(123, []), 223 e3.CopyData(b''), 224 e3.CopyData(b'foo'), 225 e3.CopyData(b'a' * 2048), 226 e3.CopyFail(b''), 227 e3.CopyFail(b'iiieeeeee!'), 228] 229 230class test_element3(unittest.TestCase): 231 def test_cat_messages(self): 232 # The optimized implementation will identify adjacent copy data, and 233 # take a more efficient route; so rigorously test the switch between the 234 # two modes. 235 self.assertEqual(e3.cat_messages([]), b'') 236 self.assertEqual(e3.cat_messages([b'foo']), b'd\x00\x00\x00\x07foo') 237 self.assertEqual(e3.cat_messages([b'foo', b'foo']), 2*b'd\x00\x00\x00\x07foo') 238 # copy, other, copy 239 self.assertEqual(e3.cat_messages([b'foo', e3.SynchronizeMessage, b'foo']), 240 b'd\x00\x00\x00\x07foo' + e3.SynchronizeMessage.bytes() + b'd\x00\x00\x00\x07foo') 241 # copy, other, copy*1000 242 self.assertEqual(e3.cat_messages(1000*[b'foo', e3.SynchronizeMessage, b'foo']), 243 1000*(b'd\x00\x00\x00\x07foo' + e3.SynchronizeMessage.bytes() + b'd\x00\x00\x00\x07foo')) 244 # other, copy, copy*1000 245 self.assertEqual(e3.cat_messages(1000*[e3.SynchronizeMessage, b'foo', b'foo']), 246 1000*(e3.SynchronizeMessage.bytes() + 2*b'd\x00\x00\x00\x07foo')) 247 pack_head = struct.Struct("!lH").pack 248 # tuple 249 self.assertEqual(e3.cat_messages([(b'foo',),]), 250 b'D' + pack_head(7 + 4 + 2, 1) + b'\x00\x00\x00\x03foo') 251 # tuple(foo,\N) 252 self.assertEqual(e3.cat_messages([(b'foo',None,),]), 253 b'D' + pack_head(7 + 4 + 4 + 2, 2) + b'\x00\x00\x00\x03foo\xFF\xFF\xFF\xFF') 254 # tuple(foo,\N,bar) 255 self.assertEqual(e3.cat_messages([(b'foo',None,b'bar'),]), 256 b'D' + pack_head(7 + 7 + 4 + 4 + 2, 3) + \ 257 b'\x00\x00\x00\x03foo\xFF\xFF\xFF\xFF\x00\x00\x00\x03bar') 258 # too many attributes 259 self.assertRaises((OverflowError, struct.error), 260 e3.cat_messages, [(None,) * 0x10000]) 261 262 class ThisEx(Exception): 263 pass 264 class ThatEx(Exception): 265 pass 266 class Bad(e3.Message): 267 def serialize(self): 268 raise ThisEx('foo') 269 self.assertRaises(ThisEx, e3.cat_messages, [Bad()]) 270 class NoType(e3.Message): 271 def serialize(self): 272 return b'' 273 self.assertRaises(AttributeError, e3.cat_messages, [NoType()]) 274 class BadType(e3.Message): 275 type = 123 276 def serialize(self): 277 return b'' 278 self.assertRaises((TypeError,struct.error), e3.cat_messages, [BadType()]) 279 280 281 def testSerializeParseConsistency(self): 282 for msg in message_samples: 283 smsg = msg.serialize() 284 self.assertEqual(msg, msg.parse(smsg)) 285 286 def testEmptyMessages(self): 287 for x in e3.__dict__.values(): 288 if isinstance(x, e3.EmptyMessage): 289 xtype = type(x) 290 self.assertTrue(x is xtype()) 291 292 def testUnknownNoticeFields(self): 293 N = e3.Notice.parse(b'\x00\x00Z\x00Xklsvdnvldsvkndvlsn\x00Pfoobar\x00Mmessage\x00') 294 E = e3.Error.parse(b'Z\x00Xklsvdnvldsvkndvlsn\x00Pfoobar\x00Mmessage\x00\x00') 295 self.assertEqual(N[b'M'], b'message') 296 self.assertEqual(E[b'M'], b'message') 297 self.assertEqual(N[b'P'], b'foobar') 298 self.assertEqual(E[b'P'], b'foobar') 299 self.assertEqual(len(N), 4) 300 self.assertEqual(len(E), 4) 301 302 def testCompleteExtracts(self): 303 x = e3.Complete(b'FOO BAR 1321') 304 self.assertEqual(x.extract_command(), b'FOO BAR') 305 self.assertEqual(x.extract_count(), 1321) 306 x = e3.Complete(b' CREATE TABLE 13210 ') 307 self.assertEqual(x.extract_command(), b'CREATE TABLE') 308 self.assertEqual(x.extract_count(), 13210) 309 x = e3.Complete(b' CREATE TABLE \t713210 ') 310 self.assertEqual(x.extract_command(), b'CREATE TABLE') 311 self.assertEqual(x.extract_count(), 713210) 312 x = e3.Complete(b' CREATE TABLE 0 \t13210 ') 313 self.assertEqual(x.extract_command(), b'CREATE TABLE') 314 self.assertEqual(x.extract_count(), 13210) 315 x = e3.Complete(b' 0 \t13210 ') 316 self.assertEqual(x.extract_command(), None) 317 self.assertEqual(x.extract_count(), 13210) 318 319## 320# .protocol.xact3 tests 321## 322 323xact_samples = [ 324 # Simple contrived exchange. 325 ( 326 ( 327 e3.Query(b"COMPLETE"), 328 ), ( 329 e3.Complete(b'COMPLETE'), 330 e3.Ready(b'I'), 331 ) 332 ), 333 ( 334 ( 335 e3.Query(b"ROW DATA"), 336 ), ( 337 e3.TupleDescriptor(( 338 (b'foo', 1, 1, 1, 1, 1, 1), 339 (b'bar', 1, 2, 1, 1, 1, 1), 340 )), 341 e3.Tuple((b'lame', b'lame')), 342 e3.Complete(b'COMPLETE'), 343 e3.Ready(b'I'), 344 ) 345 ), 346 ( 347 ( 348 e3.Query(b"ROW DATA"), 349 ), ( 350 e3.TupleDescriptor(( 351 (b'foo', 1, 1, 1, 1, 1, 1), 352 (b'bar', 1, 2, 1, 1, 1, 1), 353 )), 354 e3.Tuple((b'lame', b'lame')), 355 e3.Tuple((b'lame', b'lame')), 356 e3.Tuple((b'lame', b'lame')), 357 e3.Tuple((b'lame', b'lame')), 358 e3.Ready(b'I'), 359 ) 360 ), 361 ( 362 ( 363 e3.Query(b"NULL"), 364 ), ( 365 e3.Null(), 366 e3.Ready(b'I'), 367 ) 368 ), 369 ( 370 ( 371 e3.Query(b"COPY TO"), 372 ), ( 373 e3.CopyToBegin(1, [1,2]), 374 e3.CopyData(b'row1'), 375 e3.CopyData(b'row2'), 376 e3.CopyDone(), 377 e3.Complete(b'COPY TO'), 378 e3.Ready(b'I'), 379 ) 380 ), 381 ( 382 ( 383 e3.Function(1, [b''], [b''], 1), 384 ), ( 385 e3.FunctionResult(b'foo'), 386 e3.Ready(b'I'), 387 ) 388 ), 389 ( 390 ( 391 e3.Parse(b"NAME", b"SQL", ()), 392 ), ( 393 e3.ParseComplete(), 394 ) 395 ), 396 ( 397 ( 398 e3.Bind(b"NAME", b"STATEMENT_ID", (), (), ()), 399 ), ( 400 e3.BindComplete(), 401 ) 402 ), 403 ( 404 ( 405 e3.Parse(b"NAME", b"SQL", ()), 406 e3.Bind(b"NAME", b"STATEMENT_ID", (), (), ()), 407 ), ( 408 e3.ParseComplete(), 409 e3.BindComplete(), 410 ) 411 ), 412 ( 413 ( 414 e3.Describe(b"STATEMENT_ID"), 415 ), ( 416 e3.AttributeTypes(()), 417 e3.NoData(), 418 ) 419 ), 420 ( 421 ( 422 e3.Describe(b"STATEMENT_ID"), 423 ), ( 424 e3.AttributeTypes(()), 425 e3.TupleDescriptor(()), 426 ) 427 ), 428 ( 429 ( 430 e3.CloseStatement(b"foo"), 431 ), ( 432 e3.CloseComplete(), 433 ), 434 ), 435 ( 436 ( 437 e3.ClosePortal(b"foo"), 438 ), ( 439 e3.CloseComplete(), 440 ), 441 ), 442 ( 443 ( 444 e3.Synchronize(), 445 ), ( 446 e3.Ready(b'I'), 447 ), 448 ), 449] 450 451class test_xact3(unittest.TestCase): 452 def testTransactionSamplesAll(self): 453 for xcmd, xres in xact_samples: 454 x = x3.Instruction(xcmd) 455 r = tuple([(y.type, y.serialize()) for y in xres]) 456 x.state[1]() 457 self.assertEqual(x.messages, ()) 458 x.state[1](r) 459 self.assertEqual(x.state, x3.Complete) 460 rec = [] 461 for y in x.completed: 462 for z in y[1]: 463 if type(z) is type(b''): 464 z = e3.CopyData(z) 465 rec.append(z) 466 self.assertEqual(xres, tuple(rec)) 467 468 def testClosing(self): 469 c = x3.Closing() 470 self.assertEqual(c.messages, (e3.DisconnectMessage,)) 471 c.state[1]() 472 self.assertEqual(c.fatal, True) 473 self.assertEqual(c.error_message.__class__, e3.ClientError) 474 self.assertEqual(c.error_message[b'C'], '08003') 475 476 def testNegotiation(self): 477 # simple successful run 478 n = x3.Negotiation({}, b'') 479 n.state[1]() 480 n.state[1]( 481 pairs( 482 e3.Notice(((b'M', b"foobar"),)), 483 e3.Authentication(e3.AuthRequest_OK, b''), 484 e3.KillInformation(0,0), 485 e3.ShowOption(b'name', b'val'), 486 e3.Ready(b'I'), 487 ) 488 ) 489 self.assertEqual(n.state, x3.Complete) 490 self.assertEqual(n.last_ready.xact_state, b'I') 491 # no killinfo.. should cause protocol error... 492 n = x3.Negotiation({}, b'') 493 n.state[1]() 494 n.state[1]( 495 pairs( 496 e3.Notice(((b'M', b"foobar"),)), 497 e3.Authentication(e3.AuthRequest_OK, b''), 498 e3.ShowOption(b'name', b'val'), 499 e3.Ready(b'I'), 500 ) 501 ) 502 self.assertEqual(n.state, x3.Complete) 503 self.assertEqual(n.last_ready, None) 504 self.assertEqual(n.error_message[b'C'], '08P01') 505 # killinfo twice.. must cause protocol error... 506 n = x3.Negotiation({}, b'') 507 n.state[1]() 508 n.state[1]( 509 pairs( 510 e3.Notice(((b'M', b"foobar"),)), 511 e3.Authentication(e3.AuthRequest_OK, b''), 512 e3.ShowOption(b'name', b'val'), 513 e3.KillInformation(0,0), 514 e3.KillInformation(0,0), 515 e3.Ready(b'I'), 516 ) 517 ) 518 self.assertEqual(n.state, x3.Complete) 519 self.assertEqual(n.last_ready, None) 520 self.assertEqual(n.error_message[b'C'], '08P01') 521 # start with ready message.. 522 n = x3.Negotiation({}, b'') 523 n.state[1]() 524 n.state[1]( 525 pairs( 526 e3.Notice(((b'M', b"foobar"),)), 527 e3.Ready(b'I'), 528 e3.Authentication(e3.AuthRequest_OK, b''), 529 e3.ShowOption(b'name', b'val'), 530 ) 531 ) 532 self.assertEqual(n.state, x3.Complete) 533 self.assertEqual(n.last_ready, None) 534 self.assertEqual(n.error_message[b'C'], '08P01') 535 # unsupported authreq 536 n = x3.Negotiation({}, b'') 537 n.state[1]() 538 n.state[1]( 539 pairs( 540 e3.Authentication(255, b''), 541 ) 542 ) 543 self.assertEqual(n.state, x3.Complete) 544 self.assertEqual(n.last_ready, None) 545 self.assertEqual(n.error_message[b'C'], '--AUT') 546 547 def testInstructionAsynchook(self): 548 l = [] 549 def hook(data): 550 l.append(data) 551 x = x3.Instruction([ 552 e3.Query(b"NOTHING") 553 ], asynchook = hook) 554 a1 = e3.Notice(((b'M', b"m1"),)) 555 a2 = e3.Notify(0, b'relation', b'parameter') 556 a3 = e3.ShowOption(b'optname', b'optval') 557 # "send" the query message 558 x.state[1]() 559 # "receive" the tuple 560 x.state[1]([(a1.type, a1.serialize()),]) 561 a2l = [(a2.type, a2.serialize()),] 562 x.state[1](a2l) 563 # validate that the hook is not fed twice because 564 # it's the exact same message set. (later assertion will validate) 565 x.state[1](a2l) 566 x.state[1]([(a3.type, a3.serialize()),]) 567 # we only care about validating that l got everything. 568 self.assertEqual([a1,a2,a3], l) 569 self.assertEqual(x.state[0], x3.Receiving) 570 # validate that the asynchook exception is trapped. 571 class Nee(Exception): 572 pass 573 def ehook(msg): 574 raise Nee("this should **not** be part of the summary") 575 x = x3.Instruction([ 576 e3.Query(b"NOTHING") 577 ], asynchook = ehook) 578 a1 = e3.Notice(((b'M', b"m1"),)) 579 x.state[1]() 580 import sys 581 v = None 582 def exchook(typ, val, tb): 583 nonlocal v 584 v = val 585 seh = sys.excepthook 586 sys.excepthook = exchook 587 # we only care about validating that the exchook got called. 588 x.state[1]([(a1.type, a1.serialize())]) 589 sys.excepthook = seh 590 self.assertTrue(isinstance(v, Nee)) 591 592class test_client3(unittest.TestCase): 593 def test_timeout(self): 594 portnum = find_available_port() 595 servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 596 with servsock: 597 servsock.bind(('localhost', portnum)) 598 pc = c3.Connection( 599 SocketFactory( 600 (socket.AF_INET, socket.SOCK_STREAM), 601 ('localhost', portnum) 602 ), 603 {} 604 ) 605 pc.connect(timeout = 1) 606 try: 607 self.assertEqual(pc.xact.fatal, True) 608 self.assertEqual(pc.xact.__class__, x3.Negotiation) 609 finally: 610 if pc.socket is not None: 611 pc.socket.close() 612 613 def test_SSL_failure(self): 614 portnum = find_available_port() 615 servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 616 with servsock: 617 servsock.bind(('localhost', portnum)) 618 pc = c3.Connection( 619 SocketFactory( 620 (socket.AF_INET, socket.SOCK_STREAM), 621 ('localhost', portnum) 622 ), 623 {} 624 ) 625 exc = None 626 servsock.listen(1) 627 def client_thread(): 628 pc.connect(ssl = True) 629 client = Thread(target = client_thread) 630 try: 631 client.start() 632 c, addr = servsock.accept() 633 with c: 634 c.send(b'S') 635 c.sendall(b'0000000000000000000000') 636 c.recv(1024) 637 c.close() 638 client.join() 639 finally: 640 if pc.socket is not None: 641 pc.socket.close() 642 643 self.assertEqual(pc.xact.fatal, True) 644 self.assertEqual(pc.xact.__class__, x3.Negotiation) 645 self.assertEqual(pc.xact.error_message.__class__, e3.ClientError) 646 self.assertTrue(hasattr(pc.xact, 'exception')) 647 648 def test_bad_negotiation(self): 649 portnum = find_available_port() 650 servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 651 servsock.bind(('localhost', portnum)) 652 pc = c3.Connection( 653 SocketFactory( 654 (socket.AF_INET, socket.SOCK_STREAM), 655 ('localhost', portnum) 656 ), 657 {} 658 ) 659 exc = None 660 servsock.listen(1) 661 def client_thread(): 662 pc.connect() 663 client = Thread(target = client_thread) 664 try: 665 client.start() 666 c, addr = servsock.accept() 667 try: 668 c.recv(1024) 669 finally: 670 c.close() 671 time.sleep(0.25) 672 client.join() 673 servsock.close() 674 self.assertEqual(pc.xact.fatal, True) 675 self.assertEqual(pc.xact.__class__, x3.Negotiation) 676 self.assertEqual(pc.xact.error_message.__class__, e3.ClientError) 677 self.assertEqual(pc.xact.error_message[b'C'], '08006') 678 finally: 679 servsock.close() 680 if pc.socket is not None: 681 pc.socket.close() 682 683if __name__ == '__main__': 684 from types import ModuleType 685 this = ModuleType("this") 686 this.__dict__.update(globals()) 687 try: 688 unittest.main(this) 689 finally: 690 import gc 691 gc.collect() 692