__author__ = 'Thomas Rueckstiess and Tom Schaul'

from scipy import pi, dot, array, ones, exp
from scipy.linalg import norm

from pybrain.rl.environments.cartpole.nonmarkovpole import NonMarkovPoleEnvironment
from pybrain.rl.environments.cartpole.doublepole import DoublePoleEnvironment
from pybrain.rl.environments import EpisodicTask
from .cartpole import CartPoleEnvironment
from pybrain.utilities import crossproduct


class BalanceTask(EpisodicTask):
    """ The task of balancing one or more poles on a cart. """
    def __init__(self, env=None, maxsteps=1000, desiredValue=0):
        """
        :key env: (optional) an instance of a CartPoleEnvironment (or a subclass thereof)
        :key maxsteps: maximum number of steps (default: 1000)
        """
        self.desiredValue = desiredValue
        if env is None:
            env = CartPoleEnvironment()
        EpisodicTask.__init__(self, env)
        self.N = maxsteps
        self.t = 0

        # scale position and angle, don't scale velocities (unknown maximum)
        self.sensor_limits = [(-3, 3)]
        for i in range(1, self.outdim):
            if isinstance(self.env, NonMarkovPoleEnvironment) and i % 2 == 0:
                self.sensor_limits.append(None)
            else:
                self.sensor_limits.append((-pi, pi))

        # self.sensor_limits = [None] * 4
        # actor force between -50 and 50 Newton
        self.actor_limits = [(-50, 50)]

    def reset(self):
        EpisodicTask.reset(self)
        self.t = 0

    def performAction(self, action):
        self.t += 1
        EpisodicTask.performAction(self, action)

    def isFinished(self):
        if max(list(map(abs, self.env.getPoleAngles()))) > 0.7:
            # pole has fallen
            return True
        elif abs(self.env.getCartPosition()) > 2.4:
            # cart has left its allowed range
            return True
        elif self.t >= self.N:
            # maximum number of timesteps reached
            return True
        return False

    def getReward(self):
        angles = list(map(abs, self.env.getPoleAngles()))
        s = abs(self.env.getCartPosition())
        if min(angles) < 0.05 and s < 0.05:
            # pole is upright and cart is centred
            reward = 0
        elif max(angles) > 0.7 or s > 2.4:
            # failure: penalty proportional to the number of remaining steps
            reward = -2 * (self.N - self.t)
        else:
            reward = -1
        return reward

    def setMaxLength(self, n):
        self.N = n


class JustBalanceTask(BalanceTask):
    """ This task does not require the cart to be moved to the middle. """
    def getReward(self):
        angles = list(map(abs, self.env.getPoleAngles()))
        s = abs(self.env.getCartPosition())
        if min(angles) < 0.05:
            reward = 0
        elif max(angles) > 0.7 or s > 2.4:
            reward = -2 * (self.N - self.t)
        else:
            reward = -1
        return reward


class EasyBalanceTask(BalanceTask):
    """ This task is a bit easier to learn because it gives gradual feedback
        about the distance to the centre. """
    def getReward(self):
        angles = list(map(abs, self.env.getPoleAngles()))
        s = abs(self.env.getCartPosition())
        if min(angles) < 0.05 and s < 0.05:
            reward = 0
        elif max(angles) > 0.7 or s > 2.4:
            reward = -2 * (self.N - self.t)
        else:
            reward = -s / 2
        return reward


class DiscreteBalanceTask(BalanceTask):
    """ Here there are 3 discrete actions: push left, push right, or do nothing. """

    numActions = 3

    def __init__(self, env=None, maxsteps=1000):
        """
        :key env: (optional) an instance of a CartPoleEnvironment (or a subclass thereof)
        :key maxsteps: maximum number of steps (default: 1000)
        """
        if env is None:
            env = CartPoleEnvironment()
        EpisodicTask.__init__(self, env)
        self.N = maxsteps
        self.t = 0

        # no scaling of sensors
        self.sensor_limits = [None] * self.env.outdim

        # scale actor
        self.actor_limits = [(-50, 50)]

    def getObservation(self):
        """ A filtered mapping of the underlying environment's sensor values. """
        sensors = self.env.getSensors()
        if self.sensor_limits:
            sensors = self.normalize(sensors)
        return sensors

    def performAction(self, action):
        # map the discrete action index {0, ..., numActions - 1} to a value
        # centred around zero, e.g. {0, 1, 2} -> {-1., 0., 1.}
        action = action - (self.numActions - 1) // 2.
        BalanceTask.performAction(self, action)

    def getReward(self):
        angles = list(map(abs, self.env.getPoleAngles()))
        s = abs(self.env.getCartPosition())
        if min(angles) < 0.05: # and s < 0.05:
            reward = 1.0
        elif max(angles) > 0.7 or s > 2.4:
            reward = -1. * (self.N - self.t)
        else:
            reward = 0
        return reward


class DiscreteNoHelpTask(DiscreteBalanceTask):
    def getReward(self):
        angles = list(map(abs, self.env.getPoleAngles()))
        s = abs(self.env.getCartPosition())
        if max(angles) > 0.7 or s > 2.4:
            reward = -1. * (self.N - self.t)
        else:
            reward = 0.0
        return reward


class DiscretePOMDPTask(DiscreteBalanceTask):
    def __init__(self, env=None, maxsteps=1000):
        """
        :key env: (optional) an instance of a CartPoleEnvironment (or a subclass thereof)
        :key maxsteps: maximum number of steps (default: 1000)
        """
        if env is None:
            env = CartPoleEnvironment()
        EpisodicTask.__init__(self, env)
        self.N = maxsteps
        self.t = 0

        # no scaling of sensors
        self.sensor_limits = [None] * 2

        # scale actor
        self.actor_limits = [(-50, 50)]

    @property
    def outdim(self):
        return 2

    def getObservation(self):
        """ A filtered mapping of the underlying environment's sensor values:
            only two of the four sensors are observed, which makes the task
            partially observable. """
        all_sensors = self.env.getSensors()
        sensors = [all_sensors[0], all_sensors[2]]

        if self.sensor_limits:
            sensors = self.normalize(sensors)
        return sensors


class LinearizedBalanceTask(BalanceTask):
    """ Here we follow the setup in
    Peters J, Vijayakumar S, Schaal S (2003) Reinforcement learning for humanoid robotics.
    TODO: This is not yet compatible with any other cartpole environment. """

    # quadratic weights on the squared state variables
    Q = array([12., 0.25, 1.25, 1.0])

    def getReward(self):
        # quadratic cost on state and action (cf. Peters et al., 2003)
        return dot(self.env.sensors ** 2, self.Q) + self.env.action[0] ** 2 * 0.01

    def isFinished(self):
        if abs(self.env.getPoleAngles()[0]) > 0.5235988:  # pi/6
            # pole has fallen
            return True
        elif abs(self.env.getCartPosition()) > 1.5:
            # cart has left its allowed range
            return True
        elif self.t >= self.N:
            # maximum number of timesteps reached
            return True
        return False


class DiscreteBalanceTaskRBF(DiscreteBalanceTask):
    """ From Lagoudakis & Parr, 2003:
    RBF features generate a 10-dimensional observation (including a bias term);
    there are no restrictions on the cart position, no helpful (shaping) rewards,
    and a single pole. """

    # 3x3 grid of RBF centers over the two pole-related state variables
    CENTERS = array(crossproduct([[-pi/4, 0, pi/4], [1, 0, -1]]))

    def getReward(self):
        angles = list(map(abs, self.env.getPoleAngles()))
        if max(angles) > 1.6:
            reward = -1.
        else:
            reward = 0.0
        return reward

    def isFinished(self):
        if max(list(map(abs, self.env.getPoleAngles()))) > 1.6:
            return True
        elif self.t >= self.N:
            return True
        return False

    def getObservation(self):
        # bias feature (1) followed by one Gaussian RBF activation per center:
        # exp(-||center - sensors||^2 / 2), computed on the first two sensor values
        res = ones(1 + len(self.CENTERS))
        sensors = self.env.getSensors()[:-2]
        res[1:] = exp(-array(list(map(norm, self.CENTERS - sensors))) ** 2 / 2)
        return res

    @property
    def outdim(self):
        return 1 + len(self.CENTERS)

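
# Illustrative sketch (hypothetical helper, not part of the original task classes):
# the RBF observation above is a bias of 1 followed by one Gaussian activation
# exp(-||c - s||^2 / 2) per center c, where s holds the first two sensor values.
# The standalone function below reproduces that computation for a given pole state.
def _rbf_observation_sketch(theta, theta_dot):
    centers = array(crossproduct([[-pi / 4, 0, pi / 4], [1, 0, -1]]))
    state = array([theta, theta_dot])
    features = ones(1 + len(centers))
    features[1:] = exp(-array([norm(c - state) for c in centers]) ** 2 / 2)
    return features
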

class DiscreteDoubleBalanceTaskRBF(DiscreteBalanceTaskRBF):
    """ Same idea, but two poles. """

    # centers now span both poles' angle and angular-velocity variables
    CENTERS = array(crossproduct([[-pi/4, 0, pi/4], [1, 0, -1]] * 2))

    def __init__(self, env=None, maxsteps=1000):
        if env is None:
            env = DoublePoleEnvironment()
        DiscreteBalanceTask.__init__(self, env, maxsteps)
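

if __name__ == '__main__':
    # Minimal usage sketch (an illustration, not part of the library API): run one
    # episode of the default BalanceTask with uniformly random actions. This assumes
    # the standard PyBrain EpisodicTask interface, where performAction takes a
    # normalized action list that is rescaled to a force via actor_limits.
    from random import uniform

    task = BalanceTask(maxsteps=200)
    task.reset()
    ret = 0
    while not task.isFinished():
        task.performAction([uniform(-1, 1)])
        ret += task.getReward()
    print('episode ended after %i steps with return %.1f' % (task.t, ret))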