1"""0MQ Message related classes."""
2
3#
4#    Copyright (c) 2013 Brian E. Granger & Min Ragan-Kelley
5#
6#    This file is part of pyzmq.
7#
8#    pyzmq is free software; you can redistribute it and/or modify it under
9#    the terms of the Lesser GNU General Public License as published by
10#    the Free Software Foundation; either version 3 of the License, or
11#    (at your option) any later version.
12#
13#    pyzmq is distributed in the hope that it will be useful,
14#    but WITHOUT ANY WARRANTY; without even the implied warranty of
15#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16#    Lesser GNU General Public License for more details.
17#
18#    You should have received a copy of the Lesser GNU General Public License
19#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20#
21
22#-----------------------------------------------------------------------------
23# Imports
24#-----------------------------------------------------------------------------
25
26# get version-independent aliases:
27cdef extern from "pyversion_compat.h":
28    pass
29
30
31from cpython cimport Py_DECREF, Py_INCREF
32
33from zmq.utils.buffers cimport asbuffer_r
34
35cdef extern from "Python.h":
36    ctypedef int Py_ssize_t
37
38cdef extern from "mutex.h" nogil:
39    ctypedef struct mutex_t:
40        pass
41    cdef mutex_t* mutex_allocate()
42    cdef void mutex_dallocate(mutex_t*)
43    cdef int mutex_lock(mutex_t*)
44    cdef int mutex_unlock(mutex_t*)
45
46from .libzmq cimport *
47
48from libc.stdio cimport fprintf, stderr as cstderr
49from libc.stdlib cimport malloc, free
50from libc.string cimport memcpy
51
52import time
53from weakref import ref
54
55try:
56    # below 3.3
57    from threading import _Event as Event
58except (ImportError, AttributeError):
59    # python throws ImportError, cython throws AttributeError
60    from threading import Event
61
62import zmq
63from zmq.error import _check_version
64from .checkrc cimport _check_rc
65
66#-----------------------------------------------------------------------------
67# Code
68#-----------------------------------------------------------------------------
69
# Hint handed to libzmq together with zero-copy message data.
# It is allocated in Frame.__cinit__ and consumed (and freed) by
# free_python_msg.
ctypedef struct zhint:
    # the garbage collector's PUSH socket (taken from gc._push_socket.underlying)
    void *sock
    # mutex locked around the send on `sock` in free_python_msg
    mutex_t *mutex
    # id returned by gc.store() for the Python object backing the message
    size_t id
74
cdef void free_python_msg(void *data, void *vhint) nogil:
    """A pure-C function for releasing Python-owned message data.

    Installed as the zmq_free_fn of zero-copy Frames. It is declared nogil,
    so it never touches Python objects itself; instead it sends a message
    on a PUSH socket.

    The hint is a malloc'd `zhint` struct with three values:

    sock (void *): pointer to the Garbage Collector's PUSH socket
    mutex (mutex_t *): mutex locked while sending on that socket
    id (size_t): the id to be used to construct a zmq_msg_t that should be sent on a PUSH socket,
       signaling the Garbage Collector to remove its reference to the object.

    When the Garbage Collector's PULL socket receives the message,
    it deletes its reference to the object,
    allowing Python to free the memory.

    The hint is freed before returning. A NULL hint is a no-op.
    Failures are reported on stderr because no exception can be raised here.
    """
    cdef zmq_msg_t msg
    cdef zhint *hint = <zhint *> vhint
    cdef int rc

    if hint == NULL:
        return

    rc = zmq_msg_init_size(&msg, sizeof(size_t))
    if rc < 0:
        # msg was not initialised: copying into zmq_msg_data() or sending it
        # would operate on garbage, so report the failure and bail out.
        fprintf(cstderr, "pyzmq-gc message init failed: %s\n", zmq_strerror(zmq_errno()))
        free(hint)
        return
    memcpy(zmq_msg_data(&msg), &hint.id, sizeof(size_t))

    rc = mutex_lock(hint.mutex)
    if rc != 0:
        fprintf(cstderr, "pyzmq-gc mutex lock failed rc=%d\n", rc)
    rc = zmq_msg_send(&msg, hint.sock, 0)
    if rc < 0:
        # gc socket could have been closed, e.g. during process teardown.
        # If so, ignore the failure because there's nothing to do.
        if zmq_errno() != ZMQ_ENOTSOCK:
            fprintf(cstderr, "pyzmq-gc send failed: %s\n", zmq_strerror(zmq_errno()))
    rc = mutex_unlock(hint.mutex)
    if rc != 0:
        fprintf(cstderr, "pyzmq-gc mutex unlock failed rc=%d\n", rc)

    zmq_msg_close(&msg)
    free(hint)
