1#! /usr/bin/env python
2"""
3This script tests the waypoints actuator, both the data and service api
4"""
5import logging
6
7import sys
8import math
9from morse.testing.testing import MorseTestCase
10from pymorse import Morse, MorseServicePreempted
11
12logger = logging.getLogger("morsetesting.general")
13# Include this import to be able to use your test file as a regular
14# builder script, ie, usable with: 'morse [run|exec] base_testing.py
15try:
16    from morse.builder import *
17except ImportError:
18    pass
19
20class Waypoints_Test(MorseTestCase):
21    def setUpEnv(self):
22        """ Defines the test scenario, using the Builder API.
23        """
24
25        robot = ATRV("robot")
26
27        pose = Pose('pose')
28        pose.translate(z=-0.10) # atrv body
29        robot.append(pose)
30        pose.add_stream('socket')
31
32        motion = Waypoint('motion')
33        robot.append(motion)
34        motion.add_stream('socket')
35        motion.add_service('socket')
36
37
38        env = Environment('empty', fastmode = True)
39        env.add_service('socket')
40
41    def test_waypoint_datastream(self):
42        """ This test is guaranteed to be started only when the simulator
43        is ready.
44        """
45        with Morse() as simu:
46
47            # Read the start position, it must be (0.0, 0.0, 0.0)
48            pose_stream = simu.robot.pose
49            pose = pose_stream.get()
50            for key, coord in pose.items():
51                if key != 'timestamp':
52                    self.assertAlmostEqual(coord, 0.0, delta=0.02)
53
54            # waypoint controller
55            motion = simu.robot.motion
56            motion.publish({'x' : 4.0, 'y': 2.0, 'z': 0.0,
57                            'tolerance' : 0.5,
58                            'speed' : 1.0})
59            simu.sleep(10)
60
61            pose = pose_stream.get()
62            self.assertAlmostEqual(pose['x'], 4.0, delta=0.5)
63            self.assertAlmostEqual(pose['y'], 2.0, delta=0.5)
64
65
66            # test tolerance parameter
67            motion.publish({'x' : 0.0, 'y': 0.0, 'z': 0.0,
68                            'tolerance' : 1.0,
69                            'speed' : 1.0})
70            simu.sleep(10)
71            pose = pose_stream.get()
72            distance_goal = math.sqrt( pose['x'] * pose['x'] + pose['y'] * pose['y'])
73            self.assertLess(distance_goal, 1.0)
74            self.assertGreater(distance_goal, 0.5)
75
76    def test_waypoint_services(self):
77
78        with Morse() as simu:
79            # Read the start position, it must be (0.0, 0.0, 0.0)
80            pose_stream = simu.robot.pose
81            pose = pose_stream.get()
82
83            for key, coord in pose.items():
84                if key != 'timestamp':
85                    self.assertAlmostEqual(coord, 0.0, delta=0.02)
86
87            logger.info("Moving 2m ahead...")
88
89            simu.robot.motion.goto(2.0, 0.0, 0.0, 0.1, 1.0).result() # wait for completion
90
91            pose = pose_stream.get()
92            self.assertAlmostEqual(pose['x'], 2.0, delta=0.1)
93            self.assertAlmostEqual(pose['y'], 0.0, delta=0.1)
94            logger.info("Ok.")
95
96            action = simu.robot.motion.goto(4.0, 0.0, 0.0, 0.1, 1.0) # do not wait for completion
97            logger.info("Moving for 1 sec...")
98            simu.sleep(1)
99
100            pose = pose_stream.get() #should have done 1m
101            self.assertAlmostEqual(pose['x'], 3.0, delta=0.15)
102            logger.info("Ok, reached correct position")
103
104            self.assertTrue(action.running())
105            self.assertFalse(action.done())
106
107            logger.info("Cancelling motion and waiting for 0.5 sec...")
108            action.cancel()
109            simu.sleep(0.1)
110
111            self.assertFalse(action.running())
112            self.assertTrue(action.done())
113
114            with self.assertRaises(MorseServicePreempted):
115                action.result()
116
117            simu.sleep(0.5)
118            pose = pose_stream.get() #should not have moved
119            self.assertAlmostEqual(pose['x'], 3.0, delta=0.15)
120            logger.info("Ok, did not move")
121
122            logger.info("Moving again, waiting for 2 sec, and ensuring the action terminate")
123            action = simu.robot.motion.goto(4.0, 0.0, 0.0, 0.1, 1.0) # do not wait for completion
124            simu.sleep(2)
125            self.assertTrue(action.done())
126            self.assertFalse(action.running())
127
128            # Stop will stop the robot, but do not erase current goal
129            action = simu.robot.motion.goto(6.0, 0.0, 0.0, 0.1, 1.0) # do not wait for completion
130            logger.info("Moving for 1 sec...")
131            simu.sleep(1)
132
133            self.assertFalse(action.done())
134            self.assertTrue(action.running())
135
136            status = simu.robot.motion.get_status().result()
137            self.assertEqual(status, "Transit")
138
139            simu.robot.motion.stop().result()
140
141            # Stop does not change the fact that the goto is pending,
142            # but stop the move
143            self.assertFalse(action.done())
144            self.assertTrue(action.running())
145
146            status = simu.robot.motion.get_status().result()
147            self.assertEqual(status, "Stop")
148
149            simu.sleep(0.2)
150
151            pose = pose_stream.get()
152            _x = pose['x']
153            self.assertAlmostEqual(pose['x'], 5.2, delta=0.20)
154
155            simu.sleep(0.5)
156            pose = pose_stream.get() #should not have moved
157            self.assertAlmostEqual(pose['x'], _x, delta=0.03)
158            logger.info("Ok, did not move")
159
160            # now resume the move
161
162            simu.robot.motion.resume().result()
163
164            simu.sleep(0.5)
165
166            # must move now
167            pose = pose_stream.get()
168            self.assertAlmostEqual(pose['x'], 5.7, delta=0.20)
169            status = simu.robot.motion.get_status().result()
170            self.assertEqual(status, "Transit")
171
172            # wait for the end of the move
173            simu.sleep(1.5)
174            self.assertTrue(action.done())
175            self.assertFalse(action.running())
176
177            pose = pose_stream.get()
178            self.assertAlmostEqual(pose['x'], 6.0, delta=0.15)
179            status = simu.robot.motion.get_status().result()
180            self.assertEqual(status, "Arrived")
181
182
183########################## Run these tests ##########################
184if __name__ == "__main__":
185    from morse.testing.testing import main
186    main(Waypoints_Test)
187