1--[[
2Copyright (c) 2016, Andrew Lewis <nerf@judo.za.org>
3Copyright (c) 2016, Vsevolod Stakhov <vsevolod@highsecure.ru>
4
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9    http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16]]--
17
-- Stop loading the plugin as soon as the `confighelp` global is truthy
-- (presumably set by the host while it only gathers configuration help).
if confighelp then return end
21
22-- A plugin that pushes metadata (or whole messages) to external services
23
24local redis_params
25local lua_util = require "lua_util"
26local rspamd_http = require "rspamd_http"
27local rspamd_util = require "rspamd_util"
28local rspamd_logger = require "rspamd_logger"
29local ucl = require "ucl"
30local E = {}
31local N = 'metadata_exporter'
32
-- Default body of the alert e-mail. `$name` placeholders are filled in by
-- `lua_util.template` from the metadata table built by the email_alert formatter.
local default_email_template = [[From: "Rspamd" <$mail_from>
To: $mail_to
Subject: Spam alert
Date: $date
MIME-Version: 1.0
Message-ID: <$our_message_id>
Content-type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit

Authenticated username: $user
IP: $ip
Queue ID: $qid
SMTP FROM: $from
SMTP RCPT: $rcpt
MIME From: $header_from
MIME To: $header_to
MIME Date: $header_date
Subject: $header_subject
Message-ID: $message_id
Action: $action
Score: $score
Symbols: $symbols]]

