##
# .protocol.element3
##
'PQ version 3.0 elements'
import sys
import os
import pprint
from struct import unpack, Struct
from .message_types import message_types
from ..python.structlib import ushort_pack, ushort_unpack, ulong_pack, ulong_unpack

try:
    from ..port.optimized import parse_tuple_message, pack_tuple_data
except ImportError:
    # Pure-Python fallback used when the optimized port module is unavailable.
    def pack_tuple_data(atts,
        none = None,
        ulong_pack = ulong_pack,
        blen = bytes.__len__
    ):
        """
        Pack a sequence of attribute values into tuple data.

        Each attribute is written as a four byte size followed by the data;
        attributes that are `none` are written as the four bytes
        ``\\xff\\xff\\xff\\xff`` with no data.
        """
        return b''.join([
            b'\xff\xff\xff\xff'
            if x is none
            else (ulong_pack(blen(x)) + x)
            for x in atts
        ])

try:
    from ..port.optimized import cat_messages
except ImportError:
    from ..python.structlib import lH_pack, long_pack
    # Special case tuple()'s
    def _pack_tuple(t,
        blen = bytes.__len__,
        tlen = tuple.__len__,
        pack_head = lH_pack,
        ulong_pack = ulong_pack,
        ptd = pack_tuple_data,
    ):
        """
        Pack the tuple `t` as the body of a tuple data message: the size
        stamp, the attribute count, and then the packed attributes.
        """
        # Reuse pack_tuple_data rather than duplicating its logic here.
        r = ptd(t)
        # The size stamp covers itself (4), the attribute count (2) and the data.
        return pack_head((blen(r) + 6, tlen(t))) + r

    def cat_messages(messages,
        lpack = long_pack,
        blen = bytes.__len__,
        tuple = tuple,
        pack_tuple = _pack_tuple
    ):
        """
        Serialize and concatenate a sequence of messages.

        Items are dispatched on their exact class:

         - `tuple` objects are emitted as ``b'D'`` (tuple data) messages.
         - `bytes` objects are emitted as ``b'd'`` (copy data) messages.
         - Anything else must provide a ``bytes()`` method whose result is
           used as-is.
        """
        return b''.join([
            (x.bytes() if x.__class__ is not bytes else (
                b'd' + lpack(blen(x) + 4) + x
            )) if x.__class__ is not tuple else (
                b'D' + pack_tuple(x)
            ) for x in messages
        ])
    # The helpers are bound as defaults above; keep the module namespace clean.
    del _pack_tuple, lH_pack, long_pack

StringFormat = b'\x00\x00'
BinaryFormat = b'\x00\x01'

class Message(object):
    """
    Base class for protocol messages.

    Subclasses provide a `type` attribute (the message type code) and a
    `serialize` method returning the message body.
    """
    # Message header: the type code followed by the size stamp.
    bytes_struct = Struct("!cL")
    __slots__ = ()

    def __repr__(self):
        return '%s.%s(%s)' %(
            type(self).__module__,
            type(self).__name__,
            ', '.join([repr(getattr(self, x)) for x in self.__slots__])
        )

    def __eq__(self, ob):
        # NOTE(review): only the `__slots__` found on the concrete class are
        # compared; slots declared solely on a base class are not visited.
        return isinstance(ob, type(self)) and self.type == ob.type and all(
            getattr(self, x) == getattr(ob, x)
            for x in self.__slots__
        )

    def bytes(self):
        'Return the complete wire form: type code, size stamp, and body.'
        data = self.serialize()
        # The size stamp includes its own four bytes.
        return self.bytes_struct.pack(self.type, len(data) + 4) + data

    @classmethod
    def parse(typ, data):
        'Create an instance from the message body, `data`.'
        return typ(data)

class StringMessage(Message):
    """
    A message based on a single string component.
    """
    type = b''
    __slots__ = ('data',)

    def __repr__(self):
        return '%s.%s(%s)' %(
            type(self).__module__,
            type(self).__name__,
            repr(self.data),
        )

    def __getitem__(self, i):
        return self.data.__getitem__(i)

    def __init__(self, data):
        self.data = data

    def serialize(self):
        'Return the data followed by a NUL terminator.'
        return bytes(self.data) + b'\x00'

    @classmethod
    def parse(typ, data):
        """
        Create an instance from a NUL-terminated body.

        Raises ValueError when `data` does not end with a NUL byte.
        """
        if not data.endswith(b'\x00'):
            raise ValueError("string message not NUL-terminated")
        return typ(data[:-1])

