1from __future__ import absolute_import
2
3from mock import patch, create_autospec
4from tornado.ioloop import IOLoop
5
6import nsq
7from nsq import event
8
9_conn_port = 4150
10
11
12def get_reader(max_in_flight=5):
13    return nsq.Reader("test", "test",
14                      message_handler=_message_handler,
15                      lookupd_http_addresses=["http://localhost:4161"],
16                      max_in_flight=max_in_flight,
17                      max_backoff_duration=2.0,
18                      )
19
20
21def get_ioloop():
22    ioloop = create_autospec(IOLoop)
23    ioloop.time.return_value = 0
24    return ioloop
25
26
27def get_conn(reader):
28    global _conn_port
29    with patch('nsq.conn.tornado.iostream.IOStream', autospec=True):
30        conn = reader.connect_to_nsqd('localhost', _conn_port)
31    _conn_port += 1
32    conn.trigger(event.READY, conn=conn)
33    return conn
34
35
36def send_message(conn):
37    msg = _get_message(conn)
38    conn.in_flight += 1
39    conn.trigger(event.MESSAGE, conn=conn, message=msg)
40    return msg
41
42
43def _get_message(conn):
44    msg = nsq.Message("1234", "{}", 1234, 0)
45    msg.on('finish', conn._on_message_finish)
46    msg.on('requeue', conn._on_message_requeue)
47    return msg
48
49
50def _message_handler(msg):
51    msg.enable_async()
52