1 /*
2 Copyright (c) 2012-2013 Martin Sustrik All rights reserved.
3 Copyright 2016 Garrett D'Amore <garrett@damore.org>
4
5 Permission is hereby granted, free of charge, to any person obtaining a copy
6 of this software and associated documentation files (the "Software"),
7 to deal in the Software without restriction, including without limitation
8 the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 and/or sell copies of the Software, and to permit persons to whom
10 the Software is furnished to do so, subject to the following conditions:
11
12 The above copyright notice and this permission notice shall be included
13 in all copies or substantial portions of the Software.
14
15 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
18 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 IN THE SOFTWARE.
22 */
23
24 #include "xsurveyor.h"
25
26 #include "../../nn.h"
27 #include "../../survey.h"
28
29 #include "../../utils/err.h"
30 #include "../../utils/cont.h"
31 #include "../../utils/fast.h"
32 #include "../../utils/alloc.h"
33 #include "../../utils/attr.h"
34
35 #include <stddef.h>
36
/* Private functions. Declared ahead of its definition because the virtual
   function table that follows stores a pointer to it. */
static void nn_xsurveyor_destroy (struct nn_sockbase *self);
39
/* Implementation of nn_sockbase's virtual functions. The initializer is
   positional, so the order of the entries must match the member order of
   struct nn_sockbase_vfptr. This table is the one handed to
   nn_xsurveyor_init by nn_xsurveyor_create. */
