1 /* 2 Copyright (c) 2007-2010 iMatix Corporation 3 4 This file is part of 0MQ. 5 6 0MQ is free software; you can redistribute it and/or modify it under 7 the terms of the GNU Lesser General Public License as published by 8 the Free Software Foundation; either version 3 of the License, or 9 (at your option) any later version. 10 11 0MQ is distributed in the hope that it will be useful, 12 but WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 GNU Lesser General Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public License 17 along with this program. If not, see <http://www.gnu.org/licenses/>. 18 */ 19 20 #ifndef __ZMQ_HPP_INCLUDED__ 21 #define __ZMQ_HPP_INCLUDED__ 22 23 #include "zmq.h" 24 25 #include <cassert> 26 #include <cstring> 27 #include <exception> 28 29 namespace zmq 30 { 31 32 typedef zmq_free_fn free_fn; 33 typedef zmq_pollitem_t pollitem_t; 34 35 class error_t : public std::exception 36 { 37 public: 38 error_t()39 error_t () : errnum (zmq_errno ()) {} 40 what() const41 virtual const char *what () const throw () 42 { 43 return zmq_strerror (errnum); 44 } 45 num() const46 int num () const 47 { 48 return errnum; 49 } 50 51 private: 52 53 int errnum; 54 }; 55 poll(zmq_pollitem_t * items_,int nitems_,long timeout_=-1)56 inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1) 57 { 58 int rc = zmq_poll (items_, nitems_, timeout_); 59 if (rc < 0) 60 throw error_t (); 61 return rc; 62 } 63 device(int device_,void * insocket_,void * outsocket_)64 inline void device (int device_, void * insocket_, void* outsocket_) 65 { 66 int rc = zmq_device (device_, insocket_, outsocket_); 67 if (rc != 0) 68 throw error_t (); 69 } 70 71 class message_t : private zmq_msg_t 72 { 73 friend class socket_t; 74 75 public: 76 message_t()77 inline message_t () 78 { 79 int rc = zmq_msg_init (this); 80 if (rc != 0) 81 throw error_t (); 82 } 83 message_t(size_t size_)84 inline message_t (size_t size_) 85 { 86 int rc = zmq_msg_init_size (this, size_); 87 if (rc != 0) 88 throw error_t (); 89 } 90 message_t(void * data_,size_t size_,free_fn * ffn_,void * hint_=NULL)91 inline message_t (void *data_, size_t size_, free_fn *ffn_, 92 void *hint_ = NULL) 93 { 94 int rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_); 95 if (rc != 0) 96 throw error_t (); 97 } 98 ~message_t()99 inline ~message_t () 100 { 101 int rc = zmq_msg_close (this); 102 assert (rc == 0); 103 } 104 rebuild()105 inline void rebuild () 106 { 107 int rc = zmq_msg_close (this); 108 if (rc != 0) 109 throw error_t (); 110 rc = zmq_msg_init (this); 111 if (rc != 0) 112 throw error_t (); 113 } 114 rebuild(size_t size_)115 inline void rebuild (size_t size_) 116 { 117 int rc = zmq_msg_close (this); 118 if (rc != 0) 119 throw error_t (); 120 rc = zmq_msg_init_size (this, size_); 121 if (rc != 0) 122 throw error_t (); 123 } 124 rebuild(void * data_,size_t size_,free_fn * ffn_,void * hint_=NULL)125 inline void rebuild (void *data_, size_t size_, free_fn *ffn_, 126 void *hint_ = NULL) 127 { 128 int rc = zmq_msg_close (this); 129 if (rc != 0) 130 throw error_t (); 131 rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_); 132 if (rc != 0) 133 throw error_t (); 134 } 135 move(message_t * msg_)136 inline void move (message_t *msg_) 137 { 138 int rc = zmq_msg_move (this, (zmq_msg_t*) msg_); 139 if (rc != 0) 140 throw error_t (); 141 } 142 copy(message_t * msg_)143 inline void copy (message_t *msg_) 144 { 145 int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_); 146 if (rc != 0) 147 throw error_t (); 148 } 149 data()150 inline void *data () 151 { 152 return zmq_msg_data (this); 153 } 154 size()155 inline size_t size () 156 { 157 return zmq_msg_size (this); 158 } 159 160 private: 161 162 // Disable implicit message copying, so that users won't use shared 163 // messages (less efficient) without being aware of the fact. 164 message_t (const message_t&); 165 void operator = (const message_t&); 166 }; 167 168 class context_t 169 { 170 friend class socket_t; 171 172 public: 173 context_t(int io_threads_)174 inline context_t (int io_threads_) 175 { 176 ptr = zmq_init (io_threads_); 177 if (ptr == NULL) 178 throw error_t (); 179 } 180 ~context_t()181 inline ~context_t () 182 { 183 int rc = zmq_term (ptr); 184 assert (rc == 0); 185 } 186 187 // Be careful with this, it's probably only useful for 188 // using the C api together with an existing C++ api. 189 // Normally you should never need to use this. operator void*()190 inline operator void* () 191 { 192 return ptr; 193 } 194 195 private: 196 197 void *ptr; 198 199 context_t (const context_t&); 200 void operator = (const context_t&); 201 }; 202 203 class socket_t 204 { 205 public: 206 socket_t(context_t & context_,int type_)207 inline socket_t (context_t &context_, int type_) 208 { 209 ptr = zmq_socket (context_.ptr, type_); 210 if (ptr == NULL) 211 throw error_t (); 212 } 213 ~socket_t()214 inline ~socket_t () 215 { 216 int rc = zmq_close (ptr); 217 assert (rc == 0); 218 } 219 operator void*()220 inline operator void* () 221 { 222 return ptr; 223 } 224 setsockopt(int option_,const void * optval_,size_t optvallen_)225 inline void setsockopt (int option_, const void *optval_, 226 size_t optvallen_) 227 { 228 int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_); 229 if (rc != 0) 230 throw error_t (); 231 } 232 getsockopt(int option_,void * optval_,size_t * optvallen_)233 inline void getsockopt (int option_, void *optval_, 234 size_t *optvallen_) 235 { 236 int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_); 237 if (rc != 0) 238 throw error_t (); 239 } 240 bind(const char * addr_)241 inline void bind (const char *addr_) 242 { 243 int rc = zmq_bind (ptr, addr_); 244 if (rc != 0) 245 throw error_t (); 246 } 247 connect(const char * addr_)248 inline void connect (const char *addr_) 249 { 250 int rc = zmq_connect (ptr, addr_); 251 if (rc != 0) 252 throw error_t (); 253 } 254 send(message_t & msg_,int flags_=0)255 inline bool send (message_t &msg_, int flags_ = 0) 256 { 257 int rc = zmq_send (ptr, &msg_, flags_); 258 if (rc == 0) 259 return true; 260 if (rc == -1 && zmq_errno () == EAGAIN) 261 return false; 262 throw error_t (); 263 } 264 recv(message_t * msg_,int flags_=0)265 inline bool recv (message_t *msg_, int flags_ = 0) 266 { 267 int rc = zmq_recv (ptr, msg_, flags_); 268 if (rc == 0) 269 return true; 270 if (rc == -1 && zmq_errno () == EAGAIN) 271 return false; 272 throw error_t (); 273 } 274 275 private: 276 277 void *ptr; 278 279 socket_t (const socket_t&); 280 void operator = (const socket_t&); 281 }; 282 283 } 284 285 #endif 286