This article aims to make LSTM quick to apply in mathematical modeling contests. The data input supports one-dimensional data (single-variable forecasting) or two-dimensional data (forecasting several variables at once), and confidence-interval computation is included. Jupyter is recommended, because the training state is kept between cells and rewriting the plotting code becomes much more convenient.

Complete code download link

Data input API

def data_basic():
    """2023 MCM/ICM Problem C: https://www.pancake2021.work/wp-content/uploads/Problem_C_Data_Wordle.xlsx"""
    date, data = Utils.openfile("Problem_C_Data_Wordle.xlsx", data_col=[1], date_col=0)
    return date, data


def data01():
    # Test case: 2-D array, multiple variables
    data1 = np.sin(np.arange(300) * np.pi / 50) + np.random.randn(300) * 0.1
    data2 = np.sin(np.arange(300) * np.pi / 25) + np.random.randn(300) * 0.1 + 3
    return np.c_[data1[:, np.newaxis], data2[:, np.newaxis]]


def data02():
    # Test case: 1-D array
    return np.sin(np.arange(300) * np.pi / 50) + np.random.randn(300) * 0.1


def data03():
    # Test case: 2-D array, single variable
    return data02()[:, np.newaxis]

The code includes four example inputs; the synthetic ones generated by functions are self-explanatory. For convenient file reading you can use

Utils.openfile(file_location_name, data_col=[indexes of data], date_col=index-of-date)

Here file_location_name is the path of the file; the first row of the file must contain column names, otherwise adjust the code by hand. data_col lists the columns holding the data; as written, openfile selects them with df.iloc, so use integer indexes (to select by column name, switch to label-based indexing as in the commented line inside openfile). date_col is the column holding the dates and may be None if there is no date column.
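
For example, here is a minimal sketch of a call (the file name and column layout below are hypothetical) for a sheet whose first column holds dates and whose next two columns hold the series to forecast:

import Utils

# Hypothetical file and columns; adjust to your own sheet.
# date_col=0: the first column holds the dates; data_col=[1, 2]: forecast columns 1 and 2 jointly.
date, data = Utils.openfile("my_series.xlsx", data_col=[1, 2], date_col=0)
print(date.shape, data.shape)  # expected shapes: (n,) and (n, 2)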

Initialize the network

window_size = 10
lstm = CustomLSTM(data, window_size, scale=False)
# lstm = CustomLSTM(data, window_size, date, scale=True)

window_size is the size of the sliding window. By default a window of window_size points predicts the next single point (predicting several future points from one window is not implemented). date is optional; when omitted, the plots simply label the x-axis 1 to n. scale controls whether MinMaxScaler normalization is applied; it is worth trying both settings during training.
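
For intuition, the slicing that CustomLSTM performs internally (see slice() in CustomModel.py below) is equivalent to this small sketch:

import numpy as np

# Minimal sketch of the window slicing done by CustomLSTM.slice():
# each input is window_size consecutive points; the target is the point right after the window.
series = np.arange(8, dtype=float)[:, np.newaxis]   # toy data, shape (8, 1)
window_size = 3
X = np.array([series[i:i + window_size] for i in range(len(series) - window_size)])
Y = np.array([series[i + window_size] for i in range(len(series) - window_size)])
print(X.shape, Y.shape)  # (5, 3, 1) and (5, 1): (batch_size, sequence_length, input_size)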

lstm.init_lstm(hidden=64, num_layers=1)

These set the network's hidden dimension and the number of stacked LSTM layers.

You can also add or remove layers yourself in the LSTM class in Model.py (removing layers is not recommended).
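
For example, here is a sketch of a deeper output head that could replace self.out inside LSTM.__init__ in Model.py; the extra layer and the example dimensions are my own illustration, not part of the template:

import torch.nn as nn

hidden_size, output_size = 64, 1  # example dimensions for the sketch

# The template's head is Linear -> ReLU -> Dropout -> Linear; this version appends one more Linear + ReLU.
out = nn.Sequential(
    nn.Linear(hidden_size, hidden_size),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(hidden_size, hidden_size),  # extra layer (illustration only)
    nn.ReLU(),
    nn.Linear(hidden_size, output_size),
)
print(out)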

Train the network

# max_batch: whether to use the whole dataset as a single batch without splitting
_ = lstm.fit(num_epochs=2000, lr=0.01, max_batch=True, is_copy=False)
result = lstm.fit(num_epochs=2000, lr=0.001, max_batch=True, is_copy=True)
# lstm.fit(num_epochs=1000, batch_size=55, max_batch=False)

The template supports mini-batch training, but for this LSTM it is usually unnecessary; just set max_batch=True and train on the whole dataset as one batch. num_epochs is the number of training epochs and lr is the learning rate. is_copy controls whether the result object holds a copy of the model, so that you can compare results when you continue training later. This code does not perform a train/test split. A practical schedule is to train coarsely with a large lr first and then continue training with a smaller lr.
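
If you do want a hold-out evaluation, one possible sketch (not part of the template; the 80/20 ratio and the helper variable names are my own) is to split the raw array before building the model and compare the forecast against the held-out tail:

import Utils
from CustomModel import CustomLSTM

# Sketch only: manual hold-out split before constructing CustomLSTM.
# data01 and window_size are defined earlier in this walkthrough.
raw = data01()
split = int(len(raw) * 0.8)                      # 80/20 split, chosen arbitrarily
train_data, test_data = raw[:split], raw[split:]

lstm_holdout = CustomLSTM(train_data, window_size, scale=False)
lstm_holdout.init_lstm(hidden=64, num_layers=1)
result_holdout = lstm_holdout.fit(num_epochs=2000, lr=0.01, max_batch=True)

# Forecast len(test_data) steps ahead and compare against the held-out tail.
forecast = result_holdout.predict(len(test_data), is_random=False)
print("hold-out RMSE per variable:", Utils.rmse(forecast, test_data))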

Adjust the window size and retrain

lstm.slice(20)
lstm.init_lstm(hidden=64, num_layers=1)
result = lstm.fit(num_epochs=500, lr=0.001, max_batch=True)

Predict the data with ground truth

# Predict all past data (the part with ground truth); is_random toggles Dropout
predict = result.predicted(is_random=False)

Summary

# Print the summary
r2, mse, rmse, mae, mape, smape = result.summary()

Predict

# Forecast the next n steps; is_random toggles Dropout (random mode keeps Dropout active).
# To use the plot functions keep n fixed; if you change n, re-run everything from this line onward.
n = 60
further_predict = result.predict(n, is_random=False)

n-step (1 - alpha) confidence interval for a chosen variable index

# alpha = 0.05 gives a 95% confidence interval
alpha = 0.05

# Confidence interval of the forecast; the function caches its result

# n: forecast n steps; n_sample: number of samples to draw
conf = result.conf_int_sampled(n, alpha=alpha, n_sample=1000)
further_predict_sampled = result.sampled_result

# Confidence interval for the past data (with ground truth). Use one of the two methods below; conf_int_predicted_sampled is recommended

# Standard (analytic) method
conf_predicted = result.conf_int_predicted(alpha)
# Sampling method; the function caches its result
conf_predicted_sampled = result.conf_int_predicted_sampled(alpha=alpha, n_sample=1000)
predict_sampled = result.sampled_result_

# The confidence intervals must be computed before plot_confidence can be called
# Every conf returned is a 3-D ndarray indexed by (variable index, prediction step, (lower, mean, upper))

The sampling method works as follows: with the Dropout layers active, the model is run n_sample times to obtain a set of samples, and the confidence interval is computed from them; using the sample mean as the point forecast also tends to match reality better. The plain method simply uses the result of a single run.
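
As a minimal sketch of what the sampling functions do internally (it uses only the public API above; the percentile bounds are an alternative to the t-interval that Utils.sample_ci actually uses):

import numpy as np

# Run the forecaster n_sample times with Dropout active and summarise the spread of the samples.
n, n_sample, alpha = 60, 200, 0.05
samples = np.stack([result.predict(n, is_random=True) for _ in range(n_sample)])  # (n_sample, n, n_vars)

point_forecast = samples.mean(axis=0)                          # sample mean used as the prediction
lower = np.percentile(samples, 100 * alpha / 2, axis=0)        # percentile bounds; the template's
upper = np.percentile(samples, 100 * (1 - alpha / 2), axis=0)  # Utils.sample_ci uses a t-interval instead

further_predict = result.predict(n, is_random=False)  # restore the deterministic forecast (conf_int_sampled does this internally)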

A bootstrap variant for computing the sampled confidence interval has also been added, but I personally do not recommend using it.
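
If you want to try it anyway, both sampled confidence-interval functions accept a method argument (this follows their signatures in LSTMResult.py below), so you can swap in the bootstrap estimator:

import Utils

# Use the bootstrap estimator instead of the default Utils.sample_ci t-interval.
conf_predicted_bs = result.conf_int_predicted_sampled(alpha=alpha, n_sample=1000,
                                                      method=Utils.bootstrap_ci_sampled)
conf_bs = result.conf_int_sampled(n, alpha=alpha, n_sample=1000,
                                  method=Utils.bootstrap_ci_sampled)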

Plotting

result.plot_loss()
result.plot()
result.plot_sampled()

# conf_predicted can be None, conf_predicted_sampled, or conf_predicted; by default the conf_predicted_sampled data is used
# index selects which variable to show; with a single variable just use 0
result.plot_confidence(index=0, alpha=0.05, conf_predicted=conf_predicted)
result.plot_confidence(index=0, alpha=0.05, conf_predicted=conf_predicted_sampled)

# plot_all combines all the plots above; the calls above are the basic workflow, and this function lets you freely choose which data to draw
"""
names: Union[List[str], None] - a name for each variable
index: Union[List[int], None, int] - which variable indexes to plot; None means all
conf_predicted = conf_predicted / conf_predicted_sampled / None - confidence interval for the past (ground-truth) data
conf_predict = conf / None - confidence interval for the forecast
conf_alpha: float, e.g. 0.05
result_ = predict / predict_sampled / None - predictions for the past data
result = further_predict / further_predict_sampled / None - forecast of the future data
"""

result.plot_all(names=None, index=None, conf_predicted=conf_predicted, conf_predict=conf, conf_alpha=0.05, result=further_predict_sampled, result_=predict_sampled)

plot_loss() draws the loss curve, plot() draws the curves of all variables, and plot_confidence(index) draws the confidence interval of the variable at index.

You can override the plotting methods to make the figures look nicer

# Override the plotting methods to improve the figures
from typing import Union

from CustomModel import CustomLSTM
from LSTMResult import LSTMResultBase

class LSTMResult(LSTMResultBase):
    def __init__(self, resultLSTM: Union[CustomLSTM, LSTMResultBase] = None):
        super().__init__(resultLSTM)

    def plot_confidence(self, index=0, alpha=0.05, conf_predicted=None):
        super().plot_confidence(index, alpha, conf_predicted)

    def plot_loss(self):
        pass

    def plot(self, names=None):
        pass

Redraw the plots

# After overriding LSTMResult, you can do the following
result = LSTMResult(result)
lstm = result.resultLSTM

result.plot()

Save the data and model

# Save the model
result.save()

Load the saved data

result = LSTMResultBase.load()
lstm = result.resultLSTM

Full code

"""
File : main.py
Preferred Python IDE: Jupyter
"""

import numpy as np
from CustomModel import CustomLSTM
import Utils


def data_basic():
    """2023 MCM/ICM Problem C: https://www.pancake2021.work/wp-content/uploads/Problem_C_Data_Wordle.xlsx"""
    date, data = Utils.openfile("Problem_C_Data_Wordle.xlsx", data_col=[1], date_col=0)
    return date, data


def data01():
    # Test case: 2-D array, multiple variables
    data1 = np.sin(np.arange(200) * np.pi / 50) + np.random.randn(200) * 0.1
    data2 = np.sin(np.arange(200) * np.pi / 25) + np.random.randn(200) * 0.1 + 3
    return np.c_[data1[:, np.newaxis], data2[:, np.newaxis]]


def data02():
    # Test case: 1-D array
    return np.sin(np.arange(200) * np.pi / 50) + np.random.randn(200) * 0.1


def data03():
    # Test case: 2-D array, single variable
    return data02()[:, np.newaxis]


# Load the data. To use date, the column must be in Excel date format, or write your own code to build the timestamps
# date, data = data_basic()
data = data01()

# Initialize the network
window_size = 10
lstm = CustomLSTM(data, window_size, scale=False)
# lstm = CustomLSTM(data, window_size, date, scale=True)
lstm.init_lstm(hidden=64, num_layers=3)

# Train the network
# max_batch: whether to use the whole dataset as a single batch without splitting
_ = lstm.fit(num_epochs=2000, lr=0.01, max_batch=True, is_copy=False)
result = lstm.fit(num_epochs=2000, lr=0.001, max_batch=True, is_copy=True)
# result = lstm.fit(num_epochs=1000, lr=0.01, batch_size=55, max_batch=False)

# Adjust the window size and retrain
# lstm.slice(20)
# lstm.init_lstm(hidden=64, num_layers=1)
# lstm.fit(num_epochs=50, lr=0.001, batch_size=40, max_batch=False)

# Predict all past data (with ground truth)
predict = result.predicted(is_random=False)

# Print the summary
r2, mse, rmse, mae, mape, smape = result.summary()

# Forecast the next n steps
n = 60
further_predict = result.predict(n)

# (1 - alpha) confidence intervals: for the fitted (past) data and for the n-step forecast
alpha = 0.05
conf_predicted = result.conf_int_predicted(alpha)
conf_predicted_sampled = result.conf_int_predicted_sampled(alpha=alpha, n_sample=1000)
conf = result.conf_int_sampled(n, alpha=alpha, n_sample=1000)

# Plotting
result.plot_loss()
result.plot()
result.plot_confidence(index=0, alpha=0.05, conf_predicted=None)
"""
File : CustomModel.py
"""

import numpy as np
import warnings
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils.validation import check_is_fitted, check_array
from copy import deepcopy
from Model import LSTM

warnings.simplefilter("always")

# gpu or cpu
_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


# MinMaxScaler extended with a per-column inverse transform
class MyScaler(MinMaxScaler):
    def __init__(self):
        super().__init__()

    def inverse_transform_col(self, x, n_col):
        check_is_fitted(self)
        x = check_array(
            x, copy=self.copy, dtype=(np.float64, np.float32, np.float16), force_all_finite="allow-nan"
        )
        x -= self.min_[n_col]
        x /= self.scale_[n_col]
        return x


class CustomLSTM:
    X, Y, data, model, criterion, window, losses, scale = [None] * 8

    def __init__(self, data: np.array, window, date=None, scale=False):
        self.data = data
        self.input_dim = self.init_data()
        self.output_dim = self.input_dim
        self.date = date
        self.scale = scale
        # Whether to apply min-max scaling
        if scale:
            self.scaler = MyScaler()
            self.data = self.scaler.fit_transform(self.data)
        self.slice(window)

    # data may be a 1-D or a 2-D array; 1-D arrays are promoted to 2-D
    def init_data(self):
        assert (length := len(self.data.shape)) in [1, 2]
        if length == 1:
            self.data = self.data[:, np.newaxis]
        return len(self.data[0])

    # The function below is deprecated; instead h0 and c0 are resized so no error is raised
    # Check whether the dataset size is divisible by the batch size
    def check_batch(self, batch_size):
        length = self.X.shape[0]
        # Make sure batch_size is not larger than the total length
        assert length >= batch_size
        if batch_size * (length // batch_size) != length:
            warnings.warn(f'Dataset size is {length} and batch size is {batch_size}; since it does not divide evenly, {(length % batch_size) / length * 100}% of the data would be dropped', DeprecationWarning)

    # Reset parameters
    def init_param(self):
        self.model, self.criterion, self.losses = [None] * 3

    # Slice the series with a sliding window of size `window` to build the full batch
    def slice(self, window):
        self.init_param()
        self.window = window
        x, y = [], []
        for i in range(len(self.data) - window):
            x.append(self.data[i:i + window])
            y.append(self.data[i + window])
        x = np.array(x)
        y = np.array(y)
        x = torch.from_numpy(x).float()  # (batch_size, sequence_length, input_size)
        y = torch.from_numpy(y).float()
        print(f"Data shapes: X = {x.shape}, Y = {y.shape}")
        self.X = x.to(_device)
        self.Y = y.to(_device)

    # Initialize the LSTM model
    def init_lstm(self, hidden=64, num_layers=1):
        self.init_param()
        self.model = LSTM(self.input_dim, hidden, self.output_dim, num_layers).to(_device)
        self.criterion = nn.MSELoss().to(_device)

    # Yield batches from the full dataset, with optional shuffling
    @staticmethod
    def iterate_batches(inputs, targets, batch_size, shuffle=True):
        assert len(inputs) == len(targets)
        indices = np.arange(n := len(inputs))
        if shuffle:
            np.random.shuffle(indices)
        for start_idx in range(0, len(inputs) - batch_size + 1, batch_size):
            excerpt = indices[start_idx:start_idx + batch_size]
            yield inputs[excerpt], targets[excerpt], None
        if (left := n % batch_size) != 0:
            excerpt = indices[-left:]
            yield inputs[excerpt], targets[excerpt], left

    # Start training
    def fit(self, num_epochs=100, lr=0.01, batch_size=128, max_batch=False, is_copy=False):
        losses = []
        avg_loss = 0
        if self.model is None:
            raise ValueError("Initialize the network with CustomLSTM.init_lstm first")
        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        self.model.train()
        if max_batch:
            batch_size = self.X.shape[0]
        else:
            pass
            # self.check_batch(batch_size)
        for epoch in range(num_epochs):
            loss_all = 0
            self.model.init_hidden(batch_size)
            index = 0
            for index, (batch_x, batch_y, left) in enumerate(CustomLSTM.iterate_batches(self.X, self.Y, batch_size, shuffle=False)):
                optimizer.zero_grad()
                outputs = self.model(batch_x, left)
                loss = self.criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                loss_all += loss.detach().cpu()
            losses.append(loss_all / (index + 1))
            avg_loss += loss_all / (index + 1)
            if epoch % 20 == 0:
                print('Epoch [{}/{}], Loss: {:.4f}, Avg Loss over last 20 epochs: {:.4f}'.format(epoch + 1, num_epochs, loss_all / (index + 1), avg_loss / 20))
                avg_loss = 0
        if self.losses is not None:
            self.losses = self.losses + losses
        else:
            self.losses = losses
        self.model.init_hidden(1)
        from LSTMResult import LSTMResultBase
        return LSTMResultBase(deepcopy(self) if is_copy else self)
"""
File : LSTMResult.py
"""

import Utils
import numpy as np
import warnings
import inspect
import torch
import matplotlib.pyplot as plt
import pickle
import time
from functools import cache
from RunTime import cal_time
import CustomModel

warnings.simplefilter("always")

# gpu or cpu
_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


class LSTMResultBase(object):
    # result holds the forecast of future data; result_ holds the predictions for the data that has ground truth
    # sampled_result is more convincing than result: it averages out the randomness of Dropout and is closer to reality, so the sampled data are recommended.
    result, conf, result_, conf_predicted, conf_predicted_sampled, sampled_result, sampled_result_ = [None] * 7

    # Keep the original training object (and its data) in memory
    def __init__(self, resultLSTM: CustomModel.CustomLSTM = None):
        if isinstance(resultLSTM, CustomModel.CustomLSTM):
            self.resultLSTM = resultLSTM
        elif isinstance(resultLSTM, LSTMResultBase):
            self.resultLSTM = resultLSTM.resultLSTM
            self.copy_param(resultLSTM)
        else:
            warnings.warn(f'Input type {type(resultLSTM)} is not supported; please pass a CustomLSTM or LSTMResultBase', DeprecationWarning)
        self.__doc__ = resultLSTM.__doc__

    def copy_param(self, parent):
        self.result, self.conf, self.result_, self.conf_predicted, self.conf_predicted_sampled, self.sampled_result, self.sampled_result_ = \
            parent.result, parent.conf, parent.result_, parent.conf_predicted, parent.conf_predicted_sampled, parent.sampled_result, parent.sampled_result_

    # Look up attributes without copying, falling back to the wrapped CustomLSTM
    def __getattribute__(self, item):
        get = lambda name: object.__getattribute__(self, name)

        try:
            return get(item)
        except AttributeError:
            pass

        try:
            results = get('resultLSTM')
        except AttributeError:
            raise ImportError("CustomLSTM load error")

        try:
            attribute = getattr(results, item)
            if inspect.ismethod(attribute):
                warnings.warn(f"Calling {attribute} may fail; try not to call CustomLSTM methods through this wrapper", DeprecationWarning)
            return attribute
        except AttributeError:
            raise AttributeError(f"{item} is in neither CustomLSTM nor LSTMResult")

    def __dir__(self):
        return [x for x in dir(self.resultLSTM) if not inspect.ismethod(getattr(self.resultLSTM, x))] + dir(LSTMResultBase)

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, dict_):
        self.__dict__.update(dict_)

    # Predict the values from index window_size to the end of the data so they can be compared with the ground truth
    @cal_time()
    def predicted(self, is_random=False):
        if self.model is None:
            raise ValueError("Train the model with CustomLSTM.fit first")
        if is_random:
            self.model.train()
        else:
            self.model.eval()
        self.model.init_hidden(len(self.X))
        with torch.no_grad():
            predicted = self.model(self.X).to(_device)
            predicted = predicted.detach().cpu().numpy()
        self.result_ = np.array(predicted)
        if self.scale:
            self.result_ = self.scaler.inverse_transform(self.result_)
        return self.result_

    # Forecast the values of the next n steps
    @cal_time()
    def predict(self, n, is_random=False):
        if self.model is None:
            raise ValueError("Train the model with CustomLSTM.fit first")
        if is_random:
            self.model.train()
        else:
            self.model.eval()
        self.model.init_hidden(1)
        _data = self.data[-self.window:, :].tolist()
        y = []
        with torch.no_grad():
            for _ in range(n):
                x = torch.tensor(_data).float().unsqueeze(0).to(_device)
                _result = self.model(x).detach().cpu().tolist()
                y.append(_result[0])
                _data.append(_result[0])
                _data.pop(0)
        self.result = np.array(y)
        if self.scale:
            self.result = self.scaler.inverse_transform(self.result)
        return self.result

    # Save the whole result object
    def save(self, name="./data"):
        with open(name, 'wb') as f:
            pickle.dump(self, f)

    # Load a saved result object
    @staticmethod
    def load(name="./data"):
        with open(name, "rb") as f:
            return pickle.load(f)

    # Analytic confidence interval for the data with ground truth
    def conf_int_predicted(self, alpha=0.05):
        if self.result_ is None:
            raise ValueError("Call LSTMResultBase.predicted first")
        y_pred = self.result_
        y_true = self.data[self.window:]
        n_param = self.data.shape[1]
        # (variable index, points per variable, (lower, mean, upper))
        conf = np.zeros(shape=(n_param, len(y_true), 3))
        for i in range(n_param):
            conf[i] = Utils.ci(y_true[:, i], y_pred[:, i], alpha=alpha)
        self.conf_predicted = conf
        return conf

    # Sampled confidence interval for the data with ground truth (recommended method)
    @cache
    def conf_int_predicted_sampled(self, alpha=0.05, n_sample=1000, method=Utils.sample_ci):
        result_ = np.copy(self.result_)
        n = self.data.shape[0] - self.window
        n_param = self.data.shape[1]
        # (variable index, points per variable, (lower, mean, upper))
        conf = np.zeros(shape=(n_param, n, 3))

        sample = np.zeros(shape=(n_sample, n, n_param))
        for i in range(n_sample):
            print(f"[{i + 1}/{n_sample}] ", end="")
            sample[i] = self.predicted(is_random=True)

        for i in range(n_param):
            conf[i] = method(sample[:, :, i], alpha=alpha)
            # conf[i] = Utils.sample_ci(sample[:, :, i], alpha=alpha)
            # conf[i] = Utils.bootstrap_ci_sampled(sample[:, :, i], alpha=alpha)

        self.conf_predicted_sampled = conf
        self.sampled_result_ = conf[:, :, 1].reshape(n_param, n).T
        self.result_ = result_
        return conf

    # Confidence interval of the forecast, computed by sampling
    @cache
    def conf_int_sampled(self, n, alpha=0.05, n_sample=1000, method=Utils.sample_ci):
        result = np.copy(self.result)
        n_param = self.data.shape[1]
        sample = np.zeros(shape=(n_sample, n, n_param))
        conf = np.zeros(shape=(n_param, n, 3))
        for i in range(n_sample):
            print(f"[{i + 1}/{n_sample}] ", end="")
            sample[i] = self.predict(n, is_random=True)

        for i in range(n_param):
            conf[i] = method(sample[:, :, i], alpha=alpha)
            # conf[i] = Utils.sample_ci(sample[:, :, i], alpha=alpha)
            # conf[i] = Utils.bootstrap_ci_sampled(sample[:, :, i], alpha=alpha)

        self.conf = conf
        self.sampled_result = conf[:, :, 1].reshape(n_param, n).T
        self.result = result
        return conf

    # Plot the loss curve
    def plot_loss(self):
        if self.losses is None:
            raise ValueError("No loss recorded; train the model first")
        plt.figure(figsize=(12, 6))
        plt.plot(self.losses)
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.show()

    def plot_all(self, names=None, index=None, conf_predicted=None, conf_predict=None, conf_alpha=None, result=None, result_=None):
        if names is None:
            names = [''] * len(self.data[0])
        x = np.arange(len(self.data))
        x_further = np.arange(len(self.data), len(self.data) + len(self.result))
        if index is None:
            index = range(len(self.data[0]))
        elif isinstance(index, int):
            index = [index]
        conf_name = '{} Confidence interval' if conf_alpha is None else '{} ' + f'{1 - conf_alpha} Confidence interval'
        plt.figure(figsize=(12, 6))
        for i in index:
            y_true = self.data[:, i: i + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, i:i + 1], i)
            plt.plot(x, y_true, label=f'{names[i]} True Values')
            if result_ is not None:
                plt.plot(x[self.window:], result_[:, i:i + 1], label=f'{names[i]} Predictions')
            if result is not None:
                plt.plot(x_further, result[:, i:i + 1], label=f"{names[i]} Further Predictions")
            if conf_predicted is not None:
                plt.fill_between(x[self.window:], conf_predicted[i, :, 0], conf_predicted[i, :, 2], alpha=0.2, label=conf_name.format(names[i]))
            if conf_predict is not None:
                plt.fill_between(x_further, conf_predict[i, :, 0], conf_predict[i, :, 2], alpha=0.2, label=conf_name.format(names[i]))

        self.process_x_label()
        plt.legend()
        plt.show()

    # The data may be 2-D (several variables predicted at once); names gives a name for each variable
    def plot(self, names=None):
        if self.result is None:
            raise ValueError("Call LSTMResultBase.predict first")
        if self.result_ is None:
            raise ValueError("Call LSTMResultBase.predicted first")
        if names is None:
            names = [''] * len(self.data[0])
        x = np.arange(len(self.data))
        x_further = np.arange(len(self.data), len(self.data) + len(self.result))

        plt.figure(figsize=(12, 6))
        for i in range(len(self.data[0])):
            y_true = self.data[:, i: i + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, i:i + 1], i)
            plt.plot(x, y_true, label=f'{names[i]} True Values')
            plt.plot(x[self.window:], self.result_[:, i:i + 1], label=f'{names[i]} Predictions')
            plt.plot(x_further, self.result[:, i:i + 1], label=f"{names[i]} Further Predictions")
        self.process_x_label()
        plt.legend()
        plt.show()

    def plot_sampled(self, names=None):
        if self.sampled_result is None:
            raise ValueError("Call LSTMResultBase.conf_int_sampled first")
        if self.sampled_result_ is None:
            raise ValueError("Call LSTMResultBase.conf_int_predicted_sampled first")
        if names is None:
            names = [''] * len(self.data[0])
        x = np.arange(len(self.data))
        x_further = np.arange(len(self.data), len(self.data) + len(self.result))

        plt.figure(figsize=(12, 6))
        for i in range(len(self.data[0])):
            y_true = self.data[:, i: i + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, i:i + 1], i)
            plt.plot(x, y_true, label=f'{names[i]} True Values')
            plt.plot(x[self.window:], self.sampled_result_[:, i:i + 1], label=f'{names[i]} Predictions')
            plt.plot(x_further, self.sampled_result[:, i:i + 1], label=f"{names[i]} Further Predictions")
        self.process_x_label()
        plt.legend()
        plt.show()

    # Use dates as the x-axis labels
    def process_x_label(self):
        if self.date is None:
            return
        begin = self.date[0]
        end = self.date[-1]
        between = (end - begin) // (len(self.date) - 1)
        n = len(self.result)
        length = len(self.data) + n
        gap = length // 5
        # "%Y-%m-%d %H:%M:%S"
        plt.xticks(np.arange(0, length, gap), [time.strftime("%Y-%m-%d", time.localtime(i)) for i in range(begin, end + between * n + 1, between * gap)],
                   rotation=45)
        plt.tight_layout()

    # Plot the confidence interval for a single variable
    def plot_confidence(self, index=0, alpha=0.05, conf_predicted=None):
        if self.result is None:
            raise ValueError("Call LSTMResultBase.predict first")
        if self.result_ is None:
            raise ValueError("Call LSTMResultBase.predicted first")
        if conf_predicted is None:
            try:
                conf_predicted = self.conf_predicted_sampled[index]
            except (TypeError, IndexError):
                try:
                    conf_predicted = self.conf_predicted[index]
                except (TypeError, IndexError):
                    raise ValueError("Call LSTMResultBase.conf_int_predicted_sampled or conf_int_predicted first")
        else:
            conf_predicted = conf_predicted[index]
        try:
            conf = self.conf[index]
        except (TypeError, IndexError):
            raise ValueError("Call LSTMResultBase.conf_int_sampled first")

        plt.figure(figsize=(12, 6))
        x = np.arange(len(self.data))
        n = len(self.result)
        x_further = np.arange(len(self.data), len(self.data) + n)
        y_true = self.data[:, index: index + 1] if not self.scale else self.scaler.inverse_transform_col(self.data[:, index: index + 1], index)

        plt.plot(x, y_true, label='True Values')
        plt.plot(x[self.window:], conf_predicted[:, 1], label='Predictions')
        plt.plot(x_further, conf[:, 1], label="Further Predictions")

        plt.fill_between(x[self.window:], conf_predicted[:, 0], conf_predicted[:, 2], alpha=0.2, label=f'{1 - alpha} Confidence interval')
        plt.fill_between(x_further, conf[:, 0], conf[:, 2], alpha=0.2, label=f'{1 - alpha} Confidence interval')

        self.process_x_label()
        plt.legend()
        plt.show()

    # Print the summary: R^2 and the other evaluation metrics
    def summary(self):
        if self.result_ is None:
            raise ValueError("Call LSTMResultBase.predicted first")
        data = self.data[self.window:, :]
        if self.scale:
            data = self.scaler.inverse_transform(data)
        print("==========Summary Begin===========")
        print("R2    =", score_r2 := Utils.r2(self.result_, data))
        print("MSE   =", score_mse := Utils.mse(self.result_, data))
        print("RMSE  =", score_rmse := np.sqrt(score_mse))
        print("MAE   =", score_mae := Utils.mae(self.result_, data))
        print("MAPE  =", score_mape := Utils.mape(self.result_, data, replace=self.scale))
        print("SMAPE =", score_smape := Utils.smape(self.result_, data))
        print(f"Average is {score_r2.mean()} {score_mse.mean()} {score_rmse.mean()} {score_mae.mean()} {score_mape.mean()} {score_smape.mean()}")
        print("===========Summary end============")
        return score_r2, score_mse, score_rmse, score_mae, score_mape, score_smape
"""
File : Model.py
"""

import torch
import torch.nn as nn

# gpu or cpu
_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


# Define the LSTM model
class LSTM(nn.Module):
    hidden = None

    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # As batch_first=True, input: (batch_size, sequence_length, input_size)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.out = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(hidden_size, output_size)
        )
        # Optional; you can comment out the next line
        self.apply(LSTM.init_weights)

    def forward(self, x, left=None):
        self.lstm.flatten_parameters()
        self.lstm2.flatten_parameters()
        # Detach the hidden state so loss.backward() does not raise an error
        hidden = [each.data for each in self.hidden] if left is None else [each.data[:, :left, :] for each in self.hidden]
        x, hidden = self.lstm(x, hidden)
        x, self.hidden = self.lstm2(x, hidden)
        x = x[:, -1, :]
        x = self.out(x)
        return x

    # (h0, c0)
    def init_hidden(self, batch_size):
        weight = next(self.parameters()).data
        self.hidden = (weight.new(self.num_layers, batch_size, self.hidden_size).zero_().to(_device),
                       weight.new(self.num_layers, batch_size, self.hidden_size).zero_().to(_device))

    @staticmethod
    def init_weights(m):
        if type(m) == nn.LSTM:
            for name, param in m.named_parameters():
                if 'weight_ih' in name:
                    torch.nn.init.orthogonal_(param.data)
                elif 'weight_hh' in name:
                    torch.nn.init.orthogonal_(param.data)
                elif 'bias' in name:
                    param.data.fill_(0)
        elif type(m) == nn.Conv1d or type(m) == nn.Linear:
            torch.nn.init.orthogonal_(m.weight)
            m.bias.data.fill_(0)
"""
File : RunTime.py
"""

import time
from functools import wraps


# Decorator that measures a function's running time, with ms/s/min/hour/day units
def cal_time(unit='s'):
    units = ['ms', 's', 'min', 'hour', 'day']
    times = [1000, 1, 1 / 60, 1 / 3600, 1 / 3600 / 24]
    try:
        time_process = times[units.index(unit.lower())]
    except Exception as e:
        raise e

    def input_func(func):

        @wraps(func)
        def wrap(*args, **kwargs):
            begin_time = time.time()
            ans = func(*args, **kwargs)
            print(func.__name__, f"took {(time.time() - begin_time) * time_process} {unit}")
            return ans

        return wrap

    return input_func
"""
File : Utils.py
"""

import numpy as np
import pandas as pd
import scipy.stats as st
from typing import Union, List


# Confidence interval

def ci(y_true: np.ndarray, y_pred: np.ndarray, alpha: float = 0.05, use_t: bool = True) -> np.ndarray:
    residuals = y_true - y_pred
    n = len(residuals)
    if use_t:
        dist = st.t
        df = n - 1
        t_value = dist.ppf(1 - alpha / 2, df)
    else:
        dist = st.norm
        t_value = dist.ppf(1 - alpha / 2)
    if y_true.ndim == 1:
        std_err = np.std(residuals, ddof=1) / np.sqrt(n)
    else:
        std_err = np.std(residuals, ddof=1, axis=0) / np.sqrt(n)
    err = t_value * std_err
    return np.c_[y_pred - err, y_pred, y_pred + err]


# Confidence interval from samples

def sample_ci(sample: np.ndarray, alpha: float = 0.05, use_t: bool = True) -> np.ndarray:
    n = len(sample)
    if use_t:
        dist = st.t
        df = n - 1
        t_value = dist.ppf(1 - alpha / 2, df)
    else:
        dist = st.norm
        t_value = dist.ppf(1 - alpha / 2)
    means = sample.mean(axis=0)
    stds = sample.std(axis=0, ddof=1)
    lower = means - t_value * stds / np.sqrt(n)
    upper = means + t_value * stds / np.sqrt(n)
    return np.c_[lower, means, upper]


# Bootstrap confidence interval from samples; sample shape: (n_sample, n)

def bootstrap_ci_sampled(sample: np.ndarray, alpha: float = 0.05, n_bootstraps: int = 1000) -> np.ndarray:
    n = len(sample[0])
    sample_size = len(sample)
    lower = np.zeros(shape=(n,))
    means = np.zeros(shape=(n,))
    upper = np.zeros(shape=(n,))
    for i in range(n):
        bootstrap_sample = np.random.choice(sample[:, i], size=(sample_size, n_bootstraps), replace=True)
        bootstrap_means = np.mean(bootstrap_sample, axis=1)
        lower[i] = np.percentile(bootstrap_means, 100 * alpha / 2)
        upper[i] = np.percentile(bootstrap_means, 100 * (1 - alpha / 2))
        means[i] = np.mean(bootstrap_means)
    return np.c_[lower, means, upper]


# Inputs are 2-D numpy arrays with variables in columns; each metric returns a 1-D numpy array of values

def r2(y_pred, y_true):
    return 1 - ((y_pred - y_true) ** 2).sum(axis=0) / ((y_true.mean(axis=0) - y_true) ** 2).sum(axis=0)


def mse(y_pred, y_true):
    return ((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred)


def rmse(y_pred, y_true):
    return np.sqrt(((y_true - y_pred) ** 2).sum(axis=0) / len(y_pred))


def mae(y_pred, y_true):
    return (np.absolute(y_true - y_pred)).sum(axis=0) / len(y_true)


def mape(y_pred, y_true, replace=False):
    # Guard against zeros in y_true
    if replace:
        y_true = np.copy(y_true)
        y_true[y_true == 0] = 1
    return (np.abs((y_pred - y_true) / y_true)).mean(axis=0) * 100


def smape(y_pred, y_true):
    return 2.0 * (np.abs(y_pred - y_true) / (np.abs(y_pred) + np.abs(y_true))).mean(axis=0) * 100


# Convenience file loader

def openfile(name: str, data_col: Union[None, List[str], List[int]] = None, date_col: Union[None, int] = None) -> (np.ndarray, np.ndarray):
    file_type = name.split(".")[-1]
    if file_type == "csv":
        df = pd.read_csv(name, encoding='GBK')
    elif file_type == "xlsx" or file_type == "xls":
        df = pd.read_excel(name)
    else:
        raise TypeError(f"{name} is not a csv, xls, or xlsx file")
    # df = df[["column name 1", "column name 2"]]
    df_data = df.iloc[:, data_col] if data_col is not None else df
    date = df.iloc[:, date_col].astype(
        'int64') // 1e9 if date_col is not None else None
    return (np.array(date, dtype=int) if date is not None else None), np.array(df_data)
Changelog:

2023-04-10: fixed the bootstrap bug; improved dir(LSTMResultBase); added the customizable plot_all function; fixed warnings not being displayed; separated the sampled and single-run data; replaced the batch_size check by adjusting h0 and c0 instead; moved predicted into LSTMResultBase; fixed data loss when saving.