class TupleMessage(tuple, Message):
    """
    A message whose data is based on a tuple structure.
    """
    type = b''
    __slots__ = ()

    def __repr__(self):
        return '%s.%s(%s)' %(
            type(self).__module__,
            type(self).__name__,
            tuple.__repr__(self)
        )

class Void(Message):
    """
    An absolutely empty message. When serialized, it always yields an empty
    string.
    """
    type = b''
    __slots__ = ()

    def bytes(self):
        return b''

    def serialize(self):
        return b''

    def __new__(typ, *args, **kw):
        # Always hand back the module-level singleton.
        return VoidMessage
VoidMessage = Message.__new__(Void)

def dict_message_repr(self):
    'repr() implementation shared by the dict based messages.'
    return '%s.%s(**%s)' %(
        type(self).__module__,
        type(self).__name__,
        pprint.pformat(dict(self))
    )

class WireMessage(Message):
    """
    A message held as a (type, data) pair.

    The constructor takes a pair whose first item is the type code as bytes
    and whose second item is the message body.
    """
    def __init__(self, typ_data):
        self.type = message_types[typ_data[0][0]]
        self.data = typ_data[1]

    def serialize(self):
        # The body is stored on `data`; instances do not support indexing.
        return self.data

    @classmethod
    def parse(typ, data):
        """
        Create an instance from a complete wire message: the type byte, the
        four byte size stamp, and the body.

        Raises ValueError when the size stamp does not agree with the length
        of `data`.
        """
        if ulong_unpack(data[1:5]) != len(data) - 1:
            raise ValueError(
                "invalid wire message where data is %d bytes and " \
                "internal size stamp is %d bytes" %(
                    len(data), ulong_unpack(data[1:5]) + 1
                )
            )
        return typ((data[0:1], data[5:]))

class EmptyMessage(Message):
    'An abstract message that is always empty'
    __slots__ = ()
    type = b''

    def __new__(typ):
        # Subclasses have their `SingleInstance` assigned after definition.
        return typ.SingleInstance

    def serialize(self):
        return b''

    @classmethod
    def parse(typ, data):
        'Return the single instance; raise ValueError if `data` is not empty.'
        if data != b'':
            raise ValueError("empty message(%r) had data" %(typ.type,))
        return typ.SingleInstance

class Notify(Message):
    'Asynchronous notification message'
    type = message_types[b'A'[0]]
    __slots__ = ('pid', 'channel', 'payload',)

    def __init__(self, pid, channel, payload = b''):
        self.pid = pid
        self.channel = channel
        self.payload = payload

    def serialize(self):
        return ulong_pack(self.pid) + \
            self.channel + b'\x00' + \
            self.payload + b'\x00'

    @classmethod
    def parse(typ, data):
        # Only the leading four bytes hold the pid; the NUL-terminated
        # channel and payload follow.
        pid = ulong_unpack(data[0:4])
        channel, payload, _ = data[4:].split(b'\x00', 2)
        return typ(pid, channel, payload)

class ShowOption(Message):
    """ShowOption(name, value)
    GUC variable information from backend"""
    type = message_types[b'S'[0]]
    __slots__ = ('name', 'value')

    def __init__(self, name, value):
        self.name = name
        self.value = value

    def serialize(self):
        return self.name + b'\x00' + self.value + b'\x00'

    @classmethod
    def parse(typ, data):
        return typ(*(data.split(b'\x00', 2)[0:2]))

class Complete(StringMessage):
    'Command completion message.'
    type = message_types[b'C'[0]]
    __slots__ = ()

    @classmethod
    def parse(typ, data):
        return typ(data.rstrip(b'\x00'))

    def extract_count(self):
        """
        Extract the last set of digits as an integer.
        """
        # Find the last sequence of digits.
        # If there are no fields consisting only of digits, there is no count.
        for x in reversed(self.data.split()):
            if x.isdigit():
                return int(x)
        return None

    def extract_command(self):
        """
        Strip all the *surrounding* digits and spaces from the command tag,
        and return that string.

        Returns None when nothing remains.
        """
        # Whitespace and digits only. The set used to be spelled with the
        # invalid escape '\c', which also stripped backslashes and 'c'.
        return self.data.strip(b'\r\n\t 0123456789') or None

