1##
2# .test.test_protocol
3##
4import sys
5import unittest
6import struct
7import decimal
8import socket
9import time
10from threading import Thread
11
12from ..protocol import element3 as e3
13from ..protocol import xact3 as x3
14from ..protocol import client3 as c3
15from ..protocol import buffer as pq_buf
16from ..python.socket import find_available_port, SocketFactory
17
def pair(msg):
	"""
	Return the ``(type, serialized bytes)`` tuple for a single message.
	"""
	msg_type = msg.type
	payload = msg.serialize()
	return (msg_type, payload)
def pairs(*msgseq):
	"""
	Return a list holding the `pair` of every message given, in order.
	"""
	return [pair(m) for m in msgseq]
22
# Network byte order, unsigned 32-bit integer; the tests use it to build
# the length word of hand-crafted messages.
long = struct.Struct("!L")
packl, unpackl = long.pack, long.unpack
26
class test_buffer(unittest.TestCase):
	"""
	Tests for pq_buf.pq_message_stream: data is written in arbitrary
	fragments and a message must only be produced once it is complete.
	"""

	def setUp(self):
		# every test starts from a fresh, empty stream
		self.buffer = pq_buf.pq_message_stream()

	def testMultiByteMessage(self):
		stream = self.buffer
		# type byte, then the length word in two halves; nothing is
		# complete until the body has been written as well.
		for fragment in (b's', b'\x00\x00', b'\x00\x10'):
			stream.write(fragment)
			self.assertIsNone(stream.next_message())
		body = b'twelve_chars'
		stream.write(body)
		self.assertEqual(stream.next_message(), (b's', body))

	def testSingleByteMessage(self):
		stream = self.buffer
		# the length word is split unevenly this time (1 + 3 bytes)
		for fragment in (b's', b'\x00', b'\x00\x00\x05'):
			stream.write(fragment)
			self.assertIsNone(stream.next_message())
		stream.write(b'b')
		self.assertEqual(stream.next_message(), (b's', b'b'))

	def testEmptyMessage(self):
		stream = self.buffer
		for fragment in (b'x', b'\x00\x00\x00'):
			stream.write(fragment)
			self.assertIsNone(stream.next_message())
		# a length of four leaves no room for a body
		stream.write(b'\x04')
		self.assertEqual(stream.next_message(), (b'x', b''))

	def testInvalidLength(self):
		stream = self.buffer
		# a length of three is smaller than the length word itself
		stream.write(b'y\x00\x00\x00\x03')
		with self.assertRaises(ValueError):
			stream.next_message()

	def testRemainder(self):
		stream = self.buffer
		# bytes following the first message must not leak into it
		stream.write(b'r\x00\x00\x00\x05Aremainder')
		self.assertEqual(stream.next_message(), (b'r', b'A'))

	def testLarge(self):
		stream = self.buffer
		chunk_size = 1024
		chunk_count = 10000
		# announce the whole body up front, then deliver it chunk by chunk
		stream.write(b'X' + packl(chunk_size * chunk_count + 4))
		chunk = b'\x00' * chunk_size
		for _ in range(chunk_count):
			stream.write(chunk)
		message = stream.next_message()
		self.assertIsNotNone(message)
		self.assertEqual(message[0], b'X')

	def test_getvalue(self):
		# getvalue() must only report the data of messages
		# that have not been read yet.
		stream = self.buffer

		def frame(type_byte, body):
			# type byte + length word (counting itself) + body
			return type_byte + packl(len(body) + 4) + body

		# nothing has been written so far
		self.assertEqual(stream.getvalue(), b'')
		header = b'F' + packl(28)
		stream.write(header)
		self.assertEqual(stream.getvalue(), header)
		payload = b'01' * 12 # 24 bytes
		stream.write(payload)
		self.assertEqual(stream.getvalue(), header + payload)
		first_read = stream.read()[0]
		self.assertEqual(first_read, (b'F', payload))
		type_only = b'N'
		stream.write(type_only)
		self.assertEqual(stream.getvalue(), type_only)
		stream.write(packl(4))
		self.assertEqual(list(stream.read()), [(b'N', b'')])
		self.assertEqual(stream.getvalue(), b'')
		# two messages in one chunk; reading a single message must
		# leave exactly the rest of that chunk behind.
		first_body = b'1234' * 3
		first = frame(b'v', first_body)
		second_body = b'4321' * 5
		second = frame(b'z', second_body)
		stream.write(first + second)
		self.assertEqual(stream.getvalue(), first + second)
		self.assertEqual(list(stream.read(1)), [(b'v', first_body)])
		self.assertEqual(stream.getvalue(), second)
		self.assertEqual(list(stream.read(1)), [(b'z', second_body)])
		# again, followed by a third complete message in its own chunk
		third_body = b'9876' * 10
		third = frame(b'3', third_body)
		stream.write(first + second)
		stream.write(third)
		expected_left = [
			(b'v', first_body, second + third),
			(b'z', second_body, third),
			(b'3', third_body, b''),
		]
		self.assertEqual(stream.getvalue(), first + second + third)
		for type_byte, body, remaining in expected_left:
			self.assertEqual(list(stream.read(1)), [(type_byte, body)])
			self.assertEqual(stream.getvalue(), remaining)
