import mdp.parallel as parallel
from _tools import *

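# skip_on_condition comes from _tools; it skips the decorated test when the
# given condition expression evaluates to true, i.e. when the Parallel
# Python (pp) package is not available.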
requires_parallel_python = skip_on_condition(
    "not mdp.config.has_parallel_python",
    "This test requires Parallel Python")


@requires_parallel_python
def test_reverse_patching():
    # revert pp patching
    # XXX This is needed to avoid failures of the other
    # XXX pp tests when run more than once in the same interpreter
    # XXX session
    if hasattr(mdp.config, 'pp_monkeypatch_dirname'):
        import pp
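        # restore the original pp worker command that mdp saved at import
        # time, then re-apply mdp's pp monkeypatch so that later pp tests
        # start from a consistently patched state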
        pp._Worker.command = mdp._pp_worker_command[:]
        parallel.pp_support._monkeypatch_pp(mdp.config.pp_monkeypatch_dirname)


@requires_parallel_python
def test_simple():
    """Test local pp scheduling."""
    scheduler = parallel.pp_support.LocalPPScheduler(ncpus=2,
                                                     max_queue_length=0,
                                                     verbose=False)
    # process jobs
    for i in range(50):
        scheduler.add_task(i, parallel.SqrTestCallable())
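    # get_results blocks until all queued tasks have been processed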
    results = scheduler.get_results()
    scheduler.shutdown()
    # check result
    results.sort()
    results = numx.array(results[:6])
    assert numx.all(results == numx.array([0, 1, 4, 9, 16, 25]))

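# A minimal sketch of a custom task callable, assuming the
# mdp.parallel.TaskCallable interface that SqrTestCallable implements
# (a __call__ method receiving the task data). This class is hypothetical
# and is only meant to illustrate how scheduler callables are written.
class CubeTestCallable(parallel.TaskCallable):
    """Hypothetical callable returning the cube of the task data."""
    def __call__(self, data):
        return data**3
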
@requires_parallel_python
def test_scheduler_flow():
    """Test local pp scheduler with real Nodes."""
    precision = 10**-6
    node1 = mdp.nodes.PCANode(output_dim=20)
    node2 = mdp.nodes.PolynomialExpansionNode(degree=1)
    node3 = mdp.nodes.SFANode(output_dim=10)
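    # build one flow for sequential reference training and a second flow
    # over copies of the same untrained nodes for parallel training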
    flow = mdp.parallel.ParallelFlow([node1, node2, node3])
    parallel_flow = mdp.parallel.ParallelFlow(flow.copy()[:])
    scheduler = parallel.pp_support.LocalPPScheduler(ncpus=3,
                                                     max_queue_length=0,
                                                     verbose=False)
    input_dim = 30
    scales = numx.linspace(1, 100, num=input_dim)
    scale_matrix = mdp.numx.diag(scales)
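    # one entry per node in the flow; each (5, 100, input_dim) array
    # iterates as five chunks of 100 samples, scaled so that every input
    # dimension gets a different variance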
    train_iterables = [numx.dot(mdp.numx_rand.random((5, 100, input_dim)),
                                scale_matrix)
                       for _ in range(3)]
    parallel_flow.train(train_iterables, scheduler=scheduler)
    x = mdp.numx_rand.random((10, input_dim))
    # test that parallel execution works as well
    # note that we need more chunks than processes to test caching
    parallel_flow.execute([x for _ in range(8)], scheduler=scheduler)
    scheduler.shutdown()
    # compare to normal flow
    flow.train(train_iterables)
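    # both flows should have seen the same number of training samples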
    assert parallel_flow[0].tlen == flow[0].tlen
    y1 = flow.execute(x)
    y2 = parallel_flow.execute(x)
    assert numx.all(abs(y1 - y2) < precision)
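
# For reference, the same train/execute pattern works with the pure-Python
# process scheduler, which needs no Parallel Python install; a rough,
# untested sketch assuming mdp.parallel.ProcessScheduler:
#
#     scheduler = mdp.parallel.ProcessScheduler(n_processes=2)
#     parallel_flow.train(train_iterables, scheduler=scheduler)
#     parallel_flow.execute(x, scheduler=scheduler)
#     scheduler.shutdown()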