1from time import sleep, time 2import unittest 3from redis import ResponseError 4from includes import * 5from common import waitForIndex 6 7 8def to_dict(res): 9 d = {res[i]: res[i + 1] for i in range(0, len(res), 2)} 10 return d 11 12 13def loadDocs(env, count=100, idx='idx', text='hello world'): 14 env.expect('FT.CREATE', idx, 'ON', 'HASH', 'prefix', 1, idx, 'SCHEMA', 'f1', 'TEXT').ok() 15 waitForIndex(env, idx) 16 for x in range(count): 17 cmd = ['FT.ADD', idx, '{}_doc{}'.format(idx, x), 1.0, 'FIELDS', 'f1', text] 18 env.cmd(*cmd) 19 r1 = env.cmd('ft.search', idx, text) 20 r2 = list(set(map(lambda x: x[1], filter(lambda x: isinstance(x, list), r1)))) 21 env.assertEqual([text], r2) 22 r3 = env.cmd('ft.info', idx) 23 env.assertEqual(count, int(r3[r3.index('num_docs') + 1])) 24 25def exhaustCursor(env, idx, resp, *args): 26 first, cid = resp 27 rows = [resp] 28 while cid: 29 resp, cid=env.cmd('FT.CURSOR', 'READ', idx, cid, *args) 30 rows.append([resp, cid]) 31 return rows 32 33def getCursorStats(env, idx='idx'): 34 info = env.cmd('FT.INFO', idx) 35 try: 36 info_dict = to_dict(info)['cursor_stats'] 37 except: 38 return {'index_total' : 0, 'global_total' : 0} 39 return to_dict(info_dict) 40 41def testCursors(env): 42 loadDocs(env) 43 query = ['FT.AGGREGATE', 'idx', '*', 'LOAD', 1, '@f1', 'WITHCURSOR'] 44 resp = env.cmd(*query) 45 46 # Check info and see if there are other cursors 47 info = getCursorStats(env) 48 env.assertEqual(0, info['global_total']) 49 50 resp = exhaustCursor(env, 'idx', resp) 51 env.assertEqual(1, len(resp)) # Only one response 52 env.assertEqual(0, resp[0][1]) 53 env.assertEqual(101, len(resp[0][0])) 54 55 # Issue the same query, but using a specified count 56 resp = env.cmd(*(query[::]+['COUNT', 10])) 57 58 resp = exhaustCursor(env, 'idx', resp) 59 env.assertEqual(11, len(resp)) 60 61def testMultipleIndexes(env): 62 loadDocs(env, idx='idx2', text='goodbye') 63 loadDocs(env, idx='idx1', text='hello') 64 q1 = ['FT.AGGREGATE', 'idx1', '*', 'LOAD', 1, '@f1', 'WITHCURSOR', 'COUNT', 10 ] 65 q2 = q1[::] 66 q2[1] = 'idx2' 67 waitForIndex(env, 'idx1') 68 waitForIndex(env, 'idx2') 69 r1 = exhaustCursor(env, 'idx1', env.cmd( * q1)) 70 r2 = exhaustCursor(env, 'idx2', env.cmd( * q2)) 71 env.assertEqual(11, len(r1[0][0])) 72 env.assertEqual(11, len(r2[0][0])) 73 # Compare last results 74 last1 = r1[0][0][10] 75 last2 = r2[0][0][10] 76 env.assertEqual(['f1', 'hello'], last1) 77 env.assertEqual(['f1', 'goodbye'], last2) 78 79def testCapacities(env): 80 if env.is_cluster(): 81 raise unittest.SkipTest() 82 loadDocs(env, idx='idx1') 83 loadDocs(env, idx='idx2') 84 q1 = ['FT.AGGREGATE', 'idx1', '*', 'LOAD', '1', '@f1', 'WITHCURSOR', 'COUNT', 10] 85 q2 = q1[::] 86 q2[1] = 'idx2' 87 88 cursors1 = [] 89 cursors2 = [] 90 for _ in range(128): 91 r1 = env.cmd(*q1) 92 r2 = env.cmd(*q2) 93 cursors1.append(r1) 94 cursors2.append(r2) 95 96 # Get info for the cursors 97 info = getCursorStats(env, 'idx1') 98 env.assertEqual(128, info['index_total']) 99 env.assertEqual(256, info['global_total']) 100 info = getCursorStats(env, 'idx2') 101 env.assertEqual(128, info['index_total']) 102 103 # Try to create another cursor 104 env.assertRaises(ResponseError, env.cmd, * q1) 105 env.assertRaises(ResponseError, env.cmd, * q2) 106 107 # Clear all the cursors 108 for c in cursors1: 109 env.cmd('FT.CURSOR', 'DEL', 'idx1', c[-1]) 110 env.assertEqual(0, getCursorStats(env, 'idx1')['index_total']) 111 112 # Check that we can create a new cursor 113 c = env.cmd( * q1) 114 env.cmd('FT.CURSOR', 'DEL', 'idx1', c[-1]) 115 116def testTimeout(env): 117 # currently this test is only valid on one shard because coordinator creates more cursor which are not clean 118 # with the same timeout 119 env.skipOnCluster() 120 loadDocs(env, idx='idx1') 121 # Maximum idle of 1ms 122 q1 = ['FT.AGGREGATE', 'idx1', '*', 'LOAD', '1', '@f1', 'WITHCURSOR', 'COUNT', 10, 'MAXIDLE', 1] 123 resp = env.cmd(*q1) 124 exptime = time() + 2.5 125 rv = 1 126 while time() < exptime: 127 sleep(0.01) 128 env.cmd('FT.CURSOR', 'GC', 'idx1', '0') 129 rv = getCursorStats(env, 'idx1')['index_total'] 130 if not rv: 131 break 132 env.assertEqual(0, rv) 133''' 134def testErrors(env): 135 env.expect('ft.create idx schema name text').equal('OK') 136 #env.expect('ft.add idx hotel 1.0 fields name hilton').equal('OK') 137 env.expect('FT.AGGREGATE idx hilton withcursor').error() \ 138 .contains('Index `idx` does not have cursors enabled') 139''' 140def testLeaked(env): 141 # Test ensures in CursorList_Destroy() checks shutdown with remaining cursors 142 loadDocs(env) 143 env.expect('FT.AGGREGATE idx * LOAD 1 @f1 WITHCURSOR COUNT 1 MAXIDLE 1') 144