1# -*- coding: utf-8 -*-
2
3#    Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
4#
5#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6#    not use this file except in compliance with the License. You may obtain
7#    a copy of the License at
8#
9#         http://www.apache.org/licenses/LICENSE-2.0
10#
11#    Unless required by applicable law or agreed to in writing, software
12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14#    License for the specific language governing permissions and limitations
15#    under the License.
16
17import contextlib
18import threading
19
20from kazoo.protocol import paths as k_paths
21from kazoo.recipe import watchers
22from oslo_serialization import jsonutils
23from oslo_utils import uuidutils
24import six
25import testtools
26from zake import fake_client
27from zake import utils as zake_utils
28
29from taskflow import exceptions as excp
30from taskflow.jobs.backends import impl_zookeeper
31from taskflow import states
32from taskflow import test
33from taskflow.test import mock
34from taskflow.tests.unit.jobs import base
35from taskflow.tests import utils as test_utils
36from taskflow.types import entity
37from taskflow.utils import kazoo_utils
38from taskflow.utils import misc
39from taskflow.utils import persistence_utils as p_utils
40
41FLUSH_PATH_TPL = '/taskflow/flush-test/%s'
42TEST_PATH_TPL = '/taskflow/board-test/%s'
43ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available(
44    impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION)
45TRASH_FOLDER = impl_zookeeper.ZookeeperJobBoard.TRASH_FOLDER
46LOCK_POSTFIX = impl_zookeeper.ZookeeperJobBoard.LOCK_POSTFIX
47
48
49class ZookeeperBoardTestMixin(base.BoardTestMixin):
50    def close_client(self, client):
51        kazoo_utils.finalize_client(client)
52
53    @contextlib.contextmanager
54    def flush(self, client, path=None):
55        # This uses the linearity guarantee of zookeeper (and associated
56        # libraries) to create a temporary node, wait until a watcher notifies
57        # it's created, then yield back for more work, and then at the end of
58        # that work delete the created node. This ensures that the operations
59        # done in the yield of this context manager will be applied and all
60        # watchers will have fired before this context manager exits.
61        if not path:
62            path = FLUSH_PATH_TPL % uuidutils.generate_uuid()
63        created = threading.Event()
64        deleted = threading.Event()
65
66        def on_created(data, stat):
67            if stat is not None:
68                created.set()
69                return False  # cause this watcher to cease to exist
70
71        def on_deleted(data, stat):
72            if stat is None:
73                deleted.set()
74                return False  # cause this watcher to cease to exist
75
76        watchers.DataWatch(client, path, func=on_created)
77        client.create(path, makepath=True)
78        if not created.wait(test_utils.WAIT_TIMEOUT):
79            raise RuntimeError("Could not receive creation of %s in"
80                               " the alloted timeout of %s seconds"
81                               % (path, test_utils.WAIT_TIMEOUT))
82        try:
83            yield
84        finally:
85            watchers.DataWatch(client, path, func=on_deleted)
86            client.delete(path, recursive=True)
87            if not deleted.wait(test_utils.WAIT_TIMEOUT):
88                raise RuntimeError("Could not receive deletion of %s in"
89                                   " the alloted timeout of %s seconds"
90                                   % (path, test_utils.WAIT_TIMEOUT))
91
92    def test_posting_no_post(self):
93        with base.connect_close(self.board):
94            with mock.patch.object(self.client, 'create') as create_func:
95                create_func.side_effect = IOError("Unable to post")
96                self.assertRaises(IOError, self.board.post,
97                                  'test', p_utils.temporary_log_book())
98            self.assertEqual(0, self.board.job_count)
99
100    def test_board_iter(self):
101        with base.connect_close(self.board):
102            it = self.board.iterjobs()
103            self.assertEqual(self.board, it.board)
104            self.assertFalse(it.only_unclaimed)
105            self.assertFalse(it.ensure_fresh)
106
107    @mock.patch("taskflow.jobs.backends.impl_zookeeper.misc."
108                "millis_to_datetime")
109    def test_posting_dates(self, mock_dt):
110        epoch = misc.millis_to_datetime(0)
111        mock_dt.return_value = epoch
112
113        with base.connect_close(self.board):
114            j = self.board.post('test', p_utils.temporary_log_book())
115            self.assertEqual(epoch, j.created_on)
116            self.assertEqual(epoch, j.last_modified)
117
118        self.assertTrue(mock_dt.called)
119
120
121@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available')
122class ZookeeperJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
123    def create_board(self, persistence=None):
124
125        def cleanup_path(client, path):
126            if not client.connected:
127                return
128            client.delete(path, recursive=True)
129
130        client = kazoo_utils.make_client(test_utils.ZK_TEST_CONFIG.copy())
131        path = TEST_PATH_TPL % (uuidutils.generate_uuid())
132        board = impl_zookeeper.ZookeeperJobBoard('test-board', {'path': path},
133                                                 client=client,
134                                                 persistence=persistence)
135        self.addCleanup(self.close_client, client)
136        self.addCleanup(cleanup_path, client, path)
137        self.addCleanup(board.close)
138        return (client, board)
139
140    def setUp(self):
141        super(ZookeeperJobboardTest, self).setUp()
142        self.client, self.board = self.create_board()
143
144
145class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin):
146    def create_board(self, persistence=None):
147        client = fake_client.FakeClient()
148        board = impl_zookeeper.ZookeeperJobBoard('test-board', {},
149                                                 client=client,
150                                                 persistence=persistence)
151        self.addCleanup(board.close)
152        self.addCleanup(self.close_client, client)
153        return (client, board)
154
155    def setUp(self):
156        super(ZakeJobboardTest, self).setUp()
157        self.client, self.board = self.create_board()
158        self.bad_paths = [self.board.path, self.board.trash_path]
159        self.bad_paths.extend(zake_utils.partition_path(self.board.path))
160
161    def test_posting_owner_lost(self):
162
163        with base.connect_close(self.board):
164            with self.flush(self.client):
165                j = self.board.post('test', p_utils.temporary_log_book())
166            self.assertEqual(states.UNCLAIMED, j.state)
167            with self.flush(self.client):
168                self.board.claim(j, self.board.name)
169            self.assertEqual(states.CLAIMED, j.state)
170
171            # Forcefully delete the owner from the backend storage to make
172            # sure the job becomes unclaimed (this may happen if some admin
173            # manually deletes the lock).
174            paths = list(six.iteritems(self.client.storage.paths))
175            for (path, value) in paths:
176                if path in self.bad_paths:
177                    continue
178                if path.endswith('lock'):
179                    value['data'] = misc.binary_encode(jsonutils.dumps({}))
180            self.assertEqual(states.UNCLAIMED, j.state)
181
182    def test_posting_state_lock_lost(self):
183
184        with base.connect_close(self.board):
185            with self.flush(self.client):
186                j = self.board.post('test', p_utils.temporary_log_book())
187            self.assertEqual(states.UNCLAIMED, j.state)
188            with self.flush(self.client):
189                self.board.claim(j, self.board.name)
190            self.assertEqual(states.CLAIMED, j.state)
191
192            # Forcefully delete the lock from the backend storage to make
193            # sure the job becomes unclaimed (this may happen if some admin
194            # manually deletes the lock).
195            paths = list(six.iteritems(self.client.storage.paths))
196            for (path, value) in paths:
197                if path in self.bad_paths:
198                    continue
199                if path.endswith("lock"):
200                    self.client.storage.pop(path)
201            self.assertEqual(states.UNCLAIMED, j.state)
202
203    def test_trashing_claimed_job(self):
204
205        with base.connect_close(self.board):
206            with self.flush(self.client):
207                j = self.board.post('test', p_utils.temporary_log_book())
208            self.assertEqual(states.UNCLAIMED, j.state)
209            with self.flush(self.client):
210                self.board.claim(j, self.board.name)
211            self.assertEqual(states.CLAIMED, j.state)
212
213            with self.flush(self.client):
214                self.board.trash(j, self.board.name)
215
216            trashed = []
217            jobs = []
218            paths = list(six.iteritems(self.client.storage.paths))
219            for (path, value) in paths:
220                if path in self.bad_paths:
221                    continue
222                if path.find(TRASH_FOLDER) > -1:
223                    trashed.append(path)
224                elif (path.find(self.board._job_base) > -1
225                        and not path.endswith(LOCK_POSTFIX)):
226                    jobs.append(path)
227
228            self.assertEqual(1, len(trashed))
229            self.assertEqual(0, len(jobs))
230
231    def test_posting_received_raw(self):
232        book = p_utils.temporary_log_book()
233
234        with base.connect_close(self.board):
235            self.assertTrue(self.board.connected)
236            self.assertEqual(0, self.board.job_count)
237            posted_job = self.board.post('test', book)
238
239            self.assertEqual(self.board, posted_job.board)
240            self.assertEqual(1, self.board.job_count)
241            self.assertIn(posted_job.uuid, [j.uuid
242                                            for j in self.board.iterjobs()])
243
244        # Remove paths that got created due to the running process that we are
245        # not interested in...
246        paths = {}
247        for (path, data) in six.iteritems(self.client.storage.paths):
248            if path in self.bad_paths:
249                continue
250            paths[path] = data
251
252        # Check the actual data that was posted.
253        self.assertEqual(1, len(paths))
254        path_key = list(six.iterkeys(paths))[0]
255        self.assertTrue(len(paths[path_key]['data']) > 0)
256        self.assertDictEqual({
257            'uuid': posted_job.uuid,
258            'name': posted_job.name,
259            'book': {
260                'name': book.name,
261                'uuid': book.uuid,
262            },
263            'priority': 'NORMAL',
264            'details': {},
265        }, jsonutils.loads(misc.binary_decode(paths[path_key]['data'])))
266
267    def test_register_entity(self):
268        conductor_name = "conductor-abc@localhost:4123"
269        entity_instance = entity.Entity("conductor",
270                                        conductor_name,
271                                        {})
272        with base.connect_close(self.board):
273            self.board.register_entity(entity_instance)
274        # Check '.entity' node has been created
275        self.assertTrue(self.board.entity_path in self.client.storage.paths)
276
277        conductor_entity_path = k_paths.join(self.board.entity_path,
278                                             'conductor',
279                                             conductor_name)
280        self.assertTrue(conductor_entity_path in self.client.storage.paths)
281        conductor_data = (
282            self.client.storage.paths[conductor_entity_path]['data'])
283        self.assertTrue(len(conductor_data) > 0)
284        self.assertDictEqual({
285            'name': conductor_name,
286            'kind': 'conductor',
287            'metadata': {},
288        }, jsonutils.loads(misc.binary_decode(conductor_data)))
289
290        entity_instance_2 = entity.Entity("non-sense",
291                                          "other_name",
292                                          {})
293        with base.connect_close(self.board):
294            self.assertRaises(excp.NotImplementedError,
295                              self.board.register_entity,
296                              entity_instance_2)
297