-- XEP-0313: Message Archive Management for Prosody
-- Copyright (C) 2011-2016 Kim Alvefur
--
-- This file is MIT/X11 licensed.

local xmlns_mam0 = "urn:xmpp:mam:0";
local xmlns_mam1 = "urn:xmpp:mam:1";
local xmlns_mam2 = "urn:xmpp:mam:2";
local xmlns_delay = "urn:xmpp:delay";
local xmlns_forward = "urn:xmpp:forward:0";
local xmlns_st_id = "urn:xmpp:sid:0";

local um = require "core.usermanager";
local st = require "util.stanza";
local rsm = module:require "rsm";
local get_prefs = module:require"mamprefs".get;
local set_prefs = module:require"mamprefs".set;
local prefs_to_stanza = module:require"mamprefsxml".tostanza;
local prefs_from_stanza = module:require"mamprefsxml".fromstanza;
local jid_bare = require "util.jid".bare;
local jid_split = require "util.jid".split;
local jid_prepped_split = require "util.jid".prepped_split;
local dataform = require "util.dataforms".new;
local host = module.host;

local rm_load_roster = require "core.rostermanager".load_roster;

local getmetatable = getmetatable;
-- True if x is a live util.stanza object (as opposed to a serialized table).
local function is_stanza(x)
	return getmetatable(x) == st.stanza_mt;
end

local tostring = tostring;
local time_now = os.time;
local m_min = math.min;
local m_max = math.max;
local timestamp, timestamp_parse = require "util.datetime".datetime, require "util.datetime".parse;
local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50);
-- Either a boolean or the string "roster" (archive only contacts in the roster).
local global_default_policy = module:get_option("default_archive_policy", true);
if global_default_policy ~= "roster" then
	global_default_policy = module:get_option_boolean("default_archive_policy", global_default_policy);
end

local archive_store = module:get_option_string("archive_store", "archive2");
local archive = module:open_store(archive_store, "archive");

-- Fall back to an in-memory archive when the configured storage cannot archive.
if archive.name == "null" or not archive.find then
	-- luacheck: ignore 631
	if not archive.find then
		module:log("debug", "Attempt to open archive storage returned a valid driver but it does not seem to implement the storage API");
		module:log("debug", "mod_%s does not support archiving", archive._provided_by or archive.name and "storage_"..archive.name.."(?)" or "<unknown>");
	else
		module:log("debug", "Attempt to open archive storage returned null driver");
	end
	module:log("debug", "See https://prosody.im/doc/storage and https://prosody.im/doc/archiving for more information");
	module:log("info", "Using in-memory fallback archive driver");
	archive = module:require "fallback_archive";
end

-- Whether archive:find() is asked for a total item count; switched off
-- further down when archive expiry is disabled ("never").
local use_total = true;

-- Pending-expiry bookkeeping: array part is a FIFO of usernames, hash part
-- marks which usernames are already queued. Stays nil when expiry is off.
local cleanup;

-- Queue a user for archive expiry, at most once, if expiry is enabled.
local function schedule_cleanup(username)
	if cleanup and not cleanup[username] then
		table.insert(cleanup, username);
		cleanup[username] = true;
	end
end

-- Handle prefs: "get" returns the stored archiving preferences, anything
-- else stores the preferences carried in the <prefs/> child.
local function handle_prefs(event)
	local origin, stanza = event.origin, event.stanza;
	local xmlns_mam = stanza.tags[1].attr.xmlns;
	local user = origin.username;
	if stanza.attr.type == "get" then
		local prefs = prefs_to_stanza(get_prefs(user), xmlns_mam);
		local reply = st.reply(stanza):add_child(prefs);
		origin.send(reply);
	else -- type == "set"
		local new_prefs = stanza:get_child("prefs", xmlns_mam);
		local prefs = prefs_from_stanza(new_prefs);
		local ok, err = set_prefs(user, prefs);
		if not ok then
			origin.send(st.error_reply(stanza, "cancel", "internal-server-error", "Error storing preferences: "..tostring(err)));
		else
			origin.send(st.reply(stanza));
		end
	end
	return true;
end

module:hook("iq/self/"..xmlns_mam0..":prefs", handle_prefs);
module:hook("iq/self/"..xmlns_mam1..":prefs", handle_prefs);
module:hook("iq/self/"..xmlns_mam2..":prefs", handle_prefs);

