环境

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

简介

强化学习 (RL) 的目标是设计通过与环境交互来学习的代理。在标准 RL 设置中,代理在每个时间步接收一个观察结果并选择一个动作。该动作应用于环境,环境返回奖励和新的观察结果。代理训练一个策略来选择动作以最大化奖励总和,也称为回报。

在 TF-Agents 中,环境可以在 Python 或 TensorFlow 中实现。Python 环境通常更容易实现、理解和调试,但 TensorFlow 环境效率更高,并允许自然并行化。最常见的工作流程是在 Python 中实现环境,并使用我们的包装器之一将其自动转换为 TensorFlow。

让我们先看看 Python 环境。TensorFlow 环境遵循非常相似的 API。

设置

如果您尚未安装 tf-agents 或 gym,请运行

pip install tf-agents[reverb]
pip install tf-keras
import os
# Keep using keras-2 (tf-keras) rather than keras-3 (keras).
os.environ['TF_USE_LEGACY_KERAS'] = '1'
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import abc
import tensorflow as tf
import numpy as np

from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.environments import utils
from tf_agents.specs import array_spec
from tf_agents.environments import wrappers
from tf_agents.environments import suite_gym
from tf_agents.trajectories import time_step as ts
2023-12-22 12:20:01.730535: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-12-22 12:20:01.730578: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-12-22 12:20:01.732199: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered

Python 环境

Python 环境具有 step(action) -> next_time_step 方法,该方法将动作应用于环境,并返回有关下一步的以下信息

  1. observation:这是环境状态中代理可以观察到的部分,以便在下一步选择其动作。
  2. reward:代理正在学习最大化这些奖励在多个步骤中的总和。
  3. step_type:与环境的交互通常是序列/事件的一部分。例如,国际象棋游戏中的一系列动作。step_type 可以是 FIRSTMIDLAST,以指示此时间步是序列中的第一个、中间还是最后一个步骤。
  4. discount:这是一个浮点数,表示相对于当前时间步的奖励,如何对下一步的奖励进行加权。

这些被分组到一个名为元组的 TimeStep(step_type, reward, discount, observation) 中。

所有 Python 环境必须实现的接口位于 environments/py_environment.PyEnvironment 中。主要方法是

class PyEnvironment(object):

  def reset(self):
    """Return initial_time_step."""
    self._current_time_step = self._reset()
    return self._current_time_step

  def step(self, action):
    """Apply action and return new time_step."""
    if self._current_time_step is None:
        return self.reset()
    self._current_time_step = self._step(action)
    return self._current_time_step

  def current_time_step(self):
    return self._current_time_step

  def time_step_spec(self):
    """Return time_step_spec."""

  @abc.abstractmethod
  def observation_spec(self):
    """Return observation_spec."""

  @abc.abstractmethod
  def action_spec(self):
    """Return action_spec."""

  @abc.abstractmethod
  def _reset(self):
    """Return initial_time_step."""

  @abc.abstractmethod
  def _step(self, action):
    """Apply action and return new time_step."""

除了 step() 方法外,环境还提供了一个 reset() 方法,该方法启动一个新序列并提供一个初始 TimeStep。没有必要显式调用 reset 方法。我们假设环境会自动重置,无论是在它们到达事件结束时还是在第一次调用 step() 时。

请注意,子类不会直接实现 step()reset()。相反,它们会覆盖 _step()_reset() 方法。从这些方法返回的时间步将被缓存并通过 current_time_step() 公开。

observation_specaction_spec 方法返回一个 (Bounded)ArraySpecs 的嵌套,它分别描述了观察结果和动作的名称、形状、数据类型和范围。

在 TF-Agents 中,我们反复提到嵌套,它被定义为任何由列表、元组、命名元组或字典组成的树状结构。这些可以任意组合以保持观察结果和动作的结构。我们发现这对具有许多观察结果和动作的更复杂的环境非常有用。

使用标准环境

TF Agents 为许多标准环境(如 OpenAI Gym、DeepMind-control 和 Atari)内置了包装器,以便它们遵循我们的 py_environment.PyEnvironment 接口。这些包装后的环境可以使用我们的环境套件轻松加载。让我们从 OpenAI gym 加载 CartPole 环境,并查看动作和时间步规格。

