2023年highway-env更新之后的使用记录(含DDQN,DuelingDQN,DDQN+OtherChanges) 入门到入土,再踩坑就不玩原神了

写在前面

在学习自动驾驶领域上的强化学习过程中,我决定使用highwy-env库建设的模拟器来进行环境构建,但是翻阅了众多教程(包含国内国外)之后,发现教程内容过旧,因为随着2023年的到来,highway-env库也进行了更新,前两年的教程无一例外都使用了老旧版本的函数和返回值。

highway-env是什么东西?

安装方式:(默认最新版)pip install highway-env

首先先列出我发现的新库中的改动:

以前返回值有四个:

        observation, reward, done, info = env.step(action)

现在返回值有五个:

        observation, reward, terminated, truncated, info = env.step(action)

我推测以前的环境数据形式是ndarray数组:

        data = env.reset()

        data = (arragry([[ndarray],[],[],…,[]]),type==dtype32)

现在的环境数据形式是元组:

        data = env.reset()

        data = ((arragry([[ndarray],[],[],…,[]]),type==dtype32,{reward:{},terminated:{},…,})

基于以上改动,那么在代码中的数据处理部分也会相应地发生改变。特别是在使用多个库的时候,需要注意版本关联问题。

参考的一些代码

我的虚拟环境配置:(GPU)

虚拟环境是什么东西?来人,喂它吃九转大肠

其中必须用到的主要有以下几个:

基于 python 3.8.0

pytorch

gym

highway

tqdm

matplotlib

pygame

numpy

highway-env

使用DoubleDQN算法进行训练,此后还有在此基础上的其他改动。

默认创建python文件double_dqn.py,以下为文件中代码,拼在一起就是完整的。

注释是英文是因为我做的是英文的项目,简单翻译即可。

所使用的库

import os
import copy
import random
import time
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F

import gym
import highway_env

检测设备并初始化默认十字路口环境

# set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Author: Da Xuanzi 2023-2-17

# Define the environment
env = gym.make("intersection-v0")
# details
env.config["duration"] = 13
env.config["vehicles_count"] = 20
env.config["vehicles_density"] = 1.3
env.config["reward_speed_range"] = [7.0, 10.0]
env.config["initial_vehicle_count"] = 10
env.config["simulation_frequency"] = 15
env.config["arrived_reward"] = 2
env.reset()

十字路口环境的结构:

env.config
{
    "observation": {
        "type": "Kinematics",
        "vehicles_count": 15,
        "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
        "features_range": {
            "x": [-100, 100],
            "y": [-100, 100],
            "vx": [-20, 20],
            "vy": [-20, 20],
        },
        "absolute": True,
        "flatten": False,
        "observe_intentions": False
    },
    "action": {
        "type": "DiscreteMetaAction",
        "longitudinal": False,
        "lateral": True
    },
    "duration": 13,  # [s]
    "destination": "o1",
    "initial_vehicle_count": 10,
    "spawn_probability": 0.6,
    "screen_width": 600,
    "screen_height": 600,
    "centering_position": [0.5, 0.6],
    "scaling": 5.5 * 1.3,
    "collision_reward": IntersectionEnv.COLLISION_REWARD,
    "normalize_reward": False
}

构建网络

可以自定义隐藏层节点个数

class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        # super class
        super(Net, self).__init__()
        # hidden nodes define
        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(state_dim, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim)

    def forward(self, state):
        # define forward pass of the actor
        x = state # state
        # Relu function double
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out

构建学习器

class Replay: # learning
    def __init__(self,
                 buffer_size, init_length, state_dim, action_dim, env):
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.env = env

        self._storage = []
        self._init_buffer(init_length)
    def _init_buffer(self, n):
        # choose n samples state taken from random actions
        state = self.env.reset()
        for i in range(n):
            action = self.env.action_space.sample()
            observation, reward, done, truncated, info = self.env.step(action)
            # gym.env.step(action): tuple (obversation, reward, terminated, truncated, info) can edit
            # observation: numpy array [location]
            # reward: reward for *action
            # terminated: bool whether end
            # truncated: bool whether overflow (from done)
            # info: help/log/information
            if type(state) == type((1,)):
                state = state[0]
            # if state is tuple (ndarray[[],[],...,[]],{"speed":Float,"cashed":Bool,"action":Int,"reward":dict,"agent-reward":Float[],"agent-done":Bool}),we take its first item
            # because after run env.reset(), the state stores the environmental data and it can not be edited
            # we only need the state data -- the first ndarray
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": done,
            }
            self._storage.append(exp)
            state = observation

            if done:
                state = self.env.reset()
                done = False

    def buffer_add(self, exp):
        # exp buffer: {exp}=={
        #                 "state": state,
        #                 "action": action,
        #                 "reward": reward,
        #                 "state_next": observation,
        #                 "done": terminated,}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)  # remove the last one in dict

    def buffer_sample(self, n):
        # random n samples from exp buffer
        return random.sample(self._storage, n)

构建学习对象

PATH = 你的文件夹绝对路径/相对路径

