1from time import sleep, time
2import unittest
3from redis import ResponseError
4from includes import *
5from common import waitForIndex
6
7
def to_dict(res):
    """Convert a flat ``[key, value, key, value, ...]`` sequence into a dict.

    Elements at even positions become keys and the element right after each
    one becomes its value. A sequence with an odd number of elements raises
    ``IndexError`` because the last key has no value.
    """
    pairs = {}
    for pos in range(0, len(res), 2):
        pairs[res[pos]] = res[pos + 1]
    return pairs
11
12
def loadDocs(env, count=100, idx='idx', text='hello world'):
    """Create index ``idx`` and fill it with ``count`` identical documents.

    The index is created over hashes with prefix ``idx`` and a single TEXT
    field ``f1``. Each document is added as ``<idx>_doc<n>`` with ``f1`` set to
    ``text``. Afterwards the helper checks that searching for ``text`` only
    yields that value and that ``ft.info`` reports ``count`` documents.
    """
    env.expect('FT.CREATE', idx, 'ON', 'HASH', 'prefix', 1, idx, 'SCHEMA', 'f1', 'TEXT').ok()
    waitForIndex(env, idx)

    for doc_num in range(count):
        doc_id = '{}_doc{}'.format(idx, doc_num)
        env.cmd('FT.ADD', idx, doc_id, 1.0, 'FIELDS', 'f1', text)

    # Only the list entries of the reply are inspected; their second element
    # is collected and de-duplicated, so a single distinct value is expected.
    search_reply = env.cmd('ft.search', idx, text)
    distinct_values = list({entry[1] for entry in search_reply if isinstance(entry, list)})
    env.assertEqual([text], distinct_values)

    # The value following the 'num_docs' key in the flat info reply is the
    # document count.
    index_info = env.cmd('ft.info', idx)
    num_docs_pos = index_info.index('num_docs') + 1
    env.assertEqual(count, int(index_info[num_docs_pos]))
24
def exhaustCursor(env, idx, resp, *args):
    """Read a cursor until its id becomes falsy and return every reply.

    ``resp`` is the initial ``[results, cursor_id]`` reply. It is stored as the
    first entry unchanged; every following ``FT.CURSOR READ`` reply is appended
    as a new ``[results, cursor_id]`` list. Extra ``args`` are forwarded to
    each ``FT.CURSOR READ`` call.
    """
    _, cursor_id = resp
    pages = [resp]
    while cursor_id:
        reply = env.cmd('FT.CURSOR', 'READ', idx, cursor_id, *args)
        chunk, cursor_id = reply
        pages.append([chunk, cursor_id])
    return pages
32
def getCursorStats(env, idx='idx'):
    """Return the cursor statistics that ``FT.INFO`` reports for ``idx``.

    The ``FT.INFO`` reply is parsed as a flat key/value list and its
    ``cursor_stats`` entry (itself a flat key/value list) is returned as a
    dict. When the reply has no usable ``cursor_stats`` entry, a dict with
    ``index_total`` and ``global_total`` both set to 0 is returned instead.
    """
    info = env.cmd('FT.INFO', idx)
    try:
        cursor_stats = to_dict(info)['cursor_stats']
    except (KeyError, IndexError, TypeError):
        # Only the failures that parsing the reply can produce are treated as
        # "no cursor stats": a missing key, a malformed (odd-length) reply, or
        # a reply that is not an indexable sequence of hashable keys. Anything
        # else (e.g. KeyboardInterrupt) is allowed to propagate.
        return {'index_total': 0, 'global_total': 0}
    return to_dict(cursor_stats)
40
def testCursors(env):
    """Check cursor reads with the default chunk size and with ``COUNT 10``."""
    loadDocs(env)
    base_query = ['FT.AGGREGATE', 'idx', '*', 'LOAD', 1, '@f1', 'WITHCURSOR']
    first_reply = env.cmd(*base_query)

    # No cursor is expected to be registered after the initial reply.
    stats = getCursorStats(env)
    env.assertEqual(0, stats['global_total'])

    pages = exhaustCursor(env, 'idx', first_reply)
    # A single reply is expected: cursor id 0 and 101 elements in the results.
    env.assertEqual(1, len(pages))
    env.assertEqual(0, pages[0][1])
    env.assertEqual(101, len(pages[0][0]))

    # Same query again, this time asking for chunks of 10.
    counted_query = base_query + ['COUNT', 10]
    pages = exhaustCursor(env, 'idx', env.cmd(*counted_query))
    env.assertEqual(11, len(pages))