130
131##
132# element3 tests
133##
134
# Sample element3 messages. test_element3.testSerializeParseConsistency
# serializes each entry and expects parse() of the result to compare equal
# to the original message.
message_samples = [
	e3.VoidMessage,
	e3.Startup([
		(b'user', b'jwp'),
		(b'database', b'template1'),
		(b'options', b'-f'),
	]),
	e3.Notice((
		(b'S', b'FATAL'),
		(b'M', b'a descriptive message'),
		(b'C', b'FIVEC'),
		(b'D', b'bleh'),
		(b'H', b'dont spit into the fan'),
	)),
	e3.Notify(123, b'wood_table'),
	e3.KillInformation(19320, 589483),
	e3.ShowOption(b'foo', b'bar'),
	e3.Authentication(4, b'salt'),
	e3.Complete(b'SELECT'),
	e3.Ready(b'I'),
	e3.CancelRequest(4123, 14252),
	e3.NegotiateSSL(),
	e3.Password(b'ckr4t'),
	# attribute type lists: empty, then 1, 2, 4 and 8 entries
	e3.AttributeTypes(()),
	e3.AttributeTypes(
		(123,) * 1
	),
	e3.AttributeTypes(
		(123,0) * 1
	),
	e3.AttributeTypes(
		(123,0) * 2
	),
	e3.AttributeTypes(
		(123,0) * 4
	),
	# tuple descriptors: empty, then 1, 2, 3 and 1000 attributes
	e3.TupleDescriptor(()),
	e3.TupleDescriptor((
		(b'name', 123, 1, 1, 0, 0, 1,),
	)),
	e3.TupleDescriptor((
		(b'name', 123, 1, 2, 0, 0, 1,),
	) * 2),
	e3.TupleDescriptor((
		(b'name', 123, 1, 2, 1, 0, 1,),
	) * 3),
	e3.TupleDescriptor((
		(b'name', 123, 1, 1, 0, 0, 1,),
	) * 1000),
	# tuples: empty, data only, None only, and mixtures of both
	e3.Tuple([]),
	e3.Tuple([b'foo',]),
	e3.Tuple([None]),
	e3.Tuple([b'foo',b'bar']),
	e3.Tuple([None, None]),
	e3.Tuple([None, b'foo', None]),
	e3.Tuple([b'bar', None, b'foo', None, b'bleh']),
	e3.Tuple([b'foo', b'bar'] * 100),
	e3.Tuple([None] * 100),
	e3.Query(b'select * from u'),
	# Parse with two, one and no entries in its last argument
	e3.Parse(b'statement_id', b'query', (123, 0)),
	e3.Parse(b'statement_id', b'query', (123,)),
	e3.Parse(b'statement_id', b'query', ()),
	# Bind with every combination of empty and non-empty trailing arguments
	e3.Bind(b'portal_id', b'statement_id',
		(b'tt',b'\x00\x00'),
		[b'data',None], (b'ff',b'xx')),
	e3.Bind(b'portal_id', b'statement_id', (b'tt',), [None], (b'xx',)),
	e3.Bind(b'portal_id', b'statement_id', (b'ff',), [b'data'], ()),
	e3.Bind(b'portal_id', b'statement_id', (), [], (b'xx',)),
	e3.Bind(b'portal_id', b'statement_id', (), [], ()),
	e3.Execute(b'portal_id', 500),
	e3.Execute(b'portal_id', 0),
	e3.DescribeStatement(b'statement_id'),
	e3.DescribePortal(b'portal_id'),
	e3.CloseStatement(b'statement_id'),
	e3.ClosePortal(b'portal_id'),
	# function calls: no arguments, one argument, a None argument, two arguments
	e3.Function(123, (), [], b'xx'),
	e3.Function(321, (b'tt',), [b'foo'], b'xx'),
	e3.Function(321, (b'tt',), [None], b'xx'),
	e3.Function(321, (b'aa', b'aa'), [None,b'a' * 200], b'xx'),
	# function results: empty bytes, data, and None
	e3.FunctionResult(b''),
	e3.FunctionResult(b'foobar'),
	e3.FunctionResult(None),
	# copy begin messages with two, one and no entries in the list argument
	e3.CopyToBegin(123, [321,123]),
	e3.CopyToBegin(0, [10,]),
	e3.CopyToBegin(123, []),
	e3.CopyFromBegin(123, [321,123]),
	e3.CopyFromBegin(0, [10]),
	e3.CopyFromBegin(123, []),
	e3.CopyData(b''),
	e3.CopyData(b'foo'),
	e3.CopyData(b'a' * 2048),
	e3.CopyFail(b''),
	e3.CopyFail(b'iiieeeeee!'),
]
229
class test_element3(unittest.TestCase):
	"""
	Tests for the element3 message classes: concatenation of messages with
	cat_messages(), serialize()/parse() round trips, empty message
	singletons, notice field parsing and Complete tag extraction.
	"""

	def test_cat_messages(self):
		# The optimized implementation will identify adjacent copy data, and
		# take a more efficient route; so rigorously test the switch between the
		# two modes.
		copy_foo = b'd\x00\x00\x00\x07foo'
		sync = e3.SynchronizeMessage.bytes()
		self.assertEqual(e3.cat_messages([]), b'')
		self.assertEqual(e3.cat_messages([b'foo']), copy_foo)
		self.assertEqual(e3.cat_messages([b'foo', b'foo']), 2*copy_foo)
		# copy, other, copy
		self.assertEqual(e3.cat_messages([b'foo', e3.SynchronizeMessage, b'foo']),
			copy_foo + sync + copy_foo)
		# copy, other, copy*1000
		self.assertEqual(e3.cat_messages(1000*[b'foo', e3.SynchronizeMessage, b'foo']),
			1000*(copy_foo + sync + copy_foo))
		# other, copy, copy*1000
		self.assertEqual(e3.cat_messages(1000*[e3.SynchronizeMessage, b'foo', b'foo']),
			1000*(sync + 2*copy_foo))
		# header of the expected tuple message: length word and attribute count
		pack_head = struct.Struct("!lH").pack
		# tuple
		self.assertEqual(e3.cat_messages([(b'foo',),]),
			b'D' + pack_head(7 + 4 + 2, 1) + b'\x00\x00\x00\x03foo')
		# tuple(foo,\N)
		self.assertEqual(e3.cat_messages([(b'foo',None,),]),
			b'D' + pack_head(7 + 4 + 4 + 2, 2) + b'\x00\x00\x00\x03foo\xFF\xFF\xFF\xFF')
		# tuple(foo,\N,bar)
		self.assertEqual(e3.cat_messages([(b'foo',None,b'bar'),]),
			b'D' + pack_head(7 + 7 + 4 + 4 + 2, 3) + \
			b'\x00\x00\x00\x03foo\xFF\xFF\xFF\xFF\x00\x00\x00\x03bar')
		# too many attributes
		self.assertRaises((OverflowError, struct.error),
			e3.cat_messages, [(None,) * 0x10000])

		# an exception raised by serialize() must reach the caller
		class ThisEx(Exception):
			pass
		class Bad(e3.Message):
			def serialize(self):
				raise ThisEx('foo')
		self.assertRaises(ThisEx, e3.cat_messages, [Bad()])
		# a message class that never defines `type`
		class NoType(e3.Message):
			def serialize(self):
				return b''
		self.assertRaises(AttributeError, e3.cat_messages, [NoType()])
		# a message class whose `type` is an int instead of bytes
		class BadType(e3.Message):
			type = 123
			def serialize(self):
				return b''
		self.assertRaises((TypeError,struct.error), e3.cat_messages, [BadType()])


	def testSerializeParseConsistency(self):
		# parse() of a message's own serialization must give back an equal message
		for sample in message_samples:
			data = sample.serialize()
			self.assertEqual(sample, sample.parse(data),
				'round trip failed for a %s sample' % (type(sample).__name__,))

	def testEmptyMessages(self):
		# instantiating the class of an EmptyMessage must give back
		# the very same object
		for x in vars(e3).values():
			if isinstance(x, e3.EmptyMessage):
				xtype = type(x)
				self.assertIs(x, xtype())

	def testUnknownNoticeFields(self):
		# fields with unrecognized codes (Z, X) must not disturb the known ones
		N = e3.Notice.parse(b'\x00\x00Z\x00Xklsvdnvldsvkndvlsn\x00Pfoobar\x00Mmessage\x00')
		E = e3.Error.parse(b'Z\x00Xklsvdnvldsvkndvlsn\x00Pfoobar\x00Mmessage\x00\x00')
		for parsed in (N, E):
			self.assertEqual(parsed[b'M'], b'message')
			self.assertEqual(parsed[b'P'], b'foobar')
			self.assertEqual(len(parsed), 4)

	def testCompleteExtracts(self):
		# (command tag, expected extract_command(), expected extract_count())
		cases = [
			(b'FOO BAR 1321', b'FOO BAR', 1321),
			(b' CREATE  	TABLE 13210  ', b'CREATE  	TABLE', 13210),
			(b'  CREATE  	TABLE  \t713210  ', b'CREATE  	TABLE', 713210),
			(b'  CREATE  	TABLE  0 \t13210  ', b'CREATE  	TABLE', 13210),
			(b' 0 \t13210 ', None, 13210),
		]
		for tag, command, count in cases:
			x = e3.Complete(tag)
			self.assertEqual(x.extract_command(), command, repr(tag))
			self.assertEqual(x.extract_count(), count, repr(tag))
