1import subprocess 2import signal 3import os 4import os.path 5from RLTest import Env 6import time 7import random 8from includes import * 9from common import waitForIndex 10 11 12class TimeoutException(Exception): 13 pass 14 15class TimeLimit(object): 16 """ 17 A context manager that fires a TimeExpired exception if it does not 18 return within the specified amount of time. 19 """ 20 def __init__(self, timeout): 21 self.timeout = timeout 22 def __enter__(self): 23 signal.signal(signal.SIGALRM, self.handler) 24 signal.setitimer(signal.ITIMER_REAL, self.timeout, 0) 25 def __exit__(self, exc_type, exc_value, traceback): 26 signal.setitimer(signal.ITIMER_REAL, 0) 27 signal.signal(signal.SIGALRM, signal.SIG_DFL) 28 def handler(self, signum, frame): 29 raise TimeoutException() 30 31def checkSlaveSynced(env, slaveConn, command, expected_result, time_out=5): 32 try: 33 with TimeLimit(time_out): 34 res = slaveConn.execute_command(*command) 35 while res != expected_result: 36 res = slaveConn.execute_command(*command) 37 except TimeoutException: 38 env.assertTrue(False, message='Failed waiting for command to be executed on slave') 39 except Exception as e: 40 env.assertTrue(False, message=e.message) 41 42def testDelReplicate(): 43 env = Env(useSlaves=True, forceTcp=True) 44 45 env.skipOnCluster() 46 47 ## on existing env we can not get a slave connection 48 ## so we can no test it 49 if env.env == 'existing-env': 50 env.skip() 51 52 master = env.getConnection() 53 slave = env.getSlaveConnection() 54 env.assertTrue(master.execute_command("ping")) 55 env.assertTrue(slave.execute_command("ping")) 56 env.assertOk(master.execute_command('ft.create', 'idx', 'ON', 'HASH', 'FILTER', 'startswith(@__key, "")', 'schema', 'f', 'text')) 57 env.cmd('set', 'indicator', '1') 58 checkSlaveSynced(env, slave, ('exists', 'indicator'), 1, time_out=20) 59 60 for i in range(10): 61 master.execute_command('ft.add', 'idx', 'doc%d' % i, 1.0, 'fields', 62 'f', 'hello world') 63 time.sleep(0.01) 64 checkSlaveSynced(env, slave, ('ft.get', 'idx', 'doc9'), ['f', 'hello world'], time_out=20) 65 66 for i in range(10): 67 # checking for insertion 68 env.assertEqual(['f', 'hello world'], 69 master.execute_command('ft.get', 'idx', 'doc%d' % i)) 70 env.assertEqual(['f', 'hello world'], 71 slave.execute_command('ft.get', 'idx', 'doc%d' % i)) 72 73 # deleting 74 env.assertEqual(1, master.execute_command( 75 'ft.del', 'idx', 'doc%d' % i)) 76 77 checkSlaveSynced(env, slave, ('ft.get', 'idx', 'doc9'), None, time_out=20) 78 79 for i in range(10): 80 # checking for deletion 81 env.assertEqual(None, 82 master.execute_command('ft.get', 'idx', 'doc%d' % i)) 83 env.assertEqual(None, 84 slave.execute_command('ft.get', 'idx', 'doc%d' % i)) 85 86def testDropReplicate(): 87 env = Env(useSlaves=True, forceTcp=True) 88 89 env.skipOnCluster() 90 91 ## on existing env we can not get a slave connection 92 ## so we can no test it 93 if env.env == 'existing-env': 94 env.skip() 95 96 master = env.getConnection() 97 slave = env.getSlaveConnection() 98 env.assertTrue(master.execute_command("ping")) 99 env.assertTrue(slave.execute_command("ping")) 100 101 ''' 102 This test first creates documents 103 Next, it creates an index so all documents are scanned into the index 104 At last the index is dropped right away, before all documents have been indexed. 105 106 The text checks consistency between master and slave. 107 ''' 108 for j in range(100): 109 geo = '1.23456,' + str(float(j) / 100) 110 master.execute_command('HSET', 'doc%d' % j, 't', 'hello%d' % j, 'tg', 'world%d' % j, 'n', j, 'g', geo) 111 112 # test for FT.DROPINDEX 113 master.execute_command('WAIT', 1, 1000) 114 master.execute_command('FT.CREATE', 'idx', 'SCHEMA', 't', 'TEXT', 'n', 'NUMERIC', 'tg', 'TAG', 'g', 'GEO') 115 master.execute_command('FT.DROPINDEX', 'idx', 'DD') 116 master.execute_command('WAIT', 1, 1000) 117 118 # check that same docs were deleted by master and slave 119 master_keys = sorted(master.execute_command('KEYS', '*')) 120 slave_keys = sorted(slave.execute_command('KEYS', '*')) 121 env.assertEqual(len(master_keys), len(slave_keys)) 122 env.assertEqual(master_keys, slave_keys) 123 124 # show the different documents mostly for test debug info 125 master_set = set(master_keys) 126 slave_set = set(slave_keys) 127 env.assertEqual(master_set.difference(slave_set), set([])) 128 env.assertEqual(slave_set.difference(master_set), set([])) 129 130 # test for FT.DROP 131 master.execute_command('FT.CREATE', 'idx', 'SCHEMA', 't', 'TEXT', 'n', 'NUMERIC', 'tg', 'TAG', 'g', 'GEO') 132 time.sleep(0.001) 133 master.execute_command('FT.DROP', 'idx') 134 135 # check that same docs were deleted by master and slave 136 time.sleep(0.01) 137 master_keys = sorted(master.execute_command('KEYS', '*')) 138 slave_keys = sorted(slave.execute_command('KEYS', '*')) 139 env.assertEqual(len(master_keys), len(slave_keys)) 140 env.assertEqual(master_keys, slave_keys) 141 142 # show the different documents mostly for test debug info 143 master_set = set(master_keys) 144 slave_set = set(slave_keys) 145 env.assertEqual(master_set.difference(slave_set), set([])) 146 env.assertEqual(slave_set.difference(master_set), set([])) 147