60
def testMultipleIndexes(env):
    """Run the same cursor aggregation against two indexes and compare rows."""
    loadDocs(env, idx='idx2', text='goodbye')
    loadDocs(env, idx='idx1', text='hello')

    def cursor_query(index_name):
        # Identical aggregation for both indexes; only the index name differs.
        return ['FT.AGGREGATE', index_name, '*', 'LOAD', 1, '@f1', 'WITHCURSOR', 'COUNT', 10]

    waitForIndex(env, 'idx1')
    waitForIndex(env, 'idx2')

    pages_idx1 = exhaustCursor(env, 'idx1', env.cmd(*cursor_query('idx1')))
    pages_idx2 = exhaustCursor(env, 'idx2', env.cmd(*cursor_query('idx2')))

    first_chunk_idx1 = pages_idx1[0][0]
    first_chunk_idx2 = pages_idx2[0][0]
    env.assertEqual(11, len(first_chunk_idx1))
    env.assertEqual(11, len(first_chunk_idx2))

    # The last row of each first chunk must carry that index's own text.
    tail_idx1 = first_chunk_idx1[10]
    tail_idx2 = first_chunk_idx2[10]
    env.assertEqual(['f1', 'hello'], tail_idx1)
    env.assertEqual(['f1', 'goodbye'], tail_idx2)
78
def testCapacities(env):
    """Open 128 cursors on each of two indexes and check the capacity limit.

    Skipped on cluster. After 128 cursors per index the test expects further
    cursor creation to fail, and expects it to succeed again on ``idx1`` once
    that index's cursors have been deleted.
    """
    if env.is_cluster():
        raise unittest.SkipTest()

    for index_name in ('idx1', 'idx2'):
        loadDocs(env, idx=index_name)

    query_idx1 = ['FT.AGGREGATE', 'idx1', '*', 'LOAD', '1', '@f1', 'WITHCURSOR', 'COUNT', 10]
    query_idx2 = list(query_idx1)
    query_idx2[1] = 'idx2'

    # Alternate between the two indexes while opening the cursors.
    open_idx1 = []
    open_idx2 = []
    for _ in range(128):
        reply_idx1 = env.cmd(*query_idx1)
        reply_idx2 = env.cmd(*query_idx2)
        open_idx1.append(reply_idx1)
        open_idx2.append(reply_idx2)

    # Per-index and global counters as reported by the cursor stats.
    stats = getCursorStats(env, 'idx1')
    env.assertEqual(128, stats['index_total'])
    env.assertEqual(256, stats['global_total'])
    stats = getCursorStats(env, 'idx2')
    env.assertEqual(128, stats['index_total'])

    # One more cursor on either index must be rejected.
    env.assertRaises(ResponseError, env.cmd, *query_idx1)
    env.assertRaises(ResponseError, env.cmd, *query_idx2)

    # Delete every idx1 cursor; the cursor id is the last element of a reply.
    for reply in open_idx1:
        env.cmd('FT.CURSOR', 'DEL', 'idx1', reply[-1])
    env.assertEqual(0, getCursorStats(env, 'idx1')['index_total'])

    # With room available again, a new cursor can be created and deleted.
    fresh = env.cmd(*query_idx1)
    env.cmd('FT.CURSOR', 'DEL', 'idx1', fresh[-1])
115
def testTimeout(env):
    """Check that an idle cursor disappears from the index's cursor stats.

    A cursor is opened with ``MAXIDLE 1``; the test then repeatedly triggers
    ``FT.CURSOR GC`` for up to 2.5 seconds until ``index_total`` drops to 0.
    """
    # Currently only valid on a single shard: the coordinator creates more
    # cursors, which are not cleaned up with the same timeout.
    env.skipOnCluster()
    loadDocs(env, idx='idx1')

    # Maximum idle of 1ms
    idle_query = ['FT.AGGREGATE', 'idx1', '*', 'LOAD', '1', '@f1', 'WITHCURSOR', 'COUNT', 10, 'MAXIDLE', 1]
    env.cmd(*idle_query)

    deadline = time() + 2.5
    remaining = 1
    # Poll until the cursor is gone or the deadline passes.
    while remaining and time() < deadline:
        sleep(0.01)
        env.cmd('FT.CURSOR', 'GC', 'idx1', '0')
        remaining = getCursorStats(env, 'idx1')['index_total']
    env.assertEqual(0, remaining)
# Disabled test: the code below sits inside a module-level string literal, so
# the function is never defined or run.
# NOTE(review): the reason it was disabled is not recorded here - confirm
# whether it should be re-enabled or removed.
'''
def testErrors(env):
    env.expect('ft.create idx schema name text').equal('OK')
    #env.expect('ft.add idx hotel 1.0 fields name hilton').equal('OK')
    env.expect('FT.AGGREGATE idx hilton withcursor').error()       \
        .contains('Index `idx` does not have cursors enabled')
'''
def testLeaked(env):
    """Issue a cursor query and finish without reading or deleting the cursor.

    Per the original note, this is meant to exercise the shutdown check in
    CursorList_Destroy() while cursors remain. The reply is deliberately not
    asserted on.
    """
    loadDocs(env)
    leftover_cursor_query = 'FT.AGGREGATE idx * LOAD 1 @f1 WITHCURSOR COUNT 1 MAXIDLE 1'
    env.expect(leftover_cursor_query)
144