当前位置: 首页 > article >正文

基于DQN的自动驾驶小车绕圈任务

1.任务介绍

        任务来源: DQN: Deep Q Learning |自动驾驶入门(?) |算法与实现
        任务原始代码: self-driving car
        最终效果:
请添加图片描述
        以下所有内容,都是对上面DQN代码的改进,在与deepseek的不断提问交互中学习理解代码,并且对代码做出改进,让小车能够尽可能的跑的远
        这个DQN代码依然有很多可改进的地方,受限于个人精力,没有再继续探索下去,欢迎指正交流。

        后续会更新DDPG算法和PPO算法在此任务上的应用调试记录的代码。

2.调试记录

        问题:使用原始DQN代码,小车很难学习到正确的策略去绕圈,并且跑的距离也很短

    2.1 更改学习率(lr)和探索率(eps_end)

        记录:图片为最初设置的average_score,增大减小都没有显著效果;
在这里插入图片描述

    2.2 增加模型层数

        记录:模型采用三层,中间增加256*256的隐藏层;模型增加一层之后,效果显著提升,训练到400次的时候,average_score已经能够达到5.3w
        原因:模型太简单了,这样的结构可能不足以捕捉复杂的状态空间,导致训练的效果差;
        现阶段问题:横向是不太稳定的,小车跑起来方向频繁抖动,且整体流畅度不够

    2.3 横纵向action细化,同时更改max_mem_size=1000000

        记录:[5,0],[2,1],[2,-1],[0,1],[0,0],[0,-1],[-2,1],[-2,-1],[-5,0],一共9中组合;修改完之后流畅度有较大提升,avg_score也有很大提升;
在这里插入图片描述

    2.4 奖励函数优化

        记录:奖励函数去掉距离的概念,主要考虑居中性、速度的连续性、转角的连续性;可以看到avg_score已经可以达到非常大的值,变化趋势都是先持续增加,然后在迭代次数达到200-300次左右时震荡下降
在这里插入图片描述
在这里插入图片描述
        现阶段问题:avg_score变化趋势都是先持续增加,然后在迭代次数达到200-300次左右时震荡下降

    2.5 增加目标网络,且目标网络更新用软更新

        记录:原始DNQ代码中是没有目标网络的,训练震荡会比较大,增加目标网络可以稳定训练; 小车在行驶过程中频繁的小幅度左右摇头的情况变少了,不过感觉纵向的加减速比较频繁,目标网络用软更新之后明显感觉在两台电脑上训练起来性能提升都很快,70次左右的时候都达到了百万级别,前期分数异常高,后期分数抖动很大
在这里插入图片描述

    2.6 状态中增加最小距离,自车车速,自车转角,使得问题符合MDP

        记录:这一点其实早该考虑到的,原来的状态返回之后雷达距离,是缺少一些自车相关的信息
在这里插入图片描述
在这里插入图片描述
        现阶段问题:模型在290次左右的时候score达到峰值,后面越训练得分越低
        原因:核心原因应该还是之前的经验被遗忘导致的,memory_size由100W改为1000W就有很大改善了

    2.7 状态中雷达数据去掉强制int转换,防止丢失精度影响模型策略学习和泛化能力

        记录:最大score能到250W了,并且mem_cntr已经远远超过100W,证明有帮助,不过仍然存在越训练score越低的情况,大概在sp355次左右;在去掉雷达int的强制转换之后,训练效果有很大提升;
在这里插入图片描述在这里插入图片描述

    2.8 observation归一化

        记录:episode 145,单次score已经能够达到2.5亿左右,平均score达到500W
在这里插入图片描述

3.代码主要改进点

    3.1 DeepQNetwork

        1.网络增加一层;

class DeepQNetwork(nn.Module):def __init__(self, lr, input_dims, n_actions):...# 网络增加一层self.fc1 = nn.Linear(*input_dims, 256)self.fc2 = nn.Linear(256, 256)self.fc3 = nn.Linear(256, n_actions)...

        2.网络激活函数改变;

class DeepQNetwork(nn.Module):def forward(self, state):# 激活函数也做了一定的尝试变更x = F.relu(self.fc1(state))x = F.tanh(self.fc2(x))...

        3.输出action做对应适配;

class DeepQNetwork(nn.Module):def forward(self, state):...# 输出action也做对应适配actions = self.fc3(x)return actions

    3.2 Agent

        1.增加目标网络,用于稳定训练;

class Agent:def __init__(self, gamma, epsilson, lr, input_dims, batch_size, n_actions,max_mem_size=100000, eps_end=0.01, eps_dec=5e-4):...self.Q_eval = DeepQNetwork(lr, input_dims, n_actions)# 增加目标网络,用于稳定训练self.Q_target = DeepQNetwork(lr, input_dims, n_actions)...def learn_dqp(self, n_game):...# TODO: 确认下为什么要用[batch_index, action_batch]----其目的是为每个样本选择实际执行动作的Q值。Q_eval = self.Q_eval.forward(state_batch)[batch_index, action_batch]# 直接调self.Q_target(new_state_batch)和self.Q_eval.forward(new_state_batch)的效果是一样的q_next = self.Q_target(new_state_batch)...

        2.增加学习率调度器(指数衰减);

class Agent:def __init__(self, gamma, epsilson, lr, input_dims, batch_size, n_actions,max_mem_size=100000, eps_end=0.01, eps_dec=5e-4):...self.lr_min = 1e-6self.loss_value = 0# 新增学习率调度器(指数衰减)self.lr_scheduler = optim.lr_scheduler.ExponentialLR(self.Q_eval.optimizer,gamma=0.995  # 每episode学习率衰减0.5%)self.action_memory = np.zeros(self.mem_size, dtype=np.int32)...def learn_dqp(self, n_game):...self.Q_eval.optimizer.step()# 学习率调整必须在参数更新之后if self.mem_cntr % 2000 == 0:if self.lr_scheduler.get_last_lr()[0] > self.lr_min:self.lr_scheduler.step()  # 调整学习率print("lr updated!, current lr = {}".format(self.lr_scheduler.get_last_lr()[0]))...

        3.目标网络参数更新采用软更新;

