Building a Transformer Model by Hand for Time-Series Forecasting: Stock Price Prediction

一、Data

Stock prices are naturally sequential, so stock data is used here as the forecasting target.

Download the historical data of any stock and process it; this post uses the data for 600243.

At this step the batch size is effectively fixed to 1; if you need to change it you will have to rewrite the data handling yourself. You can refer to my earlier post:

transformer模型多特征、单特征seq2seq时序预测_m0_57144920的博客-CSDN博客

import pandas as pd
import torch
import numpy as np
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader

data_path=r'D:\pytorch_learn\second\600243pre\600243.csv'
elements=['收盘价','最高价','最低价','开盘价','前收盘']   # CSV columns: close, high, low, open, previous close
# element=['开盘价']

def single_data():   # y is the price 5 rows ahead (column index 3); the x-normalization lines below are left commented out
    data_all = pd.read_csv(data_path, encoding='gbk')
    data_ha = []
    length = len(data_all)
    for index, element in enumerate(elements):
        data_element = data_all[element].values.astype(np.float64)
        data_element = data_element.reshape(length, 1)
        data_ha.append(data_element)
    X_hat = np.concatenate(data_ha, axis=1)
    # X_hat=data_all[element].values.astype(np.float64)
    X_CONVERT = torch.from_numpy(X_hat)
    X = torch.zeros_like(X_CONVERT)
    a = len(X_CONVERT)
    for i in range(a):                      # reverse the row order (flip the series end-to-end)
        X[i, :] = X_CONVERT[a - 1 - i, :]
    y = X[5:, 3].type(torch.float32)        # target: column 3 shifted 5 steps into the future
    y = y.reshape(y.shape[0], 1)
    X = X[0:-5, :].type(torch.float32)      # inputs: all 5 features, aligned with the shifted target
    # X-=torch.min(X,dim=0)
    # X/=torch.max(X,dim=0)
    # X -= torch.mean(X, dim=0)
    # X /= torch.std(X, dim=0)
    dataset=TensorDataset(X,y)
    data_loader=DataLoader(dataset,batch_size=64,shuffle=False)
    return data_loader   # each full batch yields X: torch.Size([64, 5]) and y: torch.Size([64, 1]); the training loop treats it as one sequence (batch_size 1)
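
To make the shapes concrete, here is a minimal sketch of how the loader output is consumed later in the training loop (it assumes the CSV path above exists on your machine):

# quick shape check on the loader output
loader = single_data()
for X, y in loader:
    print(X.shape, y.shape)         # torch.Size([64, 5]) torch.Size([64, 1]) for full batches
    enc_inputs = X.unsqueeze(0)     # the training loop feeds each batch as one sequence: [1, 64, 5]
    break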

二、Model

import torch
import numpy as np
import torch.nn as nn

d_model = 512   # embedding dimension
d_ff = 2048     # hidden dimension of the position-wise feed-forward network
d_k = d_v = 64  # dimension of K (= Q) and V per attention head
n_layers = 1    # number of encoder layers
n_heads = 8     # number of attention heads in Multi-Head Attention
#
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pos_table = np.array([
        [pos / np.power(10000, 2 * i / d_model) for i in range(d_model)]
        if pos != 0 else np.zeros(d_model) for pos in range(max_len)])
        pos_table[1:, 0::2] = np.sin(pos_table[1:, 0::2])                  # sine on the even embedding dimensions
        pos_table[1:, 1::2] = np.cos(pos_table[1:, 1::2])                  # cosine on the odd embedding dimensions
        self.pos_table = torch.FloatTensor(pos_table)                      # pos_table: [max_len, d_model]

    def forward(self, enc_inputs):                                         # enc_inputs: [batch_size, seq_len, d_model]
        enc_inputs += self.pos_table[:enc_inputs.size(1), :]
        return self.dropout(enc_inputs)

def get_attn_pad_mask(seq_q, seq_k):                        # seq_q: [batch_size, seq_len, n_features], seq_k: [batch_size, seq_len, n_features]
    batch_size, len_q, _ = seq_q.size()                     # e.g. 1 * 64 * 5
    batch_size, len_k, _ = seq_k.size()
    # pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)         # in an NLP setting this would mark padding tokens (=0) with True, [batch_size, 1, len_k]
    pad_attn_mask = torch.zeros(batch_size, len_q, len_k, dtype=torch.bool)   # no padding here, so nothing is masked (masked_fill_ expects a boolean mask)
    return pad_attn_mask                                    # [batch_size, len_q, len_k]

class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask):  # Q: [batch_size, n_heads, len_q, d_k]
        # K: [batch_size, n_heads, len_k, d_k]
        # V: [batch_size, n_heads, len_v(=len_k), d_v]
        # attn_mask: [batch_size, n_heads, seq_len, seq_len]
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)  # scores : [batch_size, n_heads, len_q, len_k]
        scores.masked_fill_(attn_mask, -1e9)  # positions where attn_mask is True are pushed to -1e9 and effectively ignored by the softmax
        attn = nn.Softmax(dim=-1)(scores)
        context = torch.matmul(attn, V)  # [batch_size, n_heads, len_q, d_v]
        return context, attn


class MultiHeadAttention(nn.Module):
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
        self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)

    def forward(self, input_Q, input_K, input_V, attn_mask):  # input_Q: [batch_size, len_q, d_model]
        # input_K: [batch_size, len_k, d_model]
        # input_V: [batch_size, len_v(=len_k), d_model]
        # attn_mask: [batch_size, seq_len, seq_len]
        residual, batch_size = input_Q, input_Q.size(0)
        Q = self.W_Q(input_Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)  # Q: [batch_size, n_heads, len_q, d_k]
        K = self.W_K(input_K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)  # K: [batch_size, n_heads, len_k, d_k]
        V = self.W_V(input_V).view(batch_size, -1, n_heads, d_v).transpose(1,2)  # V: [batch_size, n_heads, len_v(=len_k), d_v]
        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1,
                                                  1)  # attn_mask : [batch_size, n_heads, seq_len, seq_len]
        context, attn = ScaledDotProductAttention()(Q, K, V, attn_mask)  # context: [batch_size, n_heads, len_q, d_v]
        # attn: [batch_size, n_heads, len_q, len_k]
        context = context.transpose(1, 2).reshape(batch_size, -1,
                                                  n_heads * d_v)  # context: [batch_size, len_q, n_heads * d_v]
        output = self.fc(context)  # [batch_size, len_q, d_model]
        return nn.LayerNorm(d_model)(output + residual), attn   # note: this LayerNorm is created on the fly, so its affine parameters are never trained


class PoswiseFeedForwardNet(nn.Module):
    def __init__(self):
        super(PoswiseFeedForwardNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(d_model, d_ff, bias=False),
            nn.ReLU(),
            nn.Linear(d_ff, d_model, bias=False))

    def forward(self, inputs):  # inputs: [batch_size, seq_len, d_model]
        residual = inputs
        output = self.fc(inputs)
        return nn.LayerNorm(d_model)(output + residual)  # [batch_size, seq_len, d_model]

class EncoderLayer(nn.Module):
    def __init__(self):
        super(EncoderLayer, self).__init__()
        self.enc_self_attn = MultiHeadAttention()                                     # multi-head self-attention
        self.pos_ffn = PoswiseFeedForwardNet()                                        # position-wise feed-forward network

    def forward(self, enc_inputs, enc_self_attn_mask):                                # enc_inputs: [batch_size, src_len, d_model]
        # the same enc_inputs is projected by W_Q, W_K and W_V to obtain Q, K, V      # enc_self_attn_mask: [batch_size, src_len, src_len]
        enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs,    # enc_outputs: [batch_size, src_len, d_model],
                                               enc_self_attn_mask)                    # attn: [batch_size, n_heads, src_len, src_len]
        enc_outputs = self.pos_ffn(enc_outputs)                                       # enc_outputs: [batch_size, src_len, d_model]
        return enc_outputs, attn

class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.src_emb = nn.Linear(5, d_model)                                     # project the 5 input features to d_model
        self.pos_emb = PositionalEncoding(d_model)                               # add positional information
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])

    def forward(self, enc_inputs):                                               # enc_inputs: [batch_size, src_len, 5]
        enc_outputs = self.src_emb(enc_inputs)   # enc_outputs: [batch_size, src_len, d_model]
        # print('1',enc_outputs)
        enc_outputs = self.pos_emb(enc_outputs)  # enc_outputs: [batch_size, src_len, d_model]
        enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs)  # enc_self_attn_mask: [batch_size, src_len, src_len]
        enc_self_attns = []
        for layer in self.layers:
            enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)  # enc_outputs: [batch_size, src_len, d_model]
                                                                                 # enc_self_attn: [batch_size, n_heads, src_len, src_len]
            enc_self_attns.append(enc_self_attn)
        return enc_outputs, enc_self_attns

class Transformer(nn.Module):
    def __init__(self):
        super(Transformer, self).__init__()
        self.Encoder = Encoder()
        self.projection = nn.Linear(d_model, 1, bias=False)
        # self.projection1 = nn.Linear(128, 1, bias=False)
    def forward(self, enc_inputs):                                 # enc_inputs: [batch_size, src_len, 5]
        enc_outputs, enc_self_attns = self.Encoder(enc_inputs)     # enc_outputs: [batch_size, src_len, d_model],
                                                                   # enc_self_attns: [n_layers, batch_size, n_heads, src_len, src_len]
        dec_logits = self.projection(enc_outputs)                  # dec_logits: [batch_size, src_len, 1]
        # dec_logits = self.projection1(dec_logits)
        return dec_logits.view(-1, dec_logits.size(-1)), enc_self_attns   # [batch_size * src_len, 1]
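
Before training, it can help to run a quick smoke test of this encoder-only model with a dummy batch shaped like the loader output above. This is a minimal sketch, assuming the model code is saved as transformerhah.py (the filename the training script below imports from):

import torch
from transformerhah import Transformer

model = Transformer()
dummy = torch.randn(1, 64, 5)             # [batch_size, src_len, n_features]
out, attns = model(dummy)
print(out.shape)                          # torch.Size([64, 1]): one prediction per time step
print(len(attns), attns[0].shape)         # n_layers, torch.Size([1, 8, 64, 64])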

三、Training

from transformerhah import Transformer
import torch.nn as nn
import torch.optim as optim
from data import single_data
import matplotlib.pyplot as plt
import numpy as np
import os
import torch
import copy

png_save_path=r'D:\pytorch_learn\second\12.24transformer\png\3'
if not os.path.isdir(png_save_path):
    os.mkdir(png_save_path)

path_train=os.path.join(png_save_path,'weight.pth')

loader=single_data()

model = Transformer()
criterion = nn.MSELoss()     # mean-squared-error loss on the predicted prices
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)

best_loss=100000
best_epoch=0
for epoch in range(50):
    epoch_loss=0
    y_pre=[]
    y_true=[]
    for X, y in loader:              # X: [64, 5], y: [64, 1]
        enc_inputs = X.unsqueeze(0)  # feed the whole batch as one sequence: [1, 64, 5]
        # enc_inputs=enc_inputs.squeeze(2)
        outputs, enc_self_attns = model(enc_inputs)
        # print(outputs.shape)
        outputs = outputs.squeeze(1)    # [64, 1] -> [64]
        outputs = outputs.unsqueeze(0)  # [64] -> [1, 64]
        y = y.unsqueeze(0)              # [64, 1] -> [1, 64, 1]
        # compare predictions against the targets flattened to [1, 64]
        loss = criterion(outputs, y.view(1,-1))
        loss_num=loss.item()
        epoch_loss+=loss_num
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()
        y_pre.append(outputs.detach().numpy())
        y_true.append(y.detach().numpy())

    if epoch_loss<best_loss:
        best_loss=epoch_loss
        best_epoch=epoch
        best_model_wts=copy.deepcopy(model.state_dict())
        torch.save(best_model_wts,path_train)

    pre = np.concatenate(y_pre, axis=1).squeeze(0)    # predictions for the whole epoch (unnormalized)
    true = np.concatenate(y_true, axis=1).squeeze(2)  # ground-truth targets (unnormalized)
    true = true.squeeze(0)
    if True:
        plt.plot(true, color="blue", alpha=0.5)
        plt.plot(pre, color="red", alpha=0.5)
        plt.plot(pre - true, color="green", alpha=0.8)
        plt.grid(True, which='both')
        plt.axhline(y=0, color='k')
        # pyplot.savefig(os.path.join(png_save_path, 'pre.png'))
        plt.savefig(os.path.join(png_save_path, '%d.png'%epoch))
        plt.close()
    print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(epoch_loss))
print('best_loss::|',best_loss,'---best_epoch::|',best_epoch)
train_over_path=os.path.join(png_save_path,'loss(%d)---epoch(%d).pth'%(best_loss,best_epoch))
os.rename(path_train,train_over_path)
print('*******************over****************************')
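
If you want to reload the saved best weights afterwards, a minimal sketch (reusing train_over_path, single_data, and the Transformer definition from above) could look like this:

eval_model = Transformer()
eval_model.load_state_dict(torch.load(train_over_path))
eval_model.eval()
with torch.no_grad():
    for X, y in single_data():
        pred, _ = eval_model(X.unsqueeze(0))   # pred: [64, 1] for a full batch
        print(pred[:3].squeeze(1), y[:3].squeeze(1))
        break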

四、Results

五、Summary

Note that no padding seems to be applied here: every batch is treated as a sequence of length 64, but the final batch has fewer than 64 rows and is simply left unpadded. One simple workaround is sketched below.
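
If the ragged final batch is a concern, one simple option (an assumption on my part, not something the original code does) is to drop it when building the DataLoader inside single_data:

# drop the incomplete final batch so every batch really has seq_len 64
data_loader = DataLoader(dataset, batch_size=64, shuffle=False, drop_last=True)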


Update, 2022-04-24:
I have been getting quite a few private messages lately (probably because everyone is starting on their research projects, haha). If you want the code, leave your email address in a private message and I will send it over when I see it.
Some shortcomings of the code, to supplement the above:
The code was written last year; looking back at it now there are still many weaknesses, and some parts are written rather naively.
1. No GPU support is configured.
2. batch_size is set to 1 during data processing, which makes the loss look large and the training results less than ideal (my understanding is that batch_size = 1 amounts to stochastic gradient descent, which should converge less well than mini-batch descent). Interested readers can change this in the data-preprocessing code.
3. Setting hyperparameters is inconvenient. When I wrote the code I just wanted to get it running quickly and did not think about readability; if I were writing it now I would use argparse to gather all the hyperparameters in one place, roughly as sketched below.
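
For points 1 and 3, a rough sketch of what that could look like (the argument names here are only illustrative, not taken from the original code):

import argparse
import torch

# illustrative only: centralize hyperparameters with argparse and pick a device
parser = argparse.ArgumentParser()
parser.add_argument('--d_model', type=int, default=512)
parser.add_argument('--n_heads', type=int, default=8)
parser.add_argument('--n_layers', type=int, default=1)
parser.add_argument('--lr', type=float, default=1e-3)
parser.add_argument('--epochs', type=int, default=50)
args = parser.parse_args()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# model = Transformer().to(device)   # inputs and targets would also need .to(device) in the training loop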


The code is available here:

Link: https://pan.baidu.com/s/1ppCsSi55UohOOVZKm3hA4A
Extraction code: 6666

Source: 猛男的成长之路
