"""Shared helpers for building readers, connections and messages in tests."""
from __future__ import absolute_import

from mock import patch, create_autospec
from tornado.ioloop import IOLoop

import nsq
from nsq import event

# Port handed to the next connection created by get_conn(); bumped by one
# after each successful connect so every connection gets a distinct port.
_conn_port = 4150


def get_reader(max_in_flight=5):
    """Return an ``nsq.Reader`` configured for tests.

    The reader is created with ``"test"`` for both of its positional
    arguments, uses ``_message_handler`` as its message handler, points at a
    single lookupd HTTP address on localhost and caps backoff at 2 seconds.

    :param max_in_flight: forwarded unchanged as the reader's
        ``max_in_flight`` option.
    """
    options = {
        'message_handler': _message_handler,
        'lookupd_http_addresses': ["http://localhost:4161"],
        'max_in_flight': max_in_flight,
        'max_backoff_duration': 2.0,
    }
    return nsq.Reader("test", "test", **options)


def get_ioloop():
    """Return an autospec'd ``IOLoop`` whose ``time()`` always yields 0."""
    mock_loop = create_autospec(IOLoop)
    mock_loop.time.return_value = 0
    return mock_loop


def get_conn(reader):
    """Connect *reader* to a fresh localhost port and mark the conn ready.

    The tornado ``IOStream`` used by ``nsq.conn`` is patched with an autospec
    for the duration of the call. After connecting, the module-level port
    counter is advanced and a ``READY`` event is triggered on the new
    connection.

    :param reader: object exposing ``connect_to_nsqd(host, port)``.
    :returns: whatever ``reader.connect_to_nsqd`` returned.
    """
    global _conn_port
    with patch('nsq.conn.tornado.iostream.IOStream', autospec=True):
        connection = reader.connect_to_nsqd('localhost', _conn_port)
        # Only advance the counter once the connect call has returned.
        _conn_port += 1
        connection.trigger(event.READY, conn=connection)
        return connection


def send_message(conn):
    """Deliver one newly built message on *conn* and return it.

    Increments ``conn.in_flight`` before triggering the ``MESSAGE`` event.
    """
    message = _get_message(conn)
    conn.in_flight += 1
    conn.trigger(event.MESSAGE, conn=conn, message=message)
    return message


def _get_message(conn):
    """Build an ``nsq.Message`` wired to *conn*'s finish/requeue callbacks."""
    message = nsq.Message("1234", "{}", 1234, 0)
    message.on('finish', conn._on_message_finish)
    message.on('requeue', conn._on_message_requeue)
    return message


def _message_handler(msg):
    """Message handler used by get_reader(): calls ``msg.enable_async()``."""
    msg.enable_async()