class Agent:def __init__(self, gamma, epsilson, lr, input_dims, batch_size, n_actions,max_mem_size=100000, eps_end=0.01, eps_dec=5e-4):...self.Q_target = DeepQNetwork(lr, input_dims, n_actions)self.update_target_freq = 1000  # 每1000步同步一次self.tau = 0.05  # 软更新系数...def soft_update_target(self):for target_param, param in zip(self.Q_target.parameters(),self.Q_eval.parameters()):target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)def learn_dqp(self, n_game):...# 软更新目标网络self.soft_update_target()...

        4.增加了double-DQN的实现方式;

class Agent:def learn_double_dqp(self, n_game):# TODO: 看一下这一块是不是论文中提到的每间几千步更新一下?# 这一块他设定了一个batch_size=64,即每64个为一批,然后通过模型更新一下参数if self.mem_cntr < self.batch_size:returnself.Q_eval.optimizer.zero_grad()max_mem = min(self.mem_cntr, self.mem_size)batch = np.random.choice(max_mem, self.batch_size, replace=False)batch_index = np.arange(self.batch_size, dtype=np.int32)state_batch = torch.tensor(self.state_memory[batch]).to(self.Q_eval.device)new_state_batch = torch.tensor(self.new_state_memory[batch]).to(self.Q_eval.device)reward_batch = torch.tensor(self.reward_memory[batch]).to(self.Q_eval.device)terminal_batch = torch.tensor(self.terminal_memory[batch]).to(self.Q_eval.device)action_batch = self.action_memory[batch]# TODO: 确认下为什么要用[batch_index, action_batch]----其目的是为每个样本选择实际执行动作的Q值。Q_eval = self.Q_eval.forward(state_batch)[batch_index, action_batch]# ========== Double DQN修改部分 ==========q_eval_next = self.Q_eval(new_state_batch)max_actions = torch.argmax(q_eval_next, dim=1)# 输出为[batch_size,action_size],每一行对应随机选取的state,每一列对应当前state对应的每个action估计的Q值# print(q_eval_next)# 输出为每个state下能够得到最大Q值的action,dim为1*batch_size# print(max_actions)# 输出为每个state下能够得到最大Q值的action,不过将dim转换为了batch_size*1# print(max_actions.unsqueeze(1))# 输出为,之前得到的每个state下对应的最优action,在当前网络中对应的Q值,dim=batch_size*1# print(self.Q_target(new_state_batch).gather(1, max_actions.unsqueeze(1)))# 输出为,之前得到的每个state下对应的最优action,在当前网络中对应的Q值,不过将dim转换为了1*batch_size# print(self.Q_target(new_state_batch).gather(1, max_actions.unsqueeze(1)).squeeze(1))q_next = self.Q_target(new_state_batch).gather(1, max_actions.unsqueeze(1)).squeeze(1)q_next[terminal_batch] = 0.0q_target = reward_batch + self.gamma * q_next# =======================================# 估计Q值,因为Q值本来就是未来的预期奖励,我们通过估计未来的预期奖励和当前实际过得的预期奖励的差值,来更新# 模型,所以你最大化你的value function,就是在最优化你的policy本身loss = self.Q_eval.loss(q_target, Q_eval).to(self.Q_eval.device)self.loss_value = loss.item()loss.backward()# 添加梯度裁剪# torch.nn.utils.clip_grad_norm_(self.Q_eval.parameters(), max_norm=1.0)self.Q_eval.optimizer.step()# 学习率调整必须在参数更新之后if self.mem_cntr % 2000 == 0:if self.lr_scheduler.get_last_lr()[0] > self.lr_min:self.lr_scheduler.step()  # 调整学习率print("lr updated!, current lr = {}".format(self.lr_scheduler.get_last_lr()[0]))# 软更新目标网络self.soft_update_target()self.epsilon = self.epsilon - self.eps_dec if self.epsilon > self.eps_min else self.eps_min

    3.3 Agent

        1.小车返回状态优化;
                状态中增加最小距离,自车车速,自车转角,使得问题符合MDP;
                状态中雷达数据去掉强制int转换,防止丢失精度影响模型策略学习和泛化能力;
                observation归一化;

class Car:def get_data(self):# Get Distances To Borderreturn_values = [0] * len(self.radars)self.current_lateral_min_dist = 60for i, radar in enumerate(self.radars):return_values[i] = radar[1] / 300.0if radar[1] < self.current_lateral_min_dist:self.current_lateral_min_dist = radar[1]angle_rad = np.deg2rad(self.angle)return_values = return_values + [self.current_lateral_min_dist / 30,np.clip(self.speed / 20.0, 0.0, 1.0),np.sin(angle_rad), np.cos(angle_rad)]return return_values

        2.奖励函数优化;
                主要去掉了距离的概念,引入居中性、速度连续性、转角连续性概念,这么设计是因为只要小车始终保持居中,并且有速度,小车就可以一直跑下去;

class Car:def get_reward_optimized(self, game_map, alive_count_total):# 居中性lateral_reward = 1.0# print(self.current_lateral_min_dist)if self.current_lateral_min_dist / 60 > 0.5:lateral_reward = self.current_lateral_min_dist / 60elif self.current_lateral_min_dist / 60 < 0.4:lateral_reward = -0.5else:lateral_reward = 0.0# 速度基础speed_base_reward = self.speed / min(12 + alive_count_total / 100000, 20)# 速度连续性if len(self.speed_memory) >= 4:self.speed_memory = self.speed_memory[1:]self.speed_memory.append(self.speed)speed_up_discount = 1.0if self.speed_memory[-1] - self.speed_memory[0] >= 3 and lateral_reward > 0.0:speed_up_discount = -0.5elif self.speed_memory[-1] - self.speed_memory[0] >= 2 and lateral_reward > 0.0:speed_up_discount = 0.7# 转角连续性,写的随意了点,目的就是防止方向左右抖动angle_discount = 1.0if len(self.angle_memory) >= 5:self.angle_memory = self.angle_memory[1:]self.angle_memory.append(self.angle)aaa = [0] * 4if len(self.angle_memory) >= 5:for i in range(1, 5):aaa[i-1] = self.angle_memory[i] - self.angle_memory[i-1]bbb = [0] * 3for j in range(1, 4):bbb[j-1] = 1 if aaa[j-1] * aaa[j] < 0 else 0if sum(bbb) >= 3 and lateral_reward > 0.0:angle_discount = 0.8# print(lateral_reward, speed_up_discount, angle_discount, " ====== ", self.speed_memory)return 100 * lateral_reward * speed_up_discount * angle_discount

    3.4 train

        1.action优化;

