先来回忆一下几个变量的定义,Policy Gradient的关键是通过Gradient来更新Policy
$$\theta_{k+1} = \theta_{k} + a \nabla _{\theta}J(\pi_{\theta})|_{\theta_k}$$
其中$\pi_{\theta}$是参数话的policy,$\theta$是它的系数,$J(\pi_{\theta})$用来衡量当前policy $\pi_{\theta}$的性能,咱们这里用$\pi_{\theta}$的期望收益$E_{\tau \sim \pi_{\theta}}[R(\tau)]$作为policy的性能,$R(\tau)$表示一局游戏的收益,$\tau \sim \pi_{\theta}$表示是在当前policy $\pi_{\theta}$下。
$\nabla _{\theta}J(\pi_{\theta})$等于下面这一串
$$\nabla _{\theta}J(\pi_{\theta}) = E_{\tau \sim \pi_{\theta}} \left [ \sum_{t=0}^{T} \nabla_{\theta} \log \pi_{\theta} (a_t | s_t)R(\tau)\right ]$$
上一篇文章介绍了怎么得到这个公式的,实际上并不影响我们的实现。
游戏环境我们使用最经典的推小车

车子上的杆儿会围绕蓝色点自由转动,到达一定角度游戏结束,我们控制车子在水平线上左右移动来使杆儿平衡,大概就像杂技演员拿一根竹竿放在食指上让竹竿一直不掉地上。

