1 /*
2 Copyright (c) 2013 Martin Sustrik All rights reserved.
3 Copyright 2017 Garrett D'Amore <garrett@damore.org>
4
5 Permission is hereby granted, free of charge, to any person obtaining a copy
6 of this software and associated documentation files (the "Software"),
7 to deal in the Software without restriction, including without limitation
8 the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 and/or sell copies of the Software, and to permit persons to whom
10 the Software is furnished to do so, subject to the following conditions:
11
12 The above copyright notice and this permission notice shall be included
13 in all copies or substantial portions of the Software.
14
15 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
18 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 IN THE SOFTWARE.
22 */
23
24 #include "sinproc.h"
25
26 #include "../../utils/err.h"
27 #include "../../utils/cont.h"
28 #include "../../utils/attr.h"
29
30 #include <stddef.h>
31
/*  States of the session state machine (stored in nn_sinproc::state). */

/*  Initial state set by nn_sinproc_init; left when the FSM is started. */
#define NN_SINPROC_STATE_IDLE           1
/*  FSM was started; waiting for READY from the peer or for the local
    READY action. */
#define NN_SINPROC_STATE_CONNECTING     2
/*  Local READY action was processed; waiting for READY or ACCEPTED from
    the peer. */
#define NN_SINPROC_STATE_READY          3
/*  The pipe was started; messages may flow in both directions. */
#define NN_SINPROC_STATE_ACTIVE         4
/*  The peer has closed the connection, but this object was not yet asked
    to stop. */
#define NN_SINPROC_STATE_DISCONNECTED   5
/*  Stop was requested and DISCONNECT was sent to the peer; waiting for the
    peer's DISCONNECT. */
#define NN_SINPROC_STATE_STOPPING_PEER  6
/*  Waiting for the remaining events to be delivered before reporting
    STOPPED. */
#define NN_SINPROC_STATE_STOPPING       7

/*  Actions the state machine raises to itself. */
#define NN_SINPROC_ACTION_READY         1
#define NN_SINPROC_ACTION_ACCEPTED      2

/*  Set when the SENT event was sent to the peer but RECEIVED hasn't been
    passed back yet. */
#define NN_SINPROC_FLAG_SENDING         1

/*  Set when the SENT event was received, but the new message cannot be
    written to the queue yet, i.e. the RECEIVED event hasn't been returned
    to the peer yet. */