112
113
# Placeholder for zmq.utils.garbage.gc; Frame.__cinit__ imports it lazily
# the first time a zero-copy Frame is created.
gc = None
115
cdef class Frame:
    """A 0MQ message frame wrapping a single ``zmq_msg_t``.

    Parameters
    ----------
    data : object, optional
        bytes or an object exposing the buffer interface. Unicode is
        rejected with TypeError. With no data an empty message is created.
    track : bool
        If true, ``self.tracker`` is populated: a real MessageTracker for
        zero-copy frames, ``zmq._FINISHED_TRACKER`` otherwise.
    copy : bool, optional
        True copies `data` into the message; False shares memory with
        `data` (zero-copy). If None, `copy_threshold` decides.
    copy_threshold : int, optional
        When `copy` is None, data shorter than this many bytes is copied.
        A false value disables copying. Defaults to ``zmq.COPY_THRESHOLD``.
    """

    def __cinit__(self, object data=None, track=False, copy=None, copy_threshold=None, **kwargs):
        cdef int rc
        cdef char *data_c = NULL
        cdef Py_ssize_t data_len_c=0
        cdef zhint *hint
        if copy_threshold is None:
            copy_threshold = zmq.COPY_THRESHOLD

        # init more as False
        self.more = False

        # Save the data object in case the user wants the data as a str.
        self._data = data
        self._failed_init = True  # bool switch for dealloc
        self._buffer = None       # buffer view of data
        self._bytes = None        # bytes copy of data

        self.tracker_event = None
        self.tracker = None
        # self.tracker should start finished
        # except in the case where we are sharing memory with libzmq
        if track:
            self.tracker = zmq._FINISHED_TRACKER

        if isinstance(data, unicode):
            raise TypeError("Unicode objects not allowed. Only: str/bytes, buffer interfaces.")

        if data is None:
            rc = zmq_msg_init(&self.zmq_msg)
            _check_rc(rc)
            self._failed_init = False
            return

        asbuffer_r(data, <void **>&data_c, &data_len_c)

        # copy unspecified, apply copy_threshold
        if copy is None:
            if copy_threshold and data_len_c < copy_threshold:
                copy = True
            else:
                copy = False

        if copy:
            # copy message data instead of sharing memory
            rc = zmq_msg_init_size(&self.zmq_msg, data_len_c)
            _check_rc(rc)
            memcpy(zmq_msg_data(&self.zmq_msg), data_c, data_len_c)
            self._failed_init = False
            return

        # Getting here means that we are doing a true zero-copy Frame,
        # where libzmq and Python are sharing memory.
        # Hook up garbage collection with MessageTracker and zmq_free_fn

        # Event and MessageTracker for monitoring when zmq is done with data:
        if track:
            evt = Event()
            self.tracker_event = evt
            self.tracker = zmq.MessageTracker(evt)
        # create the hint for zmq_free_fn:
        # the gc PUSH socket, the mutex guarding it, and the id under which
        # the gc stored the data.
        # allows libzmq to signal to Python when it is done with Python-owned memory.
        global gc
        if gc is None:
            from zmq.utils.garbage import gc

        hint = <zhint *> malloc(sizeof(zhint))
        if hint == NULL:
            raise MemoryError("Could not allocate hint for zero-copy Frame")
        try:
            hint.id = gc.store(data, self.tracker_event)
            if not gc._push_mutex:
                hint.mutex = mutex_allocate()
                gc._push_mutex = <size_t> hint.mutex
            else:
                hint.mutex = <mutex_t *> <size_t> gc._push_mutex
            hint.sock = <void *> <size_t> gc._push_socket.underlying
        except BaseException:
            # libzmq never saw the hint, so nobody else will free it
            free(hint)
            raise

        rc = zmq_msg_init_data(
                &self.zmq_msg, <void *>data_c, data_len_c,
                <zmq_free_fn *>free_python_msg, <void *>hint
            )
        if rc != 0:
            # init failed: libzmq will not call free_python_msg for this hint
            free(hint)
            _check_rc(rc)
        self._failed_init = False

    def __init__(self, object data=None, track=False, copy=None, copy_threshold=None):
        """Enforce signature"""
        pass

    def __dealloc__(self):
        cdef int rc
        # zmq_msg was never initialised, so there is nothing to close
        if self._failed_init:
            return
        # This simply decreases the 0MQ ref-count of zmq_msg.
        with nogil:
            rc = zmq_msg_close(&self.zmq_msg)
        _check_rc(rc)

    # buffer interface code adapted from petsc4py by Lisandro Dalcin, a BSD project

    def __getbuffer__(self, Py_buffer* buffer, int flags):
        # new-style (memoryview) buffer interface:
        # expose the message data as a 1-d array of unsigned bytes
        buffer.buf = zmq_msg_data(&self.zmq_msg)
        buffer.len = zmq_msg_size(&self.zmq_msg)

        buffer.obj = self
        buffer.readonly = 0
        buffer.format = "B"
        buffer.ndim = 1
        buffer.shape = &(buffer.len)
        buffer.strides = NULL
        buffer.suboffsets = NULL
        buffer.itemsize = 1
        buffer.internal = NULL

    def __getsegcount__(self, Py_ssize_t *lenp):
        # required for getreadbuffer: always a single segment,
        # whose length is the message size
        if lenp != NULL:
            lenp[0] = zmq_msg_size(&self.zmq_msg)
        return 1

    def __getreadbuffer__(self, Py_ssize_t idx, void **p):
        # old-style (buffer) interface
        cdef char *data_c = NULL
        cdef Py_ssize_t data_len_c
        if idx != 0:
            raise SystemError("accessing non-existent buffer segment")
        # read-only, because we don't want to allow
        # editing of the message in-place
        data_c = <char *>zmq_msg_data(&self.zmq_msg)
        data_len_c = zmq_msg_size(&self.zmq_msg)
        if p != NULL:
            p[0] = <void*>data_c
        return data_len_c

    # end buffer interface

    def __copy__(self):
        """Create a shallow copy of the message.

        This does not copy the contents of the Frame, just the pointer.
        This will increment the 0MQ ref count of the message, but not
        the ref count of the Python object. That is only done once when
        the Python is first turned into a 0MQ message.
        """
        return self.fast_copy()

    cdef Frame fast_copy(self):
        """Fast, cdef'd version of shallow copy of the Frame.

        Raises whatever _check_rc raises if zmq_msg_copy fails.
        """
        cdef Frame new_msg
        cdef int rc
        new_msg = Frame()
        # This does not copy the contents, but just increases the ref-count
        # of the zmq_msg by one.
        rc = zmq_msg_copy(&new_msg.zmq_msg, &self.zmq_msg)
        # Don't hand back a silently-empty Frame if the copy failed.
        _check_rc(rc)
        # Copy the ref to data so the copy won't create a copy when str is
        # called.
        if self._data is not None:
            new_msg._data = self._data
        if self._buffer is not None:
            new_msg._buffer = self._buffer
        if self._bytes is not None:
            new_msg._bytes = self._bytes

        # Frame copies share the tracker and tracker_event
        new_msg.tracker_event = self.tracker_event
        new_msg.tracker = self.tracker

        return new_msg

    def __len__(self):
        """Return the length of the message in bytes."""
        cdef size_t sz
        sz = zmq_msg_size(&self.zmq_msg)
        return sz

    def __str__(self):
        """Return the str form of the message."""
        # reuse the original bytes object when there is one,
        # otherwise fall back to the (cached) bytes copy
        if isinstance(self._data, bytes):
            b = self._data
        else:
            b = self.bytes
        if str is unicode:
            return b.decode()
        else:
            return b

    cdef inline object _getbuffer(self):
        """Deprecated alias for memoryview(self)"""
        return memoryview(self)

    @property
    def buffer(self):
        """A memoryview of the message contents."""
        # self._buffer holds a weakref to the last view handed out;
        # reuse that view while it is still alive
        _buffer = self._buffer and self._buffer()
        if _buffer is not None:
            return _buffer
        _buffer = memoryview(self)
        self._buffer = ref(_buffer)
        return _buffer

    @property
    def bytes(self):
        """The message content as a Python bytes object.

        The first time this property is accessed, a copy of the message
        contents is made. From then on that same copy of the message is
        returned.
        """
        if self._bytes is None:
            self._bytes = copy_zmq_msg_bytes(&self.zmq_msg)
        return self._bytes

    def set(self, option, value):
        """Frame.set(option, value)

        Set a Frame option.

        `option` may be the string 'routing_id' or 'group' (a unicode group
        value is encoded as utf8); anything else is passed to zmq_msg_set.

        See the 0MQ API documentation for zmq_msg_set
        for details on specific options.

        .. versionadded:: libzmq-3.2
        .. versionadded:: 13.0
        .. versionchanged:: 17.0
            Added support for `routing_id` and `group`.
            Only available if draft API is enabled
            with libzmq >= 4.2.
        """
        cdef int rc
        cdef uint32_t routing_id

        if option == 'routing_id':
            routing_id = value
            rc = zmq_msg_set_routing_id(&self.zmq_msg, routing_id)
            _check_rc(rc)
            return
        elif option == 'group':
            if isinstance(value, unicode):
                value = value.encode('utf8')
            rc = zmq_msg_set_group(&self.zmq_msg, value)
            _check_rc(rc)
            return

        rc = zmq_msg_set(&self.zmq_msg, option, value)
        _check_rc(rc)

    def get(self, option):
        """Frame.get(option)

        Get a Frame option or property.

        An int `option` is looked up with zmq_msg_get; 'routing_id' and
        'group' use their dedicated accessors; any other str/bytes is
        looked up with zmq_msg_gets and returned decoded as utf8.
        A non-int, non-string `option` raises TypeError.

        See the 0MQ API documentation for zmq_msg_get and zmq_msg_gets
        for details on specific options.

        .. versionadded:: libzmq-3.2
        .. versionadded:: 13.0

        .. versionchanged:: 14.3
            add support for zmq_msg_gets (requires libzmq-4.1)
            All message properties are strings.

        .. versionchanged:: 17.0
            Added support for `routing_id` and `group`.
            Only available if draft API is enabled
            with libzmq >= 4.2.
        """
        cdef int rc = 0
        cdef char *property_c = NULL
        cdef uint32_t routing_id
        cdef const char *result = NULL

        # zmq_msg_get
        if isinstance(option, int):
            rc = zmq_msg_get(&self.zmq_msg, option)
            _check_rc(rc)
            return rc

        if option == 'routing_id':
            routing_id = zmq_msg_routing_id(&self.zmq_msg)
            # a routing id of 0 is treated as failure
            if (routing_id == 0):
                _check_rc(-1)
            return routing_id
        elif option == 'group':
            buf = zmq_msg_group(&self.zmq_msg)
            if buf == NULL:
                _check_rc(-1)
            return buf.decode('utf8')

        # zmq_msg_gets
        _check_version((4,1), "get string properties")
        if isinstance(option, unicode):
            option = option.encode('utf8')

        if not isinstance(option, bytes):
            raise TypeError("expected str, got: %r" % option)

        property_c = option

        result = <char *>zmq_msg_gets(&self.zmq_msg, property_c)
        if result == NULL:
            _check_rc(-1)
        return result.decode('utf8')
418
# legacy Message name, kept as an alias of Frame
Message = Frame

# names exported by this module
__all__ = ['Frame', 'Message']
423