1 /* 2 Copyright (c) 2007-2011 iMatix Corporation 3 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 4 5 This file is part of 0MQ. 6 7 0MQ is free software; you can redistribute it and/or modify it under 8 the terms of the GNU Lesser General Public License as published by 9 the Free Software Foundation; either version 3 of the License, or 10 (at your option) any later version. 11 12 0MQ is distributed in the hope that it will be useful, 13 but WITHOUT ANY WARRANTY; without even the implied warranty of 14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 GNU Lesser General Public License for more details. 16 17 You should have received a copy of the GNU Lesser General Public License 18 along with this program. If not, see <http://www.gnu.org/licenses/>. 19 */ 20 21 #ifndef __ZMQ_HPP_INCLUDED__ 22 #define __ZMQ_HPP_INCLUDED__ 23 24 #include "zmq.h" 25 26 #include <algorithm> 27 #include <cassert> 28 #include <cstring> 29 #include <exception> 30 31 // Detect whether the compiler supports C++11 rvalue references. 32 #if (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) && defined(__GXX_EXPERIMENTAL_CXX0X__)) 33 # define ZMQ_HAS_RVALUE_REFS 34 #endif 35 #if (defined(__clang__)) 36 # if __has_feature(cxx_rvalue_references) 37 # define ZMQ_HAS_RVALUE_REFS 38 # endif 39 #endif 40 #if (defined(_MSC_VER) && (_MSC_VER >= 1600)) 41 # define ZMQ_HAS_RVALUE_REFS 42 #endif 43 44 // In order to prevent unused variable warnings when building in non-debug 45 // mode use this macro to make assertions. 46 #ifndef NDEBUG 47 # define ZMQ_ASSERT(expression) assert(expression) 48 #else 49 # define ZMQ_ASSERT(expression) (expression) 50 #endif 51 52 namespace zmq 53 { 54 55 typedef zmq_free_fn free_fn; 56 typedef zmq_pollitem_t pollitem_t; 57 58 class error_t : public std::exception 59 { 60 public: 61 error_t()62 error_t () : errnum (zmq_errno ()) {} 63 what() const64 virtual const char *what () const throw () 65 { 66 return zmq_strerror (errnum); 67 } 68 num() const69 int num () const 70 { 71 return errnum; 72 } 73 74 private: 75 76 int errnum; 77 }; 78 poll(zmq_pollitem_t * items_,int nitems_,long timeout_=-1)79 inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1) 80 { 81 int rc = zmq_poll (items_, nitems_, timeout_); 82 if (rc < 0) 83 throw error_t (); 84 return rc; 85 } 86 device(int device_,void * insocket_,void * outsocket_)87 inline void device (int device_, void * insocket_, void* outsocket_) 88 { 89 int rc = zmq_device (device_, insocket_, outsocket_); 90 if (rc != 0) 91 throw error_t (); 92 } 93 version(int * major_,int * minor_,int * patch_)94 inline void version (int *major_, int *minor_, int *patch_) 95 { 96 zmq_version (major_, minor_, patch_); 97 } 98 99 class message_t : private zmq_msg_t 100 { 101 friend class socket_t; 102 103 public: 104 message_t()105 inline message_t () 106 { 107 int rc = zmq_msg_init (this); 108 if (rc != 0) 109 throw error_t (); 110 } 111 message_t(size_t size_)112 inline message_t (size_t size_) 113 { 114 int rc = zmq_msg_init_size (this, size_); 115 if (rc != 0) 116 throw error_t (); 117 } 118 message_t(void * data_,size_t size_,free_fn * ffn_,void * hint_=NULL)119 inline message_t (void *data_, size_t size_, free_fn *ffn_, 120 void *hint_ = NULL) 121 { 122 int rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_); 123 if (rc != 0) 124 throw error_t (); 125 } 126 ~message_t()127 inline ~message_t () 128 { 129 int rc = zmq_msg_close (this); 130 ZMQ_ASSERT (rc == 0); 131 } 132 rebuild()133 inline void rebuild () 134 { 135 int rc = zmq_msg_close (this); 136 if (rc != 0) 137 throw error_t (); 138 rc = zmq_msg_init (this); 139 if (rc != 0) 140 throw error_t (); 141 } 142 rebuild(size_t size_)143 inline void rebuild (size_t size_) 144 { 145 int rc = zmq_msg_close (this); 146 if (rc != 0) 147 throw error_t (); 148 rc = zmq_msg_init_size (this, size_); 149 if (rc != 0) 150 throw error_t (); 151 } 152 rebuild(void * data_,size_t size_,free_fn * ffn_,void * hint_=NULL)153 inline void rebuild (void *data_, size_t size_, free_fn *ffn_, 154 void *hint_ = NULL) 155 { 156 int rc = zmq_msg_close (this); 157 if (rc != 0) 158 throw error_t (); 159 rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_); 160 if (rc != 0) 161 throw error_t (); 162 } 163 move(message_t * msg_)164 inline void move (message_t *msg_) 165 { 166 int rc = zmq_msg_move (this, (zmq_msg_t*) msg_); 167 if (rc != 0) 168 throw error_t (); 169 } 170 copy(message_t * msg_)171 inline void copy (message_t *msg_) 172 { 173 int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_); 174 if (rc != 0) 175 throw error_t (); 176 } 177 data()178 inline void *data () 179 { 180 return zmq_msg_data (this); 181 } 182 size()183 inline size_t size () 184 { 185 return zmq_msg_size (this); 186 } 187 188 private: 189 190 // Disable implicit message copying, so that users won't use shared 191 // messages (less efficient) without being aware of the fact. 192 message_t (const message_t&); 193 void operator = (const message_t&); 194 }; 195 196 class context_t 197 { 198 friend class socket_t; 199 200 public: 201 context_t(int io_threads_)202 inline context_t (int io_threads_) 203 { 204 ptr = zmq_init (io_threads_); 205 if (ptr == NULL) 206 throw error_t (); 207 } 208 209 #ifdef ZMQ_HAS_RVALUE_REFS context_t(context_t && rhs)210 inline context_t(context_t&& rhs) : ptr(rhs.ptr) 211 { 212 rhs.ptr = NULL; 213 } operator =(context_t && rhs)214 inline context_t& operator=(context_t&& rhs) 215 { 216 std::swap(ptr, rhs.ptr); 217 return *this; 218 } 219 #endif 220 ~context_t()221 inline ~context_t () 222 { 223 if (ptr == NULL) 224 return; 225 int rc = zmq_term (ptr); 226 ZMQ_ASSERT (rc == 0); 227 } 228 229 // Be careful with this, it's probably only useful for 230 // using the C api together with an existing C++ api. 231 // Normally you should never need to use this. operator void*()232 inline operator void* () 233 { 234 return ptr; 235 } 236 237 private: 238 239 void *ptr; 240 241 context_t (const context_t&); 242 void operator = (const context_t&); 243 }; 244 245 class socket_t 246 { 247 public: 248 socket_t(context_t & context_,int type_)249 inline socket_t (context_t &context_, int type_) 250 { 251 ptr = zmq_socket (context_.ptr, type_); 252 if (ptr == NULL) 253 throw error_t (); 254 } 255 256 #ifdef ZMQ_HAS_RVALUE_REFS socket_t(socket_t && rhs)257 inline socket_t(socket_t&& rhs) : ptr(rhs.ptr) 258 { 259 rhs.ptr = NULL; 260 } operator =(socket_t && rhs)261 inline socket_t& operator=(socket_t&& rhs) 262 { 263 std::swap(ptr, rhs.ptr); 264 return *this; 265 } 266 #endif 267 ~socket_t()268 inline ~socket_t () 269 { 270 close(); 271 } 272 operator void*()273 inline operator void* () 274 { 275 return ptr; 276 } 277 close()278 inline void close() 279 { 280 if(ptr == NULL) 281 // already closed 282 return ; 283 int rc = zmq_close (ptr); 284 if (rc != 0) 285 throw error_t (); 286 ptr = 0 ; 287 } 288 setsockopt(int option_,const void * optval_,size_t optvallen_)289 inline void setsockopt (int option_, const void *optval_, 290 size_t optvallen_) 291 { 292 int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_); 293 if (rc != 0) 294 throw error_t (); 295 } 296 getsockopt(int option_,void * optval_,size_t * optvallen_)297 inline void getsockopt (int option_, void *optval_, 298 size_t *optvallen_) 299 { 300 int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_); 301 if (rc != 0) 302 throw error_t (); 303 } 304 bind(const char * addr_)305 inline void bind (const char *addr_) 306 { 307 int rc = zmq_bind (ptr, addr_); 308 if (rc != 0) 309 throw error_t (); 310 } 311 connect(const char * addr_)312 inline void connect (const char *addr_) 313 { 314 int rc = zmq_connect (ptr, addr_); 315 if (rc != 0) 316 throw error_t (); 317 } 318 send(message_t & msg_,int flags_=0)319 inline bool send (message_t &msg_, int flags_ = 0) 320 { 321 int rc = zmq_send (ptr, &msg_, flags_); 322 if (rc == 0) 323 return true; 324 if (rc == -1 && zmq_errno () == EAGAIN) 325 return false; 326 throw error_t (); 327 } 328 recv(message_t * msg_,int flags_=0)329 inline bool recv (message_t *msg_, int flags_ = 0) 330 { 331 int rc = zmq_recv (ptr, msg_, flags_); 332 if (rc == 0) 333 return true; 334 if (rc == -1 && zmq_errno () == EAGAIN) 335 return false; 336 throw error_t (); 337 } 338 339 private: 340 341 void *ptr; 342 343 socket_t (const socket_t&); 344 void operator = (const socket_t&); 345 }; 346 347 } 348 349 #endif 350