强化学习基础 - 机器人走迷宫

发布于 / 机器学习 / 0 条评论

引言

在以往的博文中,我们介绍了深度学习。深度学习是一种通过多层神经网络实现自动化特征提取,从而实现图像识别、语音识别、自然语言处理等目的。

与深度学习不同,强化学习是一种通过智能体与环境的交互来学习最优策略的方法。举一个简单的例子:

让机器人走这个赛博迷宫

A表示机器人现在的位置,S表示起点(A目前就在起点),G表示终点,█表示墙。如果机器人撞了墙,我们就给它点惩♂罚;如果机器人走到了终点G,我们就给它奖励;当然,为了防止机器人原地踏步,我们定义每走一步都要接受一点小小的惩罚(不过这个惩罚远小于撞墙惩罚)

我们定义损失函数为惩罚,去优化这个模型,最后得到的效果必然是机器人不撞墙的走到终点。这个模型就是一个强化学习的模型。

虽然我们之前研究过使用BFS走迷宫,但是我们还可以使用更高级的强化学习方法,通过机器人与环境的交互,让机器人自我学习如何走迷宫,实现解决走迷宫这个问题。

对于机器学习,我们通常需要建立数学模型,然后迭代训练以最小化损失。强化学习也类似,我们先建立马尔可夫决策过程数学模型,并使用强化学习通过反复尝试来寻找最优策略。

马尔可夫决策过程(MDP)

所谓“马尔可夫决策过程”,就是Markov Decision Process(MDP),简单的来说就是:

在某种情景下,我们需要连续的做出决策,而且每个决策会带来一定的结果和新的状态。

例如机器人走迷宫,需要连续的做出往哪个方向走的决策,每个决策会产生不同的后果(撞墙受惩♂罚,原地踏步受小惩罚,到终点得到奖励),而且每个决策后,会使机器人到达新的状态(机器人位置变了)。

MDP的四个关键要素是:状态(State)、动作(Action)、奖励(Reward)和转移概率(Transition Probability)

  • 状态:状态是描述当前情景的信息,在机器人走迷宫的例子中,状态就是机器人当前的位置、当前的奖励/惩罚值。
  • 动作:在每个状态下可以做出的选择,例如机器人可以往上,往下,往左,往右走。动作会引起状态的变化
  • 奖励:每个动作之后得到的反馈和好处。例如撞墙了,我们给它扣0.5分,没撞墙但是没到终点,我们给它扣0.1分
  • 转移概率:在当前状态下执行某个动作后,环境会如何变化,也就是下个状态会是怎样的。例如机器人在行走的过程中,可能因为环境因素滑倒、偏移等,导致机器人执行动作后不能100%的达到预期状态(我们目前的游戏中不考虑这一点)

Q-learning

强化学习中有四个核心概念:策略(Policy)、价值函数(Value Function)、回报(Return)和最优策略(Optimal Policy)

  • 策略:决策规则,每个状态下应该做出什么策略
  • 价值函数:从某个状态出发,执行一个策略,最终能获得的长期奖励的期望。
  • 回报:从某个时刻开始,后续奖励的累计值。
    • 折扣回报:在不同的问题中,短期回报和长期回报的重要程度是不同的,所以还需提出折扣回报的概念。如果短期奖励更为重要,则需要对未来奖励做折扣处理,即乘以一个0-1之间的值作为折扣因子使得未来奖励降低
  • 最优策略:从任何状态出发,得到的最大累计奖励的策略

强化学习中,Q-learning是一种经典算法,通过学习动作价值函数$Q(s,a)$ 来获得最优策略,其中s为状态state,a为动作action

  • 对于每一对状态-动作组合$(s,a)$,Q会维护一个Q值,即期望的回报。
  • 初始时,可以把Q值设置成随机数,当然设置成0也可以。
  • 每次执行一个动作后,Q值会更新,根据当前奖励和未来状态的Q值来更新

Q值更新公式如下

$Q(s_t,a_t) \leftarrow Q(s_t,a_t)+\alpha [r_t+\gamma \mathop{max}\limits_a Q(s_{t+1},a)-Q(s_t,a_t)]$

公式中$s_t$为当前状态$a_t$为当前动作,$r_t$为当前奖励,$\gamma$为折扣因子,控制未来奖励重要性,$\alpha$为学习率,控制迭代更新速度

当所有的Q值都收敛时,选择每个状态下Q值最大的动作即为最优策略。

机器人走迷宫实现

现在你已经掌握了强化学习的基础,让我们使用python解决机器人走迷宫的问题吧!

首先我们引入必要的库

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
import time
import os

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

接着我们定义迷宫环境