-- Search form shared by all MAM namespaces; FORM_TYPE is rewritten per query.
local query_form = dataform {
	{ name = "FORM_TYPE"; type = "hidden"; value = xmlns_mam0; };
	{ name = "with"; type = "jid-single"; };
	{ name = "start"; type = "text-single" };
	{ name = "end"; type = "text-single"; };
};

-- Serve form: reply to a query "get" with the search form for the
-- namespace the client used.
local function handle_get_form(event)
	local origin, stanza = event.origin, event.stanza;
	local xmlns_mam = stanza.tags[1].attr.xmlns;
	query_form[1].value = xmlns_mam;
	origin.send(st.reply(stanza):query(xmlns_mam):add_child(query_form:form()));
	return true;
end

module:hook("iq-get/self/"..xmlns_mam0..":query", handle_get_form);
module:hook("iq-get/self/"..xmlns_mam1..":query", handle_get_form);
module:hook("iq-get/self/"..xmlns_mam2..":query", handle_get_form);

-- Handle archive queries: parse the optional search form and RSM set,
-- fetch matching items, send each as a <result> message, then send <fin>
-- (as a message for mam:0, as the iq result otherwise).
local function handle_mam_query(event)
	local origin, stanza = event.origin, event.stanza;
	local xmlns_mam = stanza.tags[1].attr.xmlns;
	local query = stanza.tags[1];
	local qid = query.attr.queryid;

	origin.mam_requested = true;

	schedule_cleanup(origin.username);

	-- Search query parameters
	local qwith, qstart, qend;
	local form = query:get_child("x", "jabber:x:data");
	if form then
		local err;
		query_form[1].value = xmlns_mam;
		form, err = query_form:data(form);
		if err then
			origin.send(st.error_reply(stanza, "modify", "bad-request", select(2, next(err))));
			return true;
		end
		qwith, qstart, qend = form["with"], form["start"], form["end"];
		qwith = qwith and jid_bare(qwith); -- dataforms does jidprep
	end

	if qstart or qend then -- Validate timestamps
		local vstart, vend = (qstart and timestamp_parse(qstart)), (qend and timestamp_parse(qend));
		if (qstart and not vstart) or (qend and not vend) then
			origin.send(st.error_reply(stanza, "modify", "bad-request", "Invalid timestamp"));
			return true;
		end
		qstart, qend = vstart, vend;
	end

	module:log("debug", "Archive query, id %s with %s from %s until %s",
		tostring(qid), qwith or "anyone", qstart or "the dawn of time", qend or "now");

	-- RSM stuff
	local qset = rsm.get(query);
	-- Clamp the page size to [0, max_max_items]. Without the lower bound a
	-- negative client-supplied <max/> becomes a non-positive store limit.
	local qmax = m_max(0, m_min(qset and qset.max or default_max_items, max_max_items));
	local reverse = qset and qset.before or false;
	local before, after = qset and qset.before, qset and qset.after;
	if type(before) ~= "string" then before = nil; end

	-- Load all the data! One item beyond qmax is requested so that an
	-- incomplete result set can be detected.
	local data, err = archive:find(origin.username, {
		start = qstart; ["end"] = qend; -- Time range
		with = qwith;
		limit = qmax + 1;
		before = before; after = after;
		reverse = reverse;
		total = use_total;
	});

	if not data then
		origin.send(st.error_reply(stanza, "cancel", "internal-server-error", err));
		return true;
	end
	-- On success the second return value is the total count (if requested).
	local total = tonumber(err);

	if xmlns_mam == xmlns_mam0 then
		origin.send(st.reply(stanza));
	end
	local msg_reply_attr = { to = stanza.attr.from, from = stanza.attr.to };

	local results = {};

	-- Wrap it in stuff and deliver
	local first, last;
	local count = 0;
	local complete = "true";
	for id, item, when in data do
		count = count + 1;
		if count > qmax then
			complete = nil;
			break;
		end
		local fwd_st = st.message(msg_reply_attr)
			:tag("result", { xmlns = xmlns_mam, queryid = qid, id = id })
				:tag("forwarded", { xmlns = xmlns_forward })
					:tag("delay", { xmlns = xmlns_delay, stamp = timestamp(when) }):up();

		if not is_stanza(item) then
			item = st.deserialize(item);
		end
		item.attr.xmlns = "jabber:client";
		fwd_st:add_child(item);

		if not first then first = id; end
		last = id;

		if reverse then
			results[count] = fwd_st;
		else
			origin.send(fwd_st);
		end
	end

	-- Reverse pages were fetched newest-first; deliver them oldest-first.
	if reverse then
		for i = #results, 1, -1 do
			origin.send(results[i]);
		end
		first, last = last, first;
	end

	-- That's all folks!
	module:log("debug", "Archive query %s completed", tostring(qid));

	local fin;
	if xmlns_mam == xmlns_mam0 then
		fin = st.message(msg_reply_attr);
	else
		fin = st.reply(stanza);
	end
	do
		fin:tag("fin", { xmlns = xmlns_mam, queryid = qid, complete = complete })
			:add_child(rsm.generate {
				first = first, last = last, count = total })
	end
	origin.send(fin);
	return true;
