import mdp.parallel as parallel
from _tools import *

# NOTE(review): `mdp`, `numx`, `skip_on_condition` and
# `assert_array_almost_equal` are not imported explicitly in this file;
# presumably they are provided by the star import from `_tools` -- confirm.

# Decorator that skips a test when Parallel Python (pp) is not available.
requires_parallel_python = skip_on_condition(
    "not mdp.config.has_parallel_python",
    "This test requires Parallel Python")


@requires_parallel_python
def test_reverse_patching():
    """Reset the pp worker command and re-apply the MDP monkeypatch.

    Nothing is asserted here; the function only restores state when
    ``mdp.config`` has a ``pp_monkeypatch_dirname`` attribute.
    """
    # revert pp patching
    # XXX This is needed to avoid failures of the other
    # XXX pp tests when run more than once in the same interpreter
    # XXX session
    if hasattr(mdp.config, 'pp_monkeypatch_dirname'):
        import pp
        # Assign a copy so later patching does not alter the saved command.
        pp._Worker.command = mdp._pp_worker_command[:]
        parallel.pp_support._monkeypatch_pp(mdp.config.pp_monkeypatch_dirname)


@requires_parallel_python
def test_simple():
    """Test local pp scheduling."""
    scheduler = parallel.pp_support.LocalPPScheduler(ncpus=2,
                                                     max_queue_length=0,
                                                     verbose=False)
    # process jobs
    for i in range(50):
        scheduler.add_task(i, parallel.SqrTestCallable())
    results = scheduler.get_results()
    scheduler.shutdown()
    # check result
    # Sort before comparing, so the check does not depend on the order in
    # which the scheduler returned the results.
    results.sort()
    results = numx.array(results[:6])
    assert numx.all(results == numx.array([0, 1, 4, 9, 16, 25]))


@requires_parallel_python
def test_scheduler_flow():
    """Test local pp scheduler with real Nodes."""
    precision = 10**-6
    node1 = mdp.nodes.PCANode(output_dim=20)
    node2 = mdp.nodes.PolynomialExpansionNode(degree=1)
    node3 = mdp.nodes.SFANode(output_dim=10)
    flow = mdp.parallel.ParallelFlow([node1, node2, node3])
    # Second flow built from a copy of the first; it is the one that gets
    # trained and executed through the scheduler.
    parallel_flow = mdp.parallel.ParallelFlow(flow.copy()[:])
    scheduler = parallel.pp_support.LocalPPScheduler(ncpus=3,
                                                     max_queue_length=0,
                                                     verbose=False)
    input_dim = 30
    # Scale each input dimension by a different factor (1 ... 100).
    scales = numx.linspace(1, 100, num=input_dim)
    scale_matrix = mdp.numx.diag(scales)
    # One random (5, 100, input_dim) array per node in the flow.
    train_iterables = [numx.dot(mdp.numx_rand.random((5, 100, input_dim)),
                                scale_matrix)
                       for _ in range(3)]
    parallel_flow.train(train_iterables, scheduler=scheduler)
    # Use mdp.numx_rand here as well, consistent with the training data above.
    x = mdp.numx_rand.random((10, input_dim))
    # test that parallel execution works as well
    # note that we need more chunks than processes to test caching
    parallel_flow.execute([x for _ in range(8)], scheduler=scheduler)
    scheduler.shutdown()
    # compare to normal flow
    flow.train(train_iterables)
    assert parallel_flow[0].tlen == flow[0].tlen
    y1 = flow.execute(x)
    y2 = parallel_flow.execute(x)
    # NOTE(review): `precision` is passed as the second positional argument.
    # If this helper has NumPy's (actual, desired, decimal) signature, the
    # difference is compared against 1e-6 at the default decimal precision
    # rather than against zero -- confirm this is the intended tolerance.
    assert_array_almost_equal(abs(y1 - y2), precision)