先来回忆一下几个变量的定义,Policy Gradient的关键是通过Gradient来更新Policy

$$\theta_{k+1} = \theta_{k} + a \nabla _{\theta}J(\pi_{\theta})|_{\theta_k}$$

其中$\pi_{\theta}$是参数话的policy,$\theta$是它的系数,$J(\pi_{\theta})$用来衡量当前policy $\pi_{\theta}$的性能,咱们这里用$\pi_{\theta}$的期望收益$E_{\tau \sim \pi_{\theta}}[R(\tau)]$作为policy的性能,$R(\tau)$表示一局游戏的收益,$\tau \sim \pi_{\theta}$表示是在当前policy $\pi_{\theta}$下。

$\nabla _{\theta}J(\pi_{\theta})$等于下面这一串

$$\nabla _{\theta}J(\pi_{\theta}) = E_{\tau \sim \pi_{\theta}} \left [ \sum_{t=0}^{T} \nabla_{\theta} \log \pi_{\theta} (a_t | s_t)R(\tau)\right ]$$

上一篇文章介绍了怎么得到这个公式的,实际上并不影响我们的实现。

游戏环境我们使用最经典的推小车

车子上的杆儿会围绕蓝色点自由转动,到达一定角度游戏结束,我们控制车子在水平线上左右移动来使杆儿平衡,大概就像杂技演员拿一根竹竿放在食指上让竹竿一直不掉地上。 

代码是基于OpenAI的Spinning Up实现的,所以如果你有兴趣可以直接看原文:https://spinningup.openai.com/en/latest/spinningup/rl_intro3.html

先创建环境:

import gym
env = gym.make('CartPole-v1')

游戏环境的的observation由4维数组表示,至于每维表示什么意思并不重要,可以采取的动作只有向左、向右所以动作是2维的。policy是一个简单的神经网络,把它定义成如下形式

import torch
import torch.nn as nn
class PolicyNet(nn.Module):
    def __init__(self, obs_space, act_space):
        super().__init__()
        self.linear1 = nn.Linear(obs_space, 32)
        self.activate = nn.Tanh()
        self.linear2 = nn.Linear(32, act_space)
    
    def forward(self, x):
        x = self.linear1(x)
        x = self.activate(x)
        x = self.linear2(x)
        return x

网络的输出没有接softmax,后面在构造分布的时候要注意下。实例化时传入两个参数,第一个参数是observation的维度,第二个参数是action的维度,当前环境分别是4和2:

policy_net = PolicyNet(obs_dim, n_act)

有了这个policy_net我们就可以根据当前的obs取样一个动作:

def get_action(obs):
    logits_output = policy_net(obs)
    act_distribution = Categorical(logits=logits_output)
    return act_distribution.sample().item()

接下来看gradient的计算:

$$\nabla _{\theta}J(\pi_{\theta}) = E_{\tau \sim \pi_{\theta}} \left [ \sum_{t=0}^{T} \nabla_{\theta} \log \pi_{\theta} (a_t | s_t)R(\tau)\right ]$$

一种直觉的理解,再某个状态$s_t$下执行某个动作$a_t$,如果这个动作可以呆了比较高的收益我们就应该提升它(加大该状态执行该动作的概率)这样下次取样就更有可能取到这个动作了。如何来评价$s_t$下执行动作$a_t$的收益好坏呢?我们可以用本局游戏的总收益来衡量,可能不好,但是能用。举个例子,如果我们计算$s_0$到$s_t$中间有多少步来作为这局游戏的收益,使用$\pi_{\theta}$执玩了一局可能是这样的

$$s_0, a_0^1, s_1^1,a_1^1,s_2^1,a_2^1,s_3^1,a_3^1,s_4^1,a_4^1,s_T$$

于是这一局的收益是5,上标1表示这是第一局的状态转移和动作选择,还是用这个policy第二局是这样的

$$s_0, a_0^2, s_1^2,a_1^2,s_2^2,a_2^2,s_3^2,a_3^2,s_4^2,a_4^2,s_5^2,a_5^2,s_6^2,a_6^2,s_7^2,a_7^2,s_T$$

那么第二局的收益是8 ,所以第二局打的比较好,应该强化第二局中每个状态下执行的那个动作的概率,计算梯度的时候第二局写成

$$\nabla _{\theta} \log \pi_{\theta}(s_0|a_0^2) \times 8 + \nabla _{\theta} \log \pi_{\theta}(s_1|a_1^2) \times 8 + \nabla _{\theta} \log \pi_{\theta}(s_2|a_2^2) \times 8 …$$

而第一局是

$$\nabla _{\theta} \log \pi_{\theta}(s_0|a_0^1) \times 5 + \nabla _{\theta} \log \pi_{\theta}(s_1|a_1^1) \times 5 + \nabla _{\theta} \log \pi_{\theta}(s_2|a_2^1) \times 5 …$$

显然第二局乘以8的那部分在梯度更新中更有优势,于是第二局执行的那些动作都得到了强化,当然这个只是能用而已还不够好,最理想的肯定是$Q_{\pi}(s,a)$但是这个不好算啊,先将就着用吧~既然我们达成共识,就看在pytorch中怎么实现上述逻辑了。