-- Module defaults. Any option found in the plugin configuration that has no
-- dedicated handler is copied over these values verbatim.
local settings = {
  -- Legacy (rule-less) configuration: which pushers are on, and the
  -- formatter / selector name chosen for each of them.
  pusher_enabled = {},
  pusher_format = {},
  pusher_select = {},

  -- HTTP pusher: content type of the request body
  mime_type = 'text/plain',

  -- When true, a failed push soft-rejects the message
  defer = false,

  -- SMTP pusher / e-mail alert defaults
  helo = 'rspamd',
  mail_from = '',
  mail_to = 'postmaster@localhost',
  email_template = default_email_template,
}
65
--- Collect the scan metadata of a task into a plain table.
-- @param task       task object that is queried for all values
-- @param flatten    when truthy, list-valued fields are joined into strings and
--                   the score is rendered with two decimals
-- @param no_content when truthy, the MIME header fields and `message_id` are
--                   left out of the result
-- @return table with the keys ip, user, qid, subject, action, score, fuzzy,
--         rcpt, from, symbols and (unless `no_content`) header_from, header_to,
--         header_subject, header_date, message_id
local function get_general_metadata(task, flatten, no_content)
  local meta = {}

  -- Lists stay tables in structured mode and become joined strings otherwise
  local function render(list, sep)
    if flatten then
      return table.concat(list, sep)
    end
    return list
  end

  local from_ip = task:get_from_ip()
  meta.ip = (from_ip and from_ip:is_valid()) and tostring(from_ip) or 'unknown'

  meta.user = task:get_user() or 'unknown'
  meta.qid = task:get_queue_id() or 'unknown'
  meta.subject = task:get_subject() or 'unknown'
  meta.action = task:get_metric_action('default')

  local score = task:get_metric_score('default')[1]
  if flatten then
    meta.score = string.format('%.2f', score)
  else
    meta.score = score
  end

  local hashes = task:get_mempool():get_variable("fuzzy_hashes", "fstrings")
  if hashes and #hashes > 0 then
    -- Work on a copy so the mempool value itself is never handed out
    local copy = {}
    for i, hash in ipairs(hashes) do
      copy[i] = hash
    end
    meta.fuzzy = render(copy, ', ')
  else
    meta.fuzzy = 'unknown'
  end

  local recipients = task:get_recipients('smtp')
  if recipients then
    local addrs = {}
    for _, rcpt in ipairs(recipients) do
      addrs[#addrs + 1] = rcpt.addr
    end
    meta.rcpt = render(addrs, ', ')
  else
    meta.rcpt = 'unknown'
  end

  local sender = task:get_from('smtp')
  local first_sender = sender and sender[1]
  if first_sender and first_sender.addr then
    meta.from = first_sender.addr
  else
    meta.from = 'unknown'
  end

  local symbols = task:get_symbols_all()
  if not flatten then
    meta.symbols = symbols
  else
    -- One "NAME(score) [opt, opt]" entry per symbol
    local lines = {}
    for _, sym in ipairs(symbols) do
      local suffix = ''
      if sym.options then
        suffix = ' [' .. table.concat(sym.options, ', ') .. ']'
      end
      lines[#lines + 1] = sym.name .. '(' .. string.format('%.2f', sym.score) .. ')' .. suffix
    end
    meta.symbols = table.concat(lines, '\n\t')
  end

  -- Decoded values of every occurrence of the header, or 'unknown' if absent
  local function header_values(name)
    local occurrences = task:get_header_full(name)
    if not occurrences then
      return 'unknown'
    end
    local values = {}
    for _, hdr in ipairs(occurrences) do
      values[#values + 1] = hdr.decoded
    end
    return render(values, '\n')
  end

  if not no_content then
    meta.header_from = header_values('from')
    meta.header_to = header_values('to')
    meta.header_subject = header_values('subject')
    meta.header_date = header_values('date')
    meta.message_id = task:get_message_id()
  end

  return meta
end
159
-- Formatters turn a task into the payload handed to a pusher. Each one is
-- called as `formatter(task, rule)` and returns the payload plus an optional
-- `extra` table that is forwarded to the pusher.
local formatters = {
  -- Raw message content, no extra data
  default = function(task)
    return task:get_content(), {}
  end,
  -- Renders the e-mail template and collects the envelope recipients of the
  -- alert in `extra.mail_targets`.
  email_alert = function(task, rule, extra)
    local meta = get_general_metadata(task, true)
    local display_emails = {}
    local mail_targets = {}

    -- Every recipient is recorded twice: as a bare address for the SMTP
    -- envelope and as `<address>` for the To: header of the alert.
    local function add_target(addr)
      table.insert(mail_targets, addr)
      table.insert(display_emails, string.format('<%s>', addr))
    end

    meta.mail_from = rule.mail_from or settings.mail_from
    local mail_rcpt = rule.mail_to or settings.mail_to
    if type(mail_rcpt) ~= 'table' then
      add_target(mail_rcpt)
    else
      for _, e in ipairs(mail_rcpt) do
        -- Add the individual address, not the whole list
        add_target(e)
      end
    end
    if rule.email_alert_sender then
      local from = task:get_from('smtp')
      -- Guard against a missing/empty envelope sender before reading `.addr`
      local sender = from and from[1] and from[1].addr
      if sender and string.len(sender) > 0 then
        add_target(sender)
      end
    end
    if rule.email_alert_user then
      local user = task:get_user()
      if user then
        add_target(user)
      end
    end
    if rule.email_alert_recipients then
      local rcpts = task:get_recipients('smtp')
      if rcpts then
        for _, e in ipairs(rcpts) do
          if e.addr and string.len(e.addr) > 0 then
            add_target(e.addr)
          end
        end
      end
    end
    meta.mail_to = table.concat(display_emails, ', ')
    meta.our_message_id = rspamd_util.random_hex(12) .. '@rspamd'
    meta.date = rspamd_util.time_to_string(rspamd_util.get_time())
    return lua_util.template(rule.email_template or settings.email_template, meta), { mail_targets = mail_targets}
  end,
  -- Structured (non-flattened) metadata serialised as compact JSON
  json = function(task)
    return ucl.to_format(get_general_metadata(task), 'json-compact')
  end
}
213
--- Tell whether an action name is one of the spam actions.
-- @param action action name as returned by `task:get_metric_action`
-- @return true for 'reject', 'add header' and 'rewrite subject', false otherwise
local function is_spam(action)
  if action == 'reject' then
    return true
  end
  if action == 'add header' then
    return true
  end
  return action == 'rewrite subject'
end
217
-- Selectors decide whether a task is exported at all. Each takes the task and
-- returns a truthy value when the message should be processed.
local selectors = {}

-- Export every message
function selectors.default(task)
  return true
end

-- Export messages whose action is one of the spam actions
function selectors.is_spam(task)
  return is_spam(task:get_metric_action('default'))
end

-- Same as `is_spam`, but only for tasks that have a user
function selectors.is_spam_authed(task)
  if task:get_user() then
    return is_spam(task:get_metric_action('default'))
  end
  return false
end

-- Export messages whose action is 'reject'
function selectors.is_reject(task)
  return task:get_metric_action('default') == 'reject'
end

-- Same as `is_reject`, but only for tasks that have a user
function selectors.is_reject_authed(task)
  if task:get_user() then
    return task:get_metric_action('default') == 'reject'
  end
  return false
end
245
--- Soft-reject the message when the rule asks for deferral on push failure.
-- @param task task whose pre-result is set
-- @param rule rule table; only its `defer` field is consulted
local function maybe_defer(task, rule)
  if not rule.defer then
    return
  end
  rspamd_logger.warnx(task, 'deferring message')
  task:set_pre_result('soft reject', 'deferred', N)
end
252
-- Pushers deliver a formatted payload to an external service. Each one is
-- called as `pusher(task, formatted, rule, extra)`, where `extra` is the
-- optional second return value of the formatter (it may be nil).
local pushers = {
  -- Publishes the payload on the Redis channel named by `rule.channel`
  redis_pubsub = function(task, formatted, rule)
    local _,ret,upstream
    local function redis_pub_cb(err)
      if err then
        rspamd_logger.errx(task, 'got error %s when publishing on server %s',
            err, upstream:get_addr())
        return maybe_defer(task, rule)
      end
      return true
    end
    ret,_,upstream = rspamd_redis_make_request(task,
      redis_params, -- connect params
      nil, -- hash key
      true, -- is write
      redis_pub_cb, --callback
      'PUBLISH', -- command
      {rule.channel, formatted} -- arguments
    )
    if not ret then
      rspamd_logger.errx(task, 'error connecting to redis')
      maybe_defer(task, rule)
    end
  end,
  -- Sends the payload as the body of an HTTP request to `rule.url`; any
  -- transport error or non-200 status counts as a failure.
  http = function(task, formatted, rule)
    local function http_callback(err, code)
      if err then
        rspamd_logger.errx(task, 'got error %s in http callback', err)
        return maybe_defer(task, rule)
      end
      if code ~= 200 then
        rspamd_logger.errx(task, 'got unexpected http status: %s', code)
        return maybe_defer(task, rule)
      end
      return true
    end
    local hdrs = {}
    if rule.meta_headers then
      -- Mirror the metadata (without message headers) into request headers;
      -- table values are serialised as compact JSON.
      local gm = get_general_metadata(task, false, true)
      local pfx = rule.meta_header_prefix or 'X-Rspamd-'
      for k, v in pairs(gm) do
        if type(v) == 'table' then
          hdrs[pfx .. k] = ucl.to_format(v, 'json-compact')
        else
          hdrs[pfx .. k] = v
        end
      end
    end
    rspamd_http.request({
      task=task,
      url=rule.url,
      body=formatted,
      callback=http_callback,
      mime_type=rule.mime_type or settings.mime_type,
      headers=hdrs,
    })
  end,
  -- Sends the payload as an e-mail through the SMTP server `rule.smtp`
  send_mail = function(task, formatted, rule, extra)
    local lua_smtp = require "lua_smtp"
    local function sendmail_cb(ret, err)
      if not ret then
        rspamd_logger.errx(task, 'SMTP export error: %s', err)
        maybe_defer(task, rule)
      end
    end

    -- `extra` is nil when the formatter returns a single value (e.g. the
    -- `json` formatter), so it must not be indexed unconditionally.
    local targets = extra and extra.mail_targets

    lua_smtp.sendmail({
      task = task,
      host = rule.smtp,
      port = rule.smtp_port or settings.smtp_port or 25,
      from = rule.mail_from or settings.mail_from,
      recipients = targets or rule.mail_to or settings.mail_to,
      helo = rule.helo or settings.helo,
      timeout = rule.timeout or settings.timeout,
    }, formatted, sendmail_cb)
  end,
}
330
-- Fetch the plugin's configuration section; without one there is nothing to do.
local opts = rspamd_config:get_all_opt(N)
if not opts then
  return
end
-- Compiles a Lua chunk taken from the configuration and returns what it
-- evaluates to; raises if the chunk does not compile.
-- NOTE: the chunk is executed as code, so the configuration must be trusted.
local function eval_config_chunk(src)
  return assert(load(src))()
end

-- Evaluates every `name = chunk` pair of `val` into `registry`; non-table
-- values are ignored.
local function load_custom_entries(registry, val)
  if type(val) == 'table' then
    for name, src in pairs(val) do
      registry[name] = eval_config_chunk(src)
    end
  end
end

-- Marks one pusher as enabled, or logs the offending name if it is unknown.
local function enable_pusher(name)
  if pushers[name] then
    settings.pusher_enabled[name] = true
  else
    rspamd_logger.errx(rspamd_config, 'Pusher type: %s is invalid', name)
  end
end

-- Options that need special handling instead of being copied into `settings`.
-- Each handler receives the raw option value.
local process_settings = {
  -- Single custom selector / formatter / pusher, registered as `custom`
  select = function(val)
    selectors.custom = eval_config_chunk(val)
  end,
  format = function(val)
    formatters.custom = eval_config_chunk(val)
  end,
  push = function(val)
    pushers.custom = eval_config_chunk(val)
  end,
  -- Named custom pushers / selectors / formatters
  custom_push = function(val)
    load_custom_entries(pushers, val)
  end,
  custom_select = function(val)
    load_custom_entries(selectors, val)
  end,
  custom_format = function(val)
    load_custom_entries(formatters, val)
  end,
  -- One pusher name or a list of names to enable
  pusher_enabled = function(val)
    if type(val) == 'string' then
      enable_pusher(val)
    elseif type(val) == 'table' then
      for _, v in ipairs(val) do
        -- Report the individual invalid entry rather than the whole list
        enable_pusher(v)
      end
    end
  end,
}
-- Route every configured option either to its dedicated handler or, when it
-- has none, straight into `settings`.
for key, value in pairs(opts) do
  local handler = process_settings[key]
  if not handler then
    settings[key] = value
  else
    handler(value)
  end
end
if type(settings.rules) ~= 'table' then
  -- Legacy config: no `rules` table was given, so one rule per enabled
  -- built-in pusher is synthesised from the flat top-level options.
  settings.rules = {}
  if not next(settings.pusher_enabled) then
    if pushers.custom then
      rspamd_logger.infox(rspamd_config, 'Custom pusher implicitly enabled')
      settings.pusher_enabled.custom = true
    else
      -- Check legacy options
      if settings.url then
        rspamd_logger.warnx(rspamd_config, 'HTTP pusher implicitly enabled')
        settings.pusher_enabled.http = true
      end
      if settings.channel then
        rspamd_logger.warnx(rspamd_config, 'Redis Pubsub pusher implicitly enabled')
        settings.pusher_enabled.redis_pubsub = true
      end
      if settings.smtp and settings.mail_to then
        rspamd_logger.warnx(rspamd_config, 'SMTP pusher implicitly enabled')
        settings.pusher_enabled.send_mail = true
      end
    end
  end
  if not next(settings.pusher_enabled) then
    rspamd_logger.errx(rspamd_config, 'No push backend enabled')
    return
  end
  if settings.formatter then
    settings.format = formatters[settings.formatter]
    if not settings.format then
      rspamd_logger.errx(rspamd_config, 'No such formatter: %s', settings.formatter)
      return
    end
  end
  if settings.selector then
    settings.select = selectors[settings.selector]
    if not settings.select then
      rspamd_logger.errx(rspamd_config, 'No such selector: %s', settings.selector)
      return
    end
  end
  -- Resolve the formatter/selector of every enabled pusher. Clearing an
  -- existing key while traversing with pairs() is permitted.
  for k in pairs(settings.pusher_enabled) do
    local formatter = settings.pusher_format[k]
    local selector = settings.pusher_select[k]
    if not formatter then
      settings.pusher_format[k] = settings.formatter or 'default'
      rspamd_logger.infox(rspamd_config, 'Using default formatter for %s pusher', k)
    else
      if not formatters[formatter] then
        rspamd_logger.errx(rspamd_config, 'No such formatter: %s - disabling %s', formatter, k)
        -- Index by the loop variable; `.k` would touch the literal key "k"
        settings.pusher_enabled[k] = nil
      end
    end
    if not selector then
      settings.pusher_select[k] = settings.selector or 'default'
      rspamd_logger.infox(rspamd_config, 'Using default selector for %s pusher', k)
    else
      if not selectors[selector] then
        rspamd_logger.errx(rspamd_config, 'No such selector: %s - disabling %s', selector, k)
        settings.pusher_enabled[k] = nil
      end
    end
  end

  -- Stores a synthesised rule under the upper-cased backend name, filling in
  -- the fields shared by every legacy rule.
  local function add_legacy_rule(backend, r)
    r.backend = backend
    r.defer = settings.defer
    r.selector = settings.pusher_select[backend]
    r.formatter = settings.pusher_format[backend]
    settings.rules[backend:upper()] = r
  end

  -- NOTE(review): only the three built-in backends below get a rule; an
  -- enabled custom pusher produces none in legacy mode.
  if settings.pusher_enabled.redis_pubsub then
    redis_params = rspamd_parse_redis_server(N)
    if not redis_params then
      rspamd_logger.errx(rspamd_config, 'No redis servers are specified')
      settings.pusher_enabled.redis_pubsub = nil
    else
      add_legacy_rule('redis_pubsub', {
        channel = settings.channel,
      })
    end
  end
  if settings.pusher_enabled.http then
    if not settings.url then
      rspamd_logger.errx(rspamd_config, 'No URL is specified')
      settings.pusher_enabled.http = nil
    else
      add_legacy_rule('http', {
        url = settings.url,
        mime_type = settings.mime_type,
      })
    end
  end
  if settings.pusher_enabled.send_mail then
    if not (settings.mail_to and settings.smtp) then
      rspamd_logger.errx(rspamd_config, 'No mail_to and/or smtp setting is specified')
      settings.pusher_enabled.send_mail = nil
    else
      add_legacy_rule('send_mail', {
        mail_to = settings.mail_to,
        mail_from = settings.mail_from,
        -- The option is spelled `helo`
        helo = settings.helo,
        smtp = settings.smtp,
        smtp_port = settings.smtp_port,
        email_template = settings.email_template,
      })
    end
  end
  if not next(settings.pusher_enabled) then
    rspamd_logger.errx(rspamd_config, 'No push backend enabled')
    return
  end
elseif not next(settings.rules) then
  -- An explicit but empty `rules` table: nothing to export
  lua_util.debugm(N, rspamd_config, 'No rules enabled')
  return
end
-- Give up unless at least one rule exists at this point.
if not (settings.rules and next(settings.rules)) then
  rspamd_logger.errx(rspamd_config, 'No rules enabled')
  return
end
-- Settings a rule must define, keyed by the rule's `backend` name.
local backend_required_elements = {
  http = {
    'url',
  },
  -- The SMTP pusher is registered under the backend name `send_mail`, so its
  -- requirements must be listed under that key to be applied. Only the SMTP
  -- host is demanded: the pusher falls back to formatter-supplied targets or
  -- the module-wide `mail_to` when the rule has no `mail_to` of its own.
  send_mail = {
    'smtp',
  },
  -- Kept for backward compatibility with a backend literally named `smtp`
  smtp = {
    'mail_to',
    'smtp',
  },
  redis_pubsub = {
    'channel',
  },
}
-- Validators for individual rule settings, keyed by setting name. Each takes
-- the name used in the log message (`k`) and the setting's value (`v`), and
-- returns true when the value names a known selector/formatter.
local check_element = {}

function check_element.selector(k, v)
  if selectors[v] then
    return true
  end
  rspamd_logger.errx(rspamd_config, 'Rule %s has invalid selector %s', k, v)
  return false
end

function check_element.formatter(k, v)
  if formatters[v] then
    return true
  end
  rspamd_logger.errx(rspamd_config, 'Rule %s has invalid formatter %s', k, v)
  return false
end
-- Per-backend rule validators. Each is called as `check(k, rule)` with the
-- rule's name and table, and removes the rule from `settings.rules` when it
-- is unusable.
local backend_check = {
  default = function(k, rule)
    -- Every setting required by the rule's backend must be present
    local reqset = backend_required_elements[rule.backend]
    if reqset then
      for _, e in ipairs(reqset) do
        if not rule[e] then
          rspamd_logger.errx(rspamd_config, 'Rule %s misses required setting %s', k, e)
          settings.rules[k] = nil
        end
      end
    end
    -- Settings that have a validator (selector, formatter) must pass it
    for sett, v in pairs(rule) do
      local f = check_element[sett]
      if f then
        -- Pass the rule name so the error message identifies the rule,
        -- not the setting being checked.
        if not f(k, v) then
          settings.rules[k] = nil
        end
      end
    end
  end,
}
-- Redis rules additionally need the Redis server parameters; they are parsed
-- on first use and then shared through the file-level `redis_params`.
backend_check.redis_pubsub = function(k, rule)
  redis_params = redis_params or rspamd_parse_redis_server(N)
  if redis_params then
    backend_check.default(k, rule)
    return
  end
  rspamd_logger.errx(rspamd_config, 'No redis servers are specified')
  settings.rules[k] = nil
end
-- Backends without a dedicated checker resolve to the default checker.
local backend_check_mt = {}
function backend_check_mt.__index()
  return backend_check.default
end
setmetatable(backend_check, backend_check_mt)
-- Validate every configured rule, dropping those that are malformed, name no
-- backend, name an unknown backend, or fail their backend's checker.
for name, rule in pairs(settings.rules) do
  if type(rule) ~= 'table' then
    rspamd_logger.errx(rspamd_config, 'Rule %s has bad type: %s', name, type(rule))
    settings.rules[name] = nil
  else
    local backend = rule.backend
    if not backend then
      rspamd_logger.errx(rspamd_config, 'Rule %s has no backend', name)
      settings.rules[name] = nil
    elseif not pushers[backend] then
      rspamd_logger.errx(rspamd_config, 'Rule %s has invalid backend %s', name, backend)
      settings.rules[name] = nil
    else
      backend_check[backend](name, rule)
    end
  end
end
599
--- Build the symbol callback for one rule.
-- The callback ignores tasks flagged 'skip', asks the rule's selector whether
-- the message qualifies, formats it, and passes the result to the rule's
-- pusher. Selector and formatter both default to 'default'.
-- @param rule validated rule table (backend, selector, formatter, ...)
-- @return function(task)
local function gen_exporter(rule)
  local function export(task)
    if task:has_flag('skip') then
      return
    end

    local selector_name = rule.selector or 'default'
    local verdict = selectors[selector_name](task)
    if not verdict then
      lua_util.debugm(N, task, 'Selector [%s] returned non-truthy value [%s]', selector_name, verdict)
      return
    end

    lua_util.debugm(N, task, 'Message selected for processing')
    local formatter_name = rule.formatter or 'default'
    local payload, extra = formatters[formatter_name](task, rule)
    if not payload then
      lua_util.debugm(N, task, 'Formatter [%s] returned non-truthy value [%s]', formatter_name, payload)
      return
    end

    pushers[rule.backend](task, payload, rule, extra)
  end

  return export
end
619
-- Validation may have removed every rule; in that case disable the module.
if not next(settings.rules) then
  rspamd_logger.errx(rspamd_config, 'No rules enabled')
  lua_util.disable_module(N, "config")
end

-- Register one idempotent symbol per surviving rule.
for rule_name, rule in pairs(settings.rules) do
  local symbol = {
    name = 'EXPORT_METADATA_' .. rule_name,
    type = 'idempotent',
    callback = gen_exporter(rule),
    priority = 10,
    flags = 'empty,explicit_disable,ignore_passthrough',
  }
  rspamd_config:register_symbol(symbol)
end
633