# 迷宫环境定义
class MazeEnv:
    def __init__(self):
        # 统一数据类型: 0空地,1墙,2起点,3终点
        self.maze = np.array([
            [2, 0, 1, 0, 3],
            [1, 0, 1, 0, 1],
            [0, 0, 1, 0, 1],
            [0, 1, 0, 0, 1],
            [0, 0, 0, 1, 1]
        ])
        # 初始化起点和终点位置
        self.start_pos = tuple(np.argwhere(self.maze == 2)[0])
        self.goal_pos = tuple(np.argwhere(self.maze == 3)[0])
        # 重置环境
        self.reset()

    def reset(self):
        # 让智能体(机器人)的位置恢复到起始位置
        self.agent_pos = self.start_pos
        return self.get_state()

    def get_state(self):
        # 这个函数的目的是获取一个矩阵表示当前状态。我们定义矩阵中墙的位置为-1,智能体位置为1,目标位置为0.5
        state = np.zeros_like(self.maze, dtype=np.float32)
        state[self.maze == 1] = -1
        state[self.agent_pos] = 1
        state[self.goal_pos] = 0.5
        # 将矩阵flatten,作为状态向量
        return state.flatten()

    def step(self, action):
        # 有四种行动方向,分别是左、右、上、下
        moves = [(-1,0), (1,0), (0,-1), (0,1)]
        new_pos = (self.agent_pos[0] + moves[action][0], 
                   self.agent_pos[1] + moves[action][1])

        # 默认给一个-0.1的惩罚。防止机器人"不思进取"来回踱步
        reward = -0.1
        done = False

        # 判定机器人是否越界
        if (0 <= new_pos[0] < 5 and 0 <= new_pos[1] < 5):
            if self.maze[new_pos] != 1:
                self.agent_pos = new_pos  # 只有非墙才移动
            else:
                reward = -0.5  # 撞墙给予更严厉惩罚
        else:
            reward = -0.8  # 越界也惩罚更大

        # 如果到达终点,给一个大大滴奖励
        if self.agent_pos == self.goal_pos:
            reward = 1
            done = True

        return self.get_state(), reward, done

    def render(self):
        # 在控制台打印机器人当前的状态
        os.system('cls' if os.name == 'nt' else 'clear')
        maze_copy = np.array(self.maze, dtype=object)
        maze_copy[maze_copy == 0] = ' '
        maze_copy[maze_copy == 1] = '█'
        maze_copy[maze_copy == 2] = 'S'
        maze_copy[maze_copy == 3] = 'G'
        maze_copy[self.agent_pos] = 'A'
        print("\n".join(["".join(row) for row in maze_copy]))
        time.sleep(0.3)

接着我们定义DQN网络

class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(state_size, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, action_size)
        )

    def forward(self, x):
        return self.layers(x)

这里我们定义了一个DQN网络,输入状态向量,通过隐藏层,输出决策行为

接着我们写DQN的训练函数

def train():
    # 初始化一个迷宫实例
    env = MazeEnv()

    # 状态空间和动作空间的大小
    state_size = env.get_state().shape[0]
    action_size = 4

    # 初始化一个DQN实例
    dqn = DQN(state_size, action_size).to(device) 

    # 定义Adam优化器,学习率为0.001
    optimizer = optim.Adam(dqn.parameters(), lr=0.001)

    # 损失函数为MSE
    criterion = nn.MSELoss()

    # 参数
    epsilon = 1.0  # 初始探索概率
    epsilon_decay = 0.995    # 探索概率的衰减率
    epsilon_min = 0.01   # 探索概率最小值
    gamma = 0.9   # 折扣因子
    episodes = 300    # 游戏次数
    memory = []    # 经验数据

    # 开始循环玩游戏
    for episode in range(episodes):
        # 每次游戏前还原到初始状态
        state = env.reset()
        done = False
        step = 0

        # 防止重复循环,最多允许走50步
        while not done and step < 50:
            step += 1
            
            # 有epsilon的概率随机探索,有1-epsilon的概率根据模型返回的策略探索
            if random.random() < epsilon:
                action = random.randint(0, action_size - 1)
            else:
                with torch.no_grad():
                    q_values = dqn(torch.tensor(state).to(device))
                    action = torch.argmax(q_values).item()

            # 执行动作,获得新状态
            next_state, reward, done = env.step(action)

            # 存储经验到记忆池
            memory.append((state, action, reward, next_state, done))
            state = next_state

            # 及时剔除老旧的经验
            if len(memory) > 1000:
                memory.pop(0)

            # 从经验池抽选32个经验,优化模型
            if len(memory) >= 32:
                batch = random.sample(memory, 32)
                states, actions, rewards, next_states, dones = zip(*batch)

                states = torch.tensor(states).to(device)
                actions = torch.tensor(actions).unsqueeze(1).to(device)
                rewards = torch.tensor(rewards).to(device)
                next_states = torch.tensor(next_states).to(device)
                dones = torch.tensor(dones, dtype=torch.float32).to(device)

                q_values = dqn(states).gather(1, actions).squeeze()
                next_q_values = dqn(next_states).max(1)[0]
                targets = rewards + gamma * next_q_values * (1 - dones)

                loss = criterion(q_values, targets.detach())

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        epsilon = max(epsilon_min, epsilon * epsilon_decay)
        if episode % 50 == 0:
            print(f'Episode: {episode}, Epsilon: {epsilon:.2f}')

    torch.save(dqn.state_dict(), 'dqn_maze.pth')
    print(dqn.state_dict())
    print('训练完成,模型保存为 dqn_maze.pth')

接着让我们的机器人玩迷宫吧!

def play():
    env = MazeEnv()
    state_size = env.get_state().shape[0]
    action_size = 4

    dqn = DQN(state_size, action_size)
    dqn.load_state_dict(torch.load('dqn_maze.pth'))
    dqn.eval()

    state = env.reset()
    done = False
    step = 0
    env.render()

    while not done and step < 20:
        step += 1
        with torch.no_grad():
            q_values = dqn(torch.tensor(state))
            action = torch.argmax(q_values).item()
        state, reward, done = env.step(action)
        env.render()
        print()

    if done:
        print("成功到达终点!")
    else:
        print("未能到达终点。")

运行结果:

转载原创文章请注明,转载自: 斐斐のBlog » 强化学习基础 - 机器人走迷宫
目前还没有评论,快来抢沙发吧~