class DOUBLEDQN(nn.Module):
    def __init__(
        self,
            env, # gym environment
            state_dim, # state size
            action_dim, # action size
        lr = 0.001, # learning rate
        gamma = 0.99, # discount factor
        batch_size = 5, # batch size for each training
        timestamp = "",):
        # super class
        super(DOUBLEDQN, self).__init__()
        self.env = env
        self.env.reset()
        self.timestamp = timestamp
        # for evaluation purpose
        self.test_env = copy.deepcopy(env)
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.is_rend = False

        self.target_net = Net(self.state_dim, self.action_dim).to(device)#TODO
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)#TODO
        self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)#TODO
        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

    def choose_our_action(self, state, epsilon = 0.9):
        # greedy strategy for choosing action
        # state: ndarray environment state
        # epsilon: float in [0,1]
        # return: action we chosen
        # turn to 1D float tensor -> [[a1,a2,a3,...,an]]
        # we have to increase the speed of transformation ndarray to tensor if not it will spend a long time to train the model
        # ndarray[[ndarray],...[ndarray]] => list[[ndarray],...[ndarray]] => ndarray[...] => tensor[...]
        if type(state) == type((1,)):
            state = state[0]
        temp = [exp for exp in state]
        target = []
        target = np.array(target)
        # n dimension to 1 dimension ndarray
        for i in temp:
            target = np.append(target,i)
        state = torch.FloatTensor(target).to(device)
        # randn() return a set of samples which are Gaussian distribution
        # no argments -> return a float number
        if np.random.randn() <= epsilon:
            # when random number smaller than epsilon: do these things
            # put a state array into estimate net to obtain their value array
            # choose max values in value array -> obtain action
            action_value = self.estimate_net(state)
            action = torch.argmax(action_value).item()
        else:
            # when random number bigger than epsilon: randomly choose a action
            action = np.random.randint(0, self.action_dim)

        return action

    def train(self, num_episode):
        # num_eposide: total turn number for train
        loss_list = [] # loss set
        avg_reward_list = [] # reward set
        episode_reward = 0
        rend = 0
        # tqdm : a model for showing process bar
        for episode in tqdm(range(1,int(num_episode)+1)):
            done = False
            state = self.env.reset()
            each_loss = 0
            step = 0
            if type(state) == type((1,)):
                state = state[0]
            while not done:
                if self.is_rend:
                    self.env.render()
                step +=1
                action = self.choose_our_action(state)
                observation, reward, done, truncated, info = self.env.step(action)
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": done,
                }
                self.ReplayBuffer.buffer_add(exp)
                state = observation

                # sample random batch in replay memory
                exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)
                # extract batch data
                action_batch = torch.LongTensor(
                    [exp["action"] for exp in exp_batch]
                ).to(device)
                reward_batch = torch.FloatTensor(
                    [exp["reward"] for exp in exp_batch]
                ).to(device)
                done_batch = torch.FloatTensor(
                    [1 - exp["done"] for exp in exp_batch]
                ).to(device)
                # Slow method -> Fast method when having more data
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)

                state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)
                state_batch = torch.FloatTensor(state_temp_list).to(device)


                # reshape
                state_batch = state_batch.reshape(self.batch_size, -1)
                action_batch = action_batch.reshape(self.batch_size, -1)
                reward_batch = reward_batch.reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.reshape(self.batch_size, -1)
                done_batch = done_batch.reshape(self.batch_size, -1)

                # obtain estimate Q value gather(dim, index) dim==1:column index
                estimate_Q_value = self.estimate_net(state_batch).gather(1, action_batch)
                # obtain target Q value detach:remove the matched element
                max_action_index = self.estimate_net(state_next_batch).detach().argmax(1)
                target_Q_value = reward_batch + done_batch * self.gamma * self.target_net(
                    state_next_batch
                ).gather(1, max_action_index.unsqueeze(1))# squeeze(1) n*1->1*1, unsqueeze(1) 1*1->n*1

                # mse_loss: mean loss
                loss = F.mse_loss(estimate_Q_value, target_Q_value)
                each_loss += loss.item()

                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # update target network
                # load parameters into model
                if self.learn_step_counter % 10 == 0:
                    self.target_net.load_state_dict(self.estimate_net.state_dict())
                self.learn_step_counter +=1

            reward, count = self.eval()
            episode_reward += reward

            # you can update these variables
            if episode_reward % 100 == 0:
                rend += 1
                if rend % 5 == 0:
                    self.is_rend = True
                else:
                    self.is_rend = False
            # save
            period = 1
            if episode % period == 0:
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)

                print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t eposide_reward: {:.3f}, \t step: {}".format(
                    episode, num_episode, each_loss, episode_reward, count
                ))

                # episode_reward = 0
                # create a new directory for saving
                path = PATH + "/" + self.timestamp
                try:
                    os.makedirs(path)
                except OSError:
                    pass
                # saving as timestamp file
                np.save(path + "/DOUBLE_DQN_LOSS.npy", loss_list)
                np.save(path + "/DOUBLE_DQN_EACH_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(), path + "/DOUBLE_DQN_params.pkl")

        self.env.close()
        return loss_list, avg_reward_list

    def eval(self):
        # evaluate the policy
        count = 0
        total_reward = 0
        done = False
        state = self.test_env.reset()
        if type(state) == type((1,)):
            state = state[0]
     
        while not done:
            action = self.choose_our_action(state, epsilon = 1)
            observation, reward, done, truncated, info = self.test_env.step(action)
            total_reward += reward
            count += 1
            state = observation

        return total_reward, count

构建运行函数

超参数可以自己设置 lr gamma

if __name__ == "__main__":

    # timestamp
    named_tuple = time.localtime()
    time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)
    print(time_string)
    # create a doubledqn object
    double_dqn_object = DOUBLEDQN(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        batch_size=64,
        timestamp=time_string,
    )
    # your chosen train times
    iteration = 20
    # start training
    avg_loss, avg_reward_list = double_dqn_object.train(iteration)
    path = PATH + "/" + time_string
    np.save(path + "/DOUBLE_DQN_LOSS.npy", avg_loss)
    np.save(path + "/DOUBLE_DQN_EACH_REWARD.npy", avg_reward_list)
    torch.save(double_dqn_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_params.pkl")
    torch.save(double_dqn_object.state_dict(), path + "/DOUBLE_DQN_MODEL.pt")

使用数据进行绘制图片

新建文件draw_figures.py

?处自己替换成自己的路径

import matplotlib.pyplot as plt
import numpy as np
Loss = r"?\?\DOUBLE_DQN_LOSS.npy"
Reward = r"?\?\DOUBLE_DQN_EACH_REWARD.npy"
avg_loss = np.load(Loss)
avg_reward_list = np.load(Reward)
# print("loss", avg_loss)
# print("reward", avg_reward_list)
plt.figure(figsize=(10, 6))
plt.plot(avg_loss)
plt.grid()
plt.title("Double DQN Loss")
plt.xlabel("epochs")
plt.ylabel("loss")
plt.savefig(r"?\figures\double_dqn_loss.png", dpi=150)
plt.show()

plt.figure(figsize=(10, 6))
plt.plot(avg_reward_list)
plt.grid()
plt.title("Double DQN Training Reward")
plt.xlabel("epochs")
plt.ylabel("reward")
plt.savefig(r"?\figures\double_dqn_train_reward.png", dpi=150)
plt.show()

提纳里手动分割线

Dueling_DQN

以上基本稍作改动即可

class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        """
        Initialize the network

        : param state_dim: int, size of state space
        : param action_dim: int, size of action space
        """
        super(Net, self).__init__()

        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(state_dim, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim + 1)

    def forward(self, state):
        """
        Define the forward pass of the actor

        : param state: ndarray, the state of the environment
        """
        x = state
        # print(x.shape)

        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out

class Replay: # learning
    def __init__(self,
                 buffer_size, init_length, state_dim, action_dim, env):
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.env = env

        self._storage = []
        self._init_buffer(init_length)
    def _init_buffer(self, n):
        # choose n samples state taken from random actions
        state = self.env.reset()
        for i in range(n):
            action = self.env.action_space.sample()
            observation, reward, done, truncated, info = self.env.step(action)
            # gym.env.step(action): tuple (obversation, reward, terminated, truncated, info) can edit
            # observation: numpy array [location]
            # reward: reward for *action
            # terminated: bool whether end
            # truncated: bool whether overflow (from done)
            # info: help/log/information
            if type(state) == type((1,)):
                state = state[0]
            # if state is tuple (ndarray[[],[],...,[]],{"speed":Float,"cashed":Bool,"action":Int,"reward":dict,"agent-reward":Float[],"agent-done":Bool}),we take its first item
            # because after run env.reset(), the state stores the environmental data and it can not be edited
            # we only need the state data -- the first ndarray
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": done,
            }
            self._storage.append(exp)
            state = observation

            if done:
                state = self.env.reset()
                done = False

    def buffer_add(self, exp):
        # exp buffer: {exp}=={
        #                 "state": state,
        #                 "action": action,
        #                 "reward": reward,
        #                 "state_next": observation,
        #                 "done": terminated,}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)  # remove the last one in dict

    def buffer_sample(self, n):
        # random n samples from exp buffer
        return random.sample(self._storage, n)

