1 /*
2  *	aprsc
3  *
4  *	(c) Heikki Hannikainen, OH7LZB <hessu@hes.iki.fi>
5  *
6  *     This program is licensed under the BSD license, which can be found
7  *     in the file LICENSE.
8  *
9  */
10 
11 /*
12  *	outgoing.c: handle outgoing packets in the worker thread
13  */
14 
15 #include <string.h>
16 #include <stdlib.h>
17 
18 #include "outgoing.h"
19 #include "hlog.h"
20 #include "filter.h"
21 #include "status.h"
22 
23 /*
24  *	send a single packet to all clients (and peers and uplinks) which
25  *	should have a copy
26  */
27 
send_single(struct worker_t * self,struct client_t * c,char * data,int len)28 static inline void send_single(struct worker_t *self, struct client_t *c, char *data, int len)
29 {
30 	/* if we're going to use the UDP sidechannel, account for UDP, otherwise
31 	 * its TCP or SCTP or something.
32 	 */
33 	if (c->udp_port && c->udpclient)
34 		clientaccount_add( c, IPPROTO_UDP, 0, 0, 0, 1, 0, 0);
35 	else
36 		clientaccount_add( c, c->ai_protocol, 0, 0, 0, 1, 0, 0);
37 
38 	c->write(self, c, data, len);
39 }
40 
process_outgoing_single(struct worker_t * self,struct pbuf_t * pb)41 static void process_outgoing_single(struct worker_t *self, struct pbuf_t *pb)
42 {
43 	struct client_t *c, *cnext;
44 	struct client_t *origin = pb->origin; /* reduce pointer deferencing in tight loops */
45 
46 	/*
47 	// debug dump
48 	if (self->id == 0) {
49 		hlog(LOG_DEBUG, "o: %*s", pb->packet_len-2, pb->data);
50 		hlog(LOG_DEBUG, "b:%s%s",
51 			(pb->flags & F_FROM_UPSTR) ? " from_upstr" : "",
52 			(pb->flags & F_FROM_DOWNSTR) ? " from_downstr" : ""
53 			);
54 	}
55 	*/
56 
57 	/* specific tight loops */
58 
59 	if (pb->flags & F_DUPE) {
60 		/* Duplicate packet. Don't send, unless client especially wants! */
61 		if (self->clients_dupe) {
62 			/* if we have any dupe clients at all, generate a version with "dup\t"
63 			 * prefix to avoid regular clients processing dupes
64 			 */
65 			char dupe_sendbuf[PACKETLEN_MAX+7];
66 			memcpy(dupe_sendbuf, "dup\t", 4);
67 			memcpy(dupe_sendbuf + 4, pb->data, pb->packet_len);
68 			int dupe_len = pb->packet_len + 4;
69 
70 			for (c = self->clients_dupe; (c); c = cnext) {
71 				cnext = c->class_next; // client_write() MAY destroy the client object!
72 				send_single(self, c, dupe_sendbuf, dupe_len);
73 			}
74 		}
75 
76 		/* Check if I have the client which sent this dupe, and
77 		 * increment it's dupe counter
78 		 */
79 		/* OPTIMIZE: we walk through all clients for each dupe - how to find it quickly? */
80 		for (c = self->clients; (c); c = c->next) {
81 			if (c == origin) {
82 				clientaccount_add(c, -1, 0, 0, 0, 0, 0, 1);
83 				break;
84 			}
85 		}
86 
87 		return;
88 	}
89 
90 	if (pb->flags & F_FROM_DOWNSTR) {
91 		/* client is from downstream, send to upstreams and peers */
92 		for (c = self->clients_ups; (c); c = cnext) {
93 			cnext = c->class_next; // client_write() MAY destroy the client object!
94 			if (c != origin)
95 				send_single(self, c, pb->data, pb->packet_len);
96 		}
97 	}
98 
99 	/* packet came from anywhere and is not a dupe - let's go through the
100 	 * clients who connected us
101 	 */
102 	for (c = self->clients_other; (c); c = cnext) {
103 		cnext = c->class_next; // client_write() MAY destroy the client object!
104 
105 		/* If not full feed, process filters to see if the packet should be sent. */
106 		if (( (c->flags & CLFLAGS_FULLFEED) != CLFLAGS_FULLFEED) && filter_process(self, c, pb) < 1) {
107 			//hlog(LOG_DEBUG, "fd %d: Not fullfeed or not matching filter, not sending.", c->fd);
108 			continue;
109 		}
110 
111 		/* Do not send packet back to the source client.
112 		   This may reject a packet that came from a socket that got
113 		   closed a few milliseconds ago and its client_t got
114 		   recycled on a newly connected client, but if the new client
115 		   is a long living one, all further packets will be accepted
116 		   just fine.
117 		   Very unlikely check, so check for this last.
118 		 */
119 		if (c == origin) {
120 			//hlog(LOG_DEBUG, "%d: not sending to client: originated from this socketsocket", c->fd);
121 			continue;
122 		}
123 
124 		send_single(self, c, pb->data, pb->packet_len);
125 	}
126 }
127 
128 
129 /*
130  *	Process outgoing packets from the global packet queue, write them to clients
131  */
132 
process_outgoing(struct worker_t * self)133 void process_outgoing(struct worker_t *self)
134 {
135 	struct pbuf_t *pb;
136 	int e;
137 
138 	if ((e = rwl_rdlock(&pbuf_global_rwlock))) {
139 		hlog(LOG_CRIT, "worker: Failed to rdlock pbuf_global_rwlock!");
140 		exit(1);
141 	}
142 
143 	while ((pb = *self->pbuf_global_prevp)) {
144 		//__sync_synchronize();
145 		/* Some safety checks against bugs and overload conditions */
146 		if (pb->is_free) {
147 			hlog(LOG_ERR, "worker %d: process_outgoing got pbuf %d marked free, age %d (now %d t %d)\n%.*s",
148 				self->id, pb->seqnum, tick - pb->t, tick, pb->t, pb->packet_len-2, pb->data);
149 			abort(); /* this would be pretty bad, so we crash immediately */
150 		} else if (pb->t > tick + 2) {
151 			/* 2-second offset is normal in case of one thread updating tick earlier than another
152 			 * and a little thread scheduling luck
153 			 */
154 			hlog(LOG_ERR, "worker %d: process_outgoing got packet %d from future with t %d > tick %d!\n%.*s",
155 				self->id, pb->seqnum, pb->t, tick, pb->packet_len-2, pb->data);
156 			self->internal_packet_drops++;
157 			if (self->internal_packet_drops > 10)
158 				status_error(86400, "packet_drop_future");
159 		} else if (tick - pb->t > 5) {
160 			/* this is a bit too old, are we stuck? */
161 			hlog(LOG_ERR, "worker %d: process_outgoing got packet %d aged %d sec (now %d t %d)\n%.*s",
162 				self->id, pb->seqnum, tick - pb->t, tick, pb->t, pb->packet_len-2, pb->data);
163 			self->internal_packet_drops++;
164 			if (self->internal_packet_drops > 10)
165 				status_error(86400, "packet_drop_hang");
166 		} else {
167 			process_outgoing_single(self, pb);
168 		}
169 		self->last_pbuf_seqnum = pb->seqnum;
170 		self->pbuf_global_prevp = &pb->next;
171 	}
172 
173 	while ((pb = *self->pbuf_global_dupe_prevp)) {
174 		if (pb->is_free) {
175 			hlog(LOG_ERR, "worker %d: process_outgoing got dupe %d marked free, age %d (now %d t %d)\n%.*s",
176 				self->id, pb->seqnum, tick - pb->t, tick, pb->t, pb->packet_len-2, pb->data);
177 			abort();
178 		} else if (pb->t > tick + 2) {
179 			hlog(LOG_ERR, "worker: process_outgoing got dupe from future %d with t %d > tick %d!\n%.*s",
180 				pb->seqnum, pb->t, tick, pb->packet_len-2, pb->data);
181 		} else if (tick - pb->t > 5) {
182 			hlog(LOG_ERR, "worker: process_outgoing got dupe %d aged %d sec\n%.*s",
183 				pb->seqnum, tick - pb->t, pb->packet_len-2, pb->data);
184 		} else {
185 			process_outgoing_single(self, pb);
186 		}
187 		self->last_pbuf_dupe_seqnum = pb->seqnum;
188 		self->pbuf_global_dupe_prevp = &pb->next;
189 	}
190 
191 	if ((e = rwl_rdunlock(&pbuf_global_rwlock))) {
192 		hlog(LOG_CRIT, "worker: Failed to rdunlock pbuf_global_rwlock!");
193 		exit(1);
194 	}
195 }
196 
197