class Null(EmptyMessage):
    'Null command'
    type = message_types[b'I'[0]]
    __slots__ = ()
NullMessage = Message.__new__(Null)
Null.SingleInstance = NullMessage

class NoData(EmptyMessage):
    'No data message'
    type = message_types[b'n'[0]]
    __slots__ = ()
NoDataMessage = Message.__new__(NoData)
NoData.SingleInstance = NoDataMessage

class ParseComplete(EmptyMessage):
    'Parse reaction'
    type = message_types[b'1'[0]]
    __slots__ = ()
ParseCompleteMessage = Message.__new__(ParseComplete)
ParseComplete.SingleInstance = ParseCompleteMessage

class BindComplete(EmptyMessage):
    'Bind reaction'
    type = message_types[b'2'[0]]
    __slots__ = ()
BindCompleteMessage = Message.__new__(BindComplete)
BindComplete.SingleInstance = BindCompleteMessage

class CloseComplete(EmptyMessage):
    'Close statement or Portal'
    type = message_types[b'3'[0]]
    __slots__ = ()
CloseCompleteMessage = Message.__new__(CloseComplete)
CloseComplete.SingleInstance = CloseCompleteMessage

class Suspension(EmptyMessage):
    'Portal was suspended, more tuples for reading'
    type = message_types[b's'[0]]
    __slots__ = ()
SuspensionMessage = Message.__new__(Suspension)
Suspension.SingleInstance = SuspensionMessage

class Ready(Message):
    'Ready for new query'
    type = message_types[b'Z'[0]]
    # The only transaction states accepted by the constructor.
    possible_states = (
        message_types[b'I'[0]],
        message_types[b'E'[0]],
        message_types[b'T'[0]],
    )
    __slots__ = ('xact_state',)

    def __init__(self, data):
        if data not in self.possible_states:
            raise ValueError("invalid state for Ready message: " + repr(data))
        self.xact_state = data

    def serialize(self):
        return self.xact_state

class Notice(Message, dict):
    """
    Notification message

    Used by PQ to emit INFO, NOTICE, and WARNING messages among other
    severities.
    """
    type = message_types[b'N'[0]]
    __slots__ = ()
    __repr__ = dict_message_repr

    def serialize(self):
        # NOTE(review): the protocol documentation describes a terminating
        # zero byte after the last NUL-terminated field; this emits a single
        # trailing NUL. Confirm before relying on the exact bytes.
        return b'\x00'.join([
            k + v for k, v in self.items()
            if k and v is not None
        ]) + b'\x00'

    @classmethod
    def parse(typ, data, msgtypes = message_types):
        'Build the mapping of field code to field value from `data`.'
        return typ([
            (msgtypes[x[0]], x[1:])
            # "if x" reduce empty fields
            for x in data.split(b'\x00') if x
        ])

class ClientNotice(Notice):
    'A Notice that can be neither serialized nor parsed.'
    __slots__ = ()

    def serialize(self):
        raise RuntimeError("cannot serialize ClientNotice")

    @classmethod
    def parse(self):
        raise RuntimeError("cannot parse ClientNotice")

class Error(Notice):
    """Incoming error"""
    type = message_types[b'E'[0]]
    __slots__ = ()

class ClientError(Error):
    'An Error that can be neither serialized nor parsed.'
    __slots__ = ()

    def serialize(self):
        raise RuntimeError("cannot serialize ClientError")

    @classmethod
    def parse(self):
        raise RuntimeError("cannot parse ClientError")

class FunctionResult(Message):
    """Function result value"""
    type = message_types[b'V'[0]]
    __slots__ = ('result',)

    def __init__(self, datum):
        self.result = datum

    def serialize(self):
        # A None result is written as the four 0xFF bytes with no data.
        if self.result is None:
            return b'\xff\xff\xff\xff'
        return ulong_pack(len(self.result)) + self.result

    @classmethod
    def parse(typ, data):
        """
        Create an instance from the message body.

        Raises ValueError when the size stamp disagrees with the length of
        the result data.
        """
        if data == b'\xff\xff\xff\xff':
            return typ(None)
        size = ulong_unpack(data[0:4])
        data = data[4:]
        if size != len(data):
            raise ValueError(
                "data length(%d) is not equal to the specified message size(%d)" %(
                    len(data), size
                )
            )
        return typ(data)