def train():...agent = Agent(gamma=0.99, epsilson=1.0, batch_size=256, n_actions=9,eps_end=0.1, input_dims=[num_radar + 4], lr=0.005,max_mem_size=1000000, eps_dec=1e-4)...while not done:action = agent.choose_action(observation)# [5,0],[2,1],[2,-1],[0,1],[0,0],[0,-1],[-2,1],[-2,-1],[-5,0]if action == 0:car.angle += 5car.speed += 0elif action == 1:car.angle += 2if car.speed < 20:car.speed += 1elif action == 2:car.angle += 2if car.speed - 1 >= 12:car.speed -= 1elif action == 3:car.angle += 0if car.speed < 20:car.speed += 1elif action == 4:car.angle += 0car.speed += 0elif action == 5:car.angle += 0if car.speed - 1 >= 12:car.speed -= 1elif action == 6:car.angle -= 2if car.speed < 20:car.speed += 1elif action == 7:car.angle -= 2if car.speed - 1 >= 12:car.speed -= 1else:car.angle -= 5car.speed += 0...

4.任务思考

        1.DQN为什么不能处理连续控制性问题?
        DQN的核心思想是为每个可能的动作计算Q值,并选择Q值最大的动作。在连续动作空间中,动作是无限多的,连续动作空间离散化会造成维度灾难
        2.DQN总是过估计的原因?
在这里插入图片描述
        一方面总是取max操作;另一方面用自己的估计再去估计自己,如果当前已经出现高估,那么下一轮的TD-target更会高估,相当于高估会累计

        3.计算loss时,q_target对应的是每个状态下的最大Q值的动作,而Q_eval对应的却是从action_memory中随机选取的一批动作。这样是否会导致动作不匹配的问题,进而影响学习效果?
        在DQN中,q_target的计算遵循目标策略(最大化Q值),而Q_eval基于行为策略(实际执行的动作)。这种分离是Q-learning的核心设计,不会导致动作不匹配问题,反而是算法收敛的关键。
        Q_eval的作用:
                估实际动评作的价值:即使动作是随机探索的,也需要知道这些动作的历史价值。
        更新方向:
                通过loss = (q_target - Q_eval)^2,将实际动作的Q值向更优的目标值调整。
        q_target的作用:
                引导策略优化:通过最大化下一状态的Q值,逐步将策略向最优方向推进。

        4.DQN算法的网络中,使用MLP和使用CNN会有什么差异?
        总结:
                MLP:适合低维结构化数据,依赖显式特征工程,计算简单但高维易过拟合。
                CNN:适合高维空间数据(如图像),自动提取局部特征,参数共享提升效率。
        选择关键:根据输入数据的空间属性和维度,权衡特征提取需求与计算资源。
        在DQN中,合理选择网络结构是平衡性能与效率的核心。例如,Atari DQN的成功离不开CNN对图像特征的自动提取,而简单控制任务中MLP的轻量化优势则更为突出。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5.完整代码

        调试过程中的一些代码保留着,以提些许思考。

