1 
2 #include "precompiled.hpp"
3 
4 #include "platform.hpp"
5 
6 #if defined ZMQ_HAVE_NORM
7 
8 #include "norm_engine.hpp"
9 #include "session_base.hpp"
10 #include "v2_protocol.hpp"
11 
norm_engine_t(io_thread_t * parent_,const options_t & options_)12 zmq::norm_engine_t::norm_engine_t (io_thread_t *parent_,
13                                    const options_t &options_) :
14     io_object_t (parent_),
15     zmq_session (NULL),
16     options (options_),
17     norm_instance (NORM_INSTANCE_INVALID),
18     norm_session (NORM_SESSION_INVALID),
19     is_sender (false),
20     is_receiver (false),
21     zmq_encoder (0),
22     norm_tx_stream (NORM_OBJECT_INVALID),
23     tx_first_msg (true),
24     tx_more_bit (false),
25     zmq_output_ready (false),
26     norm_tx_ready (false),
27     tx_index (0),
28     tx_len (0),
29     zmq_input_ready (false)
30 {
31     int rc = tx_msg.init ();
32     errno_assert (0 == rc);
33 }
34 
~norm_engine_t()35 zmq::norm_engine_t::~norm_engine_t ()
36 {
37     shutdown (); // in case it was not already called
38 }
39 
40 
init(const char * network_,bool send,bool recv)41 int zmq::norm_engine_t::init (const char *network_, bool send, bool recv)
42 {
43     // Parse the "network_" address int "iface", "addr", and "port"
44     // norm endpoint format: [id,][<iface>;]<addr>:<port>
45     // First, look for optional local NormNodeId
46     // (default NORM_NODE_ANY causes NORM to use host IP addr for NormNodeId)
47     NormNodeId localId = NORM_NODE_ANY;
48     const char *ifacePtr = strchr (network_, ',');
49     if (NULL != ifacePtr) {
50         size_t idLen = ifacePtr - network_;
51         if (idLen > 31)
52             idLen = 31;
53         char idText[32];
54         strncpy (idText, network_, idLen);
55         idText[idLen] = '\0';
56         localId = (NormNodeId) atoi (idText);
57         ifacePtr++;
58     } else {
59         ifacePtr = network_;
60     }
61 
62     // Second, look for optional multicast ifaceName
63     char ifaceName[256];
64     const char *addrPtr = strchr (ifacePtr, ';');
65     if (NULL != addrPtr) {
66         size_t ifaceLen = addrPtr - ifacePtr;
67         if (ifaceLen > 255)
68             ifaceLen = 255; // return error instead?
69         strncpy (ifaceName, ifacePtr, ifaceLen);
70         ifaceName[ifaceLen] = '\0';
71         ifacePtr = ifaceName;
72         addrPtr++;
73     } else {
74         addrPtr = ifacePtr;
75         ifacePtr = NULL;
76     }
77 
78     // Finally, parse IP address and port number
79     const char *portPtr = strrchr (addrPtr, ':');
80     if (NULL == portPtr) {
81         errno = EINVAL;
82         return -1;
83     }
84 
85     char addr[256];
86     size_t addrLen = portPtr - addrPtr;
87     if (addrLen > 255)
88         addrLen = 255;
89     strncpy (addr, addrPtr, addrLen);
90     addr[addrLen] = '\0';
91     portPtr++;
92     unsigned short portNumber = atoi (portPtr);
93 
94     if (NORM_INSTANCE_INVALID == norm_instance) {
95         if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance ())) {
96             // errno set by whatever caused NormCreateInstance() to fail
97             return -1;
98         }
99     }
100 
101     // TBD - What do we use for our local NormNodeId?
102     //       (for now we use automatic, IP addr based assignment or passed in 'id')
103     //       a) Use ZMQ Identity somehow?
104     //       b) Add function to use iface addr
105     //       c) Randomize and implement a NORM session layer
106     //          conflict detection/resolution protocol
107 
108     norm_session = NormCreateSession (norm_instance, addr, portNumber, localId);
109     if (NORM_SESSION_INVALID == norm_session) {
110         int savedErrno = errno;
111         NormDestroyInstance (norm_instance);
112         norm_instance = NORM_INSTANCE_INVALID;
113         errno = savedErrno;
114         return -1;
115     }
116     // There's many other useful NORM options that could be applied here
117     if (NormIsUnicastAddress (addr)) {
118         NormSetDefaultUnicastNack (norm_session, true);
119     } else {
120         // These only apply for multicast sessions
121         //NormSetTTL(norm_session, options.multicast_hops);  // ZMQ default is 1
122         NormSetTTL (
123           norm_session,
124           255); // since the ZMQ_MULTICAST_HOPS socket option isn't well-supported
125         NormSetRxPortReuse (
126           norm_session,
127           true); // port reuse doesn't work for non-connected unicast
128         NormSetLoopback (norm_session,
129                          true); // needed when multicast users on same machine
130         if (NULL != ifacePtr) {
131             // Note a bad interface may not be caught until sender or receiver start
132             // (Since sender/receiver is not yet started, this always succeeds here)
133             NormSetMulticastInterface (norm_session, ifacePtr);
134         }
135     }
136 
137     if (recv) {
138         // The alternative NORM_SYNC_CURRENT here would provide "instant"
139         // receiver sync to the sender's _current_ message transmission.
140         // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
141         NormSetDefaultSyncPolicy (norm_session, NORM_SYNC_STREAM);
142         if (!NormStartReceiver (norm_session, 2 * 1024 * 1024)) {
143             // errno set by whatever failed
144             int savedErrno = errno;
145             NormDestroyInstance (norm_instance); // session gets closed, too
146             norm_session = NORM_SESSION_INVALID;
147             norm_instance = NORM_INSTANCE_INVALID;
148             errno = savedErrno;
149             return -1;
150         }
151         is_receiver = true;
152     }
153 
154     if (send) {
155         // Pick a random sender instance id (aka norm sender session id)
156         NormSessionId instanceId = NormGetRandomSessionId ();
157         // TBD - provide "options" for some NORM sender parameters
158         if (!NormStartSender (norm_session, instanceId, 2 * 1024 * 1024, 1400,
159                               16, 4)) {
160             // errno set by whatever failed
161             int savedErrno = errno;
162             NormDestroyInstance (norm_instance); // session gets closed, too
163             norm_session = NORM_SESSION_INVALID;
164             norm_instance = NORM_INSTANCE_INVALID;
165             errno = savedErrno;
166             return -1;
167         }
168         NormSetCongestionControl (norm_session, true);
169         norm_tx_ready = true;
170         is_sender = true;
171         if (NORM_OBJECT_INVALID
172             == (norm_tx_stream =
173                   NormStreamOpen (norm_session, 2 * 1024 * 1024))) {
174             // errno set by whatever failed
175             int savedErrno = errno;
176             NormDestroyInstance (norm_instance); // session gets closed, too
177             norm_session = NORM_SESSION_INVALID;
178             norm_instance = NORM_INSTANCE_INVALID;
179             errno = savedErrno;
180             return -1;
181         }
182     }
183 
184     //NormSetMessageTrace(norm_session, true);
185     //NormSetDebugLevel(3);
186     //NormOpenDebugLog(norm_instance, "normLog.txt");
187 
188     return 0; // no error
189 } // end zmq::norm_engine_t::init()
190 
shutdown()191 void zmq::norm_engine_t::shutdown ()
192 {
193     // TBD - implement a more graceful shutdown option
194     if (is_receiver) {
195         NormStopReceiver (norm_session);
196 
197         // delete any active NormRxStreamState
198         rx_pending_list.Destroy ();
199         rx_ready_list.Destroy ();
200         msg_ready_list.Destroy ();
201 
202         is_receiver = false;
203     }
204     if (is_sender) {
205         NormStopSender (norm_session);
206         is_sender = false;
207     }
208     if (NORM_SESSION_INVALID != norm_session) {
209         NormDestroySession (norm_session);
210         norm_session = NORM_SESSION_INVALID;
211     }
212     if (NORM_INSTANCE_INVALID != norm_instance) {
213         NormStopInstance (norm_instance);
214         NormDestroyInstance (norm_instance);
215         norm_instance = NORM_INSTANCE_INVALID;
216     }
217 } // end zmq::norm_engine_t::shutdown()
218 
plug(io_thread_t * io_thread_,session_base_t * session_)219 void zmq::norm_engine_t::plug (io_thread_t *io_thread_,
220                                session_base_t *session_)
221 {
222     // TBD - we may assign the NORM engine to an io_thread in the future???
223     zmq_session = session_;
224     if (is_sender)
225         zmq_output_ready = true;
226     if (is_receiver)
227         zmq_input_ready = true;
228 
229     fd_t normDescriptor = NormGetDescriptor (norm_instance);
230     norm_descriptor_handle = add_fd (normDescriptor);
231     // Set POLLIN for notification of pending NormEvents
232     set_pollin (norm_descriptor_handle);
233 
234     if (is_sender)
235         send_data ();
236 
237 } // end zmq::norm_engine_t::init()
238 
unplug()239 void zmq::norm_engine_t::unplug ()
240 {
241     rm_fd (norm_descriptor_handle);
242 
243     zmq_session = NULL;
244 } // end zmq::norm_engine_t::unplug()
245 
terminate()246 void zmq::norm_engine_t::terminate ()
247 {
248     unplug ();
249     shutdown ();
250     delete this;
251 }
252 
restart_output()253 void zmq::norm_engine_t::restart_output ()
254 {
255     // There's new message data available from the session
256     zmq_output_ready = true;
257     if (norm_tx_ready)
258         send_data ();
259 
260 } // end zmq::norm_engine_t::restart_output()
261 
send_data()262 void zmq::norm_engine_t::send_data ()
263 {
264     // Here we write as much as is available or we can
265     while (zmq_output_ready && norm_tx_ready) {
266         if (0 == tx_len) {
267             // Our tx_buffer needs data to send
268             // Get more data from encoder
269             size_t space = BUFFER_SIZE;
270             unsigned char *bufPtr = (unsigned char *) tx_buffer;
271             tx_len = zmq_encoder.encode (&bufPtr, space);
272             if (0 == tx_len) {
273                 if (tx_first_msg) {
274                     // We don't need to mark eom/flush until a message is sent
275                     tx_first_msg = false;
276                 } else {
277                     // A prior message was completely written to stream, so
278                     // mark end-of-message and possibly flush (to force packet transmission,
279                     // even if it's not a full segment so message gets delivered quickly)
280                     // NormStreamMarkEom(norm_tx_stream);  // the flush below marks eom
281                     // Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging
282                     // but makes sure content is delivered quickly.  Positive acknowledgements
283                     // with flush override would make NORM more succinct here
284                     NormStreamFlush (norm_tx_stream, true, NORM_FLUSH_ACTIVE);
285                 }
286                 // Need to pull and load a new message to send
287                 if (-1 == zmq_session->pull_msg (&tx_msg)) {
288                     // We need to wait for "restart_output()" to be called by ZMQ
289                     zmq_output_ready = false;
290                     break;
291                 }
292                 zmq_encoder.load_msg (&tx_msg);
293                 // Should we write message size header for NORM to use? Or expect NORM
294                 // receiver to decode ZMQ message framing format(s)?
295                 // OK - we need to use a byte to denote when the ZMQ frame is the _first_
296                 //      frame of a message so it can be decoded properly when a receiver
297                 //      'syncs' mid-stream.  We key off the the state of the 'more_flag'
298                 //      I.e.,If  more_flag _was_ false previously, this is the first
299                 //      frame of a ZMQ message.
300                 if (tx_more_bit)
301                     tx_buffer[0] =
302                       (char) 0xff; // this is not first frame of message
303                 else
304                     tx_buffer[0] = 0x00; // this is first frame of message
305                 tx_more_bit = (0 != (tx_msg.flags () & msg_t::more));
306                 // Go ahead an get a first chunk of the message
307                 bufPtr++;
308                 space--;
309                 tx_len = 1 + zmq_encoder.encode (&bufPtr, space);
310                 tx_index = 0;
311             }
312         }
313         // Do we have data in our tx_buffer pending
314         if (tx_index < tx_len) {
315             // We have data in our tx_buffer to send, so write it to the stream
316             tx_index += NormStreamWrite (norm_tx_stream, tx_buffer + tx_index,
317                                          tx_len - tx_index);
318             if (tx_index < tx_len) {
319                 // NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY
320                 norm_tx_ready = false;
321                 break;
322             }
323             tx_len = 0; // all buffered data was written
324         }
325     } // end while (zmq_output_ready && norm_tx_ready)
326 } // end zmq::norm_engine_t::send_data()
327 
in_event()328 void zmq::norm_engine_t::in_event ()
329 {
330     // This means a NormEvent is pending, so call NormGetNextEvent() and handle
331     NormEvent event;
332     if (!NormGetNextEvent (norm_instance, &event)) {
333         // NORM has died before we unplugged?!
334         zmq_assert (false);
335         return;
336     }
337 
338     switch (event.type) {
339         case NORM_TX_QUEUE_VACANCY:
340         case NORM_TX_QUEUE_EMPTY:
341             if (!norm_tx_ready) {
342                 norm_tx_ready = true;
343                 send_data ();
344             }
345             break;
346 
347         case NORM_RX_OBJECT_NEW:
348             //break;
349         case NORM_RX_OBJECT_UPDATED:
350             recv_data (event.object);
351             break;
352 
353         case NORM_RX_OBJECT_ABORTED: {
354             NormRxStreamState *rxState =
355               (NormRxStreamState *) NormObjectGetUserData (event.object);
356             if (NULL != rxState) {
357                 // Remove the state from the list it's in
358                 // This is now unnecessary since deletion takes care of list removal
359                 // but in the interest of being clear ...
360                 NormRxStreamState::List *list = rxState->AccessList ();
361                 if (NULL != list)
362                     list->Remove (*rxState);
363             }
364             delete rxState;
365             break;
366         }
367         case NORM_REMOTE_SENDER_INACTIVE:
368             // Here we free resources used for this formerly active sender.
369             // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
370             //  get some messages delivered twice.  NORM_SYNC_CURRENT would
371             // mitigate that but might miss data at startup. Always tradeoffs.
372             // Instead of immediately deleting, we could instead initiate a
373             // user configurable timeout here to wait some amount of time
374             // after this event to declare the remote sender truly dead
375             // and delete its state???
376             NormNodeDelete (event.sender);
377             break;
378 
379         default:
380             // We ignore some NORM events
381             break;
382     }
383 } // zmq::norm_engine_t::in_event()
384 
restart_input()385 bool zmq::norm_engine_t::restart_input ()
386 {
387     // TBD - should we check/assert that zmq_input_ready was false???
388     zmq_input_ready = true;
389     // Process any pending received messages
390     if (!msg_ready_list.IsEmpty ())
391         recv_data (NORM_OBJECT_INVALID);
392 
393     return true;
394 } // end zmq::norm_engine_t::restart_input()
395 
recv_data(NormObjectHandle object)396 void zmq::norm_engine_t::recv_data (NormObjectHandle object)
397 {
398     if (NORM_OBJECT_INVALID != object) {
399         // Call result of NORM_RX_OBJECT_UPDATED notification
400         // This is a rx_ready indication for a new or existing rx stream
401         // First, determine if this is a stream we already know
402         zmq_assert (NORM_OBJECT_STREAM == NormObjectGetType (object));
403         // Since there can be multiple senders (publishers), we keep
404         // state for each separate rx stream.
405         NormRxStreamState *rxState =
406           (NormRxStreamState *) NormObjectGetUserData (object);
407         if (NULL == rxState) {
408             // This is a new stream, so create rxState with zmq decoder, etc
409             rxState = new (std::nothrow)
410               NormRxStreamState (object, options.maxmsgsize, options.zero_copy,
411                                  options.in_batch_size);
412             errno_assert (rxState);
413 
414             if (!rxState->Init ()) {
415                 errno_assert (false);
416                 delete rxState;
417                 return;
418             }
419             NormObjectSetUserData (object, rxState);
420         } else if (!rxState->IsRxReady ()) {
421             // Existing non-ready stream, so remove from pending
422             // list to be promoted to rx_ready_list ...
423             rx_pending_list.Remove (*rxState);
424         }
425         if (!rxState->IsRxReady ()) {
426             // TBD - prepend up front for immediate service?
427             rxState->SetRxReady (true);
428             rx_ready_list.Append (*rxState);
429         }
430     }
431     // This loop repeats until we've read all data available from "rx ready" inbound streams
432     // and pushed any accumulated messages we can up to the zmq session.
433     while (!rx_ready_list.IsEmpty ()
434            || (zmq_input_ready && !msg_ready_list.IsEmpty ())) {
435         // Iterate through our rx_ready streams, reading data into the decoder
436         // (This services incoming "rx ready" streams in a round-robin fashion)
437         NormRxStreamState::List::Iterator iterator (rx_ready_list);
438         NormRxStreamState *rxState;
439         while (NULL != (rxState = iterator.GetNextItem ())) {
440             switch (rxState->Decode ()) {
441                 case 1: // msg completed
442                     // Complete message decoded, move this stream to msg_ready_list
443                     // to push the message up to the session below.  Note the stream
444                     // will be returned to the "rx_ready_list" after that's done
445                     rx_ready_list.Remove (*rxState);
446                     msg_ready_list.Append (*rxState);
447                     continue;
448 
449                 case -1: // decoding error (shouldn't happen w/ NORM, but ...)
450                     // We need to re-sync this stream (decoder buffer was reset)
451                     rxState->SetSync (false);
452                     break;
453 
454                 default: // 0 - need more data
455                     break;
456             }
457             // Get more data from this stream
458             NormObjectHandle stream = rxState->GetStreamHandle ();
459             // First, make sure we're in sync ...
460             while (!rxState->InSync ()) {
461                 // seek NORM message start
462                 if (!NormStreamSeekMsgStart (stream)) {
463                     // Need to wait for more data
464                     break;
465                 }
466                 // read message 'flag' byte to see if this it's a 'final' frame
467                 char syncFlag;
468                 unsigned int numBytes = 1;
469                 if (!NormStreamRead (stream, &syncFlag, &numBytes)) {
470                     // broken stream (shouldn't happen after seek msg start?)
471                     zmq_assert (false);
472                     continue;
473                 }
474                 if (0 == numBytes) {
475                     // This probably shouldn't happen either since we found msg start
476                     // Need to wait for more data
477                     break;
478                 }
479                 if (0 == syncFlag)
480                     rxState->SetSync (true);
481                 // else keep seeking ...
482             } // end while(!rxState->InSync())
483             if (!rxState->InSync ()) {
484                 // Need more data for this stream, so remove from "rx ready"
485                 // list and iterate to next "rx ready" stream
486                 rxState->SetRxReady (false);
487                 // Move from rx_ready_list to rx_pending_list
488                 rx_ready_list.Remove (*rxState);
489                 rx_pending_list.Append (*rxState);
490                 continue;
491             }
492             // Now we're actually ready to read data from the NORM stream to the zmq_decoder
493             // the underlying zmq_decoder->get_buffer() call sets how much is needed.
494             unsigned int numBytes = rxState->GetBytesNeeded ();
495             if (!NormStreamRead (stream, rxState->AccessBuffer (), &numBytes)) {
496                 // broken NORM stream, so re-sync
497                 rxState->Init (); // TBD - check result
498                 // This will retry syncing, and getting data from this stream
499                 // since we don't increment the "it" iterator
500                 continue;
501             }
502             rxState->IncrementBufferCount (numBytes);
503             if (0 == numBytes) {
504                 // All the data available has been read
505                 // Need to wait for NORM_RX_OBJECT_UPDATED for this stream
506                 rxState->SetRxReady (false);
507                 // Move from rx_ready_list to rx_pending_list
508                 rx_ready_list.Remove (*rxState);
509                 rx_pending_list.Append (*rxState);
510             }
511         } // end while(NULL != (rxState = iterator.GetNextItem()))
512 
513         if (zmq_input_ready) {
514             // At this point, we've made a pass through the "rx_ready" stream list
515             // Now make a pass through the "msg_pending" list (if the zmq session
516             // ready for more input).  This may possibly return streams back to
517             // the "rx ready" stream list after their pending message is handled
518             NormRxStreamState::List::Iterator iterator (msg_ready_list);
519             NormRxStreamState *rxState;
520             while (NULL != (rxState = iterator.GetNextItem ())) {
521                 msg_t *msg = rxState->AccessMsg ();
522                 int rc = zmq_session->push_msg (msg);
523                 if (-1 == rc) {
524                     if (EAGAIN == errno) {
525                         // need to wait until session calls "restart_input()"
526                         zmq_input_ready = false;
527                         break;
528                     } else {
529                         // session rejected message?
530                         // TBD - handle this better
531                         zmq_assert (false);
532                     }
533                 }
534                 // else message was accepted.
535                 msg_ready_list.Remove (*rxState);
536                 if (
537                   rxState
538                     ->IsRxReady ()) // Move back to "rx_ready" list to read more data
539                     rx_ready_list.Append (*rxState);
540                 else // Move back to "rx_pending" list until NORM_RX_OBJECT_UPDATED
541                     msg_ready_list.Append (*rxState);
542             } // end while(NULL != (rxState = iterator.GetNextItem()))
543         }     // end if (zmq_input_ready)
544     } // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))
545 
546     // Alert zmq of the messages we have pushed up
547     zmq_session->flush ();
548 
549 } // end zmq::norm_engine_t::recv_data()
550 
NormRxStreamState(NormObjectHandle normStream,int64_t maxMsgSize,bool zeroCopy,int inBatchSize)551 zmq::norm_engine_t::NormRxStreamState::NormRxStreamState (
552   NormObjectHandle normStream,
553   int64_t maxMsgSize,
554   bool zeroCopy,
555   int inBatchSize) :
556     norm_stream (normStream),
557     max_msg_size (maxMsgSize),
558     zero_copy (zeroCopy),
559     in_batch_size (inBatchSize),
560     in_sync (false),
561     rx_ready (false),
562     zmq_decoder (NULL),
563     skip_norm_sync (false),
564     buffer_ptr (NULL),
565     buffer_size (0),
566     buffer_count (0),
567     prev (NULL),
568     next (NULL),
569     list (NULL)
570 {
571 }
572 
~NormRxStreamState()573 zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState ()
574 {
575     if (NULL != zmq_decoder) {
576         delete zmq_decoder;
577         zmq_decoder = NULL;
578     }
579     if (NULL != list) {
580         list->Remove (*this);
581         list = NULL;
582     }
583 }
584 
Init()585 bool zmq::norm_engine_t::NormRxStreamState::Init ()
586 {
587     in_sync = false;
588     skip_norm_sync = false;
589     if (NULL != zmq_decoder)
590         delete zmq_decoder;
591     zmq_decoder =
592       new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy);
593     alloc_assert (zmq_decoder);
594     if (NULL != zmq_decoder) {
595         buffer_count = 0;
596         buffer_size = 0;
597         zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
598         return true;
599     } else {
600         return false;
601     }
602 } // end zmq::norm_engine_t::NormRxStreamState::Init()
603 
604 // This decodes any pending data sitting in our stream decoder buffer
605 // It returns 1 upon message completion, -1 on error, 1 on msg completion
Decode()606 int zmq::norm_engine_t::NormRxStreamState::Decode ()
607 {
608     // If we have pending bytes to decode, process those first
609     while (buffer_count > 0) {
610         // There's pending data for the decoder to decode
611         size_t processed = 0;
612 
613         // This a bit of a kludgy approach used to weed
614         // out the NORM ZMQ message transport "syncFlag" byte
615         // from the ZMQ message stream being decoded (but it works!)
616         if (skip_norm_sync) {
617             buffer_ptr++;
618             buffer_count--;
619             skip_norm_sync = false;
620         }
621 
622         int rc = zmq_decoder->decode (buffer_ptr, buffer_count, processed);
623         buffer_ptr += processed;
624         buffer_count -= processed;
625         switch (rc) {
626             case 1:
627                 // msg completed
628                 if (0 == buffer_count) {
629                     buffer_size = 0;
630                     zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
631                 }
632                 skip_norm_sync = true;
633                 return 1;
634             case -1:
635                 // decoder error (reset decoder and state variables)
636                 in_sync = false;
637                 skip_norm_sync = false; // will get consumed by norm sync check
638                 Init ();
639                 break;
640 
641             case 0:
642                 // need more data, keep decoding until buffer exhausted
643                 break;
644         }
645     }
646     // Reset buffer pointer/count for next read
647     buffer_count = 0;
648     buffer_size = 0;
649     zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
650     return 0; //  need more data
651 
652 } // end zmq::norm_engine_t::NormRxStreamState::Decode()
653 
List()654 zmq::norm_engine_t::NormRxStreamState::List::List () : head (NULL), tail (NULL)
655 {
656 }
657 
~List()658 zmq::norm_engine_t::NormRxStreamState::List::~List ()
659 {
660     Destroy ();
661 }
662 
Destroy()663 void zmq::norm_engine_t::NormRxStreamState::List::Destroy ()
664 {
665     NormRxStreamState *item = head;
666     while (NULL != item) {
667         Remove (*item);
668         delete item;
669         item = head;
670     }
671 } // end zmq::norm_engine_t::NormRxStreamState::List::Destroy()
672 
Append(NormRxStreamState & item)673 void zmq::norm_engine_t::NormRxStreamState::List::Append (
674   NormRxStreamState &item)
675 {
676     item.prev = tail;
677     if (NULL != tail)
678         tail->next = &item;
679     else
680         head = &item;
681     item.next = NULL;
682     tail = &item;
683     item.list = this;
684 } // end zmq::norm_engine_t::NormRxStreamState::List::Append()
685 
Remove(NormRxStreamState & item)686 void zmq::norm_engine_t::NormRxStreamState::List::Remove (
687   NormRxStreamState &item)
688 {
689     if (NULL != item.prev)
690         item.prev->next = item.next;
691     else
692         head = item.next;
693     if (NULL != item.next)
694         item.next->prev = item.prev;
695     else
696         tail = item.prev;
697     item.prev = item.next = NULL;
698     item.list = NULL;
699 } // end zmq::norm_engine_t::NormRxStreamState::List::Remove()
700 
Iterator(const List & list)701 zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator (
702   const List &list) :
703     next_item (list.head)
704 {
705 }
706 
707 zmq::norm_engine_t::NormRxStreamState *
GetNextItem()708 zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem ()
709 {
710     NormRxStreamState *nextItem = next_item;
711     if (NULL != nextItem)
712         next_item = nextItem->next;
713     return nextItem;
714 } // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
715 
get_endpoint() const716 const zmq::endpoint_uri_pair_t &zmq::norm_engine_t::get_endpoint () const
717 {
718     return _empty_endpoint;
719 }
720 
721 #endif // ZMQ_HAVE_NORM
722