1 /*
2 Copyright (c) 2013-2014 Martin Sustrik All rights reserved.
3 Copyright 2016 Garrett D'Amore <garrett@damore.org>
4
5 Permission is hereby granted, free of charge, to any person obtaining a copy
6 of this software and associated documentation files (the "Software"),
7 to deal in the Software without restriction, including without limitation
8 the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 and/or sell copies of the Software, and to permit persons to whom
10 the Software is furnished to do so, subject to the following conditions:
11
12 The above copyright notice and this permission notice shall be included
13 in all copies or substantial portions of the Software.
14
15 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
18 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 IN THE SOFTWARE.
22 */
23
24 #include "xbus.h"
25
26 #include "../../nn.h"
27 #include "../../bus.h"
28
29 #include "../../utils/err.h"
30 #include "../../utils/cont.h"
31 #include "../../utils/fast.h"
32 #include "../../utils/alloc.h"
33 #include "../../utils/attr.h"
34
35 #include <stddef.h>
36 #include <string.h>
37
/*  To keep the algorithm efficient, pipe pointers are cast directly to
    pipe IDs (rather than being looked up in a hash table). For this to
    work, it is necessary for a pointer to fit into the 64-bit ID that is
    carried in the message header. The check below enforces that at compile
    time. */
CT_ASSERT (sizeof (uint64_t) >= sizeof (struct nn_pipe*));
42
43 /* Implementation of nn_sockbase's virtual functions. */
/*  nn_xbus_destroy is file-local, so it has to be declared before the table
    below refers to it. The other handlers are non-static and are presumably
    declared in xbus.h. */
static void nn_xbus_destroy (struct nn_sockbase *self);

/*  Virtual function table handed to nn_sockbase_init by nn_xbus_create.
    The initialiser is positional, so the order of the entries must match
    the member order of struct nn_sockbase_vfptr. */
