
How We Learn Step-Level Rewards from Preferences to Solve Sparse-Reward Environments Using Online Process Reward Learning


In this tutorial, we explore Online Process Reward Learning (OPRL) and demonstrate how we can learn dense, step-level reward signals from trajectory preferences to solve sparse-reward reinforcement learning tasks. We walk through every component, from the maze environment and reward-model network to preference generation, training loops, and evaluation, while observing how the agent progressively improves its behaviour through online preference-driven shaping. By running this end-to-end implementation, we gain a practical understanding of how OPRL enables better credit assignment, faster learning, and more stable policy optimization in challenging environments where the agent would otherwise struggle to discover meaningful rewards. Check out the FULL CODE NOTEBOOK.

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import matplotlib.pyplot as plt
from collections import deque
import random


torch.manual_seed(42)
np.random.seed(42)
random.seed(42)


class MazeEnv:
    def __init__(self, size=8):
        self.size = size
        self.start = (0, 0)
        self.goal = (size - 1, size - 1)
        # A vertical wall with a gap near the bottom forces a detour to the goal
        self.obstacles = set([(i, size // 2) for i in range(1, size - 2)])
        self.reset()

    def reset(self):
        self.pos = self.start
        self.steps = 0
        return self._get_state()

    def _get_state(self):
        # One-hot encoding of the agent's grid position
        state = np.zeros(self.size * self.size)
        state[self.pos[0] * self.size + self.pos[1]] = 1
        return state

    def step(self, action):
        moves = [(-1, 0), (0, 1), (1, 0), (0, -1)]
        new_pos = (self.pos[0] + moves[action][0],
                   self.pos[1] + moves[action][1])
        if (0 <= new_pos[0] < self.size and
            0 <= new_pos[1] < self.size and
            new_pos not in self.obstacles):
            self.pos = new_pos
        self.steps += 1
        done = self.pos == self.goal or self.steps >= 60
        # Sparse reward: the agent only gets a signal upon reaching the goal
        reward = 10.0 if self.pos == self.goal else 0.0
        return self._get_state(), reward, done

    def render(self):
        grid = [['.' for _ in range(self.size)] for _ in range(self.size)]
        for obs in self.obstacles:
            grid[obs[0]][obs[1]] = '█'
        grid[self.goal[0]][self.goal[1]] = 'G'
        grid[self.pos[0]][self.pos[1]] = 'A'
        return '\n'.join([''.join(row) for row in grid])


class ProcessRewardModel(nn.Module):
    def __init__(self, state_dim, hidden=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden),
            nn.LayerNorm(hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.LayerNorm(hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1),
            nn.Tanh()
        )

    def forward(self, states):
        # Per-state (step-level) reward in [-1, 1]
        return self.net(states)

    def trajectory_reward(self, states):
        # Trajectory score = sum of step-level rewards
        return self.forward(states).sum()


class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden=128):
        super().__init__()
        self.backbone = nn.Sequential(
            nn.Linear(state_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU()
        )
        self.actor = nn.Linear(hidden, action_dim)
        self.critic = nn.Linear(hidden, 1)

    def forward(self, state):
        features = self.backbone(state)
        return self.actor(features), self.critic(features)

We set up the entire foundation of our OPRL system by importing libraries, defining the maze environment, and building the reward and policy networks. We establish how states are represented, how obstacles block movement, and how the sparse reward structure works. We also design the core neural models that will later learn process rewards and drive the policy's decisions. Check out the FULL CODE NOTEBOOK.
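As a quick check before moving on, a short sanity-test snippet (not from the original notebook, but using only the classes defined above) confirms that the one-hot maze state flows cleanly through both networks:

# Sanity check (assumed helper, not part of the original notebook): reset the
# maze, render it, and push the one-hot state through both networks.
env = MazeEnv(size=8)
state = env.reset()
print(env.render())                     # 'A' = agent, 'G' = goal, '█' = wall
print("State shape:", state.shape)      # (64,) one-hot position encoding

policy = PolicyNetwork(state_dim=64, action_dim=4)
reward_model = ProcessRewardModel(state_dim=64)
state_t = torch.FloatTensor(state).unsqueeze(0)
logits, value = policy(state_t)
print("Logits:", logits.shape, "Value:", value.shape)           # (1, 4) and (1, 1)
print("Step reward in [-1, 1]:", reward_model(state_t).item())  # Tanh output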

class OPRLAgent:
    def __init__(self, state_dim, action_dim, lr=3e-4):
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.reward_model = ProcessRewardModel(state_dim)
        self.policy_opt = Adam(self.policy.parameters(), lr=lr)
        self.reward_opt = Adam(self.reward_model.parameters(), lr=lr)
        self.trajectories = deque(maxlen=200)
        self.preferences = deque(maxlen=500)
        self.action_dim = action_dim

    def select_action(self, state, epsilon=0.1):
        # ε-greedy: explore with probability epsilon, otherwise sample from the policy
        if random.random() < epsilon:
            return random.randint(0, self.action_dim - 1)
        state_t = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            logits, _ = self.policy(state_t)
            probs = F.softmax(logits, dim=-1)
            return torch.multinomial(probs, 1).item()

    def collect_trajectory(self, env, epsilon=0.1):
        states, actions, rewards = [], [], []
        state = env.reset()
        done = False
        while not done:
            action = self.select_action(state, epsilon)
            next_state, reward, done = env.step(action)
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            state = next_state
        traj = {
            'states': torch.FloatTensor(np.array(states)),
            'actions': torch.LongTensor(actions),
            'rewards': torch.FloatTensor(rewards),
            'return': float(sum(rewards))
        }
        self.trajectories.append(traj)
        return traj

We begin constructing the OPRL agent by implementing action selection and trajectory collection. We use an ε-greedy strategy to ensure exploration and gather sequences of states, actions, and returns. As we run the agent through the maze, we store complete trajectories that will later serve as preference data for shaping the reward model. Check out the FULL CODE NOTEBOOK.
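A minimal usage sketch (assumed, not shown in the original post) illustrates what one rollout stores; with epsilon=1.0 the agent acts fully at random, which is roughly what early exploration looks like:

# Usage sketch (assumed): one fully random rollout and what it stores.
env = MazeEnv(size=8)
agent = OPRLAgent(state_dim=64, action_dim=4)
traj = agent.collect_trajectory(env, epsilon=1.0)
print("Episode length:", len(traj['actions']))   # capped at 60 steps
print("Sparse return:", traj['return'])          # 10.0 only if the goal was reached
print("Stored trajectories:", len(agent.trajectories))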

    # OPRLAgent (continued)
    def generate_preference(self):
        if len(self.trajectories) < 2:
            return
        t1, t2 = random.sample(list(self.trajectories), 2)
        # The trajectory with the higher environment return is "preferred"
        label = 1.0 if t1['return'] > t2['return'] else 0.0
        self.preferences.append({'t1': t1, 't2': t2, 'label': label})

    def train_reward_model(self, n_updates=5):
        if len(self.preferences) < 32:
            return 0.0
        total_loss = 0.0
        for _ in range(n_updates):
            batch = random.sample(list(self.preferences), 32)
            loss = 0.0
            for item in batch:
                r1 = self.reward_model.trajectory_reward(item['t1']['states'])
                r2 = self.reward_model.trajectory_reward(item['t2']['states'])
                # Bradley–Terry: P(t1 preferred) = sigmoid(r1 - r2)
                logit = r1 - r2
                pred_prob = torch.sigmoid(logit)
                label = item['label']
                loss += -(label * torch.log(pred_prob + 1e-8) +
                          (1 - label) * torch.log(1 - pred_prob + 1e-8))
            loss = loss / len(batch)
            self.reward_opt.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.reward_model.parameters(), 1.0)
            self.reward_opt.step()
            total_loss += loss.item()
        return total_loss / n_updates

We generate preference pairs from collected trajectories and train the process reward model using the Bradley–Terry formulation. We compare trajectory-level scores, compute probabilities, and update the reward model to reflect which behaviours appear better. This allows us to learn dense, differentiable, step-level rewards that guide the agent even when the environment itself is sparse. Check out the FULL CODE NOTEBOOK.
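To make the Bradley–Terry loss concrete, here is a toy calculation with made-up scores (illustrative values, not taken from the notebook): the model assigns each trajectory a summed process reward, and the sigmoid of their difference is the predicted probability that the first trajectory is preferred.

# Toy Bradley–Terry example (illustrative numbers): sigmoid(r1 - r2) is the
# predicted probability that trajectory 1 is preferred over trajectory 2.
r1, r2 = torch.tensor(2.5), torch.tensor(1.0)   # summed step-level rewards
p_t1 = torch.sigmoid(r1 - r2)                   # ≈ 0.82
label = torch.tensor(1.0)                       # t1 had the higher environment return
# Same binary cross-entropy form as in the training loop above
loss = -(label * torch.log(p_t1) + (1 - label) * torch.log(1 - p_t1))
print(f"P(t1 preferred) = {p_t1.item():.2f}, BCE loss = {loss.item():.3f}")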

    # OPRLAgent (continued)
    def train_policy(self, n_updates=3, gamma=0.98):
        if len(self.trajectories) < 5:
            return 0.0
        total_loss = 0.0
        for _ in range(n_updates):
            traj = random.choice(list(self.trajectories))
            with torch.no_grad():
                process_rewards = self.reward_model(traj['states']).squeeze()
            # Reward shaping: sparse environment reward + scaled process rewards
            shaped_rewards = traj['rewards'] + 0.1 * process_rewards
            returns = []
            G = 0
            for r in reversed(shaped_rewards.tolist()):
                G = r + gamma * G
                returns.insert(0, G)
            returns = torch.FloatTensor(returns)
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
            logits, values = self.policy(traj['states'])
            log_probs = F.log_softmax(logits, dim=-1)
            action_log_probs = log_probs.gather(1, traj['actions'].unsqueeze(1))
            advantages = returns - values.squeeze().detach()
            policy_loss = -(action_log_probs.squeeze() * advantages).mean()
            value_loss = F.mse_loss(values.squeeze(), returns)
            entropy = -(F.softmax(logits, dim=-1) * log_probs).sum(-1).mean()
            loss = policy_loss + 0.5 * value_loss - 0.01 * entropy
            self.policy_opt.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 1.0)
            self.policy_opt.step()
            total_loss += loss.item()
        return total_loss / n_updates


def train_oprl(episodes=500, render_interval=100):
    env = MazeEnv(size=8)
    agent = OPRLAgent(state_dim=64, action_dim=4, lr=3e-4)
    returns, reward_losses, policy_losses = [], [], []
    success_rate = []
    for ep in range(episodes):
        epsilon = max(0.05, 0.5 - ep / 1000)
        traj = agent.collect_trajectory(env, epsilon)
        returns.append(traj['return'])
        if ep % 2 == 0 and ep > 10:
            agent.generate_preference()
        if ep > 20 and ep % 2 == 0:
            rew_loss = agent.train_reward_model(n_updates=3)
            reward_losses.append(rew_loss)
        if ep > 10:
            pol_loss = agent.train_policy(n_updates=2)
            policy_losses.append(pol_loss)
        success = 1 if traj['return'] > 5 else 0
        success_rate.append(success)
        if ep % render_interval == 0 and ep > 0:
            test_env = MazeEnv(size=8)
            agent.collect_trajectory(test_env, epsilon=0)
            print(test_env.render())
    return returns, reward_losses, policy_losses, success_rate

We train the policy using shaped rewards produced by the learned process reward model. We compute returns, advantages, value estimates, and entropy bonuses, enabling the agent to improve its strategy over time. We then build a full training loop in which exploration decays, preferences accumulate, and both the reward model and the policy are updated repeatedly. Check out the FULL CODE NOTEBOOK.
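The shaping step is easiest to see with a tiny hand-worked example (toy values, assumed for illustration): a sparse environment reward that arrives only at the final step is blended with small learned process rewards, and the discounted returns then carry signal back to every earlier step.

# Toy shaping example (assumed values): sparse terminal reward plus scaled
# process rewards, followed by the same discounted-return recursion as above.
env_rewards = torch.tensor([0.0, 0.0, 0.0, 10.0])
process_rewards = torch.tensor([0.2, 0.5, 0.7, 0.9])   # pretend reward-model outputs
shaped = env_rewards + 0.1 * process_rewards            # dense signal at every step

gamma, G, returns = 0.98, 0.0, []
for r in reversed(shaped.tolist()):
    G = r + gamma * G
    returns.insert(0, G)
print("Shaped rewards:", [round(x, 2) for x in shaped.tolist()])
print("Discounted returns:", [round(g, 3) for g in returns])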

print("Coaching OPRL Agent on Sparse Reward Maze...n")
returns, rew_losses, pol_losses, success = train_oprl(episodes=500, render_interval=250)


fig, axes = plt.subplots(2, 2, figsize=(14, 10))


axes[0,0].plot(returns, alpha=0.3)
axes[0,0].plot(np.convolve(returns, np.ones(20)/20, mode="valid"), linewidth=2)
axes[0,0].set_xlabel('Episode')
axes[0,0].set_ylabel('Return')
axes[0,0].set_title('Agent Performance')
axes[0,0].grid(alpha=0.3)


success_smooth = np.convolve(success, np.ones(20)/20, mode="valid")
axes[0,1].plot(success_smooth, linewidth=2, color="green")
axes[0,1].set_xlabel('Episode')
axes[0,1].set_ylabel('Success Rate')
axes[0,1].set_title('Goal Success Rate')
axes[0,1].grid(alpha=0.3)


axes[1,0].plot(rew_losses, linewidth=2, color="orange")
axes[1,0].set_xlabel('Update Step')
axes[1,0].set_ylabel('Loss')
axes[1,0].set_title('Reward Model Loss')
axes[1,0].grid(alpha=0.3)


axes[1,1].plot(pol_losses, linewidth=2, color="red")
axes[1,1].set_xlabel('Update Step')
axes[1,1].set_ylabel('Loss')
axes[1,1].set_title('Policy Loss')
axes[1,1].grid(alpha=0.3)


plt.tight_layout()
plt.show()


print("OPRL Coaching Full!")
print("Course of rewards, desire studying, reward shaping, and on-line updates demonstrated.")

We visualize the learning dynamics by plotting returns, success rates, reward-model loss, and policy loss. We track how the agent's performance evolves as OPRL shapes the reward landscape. By the end of the visualization, we clearly see the impact of process rewards on solving a challenging, sparse-reward maze.

In conclusion, we see how OPRL transforms sparse terminal outcomes into rich online feedback that continuously guides the agent's behaviour. We watch the process reward model learn preferences, shape the return signal, and accelerate the policy's ability to reach the goal. With larger mazes, varying shaping strengths, or even real human preference feedback, we appreciate how OPRL provides a flexible and powerful framework for credit assignment in complex decision-making tasks. We finish with a clear, hands-on understanding of how OPRL operates and how we can extend it to more advanced agentic RL settings.


Check out the FULL CODE NOTEBOOK and Paper. Feel free to check out our GitHub Page for Tutorials, Codes and Notebooks.


Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence Media Platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts over 2 million monthly views, illustrating its popularity among audiences.