class AttributeTypes(TupleMessage):
    """Tuple attribute types"""
    type = message_types[b't'[0]]
    __slots__ = ()

    def serialize(self):
        return ushort_pack(len(self)) + b''.join([ulong_pack(x) for x in self])

    @classmethod
    def parse(typ, data):
        'Raise ValueError when the count and the data size do not agree.'
        ac = ushort_unpack(data[0:2])
        args = data[2:]
        if len(args) != ac * 4:
            raise ValueError("invalid argument type data size")
        return typ(unpack('!%dL'%(ac,), args))

class TupleDescriptor(TupleMessage):
    """Tuple description"""
    type = message_types[b'T'[0]]
    # The fixed-size fields that follow each NUL-terminated attribute name.
    struct = Struct("!LhLhlh")
    __slots__ = ()

    def keys(self):
        'Return the list of attribute names.'
        return [x[0] for x in self]

    def serialize(self):
        return ushort_pack(len(self)) + b''.join([
            x[0] + b'\x00' + self.struct.pack(*x[1:])
            for x in self
        ])

    @classmethod
    def parse(typ, data):
        ac = ushort_unpack(data[0:2])
        atts = []
        data = data[2:]
        for _ in range(ac):
            # End Of Attribute Name
            eoan = data.index(b'\x00')
            name = data[0:eoan]
            data = data[eoan+1:]
            # name, relationId, columnNumber, typeId, typlen, typmod, format
            atts.append((name,) + typ.struct.unpack(data[0:18]))
            data = data[18:]
        return typ(atts)

class Tuple(TupleMessage):
    """Incoming tuple"""
    type = message_types[b'D'[0]]
    __slots__ = ()

    def serialize(self):
        return ushort_pack(len(self)) + pack_tuple_data(self)

    @classmethod
    def parse(typ, data,
        T = tuple, ulong_unpack = ulong_unpack,
        len = len
    ):
        """
        Parse the attributes out of a tuple message body.

        Note that the result is built with `T`, a plain tuple by default.
        """
        natts = ushort_unpack(data[0:2])
        atts = []
        offset = 2
        add = atts.append

        while natts > 0:
            alo = offset
            offset += 4
            size = data[alo:offset]
            if size == b'\xff\xff\xff\xff':
                # The four 0xFF bytes mark a None attribute with no data.
                att = None
            else:
                al = ulong_unpack(size)
                ao = offset
                offset = ao + al
                att = data[ao:offset]
            add(att)
            natts -= 1
        return T(atts)

    # Override the method above when port.optimized provided a parser.
    try:
        parse = parse_tuple_message
    except NameError:
        pass

class KillInformation(Message):
    'Backend cancellation information'
    type = message_types[b'K'[0]]
    struct = Struct("!LL")
    __slots__ = ('pid', 'key')

    def __init__(self, pid, key):
        self.pid = pid
        self.key = key

    def serialize(self):
        return self.struct.pack(self.pid, self.key)

    @classmethod
    def parse(typ, data):
        return typ(*typ.struct.unpack(data))

class CancelRequest(KillInformation):
    'Abort the query in the specified backend'
    type = b''
    from .version import CancelRequestCode as version
    packed_version = version.bytes()
    __slots__ = ('pid', 'key')

    def serialize(self):
        return self.packed_version + self.struct.pack(
            self.pid, self.key
        )

    def bytes(self):
        # No type code here; only the size stamp precedes the body.
        data = self.serialize()
        return ulong_pack(len(data) + 4) + data

    @classmethod
    def parse(typ, data):
        'Raise ValueError when `data` does not start with the request code.'
        if data[0:4] != typ.packed_version:
            raise ValueError("invalid cancel query code")
        return typ(*typ.struct.unpack(data[4:]))

class NegotiateSSL(Message):
    "Discover backend's SSL support"
    type = b''
    from .version import NegotiateSSLCode as version
    packed_version = version.bytes()
    __slots__ = ()

    def __new__(typ):
        # Always hand back the module-level singleton.
        return NegotiateSSLMessage

    def bytes(self):
        # No type code here; only the size stamp precedes the body.
        data = self.serialize()
        return ulong_pack(len(data) + 4) + data

    def serialize(self):
        return self.packed_version

    @classmethod
    def parse(typ, data):
        if data != typ.packed_version:
            raise ValueError("invalid SSL Negotiation code")
        return NegotiateSSLMessage