from typing import AsyncGenerator
import pygame
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import math
import osclass DeepQNetwork(nn.Module):def __init__(self, lr, input_dims, n_actions):super(DeepQNetwork, self).__init__()self.input_dims = input_dimsself.n_actions = n_actions# 网络增加一层self.fc1 = nn.Linear(*input_dims, 256)self.fc2 = nn.Linear(256, 256)self.fc3 = nn.Linear(256, n_actions)self.fc = nn.Linear(*input_dims, n_actions)self.optimizer = optim.Adam(self.parameters(), lr=lr)self.loss = nn.MSELoss()self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")self.to(self.device)def forward(self, state):# 激活函数也做了一定的尝试变更x = F.relu(self.fc1(state))x = F.tanh(self.fc2(x))# 输出action也做对应适配actions = self.fc3(x)return actionsclass Agent:def __init__(self, gamma, epsilson, lr, input_dims, batch_size, n_actions,max_mem_size=100000, eps_end=0.01, eps_dec=5e-4):self.gamma = gammaself.epsilon = epsilsonself.eps_min = eps_endself.eps_dec = eps_decself.lr = lrself.action_space = [i for i in range(n_actions)]self.mem_size = max_mem_size  # memory sizeself.batch_size = batch_sizeself.mem_cntr = 0  # memory currentself.input_dims = input_dimsself.n_actions = n_actionsself.Q_eval = DeepQNetwork(lr, input_dims, n_actions)self.Q_target = DeepQNetwork(lr, input_dims, n_actions)self.update_target_freq = 1000  # 每1000步同步一次self.tau = 0.05  # 软更新系数self.lr_min = 1e-6self.state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)self.new_state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)self.loss_value = 0# 新增学习率调度器(指数衰减)self.lr_scheduler = optim.lr_scheduler.ExponentialLR(self.Q_eval.optimizer,gamma=0.995  # 每episode学习率衰减0.5%)self.action_memory = np.zeros(self.mem_size, dtype=np.int32)self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)self.terminal_memory = np.zeros(self.mem_size, dtype=bool)def store_transition(self, state_old, action, reward, state_new, done):index = self.mem_cntr % self.mem_sizeself.state_memory[index] = state_oldself.new_state_memory[index] = state_newself.reward_memory[index] = rewardself.action_memory[index] = actionself.terminal_memory[index] = doneself.mem_cntr += 1def choose_action(self, observation):if np.random.random() > self.epsilon:state = torch.tensor([observation], dtype=torch.float32).to(self.Q_eval.device)actions = self.Q_eval.forward(state)action = torch.argmax(actions).item()else:action = np.random.choice(self.action_space)return actiondef soft_update_target(self):for target_param, param in zip(self.Q_target.parameters(),self.Q_eval.parameters()):target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)def learn_dqp(self, n_game):# TODO: 看一下这一块是不是论文中提到的每间几千步更新一下?# 这一块他设定了一个batch_size=64,即每64个为一批,然后通过模型更新一下参数if self.mem_cntr < self.batch_size:returnself.Q_eval.optimizer.zero_grad()max_mem = min(self.mem_cntr, self.mem_size)batch = np.random.choice(max_mem, self.batch_size, replace=False)batch_index = np.arange(self.batch_size, dtype=np.int32)state_batch = torch.tensor(self.state_memory[batch]).to(self.Q_eval.device)new_state_batch = torch.tensor(self.new_state_memory[batch]).to(self.Q_eval.device)reward_batch = torch.tensor(self.reward_memory[batch]).to(self.Q_eval.device)terminal_batch = torch.tensor(self.terminal_memory[batch]).to(self.Q_eval.device)action_batch = self.action_memory[batch]# TODO: 确认下为什么要用[batch_index, action_batch]----其目的是为每个样本选择实际执行动作的Q值。Q_eval = self.Q_eval.forward(state_batch)[batch_index, action_batch]# 直接调self.Q_target(new_state_batch)和self.Q_eval.forward(new_state_batch)的效果是一样的q_next = self.Q_target(new_state_batch)# 终止状态(Terminal State):如果当前状态转移后进入终止状态(例如游戏结束、任务完成或失败),# 则没有下一个状态s′,因此未来累积奖励的期望值为0。q_next[terminal_batch] = 0.0# 目标网络对每个state对应能够得到的最大Q值的action# print(torch.max(q_next, dim=1))# 目标网络对每个state对应能够得到的最大Q值# print(torch.max(q_next, dim=1)[0])q_target = reward_batch + self.gamma * torch.max(q_next, dim=1)[0]# 估计Q值,因为Q值本来就是未来的预期奖励,我们通过估计未来的预期奖励和当前实际过得的预期奖励的差值,来更新# 模型,所以你最大化你的value function,就是在最优化你的policy本身loss = self.Q_eval.loss(q_target, Q_eval).to(self.Q_eval.device)self.loss_value = loss.item()loss.backward()# 添加梯度裁剪# torch.nn.utils.clip_grad_norm_(self.Q_eval.parameters(), max_norm=1.0)self.Q_eval.optimizer.step()# 学习率调整必须在参数更新之后if self.mem_cntr % 2000 == 0:if self.lr_scheduler.get_last_lr()[0] > self.lr_min:self.lr_scheduler.step()  # 调整学习率print("lr updated!, current lr = {}".format(self.lr_scheduler.get_last_lr()[0]))# 软更新目标网络self.soft_update_target()self.epsilon = self.epsilon - self.eps_dec if self.epsilon > self.eps_min else self.eps_mindef learn_double_dqp(self, n_game):# TODO: 看一下这一块是不是论文中提到的每间几千步更新一下?# 这一块他设定了一个batch_size=64,即每64个为一批,然后通过模型更新一下参数if self.mem_cntr < self.batch_size:returnself.Q_eval.optimizer.zero_grad()max_mem = min(self.mem_cntr, self.mem_size)batch = np.random.choice(max_mem, self.batch_size, replace=False)batch_index = np.arange(self.batch_size, dtype=np.int32)state_batch = torch.tensor(self.state_memory[batch]).to(self.Q_eval.device)new_state_batch = torch.tensor(self.new_state_memory[batch]).to(self.Q_eval.device)reward_batch = torch.tensor(self.reward_memory[batch]).to(self.Q_eval.device)terminal_batch = torch.tensor(self.terminal_memory[batch]).to(self.Q_eval.device)action_batch = self.action_memory[batch]# TODO: 确认下为什么要用[batch_index, action_batch]----其目的是为每个样本选择实际执行动作的Q值。Q_eval = self.Q_eval.forward(state_batch)[batch_index, action_batch]# ========== Double DQN修改部分 ==========q_eval_next = self.Q_eval(new_state_batch)max_actions = torch.argmax(q_eval_next, dim=1)# 输出为[batch_size,action_size],每一行对应随机选取的state,每一列对应当前state对应的每个action估计的Q值# print(q_eval_next)# 输出为每个state下能够得到最大Q值的action,dim为1*batch_size# print(max_actions)# 输出为每个state下能够得到最大Q值的action,不过将dim转换为了batch_size*1# print(max_actions.unsqueeze(1))# 输出为,之前得到的每个state下对应的最优action,在当前网络中对应的Q值,dim=batch_size*1# print(self.Q_target(new_state_batch).gather(1, max_actions.unsqueeze(1)))# 输出为,之前得到的每个state下对应的最优action,在当前网络中对应的Q值,不过将dim转换为了1*batch_size# print(self.Q_target(new_state_batch).gather(1, max_actions.unsqueeze(1)).squeeze(1))q_next = self.Q_target(new_state_batch).gather(1, max_actions.unsqueeze(1)).squeeze(1)q_next[terminal_batch] = 0.0q_target = reward_batch + self.gamma * q_next# =======================================# 估计Q值,因为Q值本来就是未来的预期奖励,我们通过估计未来的预期奖励和当前实际过得的预期奖励的差值,来更新# 模型,所以你最大化你的value function,就是在最优化你的policy本身loss = self.Q_eval.loss(q_target, Q_eval).to(self.Q_eval.device)self.loss_value = loss.item()loss.backward()# 添加梯度裁剪# torch.nn.utils.clip_grad_norm_(self.Q_eval.parameters(), max_norm=1.0)self.Q_eval.optimizer.step()# 学习率调整必须在参数更新之后if self.mem_cntr % 2000 == 0:if self.lr_scheduler.get_last_lr()[0] > self.lr_min:self.lr_scheduler.step()  # 调整学习率print("lr updated!, current lr = {}".format(self.lr_scheduler.get_last_lr()[0]))# 软更新目标网络self.soft_update_target()self.epsilon = self.epsilon - self.eps_dec if self.epsilon > self.eps_min else self.eps_minWIDTH = 1920
HEIGHT = 1080
CAR_SIZE_X = 60
CAR_SIZE_Y = 60
BORDER_COLOR = (255, 255, 255, 255)  # Color To Crash on Hit
current_generation = 0  # Generation counterclass Car:def __init__(self, boundary_x, boundary_y, num_radar):# Load Car Sprite and Rotateself.sprite = pygame.image.load('car.png').convert()  # Convert Speeds Up A Lotself.sprite = pygame.transform.scale(self.sprite, (CAR_SIZE_X, CAR_SIZE_Y))self.rotated_sprite = self.sprite# self.position = [690, 740] # Starting Positionself.position = [830, 920]  # Starting Positionself.angle = 0self.angle_memory = []self.speed = 0self.speed_memory = []self.speed_set = False  # Flag For Default Speed Later onself.center = [self.position[0] + CAR_SIZE_X / 2, self.position[1] + CAR_SIZE_Y / 2]  # Calculate Centerself.radars = [[(0, 0), 60]] * num_radar  # List For Sensors / Radarsself.drawing_radars = []  # Radars To Be Drawnself.current_lateral_min_dist = 60self.alive = True  # Boolean To Check If Car is Crashedself.distance = 0  # Distance Drivenself.time = 0  # Time Passedself.width = 0self.height = 0self.boundary_x = boundary_xself.boundary_y = boundary_ydef draw(self, screen):screen.blit(self.rotated_sprite, self.position)  # Draw Spriteself.draw_radar(screen)  # OPTIONAL FOR SENSORSdef draw_radar(self, screen):# Optionally Draw All Sensors / Radarsfor radar in self.radars:position = radar[0]pygame.draw.line(screen, (0, 255, 0), self.center, position, 1)pygame.draw.circle(screen, (0, 255, 0), position, 5)def check_collision(self, game_map):self.alive = Truefor point in self.corners:# If Any Corner Touches Border Color -> Crash# Assumes Rectangleif game_map.get_at((int(point[0]), int(point[1]))) == BORDER_COLOR:self.alive = Falsebreakdef check_radar(self, degree, game_map):length = 0x = int(self.center[0] + math.cos(math.radians(360 - (self.angle + degree))) * length)y = int(self.center[1] + math.sin(math.radians(360 - (self.angle + degree))) * length)# While We Don't Hit BORDER_COLOR AND length < 300 (just a max) -> go further and furtherwhile not game_map.get_at((x, y)) == BORDER_COLOR and length < 300:length = length + 1x = int(self.center[0] + math.cos(math.radians(360 - (self.angle + degree))) * length)y = int(self.center[1] + math.sin(math.radians(360 - (self.angle + degree))) * length)# Calculate Distance To Border And Append To Radars List TODO: update dist calculatedist = int(math.sqrt(math.pow(x - self.center[0], 2) + math.pow(y - self.center[1], 2)))self.radars.append([(x, y), dist])def update(self, game_map):# Set The Speed To 20 For The First Time# Only When Having 4 Output Nodes With Speed Up and Downif not self.speed_set:self.speed = 10self.speed_set = Trueself.width, self.height = game_map.get_size()# Get Rotated Sprite And Move Into The Right X-Direction# Don't Let The Car Go Closer Than 20px To The Edgeself.rotated_sprite = self.rotate_center(self.sprite, self.angle)self.position[0] += math.cos(math.radians(360 - self.angle)) * self.speedself.position[0] = max(self.position[0], 20)self.position[0] = min(self.position[0], WIDTH - 120)# Increase Distance and Timeself.distance += self.speedself.time += 1# Same For Y-Positionself.position[1] += math.sin(math.radians(360 - self.angle)) * self.speedself.position[1] = max(self.position[1], 20)self.position[1] = min(self.position[1], WIDTH - 120)# Calculate New Centerself.center = [int(self.position[0]) + CAR_SIZE_X / 2, int(self.position[1]) + CAR_SIZE_Y / 2]# print("center: {}".format(self.center))# Calculate Four Corners# Length Is Half The Sidelength = 0.5 * CAR_SIZE_Xleft_top = [self.center[0] + math.cos(math.radians(360 - (self.angle + 30))) * length,self.center[1] + math.sin(math.radians(360 - (self.angle + 30))) * length]right_top = [self.center[0] + math.cos(math.radians(360 - (self.angle + 150))) * length,self.center[1] + math.sin(math.radians(360 - (self.angle + 150))) * length]left_bottom = [self.center[0] + math.cos(math.radians(360 - (self.angle + 210))) * length,self.center[1] + math.sin(math.radians(360 - (self.angle + 210))) * length]right_bottom = [self.center[0] + math.cos(math.radians(360 - (self.angle + 330))) * length,self.center[1] + math.sin(math.radians(360 - (self.angle + 330))) * length]self.corners = [left_top, right_top, left_bottom, right_bottom]# Check Collisions And Clear Radarsself.check_collision(game_map)self.radars.clear()# From -90 To 120 With Step-Size 45 Check Radarfor d in range(-120, 126, 15):  # -90,-45,0,45,90zself.check_radar(d, game_map)def get_data(self):# Get Distances To Borderreturn_values = [0] * len(self.radars)self.current_lateral_min_dist = 60for i, radar in enumerate(self.radars):return_values[i] = radar[1] / 300.0if radar[1] < self.current_lateral_min_dist:self.current_lateral_min_dist = radar[1]angle_rad = np.deg2rad(self.angle)return_values = return_values + [self.current_lateral_min_dist / 30,np.clip(self.speed / 20.0, 0.0, 1.0),np.sin(angle_rad), np.cos(angle_rad)]return return_valuesdef is_alive(self):# Basic Alive Functionreturn self.alivedef get_reward_optimized(self, game_map, alive_count_total):# 居中性lateral_reward = 1.0# print(self.current_lateral_min_dist)if self.current_lateral_min_dist / 60 > 0.5:lateral_reward = self.current_lateral_min_dist / 60elif self.current_lateral_min_dist / 60 < 0.4:lateral_reward = -0.5else:lateral_reward = 0.0# 速度基础speed_base_reward = self.speed / min(12 + alive_count_total / 100000, 20)# 速度连续性if len(self.speed_memory) >= 4:self.speed_memory = self.speed_memory[1:]self.speed_memory.append(self.speed)speed_up_discount = 1.0if self.speed_memory[-1] - self.speed_memory[0] >= 3 and lateral_reward > 0.0:speed_up_discount = -0.5elif self.speed_memory[-1] - self.speed_memory[0] >= 2 and lateral_reward > 0.0:speed_up_discount = 0.7# 转角连续性angle_discount = 1.0if len(self.angle_memory) >= 5:self.angle_memory = self.angle_memory[1:]self.angle_memory.append(self.angle)aaa = [0] * 4if len(self.angle_memory) >= 5:for i in range(1, 5):aaa[i-1] = self.angle_memory[i] - self.angle_memory[i-1]bbb = [0] * 3for j in range(1, 4):bbb[j-1] = 1 if aaa[j-1] * aaa[j] < 0 else 0if sum(bbb) >= 3 and lateral_reward > 0.0:angle_discount = 0.8# print(lateral_reward, speed_up_discount, angle_discount, " ====== ", self.speed_memory)return 100 * lateral_reward * speed_up_discount * angle_discountdef rotate_center(self, image, angle):# Rotate The Rectanglerectangle = image.get_rect()rotated_image = pygame.transform.rotate(image, angle)rotated_rectangle = rectangle.copy()rotated_rectangle.center = rotated_image.get_rect().centerrotated_image = rotated_image.subsurface(rotated_rectangle).copy()return rotated_imagedef train():pygame.init()screen = pygame.display.set_mode((WIDTH, HEIGHT))game_map = pygame.image.load('map.png').convert()  # Convert Speeds Up A Lotclock = pygame.time.Clock()num_radar = 17agent = Agent(gamma=0.99, epsilson=1.0, batch_size=256, n_actions=9,eps_end=0.1, input_dims=[num_radar + 4], lr=0.005,max_mem_size=1000000, eps_dec=1e-4)scores, eps_history = [], []average_scores = []distance = []average_distance = []alive_counts = []average_alive_counts = []n_games = 500optimal_score_and_episode = [0, 0]for i in range(n_games):car = Car([], [], num_radar)done = Falsescore = 0observation = car.get_data()alive_count = 0while not done:action = agent.choose_action(observation)# [5,0],[2,1],[2,-1],[0,1],[0,0],[0,-1],[-2,1],[-2,-1],[-5,0]if action == 0:car.angle += 5car.speed += 0elif action == 1:car.angle += 2if car.speed < 20:car.speed += 1elif action == 2:car.angle += 2if car.speed - 1 >= 12:car.speed -= 1elif action == 3:car.angle += 0if car.speed < 20:car.speed += 1elif action == 4:car.angle += 0car.speed += 0elif action == 5:car.angle += 0if car.speed - 1 >= 12:car.speed -= 1elif action == 6:car.angle -= 2if car.speed < 20:car.speed += 1elif action == 7:car.angle -= 2if car.speed - 1 >= 12:car.speed -= 1else:car.angle -= 5car.speed += 0screen.blit(game_map, (0, 0))car.update(game_map)car.draw(screen)pygame.display.flip()clock.tick(60)observation_, reward, done = car.get_data(), car.get_reward_optimized(game_map,agent.mem_cntr), not car.is_alive()score += rewardagent.store_transition(observation, action, reward, observation_, done)agent.learn_double_dqp(i)observation = observation_alive_count += 1if score > optimal_score_and_episode[0]:print("-----optimal_score_and_episode updated form {} to {}.".format(optimal_score_and_episode,[score, i]))optimal_score_and_episode = [score, i]state = {'eval_state': agent.Q_eval.state_dict(),'target_state': agent.Q_target.state_dict(),'eval_optimizer_state': agent.Q_eval.optimizer.state_dict(),'target_optimizer_state': agent.Q_target.optimizer.state_dict(),'optimal_score_and_episode': optimal_score_and_episode}torch.save(state, f'./dqn_eval.pth')# TODO:待改进,加载模型创建临时环境让小车随即初始化位置跑若干次,并且跑的时候保留最低的探索率,#      取score平均值,平均值大的模型保存下来scores.append(score)eps_history.append(agent.epsilon)avg_score = np.mean(scores[-100:])average_scores.append(avg_score)distance.append(car.distance)avg_distance = np.mean(distance[-100:])average_distance.append(avg_distance)alive_counts.append(alive_count)avg_alive_count = np.mean(alive_counts[-100:])average_alive_counts.append(avg_alive_count)# 打印当前学习率(调试用)current_lr = agent.lr_scheduler.get_last_lr()[0]print(f'episode: {i}, current_lr: {current_lr}, score= {round(score, 2)}, epsilon= {round(agent.epsilon, 3)},'f' avg_score= {round(avg_score, 2)}, distance= {round(car.distance)}, loss= {round(agent.loss_value)}, 'f'alive_count= {round(alive_count)}, mem_cntr= {agent.mem_cntr}')plt.subplot(1, 2, 1)plt.plot([i for i in range(0, n_games)], average_scores)plt.title("average_scores")plt.subplot(1, 2, 2)plt.plot([i for i in range(0, n_games)], average_distance)plt.title("average_distance")plt.show()if __name__ == '__main__':train()

