```python
import torch
import torch.nn as nn


# Classification model (Transformer)
class PriceDropClassifiTransModel(nn.Module):
    def __init__(self, input_size, num_periods=2, hidden_size=128, num_layers=3,
                 output_size=1, dropout=0.3, conv_out_channels=64, kernel_size=3, num_heads=8):
        super(PriceDropClassifiTransModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.num_periods = num_periods
        # Convolutional layer
        self.conv1 = nn.Conv1d(
            in_channels=input_size * num_periods,
            out_channels=conv_out_channels,
            kernel_size=kernel_size,
            padding=kernel_size // 2,
            bias=False,
        )
        self.relu = nn.ReLU()
        # Transformer encoder
        self.transformer_layer = nn.TransformerEncoderLayer(
            d_model=conv_out_channels,
            # d_model=input_size * num_periods,  # d_model must match the feature dimension and be divisible by num_heads
            nhead=num_heads,
            dim_feedforward=hidden_size,
            dropout=dropout
        )
        self.transformer_encoder = nn.TransformerEncoder(
            self.transformer_layer,
            num_layers=num_layers
        )
        # Attention mechanism (scores each time step)
        self.attention_layer = nn.Sequential(
            nn.Linear(conv_out_channels, hidden_size),
            # nn.Linear(input_size * num_periods, hidden_size),
            # nn.Conv1d(conv_out_channels, hidden_size),
            # nn.Tanh(),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )
        # Classification output layer
        self.fc_classification = nn.Linear(conv_out_channels, 1)

    def forward(self, x):
        """
        Input x is expected to have shape [batch_size, num_periods, seq_length, input_size].
        """
        batch_size, num_periods, seq_length, input_size = x.size()
        # x = x[:, 0, :, :].view(batch_size, 1, input_size, seq_length)
        # Reshape the input to [batch_size, num_periods * input_size, seq_length]
        x = x.permute(0, 1, 3, 2).contiguous()                    # [batch_size, num_periods, input_size, seq_length]
        x = x.view(batch_size, num_periods * input_size, seq_length)
        # x = x.view(batch_size, 1 * input_size, seq_length)
        # Convolution followed by activation
        x = self.conv1(x)                                         # [batch_size, conv_out_channels, seq_length]
        x = self.relu(x)
        # Transpose to the [seq, batch, feature] layout the Transformer expects
        x = x.permute(2, 0, 1)                                    # [seq_length, batch_size, conv_out_channels]
        # Transformer encoder
        x = self.transformer_encoder(x)                           # [seq_length, batch_size, conv_out_channels]
        # Compute attention scores over the time steps
        attention_scores = self.attention_layer(x)                # [seq_length, batch_size, 1]
        attention_weights = torch.softmax(attention_scores, dim=0)
        # Weighted sum over all time steps
        context_vector = torch.sum(attention_weights * x, dim=0)  # [batch_size, conv_out_channels]

        # Alternatively, take the last time step's output for classification:
        # context_vector = x[-1, :, :]                            # [batch_size, conv_out_channels]

        # Classification output
        classification_output = torch.sigmoid(self.fc_classification(context_vector))  # [batch_size, 1]
        # Debug check of the output range:
        # print(f"Before clamp: min: {classification_output.min().item()}, max: {classification_output.max().item()}")
        # Optionally clamp outputs to [1e-4, 1 - 1e-4] to avoid numerical extremes:
        # classification_output = torch.clamp(classification_output, min=1e-4, max=1 - 1e-4)

        return classification_output
```
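To sanity-check the model end to end, here is a minimal usage sketch (not part of the original listing). The feature count, sequence length, batch size, learning rate, and the BCELoss/Adam training step are illustrative assumptions; substitute your own data pipeline and hyperparameters.

```python
import torch

# Assumed, illustrative sizes: 10 features per period, 2 periods, 30 time steps, batch of 4.
model = PriceDropClassifiTransModel(input_size=10, num_periods=2)
dummy_x = torch.randn(4, 2, 30, 10)   # [batch_size, num_periods, seq_length, input_size]

# Forward pass: probabilities in (0, 1) with shape [batch_size, 1]
model.eval()
with torch.no_grad():
    probs = model(dummy_x)
print(probs.shape)                    # torch.Size([4, 1])

# Since forward() already applies a sigmoid, a binary training step would pair it
# with nn.BCELoss (or drop the sigmoid and use nn.BCEWithLogitsLoss for better
# numerical stability).
criterion = torch.nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
labels = torch.randint(0, 2, (4, 1)).float()  # dummy 0/1 targets

model.train()
optimizer.zero_grad()
loss = criterion(model(dummy_x), labels)
loss.backward()
optimizer.step()
```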