1# -*- coding: utf-8 -*- 2 3# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may 6# not use this file except in compliance with the License. You may obtain 7# a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations 15# under the License. 16 17import contextlib 18import threading 19 20from kazoo.protocol import paths as k_paths 21from kazoo.recipe import watchers 22from oslo_serialization import jsonutils 23from oslo_utils import uuidutils 24import six 25import testtools 26from zake import fake_client 27from zake import utils as zake_utils 28 29from taskflow import exceptions as excp 30from taskflow.jobs.backends import impl_zookeeper 31from taskflow import states 32from taskflow import test 33from taskflow.test import mock 34from taskflow.tests.unit.jobs import base 35from taskflow.tests import utils as test_utils 36from taskflow.types import entity 37from taskflow.utils import kazoo_utils 38from taskflow.utils import misc 39from taskflow.utils import persistence_utils as p_utils 40 41FLUSH_PATH_TPL = '/taskflow/flush-test/%s' 42TEST_PATH_TPL = '/taskflow/board-test/%s' 43ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available( 44 impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION) 45TRASH_FOLDER = impl_zookeeper.ZookeeperJobBoard.TRASH_FOLDER 46LOCK_POSTFIX = impl_zookeeper.ZookeeperJobBoard.LOCK_POSTFIX 47 48 49class ZookeeperBoardTestMixin(base.BoardTestMixin): 50 def close_client(self, client): 51 kazoo_utils.finalize_client(client) 52 53 @contextlib.contextmanager 54 def flush(self, client, path=None): 55 # This uses the linearity guarantee of zookeeper (and associated 56 # libraries) to create a temporary node, wait until a watcher notifies 57 # it's created, then yield back for more work, and then at the end of 58 # that work delete the created node. This ensures that the operations 59 # done in the yield of this context manager will be applied and all 60 # watchers will have fired before this context manager exits. 61 if not path: 62 path = FLUSH_PATH_TPL % uuidutils.generate_uuid() 63 created = threading.Event() 64 deleted = threading.Event() 65 66 def on_created(data, stat): 67 if stat is not None: 68 created.set() 69 return False # cause this watcher to cease to exist 70 71 def on_deleted(data, stat): 72 if stat is None: 73 deleted.set() 74 return False # cause this watcher to cease to exist 75 76 watchers.DataWatch(client, path, func=on_created) 77 client.create(path, makepath=True) 78 if not created.wait(test_utils.WAIT_TIMEOUT): 79 raise RuntimeError("Could not receive creation of %s in" 80 " the alloted timeout of %s seconds" 81 % (path, test_utils.WAIT_TIMEOUT)) 82 try: 83 yield 84 finally: 85 watchers.DataWatch(client, path, func=on_deleted) 86 client.delete(path, recursive=True) 87 if not deleted.wait(test_utils.WAIT_TIMEOUT): 88 raise RuntimeError("Could not receive deletion of %s in" 89 " the alloted timeout of %s seconds" 90 % (path, test_utils.WAIT_TIMEOUT)) 91 92 def test_posting_no_post(self): 93 with base.connect_close(self.board): 94 with mock.patch.object(self.client, 'create') as create_func: 95 create_func.side_effect = IOError("Unable to post") 96 self.assertRaises(IOError, self.board.post, 97 'test', p_utils.temporary_log_book()) 98 self.assertEqual(0, self.board.job_count) 99 100 def test_board_iter(self): 101 with base.connect_close(self.board): 102 it = self.board.iterjobs() 103 self.assertEqual(self.board, it.board) 104 self.assertFalse(it.only_unclaimed) 105 self.assertFalse(it.ensure_fresh) 106 107 @mock.patch("taskflow.jobs.backends.impl_zookeeper.misc." 108 "millis_to_datetime") 109 def test_posting_dates(self, mock_dt): 110 epoch = misc.millis_to_datetime(0) 111 mock_dt.return_value = epoch 112 113 with base.connect_close(self.board): 114 j = self.board.post('test', p_utils.temporary_log_book()) 115 self.assertEqual(epoch, j.created_on) 116 self.assertEqual(epoch, j.last_modified) 117 118 self.assertTrue(mock_dt.called) 119 120 121@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available') 122class ZookeeperJobboardTest(test.TestCase, ZookeeperBoardTestMixin): 123 def create_board(self, persistence=None): 124 125 def cleanup_path(client, path): 126 if not client.connected: 127 return 128 client.delete(path, recursive=True) 129 130 client = kazoo_utils.make_client(test_utils.ZK_TEST_CONFIG.copy()) 131 path = TEST_PATH_TPL % (uuidutils.generate_uuid()) 132 board = impl_zookeeper.ZookeeperJobBoard('test-board', {'path': path}, 133 client=client, 134 persistence=persistence) 135 self.addCleanup(self.close_client, client) 136 self.addCleanup(cleanup_path, client, path) 137 self.addCleanup(board.close) 138 return (client, board) 139 140 def setUp(self): 141 super(ZookeeperJobboardTest, self).setUp() 142 self.client, self.board = self.create_board() 143 144 145class ZakeJobboardTest(test.TestCase, ZookeeperBoardTestMixin): 146 def create_board(self, persistence=None): 147 client = fake_client.FakeClient() 148 board = impl_zookeeper.ZookeeperJobBoard('test-board', {}, 149 client=client, 150 persistence=persistence) 151 self.addCleanup(board.close) 152 self.addCleanup(self.close_client, client) 153 return (client, board) 154 155 def setUp(self): 156 super(ZakeJobboardTest, self).setUp() 157 self.client, self.board = self.create_board() 158 self.bad_paths = [self.board.path, self.board.trash_path] 159 self.bad_paths.extend(zake_utils.partition_path(self.board.path)) 160 161 def test_posting_owner_lost(self): 162 163 with base.connect_close(self.board): 164 with self.flush(self.client): 165 j = self.board.post('test', p_utils.temporary_log_book()) 166 self.assertEqual(states.UNCLAIMED, j.state) 167 with self.flush(self.client): 168 self.board.claim(j, self.board.name) 169 self.assertEqual(states.CLAIMED, j.state) 170 171 # Forcefully delete the owner from the backend storage to make 172 # sure the job becomes unclaimed (this may happen if some admin 173 # manually deletes the lock). 174 paths = list(six.iteritems(self.client.storage.paths)) 175 for (path, value) in paths: 176 if path in self.bad_paths: 177 continue 178 if path.endswith('lock'): 179 value['data'] = misc.binary_encode(jsonutils.dumps({})) 180 self.assertEqual(states.UNCLAIMED, j.state) 181 182 def test_posting_state_lock_lost(self): 183 184 with base.connect_close(self.board): 185 with self.flush(self.client): 186 j = self.board.post('test', p_utils.temporary_log_book()) 187 self.assertEqual(states.UNCLAIMED, j.state) 188 with self.flush(self.client): 189 self.board.claim(j, self.board.name) 190 self.assertEqual(states.CLAIMED, j.state) 191 192 # Forcefully delete the lock from the backend storage to make 193 # sure the job becomes unclaimed (this may happen if some admin 194 # manually deletes the lock). 195 paths = list(six.iteritems(self.client.storage.paths)) 196 for (path, value) in paths: 197 if path in self.bad_paths: 198 continue 199 if path.endswith("lock"): 200 self.client.storage.pop(path) 201 self.assertEqual(states.UNCLAIMED, j.state) 202 203 def test_trashing_claimed_job(self): 204 205 with base.connect_close(self.board): 206 with self.flush(self.client): 207 j = self.board.post('test', p_utils.temporary_log_book()) 208 self.assertEqual(states.UNCLAIMED, j.state) 209 with self.flush(self.client): 210 self.board.claim(j, self.board.name) 211 self.assertEqual(states.CLAIMED, j.state) 212 213 with self.flush(self.client): 214 self.board.trash(j, self.board.name) 215 216 trashed = [] 217 jobs = [] 218 paths = list(six.iteritems(self.client.storage.paths)) 219 for (path, value) in paths: 220 if path in self.bad_paths: 221 continue 222 if path.find(TRASH_FOLDER) > -1: 223 trashed.append(path) 224 elif (path.find(self.board._job_base) > -1 225 and not path.endswith(LOCK_POSTFIX)): 226 jobs.append(path) 227 228 self.assertEqual(1, len(trashed)) 229 self.assertEqual(0, len(jobs)) 230 231 def test_posting_received_raw(self): 232 book = p_utils.temporary_log_book() 233 234 with base.connect_close(self.board): 235 self.assertTrue(self.board.connected) 236 self.assertEqual(0, self.board.job_count) 237 posted_job = self.board.post('test', book) 238 239 self.assertEqual(self.board, posted_job.board) 240 self.assertEqual(1, self.board.job_count) 241 self.assertIn(posted_job.uuid, [j.uuid 242 for j in self.board.iterjobs()]) 243 244 # Remove paths that got created due to the running process that we are 245 # not interested in... 246 paths = {} 247 for (path, data) in six.iteritems(self.client.storage.paths): 248 if path in self.bad_paths: 249 continue 250 paths[path] = data 251 252 # Check the actual data that was posted. 253 self.assertEqual(1, len(paths)) 254 path_key = list(six.iterkeys(paths))[0] 255 self.assertTrue(len(paths[path_key]['data']) > 0) 256 self.assertDictEqual({ 257 'uuid': posted_job.uuid, 258 'name': posted_job.name, 259 'book': { 260 'name': book.name, 261 'uuid': book.uuid, 262 }, 263 'priority': 'NORMAL', 264 'details': {}, 265 }, jsonutils.loads(misc.binary_decode(paths[path_key]['data']))) 266 267 def test_register_entity(self): 268 conductor_name = "conductor-abc@localhost:4123" 269 entity_instance = entity.Entity("conductor", 270 conductor_name, 271 {}) 272 with base.connect_close(self.board): 273 self.board.register_entity(entity_instance) 274 # Check '.entity' node has been created 275 self.assertTrue(self.board.entity_path in self.client.storage.paths) 276 277 conductor_entity_path = k_paths.join(self.board.entity_path, 278 'conductor', 279 conductor_name) 280 self.assertTrue(conductor_entity_path in self.client.storage.paths) 281 conductor_data = ( 282 self.client.storage.paths[conductor_entity_path]['data']) 283 self.assertTrue(len(conductor_data) > 0) 284 self.assertDictEqual({ 285 'name': conductor_name, 286 'kind': 'conductor', 287 'metadata': {}, 288 }, jsonutils.loads(misc.binary_decode(conductor_data))) 289 290 entity_instance_2 = entity.Entity("non-sense", 291 "other_name", 292 {}) 293 with base.connect_close(self.board): 294 self.assertRaises(excp.NotImplementedError, 295 self.board.register_entity, 296 entity_instance_2) 297