1"""0MQ Message related classes.""" 2 3# 4# Copyright (c) 2013 Brian E. Granger & Min Ragan-Kelley 5# 6# This file is part of pyzmq. 7# 8# pyzmq is free software; you can redistribute it and/or modify it under 9# the terms of the Lesser GNU General Public License as published by 10# the Free Software Foundation; either version 3 of the License, or 11# (at your option) any later version. 12# 13# pyzmq is distributed in the hope that it will be useful, 14# but WITHOUT ANY WARRANTY; without even the implied warranty of 15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16# Lesser GNU General Public License for more details. 17# 18# You should have received a copy of the Lesser GNU General Public License 19# along with this program. If not, see <http://www.gnu.org/licenses/>. 20# 21 22#----------------------------------------------------------------------------- 23# Imports 24#----------------------------------------------------------------------------- 25 26# get version-independent aliases: 27cdef extern from "pyversion_compat.h": 28 pass 29 30 31from cpython cimport Py_DECREF, Py_INCREF 32 33from zmq.utils.buffers cimport asbuffer_r 34 35cdef extern from "Python.h": 36 ctypedef int Py_ssize_t 37 38cdef extern from "mutex.h" nogil: 39 ctypedef struct mutex_t: 40 pass 41 cdef mutex_t* mutex_allocate() 42 cdef void mutex_dallocate(mutex_t*) 43 cdef int mutex_lock(mutex_t*) 44 cdef int mutex_unlock(mutex_t*) 45 46from .libzmq cimport * 47 48from libc.stdio cimport fprintf, stderr as cstderr 49from libc.stdlib cimport malloc, free 50from libc.string cimport memcpy 51 52import time 53from weakref import ref 54 55try: 56 # below 3.3 57 from threading import _Event as Event 58except (ImportError, AttributeError): 59 # python throws ImportError, cython throws AttributeError 60 from threading import Event 61 62import zmq 63from zmq.error import _check_version 64from .checkrc cimport _check_rc 65 66#----------------------------------------------------------------------------- 67# Code 68#----------------------------------------------------------------------------- 69 70ctypedef struct zhint: 71 void *sock 72 mutex_t *mutex 73 size_t id 74 75cdef void free_python_msg(void *data, void *vhint) nogil: 76 """A pure-C function for DECREF'ing Python-owned message data. 77 78 Sends a message on a PUSH socket 79 80 The hint is a `zhint` struct with two values: 81 82 sock (void *): pointer to the Garbage Collector's PUSH socket 83 id (size_t): the id to be used to construct a zmq_msg_t that should be sent on a PUSH socket, 84 signaling the Garbage Collector to remove its reference to the object. 85 86 When the Garbage Collector's PULL socket receives the message, 87 it deletes its reference to the object, 88 allowing Python to free the memory. 89 """ 90 cdef zmq_msg_t msg 91 cdef zhint *hint = <zhint *> vhint 92 cdef int rc 93 94 if hint != NULL: 95 zmq_msg_init_size(&msg, sizeof(size_t)) 96 memcpy(zmq_msg_data(&msg), &hint.id, sizeof(size_t)) 97 rc = mutex_lock(hint.mutex) 98 if rc != 0: 99 fprintf(cstderr, "pyzmq-gc mutex lock failed rc=%d\n", rc) 100 rc = zmq_msg_send(&msg, hint.sock, 0) 101 if rc < 0: 102 # gc socket could have been closed, e.g. during process teardown. 103 # If so, ignore the failure because there's nothing to do. 104 if zmq_errno() != ZMQ_ENOTSOCK: 105 fprintf(cstderr, "pyzmq-gc send failed: %s\n", zmq_strerror(zmq_errno())) 106 rc = mutex_unlock(hint.mutex) 107 if rc != 0: 108 fprintf(cstderr, "pyzmq-gc mutex unlock failed rc=%d\n", rc) 109 110 zmq_msg_close(&msg) 111 free(hint) 112 113 114gc = None 115 116cdef class Frame: 117 def __cinit__(self, object data=None, track=False, copy=None, copy_threshold=None, **kwargs): 118 cdef int rc 119 cdef char *data_c = NULL 120 cdef Py_ssize_t data_len_c=0 121 cdef zhint *hint 122 if copy_threshold is None: 123 copy_threshold = zmq.COPY_THRESHOLD 124 125 # init more as False 126 self.more = False 127 128 # Save the data object in case the user wants the the data as a str. 129 self._data = data 130 self._failed_init = True # bool switch for dealloc 131 self._buffer = None # buffer view of data 132 self._bytes = None # bytes copy of data 133 134 self.tracker_event = None 135 self.tracker = None 136 # self.tracker should start finished 137 # except in the case where we are sharing memory with libzmq 138 if track: 139 self.tracker = zmq._FINISHED_TRACKER 140 141 if isinstance(data, unicode): 142 raise TypeError("Unicode objects not allowed. Only: str/bytes, buffer interfaces.") 143 144 if data is None: 145 rc = zmq_msg_init(&self.zmq_msg) 146 _check_rc(rc) 147 self._failed_init = False 148 return 149 150 asbuffer_r(data, <void **>&data_c, &data_len_c) 151 152 # copy unspecified, apply copy_threshold 153 if copy is None: 154 if copy_threshold and data_len_c < copy_threshold: 155 copy = True 156 else: 157 copy = False 158 159 if copy: 160 # copy message data instead of sharing memory 161 rc = zmq_msg_init_size(&self.zmq_msg, data_len_c) 162 _check_rc(rc) 163 memcpy(zmq_msg_data(&self.zmq_msg), data_c, data_len_c) 164 self._failed_init = False 165 return 166 167 # Getting here means that we are doing a true zero-copy Frame, 168 # where libzmq and Python are sharing memory. 169 # Hook up garbage collection with MessageTracker and zmq_free_fn 170 171 # Event and MessageTracker for monitoring when zmq is done with data: 172 if track: 173 evt = Event() 174 self.tracker_event = evt 175 self.tracker = zmq.MessageTracker(evt) 176 # create the hint for zmq_free_fn 177 # two pointers: the gc context and a message to be sent to the gc PULL socket 178 # allows libzmq to signal to Python when it is done with Python-owned memory. 179 global gc 180 if gc is None: 181 from zmq.utils.garbage import gc 182 183 hint = <zhint *> malloc(sizeof(zhint)) 184 hint.id = gc.store(data, self.tracker_event) 185 if not gc._push_mutex: 186 hint.mutex = mutex_allocate() 187 gc._push_mutex = <size_t> hint.mutex 188 else: 189 hint.mutex = <mutex_t *> <size_t> gc._push_mutex 190 hint.sock = <void *> <size_t> gc._push_socket.underlying 191 192 rc = zmq_msg_init_data( 193 &self.zmq_msg, <void *>data_c, data_len_c, 194 <zmq_free_fn *>free_python_msg, <void *>hint 195 ) 196 if rc != 0: 197 free(hint) 198 _check_rc(rc) 199 self._failed_init = False 200 201 def __init__(self, object data=None, track=False, copy=False, copy_threshold=None): 202 """Enforce signature""" 203 pass 204 205 def __dealloc__(self): 206 cdef int rc 207 if self._failed_init: 208 return 209 # This simply decreases the 0MQ ref-count of zmq_msg. 210 with nogil: 211 rc = zmq_msg_close(&self.zmq_msg) 212 _check_rc(rc) 213 214 # buffer interface code adapted from petsc4py by Lisandro Dalcin, a BSD project 215 216 def __getbuffer__(self, Py_buffer* buffer, int flags): 217 # new-style (memoryview) buffer interface 218 buffer.buf = zmq_msg_data(&self.zmq_msg) 219 buffer.len = zmq_msg_size(&self.zmq_msg) 220 221 buffer.obj = self 222 buffer.readonly = 0 223 buffer.format = "B" 224 buffer.ndim = 1 225 buffer.shape = &(buffer.len) 226 buffer.strides = NULL 227 buffer.suboffsets = NULL 228 buffer.itemsize = 1 229 buffer.internal = NULL 230 231 def __getsegcount__(self, Py_ssize_t *lenp): 232 # required for getreadbuffer 233 if lenp != NULL: 234 lenp[0] = zmq_msg_size(&self.zmq_msg) 235 return 1 236 237 def __getreadbuffer__(self, Py_ssize_t idx, void **p): 238 # old-style (buffer) interface 239 cdef char *data_c = NULL 240 cdef Py_ssize_t data_len_c 241 if idx != 0: 242 raise SystemError("accessing non-existent buffer segment") 243 # read-only, because we don't want to allow 244 # editing of the message in-place 245 data_c = <char *>zmq_msg_data(&self.zmq_msg) 246 data_len_c = zmq_msg_size(&self.zmq_msg) 247 if p != NULL: 248 p[0] = <void*>data_c 249 return data_len_c 250 251 # end buffer interface 252 253 def __copy__(self): 254 """Create a shallow copy of the message. 255 256 This does not copy the contents of the Frame, just the pointer. 257 This will increment the 0MQ ref count of the message, but not 258 the ref count of the Python object. That is only done once when 259 the Python is first turned into a 0MQ message. 260 """ 261 return self.fast_copy() 262 263 cdef Frame fast_copy(self): 264 """Fast, cdef'd version of shallow copy of the Frame.""" 265 cdef Frame new_msg 266 new_msg = Frame() 267 # This does not copy the contents, but just increases the ref-count 268 # of the zmq_msg by one. 269 zmq_msg_copy(&new_msg.zmq_msg, &self.zmq_msg) 270 # Copy the ref to data so the copy won't create a copy when str is 271 # called. 272 if self._data is not None: 273 new_msg._data = self._data 274 if self._buffer is not None: 275 new_msg._buffer = self._buffer 276 if self._bytes is not None: 277 new_msg._bytes = self._bytes 278 279 # Frame copies share the tracker and tracker_event 280 new_msg.tracker_event = self.tracker_event 281 new_msg.tracker = self.tracker 282 283 return new_msg 284 285 def __len__(self): 286 """Return the length of the message in bytes.""" 287 cdef size_t sz 288 sz = zmq_msg_size(&self.zmq_msg) 289 return sz 290 # return <int>zmq_msg_size(&self.zmq_msg) 291 292 def __str__(self): 293 """Return the str form of the message.""" 294 if isinstance(self._data, bytes): 295 b = self._data 296 else: 297 b = self.bytes 298 if str is unicode: 299 return b.decode() 300 else: 301 return b 302 303 cdef inline object _getbuffer(self): 304 """Deprecated alias for memoryview(self)""" 305 return memoryview(self) 306 307 @property 308 def buffer(self): 309 """A memoryview of the message contents.""" 310 _buffer = self._buffer and self._buffer() 311 if _buffer is not None: 312 return _buffer 313 _buffer = memoryview(self) 314 self._buffer = ref(_buffer) 315 return _buffer 316 317 @property 318 def bytes(self): 319 """The message content as a Python bytes object. 320 321 The first time this property is accessed, a copy of the message 322 contents is made. From then on that same copy of the message is 323 returned. 324 """ 325 if self._bytes is None: 326 self._bytes = copy_zmq_msg_bytes(&self.zmq_msg) 327 return self._bytes 328 329 def set(self, option, value): 330 """Frame.set(option, value) 331 332 Set a Frame option. 333 334 See the 0MQ API documentation for zmq_msg_set 335 for details on specific options. 336 337 .. versionadded:: libzmq-3.2 338 .. versionadded:: 13.0 339 .. versionchanged:: 17.0 340 Added support for `routing_id` and `group`. 341 Only available if draft API is enabled 342 with libzmq >= 4.2. 343 """ 344 cdef int rc 345 cdef uint32_t routing_id 346 347 if option == 'routing_id': 348 routing_id = value 349 rc = zmq_msg_set_routing_id(&self.zmq_msg, routing_id) 350 _check_rc(rc) 351 return 352 elif option == 'group': 353 if isinstance(value, unicode): 354 value = value.encode('utf8') 355 rc = zmq_msg_set_group(&self.zmq_msg, value) 356 _check_rc(rc) 357 return 358 359 rc = zmq_msg_set(&self.zmq_msg, option, value) 360 _check_rc(rc) 361 362 def get(self, option): 363 """Frame.get(option) 364 365 Get a Frame option or property. 366 367 See the 0MQ API documentation for zmq_msg_get and zmq_msg_gets 368 for details on specific options. 369 370 .. versionadded:: libzmq-3.2 371 .. versionadded:: 13.0 372 373 .. versionchanged:: 14.3 374 add support for zmq_msg_gets (requires libzmq-4.1) 375 All message properties are strings. 376 377 .. versionchanged:: 17.0 378 Added support for `routing_id` and `group`. 379 Only available if draft API is enabled 380 with libzmq >= 4.2. 381 """ 382 cdef int rc = 0 383 cdef char *property_c = NULL 384 cdef Py_ssize_t property_len_c = 0 385 cdef uint32_t routing_id 386 387 # zmq_msg_get 388 if isinstance(option, int): 389 rc = zmq_msg_get(&self.zmq_msg, option) 390 _check_rc(rc) 391 return rc 392 393 if option == 'routing_id': 394 routing_id = zmq_msg_routing_id(&self.zmq_msg) 395 if (routing_id == 0): 396 _check_rc(-1) 397 return routing_id 398 elif option == 'group': 399 buf = zmq_msg_group(&self.zmq_msg) 400 if buf == NULL: 401 _check_rc(-1) 402 return buf.decode('utf8') 403 404 # zmq_msg_gets 405 _check_version((4,1), "get string properties") 406 if isinstance(option, unicode): 407 option = option.encode('utf8') 408 409 if not isinstance(option, bytes): 410 raise TypeError("expected str, got: %r" % option) 411 412 property_c = option 413 414 cdef const char *result = <char *>zmq_msg_gets(&self.zmq_msg, property_c) 415 if result == NULL: 416 _check_rc(-1) 417 return result.decode('utf8') 418 419# legacy Message name 420Message = Frame 421 422__all__ = ['Frame', 'Message'] 423