#define NN_SINPROC_FLAG_RECEIVING       2
51
52 /* Private functions. */
53 static void nn_sinproc_handler (struct nn_fsm *self, int src, int type,
54 void *srcptr);
55 static void nn_sinproc_shutdown (struct nn_fsm *self, int src, int type,
56 void *srcptr);
57
58 static int nn_sinproc_send (struct nn_pipebase *self, struct nn_msg *msg);
59 static int nn_sinproc_recv (struct nn_pipebase *self, struct nn_msg *msg);
60 const struct nn_pipebase_vfptr nn_sinproc_pipebase_vfptr = {
61 nn_sinproc_send,
62 nn_sinproc_recv
63 };
64
nn_sinproc_init(struct nn_sinproc * self,int src,struct nn_ep * ep,struct nn_fsm * owner)65 void nn_sinproc_init (struct nn_sinproc *self, int src,
66 struct nn_ep *ep, struct nn_fsm *owner)
67 {
68 int rcvbuf;
69 size_t sz;
70
71 nn_fsm_init (&self->fsm, nn_sinproc_handler, nn_sinproc_shutdown,
72 src, self, owner);
73 self->state = NN_SINPROC_STATE_IDLE;
74 self->flags = 0;
75 self->peer = NULL;
76 nn_pipebase_init (&self->pipebase, &nn_sinproc_pipebase_vfptr, ep);
77 sz = sizeof (rcvbuf);
78 nn_ep_getopt (ep, NN_SOL_SOCKET, NN_RCVBUF, &rcvbuf, &sz);
79 nn_assert (sz == sizeof (rcvbuf));
80 nn_msgqueue_init (&self->msgqueue, rcvbuf);
81 nn_msg_init (&self->msg, 0);
82 nn_fsm_event_init (&self->event_connect);
83 nn_fsm_event_init (&self->event_sent);
84 nn_fsm_event_init (&self->event_received);
85 nn_fsm_event_init (&self->event_disconnect);
86 nn_list_item_init (&self->item);
87 }
88
nn_sinproc_term(struct nn_sinproc * self)89 void nn_sinproc_term (struct nn_sinproc *self)
90 {
91 nn_list_item_term (&self->item);
92 nn_fsm_event_term (&self->event_disconnect);
93 nn_fsm_event_term (&self->event_received);
94 nn_fsm_event_term (&self->event_sent);
95 nn_fsm_event_term (&self->event_connect);
96 nn_msg_term (&self->msg);
97 nn_msgqueue_term (&self->msgqueue);
98 nn_pipebase_term (&self->pipebase);
99 nn_fsm_term (&self->fsm);
100 }
101
nn_sinproc_isidle(struct nn_sinproc * self)102 int nn_sinproc_isidle (struct nn_sinproc *self)
103 {
104 return nn_fsm_isidle (&self->fsm);
105 }
106
nn_sinproc_connect(struct nn_sinproc * self,struct nn_fsm * peer)107 void nn_sinproc_connect (struct nn_sinproc *self, struct nn_fsm *peer)
108 {
109 nn_fsm_start (&self->fsm);
110
111 /* Start the connecting handshake with the peer. */
112 nn_fsm_raiseto (&self->fsm, peer, &self->event_connect,
113 NN_SINPROC_SRC_PEER, NN_SINPROC_CONNECT, self);
114 }
115
nn_sinproc_accept(struct nn_sinproc * self,struct nn_sinproc * peer)116 void nn_sinproc_accept (struct nn_sinproc *self, struct nn_sinproc *peer)
117 {
118 nn_assert (!self->peer);
119 self->peer = peer;
120
121 /* Start the connecting handshake with the peer. */
122 nn_fsm_raiseto (&self->fsm, &peer->fsm, &self->event_connect,
123 NN_SINPROC_SRC_PEER, NN_SINPROC_READY, self);
124
125 /* Notify the state machine. */
126 nn_fsm_start (&self->fsm);
127 nn_fsm_action (&self->fsm, NN_SINPROC_ACTION_READY);
128 }
129
nn_sinproc_stop(struct nn_sinproc * self)130 void nn_sinproc_stop (struct nn_sinproc *self)
131 {
132 nn_fsm_stop (&self->fsm);
133 }
134
135
136
nn_sinproc_send(struct nn_pipebase * self,struct nn_msg * msg)137 static int nn_sinproc_send (struct nn_pipebase *self, struct nn_msg *msg)
138 {
139 struct nn_sinproc *sinproc;
140 struct nn_msg nmsg;
141
142 sinproc = nn_cont (self, struct nn_sinproc, pipebase);
143
144 /* If the peer have already closed the connection, we cannot send
145 anymore. */
146 if (sinproc->state == NN_SINPROC_STATE_DISCONNECTED)
147 return -ECONNRESET;
148
149 /* Sanity checks. */
150 nn_assert_state (sinproc, NN_SINPROC_STATE_ACTIVE);
151 nn_assert (!(sinproc->flags & NN_SINPROC_FLAG_SENDING));
152
153 nn_msg_init (&nmsg,
154 nn_chunkref_size (&msg->sphdr) +
155 nn_chunkref_size (&msg->body));
156 memcpy (nn_chunkref_data (&nmsg.body),
157 nn_chunkref_data (&msg->sphdr),
158 nn_chunkref_size (&msg->sphdr));
159 memcpy ((char *)nn_chunkref_data (&nmsg.body) +
160 nn_chunkref_size (&msg->sphdr),
161 nn_chunkref_data (&msg->body),
162 nn_chunkref_size (&msg->body));
163 nn_msg_term (msg);
164
165 /* Expose the message to the peer. */
166 nn_msg_term (&sinproc->msg);
167 nn_msg_mv (&sinproc->msg, &nmsg);
168
169 /* Notify the peer that there's a message to get. */
170 sinproc->flags |= NN_SINPROC_FLAG_SENDING;
171 nn_fsm_raiseto (&sinproc->fsm, &sinproc->peer->fsm,
172 &sinproc->peer->event_sent, NN_SINPROC_SRC_PEER,
173 NN_SINPROC_SENT, sinproc);
174
175 return 0;
176 }
177
nn_sinproc_recv(struct nn_pipebase * self,struct nn_msg * msg)178 static int nn_sinproc_recv (struct nn_pipebase *self, struct nn_msg *msg)
179 {
180 int rc;
181 struct nn_sinproc *sinproc;
182
183 sinproc = nn_cont (self, struct nn_sinproc, pipebase);
184
185 /* Sanity check. */
186 nn_assert (sinproc->state == NN_SINPROC_STATE_ACTIVE ||
187 sinproc->state == NN_SINPROC_STATE_DISCONNECTED);
188
189 /* Move the message to the caller. */
190 rc = nn_msgqueue_recv (&sinproc->msgqueue, msg);
191 errnum_assert (rc == 0, -rc);
192
193 /* If there was a message from peer lingering because of the exceeded
194 buffer limit, try to enqueue it once again. */
195 if (sinproc->state != NN_SINPROC_STATE_DISCONNECTED) {
196 if (nn_slow (sinproc->flags & NN_SINPROC_FLAG_RECEIVING)) {
197 rc = nn_msgqueue_send (&sinproc->msgqueue, &sinproc->peer->msg);
198 nn_assert (rc == 0 || rc == -EAGAIN);
199 if (rc == 0) {
200 errnum_assert (rc == 0, -rc);
201 nn_msg_init (&sinproc->peer->msg, 0);
202 nn_fsm_raiseto (&sinproc->fsm, &sinproc->peer->fsm,
203 &sinproc->peer->event_received, NN_SINPROC_SRC_PEER,
204 NN_SINPROC_RECEIVED, sinproc);
205 sinproc->flags &= ~NN_SINPROC_FLAG_RECEIVING;
206 }
207 }
208 }
209
210 if (!nn_msgqueue_empty (&sinproc->msgqueue))
211 nn_pipebase_received (&sinproc->pipebase);
212
213 return 0;
214 }
215
/*  Process a single event while the session is shutting down.

    Two events are handled regardless of the current state: the STOP
    action, which starts the shutdown, and RECEIVED from the peer, which is
    ignored. Everything else is dispatched on the current state; only
    STOPPING_PEER accepts further events, and only from the peer.

    The NN_FSM_ACTION case used to fall through into the
    NN_SINPROC_SRC_PEER case, so an action whose type happened to equal
    NN_SINPROC_RECEIVED would have been silently discarded. Each source is
    now handled strictly on its own. */
