# 优化:
# 新建Model类,将神经网络的结构定义、训练流程(前向/后向)和预测逻辑统一封装起来
# 何将权重更新的职责从网络层(Linear)中分离出来,交给优化器(SGD)来完成
# 使用动量梯度下降优化算法(MSGD)
# 增加了Adam优化算法
# 增加了Optimizer类,优化器的父类,统一了优化器的接口
# 增加了dropout层,防止过拟合,训练时启用,测试时禁用
# 设置了随机种子,使得训练结果可复现# 导入必要的库
import numpy as np
import os
import struct# 定义导入函数
def load_images(path):with open(path, "rb") as f:data = f.read()magic_number, num_items, rows, cols = struct.unpack(">iiii", data[:16])return np.asanyarray(bytearray(data[16:]), dtype=np.uint8).reshape(num_items, 28, 28)def load_labels(path):with open(path, "rb") as f:data = f.read()return np.asanyarray(bytearray(data[8:]), dtype=np.int32)# 激活函数
# 定义sigmoid函数
def sigmoid(x):result = np.zeros_like(x)positive_mask = x >= 0result[positive_mask] = 1 / (1 + np.exp(-x[positive_mask]))negative_mask = x < 0exp_x = np.exp(x[negative_mask])result[negative_mask] = exp_x / (1 + exp_x)return result# 定义softmax函数
def softmax(x):max_x = np.max(x, axis=-1, keepdims=True)x = x - max_xex = np.exp(x)sum_ex = np.sum(ex, axis=1, keepdims=True)result = ex / sum_exresult = np.clip(result, 1e-10, 1e10)return result# 训练集编码处理
# 定义独热编码函数
def make_onehot(labels, class_num):result = np.zeros((labels.shape[0], class_num))for idx, cls in enumerate(labels):result[idx, cls] = 1return result# 定义dataset类
class Dataset:def __init__(self, all_images, all_labels):self.all_images = all_imagesself.all_labels = all_labelsdef __getitem__(self, index):image = self.all_images[index]label = self.all_labels[index]return image, labeldef __len__(self):return len(self.all_images)# 定义dataloader类
class DataLoader:def __init__(self, dataset, batch_size, shuffle=True):self.dataset = datasetself.batch_size = batch_sizeself.shuffle = shuffleself.idx = np.arange(len(self.dataset))def __iter__(self):# 如果需要打乱,则在每个 epoch 开始时重新排列索引if self.shuffle:np.random.shuffle(self.idx)self.cursor = 0return selfdef __next__(self):if self.cursor >= len(self.dataset):raise StopIteration# 使用索引来获取数据batch_idx = self.idx[self.cursor : min(self.cursor + self.batch_size, len(self.dataset))]batch_images = self.dataset.all_images[batch_idx]batch_labels = self.dataset.all_labels[batch_idx]self.cursor += self.batch_sizereturn batch_images, batch_labels# 父类Module,查看各层结构
# 定义Module类
class Module: def __init__(self):self.info = "Module:\n"self.params = []def __repr__(self):return self.info# 定义Parameter类
class Parameter:def __init__(self, weight):self.weight = weightself.grad = np.zeros_like(weight)self.velocity = np.zeros_like(weight) # 🆕 新增:动量/速度向量# 定义linear类
class Linear(Module):def __init__(self, in_features, out_features):super().__init__()self.info += f"** Linear({in_features}, {out_features})"self.W = Parameter(np.random.normal(0, 1, size=(in_features, out_features)))self.B = Parameter(np.random.normal(0, 1, size=(1, out_features)))self.params.append(self.W)self.params.append(self.B) def forward(self, x):self.x = xreturn np.dot(x, self.W.weight) + self.B.weightdef backward(self, G):self.W.grad = np.dot(self.x.T, G)self.B.grad = np.mean(G, axis=0, keepdims=True)return np.dot(G, self.W.weight.T)# 定义Conv2D类
class Conv2D(Module):def __init__(self, in_channel, out_channel):super(Conv2D, self).__init__()self.info += f" Conv2D({in_channel, out_channel})"self.W = Parameter(np.random.normal(0, 1, size=(in_channel, out_channel)))self.B = Parameter(np.zeros((1, out_channel)))self.params.append(self.W)self.params.append(self.B)def forward(self, x):result = x @ self.W.weight + self.B.weightself.x = xreturn resultdef backward(self, G):self.W.grad = self.x.T @ Gself.B.grad = np.mean(G, axis=0, keepdims=True)delta_x = G @ self.W.weight.Treturn delta_x# 定义Conv1D类
class Conv1D(Module):def __init__(self, in_channel, out_channel):super(Conv1D, self).__init__()self.info += f" Conv1D({in_channel,out_channel})"self.W = Parameter(np.random.normal(0, 1, size=(in_channel, out_channel)))self.B = Parameter(np.zeros((1, out_channel)))self.params.append(self.W)self.params.append(self.B)def forward(self, x):result = x @ self.W.weight + self.B.weightself.x = xreturn resultdef backward(self, G):self.W.grad = self.x.T @ Gself.B.grad = np.mean(G, axis=0, keepdims=True)delta_x = G @ self.W.weight.Treturn delta_x# 优化器的父类
# 定义Optimizer类
class Optimizer:def __init__(self, parameters, lr):self.parameters = parametersself.lr = lrdef zero_grad(self):for p in self.parameters:p.grad.fill(0)# 定义SGD类,学习率较大
class SGD(Optimizer):def step(self):for p in self.parameters:p.weight -= self.lr * p.grad# 定义MSGD类,学习率较大
class MSGD(Optimizer):def __init__(self, parameters, lr, u):super().__init__(parameters, lr)self.u = udef step(self):for p in self.parameters:# 1. 更新速度 V_t = u * V_{t-1} + p.gradp.velocity = self.u * p.velocity + p.grad# 2. 更新权重 W = W - lr * V_tp.weight -= self.lr * p.velocity# 定义Adam类,学习率一般较小10^-3到10^-6
class Adam(Optimizer):def __init__(self, parameters, lr, beta1=0.9, beta2=0.999, e=1e-8):super().__init__(parameters, lr)self.beta1 = beta1self.beta2 = beta2self.e = eself.t=0for p in self.parameters:#p.m = 0p.m = np.zeros_like(p.weight)#p.v = 0p.v = np.zeros_like(p.weight)def step(self):self.t += 1for p in self.parameters:gt=p.gradp.m = self.beta1*p.m + (1-self.beta1)*gtp.v = self.beta2*p.v + (1-self.beta2)*gt**2mt_=p.m/(1-self.beta1**self.t)vt_=p.v/(1-self.beta2**self.t)p.weight = p.weight - self.lr*mt_/np.sqrt(vt_+self.e)# 定义Sigmoid类
class Sigmoid(Module):def __init__(self):super().__init__()self.info += "** Sigmoid()" # 打印信息def forward(self, x):self.result = sigmoid(x)return self.resultdef backward(self, G):return G * self.result * (1 - self.result)# 定义Tanh类
class Tanh(Module):def __init__(self):super().__init__()self.info += "** Tanh()" # 打印信息def forward(self, x):self.result = 2 * sigmoid(2 * x) - 1return self.resultdef backward(self, G):return G * (1 - self.result**2)# 定义Softmax类
class Softmax(Module):def __init__(self):super().__init__()self.info += "** Softmax()" # 打印信息def forward(self, x):self.p = softmax(x)return self.pdef backward(self, G):G = (self.p - G) / len(G)return G# 定义ReLU类
class ReLU(Module):def __init__(self):super().__init__()self.info += "** ReLU()" # 打印信息def forward(self, x):self.x = xreturn np.maximum(0, x)def backward(self, G):grad = G.copy()grad[self.x <= 0] = 0return grad# 定义Dropout类class Dropout(Module):def __init__(self, p=0.3):super().__init__()self.info += f"** Dropout(p={p})" # 打印信息self.p = pself.is_training = True # 🆕 新增:训练状态标志def forward(self, x):if not self.is_training:return x # 评估时直接返回r = np.random.rand(*x.shape)self.mask = r >= self.p # 创建掩码# 应用掩码和缩放return (x * self.mask) / (1 - self.p)def backward(self, G):if not self.is_training:return G # 评估时直接返回梯度G[~self.mask] = 0return G / (1 - self.p)# 定义ModelList类
class ModelList:def __init__(self, layers):self.layers = layersdef forward(self, x):for layer in self.layers:x = layer.forward(x)return xdef backward(self, G):for layer in self.layers[::-1]:G = layer.backward(G)def __repr__(self):info = ""for layer in self.layers:info += layer.info + "\n"return info# 定义Model类
class Model:def __init__(self):self.model_list = ModelList([Linear(784, 512),ReLU(),Dropout(0.2),Linear(512, 256),Tanh(),Dropout(0.1),Linear(256, 10),Softmax(),])def forward(self, x, label=None):pre = self.model_list.forward(x)if label is not None:self.label = labelloss = -np.mean(self.label * np.log(pre))return losselse:return np.argmax(pre, axis=-1)def backward(self):self.model_list.backward(self.label)def train(self):"""设置模型为训练模式 (启用 Dropout)。"""for layer in self.model_list.layers:# 检查层是否有 is_training 属性 (即只针对 Dropout 层)if hasattr(layer, "is_training"):layer.is_training = Truedef eval(self):"""设置模型为评估/推理模式 (禁用 Dropout)。"""for layer in self.model_list.layers:if hasattr(layer, "is_training"):layer.is_training = Falsedef __repr__(self):return self.model_list.__repr__()def parameter(self):all_Parameter = []for layer in self.model_list.layers:all_Parameter.extend(layer.params)return all_Parameter# 主函数
if __name__ == "__main__":# 设置随机种子np.random.seed(1000)# 加载训练集图片、标签train_images = (load_images(os.path.join("Python", "NLP basic", "data", "minist", "train-images.idx3-ubyte"))/ 255)train_labels = make_onehot(load_labels(os.path.join("Python", "NLP basic", "data", "minist", "train-labels.idx1-ubyte")),10,)# 加载测试集图片、标签dev_images = (load_images(os.path.join("Python", "NLP basic", "data", "minist", "t10k-images.idx3-ubyte"))/ 255)dev_labels = load_labels(os.path.join("Python", "NLP basic", "data", "minist", "t10k-labels.idx1-ubyte"))# 设置超参数epochs = 10lr = 1e-2 batch_size = 200# 展开图片数据train_images = train_images.reshape(60000, 784)dev_images = dev_images.reshape(-1, 784)# 调用dataset类和dataloader类train_dataset = Dataset(train_images, train_labels)train_dataloader = DataLoader(train_dataset, batch_size)dev_dataset = Dataset(dev_images, dev_labels)dev_dataloader = DataLoader(dev_dataset, batch_size)# 定义模型model = Model()# 定义优化器# opt = SGD(model.parameter(), lr)#opt=MSGD(model.parameter(),lr,0.8)opt=Adam(model.parameter(),lr)# print(model)# 训练集训练过程for e in range(epochs):# 启用训练模式model.train() # 训练集训练for x, l in train_dataloader:loss = model.forward(x, l)model.backward()opt.step()opt.zero_grad()# 验证集验证并输出预测准确率# 切换到评估模式,禁用 Dropoutmodel.eval() right_num = 0for x, batch_labels in dev_dataloader:pre_idx = model.forward(x)right_num += np.sum(pre_idx == batch_labels) # 统计正确个数acc = right_num / len(dev_images) # 计算准确率print(f"Epoch {e}, Acc: {acc:.4f}")