environment = suite_gym.load('CartPole-v0')
print('action_spec:', environment.action_spec())
print('time_step_spec.observation:', environment.time_step_spec().observation)
print('time_step_spec.step_type:', environment.time_step_spec().step_type)
print('time_step_spec.discount:', environment.time_step_spec().discount)
print('time_step_spec.reward:', environment.time_step_spec().reward)
action_spec: BoundedArraySpec(shape=(), dtype=dtype('int64'), name='action', minimum=0, maximum=1)
time_step_spec.observation: BoundedArraySpec(shape=(4,), dtype=dtype('float32'), name='observation', minimum=[-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], maximum=[4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38])
time_step_spec.step_type: ArraySpec(shape=(), dtype=dtype('int32'), name='step_type')
time_step_spec.discount: BoundedArraySpec(shape=(), dtype=dtype('float32'), name='discount', minimum=0.0, maximum=1.0)
time_step_spec.reward: ArraySpec(shape=(), dtype=dtype('float32'), name='reward')

因此,我们看到环境期望类型为 int64 的动作在 [0, 1] 中,并返回 TimeSteps,其中观察结果是长度为 4 的 float32 向量,折扣因子是 float32 在 [0.0, 1.0] 中。现在,让我们尝试对整个事件采取固定动作 (1,)

action = np.array(1, dtype=np.int32)
time_step = environment.reset()
print(time_step)
while not time_step.is_last():
  time_step = environment.step(action)
  print(time_step)
