import torch
import torch.nn as nn


# Classification model (Transformer)
class PriceDropClassifiTransModel(nn.Module):
    def __init__(self, input_size, num_periods=2, hidden_size=128, num_layers=3,
                 output_size=1, dropout=0.3, conv_out_channels=64, kernel_size=3,
                 num_heads=8):
        super(PriceDropClassifiTransModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.num_periods = num_periods

        # Convolutional layer over the flattened (period x feature) channels
        self.conv1 = nn.Conv1d(
            in_channels=input_size * num_periods,
            out_channels=conv_out_channels,
            kernel_size=kernel_size,
            padding=kernel_size // 2,
            bias=False,
        )
        self.relu = nn.ReLU()

        # Transformer encoder; d_model must match the conv output channels
        # and be divisible by num_heads
        self.transformer_layer = nn.TransformerEncoderLayer(
            d_model=conv_out_channels,
            nhead=num_heads,
            dim_feedforward=hidden_size,
            dropout=dropout
        )
        self.transformer_encoder = nn.TransformerEncoder(
            self.transformer_layer,
            num_layers=num_layers
        )

        # Attention mechanism: scores each time step for the weighted pooling in forward()
        self.attention_layer = nn.Sequential(
            nn.Linear(conv_out_channels, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )

        # Classification output layer
        self.fc_classification = nn.Linear(conv_out_channels, 1)

    def forward(self, x):
        """
        Expects x with shape [batch_size, num_periods, seq_length, input_size].
        """
        batch_size, num_periods, seq_length, input_size = x.size()

        # Reshape to [batch_size, num_periods * input_size, seq_length] for Conv1d
        x = x.permute(0, 1, 3, 2).contiguous()                       # [batch_size, num_periods, input_size, seq_length]
        x = x.view(batch_size, num_periods * input_size, seq_length)

        # Convolution and activation
        x = self.conv1(x)                                            # [batch_size, conv_out_channels, seq_length]
        x = self.relu(x)

        # Transpose to the sequence-first layout expected by the Transformer encoder
        x = x.permute(2, 0, 1)                                       # [seq_length, batch_size, conv_out_channels]

        # Transformer encoder
        x = self.transformer_encoder(x)                              # [seq_length, batch_size, conv_out_channels]

        # Attention scores over time steps
        attention_scores = self.attention_layer(x)                   # [seq_length, batch_size, 1]
        attention_weights = torch.softmax(attention_scores, dim=0)   # [seq_length, batch_size, 1]

        # Weighted sum over all time steps
        context_vector = torch.sum(attention_weights * x, dim=0)     # [batch_size, conv_out_channels]

        # Alternative: use only the last time step's output
        # context_vector = x[-1, :, :]                               # [batch_size, conv_out_channels]

        # Classification output
        classification_output = torch.sigmoid(self.fc_classification(context_vector))  # [batch_size, 1]

        # Optionally clamp the output to [1e-4, 1 - 1e-4] to avoid extreme values
        # classification_output = torch.clamp(classification_output, min=1e-4, max=1 - 1e-4)

        return classification_output
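

# Minimal usage sketch (not part of the original module): shows the expected input
# shape [batch_size, num_periods, seq_length, input_size] and the [batch_size, 1]
# sigmoid output. The dimension values below are illustrative assumptions only.
if __name__ == "__main__":
    batch_size, num_periods, seq_length, input_size = 4, 2, 30, 10
    model = PriceDropClassifiTransModel(input_size=input_size, num_periods=num_periods)
    dummy_input = torch.randn(batch_size, num_periods, seq_length, input_size)
    probs = model(dummy_input)
    print(probs.shape)  # torch.Size([4, 1]) -- per-sample probability of a price drop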