公式给出的gradient我们不用它,因为PyTorch这一类的计算图框架的编程逻辑是我们提供objective function它给自动算梯度,于是我们个gradient该写成objective形式

$$J(\pi_{\theta}) = E_{\tau \sim \pi_{\theta}} [\log \pi_{\theta} (a_t|s_t) R(\tau)]$$

咱不关心这个objective到底是啥意思,它只是用来在PyTorch中计算梯度来反向传播用的

def compute_loss(obs, act, weights):
    logits_output = policy_net(obs)
    actions_dist = Categorical(logits=logits_output)
    logp = actions_dist.log_prob(act)
    return -1 * (logp * weights).mean()

接收三个参数,第一个是当前$\pi_{\theta}$玩的那好多好多局游戏的所有observation,第二个是这些observation出现时所采取的动作,第三个是对这个动作的评价,也就是所在的那局游戏的收益。还是上面的例子,输入的内容就是:

$$obs = s_0,s_1^1,s_2^1,s_3^1,s_4^1,s_0,s_1^2,s_2^2,s_3^2,s_4^2,s_5^2,s_6^2,s_7^2$$

$$act=a_0^1,a_1^1,a_2^1,a_3^1,a_4^1,a_0^2,a_1^2,a_2^2,a_3^2,a_4^2,a_5^2,a_6^2,a_7^2$$

$$weights=5,5,5,5,5,8,8,8,8,8,8,8,8$$

结束状态$s_T$用不上,逐行解释下

  1. 首先使用policy_net获取每个observation下两个动作的logits(经过softmax后得到概率)
  2. 定义Categorical分布,这里要指定参数logits,默认参数接收的是概率
  3. 获得当时游戏时动作的log probability
  4. 最后计算均值的时候给加了一个负号,这是因为pytorch的优化器默认是用来做gradient descent的。

接下来的工作就是让工程循环起来,迭代更新policy。大致过程就是,使用当前的policy ,玩很多很多局游戏,这里是通过step来限制的玩多久,比如玩5000步,游戏过程中采用的act由$\pi_{\theta}$决定,记录下5000步的[observation, action, weight] 交给compute_loss()函数计算出loss,使用pytorch提供的优化器来更新policy。每更新一次policy就得重新取样5000步,这个大概是要强调一下的。这个取样的步数不能太少,要不然很多observation和action都没有取样到。我把代码贴出来,Spinning Up已经给代码增加了比较详细的注释,新版本的gym会有适配问题,这里修改了下。

# for training policy
def train_one_epoch():
    # make some empty lists for logging.
    batch_obs = []          # for observations
    batch_acts = []         # for actions
    batch_weights = []      # for R(tau) weighting in policy gradient
    batch_rets = []         # for measuring episode returns
    batch_lens = []         # for measuring episode lengths

    # reset episode-specific variables
    obs, info = env.reset()       # first obs comes from starting distribution
    done = False            # signal from environment that episode is over
    # 用来装填每一步的reward,一局游戏打完之后sum一就是这局游戏的收益(上面的例子5和8)
    ep_rews = []            # list for rewards accrued throughout ep 

    # render first episode of each epoch
    finished_rendering_this_epoch = False

    # collect experience by acting in the environment with current policy
    # 取样循环
    while True:

        # rendering
        if (not finished_rendering_this_epoch) and render:
            env.render()

        # save obs
        batch_obs.append(obs.copy())

        # act in the environment
        act = get_action(torch.as_tensor(obs, dtype=torch.float32))
        # old version
        # obs, rew, done, _ = env.step(act)
  # version 0.26.2
  obs, reward, done, truncated, info = env.step(act)

        # save action, reward
        batch_acts.append(act)
        ep_rews.append(rew)

        if done:
            # if episode is over, record info about episode
            # 本局游戏已经打完了, 开始统计效果
            ep_ret, ep_len = sum(ep_rews), len(ep_rews)
            batch_rets.append(ep_ret)
            batch_lens.append(ep_len)

            # the weight for each logprob(a|s) is R(tau)
            batch_weights += [ep_ret] * ep_len

            # reset episode-specific variables
            obs, done, ep_rews = env.reset(), False, []

            # won't render again this epoch
            finished_rendering_this_epoch = True

            # end experience loop if we have enough of it
            # 如果已经到取样的step数量,退出取样循环,更新policy
            if len(batch_obs) > batch_size:
                break

    # take a single policy gradient update step
    # 更新policy
    optimizer.zero_grad()
    batch_loss = compute_loss(obs=torch.as_tensor(batch_obs, dtype=torch.float32),
                              act=torch.as_tensor(batch_acts, dtype=torch.int32),
                              weights=torch.as_tensor(batch_weights, dtype=torch.float32)
                              )
    batch_loss.backward()
    optimizer.step()
    return batch_loss, batch_rets, batch_lens

以上就是最最单纯的policy gradient的实现,上面贴出来的网页中还告诉我们一些tricks,有兴趣可以自己看下。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注