--[[[
-- Just a test for TCP API
--]]

local rspamd_tcp = require "rspamd_tcp"
local logger = require "rspamd_logger"
local tcp_sync = require "lua_tcp_sync"

-- [[ old fashioned callback api ]]

--- Callback-API test: sends a GET to 127.0.0.1:18080, then a POST over the
-- same connection, inserting one result per reply.
-- @param task the task being processed
local function http_simple_tcp_async_symbol(task)
  logger.errx(task, 'http_tcp_symbol: begin')

  -- Final step: reply to the POST request.
  local function http_get_cb(err, data, conn)
    logger.errx(task, 'http_get_cb: got reply: %s, error: %s, conn: %s', data, err, conn)
    task:insert_result('HTTP_ASYNC_RESPONSE_2', 1.0, data)
  end

  -- Second step: the POST has been written, now wait for its reply.
  local function http_read_post_cb(err, conn)
    logger.errx(task, 'http_read_post_cb: write done: error: %s, conn: %s', err, conn)
    conn:add_read(http_get_cb)
  end

  -- First step: reply to the initial GET; queue the POST on the same connection.
  local function http_read_cb(err, data, conn)
    logger.errx(task, 'http_read_cb: got reply: %s, error: %s, conn: %s', data, err, conn)
    conn:add_write(http_read_post_cb, "POST /request2 HTTP/1.1\r\n\r\n")
    task:insert_result('HTTP_ASYNC_RESPONSE', 1.0, data or err)
  end

  rspamd_tcp:request({
    task = task,
    callback = http_read_cb,
    host = '127.0.0.1',
    data = {'GET /request HTTP/1.1\r\nConnection: keep-alive\r\n\r\n'},
    read = true,
    port = 18080,
  })
end

--- Callback-API test over SSL: writes "test\n" to 127.0.0.1:14433, then
-- "test2\n", inserting each reply (with whitespace stripped) as a result option.
-- @param task the task being processed
local function http_simple_tcp_ssl_symbol(task)
  logger.errx(task, 'ssl_tcp_symbol: begin')

  local function ssl_get_cb(err, data, conn)
    logger.errx(task, 'ssl_get_cb: got reply: %s, error: %s, conn: %s', data, err, conn)
    -- The parentheses truncate gsub's results to the string only; without
    -- them the substitution count is passed as an extra option argument.
    task:insert_result('TCP_SSL_RESPONSE_2', 1.0, (tostring(data):gsub('%s', '')))
  end

  local function ssl_read_post_cb(err, conn)
    logger.errx(task, 'ssl_read_post_cb: write done: error: %s, conn: %s', err, conn)
    conn:add_read(ssl_get_cb)
  end

  local function ssl_read_cb(err, data, conn)
    logger.errx(task, 'ssl_read_cb: got reply: %s, error: %s, conn: %s', data, err, conn)
    conn:add_write(ssl_read_post_cb, "test2\n")
    -- See ssl_get_cb: keep only the first gsub result.
    task:insert_result('TCP_SSL_RESPONSE', 1.0, (tostring(data):gsub('%s', '')))
  end

  rspamd_tcp:request({
    task = task,
    callback = ssl_read_cb,
    host = '127.0.0.1',
    data = {'test\n'},
    read = true,
    ssl = true,
    ssl_noverify = true,
    port = 14433,
  })
end

--- Callback-API test over SSL with a large payload (two chunks of 300000
-- bytes plus a newline). Only runs when the task queue id is
-- 'SSL Large TCP request'; otherwise it just logs a skip message.
-- @param task the task being processed
local function http_large_tcp_ssl_symbol(task)
  local data = {}

  local function ssl_get_cb(err, rep, conn)
    logger.errx(task, 'ssl_get_cb: got reply: %s, error: %s, conn: %s', rep, err, conn)
    task:insert_result('TCP_SSL_LARGE_2', 1.0)
  end

  local function ssl_read_post_cb(err, conn)
    logger.errx(task, 'ssl_large_read_post_cb: write done: error: %s, conn: %s', err, conn)
    conn:add_read(ssl_get_cb)
  end

  local function ssl_read_cb(err, rep, conn)
    logger.errx(task, 'ssl_large_read_cb: got reply: %s, error: %s, conn: %s', rep, err, conn)
    conn:add_write(ssl_read_post_cb, 'foo\n')
    task:insert_result('TCP_SSL_LARGE', 1.0)
  end

  if task:get_queue_id() == 'SSL Large TCP request' then
    logger.errx(task, 'ssl_large_tcp_symbol: begin')
    -- Build each chunk through a table + table.concat to avoid quadratic
    -- string concatenation.
    for i = 1, 2 do
      local st = {}
      for j = 1, 300000 do
        st[j] = 't'
      end
      data[i] = table.concat(st)
    end
    data[#data + 1] = '\n'

    rspamd_tcp:request({
      task = task,
      callback = ssl_read_cb,
      host = '127.0.0.1',
      data = data,
      read = true,
      ssl = true,
      stop_pattern = '\n',
      ssl_noverify = true,
      port = 14433,
      timeout = 20,
    })
  else
    logger.errx(task, 'ssl_large_tcp_symbol: skip')
  end
end

--- Synchronous-API test: connects to 127.0.0.1:18080, issues a GET and then a
-- POST on the same connection, reading each reply until it contains 'hello'.
-- Registered with the 'coro' flag.
-- @param task the task being processed
local function http_simple_tcp_symbol(task)
  logger.errx(task, 'connect_sync, before')

  local err
  local is_ok, connection = tcp_sync.connect {
    task = task,
    host = '127.0.0.1',
    timeout = 20,
    port = 18080,
  }

  if not is_ok then
    -- On failure the second return value is used as the error description,
    -- so there is no connection object to continue with.
    task:insert_result('HTTP_SYNC_WRITE_ERROR', 1.0, connection)
    logger.errx(task, 'connect error: %1', connection)
    return
  end

  logger.errx(task, 'connect_sync %1, %2', is_ok, tostring(connection))

  -- Reads chunks until the accumulated reply contains 'hello'
  -- (dummy_http.py responds with either hello world or hello post).
  -- Returns the accumulated content and the last chunk, or nil after
  -- inserting `err_symbol` when a read fails.
  local function read_until_hello(log_fmt, err_symbol)
    local acc = ''
    while true do
      local ok, chunk = connection:read_once()
      logger.errx(task, log_fmt, ok, chunk)
      if not ok then
        task:insert_result(err_symbol, 1.0, chunk)
        return nil
      end
      acc = acc .. chunk
      if acc:find('hello') then
        return acc, chunk
      end
    end
  end

  is_ok, err = connection:write('GET /request_sync HTTP/1.1\r\nConnection: keep-alive\r\n\r\n')

  logger.errx(task, 'write %1, %2', is_ok, err)
  if not is_ok then
    task:insert_result('HTTP_SYNC_WRITE_ERROR', 1.0, err)
    logger.errx(task, 'write error: %1', err)
    -- Nothing was sent, so waiting for a reply is pointless.
    return
  end

  local got_content = read_until_hello('read_once: is_ok: %1, data: %2', 'HTTP_SYNC_ERROR')
  if not got_content then
    return
  end

  task:insert_result('HTTP_SYNC_RESPONSE', 1.0, got_content)

  is_ok, err = connection:write("POST /request2 HTTP/1.1\r\n\r\n")
  logger.errx(task, 'write[2] %1, %2', is_ok, err)

  local data
  got_content, data = read_until_hello('read_once[2]: is_ok %1, data: %2', 'HTTP_SYNC_ERROR_2')
  if not got_content then
    return
  end

  -- NOTE(review): this reports only the last chunk read, whereas
  -- HTTP_SYNC_RESPONSE reports the accumulated content - confirm intended.
  task:insert_result('HTTP_SYNC_RESPONSE_2', 1.0, data)

  connection:close()
end

--- Synchronous-API test driven by request headers: sends `<method> <url>` to
-- 127.0.0.1:18080, parses the response headers line by line and then reads
-- the body either by Content-Length or until EOF.
-- Does nothing when the 'url' request header is absent.
-- @param task the task being processed
local function http_tcp_symbol(task)
  local url = tostring(task:get_request_header('url'))
  local method = tostring(task:get_request_header('method'))

  -- tostring() of a missing header yields the string 'nil'.
  if url == 'nil' then
    return
  end

  local err
  local is_ok, connection = tcp_sync.connect {
    task = task,
    host = '127.0.0.1',
    timeout = 20,
    port = 18080,
  }

  logger.errx(task, 'connect_sync %1, %2', is_ok, tostring(connection))
  if not is_ok then
    logger.errx(task, 'connect error: %1', connection)
    return
  end

  is_ok, err = connection:write(string.format('%s %s HTTP/1.1\r\nConnection: close\r\n\r\n', method:upper(), url))

  logger.errx(task, 'write %1, %2', is_ok, err)
  if not is_ok then
    logger.errx(task, 'write error: %1', err)
    return
  end

  local content_length, content

  -- Consume the status line and headers until the empty separator line.
  while true do
    local header_line
    is_ok, header_line = connection:read_until("\r\n")
    if not is_ok then
      logger.errx(task, 'failed to get header: %1', header_line)
      return
    end

    if header_line == "" then
      logger.errx(task, 'headers done')
      break
    end

    -- The replacement function captures the header value as a side effect and
    -- substitutes the lower-cased header name; `header` takes only the first
    -- gsub result. Lines that do not match are left unchanged, value stays nil.
    local value
    local header = header_line:gsub("([%w-]+): (.*)",
        function (h, v) value = v; return h:lower() end)

    logger.errx(task, 'parsed header: %1 -> "%2"', header, value)

    if header == "content-length" then
      content_length = tonumber(value)
    end

  end

  if content_length then
    is_ok, content = connection:read_bytes(content_length)
    if is_ok then
      task:insert_result('HTTP_SYNC_CONTENT_' .. method, 1.0, content)
    end
  else
    is_ok, content = connection:read_until_eof()
    if is_ok then
      task:insert_result('HTTP_SYNC_EOF_' .. method, 1.0, content)
    end
  end
  logger.errx(task, '(is_ok: %1) content [%2 bytes] %3', is_ok, content_length, content)
end

-- Callback-API symbols.
rspamd_config:register_symbol({
  name = 'SIMPLE_TCP_ASYNC_TEST',
  score = 1.0,
  callback = http_simple_tcp_async_symbol,
  no_squeeze = true
})
rspamd_config:register_symbol({
  name = 'SIMPLE_TCP_ASYNC_SSL_TEST',
  score = 1.0,
  callback = http_simple_tcp_ssl_symbol,
  no_squeeze = true
})
rspamd_config:register_symbol({
  name = 'LARGE_TCP_ASYNC_SSL_TEST',
  score = 1.0,
  callback = http_large_tcp_ssl_symbol,
  no_squeeze = true
})

-- Synchronous-API symbols, registered with the 'coro' flag.
rspamd_config:register_symbol({
  name = 'SIMPLE_TCP_TEST',
  score = 1.0,
  callback = http_simple_tcp_symbol,
  no_squeeze = true,
  flags = 'coro',
})

rspamd_config:register_symbol({
  name = 'HTTP_TCP_TEST',
  score = 1.0,
  callback = http_tcp_symbol,
  no_squeeze = true,
  flags = 'coro',
})
-- ]]