end
module:hook("iq-set/self/"..xmlns_mam0..":query", handle_mam_query);
module:hook("iq-set/self/"..xmlns_mam1..":query", handle_mam_query);
module:hook("iq-set/self/"..xmlns_mam2..":query", handle_mam_query);

-- Returns the roster entry for `who` in `user`'s roster (nil if absent).
local function has_in_roster(user, who)
	local roster = rm_load_roster(user, host);
	module:log("debug", "%s has %s in roster? %s", user, who, roster[who] and "yes" or "no");
	return roster[who];
end

-- Decide whether a message between `user` and `who` should be archived:
-- per-contact rule, then the user's default, then the global default policy.
local function shall_store(user, who)
	-- TODO Cache this?
	if not um.user_exists(user, host) then
		return false;
	end
	local prefs = get_prefs(user);
	local rule = prefs[who];
	module:log("debug", "%s's rule for %s is %s", user, who, tostring(rule));
	if rule ~= nil then
		return rule;
	end
	-- Below could be done by a metatable
	local default = prefs[false];
	module:log("debug", "%s's default rule is %s", user, tostring(default));
	if default == nil then
		default = global_default_policy;
		module:log("debug", "Using global default rule, %s", tostring(default));
	end
	if default == "roster" then
		return has_in_roster(user, who);
	end
	return default;
end

-- Handle messages: archive eligible messages for the local user involved
-- (the sender when c2s is true, otherwise the recipient) and pass a copy
-- tagged with <stanza-id/> on to later handlers.
local function message_handler(event, c2s)
	local origin, stanza = event.origin, event.stanza;
	local log = c2s and origin.log or module._log;
	local orig_type = stanza.attr.type or "normal";
	local orig_from = stanza.attr.from;
	local orig_to = stanza.attr.to or orig_from;
	-- Stanzas without 'to' are treated as if they were sent to the sender's own bare jid

	-- Whose storage do we put it in?
	local store_user = c2s and origin.username or jid_split(orig_to);
	-- And who are they chatting with?
	local with = jid_bare(c2s and orig_to or orig_from);

	-- Filter out <stanza-id> that claim to be from us
	if stanza:get_child("stanza-id", xmlns_st_id) then
		stanza = st.clone(stanza);
		stanza:maptags(function (tag)
			if tag.name == "stanza-id" and tag.attr.xmlns == xmlns_st_id then
				local by_user, by_host, res = jid_prepped_split(tag.attr.by);
				if not res and by_host == module.host and by_user == store_user then
					return nil;
				end
			end
			return tag;
		end);
		event.stanza = stanza;
	end

	-- We store chat messages or normal messages that have a body
	if not(orig_type == "chat" or (orig_type == "normal" and stanza:get_child("body")) ) then
		log("debug", "Not archiving stanza: %s (type)", stanza:top_tag());
		return;
	end

	-- or if hints suggest we shouldn't
	if not stanza:get_child("store", "urn:xmpp:hints") then -- No hint telling us we should store
		if stanza:get_child("no-permanent-store", "urn:xmpp:hints")
			or stanza:get_child("no-store", "urn:xmpp:hints") then -- Hint telling us we should NOT store
			log("debug", "Not archiving stanza: %s (hint)", stanza:top_tag());
			return;
		end
	end

	-- Check with the users preferences
	if shall_store(store_user, with) then
		log("debug", "Archiving stanza: %s", stanza:top_tag());

		-- And stash it
		local id, err = archive:append(store_user, nil, stanza, time_now(), with);
		if id then
			local clone_for_other_handlers = st.clone(stanza);
			clone_for_other_handlers:tag("stanza-id", { xmlns = xmlns_st_id, by = store_user.."@"..host, id = id }):up();
			event.stanza = clone_for_other_handlers;
			schedule_cleanup(store_user);
			module:fire_event("archive-message-added", { origin = origin, stanza = stanza, for_user = store_user, id = id });
		else
			-- Previously this failure was silently ignored.
			log("error", "Could not archive stanza: %s", tostring(err));
		end
	else
		log("debug", "Not archiving stanza: %s (prefs)", stanza:top_tag());
	end