class DUELDQN(nn.Module):
    def __init__(
        self,
        env,
        state_dim,
        action_dim,
        lr=0.001,
        gamma=0.99,
        batch_size=5,
        timestamp="",
    ):
        """
        : param env: object, a gym environment
        : param state_dim: int, size of state space
        : param action_dim: int, size of action space
        : param lr: float, learning rate
        : param gamma: float, discount factor
        : param batch_size: int, batch size for training
        """
        super(DUELDQN, self).__init__()

        self.env = env
        self.env.reset()
        self.timestamp = timestamp

        self.test_env = copy.deepcopy(env)  # for evaluation purpose
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.is_rend =False

        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)

        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

    def choose_action(self, state, epsilon=0.9):
        # greedy strategy for choosing action
        # state: ndarray environment state
        # epsilon: float in [0,1]
        # return: action we chosen
        # turn to 1D float tensor -> [[a1,a2,a3,...,an]]
        # we have to increase the speed of transformation ndarray to tensor if not it will spend a long time to train the model
        # ndarray[[ndarray],...[ndarray]] => list[[ndarray],...[ndarray]] => ndarray[...] => tensor[...]
        if type(state) == type((1,)):
            state = state[0]
        temp = [exp for exp in state]
        target = []
        target = np.array(target)
        # n dimension to 1 dimension ndarray
        for i in temp:
            target = np.append(target, i)
        state = torch.FloatTensor(target).to(device)
        # randn() return a set of samples which are Gaussian distribution
        # no argments -> return a float number
        if np.random.randn() <= epsilon:
            # when random number smaller than epsilon: do these things
            # put a state array into estimate net to obtain their value array
            # choose max values in value array -> obtain action
            action_value = self.estimate_net(state)
            action_value = action_value[:-1]
            action = torch.argmax(action_value).item()
        else:
            # when random number bigger than epsilon: randomly choose a action
            action = np.random.randint(0, self.action_dim)

        return action

    def calculate_duelling_q_values(self, duelling_q_network_output):
        """
        Calculate the Q values using the duelling network architecture. This is equation (9) in the paper.

        :param duelling_q_network_output: tensor, output of duelling q network
        :return: Q values
        """
        state_value = duelling_q_network_output[:, -1]
        avg_advantage = torch.mean(duelling_q_network_output[:, :-1], dim=1)
        q_values = state_value.unsqueeze(1) + (
            duelling_q_network_output[:, :-1] - avg_advantage.unsqueeze(1)
        )
        return q_values

    def train(self, num_episode):
        # num_eposide: total turn number for train
        loss_list = [] # loss set
        avg_reward_list = [] # reward set
        episode_reward = 0

        # tqdm : a model for showing process bar
        for episode in tqdm(range(1,int(num_episode)+1)):
            done = False
            state = self.env.reset()
            each_loss = 0
            step = 0
            if type(state) == type((1,)):
                state = state[0]
            while not done:
                if self.is_rend:
                    self.env.render()
                step += 1
                action = self.choose_action(state)
                observation, reward, done, truncated, info = self.env.step(action)
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": done,
                }
                self.ReplayBuffer.buffer_add(exp)
                state = observation

                # sample random batch in replay memory
                exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)
                # extract batch data
                action_batch = torch.LongTensor(
                    [exp["action"] for exp in exp_batch]
                ).to(device)
                reward_batch = torch.FloatTensor(
                    [exp["reward"] for exp in exp_batch]
                ).to(device)
                done_batch = torch.FloatTensor(
                    [1 - exp["done"] for exp in exp_batch]
                ).to(device)
                # Slow method -> Fast method when having more data
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)

                state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)
                state_batch = torch.FloatTensor(state_temp_list).to(device)


                # reshape
                state_batch = state_batch.reshape(self.batch_size, -1)
                action_batch = action_batch.reshape(self.batch_size, -1)
                reward_batch = reward_batch.reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.reshape(self.batch_size, -1)
                done_batch = done_batch.reshape(self.batch_size, -1)

                # get estimate Q value
                estimate_net_output = self.estimate_net(state_batch)
                estimate_Q = self.calculate_duelling_q_values(estimate_net_output)
                estimate_Q = estimate_Q.gather(1, action_batch)

                # get target Q value
                max_action_idx = (
                    self.estimate_net(state_next_batch)[:, :-1].detach().argmax(1)
                )
                target_net_output = self.target_net(state_next_batch)
                target_Q = self.calculate_duelling_q_values(target_net_output).gather(
                    1, max_action_idx.unsqueeze(1)
                )
                target_Q = reward_batch + done_batch * self.gamma * target_Q

                # compute mse loss
                loss = F.mse_loss(estimate_Q, target_Q)
                each_loss += loss.item()

                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # update target network
                if self.learn_step_counter % 10 == 0:
                    self.target_net.load_state_dict(self.estimate_net.state_dict())
                self.learn_step_counter += 1

            reward, count = self.eval()
            episode_reward += reward

            # save
            period = 1
            if episode % period == 0:
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)

                print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t eposide_reward: {:.3f}, \t step: {}".format(
                    episode, num_episode, each_loss, episode_reward, count
                ))

                # epoch_reward = 0
                path = PATH + "/" + self.timestamp
                # create a new directory for saving
                try:
                    os.makedirs(path)
                except OSError:
                    pass

                np.save(path + "/DUELING_DQN_LOSS.npy", loss_list)
                np.save(path + "/DUELING_DQN_EACH_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(), path + "/DUELING_DQN_params.pkl")

        self.env.close()
        return loss_list, avg_reward_list

    def eval(self):
        # evaluate the policy
        count = 0
        total_reward = 0
        done = False
        state = self.test_env.reset()
        if type(state) == type((1,)):
            state = state[0]
        while not done:
            action = self.choose_action(state, epsilon=1)
            state_next, reward, done, _, info = self.test_env.step(action)
            total_reward += reward
            count += 1
            state = state_next

        return total_reward, count


if __name__ == "__main__":

    # timestamp for saving
    named_tuple = time.localtime()  # get struct_time
    time_string = time.strftime(
        "%Y-%m-%d-%H-%M", named_tuple
    )  # have a folder of "date+time ex: 1209_20_36 -> December 12th, 20:36"

    duel_dqn_object = DUELDQN(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        batch_size=64,
        timestamp=time_string,
    )
    path = PATH + "/" + time_string
    # Train the policy
    iterations = 10
    avg_loss, avg_reward_list = duel_dqn_object.train(iterations)
    np.save(path + "/DUELING_DQN_LOSS.npy", avg_loss)
    np.save(path + "/DUELING_DQN_EACH_REWARD.npy", avg_reward_list)
    torch.save(duel_dqn_object.estimate_net.state_dict(), path + "/DUELING_DQN_params.pkl")
    torch.save(duel_dqn_object.state_dict(), path + "/DUELING_DQN_MODEL.pt")

DDQN+OtherChanges

三层2D卷积

# add CNN structure
class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        # initalize the network
        # state_dim: state space
        # action_dim: action space
        super(Net, self).__init__()

        # nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # in_channel : input size = in_channels * in_N * in_N
        # out_channel : define
        # kernel_size : rules or define
        # stride: step length
        # padding: padding size
        # out_N = (in_N - Kernel_size + 2 * Padding)/ Stride +1
        self.cnn = nn.Sequential(
            # the first 2D convolutional layer
            nn.Conv2d(1, 4, kernel_size=3, padding=1),
            nn.BatchNorm2d(4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
            # the second 2D convolutional layer
            nn.Conv2d(4, 8, kernel_size=3, padding=1),
            nn.BatchNorm2d(8),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
            # the third 2D convolutional layer ---- my test and try or more convolutional layers
            nn.Conv2d(8, 4, kernel_size=3, padding=1),
            nn.BatchNorm2d(4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
        )

        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(4 * 1 * 9, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim)

    def forward(self, state):
        # define forward pass of the actor
        x = state # state
        x = self.cnn(x)
        x = x.view(x.size(0), -1)
        # Relu function double
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out


class Replay:
    def __init__(self, buffer_size, init_length, state_dim, action_dim, env):

        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.env = env

        self._storage = []
        self._init_buffer(init_length)

    def _init_buffer(self, n):
        # choose n samples state taken from random actions
        state = self.env.reset()
        for i in range(n):
            action = self.env.action_space.sample()
            observation, reward, done, truncated, info = self.env.step(action)
            # gym.env.step(action): tuple (obversation, reward, terminated, truncated, info) can edit
            # observation: numpy array [location]
            # reward: reward for *action
            # terminated: bool whether end
            # truncated: bool whether overflow (from done)
            # info: help/log/information
            if type(state) == type((1,)):
                state = state[0]
            # if state is tuple (ndarray[[],[],...,[]],{"speed":Float,"cashed":Bool,"action":Int,"reward":dict,"agent-reward":Float[],"agent-done":Bool}),we take its first item
            # because after run env.reset(), the state stores the environmental data and it can not be edited
            # we only need the state data -- the first ndarray
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": done,
            }
            self._storage.append(exp)
            state = observation

            if done:
                state = self.env.reset()
                done = False

    def buffer_add(self, exp):
        # exp buffer: {exp}=={
        #                 "state": state,
        #                 "action": action,
        #                 "reward": reward,
        #                 "state_next": observation,
        #                 "done": terminated,}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0) # remove the last one in dict

    def buffer_sample(self, N):
        # random n samples from exp buffer
        return random.sample(self._storage, N)



class DOUBLEDQN_CNN(nn.Module):
    def __init__(
        self,
            env,  # gym environment
            state_dim,  # state size
            action_dim,  # action size
            lr=0.001,  # learning rate
            gamma=0.99,  # discount factor
            batch_size=5,  # batch size for each training
            timestamp="", ):
        # super class
        super(DOUBLEDQN_CNN, self).__init__()

        self.env = env
        self.env.reset()
        self.timestamp = timestamp
        # for evaluation purpose
        self.test_env = copy.deepcopy(env)
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.is_rend = False

        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)

        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)


    def choose_action(self, state, epsilon=0.9):
        # greedy strategy for choosing action
        # state: ndarray environment state
        # epsilon: float in [0,1]
        # return: action we chosen
        # turn to 1D float tensor -> [[a1,a2,a3,...,an]]
        # we have to increase the speed of transformation ndarray to tensor if not it will spend a long time to train the model
        # ndarray[[ndarray],...[ndarray]] => list[[ndarray],...[ndarray]] => ndarray[...] => tensor[...]
        if type(state) == type((1,)):
            state = state[0]
        #TODO
        state = (
            torch.FloatTensor(state).to(device).reshape(-1, 1, 7, self.state_dim // 7)
        )
        if np.random.randn() <= epsilon:
            action_value = self.estimate_net(state)
            action = torch.argmax(action_value).item()
        else:
            action = np.random.randint(0, self.action_dim)
        return action

    def train(self, num_episode):
        # num_eposide: total turn number for train
        count_list = []
        loss_list = []
        total_reward_list = []
        avg_reward_list = []
        episode_reward = 0
        rend = 0
        for episode in tqdm(range(1,int(num_episode)+1)):
            done = False
            state = self.env.reset()
            each_loss = 0
            step = 0
            if type(state) == type((1,)):
                state = state[0]
            while not done:
                if self.is_rend:
                    self.env.render()
                step += 1
                action = self.choose_action(state)
                observation, reward, done, truncated, info = self.env.step(action)
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": done,
                }
                self.ReplayBuffer.buffer_add(exp)
                state = observation

                # sample random batch from replay memory
                exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)

                # extract batch data

                action_batch = torch.LongTensor([exp["action"] for exp in exp_batch])
                reward_batch = torch.FloatTensor([exp["reward"] for exp in exp_batch])
                done_batch = torch.FloatTensor([1 - exp["done"] for exp in exp_batch])
                # Slow method -> Fast method when having more data
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)

                state_next_batch = torch.FloatTensor(state_next_temp_list)
                state_batch = torch.FloatTensor(state_temp_list)

                # reshape
                state_batch = state_batch.to(device).reshape(
                    self.batch_size, 1, 7, self.state_dim // 7
                )
                action_batch = action_batch.to(device).reshape(self.batch_size, -1)
                reward_batch = reward_batch.to(device).reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.to(device).reshape(
                    self.batch_size, 1, 7, self.state_dim // 7
                )
                done_batch = done_batch.to(device).reshape(self.batch_size, -1)

                # get estimate Q value
                estimate_Q = self.estimate_net(state_batch).gather(1, action_batch)

                # get target Q value
                max_action_idx = self.estimate_net(state_next_batch).detach().argmax(1)
                target_Q = reward_batch + done_batch * self.gamma * self.target_net(
                    state_next_batch
                ).gather(1, max_action_idx.unsqueeze(1))

                # compute mse loss
                loss = F.mse_loss(estimate_Q, target_Q)
                each_loss += loss.item()

                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                # update target network
                if self.learn_step_counter % 10 == 0:
                    self.target_net.load_state_dict(self.estimate_net.state_dict())
                self.learn_step_counter += 1

            reward, count = self.eval()
            episode_reward += reward

            # you can update these variables
            if episode_reward % 100 == 0:
                rend += 1
                if rend % 5 == 0:
                    self.is_rend = True
                else:
                    self.is_rend = False
            # save
            period = 1
            if episode % period == 0:
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)

                print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t eposide_reward: {:.3f}, \t step: {}".format(
                    episode, num_episode, each_loss, episode_reward, count
                ))

                # epoch_reward = 0
                # create a new directory for saving
                path = PATH + "/" + self.timestamp
                try:
                    os.makedirs(path)
                except OSError:
                    pass
                # saving as timestamp file
                np.save(path + "/DOUBLE_DQN_CNN_LOSS.npy", loss_list)
                np.save(path + "/DOUBLE_DQN_CNN_EACH_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(), path + "/DOUBLE_DQN_CNN_params.pkl")

        self.env.close()
        return loss_list, avg_reward_list

    def eval(self):
        # evaluate the policy
        count = 0
        total_reward = 0
        done = False
        state = self.test_env.reset()
        if type(state) == type((1,)):
            state = state[0]
        while not done:
            action = self.choose_action(state, epsilon=1)
            observation, reward, done, truncated, info = self.test_env.step(action)
            total_reward += reward
            count += 1
            state = observation

        return total_reward, count


if __name__ == "__main__":

    # timestamp
    named_tuple = time.localtime()
    time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)
    print(time_string)
    # create a doubledqn object
    double_dqn_cnn_object = DOUBLEDQN_CNN(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        batch_size=64,
        timestamp=time_string,
    )
    # your chosen train times
    iteration = 20
    # start training
    avg_loss, avg_reward_list = double_dqn_cnn_object.train(iteration)
    path = PATH + "/" + time_string
    np.save(path + "/DOUBLE_DQN_CNN_LOSS.npy", avg_loss)
    np.save(path + "/DOUBLE_DQN_CNN_EACH_REWARD.npy", avg_reward_list)
    torch.save(double_dqn_cnn_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_CNN_params.pkl")
    torch.save(double_dqn_cnn_object.state_dict(), path + "/DOUBLE_DQN_CNN_MODEL.pt")

经验池

class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        # state_dim: state space
        # action_dim: action space
        super(Net, self).__init__()

        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(state_dim, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim)

    def forward(self, state):
        # state: ndarray
        x = state
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out

# Priortized_Replay
class Prioritized_Replay:
    def __init__(
        self,
        buffer_size,
        init_length,
        state_dim,
        action_dim,
        est_Net,
        tar_Net,
        gamma,
    ):
        # state_dim: state space
        # action_dim: action space
        # env: env
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.is_rend = False

        self.priority = deque(maxlen=buffer_size)
        self._storage = []
        self._init_buffer(init_length, est_Net, tar_Net)

    def _init_buffer(self, n, est_Net, tar_Net):
        # n: sample number
        state = env.reset()
        for i in range(n):
            action = env.action_space.sample()
            observation, reward, done, truncated, info = env.step(action)
            # gym.env.step(action): tuple (obversation, reward, terminated, truncated, info) can edit
            # observation: numpy array [location]
            # reward: reward for *action
            # terminated: bool whether end
            # truncated: bool whether overflow (from done)
            # info: help/log/information
            if type(state) == type((1,)):
                state = state[0]
            # if state is tuple (ndarray[[],[],...,[]],{"speed":Float,"cashed":Bool,"action":Int,"reward":dict,"agent-reward":Float[],"agent-done":Bool}),we take its first item
            # because after run env.reset(), the state stores the environmental data and it can not be edited
            # we only need the state data -- the first ndarray
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": done,
            }
            self.prioritize(est_Net, tar_Net, exp, alpha=0.6)
            self._storage.append(exp)
            state = observation

            if done:
                state = env.reset()
                done = False

    def buffer_add(self, exp):
        # exp buffer: {exp}=={
        #                 "state": state,
        #                 "action": action,
        #                 "reward": reward,
        #                 "state_next": observation,
        #                 "done": terminated,}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)
    # add prioritize
    def prioritize(self, est_Net, tar_Net, exp, alpha=0.6):
        state = torch.FloatTensor(exp["state"]).to(device).reshape(-1)

        q = est_Net(state)[exp["action"]].detach().cpu().numpy()
        q_next = exp["reward"] + self.gamma * torch.max(est_Net(state).detach())
        # TD error
        p = (np.abs(q_next.cpu().numpy() - q) + (np.e ** -10)) ** alpha
        self.priority.append(p.item())

    def get_prioritized_batch(self, N):
        prob = self.priority / np.sum(self.priority)
        # random.choices(list,weights=None,*,cum_weights=None,k=1)
        # weight: set the chosen item rate
        # k: times for choice
        # cum_weight: sum of weight
        sample_idxes = random.choices(range(len(prob)), k=N, weights=prob)
        importance = (1 / prob) * (1 / len(self.priority))
        sampled_importance = np.array(importance)[sample_idxes]
        sampled_batch = np.array(self._storage)[sample_idxes]
        return sampled_batch.tolist(), sampled_importance

    def buffer_sample(self, N):
        # random n samples from exp buffer
        return random.sample(self._storage, N)