相关文章:

基于DQN的自动驾驶小车绕圈任务

1.任务介绍 任务来源: DQN: Deep Q Learning &#xff5c;自动驾驶入门&#xff08;&#xff1f;&#xff09; &#xff5c;算法与实现 任务原始代码: self-driving car 最终效果&#xff1a; 以下所有内容&#xff0c;都是对上面DQN代码的改进&#…...

terraform resource创建了5台阿里云ecs,如要使用terraform删除其中一台主机,如何删除?

在 Terraform 中删除阿里云 5 台 ECS 实例中的某一台&#xff0c;具体操作取决于你创建资源时使用的 多实例管理方式&#xff08;count 或 for_each&#xff09;。以下是详细解决方案&#xff1a; 方法一&#xff1a;使用 for_each&#xff08;推荐&#xff09; 如果创建时使…...

【Linux】Linux工具(1)

3.Linux工具&#xff08;1&#xff09; 文章目录 3.Linux工具&#xff08;1&#xff09;Linux 软件包管理器 yum什么是软件包关于 rzsz查看软件包——yum list命令如何安装软件如何卸载软件补充——yum如何找到要安装软件的下载地址 Linux开发工具Linux编辑器-vim使用1.vim的基…...

探索大语言模型(LLM):词袋法(Bag of Words)原理与实现