318
319##
320# .protocol.xact3 tests
321##
322
# Sample exchanges for test_xact3.testTransactionSamplesAll. Each entry is a
# (commands, replies) pair: `commands` is handed to x3.Instruction and
# `replies` is fed to it as received messages, after which the instruction is
# expected to be complete and to report exactly those replies.
xact_samples = [
	# Simple contrived exchange.
	(
		(
			e3.Query(b"COMPLETE"),
		), (
			e3.Complete(b'COMPLETE'),
			e3.Ready(b'I'),
		)
	),
	# query answered with a descriptor, one tuple and a completion
	(
		(
			e3.Query(b"ROW DATA"),
		), (
			e3.TupleDescriptor((
				(b'foo', 1, 1, 1, 1, 1, 1),
				(b'bar', 1, 2, 1, 1, 1, 1),
			)),
			e3.Tuple((b'lame', b'lame')),
			e3.Complete(b'COMPLETE'),
			e3.Ready(b'I'),
		)
	),
	# several tuples, and no Complete before the Ready
	(
		(
			e3.Query(b"ROW DATA"),
		), (
			e3.TupleDescriptor((
				(b'foo', 1, 1, 1, 1, 1, 1),
				(b'bar', 1, 2, 1, 1, 1, 1),
			)),
			e3.Tuple((b'lame', b'lame')),
			e3.Tuple((b'lame', b'lame')),
			e3.Tuple((b'lame', b'lame')),
			e3.Tuple((b'lame', b'lame')),
			e3.Ready(b'I'),
		)
	),
	(
		(
			e3.Query(b"NULL"),
		), (
			e3.Null(),
			e3.Ready(b'I'),
		)
	),
	# copy out: begin, two data messages, done
	(
		(
			e3.Query(b"COPY TO"),
		), (
			e3.CopyToBegin(1, [1,2]),
			e3.CopyData(b'row1'),
			e3.CopyData(b'row2'),
			e3.CopyDone(),
			e3.Complete(b'COPY TO'),
			e3.Ready(b'I'),
		)
	),
	(
		(
			e3.Function(1, [b''], [b''], 1),
		), (
			e3.FunctionResult(b'foo'),
			e3.Ready(b'I'),
		)
	),
	# Parse, Bind, and both of them in a single instruction
	(
		(
			e3.Parse(b"NAME", b"SQL", ()),
		), (
			e3.ParseComplete(),
		)
	),
	(
		(
			e3.Bind(b"NAME", b"STATEMENT_ID", (), (), ()),
		), (
			e3.BindComplete(),
		)
	),
	(
		(
			e3.Parse(b"NAME", b"SQL", ()),
			e3.Bind(b"NAME", b"STATEMENT_ID", (), (), ()),
		), (
			e3.ParseComplete(),
			e3.BindComplete(),
		)
	),
	# Describe answered with NoData, then with an empty TupleDescriptor
	(
		(
			e3.Describe(b"STATEMENT_ID"),
		), (
			e3.AttributeTypes(()),
			e3.NoData(),
		)
	),
	(
		(
			e3.Describe(b"STATEMENT_ID"),
		), (
			e3.AttributeTypes(()),
			e3.TupleDescriptor(()),
		)
	),
	(
		(
			e3.CloseStatement(b"foo"),
		), (
			e3.CloseComplete(),
		),
	),
	(
		(
			e3.ClosePortal(b"foo"),
		), (
			e3.CloseComplete(),
		),
	),
	(
		(
			e3.Synchronize(),
		), (
			e3.Ready(b'I'),
		),
	),
]
450
class test_xact3(unittest.TestCase):
	"""
	Tests for the xact3 transaction objects. No connection is involved:
	each transaction is driven by hand by calling ``state[1]`` without
	arguments (standing in for a send) or with a sequence of
	``(type, data)`` pairs (standing in for received messages).
	"""

	def _negotiate(self, *messages):
		# Run a fresh Negotiation: one call without arguments, then one call
		# that feeds it `messages` as the received (type, data) pairs.
		n = x3.Negotiation({}, b'')
		n.state[1]()
		n.state[1](pairs(*messages))
		return n

	def testTransactionSamplesAll(self):
		for xcmd, xres in xact_samples:
			x = x3.Instruction(xcmd)
			r = tuple(pairs(*xres))
			# "send" the commands; nothing may be left to send afterwards
			x.state[1]()
			self.assertEqual(x.messages, ())
			# "receive" every reply in one go
			x.state[1](r)
			self.assertEqual(x.state, x3.Complete)
			rec = []
			for y in x.completed:
				for z in y[1]:
					# plain bytes objects are wrapped in CopyData so that
					# they compare equal to the sample messages
					if type(z) is bytes:
						z = e3.CopyData(z)
					rec.append(z)
			self.assertEqual(xres, tuple(rec))

	def testClosing(self):
		c = x3.Closing()
		self.assertEqual(c.messages, (e3.DisconnectMessage,))
		c.state[1]()
		self.assertEqual(c.fatal, True)
		self.assertEqual(c.error_message.__class__, e3.ClientError)
		self.assertEqual(c.error_message[b'C'], '08003')

	def testNegotiation(self):
		# simple successful run
		n = self._negotiate(
			e3.Notice(((b'M', b"foobar"),)),
			e3.Authentication(e3.AuthRequest_OK, b''),
			e3.KillInformation(0,0),
			e3.ShowOption(b'name', b'val'),
			e3.Ready(b'I'),
		)
		self.assertEqual(n.state, x3.Complete)
		self.assertEqual(n.last_ready.xact_state, b'I')
		# no killinfo.. should cause protocol error...
		n = self._negotiate(
			e3.Notice(((b'M', b"foobar"),)),
			e3.Authentication(e3.AuthRequest_OK, b''),
			e3.ShowOption(b'name', b'val'),
			e3.Ready(b'I'),
		)
		self.assertEqual(n.state, x3.Complete)
		self.assertEqual(n.last_ready, None)
		self.assertEqual(n.error_message[b'C'], '08P01')
		# killinfo twice.. must cause protocol error...
		n = self._negotiate(
			e3.Notice(((b'M', b"foobar"),)),
			e3.Authentication(e3.AuthRequest_OK, b''),
			e3.ShowOption(b'name', b'val'),
			e3.KillInformation(0,0),
			e3.KillInformation(0,0),
			e3.Ready(b'I'),
		)
		self.assertEqual(n.state, x3.Complete)
		self.assertEqual(n.last_ready, None)
		self.assertEqual(n.error_message[b'C'], '08P01')
		# start with ready message..
		n = self._negotiate(
			e3.Notice(((b'M', b"foobar"),)),
			e3.Ready(b'I'),
			e3.Authentication(e3.AuthRequest_OK, b''),
			e3.ShowOption(b'name', b'val'),
		)
		self.assertEqual(n.state, x3.Complete)
		self.assertEqual(n.last_ready, None)
		self.assertEqual(n.error_message[b'C'], '08P01')
		# unsupported authreq
		n = self._negotiate(
			e3.Authentication(255, b''),
		)
		self.assertEqual(n.state, x3.Complete)
		self.assertEqual(n.last_ready, None)
		self.assertEqual(n.error_message[b'C'], '--AUT')

	def testInstructionAsynchook(self):
		l = []
		def hook(data):
			l.append(data)
		x = x3.Instruction([
			e3.Query(b"NOTHING")
		], asynchook = hook)
		a1 = e3.Notice(((b'M', b"m1"),))
		a2 = e3.Notify(0, b'relation', b'parameter')
		a3 = e3.ShowOption(b'optname', b'optval')
		# "send" the query message
		x.state[1]()
		# "receive" the notice
		x.state[1]([(a1.type, a1.serialize()),])
		a2l = [(a2.type, a2.serialize()),]
		x.state[1](a2l)
		# validate that the hook is not fed twice because
		# it's the exact same message set. (later assertion will validate)
		x.state[1](a2l)
		x.state[1]([(a3.type, a3.serialize()),])
		# we only care about validating that l got everything.
		self.assertEqual([a1,a2,a3], l)
		self.assertEqual(x.state[0], x3.Receiving)
		# validate that the asynchook exception is trapped.
		class Nee(Exception):
			pass
		def ehook(msg):
			raise Nee("this should **not** be part of the summary")
		x = x3.Instruction([
			e3.Query(b"NOTHING")
		], asynchook = ehook)
		a1 = e3.Notice(((b'M', b"m1"),))
		x.state[1]()
		v = None
		def exchook(typ, val, tb):
			nonlocal v
			v = val
		seh = sys.excepthook
		sys.excepthook = exchook
		try:
			# we only care about validating that the exchook got called.
			x.state[1]([(a1.type, a1.serialize())])
		finally:
			# always put the real hook back, even when the call above
			# raises, so a failure here cannot affect later tests.
			sys.excepthook = seh
		self.assertIsInstance(v, Nee)