end

-- Variant of message_handler for stanzas sent by local clients.
local function c2s_message_handler(event)
	return message_handler(event, true);
end

-- Remove <stanza-id/> elements attributed to the sending client's own bare
-- JID from outgoing stanzas (the stanza is cloned first).
local function strip_stanza_id(event)
	local strip_by = jid_bare(event.origin.full_jid);
	event.stanza = st.clone(event.stanza);
	event.stanza:maptags(function(tag)
		if not ( tag.attr.xmlns == xmlns_st_id and tag.attr.by == strip_by ) then
			return tag;
		end
	end);
end

module:hook("pre-message/bare", strip_stanza_id, 0.01);
module:hook("pre-message/full", strip_stanza_id, 0.01);

-- Archive expiry. archive_expires_after is "never" or "<number>[d|w|m|y]";
-- a bare number (or an unrecognised unit) is taken as seconds.
local cleanup_after = module:get_option_string("archive_expires_after", "1w");
local cleanup_interval = module:get_option_number("archive_cleanup_interval", 4 * 60 * 60);
if cleanup_after ~= "never" then
	local day = 86400;
	local multipliers = { d = day, w = day * 7, m = 31 * day, y = 365.2425 * day };
	local n, m = cleanup_after:lower():match("(%d+)%s*([dwmy]?)");
	if not n then
		module:log("error", "Could not parse archive_expires_after string %q", cleanup_after);
		return false;
	end

	cleanup_after = tonumber(n) * ( multipliers[m] or 1 );

	module:log("debug", "archive_expires_after = %d -- in seconds", cleanup_after);

	if not archive.delete then
		-- _provided_by may be absent (e.g. on the fallback driver); avoid a nil format argument.
		module:log("error", "archive_expires_after set but mod_%s does not support deleting", tostring(archive._provided_by or archive.name));
		return false;
	end

	-- Set of known users to do message expiry for
	-- Populated either below or when new messages are added
	cleanup = {};

	-- Iterating over users is not supported by all authentication modules
	-- Catch and ignore error if not supported
	pcall(function ()
		-- If this works, then we schedule cleanup for all known users on startup
		for user in um.users(module.host) do
			schedule_cleanup(user);
		end
	end);

	-- At odd intervals, delete old messages for one user
	module:add_timer(math.random(10, 60), function()
		local user = table.remove(cleanup, 1);
		if user then
			module:log("debug", "Removing old messages for user %q", user);
			local ok, err = archive:delete(user, { ["end"] = os.time() - cleanup_after; })
			if not ok then
				module:log("warn", "Could not expire archives for user %s: %s", user, err);
			else
				-- :affected() is a recent addition for eg SQLite3 in LuaDBI
				pcall(function(stmt)
					module:log("debug", "Removed %d messages", stmt:affected());
				end, err);
			end
			cleanup[user] = nil;
		end
		return math.random(cleanup_interval, cleanup_interval * 2);
	end);
else
	-- Don't ask the backend to count the potentially unbounded number of items,
	-- it'll get slow.
	use_total = false;
end

-- Stanzas sent by local clients
local priority = 0.075
assert(priority < 0.1, "priority must be after mod_firewall");
assert(priority > 0.05, "priority must be before mod_carbons");
assert(priority > 0.01, "priority must be before strip_stanza_id");
module:hook("pre-message/bare", c2s_message_handler, priority);
module:hook("pre-message/full", c2s_message_handler, priority);
-- Stanzas to local clients
priority = 0.075
assert(priority > 0, "priority must be before mod_message");
assert(priority < 0.1, "priority must be after mod_firewall");
assert(priority > 0.05, "priority must be before mod_carbons");
module:hook("message/bare", message_handler, priority);
module:hook("message/full", message_handler, priority);

module:add_feature(xmlns_mam0); -- COMPAT with XEP-0313 v 0.1

-- Advertise the supported MAM and stanza-id namespaces on account disco#info.
module:hook("account-disco-info", function(event)
	local reply = event.reply or event.stanza;
	reply:tag("feature", {var=xmlns_mam0}):up();
	reply:tag("feature", {var=xmlns_mam1}):up();
	reply:tag("feature", {var=xmlns_mam2}):up();
	reply:tag("feature", {var=xmlns_st_id}):up();
end);