文章目录 引言一、词袋法原理1.1 核心思想1.2 实现步骤 二、数学公式2.1 词频表示2.2 TF-IDF加权&#xff08;可选&#xff09; 三、示例表格3.1 构建词汇表3.2 文本向量化&#xff08;词频&#xff09; 四、Python代码实现4.1 基础实现&#xff08;手动计算&#xff09;4.2 输…...

vue引入物理引擎matter.js

vue引入物理引擎matter.js 在 Vue 项目中集成 Matter.js 物理引擎的步骤如下: 1. 安装 Matter.js npm install matter-js # 或 yarn add matter-js2. 创建 Vue 组件 <template><div ref="physicsContainer" class="physics-container"><…...

基于 Spring Boot 瑞吉外卖系统开发(十一)

基于 Spring Boot 瑞吉外卖系统开发&#xff08;十一&#xff09; 菜品启售和停售 “批量启售”、“批量停售”、操作列的售卖状态绑定单击事件&#xff0c;触发单击事件时&#xff0c;最终携带需要修改售卖状态的菜品id以post请求方式向“/dish/status/{params.status}”发送…...

支持鸿蒙next的uts插件

*本文共四个功能函数&#xff0c;相当于四个插件。作者为了偷懒写成了一个插件&#xff0c;调对应的函数即可。 1、chooseImageHarmony函数&#xff1a;拉起相册选择图片并转为Base64 2、takePhotoAndConvertToBase64函数&#xff1a;拉起相机拍照并转为Base64 3、openBrows…...