591
class test_client3(unittest.TestCase):
	"""
	Tests for c3.Connection against a local server socket that misbehaves
	in various ways. Each test expects the connection attempt to end with a
	fatal Negotiation transaction.
	"""

	def _connection(self, portnum):
		# Build the c3.Connection used by every test: a TCP socket factory
		# aimed at localhost:portnum, and an empty dict.
		return c3.Connection(
			SocketFactory(
				(socket.AF_INET, socket.SOCK_STREAM),
				('localhost', portnum)
			),
			{}
		)

	def test_timeout(self):
		# The server socket is bound but never listens or accepts.
		portnum = find_available_port()
		servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		with servsock:
			servsock.bind(('localhost', portnum))
			pc = self._connection(portnum)
			pc.connect(timeout = 1)
			try:
				self.assertEqual(pc.xact.fatal, True)
				self.assertEqual(pc.xact.__class__, x3.Negotiation)
			finally:
				if pc.socket is not None:
					pc.socket.close()

	def test_SSL_failure(self):
		# The server answers the client's SSL connect with b'S' followed by
		# bytes that are not a valid continuation, then reads and hangs up.
		portnum = find_available_port()
		servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		with servsock:
			servsock.bind(('localhost', portnum))
			pc = self._connection(portnum)
			servsock.listen(1)
			def client_thread():
				pc.connect(ssl = True)
			client = Thread(target = client_thread)
			try:
				client.start()
				c, addr = servsock.accept()
				# leaving the with block closes the accepted socket
				with c:
					c.send(b'S')
					c.sendall(b'0000000000000000000000')
					c.recv(1024)
				client.join()
			finally:
				if pc.socket is not None:
					pc.socket.close()

		self.assertEqual(pc.xact.fatal, True)
		self.assertEqual(pc.xact.__class__, x3.Negotiation)
		self.assertEqual(pc.xact.error_message.__class__, e3.ClientError)
		self.assertTrue(hasattr(pc.xact, 'exception'))

	def test_bad_negotiation(self):
		# The server accepts the connection, reads what the client sent and
		# closes the connection without replying.
		portnum = find_available_port()
		servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		# same pattern as the other tests: the server socket is closed
		# however this test ends, including a failing bind() or listen().
		with servsock:
			servsock.bind(('localhost', portnum))
			pc = self._connection(portnum)
			servsock.listen(1)
			def client_thread():
				pc.connect()
			client = Thread(target = client_thread)
			try:
				client.start()
				c, addr = servsock.accept()
				try:
					c.recv(1024)
				finally:
					c.close()
				time.sleep(0.25)
				client.join()
				self.assertEqual(pc.xact.fatal, True)
				self.assertEqual(pc.xact.__class__, x3.Negotiation)
				self.assertEqual(pc.xact.error_message.__class__, e3.ClientError)
				self.assertEqual(pc.xact.error_message[b'C'], '08006')
			finally:
				if pc.socket is not None:
					pc.socket.close()
682
# When executed as a script, run the tests defined in this file.
if __name__ == '__main__':
	from types import ModuleType
	# unittest.main() is handed a module object, so build one whose
	# namespace is a copy of this file's globals at this point.
	this = ModuleType("this")
	this.__dict__.update(globals())
	try:
		unittest.main(this)
	finally:
		# run a garbage collection pass once the test run is over
		import gc
		gc.collect()
692