NegotiateSSLMessage = Message.__new__(NegotiateSSL)

class Startup(Message, dict):
    """
    Initiate a connection using the given keywords.
    """
    type = b''
    # Relative import, consistent with CancelRequest and NegotiateSSL.
    from .version import V3_0 as version
    packed_version = version.bytes()
    __slots__ = ()
    __repr__ = dict_message_repr

    def serialize(self):
        # Keywords whose value is None are left out.
        return self.packed_version + b''.join([
            k + b'\x00' + v + b'\x00'
            for k, v in self.items()
            if v is not None
        ]) + b'\x00'

    def bytes(self):
        # No type code here; only the size stamp precedes the body.
        data = self.serialize()
        return ulong_pack(len(data) + 4) + data

    @classmethod
    def parse(typ, data):
        """
        Create an instance from the message body.

        Raises ValueError when `data` does not start with the packed version.
        """
        if data[0:4] != typ.packed_version:
            raise ValueError("invalid version code {0}".format(repr(data[0:4])))
        kw = dict()
        key = None
        # The body ends with two NULs, so the last two split items are empty.
        for value in data[4:].split(b'\x00')[:-2]:
            if key is None:
                key = value
                continue
            kw[key] = value
            key = None
        return typ(kw)

AuthRequest_OK = 0
AuthRequest_Cleartext = 3
AuthRequest_Password = AuthRequest_Cleartext
AuthRequest_Crypt = 4
AuthRequest_MD5 = 5

# Unsupported by pg_protocol.
AuthRequest_KRB4 = 1
AuthRequest_KRB5 = 2
AuthRequest_SCMC = 6
AuthRequest_SSPI = 9
AuthRequest_GSS = 7
AuthRequest_GSSContinue = 8

# Display names for the authentication request codes.
AuthNameMap = {
    AuthRequest_Password : 'Cleartext',
    AuthRequest_Crypt : 'Crypt',
    AuthRequest_MD5 : 'MD5',

    AuthRequest_KRB4 : 'Kerberos4',
    AuthRequest_KRB5 : 'Kerberos5',
    AuthRequest_SCMC : 'SCM Credential',
    AuthRequest_SSPI : 'SSPI',
    AuthRequest_GSS : 'GSS',
    AuthRequest_GSSContinue : 'GSSContinue',
}

def _parse_arguments(data, offset):
    """
    Parse a counted sequence of size-prefixed arguments from `data`.

    `offset` is the position of the two byte argument count. Returns the
    list of arguments and the offset just past the last one. An argument
    whose size field is the four 0xFF bytes is None and has no data.
    """
    natts = ushort_unpack(data[offset:offset+2])
    offset += 2
    args = []
    for _ in range(natts):
        size = data[offset:offset+4]
        offset += 4
        if size == b'\xff\xff\xff\xff':
            args.append(None)
        else:
            end = offset + ulong_unpack(size)
            args.append(data[offset:end])
            offset = end
    return args, offset

class Authentication(Message):
    """Authentication(request, salt)"""
    type = message_types[b'R'[0]]
    __slots__ = ('request', 'salt')

    def __init__(self, request, salt):
        self.request = request
        self.salt = salt

    def serialize(self):
        return ulong_pack(self.request) + self.salt

    @classmethod
    def parse(typ, data):
        # The request code is the first four bytes; the remainder is the salt.
        return typ(ulong_unpack(data[0:4]), data[4:])

class Password(StringMessage):
    'Password supplement'
    type = message_types[b'p'[0]]
    __slots__ = ('data',)

class Disconnect(EmptyMessage):
    'Close the connection'
    type = message_types[b'X'[0]]
    __slots__ = ()
DisconnectMessage = Message.__new__(Disconnect)
Disconnect.SingleInstance = DisconnectMessage

class Flush(EmptyMessage):
    'Flush'
    type = message_types[b'H'[0]]
    __slots__ = ()
FlushMessage = Message.__new__(Flush)
Flush.SingleInstance = FlushMessage