深入理解负载均衡:传输层与应用层的原理与实战

目录 前言1. 传输层&#xff08;Layer 4&#xff09;负载均衡1.1 工作层级与核心机制1.2 实现方式详解1.3 优缺点分析1.4 典型实现工具 2. 应用层&#xff08;Layer 7&#xff09;负载均衡2.1 工作层级与核心机制2.2 实现方式解析2.3 优缺点分析2.4 常用实现工具 3. Layer 4 与…...

WPF之Slider控件详解

文章目录 1. 概述2. 基本属性2.1 值范围属性2.2 滑动步长属性2.3 刻度显示属性2.4 方向属性2.5 选择范围属性 3. 事件处理3.1 值变化事件3.2 滑块拖动事件 4. 样式和模板自定义4.1 基本样式设置4.2 控件模板自定义 5. 数据绑定5.1 绑定到ViewModel5.2 同步多个控件 6. 实际应用…...

极狐GitLab 如何将项目共享给群组?

极狐GitLab 是 GitLab 在中国的发行版&#xff0c;关于中文参考文档和资料有&#xff1a; 极狐GitLab 中文文档极狐GitLab 中文论坛极狐GitLab 官网 共享项目和群组 (BASIC ALL) 在极狐GitLab 16.10 中&#xff0c;更改为在成员页面的成员选项卡上显示被邀请群组成员&#xf…...

企业微信自建消息推送应用

企业微信自建应用来推送消息 前言 最近有个给特定部门推送消息的需求&#xff0c;所以配置一个应用专门用来推送消息。实现过程大致为&#xff1a;服务器生成每天的报告&#xff0c;通过调用API来发送消息。以前一直都是发邮件&#xff0c;整个邮箱里全是报告文件&#xff0c…...

【React】Hooks useReducer 详解,让状态管理更可预测、更高效

1.背景 useReducer是React提供的一个高级Hook,没有它我们也可以正常开发&#xff0c;但是useReducer可以使我们的代码具有更好的可读性&#xff0c;可维护性。 useReducer 跟 useState 一样的都是帮我们管理组件的状态的&#xff0c;但是呢与useState不同的是 useReducer 是集…...

日志之ClickHouse部署及替换ELK中的Elasticsearch

文章目录 1 ELK替换1.1 Elasticsearch vs ClickHouse1.2 环境部署1.2.1 zookeeper 集群部署1.2.2 Kafka 集群部署1.2.3 FileBeat 部署1.2.4 clickhouse 部署1.2.4.1 准备步骤1.2.4.2 添加官方存储库1.2.4.3 部署&启动&连接1.2.4.5 基本配置服务1.2.4.6 测试创建数据库和…...

亚远景-ASPICE vs ISO 21434:汽车软件开发标准的深度对比

ASPICE&#xff08;Automotive SPICE&#xff09;和ISO 21434是汽车软件开发领域的两大核心标准&#xff0c;分别聚焦于过程质量与网络安全。以下从核心目标、覆盖范围、实施重点、协同关系及行业价值五个维度进行深度对比分析&#xff1a; 一、核心目标对比 ASPICE&#xff1…...

51单片机快速成长路径

作为在嵌入式领域深耕18年的工程师&#xff0c;分享一条经过工业验证的51单片机快速成长路径&#xff0c;全程干货无注水&#xff1a; 一、突破认知误区&#xff08;新手必看&#xff09; 不要纠结于「汇编还是C」&#xff1a;现代开发90%场景用C&#xff0c;掌握指针和内存管…...

