1 /*
2 * aprsc
3 *
4 * (c) Heikki Hannikainen, OH7LZB <hessu@hes.iki.fi>
5 *
6 * This program is licensed under the BSD license, which can be found
7 * in the file LICENSE.
8 *
9 */
10
11 /*
12 * outgoing.c: handle outgoing packets in the worker thread
13 */
14
15 #include <string.h>
16 #include <stdlib.h>
17
18 #include "outgoing.h"
19 #include "hlog.h"
20 #include "filter.h"
21 #include "status.h"
22
23 /*
24 * send a single packet to all clients (and peers and uplinks) which
25 * should have a copy
26 */
27
send_single(struct worker_t * self,struct client_t * c,char * data,int len)28 static inline void send_single(struct worker_t *self, struct client_t *c, char *data, int len)
29 {
30 /* if we're going to use the UDP sidechannel, account for UDP, otherwise
31 * its TCP or SCTP or something.
32 */
33 if (c->udp_port && c->udpclient)
34 clientaccount_add( c, IPPROTO_UDP, 0, 0, 0, 1, 0, 0);
35 else
36 clientaccount_add( c, c->ai_protocol, 0, 0, 0, 1, 0, 0);
37
38 c->write(self, c, data, len);
39 }
40
process_outgoing_single(struct worker_t * self,struct pbuf_t * pb)41 static void process_outgoing_single(struct worker_t *self, struct pbuf_t *pb)
42 {
43 struct client_t *c, *cnext;
44 struct client_t *origin = pb->origin; /* reduce pointer deferencing in tight loops */
45
46 /*
47 // debug dump
48 if (self->id == 0) {
49 hlog(LOG_DEBUG, "o: %*s", pb->packet_len-2, pb->data);
50 hlog(LOG_DEBUG, "b:%s%s",
51 (pb->flags & F_FROM_UPSTR) ? " from_upstr" : "",
52 (pb->flags & F_FROM_DOWNSTR) ? " from_downstr" : ""
53 );
54 }
55 */
56
57 /* specific tight loops */
58
59 if (pb->flags & F_DUPE) {
60 /* Duplicate packet. Don't send, unless client especially wants! */
61 if (self->clients_dupe) {
62 /* if we have any dupe clients at all, generate a version with "dup\t"
63 * prefix to avoid regular clients processing dupes
64 */
65 char dupe_sendbuf[PACKETLEN_MAX+7];
66 memcpy(dupe_sendbuf, "dup\t", 4);
67 memcpy(dupe_sendbuf + 4, pb->data, pb->packet_len);
68 int dupe_len = pb->packet_len + 4;
69
70 for (c = self->clients_dupe; (c); c = cnext) {
71 cnext = c->class_next; // client_write() MAY destroy the client object!
72 send_single(self, c, dupe_sendbuf, dupe_len);
73 }
74 }
75
76 /* Check if I have the client which sent this dupe, and
77 * increment it's dupe counter
78 */
79 /* OPTIMIZE: we walk through all clients for each dupe - how to find it quickly? */
80 for (c = self->clients; (c); c = c->next) {
81 if (c == origin) {
82 clientaccount_add(c, -1, 0, 0, 0, 0, 0, 1);
83 break;
84 }
85 }
86
87 return;
88 }
89
90 if (pb->flags & F_FROM_DOWNSTR) {
91 /* client is from downstream, send to upstreams and peers */
92 for (c = self->clients_ups; (c); c = cnext) {
93 cnext = c->class_next; // client_write() MAY destroy the client object!
94 if (c != origin)
95 send_single(self, c, pb->data, pb->packet_len);
96 }
97 }
98
99 /* packet came from anywhere and is not a dupe - let's go through the
100 * clients who connected us
101 */
102 for (c = self->clients_other; (c); c = cnext) {
103 cnext = c->class_next; // client_write() MAY destroy the client object!
104
105 /* If not full feed, process filters to see if the packet should be sent. */
106 if (( (c->flags & CLFLAGS_FULLFEED) != CLFLAGS_FULLFEED) && filter_process(self, c, pb) < 1) {
107 //hlog(LOG_DEBUG, "fd %d: Not fullfeed or not matching filter, not sending.", c->fd);
108 continue;
109 }
110
111 /* Do not send packet back to the source client.
112 This may reject a packet that came from a socket that got
113 closed a few milliseconds ago and its client_t got
114 recycled on a newly connected client, but if the new client
115 is a long living one, all further packets will be accepted
116 just fine.
117 Very unlikely check, so check for this last.
118 */
119 if (c == origin) {
120 //hlog(LOG_DEBUG, "%d: not sending to client: originated from this socketsocket", c->fd);
121 continue;
122 }
123
124 send_single(self, c, pb->data, pb->packet_len);
125 }
126 }
127
128
129 /*
130 * Process outgoing packets from the global packet queue, write them to clients
131 */
132
process_outgoing(struct worker_t * self)133 void process_outgoing(struct worker_t *self)
134 {
135 struct pbuf_t *pb;
136 int e;
137
138 if ((e = rwl_rdlock(&pbuf_global_rwlock))) {
139 hlog(LOG_CRIT, "worker: Failed to rdlock pbuf_global_rwlock!");
140 exit(1);
141 }
142
143 while ((pb = *self->pbuf_global_prevp)) {
144 //__sync_synchronize();
145 /* Some safety checks against bugs and overload conditions */
146 if (pb->is_free) {
147 hlog(LOG_ERR, "worker %d: process_outgoing got pbuf %d marked free, age %d (now %d t %d)\n%.*s",
148 self->id, pb->seqnum, tick - pb->t, tick, pb->t, pb->packet_len-2, pb->data);
149 abort(); /* this would be pretty bad, so we crash immediately */
150 } else if (pb->t > tick + 2) {
151 /* 2-second offset is normal in case of one thread updating tick earlier than another
152 * and a little thread scheduling luck
153 */
154 hlog(LOG_ERR, "worker %d: process_outgoing got packet %d from future with t %d > tick %d!\n%.*s",
155 self->id, pb->seqnum, pb->t, tick, pb->packet_len-2, pb->data);
156 self->internal_packet_drops++;
157 if (self->internal_packet_drops > 10)
158 status_error(86400, "packet_drop_future");
159 } else if (tick - pb->t > 5) {
160 /* this is a bit too old, are we stuck? */
161 hlog(LOG_ERR, "worker %d: process_outgoing got packet %d aged %d sec (now %d t %d)\n%.*s",
162 self->id, pb->seqnum, tick - pb->t, tick, pb->t, pb->packet_len-2, pb->data);
163 self->internal_packet_drops++;
164 if (self->internal_packet_drops > 10)
165 status_error(86400, "packet_drop_hang");
166 } else {
167 process_outgoing_single(self, pb);
168 }
169 self->last_pbuf_seqnum = pb->seqnum;
170 self->pbuf_global_prevp = &pb->next;
171 }
172
173 while ((pb = *self->pbuf_global_dupe_prevp)) {
174 if (pb->is_free) {
175 hlog(LOG_ERR, "worker %d: process_outgoing got dupe %d marked free, age %d (now %d t %d)\n%.*s",
176 self->id, pb->seqnum, tick - pb->t, tick, pb->t, pb->packet_len-2, pb->data);
177 abort();
178 } else if (pb->t > tick + 2) {
179 hlog(LOG_ERR, "worker: process_outgoing got dupe from future %d with t %d > tick %d!\n%.*s",
180 pb->seqnum, pb->t, tick, pb->packet_len-2, pb->data);
181 } else if (tick - pb->t > 5) {
182 hlog(LOG_ERR, "worker: process_outgoing got dupe %d aged %d sec\n%.*s",
183 pb->seqnum, tick - pb->t, pb->packet_len-2, pb->data);
184 } else {
185 process_outgoing_single(self, pb);
186 }
187 self->last_pbuf_dupe_seqnum = pb->seqnum;
188 self->pbuf_global_dupe_prevp = &pb->next;
189 }
190
191 if ((e = rwl_rdunlock(&pbuf_global_rwlock))) {
192 hlog(LOG_CRIT, "worker: Failed to rdunlock pbuf_global_rwlock!");
193 exit(1);
194 }
195 }
196
197