TimeStep(
{'step_type': array(0, dtype=int32),
 'reward': array(0., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([-0.02944653,  0.04422915,  0.03086922, -0.04273267], dtype=float32)})
TimeStep(
{'step_type': array(1, dtype=int32),
 'reward': array(1., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([-0.02856195,  0.23889515,  0.03001456, -0.32551846], dtype=float32)})
TimeStep(
{'step_type': array(1, dtype=int32),
 'reward': array(1., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([-0.02378405,  0.43357718,  0.02350419, -0.608587  ], dtype=float32)})
TimeStep(
{'step_type': array(1, dtype=int32),
 'reward': array(1., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([-0.01511251,  0.6283628 ,  0.01133245, -0.89377517], dtype=float32)})
TimeStep(
{'step_type': array(1, dtype=int32),
 'reward': array(1., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([-0.00254525,  0.8233292 , -0.00654305, -1.1828743 ], dtype=float32)})
TimeStep(
{'step_type': array(1, dtype=int32),
 'reward': array(1., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([ 0.01392134,  1.0185355 , -0.03020054, -1.477601  ], dtype=float32)})
TimeStep(
{'step_type': array(1, dtype=int32),
 'reward': array(1., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([ 0.03429205,  1.214013  , -0.05975256, -1.7795612 ], dtype=float32)})
TimeStep(
{'step_type': array(1, dtype=int32),
 'reward': array(1., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([ 0.0585723 ,  1.4097542 , -0.09534378, -2.090206  ], dtype=float32)})
TimeStep(
{'step_type': array(1, dtype=int32),
 'reward': array(1., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([ 0.08676739,  1.6056995 , -0.1371479 , -2.4107776 ], dtype=float32)})
TimeStep(
{'step_type': array(1, dtype=int32),
 'reward': array(1., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([ 0.11888137,  1.8017205 , -0.18536346, -2.7422433 ], dtype=float32)})
TimeStep(
{'step_type': array(2, dtype=int32),
 'reward': array(1., dtype=float32),
 'discount': array(0., dtype=float32),
 'observation': array([ 0.1549158 ,  1.9976014 , -0.24020831, -3.0852165 ], dtype=float32)})

创建您自己的 Python 环境

对于许多客户来说,一个常见的用例是在 TF-Agents 中将标准代理之一(参见 agents/)应用到他们的问题中。为此,他们必须将他们的问题构建成一个环境。所以让我们看看如何在 Python 中实现一个环境。

假设我们想要训练一个代理来玩以下(受二十一点启发的)纸牌游戏

  1. 游戏使用一个无限的牌组进行,牌组中包含数字 1...10 的牌。
  2. 在每一轮中,代理可以做两件事:获得一张新的随机牌,或者停止当前回合。
  3. 目标是在回合结束时,使你的牌的总和尽可能接近 21,但不能超过 21。

一个代表游戏的环境可能看起来像这样

  1. 动作:我们有两个动作。动作 0:获得一张新牌,动作 1:结束当前回合。
  2. 观察:当前回合中牌的总和。
  3. 奖励:目标是尽可能接近 21,但不能超过 21,因此我们可以在回合结束时使用以下奖励来实现:如果 sum_of_cards <= 21,则为 sum_of_cards - 21,否则为 -21。
class CardGameEnv(py_environment.PyEnvironment):

  def __init__(self):
    self._action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=1, name='action')
    self._observation_spec = array_spec.BoundedArraySpec(
        shape=(1,), dtype=np.int32, minimum=0, name='observation')
    self._state = 0
    self._episode_ended = False

  def action_spec(self):
    return self._action_spec

  def observation_spec(self):
    return self._observation_spec

  def _reset(self):
    self._state = 0
    self._episode_ended = False
    return ts.restart(np.array([self._state], dtype=np.int32))

  def _step(self, action):

    if self._episode_ended:
      # The last action ended the episode. Ignore the current action and start
      # a new episode.
      return self.reset()

    # Make sure episodes don't go on forever.
    if action == 1:
      self._episode_ended = True
    elif action == 0:
      new_card = np.random.randint(1, 11)
      self._state += new_card
    else:
      raise ValueError('`action` should be 0 or 1.')

    if self._episode_ended or self._state >= 21:
      reward = self._state - 21 if self._state <= 21 else -21
      return ts.termination(np.array([self._state], dtype=np.int32), reward)
    else:
      return ts.transition(
          np.array([self._state], dtype=np.int32), reward=0.0, discount=1.0)

让我们确保我们在定义上述环境时做的一切都正确。在创建自己的环境时,你必须确保生成的观察和时间步长遵循规范中定义的正确形状和类型。这些用于生成 TensorFlow 图,因此如果我们弄错了,可能会导致难以调试的问题。

为了验证我们的环境,我们将使用一个随机策略来生成动作,并将迭代 5 个回合以确保一切按预期工作。如果我们收到不符合环境规范的时间步长,则会引发错误。

environment = CardGameEnv()
utils.validate_py_environment(environment, episodes=5)

现在我们知道环境按预期工作,让我们使用一个固定策略运行此环境:请求 3 张牌,然后结束回合。

get_new_card_action = np.array(0, dtype=np.int32)
end_round_action = np.array(1, dtype=np.int32)

environment = CardGameEnv()
time_step = environment.reset()
print(time_step)
cumulative_reward = time_step.reward

for _ in range(3):
  time_step = environment.step(get_new_card_action)
  print(time_step)
  cumulative_reward += time_step.reward

time_step = environment.step(end_round_action)
print(time_step)
cumulative_reward += time_step.reward
print('Final Reward = ', cumulative_reward)
TimeStep(
{'step_type': array(0, dtype=int32),
 'reward': array(0., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([0], dtype=int32)})
TimeStep(
{'step_type': array(1, dtype=int32),
 'reward': array(0., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([1], dtype=int32)})
TimeStep(
{'step_type': array(1, dtype=int32),
 'reward': array(0., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([2], dtype=int32)})
TimeStep(
{'step_type': array(1, dtype=int32),
 'reward': array(0., dtype=float32),
 'discount': array(1., dtype=float32),
 'observation': array([4], dtype=int32)})
TimeStep(
{'step_type': array(2, dtype=int32),
 'reward': array(-17., dtype=float32),
 'discount': array(0., dtype=float32),
 'observation': array([4], dtype=int32)})
Final Reward =  -17.0

环境包装器

环境包装器接受一个 Python 环境并返回一个修改后的环境版本。原始环境和修改后的环境都是 py_environment.PyEnvironment 的实例,并且可以将多个包装器链接在一起。

一些常见的包装器可以在 environments/wrappers.py 中找到。例如

  1. ActionDiscretizeWrapper:将连续动作空间转换为离散动作空间。
  2. RunStats:捕获环境的运行统计信息,例如执行的步数、完成的回合数等。
  3. TimeLimit:在固定步数后终止回合。

示例 1:动作离散化包装器

InvertedPendulum 是一个 PyBullet 环境,它接受范围在 [-2, 2] 内的连续动作。如果我们想在该环境上训练一个离散动作代理,例如 DQN,我们必须离散化(量化)动作空间。这正是 ActionDiscretizeWrapper 所做的。比较包装前后 action_spec

env = suite_gym.load('Pendulum-v1')
print('Action Spec:', env.action_spec())

discrete_action_env = wrappers.ActionDiscretizeWrapper(env, num_actions=5)
print('Discretized Action Spec:', discrete_action_env.action_spec())
Action Spec: BoundedArraySpec(shape=(1,), dtype=dtype('float32'), name='action', minimum=-2.0, maximum=2.0)
Discretized Action Spec: BoundedArraySpec(shape=(), dtype=dtype('int32'), name='action', minimum=0, maximum=4)

包装后的 discrete_action_envpy_environment.PyEnvironment 的实例,可以像常规 Python 环境一样对待。

TensorFlow 环境

TF 环境的接口在 environments/tf_environment.TFEnvironment 中定义,看起来与 Python 环境非常相似。TF 环境与 Python 环境在几个方面有所不同

  • 它们生成张量对象而不是数组
  • 与规范相比,TF 环境在生成的张量中添加了一个批次维度。

将 Python 环境转换为 TFEnv 允许 TensorFlow 并行化操作。例如,可以定义一个 collect_experience_op,它从环境中收集数据并添加到 replay_buffer 中,以及一个 train_op,它从 replay_buffer 中读取数据并训练代理,并在 TensorFlow 中自然地并行运行它们。

class TFEnvironment(object):

  def time_step_spec(self):
    """Describes the `TimeStep` tensors returned by `step()`."""

  def observation_spec(self):
    """Defines the `TensorSpec` of observations provided by the environment."""

  def action_spec(self):
    """Describes the TensorSpecs of the action expected by `step(action)`."""

  def reset(self):
    """Returns the current `TimeStep` after resetting the Environment."""
    return self._reset()

  def current_time_step(self):
    """Returns the current `TimeStep`."""
    return self._current_time_step()

  def step(self, action):
    """Applies the action and returns the new `TimeStep`."""
    return self._step(action)

  @abc.abstractmethod
  def _reset(self):
    """Returns the current `TimeStep` after resetting the Environment."""

  @abc.abstractmethod
  def _current_time_step(self):
    """Returns the current `TimeStep`."""

  @abc.abstractmethod
  def _step(self, action):
    """Applies the action and returns the new `TimeStep`."""

current_time_step() 方法返回当前时间步长,并在需要时初始化环境。

reset() 方法强制在环境中重置并返回当前步长。

如果 action 不依赖于先前 time_step,则在 Graph 模式下需要 tf.control_dependency

现在,让我们看看如何创建 TFEnvironments

创建你自己的 TensorFlow 环境

这比在 Python 中创建环境更复杂,因此我们不会在本 colab 中介绍。一个例子可以在 这里找到。更常见的用例是在 Python 中实现你的环境,并使用我们的 TFPyEnvironment 包装器(见下文)将其包装在 TensorFlow 中。

将 Python 环境包装在 TensorFlow 中

我们可以使用 TFPyEnvironment 包装器轻松地将任何 Python 环境包装成 TensorFlow 环境。

env = suite_gym.load('CartPole-v0')
tf_env = tf_py_environment.TFPyEnvironment(env)

print(isinstance(tf_env, tf_environment.TFEnvironment))
print("TimeStep Specs:", tf_env.time_step_spec())
print("Action Specs:", tf_env.action_spec())
True
TimeStep Specs: TimeStep(
{'step_type': TensorSpec(shape=(), dtype=tf.int32, name='step_type'),
 'reward': TensorSpec(shape=(), dtype=tf.float32, name='reward'),
 'discount': BoundedTensorSpec(shape=(), dtype=tf.float32, name='discount', minimum=array(0., dtype=float32), maximum=array(1., dtype=float32)),
 'observation': BoundedTensorSpec(shape=(4,), dtype=tf.float32, name='observation', minimum=array([-4.8000002e+00, -3.4028235e+38, -4.1887903e-01, -3.4028235e+38],
      dtype=float32), maximum=array([4.8000002e+00, 3.4028235e+38, 4.1887903e-01, 3.4028235e+38],
      dtype=float32))})
Action Specs: BoundedTensorSpec(shape=(), dtype=tf.int64, name='action', minimum=array(0), maximum=array(1))

请注意,规范现在是类型:(Bounded)TensorSpec

使用示例

简单示例

env = suite_gym.load('CartPole-v0')

tf_env = tf_py_environment.TFPyEnvironment(env)
# reset() creates the initial time_step after resetting the environment.
time_step = tf_env.reset()
num_steps = 3
transitions = []
reward = 0
for i in range(num_steps):
  action = tf.constant([i % 2])
  # applies the action and returns the new TimeStep.
  next_time_step = tf_env.step(action)
  transitions.append([time_step, action, next_time_step])
  reward += next_time_step.reward
  time_step = next_time_step

np_transitions = tf.nest.map_structure(lambda x: x.numpy(), transitions)
print('\n'.join(map(str, np_transitions)))
print('Total reward:', reward.numpy())
[TimeStep(
{'step_type': array([0], dtype=int32),
 'reward': array([0.], dtype=float32),
 'discount': array([1.], dtype=float32),
 'observation': array([[-0.00848238, -0.0419628 ,  0.02369678, -0.03962697]],
      dtype=float32)}), array([0], dtype=int32), TimeStep(
{'step_type': array([1], dtype=int32),
 'reward': array([1.], dtype=float32),
 'discount': array([1.], dtype=float32),
 'observation': array([[-0.00932164, -0.2374164 ,  0.02290425,  0.26043734]],
      dtype=float32)})]
[TimeStep(
{'step_type': array([1], dtype=int32),
 'reward': array([1.], dtype=float32),
 'discount': array([1.], dtype=float32),
 'observation': array([[-0.00932164, -0.2374164 ,  0.02290425,  0.26043734]],
      dtype=float32)}), array([1], dtype=int32), TimeStep(
{'step_type': array([1], dtype=int32),
 'reward': array([1.], dtype=float32),
 'discount': array([1.], dtype=float32),
 'observation': array([[-0.01406997, -0.04262878,  0.02811299, -0.02493422]],
      dtype=float32)})]
[TimeStep(
{'step_type': array([1], dtype=int32),
 'reward': array([1.], dtype=float32),
 'discount': array([1.], dtype=float32),
 'observation': array([[-0.01406997, -0.04262878,  0.02811299, -0.02493422]],
      dtype=float32)}), array([0], dtype=int32), TimeStep(
{'step_type': array([1], dtype=int32),
 'reward': array([1.], dtype=float32),
 'discount': array([1.], dtype=float32),
 'observation': array([[-0.01492254, -0.23814237,  0.02761431,  0.27648443]],
      dtype=float32)})]
Total reward: [3.]

整个回合

env = suite_gym.load('CartPole-v0')
tf_env = tf_py_environment.TFPyEnvironment(env)

time_step = tf_env.reset()
rewards = []
steps = []
num_episodes = 5

for _ in range(num_episodes):
  episode_reward = 0
  episode_steps = 0
  while not time_step.is_last():
    action = tf.random.uniform([1], 0, 2, dtype=tf.int32)
    time_step = tf_env.step(action)
    episode_steps += 1
    episode_reward += time_step.reward.numpy()
  rewards.append(episode_reward)
  steps.append(episode_steps)
  time_step = tf_env.reset()

num_steps = np.sum(steps)
avg_length = np.mean(steps)
avg_reward = np.mean(rewards)

print('num_episodes:', num_episodes, 'num_steps:', num_steps)
print('avg_length', avg_length, 'avg_reward:', avg_reward)
num_episodes: 5 num_steps: 124
avg_length 24.8 avg_reward: 24.8