使用 NGINX 实现 HTTP Basic 认证ngx_http_auth_basic_module 模块

一、前言 在 Web 应用中&#xff0c;对部分资源进行访问控制是十分常见的需求。除了基于 IP 限制、JWT 验证、子请求校验等方式外&#xff0c;最经典也最简单的一种方式便是 HTTP Basic Authentication。NGINX 提供的 ngx_http_auth_basic_module 模块支持基于用户名和密码的基…...

解构与重构:自动化测试框架的进阶认知之旅

目录 一、自动化测试的介绍 &#xff08;一&#xff09;自动化测试的起源与发展 &#xff08;二&#xff09;自动化测试的定义与目标 &#xff08;三&#xff09;自动化测试的适用场景 二、什么是自动化测试框架 &#xff08;一&#xff09;自动化测试框架的定义 &#x…...

DockerDesktop替换方案

背景 由于DockerDesktop并非开源软件&#xff0c;如果在公司使用&#xff0c;可能就有一些限制&#xff0c;那是不是除了使用DockerDesktop外&#xff0c;就没其它办法了呢&#xff0c;现在咱们来说说替换方案。 WSL WSL是什么&#xff0c;可自行百度&#xff0c;这里引用WS…...

力扣热题100之搜索二维矩阵 II

题目 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。 每列的元素从上到下升序排列。 代码 方法一&#xff1a;直接全体遍历 这个方法很直接&#xff0c;但是居然没有超时&#xff0c…...

docker操作镜像-以mysql为例

Docker安装使用-CSDN博客 docker操作镜像-以mysql为例 当安装一个新的镜像时可以登录https://hub.docker.com/直接搜索想要安装的镜像&#xff0c;查看文档 1&#xff09;拉取镜像 docker pull mysql 或者 docker pull mysql:版本号 然后直接跳到第4&#xff09;步即可 2…...

使用OpenCV 和 Dlib 进行卷积神经网络人脸检测

文章目录 引言1.准备工作2.代码解析2.1 导入必要的库2.2 加载CNN人脸检测模型2.3 加载并预处理图像2.4 进行人脸检测2.5 绘制检测结果2.6 显示结果 3.完整代码4.性能考虑5.总结 引言 人脸检测是计算机视觉中最基础也最重要的任务之一。今天我将分享如何使用dlib库中的CNN人脸检…...

React 实现 JWT 登录验证的最小可运行示例

下面是一个用 React 实现 JWT 登录验证的最小可运行示例&#xff0c;包含&#xff1a; React 前端&#xff1a;登录、保存 Token、获取用户数据。模拟后端&#xff1a;用 mock API&#xff08;你也可以接真后端&#xff09;。 &#x1f9f1; 技术栈 React&#xff08;使用 Vi…...

Power Query精通指南1:查询结构设计、数据类型、数据导入与迁移(平面文件、Excel、Web)

文章目录 零、Power Query简介0.1 Power Query 主要功能0.2 Power Query 的优势0.3 Power Query 组件 一、Power Query数据处理基本流程1.1 前期准备1.2 提取1.3 转换1.3.1 Power Query 编辑器界面1.3.2 默认转换1.3.3 自定义转换 1.4 加载1.4.1 自动检测数据类型1.4.2 重命名查…...

vue2开发者sass预处理注意

vue2开发者sass预处理注意 sass的预处理器&#xff0c;早年使用node-sass&#xff0c;也就是vue2最初默认的编译器。 sass官方推出了dart-sass来替代。 node-sass已经停维很久了。 vue3默认使用的是dart-sass。 Uniapp的官方文档截图 从 HBuilderX 4.56 &#xff0c;vue2 …...

淘宝按图搜索商品(拍立淘)Java 爬虫实战指南

在电商领域&#xff0c;按图搜索商品功能为用户提供了更直观、便捷的购物体验。淘宝的拍立淘功能更是凭借其强大的图像识别技术&#xff0c;成为许多开发者和商家关注的焦点。本文将详细介绍如何利用 Java 爬虫技术实现淘宝按图搜索商品功能&#xff0c;包括注册账号、上传图片…...

安卓基础(封装引用)

​​情况 1&#xff1a;普通 Java 项目&#xff08;非 Android&#xff09;​​ src/ ├── com/ │ ├── example/ │ │ ├── utils/ │ │ │ └── A.java // 工具类 A │ │ └── main/ │ │ └── B.java // 主类 B ​​A…...

深入理解 Docker 网络原理:构建高效、灵活的容器网络

在现代软件开发中&#xff0c;Docker 已经成为了容器化技术的代名词&#xff0c;广泛应用于开发、测试和生产环境。Docker 使得开发者能够将应用及其依赖打包成一个轻量级的容器&#xff0c;并通过 Docker 容器化技术来实现高效的部署与管理。 然而&#xff0c;在日常使用 Dock…...

使用 Selenium 爬取动态网页数据 —— 实战与坑点详解

本文记录了笔者在爬取网页数据过程中遇到的各种技术挑战&#xff0c;包括页面动态渲染、JavaScript 注入等问题&#xff0c;并最终给出一个可运行的完整方案。 文章目录 网页获取不到数据&#x1f680; 尝试用 Selenium 渲染页面 网页获取不到数据 某网页数据依赖大量 JavaSc…...

React 笔记[1] hello world

React 笔记[1] hello world 明白了&#xff01;既然你已经安装了 Node.js&#xff0c;我们可以 从零开始搭建一个 React Tailwind CSS 的 Hello World 项目。我将一步步列出操作指令&#xff0c;你只需要在终端里依次执行。 ✅ 第一步&#xff1a;初始化项目 mkdir my-hello…...

Verilog Test Fixture 时钟激励

1、占空比50%时钟产生 always begin<clock> 1b0 ;#<PERIOD/2> ;<clock> 1b1 ;#<PERIOD/2> ; end reg <clock> 1b0 ;alwaysbegin#<PERIOD/2> ;<clock> ~<clock> ;end 2…...