1# Copyright 2014 Tycho Andersen 2# Copyright 2014 Sean Vig 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16from __future__ import division, absolute_import 17 18import ctypes.util 19import functools 20import platform 21import six 22import struct 23import weakref 24 25try: 26 from xcffib._ffi import ffi 27except ImportError: 28 from xcffib.ffi_build import ffi 29 30if platform.system() == "Darwin": 31 soname = "libxcb.dylib" 32elif platform.system() == "Windows": 33 soname = "libxcb.dll" 34else: 35 soname = ctypes.util.find_library("xcb") 36 if soname is None: 37 soname = "libxcb.so" 38lib = ffi.dlopen(soname) 39 40__xcb_proto_version__ = "1.14" 41__version__ = "0.11.1" 42 43X_PROTOCOL = lib.X_PROTOCOL 44X_PROTOCOL_REVISION = lib.X_PROTOCOL_REVISION 45 46XCB_NONE = lib.XCB_NONE 47XCB_COPY_FROM_PARENT = lib.XCB_COPY_FROM_PARENT 48XCB_CURRENT_TIME = lib.XCB_CURRENT_TIME 49XCB_NO_SYMBOL = lib.XCB_NO_SYMBOL 50 51# For xpyb compatibility 52NONE = XCB_NONE 53CopyFromParent = XCB_COPY_FROM_PARENT 54CurrentTime = XCB_CURRENT_TIME 55NoSymbol = XCB_NO_SYMBOL 56 57XCB_CONN_ERROR = lib.XCB_CONN_ERROR 58XCB_CONN_CLOSED_EXT_NOTSUPPORTED = lib.XCB_CONN_CLOSED_EXT_NOTSUPPORTED 59XCB_CONN_CLOSED_MEM_INSUFFICIENT = lib.XCB_CONN_CLOSED_MEM_INSUFFICIENT 60XCB_CONN_CLOSED_REQ_LEN_EXCEED = lib.XCB_CONN_CLOSED_REQ_LEN_EXCEED 61XCB_CONN_CLOSED_PARSE_ERR = lib.XCB_CONN_CLOSED_PARSE_ERR 62# XCB_CONN_CLOSED_INVALID_SCREEN = lib.XCB_CONN_CLOSED_INVALID_SCREEN 63# XCB_CONN_CLOSED_FDPASSING_FAILED = lib.XCB_CONN_CLOSED_FDPASSING_FAILED 64 65cffi_explicit_lifetimes = weakref.WeakKeyDictionary() 66 67 68def type_pad(t, i): 69 return -i & (3 if t > 4 else t - 1) 70 71 72def visualtype_to_c_struct(vt): 73 # let ffi be a kwarg so cairocffi can pass in its ffi 74 # cfficairo needs an xcb_visualtype_t 75 s = ffi.new("struct xcb_visualtype_t *") 76 77 s.visual_id = vt.visual_id 78 s._class = vt._class 79 s.bits_per_rgb_value = vt.bits_per_rgb_value 80 s.colormap_entries = vt.colormap_entries 81 s.red_mask = vt.red_mask 82 s.green_mask = vt.green_mask 83 s.blue_mask = vt.blue_mask 84 85 return s 86 87 88class Unpacker(object): 89 90 def __init__(self, known_max=None): 91 self.size = 0 92 self.offset = 0 93 self.known_max = known_max 94 if self.known_max is not None: 95 self._resize(known_max) 96 97 def pad(self, thing): 98 if isinstance(thing, type) and issubclass(thing, (Struct, Union)): 99 if hasattr(thing, "fixed_size"): 100 size = thing.fixed_size 101 else: 102 size = 4 103 else: 104 size = struct.calcsize(thing) 105 106 self.offset += type_pad(size, self.offset) 107 108 def unpack(self, fmt, increment=True): 109 fmt = "=" + fmt 110 size = struct.calcsize(fmt) 111 if size > self.size - self.offset: 112 self._resize(size) 113 ret = struct.unpack_from(fmt, self.buf, self.offset) 114 115 if increment: 116 self.offset += size 117 return ret 118 119 def cast(self, typ): 120 assert self.offset == 0 121 return ffi.cast(typ, self.cdata) 122 123 def copy(self): 124 raise NotImplementedError 125 126 @classmethod 127 def synthetic(cls, data, format): 128 self = cls.__new__(cls) 129 self.buf = data 130 self.offset = 0 131 self.size = len(data) 132 self.size 133 134 135class CffiUnpacker(Unpacker): 136 137 def __init__(self, cdata, known_max=None): 138 self.cdata = cdata 139 Unpacker.__init__(self, known_max) 140 141 def _resize(self, increment): 142 if self.offset + increment > self.size: 143 if self.known_max is not None: 144 assert self.size + increment <= self.known_max 145 self.size = self.offset + increment 146 self.buf = ffi.buffer(self.cdata, self.size) 147 148 def copy(self): 149 new = CffiUnpacker(self.cdata, self.known_max) 150 new.offset = self.offset 151 new.size = self.size 152 return new 153 154 155class MemoryUnpacker(Unpacker): 156 157 def __init__(self, buf): 158 self.buf = buf 159 Unpacker.__init__(self, len(self.buf)) 160 161 def _resize(self, increment): 162 if self.size + increment > self.known_max: 163 raise XcffibException("resizing memory buffer to be too big") 164 self.size += increment 165 166 def copy(self): 167 new = MemoryUnpacker(self.buf) 168 new.offset = self.offset 169 new.size = self.size 170 return new 171 172 173def popcount(n): 174 return bin(n).count('1') 175 176 177class XcffibException(Exception): 178 179 """ Generic XcbException; replaces xcb.Exception. """ 180 pass 181 182 183class ConnectionException(XcffibException): 184 REASONS = { 185 lib.XCB_CONN_ERROR: ( 186 'xcb connection errors because of socket, ' 187 'pipe and other stream errors.'), 188 lib.XCB_CONN_CLOSED_EXT_NOTSUPPORTED: ( 189 'xcb connection shutdown because extension not supported'), 190 lib.XCB_CONN_CLOSED_MEM_INSUFFICIENT: ( 191 'malloc(), calloc() and realloc() error upon failure, ' 192 'for eg ENOMEM'), 193 lib.XCB_CONN_CLOSED_REQ_LEN_EXCEED: ( 194 'Connection closed, exceeding request length that server ' 195 'accepts.'), 196 lib.XCB_CONN_CLOSED_PARSE_ERR: ( 197 'Connection closed, error during parsing display string.'), 198 # lib.XCB_CONN_CLOSED_INVALID_SCREEN: ( 199 # 'Connection closed because the server does not have a screen ' 200 # 'matching the display.'), 201 # lib.XCB_CONN_CLOSED_FDPASSING_FAILED: ( 202 # 'Connection closed because some FD passing operation failed'), 203 } 204 205 def __init__(self, err): 206 XcffibException.__init__( 207 self, self.REASONS.get(err, "Unknown connection error.")) 208 209 210class ProtocolException(XcffibException): 211 pass 212 213 214core = None 215core_events = None 216core_errors = None 217# we use _setup here instead of just setup because of a nose bug that triggers 218# when doing the packaging builds in debian: 219# https://code.google.com/p/python-nose/issues/detail?id=326 220_setup = None 221 222extensions = {} 223 224# This seems a bit over engineered to me; it seems unlikely there will ever be 225# a core besides xproto, so why not just hardcode that? 226 227 228def _add_core(value, __setup, events, errors): 229 if not issubclass(value, Extension): 230 raise XcffibException( 231 "Extension type not derived from xcffib.Extension") 232 if not issubclass(__setup, Struct): 233 raise XcffibException("Setup type not derived from xcffib.Struct") 234 235 global core 236 global core_events 237 global core_errors 238 global _setup 239 240 core = value 241 core_events = events 242 core_errors = errors 243 _setup = __setup 244 245 246def _add_ext(key, value, events, errors): 247 if not issubclass(value, Extension): 248 raise XcffibException( 249 "Extension type not derived from xcffib.Extension") 250 extensions[key] = (value, events, errors) 251 252 253class ExtensionKey(object): 254 255 """ This definitely isn't needed, but we keep it around for compatibility 256 with xpyb. 257 """ 258 259 def __init__(self, name): 260 self.name = name 261 262 def __hash__(self): 263 return hash(self.name) 264 265 def __eq__(self, o): 266 return self.name == o.name 267 268 def __ne__(self, o): 269 return self.name != o.name 270 271 def to_cffi(self): 272 c_key = ffi.new("struct xcb_extension_t *") 273 c_key.name = name = ffi.new('char[]', self.name.encode()) 274 cffi_explicit_lifetimes[c_key] = name 275 # xpyb doesn't ever set global_id, which seems wrong, but whatever. 276 c_key.global_id = 0 277 278 return c_key 279 280 281class Protobj(object): 282 283 """ Note: Unlike xcb.Protobj, this does NOT implement the sequence 284 protocol. I found this behavior confusing: Protobj would implement the 285 sequence protocol on self.buf, and then List would go and implement it on 286 List. 287 288 Instead, when we need to create a new event from an existing event, we 289 repack that event into a MemoryUnpacker and use that instead (see 290 eventToUnpacker in the generator for more info.) 291 """ 292 293 def __init__(self, unpacker): 294 """ 295 Params: 296 - unpacker: an Unpacker object 297 """ 298 299 # if we don't know the size right now, we expect it to be calculated 300 # based on stuff in the structure, so we don't save it right now. 301 if unpacker.known_max is not None: 302 self.bufsize = unpacker.known_max 303 304 305class Buffer(Protobj): 306 def __init__(self, unpacker): 307 Protobj.__init__(self, unpacker) 308 self.buf = unpacker.buf 309 310 311class Struct(Protobj): 312 pass 313 314 315class Union(Protobj): 316 @classmethod 317 def synthetic(cls, data=[], fmt=""): 318 self = cls.__new__(cls) 319 self.__init__(MemoryUnpacker(struct.pack(fmt, *data))) 320 return self 321 322 323class Cookie(object): 324 reply_type = None 325 326 def __init__(self, conn, sequence, is_checked): 327 self.conn = conn 328 self.sequence = sequence 329 self.is_checked = is_checked 330 331 def reply(self): 332 data = self.conn.wait_for_reply(self.sequence) 333 return self.reply_type(data) 334 335 def check(self): 336 # Request is not void and checked. 337 assert self.is_checked and self.reply_type is None, ( 338 "Request is not void and checked") 339 self.conn.request_check(self.sequence) 340 341 def discard_reply(self): 342 return self.conn.discard_reply(self.sequence) 343 344 345class VoidCookie(Cookie): 346 347 def reply(self): 348 raise XcffibException("No reply for this message type") 349 350 351class Extension(object): 352 353 def __init__(self, conn, key=None): 354 self.conn = conn 355 356 if key is None: 357 self.c_key = ffi.NULL 358 else: 359 c_key = key.to_cffi() 360 cffi_explicit_lifetimes[self] = c_key 361 self.c_key = c_key 362 363 def send_request(self, opcode, data, cookie=VoidCookie, reply=None, 364 is_checked=False): 365 data = data.getvalue() 366 367 assert len(data) > 3, "xcb_send_request data must be ast least 4 bytes" 368 369 xcb_req = ffi.new("xcb_protocol_request_t *") 370 xcb_req.count = 2 371 xcb_req.ext = self.c_key 372 xcb_req.opcode = opcode 373 xcb_req.isvoid = issubclass(cookie, VoidCookie) 374 375 # XXX: send_request here will use the memory *before* the passed in 376 # xcb_parts pointer in some cases, so we need to allocate some for it 377 # to use, although we don't use it ourselves. 378 # 379 # http://lists.freedesktop.org/archives/xcb/2014-February/009307.html 380 xcb_parts = ffi.new("struct iovec[4]") 381 382 # Here we need this iov_base to keep this memory alive until the end of 383 # the function. 384 xcb_parts[2].iov_base = iov_base = ffi.new('char[]', data) # noqa 385 xcb_parts[2].iov_len = len(data) 386 xcb_parts[3].iov_base = ffi.NULL 387 xcb_parts[3].iov_len = -len(data) & 3 # is this really necessary? 388 389 flags = lib.XCB_REQUEST_CHECKED if is_checked else 0 390 391 seq = self.conn.send_request(flags, xcb_parts + 2, xcb_req) 392 393 return cookie(self.conn, seq, is_checked) 394 395 def __getattr__(self, name): 396 if name.endswith("Checked"): 397 real = name[:-len("Checked")] 398 is_checked = True 399 elif name.endswith("Unchecked"): 400 real = name[:-len("Unchecked")] 401 is_checked = False 402 else: 403 raise AttributeError(name) 404 405 real = getattr(self, real) 406 407 return functools.partial(real, is_checked=is_checked) 408 409 410class List(Protobj): 411 412 def __init__(self, unpacker, typ, count=None): 413 Protobj.__init__(self, unpacker) 414 415 self.list = [] 416 old = unpacker.offset 417 418 if isinstance(typ, str): 419 self.list = list(unpacker.unpack("%d%s" % (count, typ))) 420 elif count is not None: 421 for _ in range(count): 422 item = typ(unpacker) 423 self.list.append(item) 424 else: 425 assert unpacker.known_max is not None 426 while unpacker.offset < unpacker.known_max: 427 item = typ(unpacker) 428 self.list.append(item) 429 430 self.bufsize = unpacker.offset - old 431 432 self.raw = bytes(unpacker.buf[old:old + self.bufsize]) 433 434 assert count is None or count == len(self.list) 435 436 def __str__(self): 437 return str(self.list) 438 439 def __len__(self): 440 return len(self.list) 441 442 def __iter__(self): 443 return iter(self.list) 444 445 def __getitem__(self, key): 446 return self.list[key] 447 448 def __setitem__(self, key, value): 449 self.list[key] = value 450 451 def __delitem__(self, key): 452 del self.list[key] 453 454 def to_string(self): 455 """ A helper for converting a List of chars to a native string. Dies if 456 the list contents are not something that could be reasonably converted 457 to a string. """ 458 return ''.join(chr(six.byte2int(i)) for i in self) 459 460 def to_utf8(self): 461 return six.b('').join(self).decode('utf-8') 462 463 def to_atoms(self): 464 """ A helper for converting a List of chars to an array of atoms """ 465 return struct.unpack("<" + "%dI" % (len(self) // 4), b''.join(self)) 466 467 def buf(self): 468 return self.raw 469 470 @classmethod 471 def synthetic(cls, list=None): 472 if list is None: 473 list = [] 474 475 self = cls.__new__(cls) 476 self.list = list[:] 477 return self 478 479 480class OffsetMap(object): 481 482 def __init__(self, core): 483 self.offsets = [(0, core)] 484 485 def add(self, offset, things): 486 self.offsets.append((offset, things)) 487 self.offsets.sort(key=lambda x: x[0], reverse=True) 488 489 def __getitem__(self, item): 490 try: 491 offset, things = next((k, v) for k, v in self.offsets if item >= k) 492 return things[item - offset] 493 except StopIteration: 494 raise IndexError(item) 495 496 497class Connection(object): 498 499 """ `auth` here should be '<name>:<data>', a format bequeathed to us from 500 xpyb. """ 501 def __init__(self, display=None, fd=-1, auth=None): 502 if auth is not None: 503 [name, data] = auth.split(six.b(':')) 504 505 c_auth = ffi.new("xcb_auth_info_t *") 506 c_auth.name = ffi.new('char[]', name) 507 c_auth.namelen = len(name) 508 c_auth.data = ffi.new('char[]', data) 509 c_auth.datalen = len(data) 510 else: 511 c_auth = ffi.NULL 512 513 if display is None: 514 display = ffi.NULL 515 else: 516 display = display.encode('latin1') 517 518 i = ffi.new("int *") 519 520 if fd > 0: 521 self._conn = lib.xcb_connect_to_fd(fd, c_auth) 522 elif c_auth != ffi.NULL: 523 self._conn = lib.xcb_connect_to_display_with_auth_info(display, c_auth, i) 524 else: 525 self._conn = lib.xcb_connect(display, i) 526 self.pref_screen = i[0] 527 self.invalid() 528 self._init_x() 529 530 def _init_x(self): 531 if core is None: 532 raise XcffibException("No core protocol object has been set. " 533 "Did you import xcffib.xproto?") 534 535 self.core = core(self) 536 self.setup = self.get_setup() 537 538 self._event_offsets = OffsetMap(core_events) 539 self._error_offsets = OffsetMap(core_errors) 540 self._setup_extensions() 541 542 def _setup_extensions(self): 543 for key, (_, events, errors) in extensions.items(): 544 # We're explicitly not putting this as an argument to the next call 545 # as a hack for lifetime management. 546 c_ext = key.to_cffi() 547 reply = lib.xcb_get_extension_data(self._conn, c_ext) 548 self._event_offsets.add(reply.first_event, events) 549 self._error_offsets.add(reply.first_error, errors) 550 551 def __call__(self, key): 552 return extensions[key][0](self, key) 553 554 def invalid(self): 555 if self._conn is None: 556 raise XcffibException("Invalid connection.") 557 err = lib.xcb_connection_has_error(self._conn) 558 if err > 0: 559 raise ConnectionException(err) 560 561 def ensure_connected(f): 562 """ 563 Check that the connection is valid both before and 564 after the function is invoked. 565 """ 566 @functools.wraps(f) 567 def wrapper(*args): 568 self = args[0] 569 self.invalid() 570 try: 571 return f(*args) 572 finally: 573 self.invalid() 574 return wrapper 575 576 @ensure_connected 577 def get_setup(self): 578 self._setup = lib.xcb_get_setup(self._conn) 579 580 # No idea where this 8 comes from either, similar complate to the 581 # sizeof(xcb_generic_reply_t) below. 582 buf = CffiUnpacker(self._setup, known_max=8 + self._setup.length * 4) 583 584 return _setup(buf) 585 586 @ensure_connected 587 def get_screen_pointers(self): 588 """ 589 Returns the xcb_screen_t for every screen 590 useful for other bindings 591 """ 592 root_iter = lib.xcb_setup_roots_iterator(self._setup) 593 594 screens = [root_iter.data] 595 for i in range(self._setup.roots_len - 1): 596 lib.xcb_screen_next(ffi.addressof((root_iter))) 597 screens.append(root_iter.data) 598 return screens 599 600 @ensure_connected 601 def wait_for_event(self): 602 e = lib.xcb_wait_for_event(self._conn) 603 e = ffi.gc(e, lib.free) 604 self.invalid() 605 return self.hoist_event(e) 606 607 @ensure_connected 608 def poll_for_event(self): 609 e = lib.xcb_poll_for_event(self._conn) 610 self.invalid() 611 if e != ffi.NULL: 612 return self.hoist_event(e) 613 else: 614 return None 615 616 def has_error(self): 617 return lib.xcb_connection_has_error(self._conn) 618 619 @ensure_connected 620 def get_file_descriptor(self): 621 return lib.xcb_get_file_descriptor(self._conn) 622 623 @ensure_connected 624 def get_maximum_request_length(self): 625 return lib.xcb_get_maximum_request_length(self._conn) 626 627 @ensure_connected 628 def prefetch_maximum_request_length(self): 629 return lib.xcb_prefetch_maximum_request_length(self._conn) 630 631 @ensure_connected 632 def flush(self): 633 return lib.xcb_flush(self._conn) 634 635 @ensure_connected 636 def generate_id(self): 637 return lib.xcb_generate_id(self._conn) 638 639 def disconnect(self): 640 if self._conn is not None: 641 lib.xcb_disconnect(self._conn) 642 self._conn = None 643 644 def _process_error(self, c_error): 645 self.invalid() 646 if c_error != ffi.NULL: 647 error = self._error_offsets[c_error.error_code] 648 buf = CffiUnpacker(c_error) 649 raise error(buf) 650 651 @ensure_connected 652 def wait_for_reply(self, sequence): 653 error_p = ffi.new("xcb_generic_error_t **") 654 data = lib.xcb_wait_for_reply(self._conn, sequence, error_p) 655 data = ffi.gc(data, lib.free) 656 657 try: 658 self._process_error(error_p[0]) 659 finally: 660 if error_p[0] != ffi.NULL: 661 lib.free(error_p[0]) 662 663 if data == ffi.NULL: 664 # No data and no error => bad sequence number 665 raise XcffibException("Bad sequence number %d" % sequence) 666 667 reply = ffi.cast("xcb_generic_reply_t *", data) 668 669 # this is 32 and not `sizeof(xcb_generic_reply_t) == 8` because, 670 # according to the X11 protocol specs: "Every reply consists of 32 bytes 671 # followed by zero or more additional bytes of data, as specified in the 672 # length field." 673 return CffiUnpacker(data, known_max=32 + reply.length * 4) 674 675 @ensure_connected 676 def request_check(self, sequence): 677 cookie = ffi.new("xcb_void_cookie_t [1]") 678 cookie[0].sequence = sequence 679 680 err = lib.xcb_request_check(self._conn, cookie[0]) 681 self._process_error(err) 682 683 def hoist_event(self, e): 684 """ Hoist an xcb_generic_event_t to the right xcffib structure. """ 685 if e.response_type == 0: 686 return self._process_error(ffi.cast("xcb_generic_error_t *", e)) 687 688 # We mask off the high bit here because events sent with SendEvent have 689 # this bit set. We don't actually care where the event came from, so we 690 # just throw this away. Maybe we could expose this, if anyone actually 691 # cares about it. 692 event = self._event_offsets[e.response_type & 0x7f] 693 694 buf = CffiUnpacker(e) 695 return event(buf) 696 697 @ensure_connected 698 def send_request(self, flags, xcb_parts, xcb_req): 699 return lib.xcb_send_request(self._conn, flags, xcb_parts, xcb_req) 700 701 @ensure_connected 702 def discard_reply(self, sequence): 703 return lib.xcb_discard_reply(self._conn, sequence) 704 705 706# More backwards compatibility 707connect = Connection 708 709 710class Response(Protobj): 711 712 def __init__(self, unpacker): 713 Protobj.__init__(self, unpacker) 714 715 # These (and the ones in Reply) aren't used internally and I suspect 716 # they're not used by anyone else, but they're here for xpyb 717 # compatibility. 718 # 719 # In some cases (e.g. creating synthetic events from memory), we don't 720 # have the sequence number (since the event was fake), so only try to 721 # get these attributes if we are really using a cffi buffer. 722 if isinstance(unpacker, CffiUnpacker): 723 resp = unpacker.cast("xcb_generic_event_t *") 724 self.response_type = resp.response_type 725 self.sequence = resp.sequence 726 else: 727 self.response_type = None 728 self.sequence = None 729 730 731class Reply(Response): 732 733 def __init__(self, unpacker): 734 Response.__init__(self, unpacker) 735 736 # also for compat 737 resp = unpacker.cast("xcb_generic_reply_t *") 738 self.length = resp.length 739 740 741class Event(Response): 742 pass 743 744 745class Error(Response, XcffibException): 746 747 def __init__(self, unpacker): 748 Response.__init__(self, unpacker) 749 XcffibException.__init__(self) 750 self.code = unpacker.unpack('B', increment=False) 751 752 753def pack_list(from_, pack_type): 754 """ Return the wire packed version of `from_`. `pack_type` should be some 755 subclass of `xcffib.Struct`, or a string that can be passed to 756 `struct.pack`. You must pass `size` if `pack_type` is a struct.pack string. 757 """ 758 # We need from_ to not be empty 759 if len(from_) == 0: 760 return bytes() 761 762 if pack_type == 'c': 763 if isinstance(from_, bytes): 764 # Catch Python 3 bytes and Python 2 strings 765 # PY3 is "helpful" in that when you do tuple(b'foo') you get 766 # (102, 111, 111) instead of something more reasonable like 767 # (b'f', b'o', b'o'), so we rebuild from_ as a tuple of bytes 768 from_ = [six.int2byte(b) for b in six.iterbytes(from_)] 769 elif isinstance(from_, six.string_types): 770 # Catch Python 3 strings and Python 2 unicode strings, both of 771 # which we encode to bytes as utf-8 772 # Here we create the tuple of bytes from the encoded string 773 from_ = [six.int2byte(b) for b in bytearray(from_, 'utf-8')] 774 elif isinstance(from_[0], six.integer_types): 775 # Pack from_ as char array, where from_ may be an array of ints 776 # possibly greater than 256 777 def to_bytes(v): 778 for _ in range(4): 779 v, r = divmod(v, 256) 780 yield r 781 from_ = [six.int2byte(b) for i in from_ for b in to_bytes(i)] 782 783 if isinstance(pack_type, six.string_types): 784 return struct.pack("=%d%s" % (len(from_), pack_type), *from_) 785 else: 786 buf = six.BytesIO() 787 for item in from_: 788 # If we can't pack it, you'd better have packed it yourself. But 789 # let's not confuse things which aren't our Probobjs for packable 790 # things. 791 if isinstance(item, Protobj) and hasattr(item, "pack"): 792 buf.write(item.pack()) 793 else: 794 buf.write(item) 795 return buf.getvalue() 796 797 798def wrap(ptr): 799 c_conn = ffi.cast('xcb_connection_t *', ptr) 800 conn = Connection.__new__(Connection) 801 conn._conn = c_conn 802 conn._init_x() 803 conn.invalid() 804 805 # ptr owns the memory for c_conn, even after the cast 806 # we should keep it alive 807 cffi_explicit_lifetimes[conn] = ptr 808 809 return conn 810