1 /*++
2 /* NAME
3 /* qmgr_deliver 3
4 /* SUMMARY
5 /* deliver one per-site queue entry to that site
6 /* SYNOPSIS
7 /* #include "qmgr.h"
8 /*
9 /* int qmgr_deliver_concurrency;
10 /*
11 /* int qmgr_deliver(transport, fp)
12 /* QMGR_TRANSPORT *transport;
13 /* VSTREAM *fp;
14 /* DESCRIPTION
15 /* This module implements the client side of the `queue manager
16 /* to delivery agent' protocol. The queue manager uses
17 /* asynchronous I/O so that it can drive multiple delivery
18 /* agents in parallel. Depending on the outcome of a delivery
19 /* attempt, the status of messages, queues and transports is
20 /* updated.
21 /*
22 /* qmgr_deliver_concurrency is a global counter that says how
23 /* many delivery processes are in use. This can be used, for
24 /* example, to control the size of the `active' message queue.
25 /*
26 /* qmgr_deliver() executes when a delivery process announces its
27 /* availability for the named transport. It arranges for delivery
28 /* of a suitable queue entry. The \fIfp\fR argument specifies a
29 /* stream that is connected to a delivery process, or a null
30 /* pointer if the transport accepts no connection. Upon completion
31 /* of delivery (successful or not), the stream is closed, so that the
32 /* delivery process is released.
33 /* DIAGNOSTICS
34 /* LICENSE
35 /* .ad
36 /* .fi
37 /* The Secure Mailer license must be distributed with this software.
38 /* AUTHOR(S)
39 /* Wietse Venema
40 /* IBM T.J. Watson Research
41 /* P.O. Box 704
42 /* Yorktown Heights, NY 10598, USA
43 /*
44 /* Preemptive scheduler enhancements:
45 /* Patrik Rak
46 /* Modra 6
47 /* 155 00, Prague, Czech Republic
48 /*
49 /* Wietse Venema
50 /* Google, Inc.
51 /* 111 8th Avenue
52 /* New York, NY 10011, USA
53 /*--*/
54
55 /* System library. */
56
57 #include <sys_defs.h>
58 #include <time.h>
59 #include <string.h>
60
61 /* Utility library. */
62
63 #include <msg.h>
64 #include <vstring.h>
65 #include <vstream.h>
66 #include <vstring_vstream.h>
67 #include <events.h>
68 #include <iostuff.h>
69 #include <stringops.h>
70 #include <mymalloc.h>
71
72 /* Global library. */
73
74 #include <mail_queue.h>
75 #include <mail_proto.h>
76 #include <recipient_list.h>
77 #include <mail_params.h>
78 #include <deliver_request.h>
79 #include <verp_sender.h>
80 #include <dsn_util.h>
81 #include <dsn_buf.h>
82 #include <dsb_scan.h>
83 #include <rcpt_print.h>
84 #include <smtputf8.h>
85
86 /* Application-specific. */
87
88 #include "qmgr.h"
89
90 /*
91 * Important note on the _transport_rate_delay implementation: after
92 * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all
93 * code paths must directly or indirectly invoke qmgr_transport_unthrottle()
94 * or qmgr_transport_throttle(). Otherwise, transports with non-zero
95 * _transport_rate_delay will become stuck.
96 */
97
98 int qmgr_deliver_concurrency;
99
100 /*
101 * Message delivery status codes.
102 */
103 #define DELIVER_STAT_OK 0 /* all recipients delivered */
104 #define DELIVER_STAT_DEFER 1 /* try some recipients later */
105 #define DELIVER_STAT_CRASH 2 /* mailer internal problem */
106
107 /* qmgr_deliver_initial_reply - retrieve initial delivery process response */
108
qmgr_deliver_initial_reply(VSTREAM * stream)109 static int qmgr_deliver_initial_reply(VSTREAM *stream)
110 {
111 if (peekfd(vstream_fileno(stream)) < 0) {
112 msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
113 return (DELIVER_STAT_CRASH);
114 } else if (attr_scan(stream, ATTR_FLAG_STRICT,
115 RECV_ATTR_STREQ(MAIL_ATTR_PROTO, MAIL_ATTR_PROTO_DELIVER),
116 ATTR_TYPE_END) != 0) {
117 msg_warn("%s: malformed response", VSTREAM_PATH(stream));
118 return (DELIVER_STAT_DEFER);
119 } else {
120 return (0);
121 }
122 }
123
124 /* qmgr_deliver_final_reply - retrieve final delivery process response */
125
qmgr_deliver_final_reply(VSTREAM * stream,DSN_BUF * dsb)126 static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb)
127 {
128 int stat;
129
130 if (peekfd(vstream_fileno(stream)) < 0) {
131 msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
132 return (DELIVER_STAT_CRASH);
133 } else if (attr_scan(stream, ATTR_FLAG_STRICT,
134 RECV_ATTR_FUNC(dsb_scan, (void *) dsb),
135 RECV_ATTR_INT(MAIL_ATTR_STATUS, &stat),
136 ATTR_TYPE_END) != 2) {
137 msg_warn("%s: malformed response", VSTREAM_PATH(stream));
138 return (DELIVER_STAT_CRASH);
139 } else {
140 return (stat ? DELIVER_STAT_DEFER : 0);
141 }
142 }
143
144 /* qmgr_deliver_send_request - send delivery request to delivery process */
145
qmgr_deliver_send_request(QMGR_ENTRY * entry,VSTREAM * stream)146 static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream)
147 {
148 RECIPIENT_LIST list = entry->rcpt_list;
149 RECIPIENT *recipient;
150 QMGR_MESSAGE *message = entry->message;
151 VSTRING *sender_buf = 0;
152 MSG_STATS stats;
153 char *sender;
154 int flags;
155 int smtputf8 = message->smtputf8;
156 const char *addr;
157
158 /*
159 * Todo: integrate with code up-stream that builds the delivery request.
160 */
161 for (recipient = list.info; recipient < list.info + list.len; recipient++)
162 if (var_smtputf8_enable && (addr = recipient->address)[0]
163 && !allascii(addr) && valid_utf8_string(addr, strlen(addr))) {
164 smtputf8 |= SMTPUTF8_FLAG_RECIPIENT;
165 if (message->verp_delims)
166 smtputf8 |= SMTPUTF8_FLAG_SENDER;
167 }
168
169 /*
170 * If variable envelope return path is requested, change prefix+@origin
171 * into prefix+user=domain@origin. Note that with VERP there is only one
172 * recipient per delivery.
173 */
174 if (message->verp_delims == 0) {
175 sender = message->sender;
176 } else {
177 sender_buf = vstring_alloc(100);
178 verp_sender(sender_buf, message->verp_delims,
179 message->sender, list.info);
180 sender = vstring_str(sender_buf);
181 }
182
183 flags = message->tflags
184 | entry->queue->dflags
185 | (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT);
186 (void) QMGR_MSG_STATS(&stats, message);
187 attr_print(stream, ATTR_FLAG_NONE,
188 SEND_ATTR_INT(MAIL_ATTR_FLAGS, flags),
189 SEND_ATTR_STR(MAIL_ATTR_QUEUE, message->queue_name),
190 SEND_ATTR_STR(MAIL_ATTR_QUEUEID, message->queue_id),
191 SEND_ATTR_LONG(MAIL_ATTR_OFFSET, message->data_offset),
192 SEND_ATTR_LONG(MAIL_ATTR_SIZE, message->cont_length),
193 SEND_ATTR_STR(MAIL_ATTR_NEXTHOP, entry->queue->nexthop),
194 SEND_ATTR_STR(MAIL_ATTR_ENCODING, message->encoding),
195 SEND_ATTR_INT(MAIL_ATTR_SMTPUTF8, smtputf8),
196 SEND_ATTR_STR(MAIL_ATTR_SENDER, sender),
197 SEND_ATTR_STR(MAIL_ATTR_DSN_ENVID, message->dsn_envid),
198 SEND_ATTR_INT(MAIL_ATTR_DSN_RET, message->dsn_ret),
199 SEND_ATTR_FUNC(msg_stats_print, (const void *) &stats),
200 /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
201 SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_NAME, message->client_name),
202 SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr),
203 SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_PORT, message->client_port),
204 SEND_ATTR_STR(MAIL_ATTR_LOG_PROTO_NAME, message->client_proto),
205 SEND_ATTR_STR(MAIL_ATTR_LOG_HELO_NAME, message->client_helo),
206 /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
207 SEND_ATTR_STR(MAIL_ATTR_SASL_METHOD, message->sasl_method),
208 SEND_ATTR_STR(MAIL_ATTR_SASL_USERNAME, message->sasl_username),
209 SEND_ATTR_STR(MAIL_ATTR_SASL_SENDER, message->sasl_sender),
210 /* XXX Ditto if we want to pass TLS certificate info. */
211 SEND_ATTR_STR(MAIL_ATTR_LOG_IDENT, message->log_ident),
212 SEND_ATTR_STR(MAIL_ATTR_RWR_CONTEXT, message->rewrite_context),
213 SEND_ATTR_INT(MAIL_ATTR_RCPT_COUNT, list.len),
214 ATTR_TYPE_END);
215 if (sender_buf != 0)
216 vstring_free(sender_buf);
217 for (recipient = list.info; recipient < list.info + list.len; recipient++)
218 attr_print(stream, ATTR_FLAG_NONE,
219 SEND_ATTR_FUNC(rcpt_print, (const void *) recipient),
220 ATTR_TYPE_END);
221 if (vstream_fflush(stream) != 0) {
222 msg_warn("write to process (%s): %m", entry->queue->transport->name);
223 return (-1);
224 } else {
225 if (msg_verbose)
226 msg_info("qmgr_deliver: site `%s'", entry->queue->name);
227 return (0);
228 }
229 }
230
231 /* qmgr_deliver_abort - transport response watchdog */
232
qmgr_deliver_abort(int unused_event,void * context)233 static void qmgr_deliver_abort(int unused_event, void *context)
234 {
235 QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
236 QMGR_QUEUE *queue = entry->queue;
237 QMGR_TRANSPORT *transport = queue->transport;
238 QMGR_MESSAGE *message = entry->message;
239
240 msg_fatal("%s: timeout receiving delivery status from transport: %s",
241 message->queue_id, transport->name);
242 }
243
244 /* qmgr_deliver_update - process delivery status report */
245
qmgr_deliver_update(int unused_event,void * context)246 static void qmgr_deliver_update(int unused_event, void *context)
247 {
248 QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
249 QMGR_QUEUE *queue = entry->queue;
250 QMGR_TRANSPORT *transport = queue->transport;
251 QMGR_MESSAGE *message = entry->message;
252 static DSN_BUF *dsb;
253 int status;
254
255 /*
256 * Release the delivery agent from a "hot" queue entry.
257 */
258 #define QMGR_DELIVER_RELEASE_AGENT(entry) do { \
259 event_disable_readwrite(vstream_fileno(entry->stream)); \
260 (void) vstream_fclose(entry->stream); \
261 entry->stream = 0; \
262 qmgr_deliver_concurrency--; \
263 } while (0)
264
265 if (dsb == 0)
266 dsb = dsb_create();
267
268 /*
269 * The message transport has responded. Stop the watchdog timer.
270 */
271 event_cancel_timer(qmgr_deliver_abort, context);
272
273 /*
274 * Retrieve the delivery agent status report. The numerical status code
275 * indicates if delivery should be tried again. The reason text is sent
276 * only when a site should be avoided for a while, so that the queue
277 * manager can log why it does not even try to schedule delivery to the
278 * affected recipients.
279 */
280 status = qmgr_deliver_final_reply(entry->stream, dsb);
281
282 /*
283 * The mail delivery process failed for some reason (although delivery
284 * may have been successful). Back off with this transport type for a
285 * while. Dispose of queue entries for this transport that await
286 * selection (the todo lists). Stay away from queue entries that have
287 * been selected (the busy lists), or we would have dangling pointers.
288 * The queue itself won't go away before we dispose of the current queue
289 * entry.
290 */
291 if (status == DELIVER_STAT_CRASH) {
292 message->flags |= DELIVER_STAT_DEFER;
293 #if 0
294 whatsup = concatenate("unknown ", transport->name,
295 " mail transport error", (char *) 0);
296 qmgr_transport_throttle(transport,
297 DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup));
298 myfree(whatsup);
299 #else
300 qmgr_transport_throttle(transport,
301 DSN_SIMPLE(&dsb->dsn, "4.3.0",
302 "unknown mail transport error"));
303 #endif
304 msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
305 transport->name);
306
307 /*
308 * Assume the worst and write a defer logfile record for each
309 * recipient. This omission was already present in the first queue
310 * manager implementation of 199703, and was fixed 200511.
311 *
312 * To avoid the synchronous qmgr_defer_recipient() operation for each
313 * recipient of this queue entry, release the delivery process and
314 * move the entry back to the todo queue. Let qmgr_defer_transport()
315 * log the recipient asynchronously if possible, and get out of here.
316 * Note: if asynchronous logging is not possible,
317 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and
318 * the entry becomes a dangling pointer.
319 */
320 QMGR_DELIVER_RELEASE_AGENT(entry);
321 qmgr_entry_unselect(entry);
322 qmgr_defer_transport(transport, &dsb->dsn);
323 return;
324 }
325
326 /*
327 * This message must be tried again.
328 *
329 * If we have a problem talking to this site, back off with this site for a
330 * while; dispose of queue entries for this site that await selection
331 * (the todo list); stay away from queue entries that have been selected
332 * (the busy list), or we would have dangling pointers. The queue itself
333 * won't go away before we dispose of the current queue entry.
334 *
335 * XXX Caution: DSN_COPY() will panic on empty status or reason.
336 */
337 #define SUSPENDED "delivery temporarily suspended: "
338
339 if (status == DELIVER_STAT_DEFER) {
340 message->flags |= DELIVER_STAT_DEFER;
341 if (VSTRING_LEN(dsb->status)) {
342 /* Sanitize the DSN status/reason from the delivery agent. */
343 if (!dsn_valid(vstring_str(dsb->status)))
344 vstring_strcpy(dsb->status, "4.0.0");
345 if (VSTRING_LEN(dsb->reason) == 0)
346 vstring_strcpy(dsb->reason, "unknown error");
347 vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1);
348 if (QMGR_QUEUE_READY(queue)) {
349 qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb));
350 if (QMGR_QUEUE_THROTTLED(queue))
351 qmgr_defer_todo(queue, &dsb->dsn);
352 }
353 }
354 }
355
356 /*
357 * No problems detected. Mark the transport and queue as alive. The queue
358 * itself won't go away before we dispose of the current queue entry.
359 */
360 if (status != DELIVER_STAT_CRASH) {
361 qmgr_transport_unthrottle(transport);
362 if (VSTRING_LEN(dsb->reason) == 0)
363 qmgr_queue_unthrottle(queue);
364 }
365
366 /*
367 * Release the delivery process, and give some other queue entry a chance
368 * to be delivered. When all recipients for a message have been tried,
369 * decide what to do next with this message: defer, bounce, delete.
370 */
371 QMGR_DELIVER_RELEASE_AGENT(entry);
372 qmgr_entry_done(entry, QMGR_QUEUE_BUSY);
373 }
374
375 /* qmgr_deliver - deliver one per-site queue entry */
376
qmgr_deliver(QMGR_TRANSPORT * transport,VSTREAM * stream)377 void qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
378 {
379 QMGR_ENTRY *entry;
380 DSN dsn;
381
382 /*
383 * Find out if this delivery process is really available. Once elected,
384 * the delivery process is supposed to express its happiness. If there is
385 * a problem, wipe the pending deliveries for this transport. This
386 * routine runs in response to an external event, so it does not run
387 * while some other queue manipulation is happening.
388 */
389 if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) {
390 #if 0
391 whatsup = concatenate(transport->name,
392 " mail transport unavailable", (char *) 0);
393 qmgr_transport_throttle(transport,
394 DSN_SIMPLE(&dsn, "4.3.0", whatsup));
395 myfree(whatsup);
396 #else
397 qmgr_transport_throttle(transport,
398 DSN_SIMPLE(&dsn, "4.3.0",
399 "mail transport unavailable"));
400 #endif
401 qmgr_defer_transport(transport, &dsn);
402 if (stream)
403 (void) vstream_fclose(stream);
404 return;
405 }
406
407 /*
408 * Find a suitable queue entry. Things may have changed since this
409 * transport was allocated. If no suitable entry is found,
410 * unceremoniously disconnect from the delivery process. The delivery
411 * agent request reading routine is prepared for the queue manager to
412 * change its mind for no apparent reason.
413 */
414 if ((entry = qmgr_job_entry_select(transport)) == 0) {
415 (void) vstream_fclose(stream);
416 return;
417 }
418
419 /*
420 * Send the queue file info and recipient info to the delivery process.
421 * If there is a problem, wipe the pending deliveries for this transport.
422 * This routine runs in response to an external event, so it does not run
423 * while some other queue manipulation is happening.
424 */
425 if (qmgr_deliver_send_request(entry, stream) < 0) {
426 qmgr_entry_unselect(entry);
427 #if 0
428 whatsup = concatenate(transport->name,
429 " mail transport unavailable", (char *) 0);
430 qmgr_transport_throttle(transport,
431 DSN_SIMPLE(&dsn, "4.3.0", whatsup));
432 myfree(whatsup);
433 #else
434 qmgr_transport_throttle(transport,
435 DSN_SIMPLE(&dsn, "4.3.0",
436 "mail transport unavailable"));
437 #endif
438 qmgr_defer_transport(transport, &dsn);
439 /* warning: entry may be a dangling pointer here */
440 (void) vstream_fclose(stream);
441 return;
442 }
443
444 /*
445 * If we get this far, go wait for the delivery status report.
446 */
447 qmgr_deliver_concurrency++;
448 entry->stream = stream;
449 event_enable_read(vstream_fileno(stream),
450 qmgr_deliver_update, (void *) entry);
451
452 /*
453 * Guard against broken systems.
454 */
455 event_request_timer(qmgr_deliver_abort, (void *) entry, var_daemon_timeout);
456 }
457