class DDQNPB(nn.Module):
    def __init__(
        self,
        env,
        state_dim,
        action_dim,
        lr=0.001,
        gamma=0.99,
        buffer_size=1000,
        batch_size=50,
        beta=1,
        beta_decay=0.995,
        beta_min=0.01,
        timestamp="",
    ):
        # env: environment
        # state_dim: state space
        # action_dim: action space
        # lr: learning rate
        # gamma: loss/discount factor
        # batch_size: training batch size
        super(DDQNPB, self).__init__()

        self.timestamp = timestamp

        self.test_env = copy.deepcopy(env)  # for evaluation purpose
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0

        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

        self.ReplayBuffer = Prioritized_Replay(
            buffer_size,
            100,
            self.state_dim,
            self.action_dim,
            self.estimate_net,
            self.target_net,
            gamma,
        )
        self.priority = self.ReplayBuffer.priority
        # NOTE: right here beta is equal to (1-beta) in most of website articles, notation difference
        # start from 1 and decay
        self.beta = beta
        self.beta_decay = beta_decay
        self.beta_min = beta_min

    def choose_action(self, state, epsilon=0.9):
        # state: env state
        # epsilon: [0,1]
        # return action you choose
        # get a 1D array
        if type(state) == type((1,)):
            state = state[0]
        temp = [exp for exp in state]
        target = []
        target = np.array(target)
        # n dimension to 1 dimension ndarray
        for i in temp:
            target = np.append(target, i)
        state = torch.FloatTensor(target).to(device)
        if np.random.randn() <= epsilon:
            action_value = self.estimate_net(state)
            action = torch.argmax(action_value).item()
        else:
            action = np.random.randint(0, self.action_dim)
        return action

    def train(self, num_episode):
        # num_epochs: training times
        loss_list = []
        avg_reward_list = []
        episode_reward = 0

        for episode in tqdm(range(1,int(num_episode)+1)):
            done = False
            state = env.reset()
            each_loss = 0
            step = 0
            rend = 0
            if type(state) == type((1,)):
                state = state[0]
            while not done:
                action = self.choose_action(state)
                observation, reward, done, _, info = env.step(action)
                # self.env.render()
                # store experience to replay memory
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": done,
                }
                self.ReplayBuffer.buffer_add(exp)
                state = observation

                # importance weighting
                if self.beta > self.beta_min:
                    self.beta *= self.beta_decay

                # sample random batch from replay memory
                exp_batch, importance = self.ReplayBuffer.get_prioritized_batch(
                    self.batch_size
                )
                importance = torch.FloatTensor(importance ** (1 - self.beta)).to(device)

                # extract batch data
                action_batch = torch.LongTensor(
                    [exp["action"] for exp in exp_batch]
                ).to(device)
                reward_batch = torch.FloatTensor(
                    [exp["reward"] for exp in exp_batch]
                ).to(device)
                done_batch = torch.FloatTensor(
                    [1 - exp["done"] for exp in exp_batch]
                ).to(device)
                # Slow method -> Fast method when having more data
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)

                state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)
                state_batch = torch.FloatTensor(state_temp_list).to(device)

                # reshape
                state_batch = state_batch.reshape(self.batch_size, -1)
                action_batch = action_batch.reshape(self.batch_size, -1)
                reward_batch = reward_batch.reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.reshape(self.batch_size, -1)
                done_batch = done_batch.reshape(self.batch_size, -1)

                # get estimate Q value
                estimate_Q = self.estimate_net(state_batch).gather(1, action_batch)

                # get target Q value
                max_action_idx = self.estimate_net(state_next_batch).detach().argmax(1)
                target_Q = reward_batch + done_batch * self.gamma * self.target_net(
                    state_next_batch
                ).gather(1, max_action_idx.unsqueeze(1))

                # compute mse loss
                # loss = F.mse_loss(estimate_Q, target_Q)
                loss = torch.mean(
                    torch.multiply(torch.square(estimate_Q - target_Q), importance)
                )
                each_loss += loss.item()

                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                #TODO
                # update target network
                if self.learn_step_counter % 10 == 0:
                    # self.update_target_networks()
                    self.target_net.load_state_dict(self.estimate_net.state_dict())

                self.learn_step_counter += 1
                step += 1

                env.render()
                # you can update these variables
                # if episode_reward % 100 == 0:
                #     rend += 1
                #     if rend % 5 == 0:
                #         self.is_rend = True
                #     else:
                #         self.is_rend = False

            reward, count = self.eval()
            episode_reward += reward


            # save
            period = 1
            if episode % period == 0:
                # log
                each_loss /= period
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)

                print(
                    "\nepoch: [{}/{}], \tavg loss: {:.4f}, \tavg reward: {:.3f}, \tsteps: {}".format(
                        episode, num_episode, each_loss, episode_reward, count
                    )
                )
                # episode_reward = 0
                # create a new directory for saving
                path = PATH + "/" + self.timestamp
                try:
                    os.makedirs(path)
                except OSError:
                    pass
                np.save(path + "/DOUBLE_DQN_PRIORITIZED_LOSS.npy", loss_list)
                np.save(path + "/DOUBLE_DQN_PRIORITIZED_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(),path + "/DOUBLE_DQN_PRIORITIZED_params.pkl")

        env.close()
        return loss_list, avg_reward_list

    def eval(self):
        """
        Evaluate the policy
        """
        count = 0
        total_reward = 0
        done = False
        state = self.test_env.reset()
        if type(state) == type((1,)):
            state = state[0]
        while not done:
            action = self.choose_action(state, epsilon=1)
            observation, reward, done, truncated, info = self.test_env.step(action)
            total_reward += reward
            count += 1
            state = observation

        return total_reward, count


if __name__ == "__main__":

    # timestamp for saving
    named_tuple = time.localtime()  # get struct_time
    time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)

    double_dqn_prioritized_object = DDQNPB(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        buffer_size=1000,
        batch_size=64,
        timestamp=time_string,
    )

    # Train the policy
    iterations = 10000
    avg_loss, avg_reward_list = double_dqn_prioritized_object.train(iterations)

    path = PATH + "/" + time_string
    np.save(path + "/DOUBLE_DQN_PRIORITIZED_LOSS.npy", avg_loss)
    np.save(path + "/DOUBLE_DQN_PRIORITIZED_REWARD.npy", avg_reward_list)
    torch.save(double_dqn_prioritized_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_PRIORITIZED_params.pkl")
    torch.save(double_dqn_prioritized_object.state_dict(), path + "/DOUBLE_DQN_PRIORITIZED_MODEL.pt")

有些东西可以自己改掉,自己调出的bug才是好bug!(大雾)

写在后面:

关于自定义环境,刚刚花30分钟研究了一下,官方写的教程稀烂(狗头),我自己得到的攻略如下:

  1. 找到你的highway-env安装包位置,我的在:E:\formalFiles\Anaconda3-2020.07\envs\autodrive_38\Lib\site-packages\highway_env
  2. 在highway-env里的envs可以看到多个场景的定义文件,此处拿出intersection_env.py举例,其他的同理。新建一个文件test_env.py,把intersection_env.py的所有内容复制粘贴到里面。

  3. 在test_env.py里,重命名如下:
    class test(AbstractEnv):
        #
        # ACTIONS: Dict[int, str] = {
        #     0: 'SLOWER',
        #     1: 'IDLE',
        #     2: 'FASTER'
        # }
        ACTIONS: Dict[int, str] = {
            0: 'LANE_LEFT',
            1: 'IDLE',
            2: 'LANE_RIGHT',
            3: 'FASTER',
            4: 'SLOWER'
        }

    删除除了第一个class以外的所有class定义。这里是把动作区间改成5个。

  4. 在envs/_init_.py的末尾加上
    from highway_env.envs.test_env import *
  5. 在highway-env文件夹里找到一个单独的_init_.py,不是上一步的python文件!修改如下:
    def register_highway_envs():
        """Import the envs module so that envs register themselves."""
        # my test environment
        register(
            id='test-v0',# 引用名
            entry_point='highway_env.envs:test'#环境类名
        )
  6. 修改奖励,来到你的定义环境文件highway-env/envs/test_env.py里面,看到_reward函数,以及和它有关的_agent_reward函数等,可自行改掉算子。utils.py中有函数lmap()。
    def _reward(self, action: int) -> float:
        """Aggregated reward, for cooperative agents."""
        return sum(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles
                       ) / len(self.controlled_vehicles)
    
    def _agent_reward(self, action: int, vehicle: Vehicle) -> float:
        """Per-agent reward signal."""
        rewards = self._agent_rewards(action, vehicle)
        reward = sum(self.config.get(name, 0) * reward for name, reward in rewards.items())
        reward = self.config["arrived_reward"] if rewards["arrived_reward"] else reward
        reward *= rewards["on_road_reward"]
        if self.config["normalize_reward"]:
            reward = utils.lmap(reward, [self.config["collision_reward"], self.config["arrived_reward"]], [0, 1])
        return reward
    
    def _agent_rewards(self, action: int, vehicle: Vehicle) -> Dict[Text, float]:
        """Per-agent per-objective reward signal."""
        scaled_speed = utils.lmap(vehicle.speed, self.config["reward_speed_range"], [0, 1])
        return {
                "collision_reward": vehicle.crashed,
                "high_speed_reward": np.clip(scaled_speed, 0, 1),
                "arrived_reward": self.has_arrived(vehicle),
                "on_road_reward": vehicle.on_road
            }
  7. 引用自定义环境如下:
    import highway-env
    import gym
    
    env = gym.make("test-v0")
    env.reset()
  8. 我自定义的环境文件,个人设定,不代表最佳结果:
    from typing import Dict, Tuple, Text
    
    import numpy as np
    from highway_env import utils
    from highway_env.envs.common.abstract import AbstractEnv, MultiAgentWrapper
    from highway_env.road.lane import LineType, StraightLane, CircularLane, AbstractLane
    from highway_env.road.regulation import RegulatedRoad
    from highway_env.road.road import RoadNetwork
    from highway_env.vehicle.kinematics import Vehicle
    from highway_env.vehicle.controller import ControlledVehicle
    
    
    class test(AbstractEnv):
        #
        # ACTIONS: Dict[int, str] = {
        #     0: 'SLOWER',
        #     1: 'IDLE',
        #     2: 'FASTER'
        # }
        ACTIONS: Dict[int, str] = {
            0: 'LANE_LEFT',
            1: 'IDLE',
            2: 'LANE_RIGHT',
            3: 'FASTER',
            4: 'SLOWER'
        }
        ACTIONS_INDEXES = {v: k for k, v in ACTIONS.items()}
    
        @classmethod
        def default_config(cls) -> dict:
            config = super().default_config()
            config.update({
                "observation": {
                    "type": "Kinematics",
                    "vehicles_count": 15,
                    "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
                    "features_range": {
                        "x": [-100, 100],
                        "y": [-100, 100],
                        "vx": [-20, 20],
                        "vy": [-20, 20],
                    },
                    "absolute": True,
                    "flatten": False,
                    "observe_intentions": False
                },
                "action": {
                    "type": "DiscreteMetaAction",
                    "longitudinal": True,
                    "lateral": True,
                    "target_speeds": [0, 4.5, 9]
                },
                "duration": 13,  # [s]
                "destination": "o1",
                "controlled_vehicles": 1,
                "initial_vehicle_count": 10,
                "spawn_probability": 0.6,
                "screen_width": 600,
                "screen_height": 600,
                "centering_position": [0.5, 0.6],
                "scaling": 5.5 * 1.3,
                "collision_reward": -10,
                "high_speed_reward": 2,
                "arrived_reward": 5,
                "reward_speed_range": [7.0, 9.0],# change
                "normalize_reward": False,
                "offroad_terminal": False
            })
            return config
    
        def _reward(self, action: int) -> float:
            """Aggregated reward, for cooperative agents."""
            return sum(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles
                       ) / len(self.controlled_vehicles)
    
        def _rewards(self, action: int) -> Dict[Text, float]:
            """Multi-objective rewards, for cooperative agents."""
            agents_rewards = [self._agent_rewards(action, vehicle) for vehicle in self.controlled_vehicles]
            return {
                name: sum(agent_rewards[name] for agent_rewards in agents_rewards) / len(agents_rewards)
                for name in agents_rewards[0].keys()
            }
        # edit your reward
        def _agent_reward(self, action: int, vehicle: Vehicle) -> float:
            """Per-agent reward signal."""
            rewards = self._agent_rewards(action, vehicle)
            reward = sum(self.config.get(name, 0) * reward for name, reward in rewards.items())
            reward = self.config["arrived_reward"] if rewards["arrived_reward"] else reward
            reward *= rewards["on_road_reward"]
            if self.config["normalize_reward"]:
                reward = utils.lmap(reward, [self.config["collision_reward"], self.config["arrived_reward"]], [0, 1])
            return reward
    
        def _agent_rewards(self, action: int, vehicle: Vehicle) -> Dict[Text, float]:
            """Per-agent per-objective reward signal."""
            scaled_speed = utils.lmap(vehicle.speed, self.config["reward_speed_range"], [0, 1])
            return {
                "collision_reward": vehicle.crashed,
                "high_speed_reward": np.clip(scaled_speed, 0, 1),
                "arrived_reward": self.has_arrived(vehicle),
                "on_road_reward": vehicle.on_road
            }
    
        def _is_terminated(self) -> bool:
            return any(vehicle.crashed for vehicle in self.controlled_vehicles) \
                   or all(self.has_arrived(vehicle) for vehicle in self.controlled_vehicles) \
                   or (self.config["offroad_terminal"] and not self.vehicle.on_road)
    
        def _agent_is_terminal(self, vehicle: Vehicle) -> bool:
            """The episode is over when a collision occurs or when the access ramp has been passed."""
            return (vehicle.crashed or
                    self.has_arrived(vehicle) or
                    self.time >= self.config["duration"])
    
        def _is_truncated(self) -> bool:
            return
    
        def _info(self, obs: np.ndarray, action: int) -> dict:
            info = super()._info(obs, action)
            info["agents_rewards"] = tuple(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles)
            info["agents_dones"] = tuple(self._agent_is_terminal(vehicle) for vehicle in self.controlled_vehicles)
            return info
    
        def _reset(self) -> None:
            self._make_road()
            self._make_vehicles(self.config["initial_vehicle_count"])
    
        def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, dict]:
            obs, reward, terminated, truncated, info = super().step(action)
            self._clear_vehicles()
            self._spawn_vehicle(spawn_probability=self.config["spawn_probability"])
            return obs, reward, terminated, truncated, info
    
        def _make_road(self) -> None:
            """
            Make an 4-way intersection.
    
            The horizontal road has the right of way. More precisely, the levels of priority are:
                - 3 for horizontal straight lanes and right-turns
                - 1 for vertical straight lanes and right-turns
                - 2 for horizontal left-turns
                - 0 for vertical left-turns
    
            The code for nodes in the road network is:
            (o:outer | i:inner + [r:right, l:left]) + (0:south | 1:west | 2:north | 3:east)
    
            :return: the intersection road
            """
            lane_width = AbstractLane.DEFAULT_WIDTH
            right_turn_radius = lane_width + 5  # [m}
            left_turn_radius = right_turn_radius + lane_width  # [m}
            outer_distance = right_turn_radius + lane_width / 2
            access_length = 50 + 50  # [m]
    
            net = RoadNetwork()
            n, c, s = LineType.NONE, LineType.CONTINUOUS, LineType.STRIPED
            for corner in range(4):
                angle = np.radians(90 * corner)
                is_horizontal = corner % 2
                priority = 3 if is_horizontal else 1
                rotation = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]])
                # Incoming
                start = rotation @ np.array([lane_width / 2, access_length + outer_distance])
                end = rotation @ np.array([lane_width / 2, outer_distance])
                net.add_lane("o" + str(corner), "ir" + str(corner),
                             StraightLane(start, end, line_types=[s, c], priority=priority, speed_limit=10))
                # Right turn
                r_center = rotation @ (np.array([outer_distance, outer_distance]))
                net.add_lane("ir" + str(corner), "il" + str((corner - 1) % 4),
                             CircularLane(r_center, right_turn_radius, angle + np.radians(180), angle + np.radians(270),
                                          line_types=[n, c], priority=priority, speed_limit=10))
                # Left turn
                l_center = rotation @ (np.array([-left_turn_radius + lane_width / 2, left_turn_radius - lane_width / 2]))
                net.add_lane("ir" + str(corner), "il" + str((corner + 1) % 4),
                             CircularLane(l_center, left_turn_radius, angle + np.radians(0), angle + np.radians(-90),
                                          clockwise=False, line_types=[n, n], priority=priority - 1, speed_limit=10))
                # Straight
                start = rotation @ np.array([lane_width / 2, outer_distance])
                end = rotation @ np.array([lane_width / 2, -outer_distance])
                net.add_lane("ir" + str(corner), "il" + str((corner + 2) % 4),
                             StraightLane(start, end, line_types=[s, n], priority=priority, speed_limit=10))
                # Exit
                start = rotation @ np.flip([lane_width / 2, access_length + outer_distance], axis=0)
                end = rotation @ np.flip([lane_width / 2, outer_distance], axis=0)
                net.add_lane("il" + str((corner - 1) % 4), "o" + str((corner - 1) % 4),
                             StraightLane(end, start, line_types=[n, c], priority=priority, speed_limit=10))
    
            road = RegulatedRoad(network=net, np_random=self.np_random, record_history=self.config["show_trajectories"])
            self.road = road
    
        def _make_vehicles(self, n_vehicles: int = 10) -> None:
            """
            Populate a road with several vehicles on the highway and on the merging lane
    
            :return: the ego-vehicle
            """
            # Configure vehicles
            vehicle_type = utils.class_from_path(self.config["other_vehicles_type"])
            vehicle_type.DISTANCE_WANTED = 5  # Low jam distance
            vehicle_type.COMFORT_ACC_MAX = 6
            vehicle_type.COMFORT_ACC_MIN = -3
    
            # Random vehicles
            simulation_steps = 3
            for t in range(n_vehicles - 1):
                self._spawn_vehicle(np.linspace(0, 80, n_vehicles)[t])
            for _ in range(simulation_steps):
                [(self.road.act(), self.road.step(1 / self.config["simulation_frequency"])) for _ in range(self.config["simulation_frequency"])]
    
            # Challenger vehicle
            self._spawn_vehicle(60, spawn_probability=1, go_straight=True, position_deviation=0.1, speed_deviation=0)
    
            # Controlled vehicles
            self.controlled_vehicles = []
            for ego_id in range(0, self.config["controlled_vehicles"]):
                ego_lane = self.road.network.get_lane(("o{}".format(ego_id % 4), "ir{}".format(ego_id % 4), 0))
                destination = self.config["destination"] or "o" + str(self.np_random.randint(1, 4))
                ego_vehicle = self.action_type.vehicle_class(
                                 self.road,
                                 ego_lane.position(60 + 5*self.np_random.normal(1), 0),
                                 speed=ego_lane.speed_limit,
                                 heading=ego_lane.heading_at(60))
                try:
                    ego_vehicle.plan_route_to(destination)
                    ego_vehicle.speed_index = ego_vehicle.speed_to_index(ego_lane.speed_limit)
                    ego_vehicle.target_speed = ego_vehicle.index_to_speed(ego_vehicle.speed_index)
                except AttributeError:
                    pass
    
                self.road.vehicles.append(ego_vehicle)
                self.controlled_vehicles.append(ego_vehicle)
                for v in self.road.vehicles:  # Prevent early collisions
                    if v is not ego_vehicle and np.linalg.norm(v.position - ego_vehicle.position) < 20:
                        self.road.vehicles.remove(v)
    
        def _spawn_vehicle(self,
                           longitudinal: float = 0,
                           position_deviation: float = 1.,
                           speed_deviation: float = 1.,
                           spawn_probability: float = 0.6,
                           go_straight: bool = False) -> None:
            if self.np_random.uniform() > spawn_probability:
                return
    
            route = self.np_random.choice(range(4), size=2, replace=False)
            route[1] = (route[0] + 2) % 4 if go_straight else route[1]
            vehicle_type = utils.class_from_path(self.config["other_vehicles_type"])
            vehicle = vehicle_type.make_on_lane(self.road, ("o" + str(route[0]), "ir" + str(route[0]), 0),
                                                longitudinal=(longitudinal + 5
                                                              + self.np_random.normal() * position_deviation),
                                                speed=8 + self.np_random.normal() * speed_deviation)
            for v in self.road.vehicles:
                if np.linalg.norm(v.position - vehicle.position) < 15:
                    return
            vehicle.plan_route_to("o" + str(route[1]))
            vehicle.randomize_behavior()
            self.road.vehicles.append(vehicle)
            return vehicle
    
        def _clear_vehicles(self) -> None:
            is_leaving = lambda vehicle: "il" in vehicle.lane_index[0] and "o" in vehicle.lane_index[1] \
                                         and vehicle.lane.local_coordinates(vehicle.position)[0] \
                                         >= vehicle.lane.length - 4 * vehicle.LENGTH
            self.road.vehicles = [vehicle for vehicle in self.road.vehicles if
                                  vehicle in self.controlled_vehicles or not (is_leaving(vehicle) or vehicle.route is None)]
    
        def has_arrived(self, vehicle: Vehicle, exit_distance: float = 25) -> bool:
            return "il" in vehicle.lane_index[0] \
                   and "o" in vehicle.lane_index[1] \
                   and vehicle.lane.local_coordinates(vehicle.position)[0] >= exit_distance
    
    

哦,都要一个可视化是吧?来了来了。

在test-v0下,用double_dqn.py训练的图:(action_dim==5)

目前是单智能体,后续的多智能体需要调整输入的数据和动作,以及控制小车的数量,做为后续的待定改进点。

其他?等我写好 多智能体 0-0!

待好心人补充….毕竟这里是无人区啊(悲)

 终有一日,我会成为神一样的提纳里先生!

物联沃分享整理
物联沃-IOTWORD物联网 » 2023年highway-env更新之后的使用记录(含DDQN,DuelingDQN,DDQN+OtherChanges) 入门到入土,再踩坑就不玩原神了

发表评论