Сегодня мы рассмотрим алгоритм Proximal Policy Optimization. Этот алгоритм идеально балансирует стабильность и простоту реализации. В отличие от TRPO, где приходится возиться с жесткими ограничениями и сложными оптимизационными задачами, PPO позволяет обновлять политику через функцию потерь с clippin (на рус. «механим обрезки»).
Для наглядности будем использовать кастомную среду «CatChaseEnv», в которой агент‑котик учится ловить лазерную точку.
Теоретическая основа PPO
Обновление политики через градиентный спуск
В обучении с подкреплением мы стремимся максимизировать суммарную награду агента. Одним из подходов является прямая оптимизация параметров политики методом градиентного спуска. Однако резкие обновления могут привести к тому, что агент резко изменит свою стратегию, что, в свою очередь, ухудшает обучение.
Вероятностное отношение
Чтобы контролировать изменения политики, мы вводим отношение вероятностей:
Это соотношение показывает, насколько изменилась вероятность выбора конкретного действия после обновления параметров. Если сильно отклоняется от
, значит, новая политика слишком отличается от старой, и это может быть опасно для стабильности обучения.
Оценка преимущества
Оценка преимущества (advantage) показывает, насколько выгодно было совершенное действие по сравнению с ожиданием. Обычно её рассчитывают как разницу между накопленной дисконтированной наградой и оценкой ценности состояния:
где — накопленная награда, а
— оценка ценности состояния
. Преимущество помогает модели понять, какие действия стоит усиливать, а какие — ослаблять.
Механизм clipping
Чтобы избежать слишком агрессивных изменений, используется механизм clipping. Целевая функция PPO выглядит следующим образом:
Здесь параметр (обычно в диапазоне 0.1–0.3) задаёт границы, в которых отношение
может изменяться. Смысл в том, что если обновление выходит за установленные рамки, функция потерь не позволит сделать слишком большой шаг, сохраняя стабильность обучения.
Итак, как это всё работает вместе? Допустим, есть агент, который постоянно пытается улучшить свою стратегию, чтобы зарабатывать больше наград. Сначала используем градиентный спуск, чтобы слегка подкорректировать политику. Но если изменения будут слишком резкими, агент может начать перебарщивать и кардинально менять своё поведение.
Чтобы избежать этого, вводим отношение вероятностей, которое отслеживает, насколько новая политика отличается от старой. Если разница слишком большая, то это сигнал, что изменения слишком экстремальны. Далее, с помощью оценки преимущества приходит понимание, какие действия оказались лучше ожиданий, и хотим усилить именно их. Но чтобы система не пошла по накатанной и не сделала слишком большой шаг, применяется механизм clipping, который как предохранитель ограничивает изменения в пределах разумного. Таким образом, всё вместе эти компоненты создают стабильный и сбалансированный процесс обучения: агент учится, корректируя свои действия мягко и постепенно, без резких скачков в стратегии.
Практическая реализация PPO на Python: учим котика ловить лазер
Реализуем всё на примере симуляции, где агент — котик, пытающийся поймать лазерную точку.
Создание кастомной среды «CatChaseEnv»
Первым делом создадим простую кастомную среду, имитирующую поведение кота.
import gym
from gym import spaces
import numpy as np
class CatChaseEnv(gym.Env):
def __init__(self):
super(CatChaseEnv, self).__init__()
# Действия: 0 - сидеть, 1 - двигаться влево, 2 - двигаться вправо
self.action_space = spaces.Discrete(3)
# Пространство наблюдений: положение кота на линейной шкале от 0 до 100
self.observation_space = spaces.Box(low=0, high=100, shape=(1,), dtype=np.float32)
self.state = None
def reset(self):
# Начальное положение кота по центру (50)
self.state = np.array([50.0])
return self.state
def step(self, action):
# Обработка действия: 1 - влево, 2 - вправо, 0 - без движения
if action == 1:
self.state[0] -= 5
elif action == 2:
self.state[0] += 5
# Награда: чем ближе кот к позиции 80 (лазерная точка), тем выше награда
reward = -abs(self.state[0] - 80)
# Эпизод завершается, если кот уходит за пределы диапазона [0, 100]
done = bool(self.state[0] <= 0 or self.state[0] >= 100)
return self.state, reward, done, {}
Определяем среду, в которой кот начинает с позиции 50. Действия изменяют его позицию, а награда вычисляется как отрицательное расстояние до целевой позиции (80). Если кот выходит за границы (менее 0 или больше 100), эпизод заканчивается.
Фиксация сидов и импорт библиотек
Чтобы эксперименты были воспроизводимы, фиксируем сиды для всех библиотек.
import torch
import torch.nn as nn
import torch.optim as optim
import random
# Фиксируем сиды для Torch, NumPy и random
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
Фиксация случайных сидов гарантирует, что при каждом запуске алгоритма результаты будут воспроизводимыми.
Определение модели: Actor-Critic для котика
Создадим модель, которая одновременно будет оценивать ценность состояния и предсказывать вероятности действий.
class ActorCritic(nn.Module):
def __init__(self, state_dim, action_dim, hidden_size=128):
super(ActorCritic, self).__init__()
# Полносвязный слой для первичной обработки состояния
self.fc1 = nn.Linear(state_dim, hidden_size)
# BatchNorm для ускорения сходимости (учтите, что в RL распределение данных может быть нестандартным)
self.bn1 = nn.BatchNorm1d(hidden_size)
# Две "головы": actor для вероятностей действий и critic для оценки состояния
self.actor = nn.Linear(hidden_size, action_dim)
self.critic = nn.Linear(hidden_size, 1)
self.relu = nn.ReLU()
def forward(self, x):
# Пропускаем вход через fc1, применяем BatchNorm и активацию ReLU
x = self.relu(self.bn1(self.fc1(x)))
policy_logits = self.actor(x)
state_value = self.critic(x)
return policy_logits, state_value
def act(self, state):
# Преобразуем входное состояние в тензор и вычисляем логиты
state_tensor = torch.FloatTensor(state).unsqueeze(0)
logits, _ = self.forward(state_tensor)
probs = torch.softmax(logits, dim=-1)
# Создаем категориальное распределение и семплируем действие
dist = torch.distributions.Categorical(probs)
action = dist.sample()
return action.item(), dist.log_prob(action)
Строим простую сеть с одним скрытым слоем, к которой добавлен BatchNorm для ускорения сходимости.
Буфер памяти
Буфер памяти аккумулирует опыт агента, который затем используется для обновления параметров модели.
class Memory:
def __init__(self):
self.clear()
def store(self, state, action, log_prob, reward, done):
self.states.append(state)
self.actions.append(action)
self.log_probs.append(log_prob)
self.rewards.append(reward)
self.dones.append(done)
def clear(self):
self.states = []
self.actions = []
self.log_probs = []
self.rewards = []
self.dones = []
Этот класс позволяет накапливать информацию об опыте: состояния, действия, логарифмы вероятностей, награды и индикаторы завершения эпизода. После обновления модели буфер очищается, чтобы собрать новый батч данных.
Вычисление накопленных вознаграждений и преимуществ
Чтобы понять, насколько хорошо агент действует, нужно вычислить дисконтированные награды и, затем, преимущество.
def compute_returns(rewards, dones, gamma=0.99):
returns = []
R = 0
# Идем в обратном порядке для вычисления накопленных наград
for reward, done in zip(reversed(rewards), reversed(dones)):
if done:
R = 0 # Сбрасываем, если эпизод завершился
R = reward + gamma * R
returns.insert(0, R)
return returns
def compute_advantages(returns, values):
# Преимущество = накопленные награды минус оценка состояния
advantages = np.array(returns) - np.array(values)
return advantages
Функция compute_returns
вычисляет накопленные награды, проходя данные в обратном порядке. Функция compute_advantages
затем вычисляет разницу между возвращаемыми значениями и оценкой состояния, что и представляет собой преимущество для каждого шага.
Научиться внедрять RL-алгоритмы на практике в играх, робототехнике, энергетике и финансах можно на онлайн-курсе “Reinforcement Learning”.
Функция потерь PPO с механизмом clipping
Самая основная часть алгоритма — корректное вычисление отношения вероятностей и его обрезка для стабильности обучения.
def ppo_loss(policy, states, actions, old_log_probs, advantages, clip_epsilon=0.2):
# Преобразуем входные данные в тензоры
states_tensor = torch.FloatTensor(states)
actions_tensor = torch.LongTensor(actions)
old_log_probs_tensor = torch.stack(old_log_probs).detach()
advantages_tensor = torch.FloatTensor(advantages)
# Получаем новые логиты и вычисляем вероятности действий
logits, _ = policy(states_tensor)
probs = torch.softmax(logits, dim=-1)
dist = torch.distributions.Categorical(probs)
# Вычисляем новые log_prob для каждого действия
new_log_probs = dist.log_prob(actions_tensor)
# Вычисляем отношение вероятностей (новые против старых)
ratio = torch.exp(new_log_probs - old_log_probs_tensor)
# Применяем clipping, чтобы отношение не выходило за пределы [1-ε, 1+ε]
clipped_ratio = torch.clamp(ratio, 1 - clip_epsilon, 1 + clip_epsilon)
loss = -torch.min(ratio * advantages_tensor, clipped_ratio * advantages_tensor).mean()
return loss
Преобразуем входные данные в тензоры, затем получаем новые логиты и вычисляем категориальное распределение. Рассчитываем логарифмы вероятностей для каждого действия, вычисляем отношение новых и старых значений и обрезаем его с помощью torch.clamp
. Итоговая функция потерь — это отрицательное среднее минимальное значение между unclipped и clipped вариантами.
Диапазон clip_epsilon
0.1–0.3 регулирует, насколько агрессивно обновления политики могут отличаться от предыдущей итерации. Если значение clip_epsilon
слишком маленькое, обновления будут чрезмерно ограниченными, что может замедлить процесс обучения, т.к модель не сможет достаточно «отпустить» новые данные. С другой стороны, слишком большое значение может привести к нестабильности: обновления станут слишком резкими, и модель может перелететь через оптимальные значения, ухудшая качество обучения.
Также не забываем про такие параметры, как коэффициент дисконтирования gamma и скорость обучения lr. gamma определяет, насколько сильно учитываются будущие награды, а lr влияет на скорость сходимости модели.
Основной тренировочный цикл
Соберем все части в единый тренировочный цикл, где агент‑кот будет обучаться ловить лазерную точку.
def train_ppo(env_name='CatChaseEnv', num_episodes=1000, update_timestep=2000,
clip_epsilon=0.2, epochs=4, gamma=0.99, lr=3e-4):
# Создаем кастомную среду с котиками
env = gym.make(env_name)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
policy = ActorCritic(state_dim, action_dim)
optimizer = optim.Adam(policy.parameters(), lr=lr)
memory = Memory()
timestep = 0
episode_rewards = []
try:
for episode in range(num_episodes):
state = env.reset()
episode_reward = 0
while True:
# Получаем действие и log_prob из модели
action, log_prob = policy.act(state)
next_state, reward, done, _ = env.step(action)
# Сохраняем опыт в буфер
memory.store(state, action, log_prob, reward, done)
timestep += 1
episode_reward += reward
state = next_state
# Если накопили достаточное количество шагов, обновляем политику
if timestep % update_timestep == 0:
returns = compute_returns(memory.rewards, memory.dones, gamma)
# Получаем оценки ценности для всех состояний
states_tensor = torch.FloatTensor(memory.states)
with torch.no_grad():
_, values = policy(states_tensor)
values = values.numpy().squeeze()
advantages = compute_advantages(returns, values)
# Нормализация преимуществ для стабильности обучения
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
# Обновляем политику в течение нескольких эпох
for _ in range(epochs):
loss = ppo_loss(policy, memory.states, memory.actions, memory.log_probs, advantages, clip_epsilon)
optimizer.zero_grad()
loss.backward()
optimizer.step()
memory.clear()
if done:
episode_rewards.append(episode_reward)
print(f"Эпизод {episode+1}/{num_episodes} завершен, награда: {episode_reward:.2f}")
break
except KeyboardInterrupt:
print("Обучение прервано пользователем!")
except Exception as e:
print("Произошла ошибка в обучении:", e)
finally:
env.close()
return policy, episode_rewards
В этом цикле инициализируем среду, модель, оптимизатор и буфер памяти. На каждом шаге агент получает действие, выполняет его в среде, получает награду и сохраняет данные. После накопления определённого количества шагов происходит обновление политики: вычисляются возвращаемые значения, преимущества (с нормализацией), затем обновляется модель в течение нескольких эпох.
Заключение
Спасибо, что дочитали до конца! А как вы используете этот алгоритм в своих проектах? Будет интересно услышать ваши истории, кейсы и, конечно, отзывы о том, что получилось, а что можно улучшить.
Всем, кому интересен Reinforcement Learning, рекомендую посетить открытый урок в Otus 18 марта «RL — учимся обыгрывать человека». На нём узнаете, как применять алгоритмы RL в стохастических играх для решения реальных задач. Записаться
Автор: techevangelist