知道一个状态的值或者状态行动对(state-action pair)很有用。这里的值指的是,如果你从某一个状态或者状态行动对开始,一直按照某个策略运行下去最终获得的期望回报。几乎是所有的强化学习方法,都在用不同的形式使用着值函数。
最优行动-值函数和被最优策略选中的行动有重要的联系。从定义上讲, 指的是从一个状态s开始,任意执行一个行动a,然后一直按照最优策略执行下去所获得的回报。
强化学习中,有些时候我们不需要描述一个行动的绝对好坏,而只需要知道它相对于平均水平的优势。也就是说,我们只想知道一个行动的相对优势 。这就是优势函数的概念。
马尔科夫决策过程(Markov Decision Processes, MDPs)。MDP是一个5元组,其中
基于是否使用了Policy、Value fuction、Model 不同的方法组合,强化学习算法可以分为
免模型学习(Model-Free):Model-free就是不去学习和理解环境,环境给出什么信息就是什么信息,常见的方法有policy optimization和Q-learning
我们要从 A 走到 F,每两点之间表示这条路的成本,我们要选择路径让成本越低越好:
在 A 时,可以选的 (B, C, D, E),发现 D 最优,就走到 D,此时,可以选的 (B, C, F),发现 F 最优,就走到 F,此时完成任务。
这个算法就是强化学习的一种,叫做 epsilon greedy,是一种 Policy based 的方法,当然了这个路径并不是最优的走法(最优走法应该是A -> C -> F)。
通过强化学习,一个 agent 可以在探索和开发(exploration and exploitation)之间做权衡,并且选择一个最大的回报。
一般的监督学习算法不考虑这种平衡,就只是是 exploitative。
Yan Lecun认为自监督学习(Self-supervised learning)可以作为强化学习的一种潜在解决方案,因为自监督学习将输入和输出都当成完整系统的一部分,使得它在诸如图像补全,图像迁移,时间序列预测等任务上都非常有效。此外自监督模型的复杂度随着额外反馈信息的加入而增加,可以在很大程度上减少计算过程中人为的干预。
简言之,自监督学习是一种特殊目的的无监督学习。不同于传统的AutoEncoder等方法,仅仅以重构输入为目的,而是希望通过surrogate task学习到和高层语义信息相关联的特征。
在 Flappy bird 这个游戏中,我们需要简单的点击操作来控制小鸟,躲过各种水管,飞的越远越好,因为飞的越远就能获得更高的积分奖励。这就是一个典型的强化学习场景:
,在第 t 时刻,我们让 gt=rt+γrt+1+... 等于 q(st,a)
Q-Learning和DQN都是基于价值的强化学习算法,在给定一个状态下,计算采取每个动作的价值,我们选择有最高Q值(在所有状态下最大的期望奖励)的行动。如果我们省略中间的步骤,即直接根据当前的状态来选择动作,也就引出了强化学习中的另一种很重要的算法,即策略梯度(Policy Gradient,PG)
Policy Gradient算法的优缺点如下:
首先我们需要一个 policy model 来输出动作概率,输出动作概率后,我们 sample() 函数去得到一个具体的动作,然后跟环境交互过后,我们可以得到一整个回合的数据。拿到回合数据之后,我再去执行一下 learn() 函数,在 learn() 函数里面,我就可以拿这些数据去构造损失函数,然后扔给优化器去优化,去更新我的 policy model。
演员-评论家算法(Actor-Critic)是基于策略(Policy Based)和基于价值(Value Based)相结合的方法
从DDPG这个名字看,它是由D(Deep)+D(Deterministic )+ PG(Policy Gradient)组成。
Q Learning 的算法框架是让系统按照策略指引进行探索,在探索每一步都进行状态价值的更新。其中最优行动值函数Q的更新算法如下:
基于 Q-Learning 的方法
为了解决超大状态空间Q-table的存储和提取问题,神经网络技术被引入了强化学习,可以将状态和动作当成神经网络的输入,然后经过神经网络分析后得到动作的 Q 值,这样就没必要在表格中记录 Q 值,而是直接使用神经网络预测Q值。同时也可以只输入状态输入神经网络,通过神经网络预测得到所有的动作,然后再根据价值选择策略函数选择出对应的动作。
这种方法不采用显示地表示策略,而是纯使用规划技术来选择行动,例如 模型预测控制 (model-predictive control, MPC)。在模型预测控制中,智能体每次观察环境的时候,都会计算得到一个对于当前模型最优的规划,这里的规划指的是未来一个固定时间段内,智能体会采取的所有行动(通过学习值函数,规划算法可能会考虑到超出范围的未来奖励)。智能体先执行规划的第一个行动,然后立即舍弃规划的剩余部分。每次准备和环境进行互动时,它会计算出一个新的规划,从而避免执行小于规划范围的规划给出的行动。
Expert Iteration是纯规划的后来之作,它使用、学习策略的显示表示形式:
智能体在模型中应用了一种规划算法,类似蒙特卡洛树搜索(Monte Carlo Tree Search),通过对当前策略进行采样生成规划的候选行为。这种算法得到的行动比策略本身生成的要好,所以相对于策略来说,它是“专家”。随后更新策略,以产生更类似于规划算法输出的行动。
""" A simple example for Reinforcement Learning using table lookup Q-learning method. An agent "o" is on the left of a 1 dimensional world, the treasure is on the rightmost location. Run this program and to see how the agent will improve its strategy of finding the treasure. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ """ import numpy as np import pandas as pd import time np.random.seed(2) # reproducible N_STATES = 32 # the length of the 1 dimensional world ACTIONS = ['left', 'right'] # available actions EPSILON = 0.9 # greedy police ALPHA = 0.1 # learning rate GAMMA = 0.9 # discount factor MAX_EPISODES = 999 # maximum episodes FRESH_TIME = 0.1 # fresh time for one move def build_q_table(n_states, actions): table = pd.DataFrame( np.zeros((n_states, len(actions))), # q_table initial values columns=actions, # actions's name ) # print(table) # show table return table def choose_action(state, q_table): # This is how to choose an action state_actions = q_table.iloc[state, :] if (np.random.uniform() > EPSILON) or ((state_actions == 0).all()): # act non-greedy or state-action have no value action_name = np.random.choice(ACTIONS) else: # act greedy action_name = state_actions.idxmax() # replace argmax to idxmax as argmax means a different function in newer version of pandas return action_name def get_env_feedback(S, A): # This is how agent will interact with the environment if A == 'right': # move right if S == N_STATES - 2: # terminate S_ = 'terminal' R = 1 else: S_ = S + 1 R = 0 else: # move left R = 0 if S == 0: S_ = S # reach the wall else: S_ = S - 1 return S_, R def update_env(S, episode, step_counter): # This is how environment be updated env_list = ['-']*(N_STATES-1) + ['T'] # '---------T' our environment if S == 'terminal': interaction = 'Episode %s: total_steps = %s' % (episode+1, step_counter) print('\r{}'.format(interaction), end='') time.sleep(2) print('\r ', end='') else: env_list[S] = 'o' interaction = ''.join(env_list) print('\r{}'.format(interaction), end='') time.sleep(FRESH_TIME) def rl(): # main part of RL loop q_table = build_q_table(N_STATES, ACTIONS) for episode in range(MAX_EPISODES): step_counter = 0 S = 0 is_terminated = False update_env(S, episode, step_counter) while not is_terminated: A = choose_action(S, q_table) S_, R = get_env_feedback(S, A) # take action & get next state and reward q_predict = q_table.loc[S, A] # q_predict: current action's action_probability if S_ != 'terminal': q_target = R + GAMMA * q_table.iloc[S_, :].max() # next state is not terminal else: q_target = R # next state is terminal is_terminated = True # terminate this episode q_table.loc[S, A] += ALPHA * (q_target - q_predict) # update the current action's action_probability, which make the currently selected action more likely to occur, S = S_ # move to next state update_env(S, episode, step_counter+1) step_counter += 1 print('\r\nQ-table: {0}\n'.format(q_table)) return q_table if __name__ == "__main__": q_table = rl() print('\r\nQ-table:\n') print(q_table)
Q-table: left right 0 0.0 0.0 1 0.0 0.0 2 0.0 0.0 3 0.0 0.0 4 0.0 0.1 5 0.0 0.0 Q-table: left right 0 0.0 0.000 1 0.0 0.000 2 0.0 0.000 3 0.0 0.009 4 0.0 0.190 5 0.0 0.000 Q-table: left right 0 0.0 0.00000 1 0.0 0.00000 2 0.0 0.00081 3 0.0 0.02520 4 0.0 0.27100 5 0.0 0.00000 Q-table: left right 0 0.0 0.000000 1 0.0 0.000073 2 0.0 0.002997 3 0.0 0.047070 4 0.0 0.343900 5 0.0 0.000000 Q-table: left right 0 0.00000 0.000007 1 0.00000 0.000572 2 0.00003 0.006934 3 0.00000 0.073314 4 0.00000 0.409510 5 0.00000 0.000000 Q-table: left right 0 0.00000 0.000057 1 0.00000 0.001138 2 0.00003 0.012839 3 0.00000 0.102839 4 0.00000 0.468559 5 0.00000 0.000000 Q-table: left right 0 0.00000 0.000154 1 0.00000 0.002180 2 0.00003 0.020810 3 0.00000 0.134725 4 0.00000 0.521703 5 0.00000 0.000000 Q-table: left right 0 0.00000 0.000335 1 0.00000 0.003835 2 0.00003 0.030854 3 0.00000 0.168206 4 0.00000 0.569533 5 0.00000 0.000000 Q-table: left right 0 0.00000 0.000647 1 0.00000 0.006228 2 0.00003 0.042907 3 0.00000 0.202643 4 0.00000 0.612580 5 0.00000 0.000000 Q-table: left right 0 0.00000 0.001142 1 0.00000 0.009467 2 0.00003 0.056855 3 0.00000 0.237511 4 0.00000 0.651322 5 0.00000 0.000000 Q-table: left right 0 0.00000 0.001880 1 0.00000 0.013637 2 0.00003 0.072545 3 0.00000 0.272379 4 0.00000 0.686189 5 0.00000 0.000000 Q-table: left right 0 0.000000 0.002920 1 0.000000 0.018803 2 0.000030 0.089805 3 0.000000 0.337965 4 0.027621 0.717570 5 0.000000 0.000000 Q-table: left right 0 0.000000 0.004320 1 0.000000 0.025005 2 0.000030 0.111241 3 0.000000 0.368750 4 0.027621 0.745813 5 0.000000 0.000000 Q-table: left right 0 0.000000 0.006138 1 0.000000 0.032516 2 0.000030 0.133304 3 0.000000 0.398998 4 0.027621 0.771232 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.008451 1 0.000000 0.041262 2 0.000030 0.155884 3 0.000000 0.428509 4 0.027621 0.794109 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.011319 1 0.000000 0.051165 2 0.000030 0.178861 3 0.000000 0.457128 4 0.027621 0.814698 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.014792 1 0.000000 0.062146 2 0.000030 0.202117 3 0.000000 0.484738 4 0.027621 0.833228 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.018906 1 0.000000 0.074122 2 0.000030 0.225531 3 0.000000 0.511255 4 0.027621 0.849905 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.023687 1 0.000000 0.087008 2 0.000030 0.248991 3 0.000000 0.536621 4 0.027621 0.864915 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.029149 1 0.000000 0.100716 2 0.000030 0.272388 3 0.000000 0.560801 4 0.027621 0.878423 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.035298 1 0.000000 0.115159 2 0.000030 0.295621 3 0.000000 0.604459 4 0.077399 0.890581 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.042133 1 0.000000 0.130249 2 0.000030 0.320461 3 0.000000 0.624166 4 0.077399 0.901523 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.049642 1 0.000000 0.146066 2 0.000030 0.344589 3 0.000000 0.642886 4 0.077399 0.911371 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.057824 1 0.000000 0.162472 2 0.000030 0.367990 3 0.000000 0.660621 4 0.077399 0.920234 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.066664 1 0.000000 0.179344 2 0.000030 0.390647 3 0.000000 0.677380 4 0.077399 0.928210 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.076138 1 0.000000 0.196568 2 0.000030 0.412547 3 0.000000 0.693181 4 0.077399 0.935389 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.086216 1 0.000000 0.214040 2 0.000030 0.433678 3 0.000000 0.708048 4 0.077399 0.941850 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.096858 1 0.000000 0.231667 2 0.000030 0.454035 3 0.000000 0.722009 4 0.077399 0.947665 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.108022 1 0.000000 0.249364 2 0.000030 0.473612 3 0.000000 0.735098 4 0.077399 0.952899 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.119663 1 0.000000 0.267053 2 0.000030 0.492410 3 0.000000 0.747349 4 0.077399 0.957609 5 0.000000 0.000000 Q-table: left right 0 0.000552 0.131731 1 0.000000 0.284664 2 0.000030 0.510430 3 0.000000 0.769104 4 0.137951 0.961848 5 0.000000 0.000000 Episode 32: total_steps = 5
为了更好地理解这个算法,我们逐步分析Q-table的值变化过程,以 N_STATES=8 为例。
Q-table: left right 0 0.0 0.0 1 0.0 0.0 2 0.0 0.0 3 0.0 0.0 4 0.0 0.0 5 0.0 0.0 6 0.0 0.0 7 0.0 0.0
由于整个Q-table所有的 (s,a) 都为零,所以agent在探索过程中,q_target,q_predict都为零,此时不存在Q-table更新动作。
此时 q_target=1;q_predict=0,所以 ALPHA * (q_target - q_predict) = 0.1,且只有 (S=6,A=right) 能获得这次Q-table概率更新。
注意!这是Q-table的第一次概率更新,这个概率更新的根源是”目标达到的R奖励“,而 (S=6,A=right) 是引导算法获得这次奖励的行动路径。后续的所有概率更新都是从这里开始的。
Q-table: left right 0 0.0 0.000 1 0.0 0.000 2 0.0 0.000 3 0.0 0.000 4 0.0 0.000 5 0.0 0.000 6 0.0 0.100 7 0.0 0.000
此时,整个Q-table,除了S=6,其他的 (s,a) 依然都为零,所以小圆点在 S=5 之前不管如何做随机运动,都不会发生概率更新动作。
有趣地”分水岭“发生在 S=5 的时候,在 S=5 这个点,由于此时 (S=5,a)依然都为零,所以小圆点依然是在做随机运动,
计算得到 (S=5, A=right) = 0.09。
0 0.0 0.000 1 0.0 0.000 2 0.0 0.000 3 0.0 0.000 4 0.0 0.000 5 0.0 0.009 6 0.0 0.100 7 0.0 0.000
在 S=6这个状态上,
所以有q_predict = q_table.loc[6, A] = 0.1;ALPHA * (q_target - q_predict) = 0.09;
所以有q_table.loc[6, right] += ALPHA * (q_target - q_predict) = 0.1+0.09 = 0.19;
0 0.0 0.000 1 0.0 0.000 2 0.0 0.000 3 0.0 0.000 4 0.0 0.000 5 0.0 0.009 6 0.0 0.190 7 0.0 0.000
和前面的分析过程类似,”分水岭“发生在 S=4 的时候,在 S=4 这个点,由于此时 (S=4,a)依然都为零,所以小圆点依然是在做随机运动,
计算得到 (S=4, A=right) = 0.09。
0 0.0 0.00000 1 0.0 0.00000 2 0.0 0.00000 3 0.0 0.00000 4 0.0 0.00081 5 0.0 0.00900 6 0.0 0.19000 7 0.0 0.00000
我们可以把Q函数视为一个在Q-Table上滚动的读取器,用于寻找与当前状态关联的行以及与动作关联的列。它会从相匹配的单元格中返回 Q 值。这就是未来奖励的期望。
在我们探索环境(environment)之前,Q-table 会给出相同的任意的设定值(大多数情况下是 0)。随着对环境的持续探索,这个 Q-table 会通过迭代地使用 Bellman 方程(动态规划方程)更新 Q(s,a) 来给出越来越好的近似。
如果每个Q值都等于零,我们就需要权衡探索/利用(exploration/exploitation)的程度了,思路就是,在一开始,我们将使用 epsilon 贪婪策略:
""" Reinforcement learning maze example. Red rectangle: explorer. Black rectangles: hells [reward = -1]. Yellow bin circle: paradise [reward = +1]. All other states: ground [reward = 0]. This script is the main part which controls the update method of this example. The RL is in RL_brain.py. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ """ from maze_env import Maze from RL_brain import QLearningTable def update(): for episode in range(100): # initial observation observation = env.reset() print("observation: ", observation) print("str(observation): ", str(observation)) while True: # fresh env env.render() # RL choose action based on observation action = RL.choose_action(str(observation)) # RL take action and get next observation and reward observation_, reward, done = env.step(action) # RL learn from this transition RL.learn(str(observation), action, reward, str(observation_)) # swap observation observation = observation_ # break while loop when end of this episode if done: break # end of game print('game over') env.destroy() if __name__ == "__main__": env = Maze() print("env.n_actions: ", env.n_actions) print("list(range(env.n_actions)): ", list(range(env.n_actions))) RL = QLearningTable(actions=list(range(env.n_actions))) env.after(100, update) env.mainloop()
""" Reinforcement learning maze example. Red rectangle: explorer. Black rectangles: hells [reward = -1]. Yellow bin circle: paradise [reward = +1]. All other states: ground [reward = 0]. This script is the environment part of this example. The RL is in RL_brain.py. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ """ import random import numpy as np import time import sys if sys.version_info.major == 2: import Tkinter as tk else: import tkinter as tk UNIT = 40 # pixels MAZE_H = 16 # grid height MAZE_W = 16 # grid width class Maze(tk.Tk, object): def __init__(self): super(Maze, self).__init__() self.action_space = ['u', 'd', 'l', 'r'] self.n_actions = len(self.action_space) self.title('maze') self.geometry('{0}x{1}'.format(MAZE_W * UNIT, MAZE_H * UNIT)) self._build_maze() def _build_maze(self): self.canvas = tk.Canvas(self, bg='white', height=MAZE_H * UNIT, width=MAZE_W * UNIT) # create grids for c in range(0, MAZE_W * UNIT, UNIT): x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT self.canvas.create_line(x0, y0, x1, y1) for r in range(0, MAZE_H * UNIT, UNIT): x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r self.canvas.create_line(x0, y0, x1, y1) # create origin #origin = np.array([random.randint(UNIT, UNIT*MAZE_W), random.randint(UNIT, UNIT*MAZE_W)]) origin = np.array([20+(UNIT)*random.randint(1, 3), 20+(UNIT)*random.randint(1, 3)]) # hell hell1_center = origin + np.array([UNIT * 2, UNIT]) self.hell1 = self.canvas.create_rectangle( hell1_center[0] - 15, hell1_center[1] - 15, hell1_center[0] + 15, hell1_center[1] + 15, fill='black') # hell hell2_center = origin + np.array([UNIT, UNIT * 2]) self.hell2 = self.canvas.create_rectangle( hell2_center[0] - 15, hell2_center[1] - 15, hell2_center[0] + 15, hell2_center[1] + 15, fill='black') # create oval oval_center = origin + UNIT * 2 self.oval = self.canvas.create_oval( oval_center[0] - 15, oval_center[1] - 15, oval_center[0] + 15, oval_center[1] + 15, fill='yellow') # create red rect self.rect = self.canvas.create_rectangle( origin[0] - 15, origin[1] - 15, origin[0] + 15, origin[1] + 15, fill='red') # pack all self.canvas.pack() def reset(self): self.update() time.sleep(0.5) self.canvas.delete(self.rect) origin = np.array([20, 20]) self.rect = self.canvas.create_rectangle( origin[0] - 15, origin[1] - 15, origin[0] + 15, origin[1] + 15, fill='red') # return observation return self.canvas.coords(self.rect) def step(self, action): s = self.canvas.coords(self.rect) base_action = np.array([0, 0]) if action == 0: # up if s[1] > UNIT: base_action[1] -= UNIT elif action == 1: # down if s[1] < (MAZE_H - 1) * UNIT: base_action[1] += UNIT elif action == 2: # right if s[0] < (MAZE_W - 1) * UNIT: base_action[0] += UNIT elif action == 3: # left if s[0] > UNIT: base_action[0] -= UNIT self.canvas.move(self.rect, base_action[0], base_action[1]) # move agent s_ = self.canvas.coords(self.rect) # next state # reward function if s_ == self.canvas.coords(self.oval): reward = 1 done = True s_ = 'terminal' elif s_ in [self.canvas.coords(self.hell1), self.canvas.coords(self.hell2)]: reward = -1 done = True s_ = 'terminal' else: reward = 0 done = False return s_, reward, done def render(self): time.sleep(0.1) self.update() def update(): for t in range(10): s = env.reset() while True: env.render() a = 1 s, r, done = env.step(a) if done: break if __name__ == '__main__': env = Maze() env.after(100, update) env.mainloop()
""" This part of code is the Q learning brain, which is a brain of the agent. All decisions are made in here. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ """ import numpy as np import pandas as pd class QLearningTable: def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9): self.actions = actions # a list self.lr = learning_rate self.gamma = reward_decay self.epsilon = e_greedy self.q_table = pd.DataFrame(columns=self.actions, dtype=np.float64) def choose_action(self, observation): self.check_state_exist(observation) # action selection if np.random.uniform() < self.epsilon: # choose best action state_action = self.q_table.loc[observation, :] # some actions may have the same value, randomly choose on in these actions action = np.random.choice(state_action[state_action == np.max(state_action)].index) else: # choose random action action = np.random.choice(self.actions) return action def learn(self, s, a, r, s_): self.check_state_exist(s_) q_predict = self.q_table.loc[s, a] if s_ != 'terminal': q_target = r + self.gamma * self.q_table.loc[s_, :].max() # next state is not terminal else: q_target = r # next state is terminal print("self.q_table: ", self.q_table) self.q_table.loc[s, a] += self.lr * (q_target - q_predict) # update def check_state_exist(self, state): if state not in self.q_table.index: # append new state to q table self.q_table = pd.concat([self.q_table, pd.DataFrame([pd.Series( [0]*len(self.actions), index=self.q_table.columns, name=state, )])])
import gymnasium as gym env = gym.make("LunarLander-v2", render_mode="human") observation, info = env.reset(seed=42) for _ in range(2000): action = env.action_space.sample() # this is where you would insert your policy observation, reward, terminated, truncated, info = env.step(action) if terminated or truncated: observation, info = env.reset() env.close()
""" Sarsa is a online updating method for Reinforcement learning. Unlike Q learning which is a offline updating method, Sarsa is updating while in the current trajectory. You will see the sarsa is more coward when punishment is close because it cares about all behaviours, while q learning is more brave because it only cares about maximum behaviour. """ from maze_env import Maze from RL_brain import SarsaTable def update(): for episode in range(100): # initial observation observation = env.reset() # RL choose action based on observation action = RL.choose_action(str(observation)) while True: # fresh env env.render() # RL take action and get next observation and reward observation_, reward, done = env.step(action) # RL choose action based on next observation action_ = RL.choose_action(str(observation_)) # RL learn from this transition (s, a, r, s, a) ==> Sarsa RL.learn(str(observation), action, reward, str(observation_), action_) # swap observation and action observation = observation_ action = action_ # break while loop when end of this episode if done: print("RL.q_table: ", RL.q_table) break # end of game print('game over') env.destroy() if __name__ == "__main__": env = Maze() RL = SarsaTable(actions=list(range(env.n_actions))) env.after(100, update) env.mainloop()
""" Reinforcement learning maze example. Red rectangle: explorer. Black rectangles: hells [reward = -1]. Yellow bin circle: paradise [reward = +1]. All other states: ground [reward = 0]. This script is the environment part of this example. The RL is in RL_brain.py. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ """ import random import numpy as np import time import sys if sys.version_info.major == 2: import Tkinter as tk else: import tkinter as tk UNIT = 40 # pixels MAZE_H = 4 # grid height MAZE_W = 4 # grid width class Maze(tk.Tk, object): def __init__(self): super(Maze, self).__init__() self.action_space = ['u', 'd', 'l', 'r'] self.n_actions = len(self.action_space) self.title('maze') self.geometry('{0}x{1}'.format(MAZE_W * UNIT, MAZE_H * UNIT)) self._build_maze() def _build_maze(self): self.canvas = tk.Canvas(self, bg='white', height=MAZE_H * UNIT, width=MAZE_W * UNIT) # create grids for c in range(0, MAZE_W * UNIT, UNIT): x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT self.canvas.create_line(x0, y0, x1, y1) for r in range(0, MAZE_H * UNIT, UNIT): x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r self.canvas.create_line(x0, y0, x1, y1) # create origin origin = np.array([20, 20]) #origin = np.array([20 + (UNIT) * random.randint(1, 3), 20 + (UNIT) * random.randint(1, 3)]) # hell hell1_center = origin + np.array([UNIT * 2, UNIT]) self.hell1 = self.canvas.create_rectangle( hell1_center[0] - 15, hell1_center[1] - 15, hell1_center[0] + 15, hell1_center[1] + 15, fill='black') # hell hell2_center = origin + np.array([UNIT, UNIT * 2]) self.hell2 = self.canvas.create_rectangle( hell2_center[0] - 15, hell2_center[1] - 15, hell2_center[0] + 15, hell2_center[1] + 15, fill='black') # create oval oval_center = origin + UNIT * 2 self.oval = self.canvas.create_oval( oval_center[0] - 15, oval_center[1] - 15, oval_center[0] + 15, oval_center[1] + 15, fill='yellow') # create red rect self.rect = self.canvas.create_rectangle( origin[0] - 15, origin[1] - 15, origin[0] + 15, origin[1] + 15, fill='red') # pack all self.canvas.pack() def reset(self): self.update() time.sleep(0.5) self.canvas.delete(self.rect) origin = np.array([20, 20]) self.rect = self.canvas.create_rectangle( origin[0] - 15, origin[1] - 15, origin[0] + 15, origin[1] + 15, fill='red') # return observation return self.canvas.coords(self.rect) def step(self, action): s = self.canvas.coords(self.rect) base_action = np.array([0, 0]) if action == 0: # up if s[1] > UNIT: base_action[1] -= UNIT elif action == 1: # down if s[1] < (MAZE_H - 1) * UNIT: base_action[1] += UNIT elif action == 2: # right if s[0] < (MAZE_W - 1) * UNIT: base_action[0] += UNIT elif action == 3: # left if s[0] > UNIT: base_action[0] -= UNIT self.canvas.move(self.rect, base_action[0], base_action[1]) # move agent s_ = self.canvas.coords(self.rect) # next state # reward function if s_ == self.canvas.coords(self.oval): reward = 1 done = True s_ = 'terminal' elif s_ in [self.canvas.coords(self.hell1), self.canvas.coords(self.hell2)]: reward = -1 done = True s_ = 'terminal' else: reward = 0 done = False return s_, reward, done def render(self): time.sleep(0.1) self.update()
""" This part of code is the Q learning brain, which is a brain of the agent. All decisions are made in here. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ """ import numpy as np import pandas as pd class RL(object): def __init__(self, action_space, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9): self.actions = action_space # a list self.lr = learning_rate self.gamma = reward_decay self.epsilon = e_greedy self.q_table = pd.DataFrame(columns=self.actions, dtype=np.float64) def check_state_exist(self, state): if state not in self.q_table.index: # append new state to q table self.q_table = pd.concat([self.q_table, pd.DataFrame([pd.Series( [0] * len(self.actions), index=self.q_table.columns, name=state, )])]) def choose_action(self, observation): self.check_state_exist(observation) # action selection if np.random.rand() < self.epsilon: # choose best action state_action = self.q_table.loc[observation, :] # some actions may have the same value, randomly choose on in these actions action = np.random.choice(state_action[state_action == np.max(state_action)].index) else: # choose random action action = np.random.choice(self.actions) return action def learn(self, *args): pass # off-policy class QLearningTable(RL): def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9): super(QLearningTable, self).__init__(actions, learning_rate, reward_decay, e_greedy) def learn(self, s, a, r, s_): self.check_state_exist(s_) q_predict = self.q_table.loc[s, a] if s_ != 'terminal': q_target = r + self.gamma * self.q_table.loc[s_, :].max() # next state is not terminal else: q_target = r # next state is terminal self.q_table.loc[s, a] += self.lr * (q_target - q_predict) # update # on-policy class SarsaTable(RL): def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9): super(SarsaTable, self).__init__(actions, learning_rate, reward_decay, e_greedy) def learn(self, s, a, r, s_, a_): self.check_state_exist(s_) q_predict = self.q_table.loc[s, a] if s_ != 'terminal': q_target = r + self.gamma * self.q_table.loc[s_, a_] # next state is not terminal else: q_target = r # next state is terminal self.q_table.loc[s, a] += self.lr * (q_target - q_predict) # update
""" Sarsa is a online updating method for Reinforcement learning. Unlike Q learning which is a offline updating method, Sarsa is updating while in the current trajectory. You will see the sarsa is more coward when punishment is close because it cares about all behaviours, while q learning is more brave because it only cares about maximum behaviour. """ from maze_env import Maze from RL_brain import SarsaLambdaTable def update(): for episode in range(100): # initial observation observation = env.reset() print("RL.q_table: ", RL.q_table) print("observation: ", observation) # RL choose action based on observation action = RL.choose_action(str(observation)) print("action = RL.choose_action(str(observation)) RL.q_table: ", RL.q_table) # initial all zero eligibility trace RL.eligibility_trace *= 0 while True: # fresh env env.render() # RL take action and get next observation and reward observation_, reward, done = env.step(action) # RL choose action based on next observation action_ = RL.choose_action(str(observation_)) # RL learn from this transition (s, a, r, s, a) ==> Sarsa RL.learn(str(observation), action, reward, str(observation_), action_) # swap observation and action observation = observation_ action = action_ # break while loop when end of this episode if done: print("RL.q_table: ", RL.q_table) break # end of game print('game over') env.destroy() if __name__ == "__main__": env = Maze() RL = SarsaLambdaTable(actions=list(range(env.n_actions))) env.after(100, update) env.mainloop()
""" Reinforcement learning maze example. Red rectangle: explorer. Black rectangles: hells [reward = -1]. Yellow bin circle: paradise [reward = +1]. All other states: ground [reward = 0]. This script is the environment part of this example. The RL is in RL_brain.py. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ """ import numpy as np import time import sys if sys.version_info.major == 2: import Tkinter as tk else: import tkinter as tk UNIT = 40 # pixels MAZE_H = 16 # grid height MAZE_W = 16 # grid width class Maze(tk.Tk, object): def __init__(self): super(Maze, self).__init__() self.action_space = ['u', 'd', 'l', 'r'] self.n_actions = len(self.action_space) self.title('maze') self.geometry('{0}x{1}'.format(MAZE_W * UNIT, MAZE_H * UNIT)) self._build_maze() def _build_maze(self): self.canvas = tk.Canvas(self, bg='white', height=MAZE_H * UNIT, width=MAZE_W * UNIT) # create grids for c in range(0, MAZE_W * UNIT, UNIT): x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT self.canvas.create_line(x0, y0, x1, y1) for r in range(0, MAZE_H * UNIT, UNIT): x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r self.canvas.create_line(x0, y0, x1, y1) # create origin origin = np.array([20+40*2, 20+40*2]) # hell hell1_center = origin + np.array([UNIT * 2, UNIT]) self.hell1 = self.canvas.create_rectangle( hell1_center[0] - 15, hell1_center[1] - 15, hell1_center[0] + 15, hell1_center[1] + 15, fill='black') # hell hell2_center = origin + np.array([UNIT, UNIT * 2]) self.hell2 = self.canvas.create_rectangle( hell2_center[0] - 15, hell2_center[1] - 15, hell2_center[0] + 15, hell2_center[1] + 15, fill='black') # create oval oval_center = origin + UNIT * 2 self.oval = self.canvas.create_oval( oval_center[0] - 15, oval_center[1] - 15, oval_center[0] + 15, oval_center[1] + 15, fill='yellow') # create red rect self.rect = self.canvas.create_rectangle( origin[0] - 15, origin[1] - 15, origin[0] + 15, origin[1] + 15, fill='red') # pack all self.canvas.pack() def reset(self): self.update() time.sleep(0.5) self.canvas.delete(self.rect) origin = np.array([20, 20]) self.rect = self.canvas.create_rectangle( origin[0] - 15, origin[1] - 15, origin[0] + 15, origin[1] + 15, fill='red') # return observation return self.canvas.coords(self.rect) def step(self, action): s = self.canvas.coords(self.rect) base_action = np.array([0, 0]) if action == 0: # up if s[1] > UNIT: base_action[1] -= UNIT elif action == 1: # down if s[1] < (MAZE_H - 1) * UNIT: base_action[1] += UNIT elif action == 2: # right if s[0] < (MAZE_W - 1) * UNIT: base_action[0] += UNIT elif action == 3: # left if s[0] > UNIT: base_action[0] -= UNIT self.canvas.move(self.rect, base_action[0], base_action[1]) # move agent s_ = self.canvas.coords(self.rect) # next state # reward function if s_ == self.canvas.coords(self.oval): reward = 1 done = True s_ = 'terminal' elif s_ in [self.canvas.coords(self.hell1), self.canvas.coords(self.hell2)]: reward = -1 done = True s_ = 'terminal' else: reward = 0 done = False return s_, reward, done def render(self): time.sleep(0.05) self.update()
""" This part of code is the Q learning brain, which is a brain of the agent. All decisions are made in here. View more on my tutorial page: https://morvanzhou.github.io/tutorials/ """ import numpy as np import pandas as pd class RL(object): def __init__(self, action_space, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9): self.actions = action_space # a list self.lr = learning_rate self.gamma = reward_decay self.epsilon = e_greedy self.q_table = pd.DataFrame(columns=self.actions, dtype=np.float64) def check_state_exist(self, state): if state not in self.q_table.index: # append new state to q table self.q_table = pd.concat([self.q_table, pd.DataFrame([pd.Series( [0] * len(self.actions), index=self.q_table.columns, name=state, )])]) def choose_action(self, observation): self.check_state_exist(observation) # action selection if np.random.rand() < self.epsilon: # choose best action state_action = self.q_table.loc[observation, :] # some actions may have the same value, randomly choose on in these actions action = np.random.choice(state_action[state_action == np.max(state_action)].index) else: # choose random action action = np.random.choice(self.actions) return action def learn(self, *args): pass # backward eligibility traces class SarsaLambdaTable(RL): def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9, trace_decay=0.9): super(SarsaLambdaTable, self).__init__(actions, learning_rate, reward_decay, e_greedy) # backward view, eligibility trace. self.lambda_ = trace_decay self.eligibility_trace = self.q_table.copy() def check_state_exist(self, state): if state not in self.q_table.index: # append new state to q table to_be_append = pd.DataFrame([pd.Series( [0] * len(self.actions), index=self.q_table.columns, name=state, )]) #print("to_be_append: ", to_be_append) self.q_table = pd.concat([self.q_table, to_be_append]) #print("after pd.concat self.q_table: ", self.q_table) # also update eligibility trace self.eligibility_trace = pd.concat([self.eligibility_trace, to_be_append]) def learn(self, s, a, r, s_, a_): self.check_state_exist(s_) q_predict = self.q_table.loc[s, a] if s_ != 'terminal': q_target = r + self.gamma * self.q_table.loc[s_, a_] # next state is not terminal else: q_target = r # next state is terminal error = q_target - q_predict # increase trace amount for visited state-action pair # Method 1: # self.eligibility_trace.loc[s, a] += 1 # Method 2: self.eligibility_trace.loc[s, :] *= 0 self.eligibility_trace.loc[s, a] = 0.5 # Q update self.q_table += self.lr * error * self.eligibility_trace # decay eligibility trace after update self.eligibility_trace *= self.gamma*self.lambda_