static const struct nn_sockbase_vfptr nn_xbus_sockbase_vfptr = {
    /*  First slot is left unset.
        NOTE(review): presumably the 'stop' hook -- confirm against the
        declaration of struct nn_sockbase_vfptr. */
    NULL,
    nn_xbus_destroy,
    nn_xbus_add,
    nn_xbus_rm,
    nn_xbus_in,
    nn_xbus_out,
    nn_xbus_events,
    nn_xbus_send,
    nn_xbus_recv,
    /*  Last two slots are left unset.
        NOTE(review): presumably the protocol-level setopt/getopt hooks --
        confirm against the declaration of struct nn_sockbase_vfptr. */
    NULL,
    NULL
};
58
nn_xbus_init(struct nn_xbus * self,const struct nn_sockbase_vfptr * vfptr,void * hint)59 void nn_xbus_init (struct nn_xbus *self,
60 const struct nn_sockbase_vfptr *vfptr, void *hint)
61 {
62 nn_sockbase_init (&self->sockbase, vfptr, hint);
63 nn_dist_init (&self->outpipes);
64 nn_fq_init (&self->inpipes);
65 }
66
nn_xbus_term(struct nn_xbus * self)67 void nn_xbus_term (struct nn_xbus *self)
68 {
69 nn_fq_term (&self->inpipes);
70 nn_dist_term (&self->outpipes);
71 nn_sockbase_term (&self->sockbase);
72 }
73
nn_xbus_destroy(struct nn_sockbase * self)74 static void nn_xbus_destroy (struct nn_sockbase *self)
75 {
76 struct nn_xbus *xbus;
77
78 xbus = nn_cont (self, struct nn_xbus, sockbase);
79
80 nn_xbus_term (xbus);
81 nn_free (xbus);
82 }
83
nn_xbus_add(struct nn_sockbase * self,struct nn_pipe * pipe)84 int nn_xbus_add (struct nn_sockbase *self, struct nn_pipe *pipe)
85 {
86 struct nn_xbus *xbus;
87 struct nn_xbus_data *data;
88 int rcvprio;
89 size_t sz;
90
91 xbus = nn_cont (self, struct nn_xbus, sockbase);
92
93 sz = sizeof (rcvprio);
94 nn_pipe_getopt (pipe, NN_SOL_SOCKET, NN_RCVPRIO, &rcvprio, &sz);
95 nn_assert (sz == sizeof (rcvprio));
96 nn_assert (rcvprio >= 1 && rcvprio <= 16);
97
98 data = nn_alloc (sizeof (struct nn_xbus_data), "pipe data (xbus)");
99 alloc_assert (data);
100 nn_fq_add (&xbus->inpipes, &data->initem, pipe, rcvprio);
101 nn_dist_add (&xbus->outpipes, &data->outitem, pipe);
102 nn_pipe_setdata (pipe, data);
103
104 return 0;
105 }
106
nn_xbus_rm(struct nn_sockbase * self,struct nn_pipe * pipe)107 void nn_xbus_rm (struct nn_sockbase *self, struct nn_pipe *pipe)
108 {
109 struct nn_xbus *xbus;
110 struct nn_xbus_data *data;
111
112 xbus = nn_cont (self, struct nn_xbus, sockbase);
113 data = nn_pipe_getdata (pipe);
114
115 nn_fq_rm (&xbus->inpipes, &data->initem);
116 nn_dist_rm (&xbus->outpipes, &data->outitem);
117
118 nn_free (data);
119 }
120
nn_xbus_in(struct nn_sockbase * self,struct nn_pipe * pipe)121 void nn_xbus_in (struct nn_sockbase *self, struct nn_pipe *pipe)
122 {
123 struct nn_xbus *xbus;
124 struct nn_xbus_data *data;
125
126 xbus = nn_cont (self, struct nn_xbus, sockbase);
127 data = nn_pipe_getdata (pipe);
128
129 nn_fq_in (&xbus->inpipes, &data->initem);
130 }
131
nn_xbus_out(struct nn_sockbase * self,struct nn_pipe * pipe)132 void nn_xbus_out (struct nn_sockbase *self, struct nn_pipe *pipe)
133 {
134 struct nn_xbus *xbus;
135 struct nn_xbus_data *data;
136
137 xbus = nn_cont (self, struct nn_xbus, sockbase);
138 data = nn_pipe_getdata (pipe);
139
140 nn_dist_out (&xbus->outpipes, &data->outitem);
141 }
142
nn_xbus_events(struct nn_sockbase * self)143 int nn_xbus_events (struct nn_sockbase *self)
144 {
145 return (nn_fq_can_recv (&nn_cont (self, struct nn_xbus,
146 sockbase)->inpipes) ? NN_SOCKBASE_EVENT_IN : 0) | NN_SOCKBASE_EVENT_OUT;
147 }
148
nn_xbus_send(struct nn_sockbase * self,struct nn_msg * msg)149 int nn_xbus_send (struct nn_sockbase *self, struct nn_msg *msg)
150 {
151 size_t hdrsz;
152 struct nn_pipe *exclude;
153
154 hdrsz = nn_chunkref_size (&msg->sphdr);
155 if (hdrsz == 0)
156 exclude = NULL;
157 else if (hdrsz == sizeof (uint64_t)) {
158 memcpy (&exclude, nn_chunkref_data (&msg->sphdr), sizeof (exclude));
159 nn_chunkref_term (&msg->sphdr);
160 nn_chunkref_init (&msg->sphdr, 0);
161 }
162 else
163 return -EINVAL;
164
165 return nn_dist_send (&nn_cont (self, struct nn_xbus, sockbase)->outpipes,
166 msg, exclude);
167 }
168
nn_xbus_recv(struct nn_sockbase * self,struct nn_msg * msg)169 int nn_xbus_recv (struct nn_sockbase *self, struct nn_msg *msg)
170 {
171 int rc;
172 struct nn_xbus *xbus;
173 struct nn_pipe *pipe;
174
175 xbus = nn_cont (self, struct nn_xbus, sockbase);
176
177 while (1) {
178
179 /* Get next message in fair-queued manner. */
180 rc = nn_fq_recv (&xbus->inpipes, msg, &pipe);
181 if (nn_slow (rc < 0))
182 return rc;
183
184 /* The message should have no header. Drop malformed messages. */
185 if (nn_chunkref_size (&msg->sphdr) == 0)
186 break;
187 nn_msg_term (msg);
188 }
189
190 /* Add pipe ID to the message header. */
191 nn_chunkref_term (&msg->sphdr);
192 nn_chunkref_init (&msg->sphdr, sizeof (uint64_t));
193 memset (nn_chunkref_data (&msg->sphdr), 0, sizeof (uint64_t));
194 memcpy (nn_chunkref_data (&msg->sphdr), &pipe, sizeof (pipe));
195
196 return 0;
197 }
198
nn_xbus_create(void * hint,struct nn_sockbase ** sockbase)199 static int nn_xbus_create (void *hint, struct nn_sockbase **sockbase)
200 {
201 struct nn_xbus *self;
202
203 self = nn_alloc (sizeof (struct nn_xbus), "socket (bus)");
204 alloc_assert (self);
205 nn_xbus_init (self, &nn_xbus_sockbase_vfptr, hint);
206 *sockbase = &self->sockbase;
207
208 return 0;
209 }
210
nn_xbus_ispeer(int socktype)211 int nn_xbus_ispeer (int socktype)
212 {
213 return socktype == NN_BUS ? 1 : 0;
214 }
215
/*  Socket type descriptor for the raw (AF_SP_RAW) variant of the BUS
    protocol. The initialiser is positional, so the order of the entries
    must match the member order of struct nn_socktype. */
struct nn_socktype nn_xbus_socktype = {
    AF_SP_RAW,
    NN_BUS,
    /*  NOTE(review): presumably the socket type flags, with none set --
        confirm against the declaration of struct nn_socktype. */
    0,
    /*  Factory creating a new xbus socket. */
    nn_xbus_create,
    /*  Predicate deciding which socket types are accepted as peers. */
    nn_xbus_ispeer,
};
223