class Synchronize(EmptyMessage):
    'Synchronize'
    type = message_types[b'S'[0]]
    __slots__ = ()
SynchronizeMessage = Message.__new__(Synchronize)
Synchronize.SingleInstance = SynchronizeMessage

class Query(StringMessage):
    """Execute the query with the given arguments"""
    type = message_types[b'Q'[0]]
    __slots__ = ('data',)

class Parse(Message):
    """Parse a query with the specified argument types"""
    type = message_types[b'P'[0]]
    __slots__ = ('name', 'statement', 'argtypes')

    def __init__(self, name, statement, argtypes):
        self.name = name
        self.statement = statement
        self.argtypes = argtypes

    @classmethod
    def parse(typ, data):
        """
        Create an instance from the message body.

        Raises ValueError when the argument type count does not agree with
        the size of the remaining data.
        """
        name, statement, args = data.split(b'\x00', 2)
        ac = ushort_unpack(args[0:2])
        args = args[2:]
        if len(args) != ac * 4:
            raise ValueError("invalid argument type data")
        at = unpack('!%dL'%(ac,), args)
        return typ(name, statement, at)

    def serialize(self):
        ac = ushort_pack(len(self.argtypes))
        return self.name + b'\x00' + self.statement + b'\x00' + ac + b''.join([
            ulong_pack(x) for x in self.argtypes
        ])

class Bind(Message):
    """
    Bind a parsed statement with the given arguments to a Portal

    Bind(
        name,      # Portal/Cursor identifier
        statement, # Prepared Statement name/identifier
        aformats,  # Argument formats; Sequence of BinaryFormat or StringFormat.
        arguments, # Argument data; Sequence of None or argument data(str).
        rformats,  # Result formats; Sequence of BinaryFormat or StringFormat.
    )
    """
    type = message_types[b'B'[0]]
    __slots__ = ('name', 'statement', 'aformats', 'arguments', 'rformats')

    def __init__(self, name, statement, aformats, arguments, rformats):
        self.name = name
        self.statement = statement
        self.aformats = aformats
        self.arguments = arguments
        self.rformats = rformats

    def serialize(self, len = len):
        args = self.arguments
        # The argument count is written twice: once before the formats and
        # once before the argument data.
        ac = ushort_pack(len(args))
        ad = pack_tuple_data(tuple(args))
        return \
            self.name + b'\x00' + self.statement + b'\x00' + \
            ac + b''.join(self.aformats) + ac + ad + \
            ushort_pack(len(self.rformats)) + b''.join(self.rformats)

    @classmethod
    def parse(typ, message_data):
        name, statement, data = message_data.split(b'\x00', 2)

        # Argument formats: a count followed by two bytes per format.
        ac = ushort_unpack(data[:2])
        offset = 2 + (2 * ac)
        aformats = unpack(("2s" * ac), data[2:offset])

        args, offset = _parse_arguments(data, offset)

        # Result formats: a count followed by two bytes per format.
        rfc = ushort_unpack(data[offset:offset+2])
        ao = offset + 2
        offset = ao + (2 * rfc)
        rformats = unpack(("2s" * rfc), data[ao:offset])

        return typ(name, statement, aformats, args, rformats)

class Execute(Message):
    """Fetch results from the specified Portal"""
    type = message_types[b'E'[0]]
    __slots__ = ('name', 'max')

    def __init__(self, name, max = 0):
        self.name = name
        self.max = max

    def serialize(self):
        return self.name + b'\x00' + ulong_pack(self.max)

    @classmethod
    def parse(typ, data):
        name, max = data.split(b'\x00', 1)
        return typ(name, ulong_unpack(max))

class Describe(StringMessage):
    """
    Describe a Portal or Prepared Statement

    Subclasses supply the `subtype` code that prefixes the name.
    """
    type = message_types[b'D'[0]]
    __slots__ = ('data',)

    def serialize(self):
        return self.subtype + self.data + b'\x00'

    @classmethod
    def parse(typ, data):
        'Raise ValueError when the leading byte is not the expected subtype.'
        if data[0:1] != typ.subtype:
            # Report the subtype that was found first, then the expected one.
            raise ValueError(
                "invalid Describe message subtype, %r; expected %r" %(
                    data[0:1], typ.subtype
                )
            )
        return super().parse(data[1:])