static void nn_sinproc_shutdown_events (struct nn_sinproc *self, int src,
    int type, NN_UNUSED void *srcptr)
{
    /*  *******************************  */
    /*  Any-state events                 */
    /*  *******************************  */
    switch (src) {
    case NN_FSM_ACTION:
        switch (type) {
        case NN_FSM_STOP:
            if (self->state != NN_SINPROC_STATE_IDLE &&
                  self->state != NN_SINPROC_STATE_DISCONNECTED) {

                /*  A peer may still be attached: detach from the socket
                    core and tell the peer we are going away, then wait
                    for its DISCONNECT.
                    NOTE(review): self->peer is assigned only in
                    nn_sinproc_accept and on READY in the handler. If a
                    stop can arrive while still CONNECTING with no peer
                    recorded, the raise below dereferences NULL; confirm
                    that callers exclude this. */
                nn_pipebase_stop (&self->pipebase);
                nn_assert (self->fsm.state == 2 || self->fsm.state == 3);
                nn_fsm_raiseto (&self->fsm, &self->peer->fsm,
                    &self->peer->event_disconnect, NN_SINPROC_SRC_PEER,
                    NN_SINPROC_DISCONNECT, self);

                self->state = NN_SINPROC_STATE_STOPPING_PEER;
            } else {

                /*  Never started or peer already gone: nobody to wait
                    for. */
                self->state = NN_SINPROC_STATE_STOPPING;
            }
            return;
        default:
            break;
        }
        break;
    case NN_SINPROC_SRC_PEER:
        switch (type) {
        case NN_SINPROC_RECEIVED:

            /*  Acknowledgement of an earlier send; nothing to do. */
            return;
        default:
            break;
        }
        break;
    default:
        break;
    }

    /*  *******************************  */
    /*  Regular events                   */
    /*  *******************************  */
    switch (self->state) {
    case NN_SINPROC_STATE_STOPPING_PEER:
        switch (src) {
        case NN_SINPROC_SRC_PEER:
            switch (type) {
            case NN_SINPROC_DISCONNECT:
                self->state = NN_SINPROC_STATE_STOPPING;
                return;
            default:
                /*  We could get a notification about state that
                    was queued earlier, or about a sent message. We
                    do not care about those anymore, we're closing! */
                return;
            }
        default:
            nn_fsm_bad_source (self->state, src, type);
        }
    default:
        nn_fsm_bad_state (self->state, src, type);
    }

    nn_fsm_bad_action (self->state, src, type);
}
275
nn_sinproc_shutdown(struct nn_fsm * self,int src,int type,void * srcptr)276 static void nn_sinproc_shutdown (struct nn_fsm *self, int src, int type,
277 void *srcptr)
278 {
279 struct nn_sinproc *sinproc;
280
281 sinproc = nn_cont (self, struct nn_sinproc, fsm);
282 nn_assert (sinproc->fsm.state == 3);
283
284 nn_sinproc_shutdown_events (sinproc, src, type, srcptr);
285
286 /* *************** */
287 /* States to check */
288 /* *************** */
289
290 /* Have we got notification that peer is stopped */
291 if (nn_slow (sinproc->state != NN_SINPROC_STATE_STOPPING)) {
292 return;
293 }
294
295 /* Are all events processed? We can't cancel them unfortunately */
296 if (nn_fsm_event_active (&sinproc->event_received)
297 || nn_fsm_event_active (&sinproc->event_disconnect))
298 {
299 return;
300 }
301 /* These events are deemed to be impossible here */
302 nn_assert (!nn_fsm_event_active (&sinproc->event_connect));
303 nn_assert (!nn_fsm_event_active (&sinproc->event_sent));
304
305 /* ********************************************** */
306 /* All checks are successful. Just stop right now */
307 /* ********************************************** */
308
309 nn_fsm_stopped (&sinproc->fsm, NN_SINPROC_STOPPED);
310 return;
311 }
312
nn_sinproc_handler(struct nn_fsm * self,int src,int type,void * srcptr)313 static void nn_sinproc_handler (struct nn_fsm *self, int src, int type,
314 void *srcptr)
315 {
316 int rc;
317 struct nn_sinproc *sinproc;
318 int empty;
319
320 sinproc = nn_cont (self, struct nn_sinproc, fsm);
321
322 switch (sinproc->state) {
323
324 /******************************************************************************/
325 /* IDLE state. */
326 /******************************************************************************/
327 case NN_SINPROC_STATE_IDLE:
328 switch (src) {
329
330 case NN_FSM_ACTION:
331 switch (type) {
332 case NN_FSM_START:
333 sinproc->state = NN_SINPROC_STATE_CONNECTING;
334 return;
335 default:
336 nn_fsm_bad_action (sinproc->state, src, type);
337 }
338
339 default:
340 nn_fsm_bad_source (sinproc->state, src, type);
341 }
342
343 /******************************************************************************/
344 /* CONNECTING state. */
345 /* CONNECT request was sent to the peer. Now we are waiting for the */
346 /* acknowledgement. */
347 /******************************************************************************/
348 case NN_SINPROC_STATE_CONNECTING:
349 switch (src) {
350
351 case NN_FSM_ACTION:
352 switch (type) {
353 case NN_SINPROC_ACTION_READY:
354 sinproc->state = NN_SINPROC_STATE_READY;
355 return;
356 default:
357 nn_fsm_bad_action (sinproc->state, src, type);
358 }
359
360 case NN_SINPROC_SRC_PEER:
361 switch (type) {
362 case NN_SINPROC_READY:
363 sinproc->peer = (struct nn_sinproc*) srcptr;
364 rc = nn_pipebase_start (&sinproc->pipebase);
365 errnum_assert (rc == 0, -rc);
366 sinproc->state = NN_SINPROC_STATE_ACTIVE;
367 nn_fsm_raiseto (&sinproc->fsm, &sinproc->peer->fsm,
368 &sinproc->event_connect,
369 NN_SINPROC_SRC_PEER, NN_SINPROC_ACCEPTED, self);
370 return;
371 default:
372 nn_fsm_bad_action (sinproc->state, src, type);
373 }
374
375 default:
376 nn_fsm_bad_source (sinproc->state, src, type);
377 }
378
379 /******************************************************************************/
380 /* READY state. */
381 /* */
382 /******************************************************************************/
383 case NN_SINPROC_STATE_READY:
384 switch (src) {
385
386 case NN_SINPROC_SRC_PEER:
387 switch (type) {
388 case NN_SINPROC_READY:
389 /* This means both peers sent READY so they are both
390 ready for receiving messages */
391 rc = nn_pipebase_start (&sinproc->pipebase);
392 errnum_assert (rc == 0, -rc);
393 sinproc->state = NN_SINPROC_STATE_ACTIVE;
394 return;
395 case NN_SINPROC_ACCEPTED:
396 rc = nn_pipebase_start (&sinproc->pipebase);
397 /* We can fail this due to excl_add saying we are already
398 connected. */
399 if (rc != 0) {
400 nn_pipebase_stop (&sinproc->pipebase);
401 sinproc->state = NN_SINPROC_STATE_DISCONNECTED;
402 sinproc->peer = NULL;
403 nn_fsm_raise (&sinproc->fsm, &sinproc->event_disconnect,
404 NN_SINPROC_DISCONNECT);
405 return;
406 }
407 errnum_assert (rc == 0, -rc);
408 sinproc->state = NN_SINPROC_STATE_ACTIVE;
409 return;
410 default:
411 nn_fsm_bad_action (sinproc->state, src, type);
412 }
413
414 default:
415 nn_fsm_bad_source (sinproc->state, src, type);
416 }
417
418 /******************************************************************************/
419 /* ACTIVE state. */
420 /******************************************************************************/
421 case NN_SINPROC_STATE_ACTIVE:
422 switch (src) {
423
424 case NN_SINPROC_SRC_PEER:
425 switch (type) {
426 case NN_SINPROC_SENT:
427
428 empty = nn_msgqueue_empty (&sinproc->msgqueue);
429
430 /* Push the message to the inbound message queue. */
431 rc = nn_msgqueue_send (&sinproc->msgqueue,
432 &sinproc->peer->msg);
433 if (rc == -EAGAIN) {
434 sinproc->flags |= NN_SINPROC_FLAG_RECEIVING;
435 return;
436 }
437 errnum_assert (rc == 0, -rc);
438 nn_msg_init (&sinproc->peer->msg, 0);
439
440 /* Notify the user that there's a message to receive. */
441 if (empty)
442 nn_pipebase_received (&sinproc->pipebase);
443
444 /* Notify the peer that the message was received. */
445 nn_fsm_raiseto (&sinproc->fsm, &sinproc->peer->fsm,
446 &sinproc->peer->event_received, NN_SINPROC_SRC_PEER,
447 NN_SINPROC_RECEIVED, sinproc);
448
449 return;
450
451 case NN_SINPROC_RECEIVED:
452 nn_assert (sinproc->flags & NN_SINPROC_FLAG_SENDING);
453 nn_pipebase_sent (&sinproc->pipebase);
454 sinproc->flags &= ~NN_SINPROC_FLAG_SENDING;
455 return;
456
457 case NN_SINPROC_DISCONNECT:
458 nn_pipebase_stop (&sinproc->pipebase);
459 nn_fsm_raiseto (&sinproc->fsm, &sinproc->peer->fsm,
460 &sinproc->peer->event_disconnect, NN_SINPROC_SRC_PEER,
461 NN_SINPROC_DISCONNECT, sinproc);
462 sinproc->state = NN_SINPROC_STATE_DISCONNECTED;
463 sinproc->peer = NULL;
464 nn_fsm_raise (&sinproc->fsm, &sinproc->event_disconnect,
465 NN_SINPROC_DISCONNECT);
466 return;
467
468 default:
469 nn_fsm_bad_action (sinproc->state, src, type);
470 }
471
472 default:
473 nn_fsm_bad_source (sinproc->state, src, type);
474 }
475
476 /******************************************************************************/
477 /* DISCONNECTED state. */
478 /* The peer have already closed the connection, but the object was not yet */
479 /* asked to stop. */
480 /******************************************************************************/
481 case NN_SINPROC_STATE_DISCONNECTED:
482 switch (src) {
483 case NN_SINPROC_SRC_PEER:
484 switch (type) {
485 case NN_SINPROC_RECEIVED:
486 /* This case can safely be ignored. It may happen when
487 nn_close() comes before the already enqueued
488 NN_SINPROC_RECEIVED has been delivered. */
489 return;
490 default:
491 nn_fsm_bad_action (sinproc->state, src, type);
492 };
493 default:
494 nn_fsm_bad_source (sinproc->state, src, type);
495 }
496
497 /******************************************************************************/
498 /* Invalid state. */
499 /******************************************************************************/
500 default:
501 nn_fsm_bad_state (sinproc->state, src, type);
502 }
503 }
504
505