论文:《Soft Actor-Critic Algorithms and Applications (arXiv: 1812) 》# 不是1801版
算法构成 | 说明 |
---|---|
rl_typing.py | 强化学习数据类型声明 |
sac_agent.py | SAC-Auto算法 |
agent = SAC_Agent(env, kwargs=...) # 初始化算法, 并设置SAC的训练参数
agent.set_buffer(buffer) # 为算法自定义replay buffer
agent.set_nn(actor, critic, kwargs=...) # 为算法自定义神经网络
# 更多具体接口信息通过help函数查看DocString
agent.to('cpu') # 将算法转移到指定设备上
agent.cuda(0) # 将算法转移到cuda0上运算
agent.cpu() # 将算法转移到cpu上运算
agent.save('./训练备份') # 存储算法训练过程checkpoint
agent.load('./训练备份') # 加载算法训练过程checkpoint
agent.export('策略.onnx', kwargs=...) # 部署训练好的onnx策略模型
act_array = agent.select_action(obs, kwargs=...) # 环境交互, 基于策略选择-1~1的随机/确定动作
act_array = agent.random_action() # 环境随机探索, 完全随机产生-1~1的动作
agent.store_memory(transition, kwargs=...) # 存储环境转移元组(s, a, r, s_, done)
info_dict = agent.learn(kwargs=...) # 进行一次SAC优化, 返回Loss/Q函数/...
obs_tensor = agent.state_to_tensor(obs, kwargs=...) # 将Gym返回的1个obs转换成batch_obs, 用于处理混合输入情况, 默认跟随buffer设置
batch_dict = agent.replay_memory(batch_size, kwargs=...) # 经验回放, 用于实现花样经验回放, 默认跟随buffer设置
agent.buffer_len # 算法属性, 查看当前经验个数, 默认跟随buffer设置
agent.use_per # 算法属性, 查看是否使用PER, 默认跟随buffer设置
- 要求 观测Encoder 输入为观测 batch_obs 张量,输出形状为(batch, feature_dim)的特征 batch_feature 张量。要求forward函数只接受一个位置参数obs,混合观测要求传入的obs为张量字典dict[any, Tensor] / 张量列表list[Tensor] / 张量元组tuple[Tensor, ...]。
- 要求 策略函数 输入为特征 batch_feature 张量,输出形状为(batch, action_dim)的未经tanh激活的均值 batch_mu 张量和对数标准差 batch_logstd 张量。要求forward函数只接受一个位置参数feature,形状为(batch, feature_dim)。
- 要求 Q函数 输入为特征 batch_feature 张量+动作 batch_action 张量,输出形状为(batch, 1)的Q值 batch_q 张量。要求forward函数只接受一个位置参数 feature_and_action,形状为(batch, feature_dim+action_dim)。
FEATURE_DIM = 128
ACTION_DIM = 3
# 自定义观测Encoder
class MyEncoder(nn.Module):
def __init__(self):
super().__init__()
self.encoder = ... # user encoder: CNN、RNN、Transformer、GNN ...
self.mlp = nn.Sequential(
nn.Linear(..., FEATURE_DIM),
nn.ReLU(True),
)
def forward(self, observation):
feature = self.mlp(self.encoder(observation))
return feature
encoder_net = MyEncoder()
# 自定义策略函数
class MyPolicy(nn.Module):
def __init__(self):
super().__init__()
self.mlp = nn.Sequential(
nn.Linear(FEATURE_DIM, 128),
nn.ReLU(True),
nn.Linear(128, ACTION_DIM), # no activation
)
def forward(self, feature):
return self.mlp(feature)
mu_net, logstd_net = MyPolicy(), MyPolicy()
# 自定义TwinQ函数
class MyQfun(nn.Module):
def __init__(self):
super().__init__()
self.mlp = nn.Sequential(
nn.Linear(FEATURE_DIM + ACTION_DIM, 128),
nn.ReLU(True),
nn.Linear(128, 1), # no activation
)
def forward(self, feature_and_action):
return self.mlp(feature_and_action)
q1_net, q2_net = MyQfun(), MyQfun()
# 为算法设置神经网络
actor = SAC_Actor(encoder_net, mu_net, logstd_net, kwargs=...) # 实例化actor网络
critic = SAC_Critic(encoder_net, q1_net, q2_net) # 实例化critic网络
agent.set_nn(
actor,
critic,
actor_optim_cls = th.optim.Adam,
critic_optim_cls = th.optim.Adam,
copy = True
)
实现自定义经验回放,可自定义存储不同数据类型的混合观测数据(进行一些多传感器数据融合的端到端控制问题求解),也可自定义实现PER等功能。
要求在派生类中实现以下抽象方法(输入参数和返回数据的格式参考DocString),可参考demo_train.py中派生类实现方法:
必须实现的方法 | 功能 |
---|---|
reset | 重置经验池(Off-Policy算法一般用不到),也可用于初始化经验池(生成转移元组collections) |
push | 经验存储:存入环境转移元组*(s, a, r, s_, done)* ,其中状态s 和下一个状态 s_ (或观测 obs )为array(或混合形式dict[any, array]、list[array]、tuple[array, ...]),动作 a 为array,奖励 r 为float, s_ 是否存在 done 为bool。 |
sample | 经验采样:要求返回包含关键字*'s','a','r','s_','done'* 的batch 字典, batch 的每个key对应value为Tensor(或dict[any, Tensor]、list[Tensor]、tuple[Tensor, ...]);PER的batch还要包含关键字 'IS_weight' ,对应的value为Tensor。 |
state_to_tensor | 数据升维并转换:将Gym输出的1个obs 转换成 batch obs ,要求返回Tensor(或混合形式dict[any, Tensor]、list[Tensor]、tuple[Tensor, ...])。 |
非必须实现的方法/属性 | 功能 |
save | 存储buffer数据,用于保存训练进度,可省略 |
load | 加载buffer数据,用于加载训练进度,可省略 |
update_priorities | 用于更新PER的优先级,非PER可省略 |
is_per(属性) | 是否是PER回放,默认False |
is_rnn(属性) | 是否RNN按episode回放,默认False |
nbytes(属性) | 用于查看经验池占用内存,默认0 |
MAX_SIZE = int(2**20)
OBS_SPACE = env.observation_space
ACT_SPACE = env.action_space
# 自定义Buffer
class Buffer(BaseBuffer):
def __init__(self, max_size: int, obs_space: Space, act_space: Union[Box, Discrete]):
super().__init__()
# 控制参数
self.max_size = max_size
self.curr_size = 0
self.ptr = 0
# 数据存储模块
self.obs_space = obs_space
self.act_space = act_space
self.data = ... # user collection
def reset(self, *args, **kwargs):
self.curr_size = 0
self.ptr = 0
def push(
self,
transition: tuple[Obs, Act, float, Obs, bool],
terminal: bool = None,
**kwargs
):
# add transtion
self.data[self.ptr] = ...
# update
self.ptr = (1 + self.ptr) % self.max_size
self.curr_size = min(1 + self.curr_size, self.max_size)
def sample(
self,
batch_size: int = 1,
*,
idxs: ListLike = None,
rate: float = None,
**kwargs,
) -> dict[str, Union[ObsBatch, ActBatch, th.FloatTensor]]:
# sample indexes
idxs = idxs or np.random.choice(self.curr_size, size=batch_size, replace=False)
# get data
batch = {
"s": self.data[idxs] # device: self.device; shape: (batch_size, ...); type: FloatTensor / tuple[FloatTensor, ...] / list[FloatTensor] / dict[any, FloatTensor]
"a": ... # device: self.device; shape: (batch_size, act_dim(TD3/SAC/PPO) / 1(DQN/DSAC/DPPO)); type: FloatTensor(TD3/SAC/PPO) / LongTensor(DQN/DSAC/DPPO)
"r": ... # device: self.device; shape: (batch_size, 1); type: FloatTensor
"s_": ... # device: self.device; shape: (batch_size, ...); type: FloatTensor / tuple[FloatTensor, ...] / list[FloatTensor] / dict[any, FloatTensor]
"done": ... # device: self.device; shape: (batch_size, 1); type: FloatTensor
}
return batch
def state_to_tensor(self, state: Obs, use_rnn=False) -> ObsBatch:
# Easy Obs
return th.FloatTensor(state).unsqueeze(0).to(self.device) # no_rnn: shape = (1, ...); use_rnn: shape = (1, 1, ...)
# Mixed Obs
return {k: th.FloatTensor(state[k]).unsqueeze(0).to(self.device) for k in state.keys()}
return [th.FloatTensor(state[i]).unsqueeze(0).to(self.device) for i in range(len(state))]
return tuple(th.FloatTensor(state[i]).unsqueeze(0).to(self.device) for i in range(len(state)))
# 为算法设置Buffer
buffer = Buffer(MAX_SIZE, OBS_SPACE, ACT_SPACE) # 实例化buffer模块
agent.set_buffer(buffer)
包含的模块 | 说明 |
---|---|
LidarModel | 激光雷达模拟(基于东北天坐标系) |
NormalizedActionsWrapper | 环境装饰器:非-1~1动作空间归一化,用于与算法适配 |
DynamicPathPlanning | 动力学路径规划环境(动作空间-1~1,基于东天南坐标系) |
StaticPathPlanning | 路径搜索环境(动作空间非-1~1) |
# 实例化环境
from path_plan_env import DynamicPathPlanning
env = DynamicPathPlanning(kwargs=...)
# 训练/测试交互
obs, info = env.reset(kwargs=...) # new gym style
obs = env.reset(kwargs=...) # old gym style
while 1:
try:
env.render(kwargs=...) # 可视化路径规划(测试)
act = np.array([...]) # shape=(act_dim, ) range∈-1~1
obs, rew, done, truncated, info = env.step(act, kwargs=...) # new gym style
obs, rew, done, info = env.step(act, kwargs=...) # old gym style
except AssertionError:
env.plot("fig.png", kwargs=...) # 输出规划结果(训练)
break
$$ \mathbf{s}{new} \gets \mathbf{s}{old} + \mathbf{a} $$
1.0观测空间(BoxSpace):
观测空间 | n=6 |
---|---|
空间名(onnx输入名) | ”observation“ |
空间类型 | Box |
数据结构 | shape = (2n, ); dtype = float32 |
low | [x_min, y_min] * n |
high | [x_max, y_max] * n |
1.1动作空间(BoxSpace):
动作空间 | n=6 |
---|---|
空间名(onnx输出名) | ”action“ |
空间类型 | Box |
数据结构 | shape = (2n, ); dtype = float32 |
low | [-(x_max-x_min)/10, -(y_max-y_min)/10] * n |
high | [+(x_max-x_min)/10, +(y_max-y_min)/10] * n |
发射n条射线,雷达测距数据结构:
动力学模型:
状态空间(BoxSpace):
控制空间(BoxSpace):
2.0观测空间(DictSpace):
$$ \mathbf{o} \subset \text{Dict}\left { \mathbf{vector} {t-N+1:t}:\text{Box}\left { D,V,q \right }, \mathbf{points}{t-N+1:t}:\text{Box}\left { d_0,d_1,\dots ,d_{n-1} \right } \right } $$
D为距离、V为速度、q为视线角、points为雷达测距
时序vector观测空间 | N=4 |
---|---|
空间名(onnx输入名) | ”seq_vector" |
空间类型 | Box |
数据结构 | shape = (N, 3); dtype = float32 |
low | [ [0, V_low, -pi] ] * N |
high | [ [1.414*map_size, V_high, pi] ] * N |
时序points观测空间 | N=4,n=128 |
空间名(onnx输入名) | “seq_points" |
空间类型 | Box |
数据结构 | shape = (N, n) ; dtype = float32 |
low | [ [-1] * n ] * N |
high | [ [d_max] * n ] * N |
2.1动作空间(BoxSpace):
动作空间 | |
---|---|
空间名(onnx输出名) | "action" |
空间类型 | Box |
数据结构 | shape = (2, ); dtype = float32 |
low | [-1, -1] |
high | [1, 1] |
python >= 3.9
SAC算法依赖项:
gym >= 0.21.0 (数据结构API)
numpy >= 1.22.3 (数组运算API)
pytorch >= 1.10.2 (深度学习API)
onnx >= 1.13.1 (模型部署API)
onnxruntime >= 1.15.1 (模型推理API)
非SAC算法依赖项:
tensorboard (训练日志记录)
scipy >= 1.7.3 (自定义Env数值积分)
shapely >= 2.0.1 (自定义Env障碍表示)
matplotlib >= 3.5.1 (自定义Env可视化)
Path-Planning: 路径规划算法,A*、Dijstra、Hybrid A*等经典路径规划
Grey-Wolf-Optimizer-for-Path-Planning: 灰狼优化算法路径规划、多智能体/多无人机航迹规划