1 /*
2     Copyright (c) 2013 Martin Sustrik  All rights reserved.
3     Copyright 2017 Garrett D'Amore <garrett@damore.org>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a copy
6     of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom
10     the Software is furnished to do so, subject to the following conditions:
11 
12     The above copyright notice and this permission notice shall be included
13     in all copies or substantial portions of the Software.
14 
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
18     THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21     IN THE SOFTWARE.
22 */
23 
24 #include "sinproc.h"
25 
26 #include "../../utils/err.h"
27 #include "../../utils/cont.h"
28 #include "../../utils/attr.h"
29 
30 #include <stddef.h>
31 
32 #define NN_SINPROC_STATE_IDLE 1
33 #define NN_SINPROC_STATE_CONNECTING 2
34 #define NN_SINPROC_STATE_READY 3
35 #define NN_SINPROC_STATE_ACTIVE 4
36 #define NN_SINPROC_STATE_DISCONNECTED 5
37 #define NN_SINPROC_STATE_STOPPING_PEER 6
38 #define NN_SINPROC_STATE_STOPPING 7
39 
40 #define NN_SINPROC_ACTION_READY 1
41 #define NN_SINPROC_ACTION_ACCEPTED 2
42 
43 /*  Set when SENT event was sent to the peer but RECEIVED haven't been
44     passed back yet. */
45 #define NN_SINPROC_FLAG_SENDING 1
46 
47 /*  Set when SENT event was received, but the new message cannot be written
48     to the queue yet, i.e. RECEIVED event haven't been returned
49     to the peer yet. */
50 #define NN_SINPROC_FLAG_RECEIVING 2
51 
52 /*  Private functions. */
53 static void nn_sinproc_handler (struct nn_fsm *self, int src, int type,
54     void *srcptr);
55 static void nn_sinproc_shutdown (struct nn_fsm *self, int src, int type,
56     void *srcptr);
57 
58 static int nn_sinproc_send (struct nn_pipebase *self, struct nn_msg *msg);
59 static int nn_sinproc_recv (struct nn_pipebase *self, struct nn_msg *msg);
60 const struct nn_pipebase_vfptr nn_sinproc_pipebase_vfptr = {
61     nn_sinproc_send,
62     nn_sinproc_recv
63 };
64 
nn_sinproc_init(struct nn_sinproc * self,int src,struct nn_ep * ep,struct nn_fsm * owner)65 void nn_sinproc_init (struct nn_sinproc *self, int src,
66     struct nn_ep *ep, struct nn_fsm *owner)
67 {
68     int rcvbuf;
69     size_t sz;
70 
71     nn_fsm_init (&self->fsm, nn_sinproc_handler, nn_sinproc_shutdown,
72         src, self, owner);
73     self->state = NN_SINPROC_STATE_IDLE;
74     self->flags = 0;
75     self->peer = NULL;
76     nn_pipebase_init (&self->pipebase, &nn_sinproc_pipebase_vfptr, ep);
77     sz = sizeof (rcvbuf);
78     nn_ep_getopt (ep, NN_SOL_SOCKET, NN_RCVBUF, &rcvbuf, &sz);
79     nn_assert (sz == sizeof (rcvbuf));
80     nn_msgqueue_init (&self->msgqueue, rcvbuf);
81     nn_msg_init (&self->msg, 0);
82     nn_fsm_event_init (&self->event_connect);
83     nn_fsm_event_init (&self->event_sent);
84     nn_fsm_event_init (&self->event_received);
85     nn_fsm_event_init (&self->event_disconnect);
86     nn_list_item_init (&self->item);
87 }
88 
nn_sinproc_term(struct nn_sinproc * self)89 void nn_sinproc_term (struct nn_sinproc *self)
90 {
91     nn_list_item_term (&self->item);
92     nn_fsm_event_term (&self->event_disconnect);
93     nn_fsm_event_term (&self->event_received);
94     nn_fsm_event_term (&self->event_sent);
95     nn_fsm_event_term (&self->event_connect);
96     nn_msg_term (&self->msg);
97     nn_msgqueue_term (&self->msgqueue);
98     nn_pipebase_term (&self->pipebase);
99     nn_fsm_term (&self->fsm);
100 }
101 
nn_sinproc_isidle(struct nn_sinproc * self)102 int nn_sinproc_isidle (struct nn_sinproc *self)
103 {
104     return nn_fsm_isidle (&self->fsm);
105 }
106 
nn_sinproc_connect(struct nn_sinproc * self,struct nn_fsm * peer)107 void nn_sinproc_connect (struct nn_sinproc *self, struct nn_fsm *peer)
108 {
109     nn_fsm_start (&self->fsm);
110 
111     /*  Start the connecting handshake with the peer. */
112     nn_fsm_raiseto (&self->fsm, peer, &self->event_connect,
113         NN_SINPROC_SRC_PEER, NN_SINPROC_CONNECT, self);
114 }
115 
nn_sinproc_accept(struct nn_sinproc * self,struct nn_sinproc * peer)116 void nn_sinproc_accept (struct nn_sinproc *self, struct nn_sinproc *peer)
117 {
118     nn_assert (!self->peer);
119     self->peer = peer;
120 
121     /*  Start the connecting handshake with the peer. */
122     nn_fsm_raiseto (&self->fsm, &peer->fsm, &self->event_connect,
123         NN_SINPROC_SRC_PEER, NN_SINPROC_READY, self);
124 
125     /*  Notify the state machine. */
126     nn_fsm_start (&self->fsm);
127     nn_fsm_action (&self->fsm, NN_SINPROC_ACTION_READY);
128 }
129 
nn_sinproc_stop(struct nn_sinproc * self)130 void nn_sinproc_stop (struct nn_sinproc *self)
131 {
132     nn_fsm_stop (&self->fsm);
133 }
134 
135 
136 
nn_sinproc_send(struct nn_pipebase * self,struct nn_msg * msg)137 static int nn_sinproc_send (struct nn_pipebase *self, struct nn_msg *msg)
138 {
139     struct nn_sinproc *sinproc;
140     struct nn_msg nmsg;
141 
142     sinproc = nn_cont (self, struct nn_sinproc, pipebase);
143 
144     /*  If the peer have already closed the connection, we cannot send
145         anymore. */
146     if (sinproc->state == NN_SINPROC_STATE_DISCONNECTED)
147         return -ECONNRESET;
148 
149     /*  Sanity checks. */
150     nn_assert_state (sinproc, NN_SINPROC_STATE_ACTIVE);
151     nn_assert (!(sinproc->flags & NN_SINPROC_FLAG_SENDING));
152 
153     nn_msg_init (&nmsg,
154         nn_chunkref_size (&msg->sphdr) +
155         nn_chunkref_size (&msg->body));
156     memcpy (nn_chunkref_data (&nmsg.body),
157         nn_chunkref_data (&msg->sphdr),
158         nn_chunkref_size (&msg->sphdr));
159     memcpy ((char *)nn_chunkref_data (&nmsg.body) +
160         nn_chunkref_size (&msg->sphdr),
161         nn_chunkref_data (&msg->body),
162         nn_chunkref_size (&msg->body));
163     nn_msg_term (msg);
164 
165     /*  Expose the message to the peer. */
166     nn_msg_term (&sinproc->msg);
167     nn_msg_mv (&sinproc->msg, &nmsg);
168 
169     /*  Notify the peer that there's a message to get. */
170     sinproc->flags |= NN_SINPROC_FLAG_SENDING;
171     nn_fsm_raiseto (&sinproc->fsm, &sinproc->peer->fsm,
172         &sinproc->peer->event_sent, NN_SINPROC_SRC_PEER,
173         NN_SINPROC_SENT, sinproc);
174 
175     return 0;
176 }
177 
nn_sinproc_recv(struct nn_pipebase * self,struct nn_msg * msg)178 static int nn_sinproc_recv (struct nn_pipebase *self, struct nn_msg *msg)
179 {
180     int rc;
181     struct nn_sinproc *sinproc;
182 
183     sinproc = nn_cont (self, struct nn_sinproc, pipebase);
184 
185     /*  Sanity check. */
186     nn_assert (sinproc->state == NN_SINPROC_STATE_ACTIVE ||
187         sinproc->state == NN_SINPROC_STATE_DISCONNECTED);
188 
189     /*  Move the message to the caller. */
190     rc = nn_msgqueue_recv (&sinproc->msgqueue, msg);
191     errnum_assert (rc == 0, -rc);
192 
193     /*  If there was a message from peer lingering because of the exceeded
194         buffer limit, try to enqueue it once again. */
195     if (sinproc->state != NN_SINPROC_STATE_DISCONNECTED) {
196         if (nn_slow (sinproc->flags & NN_SINPROC_FLAG_RECEIVING)) {
197             rc = nn_msgqueue_send (&sinproc->msgqueue, &sinproc->peer->msg);
198             nn_assert (rc == 0 || rc == -EAGAIN);
199             if (rc == 0) {
200                 errnum_assert (rc == 0, -rc);
201                 nn_msg_init (&sinproc->peer->msg, 0);
202                 nn_fsm_raiseto (&sinproc->fsm, &sinproc->peer->fsm,
203                     &sinproc->peer->event_received, NN_SINPROC_SRC_PEER,
204                     NN_SINPROC_RECEIVED, sinproc);
205                 sinproc->flags &= ~NN_SINPROC_FLAG_RECEIVING;
206             }
207         }
208     }
209 
210     if (!nn_msgqueue_empty (&sinproc->msgqueue))
211        nn_pipebase_received (&sinproc->pipebase);
212 
213     return 0;
214 }
215 
nn_sinproc_shutdown_events(struct nn_sinproc * self,int src,int type,NN_UNUSED void * srcptr)216 static void nn_sinproc_shutdown_events (struct nn_sinproc *self, int src,
217     int type, NN_UNUSED void *srcptr)
218 {
219     /*  *******************************  */
220     /*  Any-state events                 */
221     /*  *******************************  */
222     switch (src) {
223     case NN_FSM_ACTION:
224         switch (type) {
225         case NN_FSM_STOP:
226             if (self->state != NN_SINPROC_STATE_IDLE &&
227                   self->state != NN_SINPROC_STATE_DISCONNECTED) {
228                 nn_pipebase_stop (&self->pipebase);
229                 nn_assert (self->fsm.state == 2 || self->fsm.state == 3);
230                 nn_fsm_raiseto (&self->fsm, &self->peer->fsm,
231                     &self->peer->event_disconnect, NN_SINPROC_SRC_PEER,
232                     NN_SINPROC_DISCONNECT, self);
233 
234                 self->state = NN_SINPROC_STATE_STOPPING_PEER;
235             } else {
236                 self->state = NN_SINPROC_STATE_STOPPING;
237             }
238             return;
239         default:
240             break;
241         }
242     case NN_SINPROC_SRC_PEER:
243         switch (type) {
244         case NN_SINPROC_RECEIVED:
245             return;
246         }
247     }
248 
249     /*  *******************************  */
250     /*  Regular events                   */
251     /*  *******************************  */
252     switch (self->state) {
253     case NN_SINPROC_STATE_STOPPING_PEER:
254         switch (src) {
255         case NN_SINPROC_SRC_PEER:
256             switch (type) {
257             case NN_SINPROC_DISCONNECT:
258                 self->state = NN_SINPROC_STATE_STOPPING;
259                 return;
260             default:
261                 /*  We could get a notification about state that
262                     was queued earlier, or about a sent message.  We
263                     do not care about those anymore, we're closing! */
264                 return;
265             }
266         default:
267             nn_fsm_bad_source (self->state, src, type);
268         }
269     default:
270         nn_fsm_bad_state (self->state, src, type);
271     }
272 
273     nn_fsm_bad_action (self->state, src, type);
274 }
275 
nn_sinproc_shutdown(struct nn_fsm * self,int src,int type,void * srcptr)276 static void nn_sinproc_shutdown (struct nn_fsm *self, int src, int type,
277     void *srcptr)
278 {
279     struct nn_sinproc *sinproc;
280 
281     sinproc = nn_cont (self, struct nn_sinproc, fsm);
282     nn_assert (sinproc->fsm.state == 3);
283 
284     nn_sinproc_shutdown_events (sinproc, src, type, srcptr);
285 
286     /*  ***************  */
287     /*  States to check  */
288     /*  ***************  */
289 
290     /*  Have we got notification that peer is stopped  */
291     if (nn_slow (sinproc->state != NN_SINPROC_STATE_STOPPING)) {
292         return;
293     }
294 
295     /*  Are all events processed? We can't cancel them unfortunately  */
296     if (nn_fsm_event_active (&sinproc->event_received)
297         || nn_fsm_event_active (&sinproc->event_disconnect))
298     {
299         return;
300     }
301     /*  These events are deemed to be impossible here  */
302     nn_assert (!nn_fsm_event_active (&sinproc->event_connect));
303     nn_assert (!nn_fsm_event_active (&sinproc->event_sent));
304 
305     /*  **********************************************  */
306     /*  All checks are successful. Just stop right now  */
307     /*  **********************************************  */
308 
309     nn_fsm_stopped (&sinproc->fsm, NN_SINPROC_STOPPED);
310     return;
311 }
312 
nn_sinproc_handler(struct nn_fsm * self,int src,int type,void * srcptr)313 static void nn_sinproc_handler (struct nn_fsm *self, int src, int type,
314     void *srcptr)
315 {
316     int rc;
317     struct nn_sinproc *sinproc;
318     int empty;
319 
320     sinproc = nn_cont (self, struct nn_sinproc, fsm);
321 
322     switch (sinproc->state) {
323 
324 /******************************************************************************/
325 /*  IDLE state.                                                               */
326 /******************************************************************************/
327     case NN_SINPROC_STATE_IDLE:
328         switch (src) {
329 
330         case NN_FSM_ACTION:
331             switch (type) {
332             case NN_FSM_START:
333                 sinproc->state = NN_SINPROC_STATE_CONNECTING;
334                 return;
335             default:
336                 nn_fsm_bad_action (sinproc->state, src, type);
337             }
338 
339         default:
340             nn_fsm_bad_source (sinproc->state, src, type);
341         }
342 
343 /******************************************************************************/
344 /*  CONNECTING state.                                                         */
345 /*  CONNECT request was sent to the peer. Now we are waiting for the          */
346 /*  acknowledgement.                                                          */
347 /******************************************************************************/
348     case NN_SINPROC_STATE_CONNECTING:
349         switch (src) {
350 
351         case NN_FSM_ACTION:
352             switch (type) {
353             case NN_SINPROC_ACTION_READY:
354                 sinproc->state = NN_SINPROC_STATE_READY;
355                 return;
356             default:
357                 nn_fsm_bad_action (sinproc->state, src, type);
358             }
359 
360         case NN_SINPROC_SRC_PEER:
361             switch (type) {
362             case NN_SINPROC_READY:
363                 sinproc->peer = (struct nn_sinproc*) srcptr;
364                 rc = nn_pipebase_start (&sinproc->pipebase);
365                 errnum_assert (rc == 0, -rc);
366                 sinproc->state = NN_SINPROC_STATE_ACTIVE;
367                 nn_fsm_raiseto (&sinproc->fsm, &sinproc->peer->fsm,
368                     &sinproc->event_connect,
369                     NN_SINPROC_SRC_PEER, NN_SINPROC_ACCEPTED, self);
370                 return;
371             default:
372                 nn_fsm_bad_action (sinproc->state, src, type);
373             }
374 
375         default:
376             nn_fsm_bad_source (sinproc->state, src, type);
377         }
378 
379 /******************************************************************************/
380 /*  READY state.                                                              */
381 /*                                                                            */
382 /******************************************************************************/
383     case NN_SINPROC_STATE_READY:
384         switch (src) {
385 
386         case NN_SINPROC_SRC_PEER:
387             switch (type) {
388             case NN_SINPROC_READY:
389                 /*  This means both peers sent READY so they are both
390                     ready for receiving messages  */
391                 rc = nn_pipebase_start (&sinproc->pipebase);
392                 errnum_assert (rc == 0, -rc);
393                 sinproc->state = NN_SINPROC_STATE_ACTIVE;
394                 return;
395             case NN_SINPROC_ACCEPTED:
396                 rc = nn_pipebase_start (&sinproc->pipebase);
397 		/*  We can fail this due to excl_add saying we are already
398                     connected. */
399                 if (rc != 0) {
400                     nn_pipebase_stop (&sinproc->pipebase);
401                     sinproc->state = NN_SINPROC_STATE_DISCONNECTED;
402                     sinproc->peer = NULL;
403                     nn_fsm_raise (&sinproc->fsm, &sinproc->event_disconnect,
404                         NN_SINPROC_DISCONNECT);
405                     return;
406                 }
407                 errnum_assert (rc == 0, -rc);
408                 sinproc->state = NN_SINPROC_STATE_ACTIVE;
409                 return;
410             default:
411                 nn_fsm_bad_action (sinproc->state, src, type);
412             }
413 
414         default:
415             nn_fsm_bad_source (sinproc->state, src, type);
416         }
417 
418 /******************************************************************************/
419 /*  ACTIVE state.                                                             */
420 /******************************************************************************/
421     case NN_SINPROC_STATE_ACTIVE:
422         switch (src) {
423 
424         case NN_SINPROC_SRC_PEER:
425             switch (type) {
426             case NN_SINPROC_SENT:
427 
428                 empty = nn_msgqueue_empty (&sinproc->msgqueue);
429 
430                 /*  Push the message to the inbound message queue. */
431                 rc = nn_msgqueue_send (&sinproc->msgqueue,
432                     &sinproc->peer->msg);
433                 if (rc == -EAGAIN) {
434                     sinproc->flags |= NN_SINPROC_FLAG_RECEIVING;
435                     return;
436                 }
437                 errnum_assert (rc == 0, -rc);
438                 nn_msg_init (&sinproc->peer->msg, 0);
439 
440                 /*  Notify the user that there's a message to receive. */
441                 if (empty)
442                     nn_pipebase_received (&sinproc->pipebase);
443 
444                 /*  Notify the peer that the message was received. */
445                 nn_fsm_raiseto (&sinproc->fsm, &sinproc->peer->fsm,
446                     &sinproc->peer->event_received, NN_SINPROC_SRC_PEER,
447                     NN_SINPROC_RECEIVED, sinproc);
448 
449                 return;
450 
451             case NN_SINPROC_RECEIVED:
452                 nn_assert (sinproc->flags & NN_SINPROC_FLAG_SENDING);
453                 nn_pipebase_sent (&sinproc->pipebase);
454                 sinproc->flags &= ~NN_SINPROC_FLAG_SENDING;
455                 return;
456 
457             case NN_SINPROC_DISCONNECT:
458                 nn_pipebase_stop (&sinproc->pipebase);
459                 nn_fsm_raiseto (&sinproc->fsm, &sinproc->peer->fsm,
460                     &sinproc->peer->event_disconnect, NN_SINPROC_SRC_PEER,
461                     NN_SINPROC_DISCONNECT, sinproc);
462                 sinproc->state = NN_SINPROC_STATE_DISCONNECTED;
463                 sinproc->peer = NULL;
464                 nn_fsm_raise (&sinproc->fsm, &sinproc->event_disconnect,
465                     NN_SINPROC_DISCONNECT);
466                 return;
467 
468             default:
469                 nn_fsm_bad_action (sinproc->state, src, type);
470             }
471 
472         default:
473             nn_fsm_bad_source (sinproc->state, src, type);
474         }
475 
476 /******************************************************************************/
477 /*  DISCONNECTED state.                                                       */
478 /*  The peer have already closed the connection, but the object was not yet   */
479 /*  asked to stop.                                                            */
480 /******************************************************************************/
481     case NN_SINPROC_STATE_DISCONNECTED:
482         switch (src) {
483         case NN_SINPROC_SRC_PEER:
484             switch (type) {
485             case NN_SINPROC_RECEIVED:
486                 /*  This case can safely be ignored. It may happen when
487                     nn_close() comes before the already enqueued
488                     NN_SINPROC_RECEIVED has been delivered.  */
489                 return;
490             default:
491                 nn_fsm_bad_action (sinproc->state, src, type);
492             };
493         default:
494             nn_fsm_bad_source (sinproc->state, src, type);
495         }
496 
497 /******************************************************************************/
498 /*  Invalid state.                                                            */
499 /******************************************************************************/
500     default:
501         nn_fsm_bad_state (sinproc->state, src, type);
502     }
503 }
504 
505