代码是基于OpenAI的Spinning Up实现的,所以如果你有兴趣可以直接看原文:https://spinningup.openai.com/en/latest/spinningup/rl_intro3.html
先创建环境:
import gym
env = gym.make('CartPole-v1')
游戏环境的的observation由4维数组表示,至于每维表示什么意思并不重要,可以采取的动作只有向左、向右所以动作是2维的。policy是一个简单的神经网络,把它定义成如下形式
import torch
import torch.nn as nn
class PolicyNet(nn.Module):
def __init__(self, obs_space, act_space):
super().__init__()
self.linear1 = nn.Linear(obs_space, 32)
self.activate = nn.Tanh()
self.linear2 = nn.Linear(32, act_space)
def forward(self, x):
x = self.linear1(x)
x = self.activate(x)
x = self.linear2(x)
return x
网络的输出没有接softmax,后面在构造分布的时候要注意下。实例化时传入两个参数,第一个参数是observation的维度,第二个参数是action的维度,当前环境分别是4和2:
policy_net = PolicyNet(obs_dim, n_act)
有了这个policy_net我们就可以根据当前的obs取样一个动作:
def get_action(obs):
logits_output = policy_net(obs)
act_distribution = Categorical(logits=logits_output)
return act_distribution.sample().item()
接下来看gradient的计算:
$$\nabla _{\theta}J(\pi_{\theta}) = E_{\tau \sim \pi_{\theta}} \left [ \sum_{t=0}^{T} \nabla_{\theta} \log \pi_{\theta} (a_t | s_t)R(\tau)\right ]$$
一种直觉的理解,再某个状态$s_t$下执行某个动作$a_t$,如果这个动作可以呆了比较高的收益我们就应该提升它(加大该状态执行该动作的概率)这样下次取样就更有可能取到这个动作了。如何来评价$s_t$下执行动作$a_t$的收益好坏呢?我们可以用本局游戏的总收益来衡量,可能不好,但是能用。举个例子,如果我们计算$s_0$到$s_t$中间有多少步来作为这局游戏的收益,使用$\pi_{\theta}$执玩了一局可能是这样的
$$s_0, a_0^1, s_1^1,a_1^1,s_2^1,a_2^1,s_3^1,a_3^1,s_4^1,a_4^1,s_T$$
于是这一局的收益是5,上标1表示这是第一局的状态转移和动作选择,还是用这个policy第二局是这样的
$$s_0, a_0^2, s_1^2,a_1^2,s_2^2,a_2^2,s_3^2,a_3^2,s_4^2,a_4^2,s_5^2,a_5^2,s_6^2,a_6^2,s_7^2,a_7^2,s_T$$
那么第二局的收益是8 ,所以第二局打的比较好,应该强化第二局中每个状态下执行的那个动作的概率,计算梯度的时候第二局写成
$$\nabla _{\theta} \log \pi_{\theta}(s_0|a_0^2) \times 8 + \nabla _{\theta} \log \pi_{\theta}(s_1|a_1^2) \times 8 + \nabla _{\theta} \log \pi_{\theta}(s_2|a_2^2) \times 8 …$$
而第一局是
$$\nabla _{\theta} \log \pi_{\theta}(s_0|a_0^1) \times 5 + \nabla _{\theta} \log \pi_{\theta}(s_1|a_1^1) \times 5 + \nabla _{\theta} \log \pi_{\theta}(s_2|a_2^1) \times 5 …$$
显然第二局乘以8的那部分在梯度更新中更有优势,于是第二局执行的那些动作都得到了强化,当然这个只是能用而已还不够好,最理想的肯定是$Q_{\pi}(s,a)$但是这个不好算啊,先将就着用吧~既然我们达成共识,就看在pytorch中怎么实现上述逻辑了。
公式给出的gradient我们不用它,因为PyTorch这一类的计算图框架的编程逻辑是我们提供objective function它给自动算梯度,于是我们个gradient该写成objective形式
$$J(\pi_{\theta}) = E_{\tau \sim \pi_{\theta}} [\log \pi_{\theta} (a_t|s_t) R(\tau)]$$
咱不关心这个objective到底是啥意思,它只是用来在PyTorch中计算梯度来反向传播用的
def compute_loss(obs, act, weights):
logits_output = policy_net(obs)
actions_dist = Categorical(logits=logits_output)
logp = actions_dist.log_prob(act)
return -1 * (logp * weights).mean()
接收三个参数,第一个是当前$\pi_{\theta}$玩的那好多好多局游戏的所有observation,第二个是这些observation出现时所采取的动作,第三个是对这个动作的评价,也就是所在的那局游戏的收益。还是上面的例子,输入的内容就是:
$$obs = s_0,s_1^1,s_2^1,s_3^1,s_4^1,s_0,s_1^2,s_2^2,s_3^2,s_4^2,s_5^2,s_6^2,s_7^2$$
$$act=a_0^1,a_1^1,a_2^1,a_3^1,a_4^1,a_0^2,a_1^2,a_2^2,a_3^2,a_4^2,a_5^2,a_6^2,a_7^2$$
$$weights=5,5,5,5,5,8,8,8,8,8,8,8,8$$
结束状态$s_T$用不上,逐行解释下
- 首先使用policy_net获取每个observation下两个动作的logits(经过softmax后得到概率)
- 定义Categorical分布,这里要指定参数logits,默认参数接收的是概率
- 获得当时游戏时动作的log probability
- 最后计算均值的时候给加了一个负号,这是因为pytorch的优化器默认是用来做gradient descent的。
接下来的工作就是让工程循环起来,迭代更新policy。大致过程就是,使用当前的policy ,玩很多很多局游戏,这里是通过step来限制的玩多久,比如玩5000步,游戏过程中采用的act由$\pi_{\theta}$决定,记录下5000步的[observation, action, weight] 交给compute_loss()函数计算出loss,使用pytorch提供的优化器来更新policy。每更新一次policy就得重新取样5000步,这个大概是要强调一下的。这个取样的步数不能太少,要不然很多observation和action都没有取样到。我把代码贴出来,Spinning Up已经给代码增加了比较详细的注释,新版本的gym会有适配问题,这里修改了下。
# for training policy
def train_one_epoch():
# make some empty lists for logging.
batch_obs = [] # for observations
batch_acts = [] # for actions
batch_weights = [] # for R(tau) weighting in policy gradient
batch_rets = [] # for measuring episode returns
batch_lens = [] # for measuring episode lengths
# reset episode-specific variables
obs, info = env.reset() # first obs comes from starting distribution
done = False # signal from environment that episode is over
# 用来装填每一步的reward,一局游戏打完之后sum一就是这局游戏的收益(上面的例子5和8)
ep_rews = [] # list for rewards accrued throughout ep
# render first episode of each epoch
finished_rendering_this_epoch = False
# collect experience by acting in the environment with current policy
# 取样循环
while True:
# rendering
if (not finished_rendering_this_epoch) and render:
env.render()
# save obs
batch_obs.append(obs.copy())
# act in the environment
act = get_action(torch.as_tensor(obs, dtype=torch.float32))
# old version
# obs, rew, done, _ = env.step(act)
# version 0.26.2
obs, reward, done, truncated, info = env.step(act)
# save action, reward
batch_acts.append(act)
ep_rews.append(rew)
if done:
# if episode is over, record info about episode
# 本局游戏已经打完了, 开始统计效果
ep_ret, ep_len = sum(ep_rews), len(ep_rews)
batch_rets.append(ep_ret)
batch_lens.append(ep_len)
# the weight for each logprob(a|s) is R(tau)
batch_weights += [ep_ret] * ep_len
# reset episode-specific variables
obs, done, ep_rews = env.reset(), False, []
# won't render again this epoch
finished_rendering_this_epoch = True
# end experience loop if we have enough of it
# 如果已经到取样的step数量,退出取样循环,更新policy
if len(batch_obs) > batch_size:
break
# take a single policy gradient update step
# 更新policy
optimizer.zero_grad()
batch_loss = compute_loss(obs=torch.as_tensor(batch_obs, dtype=torch.float32),
act=torch.as_tensor(batch_acts, dtype=torch.int32),
weights=torch.as_tensor(batch_weights, dtype=torch.float32)
)
batch_loss.backward()
optimizer.step()
return batch_loss, batch_rets, batch_lens
以上就是最最单纯的policy gradient的实现,上面贴出来的网页中还告诉我们一些tricks,有兴趣可以自己看下。