class DescribeStatement(Describe):
    'Describe the specified Statement'
    subtype = message_types[b'S'[0]]
    __slots__ = ('data',)

class DescribePortal(Describe):
    'Describe the specified Portal'
    subtype = message_types[b'P'[0]]
    __slots__ = ('data',)

class Close(StringMessage):
    """
    Generic Close

    Subclasses supply the `subtype` code that prefixes the name.
    """
    type = message_types[b'C'[0]]
    __slots__ = ()

    def serialize(self):
        return self.subtype + self.data + b'\x00'

    @classmethod
    def parse(typ, data):
        'Raise ValueError when the leading byte is not the expected subtype.'
        if data[0:1] != typ.subtype:
            # Report the subtype that was found first, then the expected one.
            raise ValueError(
                "invalid Close message subtype, %r; expected %r" %(
                    data[0:1], typ.subtype
                )
            )
        return super().parse(data[1:])

class CloseStatement(Close):
    """Close the specified Statement"""
    subtype = message_types[b'S'[0]]
    __slots__ = ()

class ClosePortal(Close):
    """Close the specified Portal"""
    subtype = message_types[b'P'[0]]
    __slots__ = ()

class Function(Message):
    """Execute the specified function with the given arguments"""
    type = message_types[b'F'[0]]
    __slots__ = ('oid', 'aformats', 'arguments', 'rformat')

    def __init__(self, oid, aformats, args, rformat):
        self.oid = oid
        self.aformats = aformats
        self.arguments = args
        self.rformat = rformat

    def serialize(self):
        # The argument count is written twice: once before the formats and
        # once before the argument data.
        ac = ushort_pack(len(self.arguments))
        return ulong_pack(self.oid) + \
            ac + b''.join(self.aformats) + \
            ac + pack_tuple_data(tuple(self.arguments)) + self.rformat

    @classmethod
    def parse(typ, data):
        oid = ulong_unpack(data[0:4])

        # Argument formats: a count followed by two bytes per format.
        ac = ushort_unpack(data[4:6])
        offset = 6 + (2 * ac)
        aformats = unpack(("2s" * ac), data[6:offset])

        args, offset = _parse_arguments(data, offset)

        # Whatever remains is the result format.
        return typ(oid, aformats, args, data[offset:])

class CopyBegin(Message):
    """
    Base of the copy initiation messages: an overall format and a sequence
    of per-attribute formats.
    """
    type = None
    struct = Struct("!BH")
    __slots__ = ('format', 'formats')

    def __init__(self, format, formats):
        self.format = format
        self.formats = formats

    def serialize(self):
        return self.struct.pack(self.format, len(self.formats)) + b''.join([
            ushort_pack(x) for x in self.formats
        ])

    @classmethod
    def parse(typ, data):
        """
        Create an instance from the message body.

        Raises ValueError when the number of formats does not agree with
        the size of the remaining data.
        """
        format, natts = typ.struct.unpack(data[:3])
        formats_str = data[3:]
        if len(formats_str) != natts * 2:
            raise ValueError("number of formats and data do not match up")
        return typ(format, [
            ushort_unpack(formats_str[x:x+2]) for x in range(0, natts * 2, 2)
        ])

class CopyToBegin(CopyBegin):
    """Begin copying to"""
    type = message_types[b'H'[0]]
    __slots__ = ('format', 'formats')

class CopyFromBegin(CopyBegin):
    """Begin copying from"""
    type = message_types[b'G'[0]]
    __slots__ = ('format', 'formats')

class CopyData(Message):
    'A chunk of copy data'
    type = message_types[b'd'[0]]
    __slots__ = ('data',)

    def __init__(self, data):
        self.data = bytes(data)

    def serialize(self):
        return self.data

    @classmethod
    def parse(typ, data):
        return typ(data)

class CopyFail(StringMessage):
    'Copy failure carrying a single string'
    type = message_types[b'f'[0]]
    __slots__ = ('data',)

class CopyDone(EmptyMessage):
    'Copy completion'
    type = message_types[b'c'[0]]
    # No attributes: the singleton below is created without any being set,
    # so declaring a 'data' slot would make repr() and == fail on it.
    __slots__ = ()
CopyDoneMessage = Message.__new__(CopyDone)
CopyDone.SingleInstance = CopyDoneMessage