1 /* 2 * aprsc 3 * 4 * (c) Heikki Hannikainen, OH7LZB <hessu@hes.iki.fi> 5 * 6 * This program is licensed under the BSD license, which can be found 7 * in the file LICENSE. 8 * 9 */ 10 11 #ifndef WORKER_H 12 #define WORKER_H 13 14 #define _GNU_SOURCE 15 #ifndef __USE_UNIX98 16 #define __USE_UNIX98 /* to get PTHREAD_MUTEX_RECURSIVE on Linux */ 17 #endif 18 19 #include <pthread.h> 20 #include <semaphore.h> 21 #include <sys/types.h> 22 #include <sys/socket.h> 23 #include <netinet/in.h> 24 #include <arpa/inet.h> 25 #include <time.h> 26 #include <stdio.h> 27 #include <stdarg.h> 28 29 #include "xpoll.h" 30 #include "rwlock.h" 31 #include "cJSON.h" 32 #include "errno_aprsc.h" 33 #include "ssl.h" 34 35 extern time_t now; /* current wallclock time */ 36 extern time_t tick; /* clocktick - monotonously increasing for timers, not affected by NTP et al */ 37 38 extern void pthreads_profiling_reset(const char *name); 39 40 extern pthread_attr_t pthr_attrs; /* used to setup new threads */ 41 42 /* minimum and maximum length of a callsign on APRS-IS */ 43 #define CALLSIGNLEN_MIN 3 44 #define CALLSIGNLEN_MAX 9 45 46 /* packet length limiters and buffer sizes */ 47 #define PACKETLEN_MIN 10 /* minimum length for a valid APRS-IS packet: "A1A>B1B:\r\n" */ 48 #define PACKETLEN_MAX 512 /* maximum length for a valid APRS-IS packet (incl. CRLF) */ 49 50 /* 51 * Packet length statistics: 52 * 53 * <= 80: about 25% 54 * <= 90: about 36% 55 * <= 100: about 73% 56 * <= 110: about 89% 57 * <= 120: about 94% 58 * <= 130: about 97% 59 * <= 140: about 98.7% 60 * <= 150: about 99.4% 61 */ 62 63 #define PACKETLEN_MAX_SMALL 100 64 #define PACKETLEN_MAX_MEDIUM 180 /* about 99.5% are smaller than this */ 65 #define PACKETLEN_MAX_LARGE PACKETLEN_MAX 66 67 /* number of pbuf_t structures to allocate at a time */ 68 #define PBUF_ALLOCATE_BUNCH_SMALL 2000 /* grow to 2000 in production use */ 69 #define PBUF_ALLOCATE_BUNCH_MEDIUM 2000 /* grow to 2000 in production use */ 70 #define PBUF_ALLOCATE_BUNCH_LARGE 50 /* grow to 50 in production use */ 71 72 /* a packet buffer */ 73 /* Type flags -- some can happen in combinations: T_CWOP + T_WX / T_CWOP + T_POSITION ... */ 74 #define T_POSITION (1 << 0) // Packet is of position type 75 #define T_OBJECT (1 << 1) // packet is an object 76 #define T_ITEM (1 << 2) // packet is an item 77 #define T_MESSAGE (1 << 3) // packet is a message 78 #define T_NWS (1 << 4) // packet is a NWS message 79 #define T_WX (1 << 5) // packet is WX data 80 #define T_TELEMETRY (1 << 6) // packet is telemetry 81 #define T_QUERY (1 << 7) // packet is a query 82 #define T_STATUS (1 << 8) // packet is status 83 #define T_USERDEF (1 << 9) // packet is userdefined 84 #define T_CWOP (1 << 10) // packet is recognized as CWOP 85 #define T_STATCAPA (1 << 11) // packet is station capability response 86 #define T_3RDPARTY (1 << 12) // packet is a 3rd-party } packet 87 #define T_ALL (1 << 15) // set on _all_ packets 88 89 #define F_DUPE (1 << 0) /* Duplicate of a previously seen packet */ 90 #define F_HASPOS (1 << 1) /* This packet has valid parsed position */ 91 #define F_HAS_TCPIP (1 << 2) /* There is a TCPIP* in the path */ 92 #define F_FROM_UPSTR (1 << 3) /* Packet is from an upstream server */ 93 #define F_FROM_DOWNSTR (1 << 4) /* Packet is from a downstream server */ 94 95 struct client_t; /* forward declarator */ 96 97 struct pbuf_t { 98 struct pbuf_t *next; 99 struct client_t *origin; 100 /* where did we get it from (don't send it back) 101 NOTE: This pointer is NOT guaranteed to be valid! 102 It does get invalidated by originating connection close, 103 but it is only used as read-only comparison reference 104 against output client connection pointer. It may 105 point to reused connection entry, but even that does 106 not matter much -- a few packets may be left unrelayed 107 to the a client in some situations, but packets sent 108 a few seconds latter will go through just fine. 109 In case of "dump history" (if we ever do) this pointer 110 is ignored while history dumping is being done. 111 */ 112 113 uint32_t srcname_hash; /* source name hash */ 114 uint32_t srccall_hash; /* srccall hash */ 115 uint32_t dstname_hash; /* srccall hash */ 116 uint16_t packettype; /* bitmask: one or more of T_* */ 117 uint16_t flags; /* bitmask: one or more of F_* */ 118 uint16_t srcname_len; /* parsed length of source (object, item, srcall) name 3..9 */ 119 uint16_t dstcall_len; /* parsed length of destination callsign *including* SSID */ 120 uint16_t dstname_len; /* parsed length of message destination including SSID */ 121 uint16_t entrycall_len; 122 uint32_t seqnum; /* ever increasing counter, dupecheck sets */ 123 time_t t; /* when the packet was received */ 124 125 int packet_len; /* the actual length of the packet, including CRLF */ 126 int buf_len; /* the length of this buffer */ 127 128 const char *srccall_end; /* source callsign with SSID */ 129 const char *dstcall_end_or_ssid; /* end of dest callsign (without SSID) */ 130 const char *dstcall_end; /* end of dest callsign with SSID */ 131 const char *qconst_start; /* "qAX,incomingSSID:" -- for q and e filters */ 132 const char *info_start; /* pointer to start of info field */ 133 const char *srcname; /* source's name (either srccall or object/item name) */ 134 const char *dstname; /* message destination callsign */ 135 136 float lat; /* if the packet is PT_POSITION, latitude and longitude go here */ 137 float lng; /* .. in RADIAN */ 138 float cos_lat; /* cache of COS of LATitude for radial distance filter */ 139 140 char symbol[3]; /* 2(+1) chars of symbol, if any, NUL for not found */ 141 char is_free; /* 1: in global free list, 0: not in global free list */ 142 143 char data[1]; /* contains the whole packet, including CRLF, ready to transmit */ 144 }; 145 146 /* global packet buffer */ 147 extern rwlock_t pbuf_global_rwlock; 148 extern struct pbuf_t *pbuf_global; 149 extern struct pbuf_t **pbuf_global_prevp; 150 extern struct pbuf_t *pbuf_global_dupe; 151 extern struct pbuf_t **pbuf_global_dupe_prevp; 152 153 /* a network client */ 154 typedef enum { 155 CSTATE_INIT, 156 CSTATE_UDP, 157 CSTATE_LOGIN, 158 CSTATE_LOGRESP, 159 CSTATE_CONNECTED, 160 CSTATE_COREPEER 161 } CStateEnum; 162 163 struct worker_t; /* used in client_t, but introduced later */ 164 struct filter_t; /* used in client_t, but introduced later */ 165 166 union sockaddr_u { 167 struct sockaddr sa; 168 struct sockaddr_in si; 169 struct sockaddr_in6 si6; 170 }; 171 172 /* list of message recipient callsigns heard on a client port */ 173 #define CLIENT_HEARD_BUCKETS 32 /* up to ~300 calls in heard list per client */ 174 struct client_heard_t { 175 struct client_heard_t *next; 176 struct client_heard_t **prevp; 177 178 uint32_t hash; 179 180 int call_len; 181 time_t last_heard; 182 183 char callsign[CALLSIGNLEN_MAX+1]; 184 }; 185 186 #define WBUF_ADJUSTER 0 /* Client WBUF adjustment can be usefull -- but code is infant.. */ 187 188 /* error codes for incoming packet drop reasons */ 189 #define INERR_UNKNOWN 0 190 #define INERR_NO_COLON -1 /* no : in packet */ 191 #define INERR_NO_DST -2 /* no > in packet to mark beginning of dstcall */ 192 #define INERR_NO_PATH -3 /* no path found between srccall and : */ 193 #define INERR_INV_SRCCALL -4 /* invalid or too long srccall */ 194 #define INERR_NO_BODY -5 /* no packet body/data after : */ 195 #define INERR_INV_DSTCALL -6 /* invalid or too long dstcall */ 196 #define INERR_DISALLOW_UNVERIFIED -7 /* disallow_unverified = 1, unverified client */ 197 #define INERR_DISALLOW_UNVERIFIED_PATH -8 /* disallow_unverified = 1, TCPXX path */ 198 #define INERR_NOGATE -9 /* packet path has NOGATE/RFONLY */ 199 #define INERR_3RD_PARTY_IP -10 /* 3rd-party packet dropped due to TCPIP/TCPXX in path */ 200 #define INERR_INV_3RD_PARTY -11 /* invalid 3rd-party packet header */ 201 #define INERR_GENERAL_QUERY -12 /* general query ?APRS? dropped */ 202 #define INERR_OUT_OF_PBUFS -13 /* aprsc ran out of packet buffers */ 203 #define INERR_CLASS_FAIL -14 /* aprsc failed to classify packet */ 204 #define INERR_Q_BUG -15 /* aprsc q construct code bugging */ 205 #define INERR_Q_DROP -16 /* q construct drop */ 206 #define INERR_SHORT_PACKET -17 /* too short packet */ 207 #define INERR_LONG_PACKET -18 /* too long packet */ 208 #define INERR_INV_PATH_CALL -19 /* invalid callsign in path */ 209 #define INERR_Q_QAX -20 210 #define INERR_Q_QAZ -21 211 #define INERR_Q_QPATH_MYCALL -22 212 #define INERR_Q_QPATH_CALL_TWICE -23 213 #define INERR_Q_PATH_LOGIN_NOT_LAST -24 214 #define INERR_Q_PATH_CALL_IS_LOCAL_CLIENT -25 215 #define INERR_Q_PATH_CALL_IS_INVALID -26 216 #define INERR_Q_QAU_PATH_CALL_IS_SRCCALL -27 217 #define INERR_Q_NEWQ_BUFFER_SMALL -28 218 #define INERR_Q_NONVAL_MULTI_Q_CALLS -29 219 #define INERR_Q_I_NO_VIACALL -30 220 #define INERR_Q_DISALLOW_PROTOCOL -31 221 #define INERR_EMPTY -32 222 #define INERR_DIS_SRCCALL -33 223 #define INERR_DIS_DX -34 224 #define INERR_DIS_MSG_DST -35 225 226 #define INERR_MIN -35 /* MINIMUM VALUE FOR INERR, GROW WHEN NEEDED! */ 227 /* WHEN ADDING STUFF HERE, REMEMBER TO UPDATE inerr_labels IN incoming.c. Thanks! */ 228 #define INERR_BUCKETS (INERR_MIN*-1 + 1) 229 230 struct portaccount_t { /* Port accounter tracks port usage, and traffic 231 Reporting looks up these via listener list. */ 232 pthread_mutex_t mutex; /* mutex to protect counters, refcount especially */ 233 234 long counter; /* New arriving connects count */ 235 long gauge; /* Number of current connects */ 236 long gauge_max; /* Maximum of the current connects */ 237 238 long long rxbytes, txbytes; 239 long long rxpackets, txpackets; 240 long long rxdrops, rxdupes; 241 long long rxerrs[INERR_BUCKETS]; 242 243 /* record usage references */ 244 int refcount; /* listener = 1, clients ++ */ 245 }; 246 247 extern struct portaccount_t client_connects_tcp; 248 extern struct portaccount_t client_connects_sctp; 249 extern struct portaccount_t client_connects_udp; 250 extern struct portaccount_t inbound_connects; 251 252 struct client_udp_t { /* UDP services can be available at multiple 253 client ports. This is shared refcounted 254 file descriptor for them. */ 255 struct client_udp_t *next; 256 struct client_udp_t **prevp; 257 struct portaccount_t *portaccount; 258 int fd; /* file descriptor */ 259 int refcount; /* Reference count */ 260 int polled; /* Is there a thread polling this? */ 261 uint16_t af; /* Address family */ 262 uint16_t portnum; /* Server UDP port */ 263 }; 264 265 266 #define FILTER_S_SIZE 256 /* how many bytes of filter to store for status display */ 267 268 /* These are the currently used buffer sizes. 269 * Do not increase them too much - we wish to disconnect stuck sockets pretty 270 * quickly, so that hanging servers will not cause data to overflow 271 * the 30-second dupe check window. The current APRS-IS rate is somewhere around 272 * 3-4 Kbytes/second, and then we have the operating system TCP socket buffers too. 273 */ 274 #define FIXED_IOBUFS 0 275 #ifdef FIXED_IOBUFS 276 #define OBUF_SIZE 8000 277 #define IBUF_SIZE 8000 278 #endif 279 280 struct client_t { 281 struct client_t *next; 282 struct client_t **prevp; 283 284 struct client_t *class_next; 285 struct client_t **class_prevp; 286 287 union sockaddr_u addr; 288 struct portaccount_t *portaccount; /* port specific global account accumulator */ 289 struct portaccount_t localaccount; /* client connection specific account accumulator */ 290 291 int ai_protocol; /* IPPROTO_SCTP, UDP, TCP */ 292 293 struct client_udp_t *udpclient; /* pointer to udp service socket, if available */ 294 int udp_port; /* client udp port - if client has requested it */ 295 int udpaddrlen; /* ready to use sockaddr length */ 296 union sockaddr_u udpaddr; /* ready to use sockaddr data */ 297 298 int fd; 299 300 int uplink_index; /* uplink array index */ 301 int portnum; 302 int listener_id; /* which listener is this client connected to */ 303 time_t connect_time;/* Time of connection, wallclock real time */ 304 time_t connect_tick;/* Time of connection, monotonous */ 305 time_t last_read; /* Time of last read - not necessarily last packet... */ 306 time_t keepalive; /* Time of next keepalive chime */ 307 time_t cleanup; /* Time of next cleanup */ 308 309 struct xpoll_fd_t *xfd; /* poll()/select() structure as defined in xpoll.h */ 310 311 #ifdef USE_SSL 312 struct ssl_connection_t *ssl_con; 313 #endif 314 315 /* first stage read buffer - used to crunch out lines to packet buffers */ 316 #ifndef FIXED_IOBUFS 317 char *ibuf; 318 #endif 319 int ibuf_size; /* size of buffer */ 320 int ibuf_end; /* where data in buffer ends */ 321 322 /* output buffer */ 323 #ifndef FIXED_IOBUFS 324 char *obuf; 325 #endif 326 int obuf_size; /* size of buffer */ 327 int obuf_start; /* where data in buffer starts */ 328 int obuf_end; /* where data in buffer ends */ 329 int obuf_flushsize; /* how much data in buf before forced write() at adding ? */ 330 int obuf_writes; /* how many times (since last check) the socket has been written ? */ 331 int obuf_wtime; /* when was last write? */ 332 #if WBUF_ADJUSTER 333 int wbuf_size; /* socket wbuf size */ 334 #endif 335 int32_t flags; /* bit flags on what kind of client this is */ 336 337 #define CLFLAGS_INPORT 0x001 338 #define CLFLAGS_UPLINKPORT 0x002 339 #define CLFLAGS_UDPSUBMIT 0x004 340 #define CLFLAGS_PORT_RO 0x008 341 #define CLFLAGS_USERFILTEROK 0x010 /* Permits entry of user defined filters */ 342 #define CLFLAGS_FULLFEED 0x100 /* Together with filter t/c* -- which really implements it */ 343 #define CLFLAGS_DUPEFEED 0x200 /* Duplicates are also sent to client */ 344 #define CLFLAGS_MESSAGEONLY 0x400 /* Together with filter t/m -- which really implements it */ 345 #define CLFLAGS_CLIENTONLY 0x800 /* Client connected on client-only port */ 346 #define CLFLAGS_IGATE 0x1000 /* Igate port */ 347 #define CLFLAGS_UPLINKMULTI 0x2000 /* Allow multiple parallel outgoing connections */ 348 349 #define VALIDATED_WEAK 1 /* client validated with passcode */ 350 #define VALIDATED_STRONG 3 /* client validated with SSL certificate */ 351 352 CStateEnum state; /* state of the client... one of CSTATE_* */ 353 char warned; /* the client has been warned that it has bad filter definition */ 354 char validated; /* did the client provide a valid passcode */ 355 char username_len; /* length of user name */ 356 char hidden; /* is the user on a hidden listener socket, not shown on status */ 357 char failed_cmds; /* how many login commands have failed */ 358 char quirks_mode; /* is this a known buggy-and-unmaintained application on our blacklist */ 359 char loc_known; /* have we received a position packet from this client */ 360 361 /* the current handler function for incoming lines */ 362 int (*handler_line_in) (struct worker_t *self, struct client_t *c, int l4proto, char *s, int len); 363 int (*write) (struct worker_t *self, struct client_t *c, char *p, int len); 364 365 int (*handler_client_readable) (struct worker_t *self, struct client_t *c); 366 int (*handler_client_writable) (struct worker_t *self, struct client_t *c); 367 368 /* outbound filter chain head */ 369 struct filter_t *posdefaultfilters; 370 struct filter_t *negdefaultfilters; 371 struct filter_t *posuserfilters; 372 struct filter_t *neguserfilters; 373 374 /* List of station callsigns (not objects/items!) which have been 375 * heard by this client. Only collected for filtered ports! 376 * Used for deciding if messages should be routed here. 377 * 378 * client_courtesy lists the srccalls which have originated messages 379 * on this filtered ports, and should have a courtesy position sent. 380 */ 381 struct client_heard_t* client_heard[CLIENT_HEARD_BUCKETS]; 382 struct client_heard_t* client_courtesy[CLIENT_HEARD_BUCKETS]; 383 int client_heard_count; /* number of 'heard' list entries for clients */ 384 int client_courtesy_count; /* number of 'courtesy' list entries for clients */ 385 386 /* coordinates of client, if transmitted */ 387 float lat, lng, cos_lat; 388 389 // Maybe we use these four items, or maybe not. 390 // They are there for experimenting with outgoing queue processing algorithms. 391 /* Pointer to last pointer in pbuf_global(_dupe) */ 392 //struct pbuf_t **pbuf_global_prevp; 393 //struct pbuf_t **pbuf_global_dupe_prevp; 394 //uint32_t last_pbuf_seqnum; 395 //uint32_t last_pbuf_dupe_seqnum; 396 397 char username[16]; /* The callsign */ 398 char app_name[32]; /* application name, from 'user' command */ 399 char app_version[32]; /* application version, from 'user' command */ 400 401 char addr_rem[80]; /* client IP address in text format */ 402 char addr_hex[36]; /* client IP address in hex format */ 403 char addr_loc[80]; /* server IP address in text format */ 404 405 #ifdef USE_SSL 406 /* for SSL cert auth, store SSL certificate subject and issuer */ 407 char cert_subject[256]; 408 char cert_issuer[256]; 409 #endif 410 411 #ifdef FIXED_IOBUFS 412 char ibuf[IBUF_SIZE]; 413 char obuf[OBUF_SIZE]; 414 #endif 415 char filter_s[FILTER_S_SIZE]; 416 }; 417 418 extern struct client_t *client_alloc(void); 419 extern void client_free(struct client_t *c); 420 extern int set_client_sockopt(struct client_t *c); 421 extern int pass_client_to_worker(struct worker_t *wc, struct client_t *c); 422 extern void worker_mark_client_connected(struct worker_t *self, struct client_t *c); 423 extern struct client_t *pseudoclient_setup(int portnum); 424 425 426 /* worker thread structure */ 427 struct worker_t { 428 struct worker_t *next; 429 struct worker_t **prevp; 430 431 int id; /* sequential id for thread */ 432 pthread_t th; /* the thread itself */ 433 434 int shutting_down; /* should I shut down? */ 435 436 struct client_t *clients; /* all clients handled by this thread */ 437 /* c->class_next lists, classified clients for optimized outbound */ 438 struct client_t *clients_dupe; /* dupeclient port clients */ 439 struct client_t *clients_ro; /* read-only clients */ 440 struct client_t *clients_ups; /* upstreams and peers */ 441 struct client_t *clients_other; /* other clients (unoptimized) */ 442 pthread_mutex_t clients_mutex; /* mutex to protect access to the client list by the status dumps */ 443 444 struct client_t *new_clients; /* new clients which passed in by accept */ 445 struct client_t *new_clients_last; /* last client in the list, to support FIFO queuing */ 446 pthread_mutex_t new_clients_mutex; /* mutex to protect *new_clients */ 447 int client_count; /* modified by worker thread only! */ 448 449 struct xpoll_t xp; /* poll/epoll/select wrapper */ 450 451 /* thread-local packet buffer freelist */ 452 struct pbuf_t *pbuf_free_small; /* <= 130 bytes */ 453 struct pbuf_t *pbuf_free_medium; /* 131 >= x <= 300 */ 454 struct pbuf_t *pbuf_free_large; /* 301 >= x <= 600 */ 455 456 /* packets which have been parsed, waiting to be moved into 457 * pbuf_incoming 458 */ 459 struct pbuf_t *pbuf_incoming_local; 460 struct pbuf_t **pbuf_incoming_local_last; 461 462 /* packets which have been parsed, waiting for dupe check */ 463 struct pbuf_t *pbuf_incoming; 464 struct pbuf_t **pbuf_incoming_last; 465 pthread_mutex_t pbuf_incoming_mutex; 466 467 int pbuf_incoming_local_count; /* number of packets parsed, not yet in dupecheck's inbox */ 468 int pbuf_incoming_count; /* number of packets waiting for dupecheck thread to get */ 469 470 /* Pointer to last pointer in pbuf_global(_dupe) */ 471 struct pbuf_t **pbuf_global_prevp; 472 struct pbuf_t **pbuf_global_dupe_prevp; 473 474 uint32_t last_pbuf_seqnum; 475 uint32_t last_pbuf_dupe_seqnum; 476 477 /* how many packets were dropped internally within this worker 478 * (process hangs and time jumps) 479 */ 480 unsigned int internal_packet_drops; 481 }; 482 483 extern cJSON *worker_shutdown_clients; 484 485 extern int workers_running; 486 487 extern void pbuf_init(void); 488 extern void pbuf_free(struct worker_t *self, struct pbuf_t *p); 489 extern void pbuf_free_many(struct pbuf_t **array, int numbufs); 490 extern void pbuf_dump(FILE *fp); 491 extern void pbuf_dupe_dump(FILE *fp); 492 493 extern int client_postread(struct worker_t *self, struct client_t *c, int r); 494 495 extern int client_printf(struct worker_t *self, struct client_t *c, const char *fmt, ...); 496 extern int client_write(struct worker_t *self, struct client_t *c, char *p, int len); 497 extern int client_bad_filter_notify(struct worker_t *self, struct client_t *c, const char *filt); 498 extern void client_close(struct worker_t *self, struct client_t *c, int errnum); 499 extern void client_init(void); 500 501 extern struct worker_t *worker_threads; 502 extern struct worker_t *worker_alloc(void); 503 extern void worker_free_buffers(struct worker_t *self); 504 extern void workers_stop(int stop_all); 505 extern void workers_start(void); 506 507 extern int keepalive_interval; 508 extern int fileno_limit; 509 510 extern struct client_udp_t *udpclients; 511 extern struct client_udp_t *udppeers; 512 extern void client_udp_free(struct client_udp_t *u); 513 extern struct client_udp_t *client_udp_alloc(struct client_udp_t **root, int fd, int portnum); 514 extern struct client_udp_t *client_udp_find(struct client_udp_t *root, int af, int portnum); 515 516 extern void inbound_connects_account(const int add, struct portaccount_t *p); 517 518 extern struct portaccount_t *port_accounter_alloc(void); 519 extern void port_accounter_drop(struct portaccount_t *p); 520 521 extern char *strsockaddr(const struct sockaddr *sa, const int addr_len); 522 extern char *hexsockaddr(const struct sockaddr *sa, const int addr_len); 523 extern void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpackets, int txbytes, int txpackets, int rxerr, int rxdupes); 524 525 extern void json_add_rxerrs(cJSON *root, const char *key, long long vals[]); 526 extern int worker_client_list(cJSON *workers, cJSON *clients, cJSON *uplinks, cJSON *peers, cJSON *totals, cJSON *memory); 527 528 #endif 529