static const struct nn_sockbase_vfptr nn_xsurveyor_sockbase_vfptr = {
    /* NOTE(review): first slot is left empty; presumably an optional hook
       (stop?) -- confirm against the struct nn_sockbase_vfptr declaration. */
    NULL,
    nn_xsurveyor_destroy,
    nn_xsurveyor_add,
    nn_xsurveyor_rm,
    nn_xsurveyor_in,
    nn_xsurveyor_out,
    nn_xsurveyor_events,
    nn_xsurveyor_send,
    nn_xsurveyor_recv,
    /* NOTE(review): the two trailing slots are left empty; presumably the
       protocol-specific setopt/getopt hooks -- confirm against the struct
       declaration. */
    NULL,
    NULL
};
54
nn_xsurveyor_init(struct nn_xsurveyor * self,const struct nn_sockbase_vfptr * vfptr,void * hint)55 void nn_xsurveyor_init (struct nn_xsurveyor *self,
56 const struct nn_sockbase_vfptr *vfptr, void *hint)
57 {
58 nn_sockbase_init (&self->sockbase, vfptr, hint);
59 nn_dist_init (&self->outpipes);
60 nn_fq_init (&self->inpipes);
61 }
62
nn_xsurveyor_term(struct nn_xsurveyor * self)63 void nn_xsurveyor_term (struct nn_xsurveyor *self)
64 {
65 nn_fq_term (&self->inpipes);
66 nn_dist_term (&self->outpipes);
67 nn_sockbase_term (&self->sockbase);
68 }
69
nn_xsurveyor_destroy(struct nn_sockbase * self)70 static void nn_xsurveyor_destroy (struct nn_sockbase *self)
71 {
72 struct nn_xsurveyor *xsurveyor;
73
74 xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
75
76 nn_xsurveyor_term (xsurveyor);
77 nn_free (xsurveyor);
78 }
79
nn_xsurveyor_add(struct nn_sockbase * self,struct nn_pipe * pipe)80 int nn_xsurveyor_add (struct nn_sockbase *self, struct nn_pipe *pipe)
81 {
82 struct nn_xsurveyor *xsurveyor;
83 struct nn_xsurveyor_data *data;
84 int rcvprio;
85 size_t sz;
86
87 xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
88
89 sz = sizeof (rcvprio);
90 nn_pipe_getopt (pipe, NN_SOL_SOCKET, NN_RCVPRIO, &rcvprio, &sz);
91 nn_assert (sz == sizeof (rcvprio));
92 nn_assert (rcvprio >= 1 && rcvprio <= 16);
93
94 data = nn_alloc (sizeof (struct nn_xsurveyor_data),
95 "pipe data (xsurveyor)");
96 alloc_assert (data);
97 data->pipe = pipe;
98 nn_fq_add (&xsurveyor->inpipes, &data->initem, pipe, rcvprio);
99 nn_dist_add (&xsurveyor->outpipes, &data->outitem, pipe);
100 nn_pipe_setdata (pipe, data);
101
102 return 0;
103 }
104
nn_xsurveyor_rm(struct nn_sockbase * self,struct nn_pipe * pipe)105 void nn_xsurveyor_rm (struct nn_sockbase *self, struct nn_pipe *pipe)
106 {
107 struct nn_xsurveyor *xsurveyor;
108 struct nn_xsurveyor_data *data;
109
110 xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
111 data = nn_pipe_getdata (pipe);
112
113 nn_fq_rm (&xsurveyor->inpipes, &data->initem);
114 nn_dist_rm (&xsurveyor->outpipes, &data->outitem);
115
116 nn_free (data);
117 }
118
nn_xsurveyor_in(struct nn_sockbase * self,struct nn_pipe * pipe)119 void nn_xsurveyor_in (struct nn_sockbase *self, struct nn_pipe *pipe)
120 {
121 struct nn_xsurveyor *xsurveyor;
122 struct nn_xsurveyor_data *data;
123
124 xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
125 data = nn_pipe_getdata (pipe);
126
127 nn_fq_in (&xsurveyor->inpipes, &data->initem);
128 }
129
nn_xsurveyor_out(struct nn_sockbase * self,struct nn_pipe * pipe)130 void nn_xsurveyor_out (struct nn_sockbase *self, struct nn_pipe *pipe)
131 {
132 struct nn_xsurveyor *xsurveyor;
133 struct nn_xsurveyor_data *data;
134
135 xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
136 data = nn_pipe_getdata (pipe);
137
138 nn_dist_out (&xsurveyor->outpipes, &data->outitem);
139 }
140
nn_xsurveyor_events(struct nn_sockbase * self)141 int nn_xsurveyor_events (struct nn_sockbase *self)
142 {
143 struct nn_xsurveyor *xsurveyor;
144 int events;
145
146 xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
147
148 events = NN_SOCKBASE_EVENT_OUT;
149 if (nn_fq_can_recv (&xsurveyor->inpipes))
150 events |= NN_SOCKBASE_EVENT_IN;
151 return events;
152 }
153
nn_xsurveyor_send(struct nn_sockbase * self,struct nn_msg * msg)154 int nn_xsurveyor_send (struct nn_sockbase *self, struct nn_msg *msg)
155 {
156 return nn_dist_send (
157 &nn_cont (self, struct nn_xsurveyor, sockbase)->outpipes, msg, NULL);
158 }
159
nn_xsurveyor_recv(struct nn_sockbase * self,struct nn_msg * msg)160 int nn_xsurveyor_recv (struct nn_sockbase *self, struct nn_msg *msg)
161 {
162 int rc;
163 struct nn_xsurveyor *xsurveyor;
164
165 xsurveyor = nn_cont (self, struct nn_xsurveyor, sockbase);
166
167 rc = nn_fq_recv (&xsurveyor->inpipes, msg, NULL);
168 if (nn_slow (rc < 0))
169 return rc;
170
171 /* Split the header from the body, if needed. */
172 if (!(rc & NN_PIPE_PARSED)) {
173 if (nn_slow (nn_chunkref_size (&msg->body) < sizeof (uint32_t))) {
174 nn_msg_term (msg);
175 return -EAGAIN;
176 }
177 nn_assert (nn_chunkref_size (&msg->sphdr) == 0);
178 nn_chunkref_term (&msg->sphdr);
179 nn_chunkref_init (&msg->sphdr, sizeof (uint32_t));
180 memcpy (nn_chunkref_data (&msg->sphdr), nn_chunkref_data (&msg->body),
181 sizeof (uint32_t));
182 nn_chunkref_trim (&msg->body, sizeof (uint32_t));
183 }
184
185 return 0;
186 }
187
nn_xsurveyor_create(void * hint,struct nn_sockbase ** sockbase)188 static int nn_xsurveyor_create (void *hint, struct nn_sockbase **sockbase)
189 {
190 struct nn_xsurveyor *self;
191
192 self = nn_alloc (sizeof (struct nn_xsurveyor), "socket (xsurveyor)");
193 alloc_assert (self);
194 nn_xsurveyor_init (self, &nn_xsurveyor_sockbase_vfptr, hint);
195 *sockbase = &self->sockbase;
196
197 return 0;
198 }
199
nn_xsurveyor_ispeer(int socktype)200 int nn_xsurveyor_ispeer (int socktype)
201 {
202 return socktype == NN_RESPONDENT ? 1 : 0;
203 }
204
/* Socket type descriptor for the raw (AF_SP_RAW) NN_SURVEYOR socket.
   The initializer is positional, so entry order must match the member
   order of struct nn_socktype. */
struct nn_socktype nn_xsurveyor_socktype = {
    AF_SP_RAW,
    NN_SURVEYOR,
    /* NOTE(review): presumably the flags member, with no flags set --
       confirm against the struct nn_socktype declaration. */
    0,
    nn_xsurveyor_create,
    nn_xsurveyor_ispeer,
};
212