1import subprocess
2import signal
3import os
4import os.path
5from RLTest import Env
6import time
7import random
8from includes import *
9from common import waitForIndex
10
11
class TimeoutException(Exception):
  """Raised by TimeLimit's signal handler when the time limit expires."""
14
class TimeLimit(object):
    """
    A context manager that raises a TimeoutException inside the `with` body
    if the body does not finish within the specified amount of time.

    The limit is implemented with SIGALRM and the ITIMER_REAL interval timer.
    The SIGALRM handler that was installed before entering the block is put
    back when the block exits.

    timeout: number of seconds (passed to signal.setitimer) before the
             exception is raised.
    """
    def __init__(self, timeout):
        self.timeout = timeout
        # SIGALRM handler that was active before __enter__, restored on exit.
        self._previous_handler = None

    def __enter__(self):
        # signal.signal returns the handler it replaces; remember it so that
        # __exit__ does not clobber a handler installed by someone else.
        self._previous_handler = signal.signal(signal.SIGALRM, self.handler)
        signal.setitimer(signal.ITIMER_REAL, self.timeout, 0)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Cancel the pending alarm first so it cannot fire after the handler
        # has been swapped back.
        signal.setitimer(signal.ITIMER_REAL, 0)
        previous = self._previous_handler
        self._previous_handler = None
        # signal.signal reports None for a handler that was not installed
        # from Python; it cannot be re-installed, so fall back to the default.
        if previous is None:
            previous = signal.SIG_DFL
        signal.signal(signal.SIGALRM, previous)

    def handler(self, signum, frame):
        """SIGALRM handler: abort the guarded block with TimeoutException."""
        raise TimeoutException()
30
def checkSlaveSynced(env, slaveConn, command, expected_result, time_out=5):
  """
  Poll the slave until `command` returns `expected_result`.

  env:             test environment; failures are reported through
                   env.assertTrue(False, message=...).
  slaveConn:       connection the command is executed on.
  command:         sequence of command arguments, unpacked into
                   slaveConn.execute_command.
  expected_result: reply to wait for (compared with `!=`).
  time_out:        number of seconds to keep polling, handed to TimeLimit.

  A timeout or any other exception raised while polling is turned into a
  failed assertion on `env`.
  """
  try:
    with TimeLimit(time_out):
      res = slaveConn.execute_command(*command)
      while res != expected_result:
        res = slaveConn.execute_command(*command)
  except TimeoutException:
    env.assertTrue(False, message='Failed waiting for command to be executed on slave')
  except Exception as e:
    # Exceptions have no `.message` attribute on Python 3, so reading it
    # would raise AttributeError and hide the real error; str(e) works on
    # both Python 2 and Python 3.
    env.assertTrue(False, message=str(e))
41
def testDelReplicate():
  """
  Check that documents added to and deleted from an index on the master
  show up, and then disappear, on the slave as well.
  """
  env = Env(useSlaves=True, forceTcp=True)

  env.skipOnCluster()

  # A slave connection cannot be obtained on an existing env,
  # so this scenario cannot be tested there.
  if env.env == 'existing-env':
    env.skip()

  master = env.getConnection()
  slave = env.getSlaveConnection()
  for conn in (master, slave):
    env.assertTrue(conn.execute_command("ping"))

  create_reply = master.execute_command(
      'ft.create', 'idx', 'ON', 'HASH', 'FILTER', 'startswith(@__key, "")',
      'schema', 'f', 'text')
  env.assertOk(create_reply)

  # Write a marker key and wait until the slave sees it.
  env.cmd('set', 'indicator', '1')
  checkSlaveSynced(env, slave, ('exists', 'indicator'), 1, time_out=20)

  doc_ids = ['doc%d' % n for n in range(10)]
  stored_fields = ['f', 'hello world']
  last_doc_query = ('ft.get', 'idx', doc_ids[-1])

  for doc_id in doc_ids:
    master.execute_command('ft.add', 'idx', doc_id, 1.0, 'fields',
                           'f', 'hello world')
  time.sleep(0.01)
  # The last inserted document reaching the slave marks the end of the sync.
  checkSlaveSynced(env, slave, last_doc_query, stored_fields, time_out=20)

  for doc_id in doc_ids:
    # the document must be present on both sides ...
    for conn in (master, slave):
      env.assertEqual(stored_fields,
                      conn.execute_command('ft.get', 'idx', doc_id))

    # ... before it is deleted on the master
    deleted = master.execute_command('ft.del', 'idx', doc_id)
    env.assertEqual(1, deleted)

  checkSlaveSynced(env, slave, last_doc_query, None, time_out=20)

  for doc_id in doc_ids:
    # after the deletion neither side may return the document
    for conn in (master, slave):
      env.assertEqual(None,
                      conn.execute_command('ft.get', 'idx', doc_id))
85
def _assertSameKeys(env, master, slave):
  """Assert that `KEYS *` returns the same keys on master and slave."""
  master_keys = sorted(master.execute_command('KEYS', '*'))
  slave_keys = sorted(slave.execute_command('KEYS', '*'))
  env.assertEqual(len(master_keys), len(slave_keys))
  env.assertEqual(master_keys, slave_keys)

  # show the different documents mostly for test debug info
  master_set = set(master_keys)
  slave_set = set(slave_keys)
  env.assertEqual(master_set.difference(slave_set), set())
  env.assertEqual(slave_set.difference(master_set), set())

def testDropReplicate():
  """
  This test first creates documents.
  Next, it creates an index so all documents are scanned into the index.
  Finally the index is dropped right away, before all documents have been
  indexed.

  The test checks consistency between master and slave, once for
  FT.DROPINDEX ... DD and once for FT.DROP.
  """
  env = Env(useSlaves=True, forceTcp=True)

  env.skipOnCluster()

  ## on existing env we can not get a slave connection
  ## so we can not test it
  if env.env == 'existing-env':
    env.skip()

  master = env.getConnection()
  slave = env.getSlaveConnection()
  env.assertTrue(master.execute_command("ping"))
  env.assertTrue(slave.execute_command("ping"))

  for j in range(100):
    geo = '1.23456,' + str(float(j) / 100)
    master.execute_command('HSET', 'doc%d' % j, 't', 'hello%d' % j, 'tg', 'world%d' % j, 'n', j, 'g', geo)

  # test for FT.DROPINDEX
  master.execute_command('WAIT', 1, 1000)
  master.execute_command('FT.CREATE', 'idx', 'SCHEMA', 't', 'TEXT', 'n', 'NUMERIC', 'tg', 'TAG', 'g', 'GEO')
  master.execute_command('FT.DROPINDEX', 'idx', 'DD')
  master.execute_command('WAIT', 1, 1000)

  # check that same docs were deleted by master and slave
  _assertSameKeys(env, master, slave)

  # test for FT.DROP
  master.execute_command('FT.CREATE', 'idx', 'SCHEMA', 't', 'TEXT', 'n', 'NUMERIC', 'tg', 'TAG', 'g', 'GEO')
  # brief pause between creating and dropping the index
  time.sleep(0.001)
  master.execute_command('FT.DROP', 'idx')

  # Wait for the slave to acknowledge the replicated writes instead of
  # sleeping for a fixed time, as is done for FT.DROPINDEX above.
  master.execute_command('WAIT', 1, 1000)

  # check that same docs were deleted by master and slave
  _assertSameKeys(env, master, slave)
147