1 /*
2     Copyright (c) 2012-2013 Martin Sustrik  All rights reserved.
3     Copyright 2016 Garrett D'Amore <garrett@damore.org>
4 
5     Permission is hereby granted, free of charge, to any person obtaining a copy
6     of this software and associated documentation files (the "Software"),
7     to deal in the Software without restriction, including without limitation
8     the rights to use, copy, modify, merge, publish, distribute, sublicense,
9     and/or sell copies of the Software, and to permit persons to whom
10     the Software is furnished to do so, subject to the following conditions:
11 
12     The above copyright notice and this permission notice shall be included
13     in all copies or substantial portions of the Software.
14 
15     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
18     THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21     IN THE SOFTWARE.
22 */
23 
24 #include "xsurveyor.h"
25 
26 #include "../../nn.h"
27 #include "../../survey.h"
28 
29 #include "../../utils/err.h"
30 #include "../../utils/cont.h"
31 #include "../../utils/fast.h"
32 #include "../../utils/alloc.h"
33 #include "../../utils/attr.h"
34 
35 #include <stddef.h>
36 
37 /*  Private functions. */
38 static void nn_xsurveyor_destroy (struct nn_sockbase *self);
39 
40 /*  Implementation of nn_sockbase's virtual functions. */
41 static const struct nn_sockbase_vfptr nn_xsurveyor_sockbase_vfptr = {
42     NULL,
43     nn_xsurveyor_destroy,
44     nn_xsurveyor_add,
45     nn_xsurveyor_rm,
46     nn_xsurveyor_in,
47     nn_xsurveyor_out,
48     nn_xsurveyor_events,
49     nn_xsurveyor_send,
50     nn_xsurveyor_recv,
51     NULL,
52     NULL
53 };
54 
nn_xsurveyor_init(struct nn_xsurveyor * self,const struct nn_sockbase_vfptr * vfptr,void * hint)55 void nn_xsurveyor_init (struct nn_xsurveyor *self,
56     const struct nn_sockbase_vfptr *vfptr, void *hint)
57 {
58     nn_sockbase_init (&self->sockbase, vfptr, hint);
59     nn_dist_init (&self->outpipes);
60     nn_fq_init (&self->inpipes);
61 }
62 
nn_xsurveyor_term(struct nn_xsurveyor * self)63 void nn_xsurveyor_term (struct nn_xsurveyor *self)
64 {
65     nn_fq_term (&self->inpipes);
66     nn_dist_term (&self->outpipes);
67     nn_sockbase_term (&self->sockbase);
68 }
69 
nn_xsurveyor_destroy(struct nn_sockbase * self)70 static void nn_xsurveyor_destroy (struct nn_sockbase *self)
71 {
72     struct nn_xsurveyor *xsurveyor;
73 
74     xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
75 
76     nn_xsurveyor_term (xsurveyor);
77     nn_free (xsurveyor);
78 }
79 
nn_xsurveyor_add(struct nn_sockbase * self,struct nn_pipe * pipe)80 int nn_xsurveyor_add (struct nn_sockbase *self, struct nn_pipe *pipe)
81 {
82     struct nn_xsurveyor *xsurveyor;
83     struct nn_xsurveyor_data *data;
84     int rcvprio;
85     size_t sz;
86 
87     xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
88 
89     sz = sizeof (rcvprio);
90     nn_pipe_getopt (pipe, NN_SOL_SOCKET, NN_RCVPRIO, &rcvprio, &sz);
91     nn_assert (sz == sizeof (rcvprio));
92     nn_assert (rcvprio >= 1 && rcvprio <= 16);
93 
94     data = nn_alloc (sizeof (struct nn_xsurveyor_data),
95         "pipe data (xsurveyor)");
96     alloc_assert (data);
97     data->pipe = pipe;
98     nn_fq_add (&xsurveyor->inpipes, &data->initem, pipe, rcvprio);
99     nn_dist_add (&xsurveyor->outpipes, &data->outitem, pipe);
100     nn_pipe_setdata (pipe, data);
101 
102     return 0;
103 }
104 
nn_xsurveyor_rm(struct nn_sockbase * self,struct nn_pipe * pipe)105 void nn_xsurveyor_rm (struct nn_sockbase *self, struct nn_pipe *pipe)
106 {
107     struct nn_xsurveyor *xsurveyor;
108     struct nn_xsurveyor_data *data;
109 
110     xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
111     data = nn_pipe_getdata (pipe);
112 
113     nn_fq_rm (&xsurveyor->inpipes, &data->initem);
114     nn_dist_rm (&xsurveyor->outpipes, &data->outitem);
115 
116     nn_free (data);
117 }
118 
nn_xsurveyor_in(struct nn_sockbase * self,struct nn_pipe * pipe)119 void nn_xsurveyor_in (struct nn_sockbase *self, struct nn_pipe *pipe)
120 {
121     struct nn_xsurveyor *xsurveyor;
122     struct nn_xsurveyor_data *data;
123 
124     xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
125     data = nn_pipe_getdata (pipe);
126 
127     nn_fq_in (&xsurveyor->inpipes, &data->initem);
128 }
129 
nn_xsurveyor_out(struct nn_sockbase * self,struct nn_pipe * pipe)130 void nn_xsurveyor_out (struct nn_sockbase *self, struct nn_pipe *pipe)
131 {
132     struct nn_xsurveyor *xsurveyor;
133     struct nn_xsurveyor_data *data;
134 
135     xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
136     data = nn_pipe_getdata (pipe);
137 
138     nn_dist_out (&xsurveyor->outpipes, &data->outitem);
139 }
140 
nn_xsurveyor_events(struct nn_sockbase * self)141 int nn_xsurveyor_events (struct nn_sockbase *self)
142 {
143     struct nn_xsurveyor *xsurveyor;
144     int events;
145 
146     xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
147 
148     events = NN_SOCKBASE_EVENT_OUT;
149     if (nn_fq_can_recv (&xsurveyor->inpipes))
150         events |= NN_SOCKBASE_EVENT_IN;
151     return events;
152 }
153 
nn_xsurveyor_send(struct nn_sockbase * self,struct nn_msg * msg)154 int nn_xsurveyor_send (struct nn_sockbase *self, struct nn_msg *msg)
155 {
156     return nn_dist_send (
157         &nn_cont (self, struct nn_xsurveyor, sockbase)->outpipes, msg, NULL);
158 }
159 
nn_xsurveyor_recv(struct nn_sockbase * self,struct nn_msg * msg)160 int nn_xsurveyor_recv (struct nn_sockbase *self, struct nn_msg *msg)
161 {
162     int rc;
163     struct nn_xsurveyor *xsurveyor;
164 
165     xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
166 
167     rc = nn_fq_recv (&xsurveyor->inpipes, msg, NULL);
168     if (nn_slow (rc < 0))
169         return rc;
170 
171     /*  Split the header from the body, if needed. */
172     if (!(rc & NN_PIPE_PARSED)) {
173         if (nn_slow (nn_chunkref_size (&msg->body) < sizeof (uint32_t))) {
174             nn_msg_term (msg);
175             return -EAGAIN;
176         }
177         nn_assert (nn_chunkref_size (&msg->sphdr) == 0);
178         nn_chunkref_term (&msg->sphdr);
179         nn_chunkref_init (&msg->sphdr, sizeof (uint32_t));
180         memcpy (nn_chunkref_data (&msg->sphdr), nn_chunkref_data (&msg->body),
181            sizeof (uint32_t));
182         nn_chunkref_trim (&msg->body, sizeof (uint32_t));
183     }
184 
185     return 0;
186 }
187 
nn_xsurveyor_create(void * hint,struct nn_sockbase ** sockbase)188 static int nn_xsurveyor_create (void *hint, struct nn_sockbase **sockbase)
189 {
190     struct nn_xsurveyor *self;
191 
192     self = nn_alloc (sizeof (struct nn_xsurveyor), "socket (xsurveyor)");
193     alloc_assert (self);
194     nn_xsurveyor_init (self, &nn_xsurveyor_sockbase_vfptr, hint);
195     *sockbase = &self->sockbase;
196 
197     return 0;
198 }
199 
nn_xsurveyor_ispeer(int socktype)200 int nn_xsurveyor_ispeer (int socktype)
201 {
202     return socktype == NN_RESPONDENT ? 1 : 0;
203 }
204 
205 struct nn_socktype nn_xsurveyor_socktype = {
206     AF_SP_RAW,
207     NN_SURVEYOR,
208     0,
209     nn_xsurveyor_create,
210     nn_xsurveyor_ispeer,
211 };
212