Reinforcement learning maze solving with openYuanrong#

Reinforcement learning is a branch of machine learning in which a machine interacts with its environment by observing it and taking actions, with the goal of maximizing its expected return. In short, the machine learns, through continual trial and error, to accumulate experience and obtain high rewards in the environment.

This example shows how to use openYuanrong to train a simple policy to play a maze game. It covers:

  • How to simulate the maze game with stateful functions.

  • How to share the game policy across multiple stateful function instances via data objects.

Solution overview#

We use multiple openYuanrong stateful function instances to simulate the maze game and record game experience. These experiences are used to update the game policy, and the updated policy is used again in the next round of simulated games, optimizing continuously in a loop.
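In pseudocode, one iteration of this scheme looks roughly as follows. The helper names here (create_simulator_instances, share_as_data_object, collect) are placeholders only; the concrete classes and openYuanrong calls are introduced step by step below.

# High-level sketch of the optimization loop; helper names are placeholders.
policy = Policy(Environment())
simulators = create_simulator_instances(n=2)                # stateful function instances on the cluster

for episode in range(episodes_num):
    policy_ref = share_as_data_object(policy)               # share the current policy as a data object
    rollouts = [s.rollout(policy_ref) for s in simulators]  # simulate episodes in parallel
    for experience in collect(rollouts):
        policy.update(experience)                           # the updated policy is used next episode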

Prerequisites#

Refer to 在主机上部署 (deploying on hosts) to complete the openYuanrong deployment.

Implementation workflow#

Setting up the maze environment#

We use the Environment class to create a 5x5 grid maze with the start at (0, 0) and the goal at (4, 4). The action_space variable defines the number of actions that can be taken on the grid, namely the four movement directions: up, down, left, and right. Reaching the goal earns a reward of 10 points. To make the game slightly harder, the maze also contains a few traps; moving onto a trap incurs a penalty of 10 points. The step method defines how to move on the grid; if a move would cross the boundary, the player is judged to remain in the original position.

class Environment:
    def __init__(self):
        self.state, self.goal = (0, 0), (4, 4)

        self.action_space = 4 # down (0), left (1), up (2), right (3)
        self.maze_space = (5, 5)
        self.maze = np.zeros((self.maze_space[0], self.maze_space[1]))

        # Reaching the goal rewards 10 points
        self.maze[self.goal] = 10

        # Stepping onto a trap deducts 10 points
        self.maze[(0, 3)] = -10
        self.maze[(1, 1)] = -10
        self.maze[(2, 2)] = -10
        self.maze[(2, 4)] = -10
        self.maze[(3, 0)] = -10
        self.maze[(3, 2)] = -10
        self.maze[(4, 2)] = -10

    def reset(self):
        self.state = (0, 0)
        return self.get_state()

    def get_state(self):
        return (self.state[0], self.state[1])

    def get_reward(self):
        return self.maze[(self.state[0], self.state[1])]

    def is_done(self):
        return self.state == self.goal

    def step(self, action):
        if action == 0:  # down
            self.state = (min(self.state[0] + 1, self.maze_space[0] - 1), self.state[1])
        elif action == 1:  # left
            self.state = (self.state[0], max(self.state[1] - 1, 0))
        elif action == 2:  # up
            self.state = (max(self.state[0] - 1, 0), self.state[1])
        elif action == 3:  # right
            self.state = (self.state[0], min(self.state[1] + 1, self.maze_space[1] - 1))
        else:
            raise ValueError("Invalid action")

        return self.get_state(), self.get_reward(), self.is_done()
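
As a quick sanity check, the environment can be stepped through directly; the rewards below follow from the maze layout defined above.

env = Environment()
state = env.reset()                   # back to the start: (0, 0)
state, reward, done = env.step(3)     # move right -> (0, 1), reward 0.0, not done yet
state, reward, done = env.step(0)     # move down  -> (1, 1), a trap, reward -10.0
print(state, reward, done)            # (1, 1) -10.0 False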

Defining the game policy#

Every reinforcement learning algorithm needs a way to simulate repeatedly and collect experience data. Here we create a Policy class that decides how to act in the maze. Its get_action method takes the player's current state and returns the next action: the policy keeps a state-action-value table and picks the highest-valued action, while also taking random actions to explore other possibilities. Because different actions yield different rewards in each game episode, we record this data and use the update method to keep refining the policy table, gradually approaching the best value each action can achieve and converging on the optimal path.
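For reference, the value update implemented in the update method below can be written as the following rule, where w is the weight argument, γ is the discount_factor, r is the reward, and s' is the next state:

$$
Q(s, a) \leftarrow (1 - w)\, Q(s, a) + w \left( r + \gamma \max_{a'} Q(s', a') \right)
$$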

class Policy:
    def __init__(self, env):
        self.actions = np.arange(0, env.action_space)
        self.action_table = np.zeros((env.maze_space[0], env.maze_space[1], env.action_space))

    def get_action(self, state, epsilon=0.8):
        if random.uniform(0, 1) < epsilon:
            return np.random.choice(self.actions)  # choose a random action with a certain probability (exploration)
        return np.argmax(self.action_table[state[0], state[1], self.actions])

    def update(self, experiences, weight=0.1, discount_factor=0.9):
        # Update the policy with the episode's experience and return the route taken, for display
        route = []
        for state, action, reward, next_state in experiences:
            route.append(next_state)
            next_max = np.max(self.action_table[next_state])
            value = self.action_table[state][action]
            new_value = (1 - weight) * value + weight * (reward + discount_factor * next_max)
            self.action_table[state][action] = new_value

        return route

Simulating game episodes#

We create a Simulator class to simulate playing the maze game; its rollout method returns the experience accumulated in each episode. The @yr.instance decorator allows instances of this class to run distributed across the nodes of the openYuanrong cluster.

@yr.instance
class Simulator(object):
    def __init__(self, env):
        self.env = env

    def rollout(self, policy, epsilon=0.8):
        # Play one episode and accumulate experience
        experiences = []
        state = self.env.reset()
        done = False
        while not done:
            action = policy.get_action(state, epsilon)
            next_state, reward, done = self.env.step(action)
            # If the state did not change after the action, retry the move
            if next_state == state:
                continue
            experiences.append([state, action, reward, next_state])
            state = next_state
        return experiences
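
A condensed sketch of driving a single Simulator instance is shown below; the main flow in the next section does the same thing for several instances in a loop, and the resource values mirror those used there.

# Sketch only: create one remote Simulator, run one episode, fetch the experience.
opt = yr.InvokeOptions(cpu=1000, memory=1024)          # 1 CPU core, 1 GB memory
sim = Simulator.options(opt).invoke(Environment())

policy_ref = yr.put(Policy(Environment()))             # share the policy as a data object
experiences = yr.get([sim.rollout.invoke(policy_ref)])[0]

sim.terminate()                                        # release the instance when done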

Main flow#

We initialize the openYuanrong environment with yr.init() and create two simulator instances, simulators, which run distributed and in parallel. The policy is updated once per episode, iterating quickly to gather experience. You can scale the run to a larger cluster simply by adjusting the simulators_num and episodes_num parameters.

Tip

We have specified the CPU and memory resources that the Simulator instances need. For scenarios that require heterogeneous resources, you can add the --npu_collection_mode or --gpu_collection_enable parameter to the master and worker node commands when deploying the cluster, and openYuanrong will automatically collect the heterogeneous resource information on each node. In actual use, specify the corresponding resources through the custom_resources parameter.

# Request one GPU of model NVIDIA GeForce RTX 3090
opt.custom_resources = {"GPU/NVIDIA GeForce RTX 3090/count": 1}
# Request one NPU of any model
opt.custom_resources = {"NPU/.+/count": 1}
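
Putting this together with the CPU and memory options used in the main flow, a sketch of requesting one GPU per Simulator instance could look like the following; the exact resource name depends on the hardware openYuanrong detects on your nodes.

# Sketch: combine CPU/memory options with a custom GPU resource request.
opt = yr.InvokeOptions(cpu=1000, memory=1024)
opt.custom_resources = {"GPU/NVIDIA GeForce RTX 3090/count": 1}
simulators = [Simulator.options(opt).invoke(Environment()) for _ in range(simulators_num)]
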
if __name__ == "__main__":
    # Initialize the openYuanrong environment
    yr.init()

    # Create two simulators; adjust according to the size of the openYuanrong cluster
    simulators_num = 2
    # Each simulator plays 500 episodes
    episodes_num = 500

    env = Environment()
    policy = Policy(env)

    # Create stateful function Simulator instances and set the resources they need to run (1 CPU core, 1 GB memory)
    opt = yr.InvokeOptions(cpu=1000, memory=1024)
    simulators = [Simulator.options(opt).invoke(Environment()) for _ in range(simulators_num)]

    max_reward_route = float("-inf")
    for episode in range(episodes_num):
        policy_ref = yr.put(policy)
        experiences = [s.rollout.invoke(policy_ref) for s in simulators]

        # Wait for all results to return
        while len(experiences) > 0:
            result = yr.wait(experiences)
            for xp in yr.get(result[0]):
                # Update the game policy; the next episode uses the new policy
                route = policy.update(xp)
                # Compute this episode's reward and print the route of the highest-reward episode so far to observe how quickly the algorithm converges
                route_reward = sum(env.maze[state] for state in route)
                if max_reward_route < route_reward:
                    max_reward_route = route_reward
                    print(f"Episode {episode}, the optimal route is {route}, total reward {max_reward_route}")
            # Keep waiting for objects that have not yet returned
            experiences = result[1]

    # Destroy the instances and release resources
    for s in simulators:
        s.terminate()
    yr.finalize()

Running the program#

Complete code
import numpy as np
import random
import yr

class Environment:
    def __init__(self):
        self.state, self.goal = (0, 0), (4, 4)

        self.action_space = 4 # down (0), left (1), up (2), right (3)
        self.maze_space = (5, 5)
        self.maze = np.zeros((self.maze_space[0], self.maze_space[1]))

        # Reaching the goal rewards 10 points
        self.maze[self.goal] = 10

        # Stepping onto a trap deducts 10 points
        self.maze[(0, 3)] = -10
        self.maze[(1, 1)] = -10
        self.maze[(2, 2)] = -10
        self.maze[(2, 4)] = -10
        self.maze[(3, 0)] = -10
        self.maze[(3, 2)] = -10
        self.maze[(4, 2)] = -10

    def reset(self):
        self.state = (0, 0)
        return self.get_state()

    def get_state(self):
        return (self.state[0], self.state[1])

    def get_reward(self):
        return self.maze[(self.state[0], self.state[1])]

    def is_done(self):
        return self.state == self.goal

    def step(self, action):
        if action == 0:  # down
            self.state = (min(self.state[0] + 1, self.maze_space[0] - 1), self.state[1])
        elif action == 1:  # left
            self.state = (self.state[0], max(self.state[1] - 1, 0))
        elif action == 2:  # up
            self.state = (max(self.state[0] - 1, 0), self.state[1])
        elif action == 3:  # right
            self.state = (self.state[0], min(self.state[1] + 1, self.maze_space[1] - 1))
        else:
            raise ValueError("Invalid action")

        return self.get_state(), self.get_reward(), self.is_done()


class Policy:
    def __init__(self, env):
        self.actions = np.arange(0, env.action_space)
        self.action_table = np.zeros((env.maze_space[0], env.maze_space[1], env.action_space))

    def get_action(self, state, epsilon=0.8):
        if random.uniform(0, 1) < epsilon:
            return np.random.choice(self.actions)  # choose a random action with a certain probability (exploration)
        return np.argmax(self.action_table[state[0], state[1], self.actions])

    def update(self, experiences, weight=0.1, discount_factor=0.9):
        # Update the policy with the episode's experience and return the route taken, for display
        route = []
        for state, action, reward, next_state in experiences:
            route.append(next_state)
            next_max = np.max(self.action_table[next_state])
            value = self.action_table[state][action]
            new_value = (1 - weight) * value + weight * (reward + discount_factor * next_max)
            self.action_table[state][action] = new_value

        return route


@yr.instance
class Simulator(object):
    def __init__(self, env):
        self.env = env

    def rollout(self, policy, epsilon=0.8):
        # Play one episode and accumulate experience
        experiences = []
        state = self.env.reset()
        done = False
        while not done:
            action = policy.get_action(state, epsilon)
            next_state, reward, done = self.env.step(action)
            # If the state did not change after the action, retry the move
            if next_state == state:
                continue
            experiences.append([state, action, reward, next_state])
            state = next_state
        return experiences


if __name__ == "__main__":
    # Initialize the openYuanrong environment
    yr.init()

    # Create two simulators; adjust according to the size of the openYuanrong cluster
    simulators_num = 2
    # Each simulator plays 500 episodes
    episodes_num = 500

    env = Environment()
    policy = Policy(env)

    # Create stateful function Simulator instances and set the resources they need to run (1 CPU core, 1 GB memory)
    opt = yr.InvokeOptions(cpu=1000, memory=1024)
    simulators = [Simulator.options(opt).invoke(Environment()) for _ in range(simulators_num)]

    max_reward_route = float("-inf")
    for episode in range(episodes_num):
        policy_ref = yr.put(policy)
        experiences = [s.rollout.invoke(policy_ref) for s in simulators]

        # Wait for all results to return
        while len(experiences) > 0:
            result = yr.wait(experiences)
            for xp in yr.get(result[0]):
                # Update the game policy; the next episode uses the new policy
                route = policy.update(xp)
                # Compute this episode's reward and print the route of the highest-reward episode so far to observe how quickly the algorithm converges
                route_reward = sum(env.maze[state] for state in route)
                if max_reward_route < route_reward:
                    max_reward_route = route_reward
                    print(f"Episode {episode}, the optimal route is {route}, total reward {max_reward_route}")
            # Keep waiting for objects that have not yet returned
            experiences = result[1]

    # Destroy the instances and release resources
    for s in simulators:
        s.terminate()
    yr.finalize()

A reference output is shown below. The routes may contain repeated positions; you can further refine the algorithm, for example by setting the default reward of every position in the environment to -1, as sketched after the output.

Episode 0, the optimal route is [(1, 0), (0, 0), (1, 0), (1, 1), (1, 2), (2, 2), (2, 3), (2, 2), (3, 2), (3, 1), (3, 2), (4, 2), (3, 2), (4, 2), (4, 1), (4, 0), (4, 1), (3, 1), (4, 1), (3, 1), (2, 1), (3, 1), (3, 0), (4, 0), (3, 0), (4, 0), (4, 1), (4, 0), (4, 1), (3, 1), (3, 2), (3, 3), (2, 3), (3, 3), (4, 3), (4, 4)], total reward -100.0
Episode 1, the optimal route is [(1, 0), (0, 0), (1, 0), (0, 0), (0, 1), (1, 1), (1, 0), (1, 1), (1, 2), (0, 2), (1, 2), (1, 1), (1, 2), (1, 1), (2, 1), (3, 1), (4, 1), (4, 0), (4, 1), (4, 2), (4, 1), (4, 0), (4, 1), (3, 1), (4, 1), (4, 0), (4, 1), (4, 0), (4, 1), (4, 0), (4, 1), (4, 2), (4, 1), (4, 0), (4, 1), (4, 2), (4, 3), (4, 4)], total reward -60.0
Episode 2, the optimal route is [(1, 0), (1, 1), (1, 0), (0, 0), (0, 1), (1, 1), (2, 1), (3, 1), (3, 2), (3, 3), (2, 3), (3, 3), (3, 4), (4, 4)], total reward -20.0
Episode 3, the optimal route is [(0, 1), (0, 0), (1, 0), (2, 0), (2, 1), (3, 1), (2, 1), (2, 2), (2, 3), (1, 3), (2, 3), (2, 4), (1, 4), (1, 3), (2, 3), (3, 3), (3, 4), (4, 4)], total reward -10.0
Episode 7, the optimal route is [(0, 1), (0, 2), (0, 3), (0, 2), (1, 2), (0, 2), (1, 2), (0, 2), (1, 2), (1, 3), (2, 3), (3, 3), (4, 3), (3, 3), (3, 4), (3, 3), (3, 4), (3, 3), (3, 4), (4, 4)], total reward 0.0
Episode 29, the optimal route is [(0, 1), (0, 2), (1, 2), (1, 3), (1, 4), (0, 4), (1, 4), (1, 3), (2, 3), (3, 3), (3, 4), (4, 4)], total reward 10.0
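
As a minimal sketch of that suggestion, a hypothetical variant of the environment could default every cell to a reward of -1 so that each extra step is penalized and shorter routes score higher; only the maze initialization changes, and the rest of the example stays the same.

class StepPenaltyEnvironment(Environment):
    # Hypothetical variant: every cell defaults to -1, so longer routes score lower.
    def __init__(self):
        super().__init__()
        penalized = np.full(self.maze_space, -1.0)              # default reward of -1 per step
        penalized[self.maze != 0] = self.maze[self.maze != 0]   # keep the goal and trap rewards
        self.maze = penalized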