Baseline
Dataset Processing
Data Analysis
sizeSet = set()
for path in trainPath:
    image = nib.load(path)
    imgShape = image.dataobj.shape
    sizeSet.add(imgShape)   # a set ignores duplicates, no membership check needed
print(sizeSet)
The data comes in only a handful of shapes:
{(168, 168, 82, 1), (128, 128, 47, 1), (256, 256, 207, 1), (256, 256, 81, 1), (400, 400, 109, 1), (128, 128, 540, 1), (128, 128, 63, 1), (168, 168, 81, 1)}, so pay attention to size handling when building the dataset.
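Counting how often each shape occurs (not just which shapes exist) makes the variation clearer; this is a minimal sketch assuming nib and trainPath from the snippet above. Note that the depth axis ranges from 47 to 540 slices, which is why the dataset below resamples a fixed number of slices per volume.

from collections import Counter

# Count how many scans come in each shape
shapeCounts = Counter(nib.load(p).dataobj.shape for p in trainPath)
for shape, count in shapeCounts.most_common():
    print(shape, count)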
### Import related ###
...
# Collect file paths
trainPath = glob.glob('./脑PET图像分析和疾病预测挑战赛公开数据/Train/*/*')
testPath = glob.glob('./脑PET图像分析和疾病预测挑战赛公开数据/Test/*')
np.random.shuffle(trainPath)
np.random.shuffle(testPath)
# The dataset is small; cache decoded images to avoid paying the decoding cost repeatedly
DATA_CACHE = {}
class XunFeiDataset(Dataset):
    def __init__(self, imgPath, transform=None):
        self.imgPath = imgPath
        self.transform = transform

    def __getitem__(self, index):
        if self.imgPath[index] in DATA_CACHE:
            img = DATA_CACHE[self.imgPath[index]]
        else:
            # Load the image with nibabel
            img = nib.load(self.imgPath[index])
            # dataobj exposes a numpy.ndarray; the last axis is a singleton
            # grayscale channel, so drop it and treat slices as 2D channels
            img = img.dataobj[:, :, :, 0]
            DATA_CACHE[self.imgPath[index]] = img
        # Randomly sample 50 slices along the last axis (with replacement, so
        # volumes with fewer than 50 slices are fine); this gives every sample
        # the same channel count despite the varying depths
        idx = np.random.choice(range(img.shape[-1]), 50)
        img = img[:, :, idx]
        img = img.astype(np.float32)
        if self.transform is not None:
            img = self.transform(image=img)['image']
        # Convert to C, H, W
        img = img.transpose([2, 0, 1])
        # Note: .long() here matters; without it the loss raises a dtype error
        return img, torch.from_numpy(np.array(int('NC' in self.imgPath[index]))).long()

    def __len__(self):
        return len(self.imgPath)
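Before wiring up the loaders, a quick smoke test of the dataset helps confirm the slice sampling and label parsing; a minimal sketch assuming trainPath is populated as above:

# Smoke test: one sample should be a (50, H, W) float array plus a 0/1 label
ds = XunFeiDataset(trainPath)
img, label = ds[0]
print(img.shape, img.dtype, label)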
import albumentations as A
# On Windows, num_workers=0 is the safest choice
train_loader = torch.utils.data.DataLoader(
    XunFeiDataset(trainPath[:-10],
        A.Compose([
            A.RandomRotate90(),
            A.RandomCrop(120, 120),
            A.HorizontalFlip(p=0.5),
            A.RandomContrast(p=0.5),
            A.RandomBrightnessContrast(p=0.5),
        ])
    ), batch_size=2, shuffle=True, num_workers=0, pin_memory=False
)
val_loader = torch.utils.data.DataLoader(
    XunFeiDataset(trainPath[-10:],
        A.Compose([
            A.RandomCrop(120, 120),
        ])
    ), batch_size=2, shuffle=False, num_workers=0, pin_memory=False
)
test_loader = torch.utils.data.DataLoader(
    XunFeiDataset(testPath,
        A.Compose([
            A.RandomCrop(128, 128),
            A.HorizontalFlip(p=0.5),
            A.RandomContrast(p=0.5),
        ])
    ), batch_size=2, shuffle=False, num_workers=0, pin_memory=False
)
Two points here are easy to trip over:
- Data type mismatch:
RuntimeError: "nll_loss_forward_reduce_cuda_kernel_2d_index" not implemented for 'Int'
This is the dtype problem mentioned above; convert the labels with .long().
- Worker processes failing to exit cleanly:
raise RuntimeError('DataLoader worker (pid(s) {}) exited unexpectedly'.format(pids_str)) from e RuntimeError: DataLoader worker (pid(s) 14652) exited unexpectedly.
This corresponds to the num_workers setting above; since the dataset is small, num_workers=0 is sufficient.
Note (correctness unverified): with a larger dataset you may also need to guard the entry point with
if __name__ == '__main__':
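A minimal sketch of that guard (main here is a hypothetical wrapper for the loader construction and training loop, not a function from the original code):

def main():
    # build the DataLoaders and run the training loop here
    ...

if __name__ == '__main__':
    # On Windows, DataLoader workers are spawned by re-importing this module,
    # so anything that must run exactly once belongs behind this guard.
    main()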
Model Construction
2D-ResNet
class XunFeiNet(nn.Module):
    def __init__(self):
        super(XunFeiNet, self).__init__()
        model = models.resnet18(True)
        # First conv takes the 50 sampled slices as input channels
        model.conv1 = torch.nn.Conv2d(50, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        model.avgpool = nn.AdaptiveAvgPool2d(1)
        model.fc = nn.Linear(512, 2)
        self.resnet = model

    def forward(self, img):
        out = self.resnet(img)
        return out

model = XunFeiNet()
model = model.to('cuda')
criterion = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.AdamW(model.parameters(), 0.001)
Nothing special here: take the torchvision model and swap out a few layers yourself (the first conv to accept 50 input channels, and the final fc for two classes).
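A quick sanity check of the modified first layer, shaped like the 50-slice, 120x120 crops the training pipeline produces (a sketch, assuming the model is on the GPU as above):

# Dummy batch: 2 samples, 50 slice-channels, 120x120 crops
x = torch.randn(2, 50, 120, 120).to('cuda')
with torch.no_grad():
    print(model(x).shape)  # expected: torch.Size([2, 2])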
3D-ResNet
See Appendix A for the full implementation. Note that with the 3D-ResNet the grayscale channel must be kept: change the transpose in __getitem__ to
img = img.transpose([3, 2, 0, 1])
or simply unsqueeze the channel dimension at use time.
model = ResNet_3d(Bottleneck, [3, 8, 36, 3], num_classes=2, shortcut_type='B', no_cuda=False, include_top=True)  # ResNet-152
model = model.to('cuda')
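Concretely, a batch from the 2D-style loader has shape (N, 50, H, W); unsqueezing a channel dimension yields the (N, 1, D, H, W) layout that the 3D network's conv1 (in_channels=1) expects. A short sketch:

# The 3D conv1 expects a single grayscale channel: (N, 1, D, H, W)
x = torch.randn(2, 50, 120, 120)   # batch as produced by the 2D-style loader
x = x.unsqueeze(1).to('cuda')      # -> (2, 1, 50, 120, 120)
with torch.no_grad():
    print(model(x).shape)          # expected: torch.Size([2, 2])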
Model Training and Validation
def train(train_loader, model, criterion, optimizer):
    model.train()
    trainLoss = 0.0
    probAll = []
    labelAll = []
    for i, (input, target) in enumerate(train_loader):
        input = input.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)
        output = model(input)
        # argmax over each row gives the predicted class
        probAll.extend(np.argmax(output.detach().cpu().numpy(), axis=1))
        # record the ground-truth labels as well
        labelAll.extend(target.cpu().numpy())
        loss = criterion(output, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if i % 20 == 0:
            print(loss.item())
        trainLoss += loss.item()
    # Compute the F1-score
    print("F1-Score:{:.4f}".format(f1_score(labelAll, probAll)))
    return trainLoss / len(train_loader)
def validate(val_loader, model, criterion):
    model.eval()
    valAcc = 0.0
    probAll = []
    labelAll = []
    with torch.no_grad():
        for i, (input, target) in enumerate(val_loader):
            input = input.cuda()
            target = target.cuda()
            # Compute the model output
            output = model(input)
            # argmax over each row gives the predicted class
            probAll.extend(np.argmax(output.detach().cpu().numpy(), axis=1))
            # record the ground-truth labels as well
            labelAll.extend(target.cpu().numpy())
            loss = criterion(output, target)
            valAcc += (output.argmax(1) == target).sum().item()
    # The competition is scored by F1-score, so track it locally as well.
    # Note: online submissions are limited, so establish a reliable mapping
    # between local scores and leaderboard scores.
    score = f1_score(labelAll, probAll)
    # print("val F1-Score:{:.4f}".format(score))
    return (valAcc / len(val_loader.dataset)), score
import copy

bestDict = None
bestF1 = 0
for _ in range(12):
    trainLoss = train(train_loader, model, criterion, optimizer)
    valAcc, scoreVal = validate(val_loader, model, criterion)
    trainAcc, scoreTrain = validate(train_loader, model, criterion)
    # Keep the parameters of the best-scoring model
    if scoreVal > bestF1:
        # deepcopy the state dict; otherwise we would only hold references
        # that later training steps overwrite
        bestDict = copy.deepcopy(model.state_dict())
        bestF1 = scoreVal
        print(f"[INFO]: Model saved. F1-score {scoreVal}")
    print(trainLoss, trainAcc, valAcc, scoreTrain, scoreVal)
print(f"[INFO]: Training finished. F1-score {bestF1}")
Prediction and Submission
def predict(test_loader, model, criterion):
    model.eval()
    test_pred = []
    with torch.no_grad():
        for i, (input, target) in enumerate(test_loader):
            input = input.cuda()
            target = target.cuda()
            output = model(input)
            test_pred.append(output.data.cpu().numpy())
    return np.vstack(test_pred)

pred = None
for _ in range(10):
    if pred is None:
        pred = predict(test_loader, model, criterion)
    else:
        pred += predict(test_loader, model, criterion)

# On Windows the path separator is '\\' (the first backslash is the escape
# character), so split on '\\' here
submit = pd.DataFrame(
    {
        'uuid': [int(x.split('\\')[-1][:-4]) for x in testPath],
        'label': pred.argmax(1)
    })
submit['label'] = submit['label'].map({1: 'NC', 0: 'MCI'})
submit = submit.sort_values(by='uuid')
submit.to_csv('submit2.csv', index=None)
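A quick sanity check of the file before uploading catches obvious mistakes such as an all-one-class prediction or misparsed uuids (a sketch):

# Sanity-check the submission before spending a limited upload on it
print(submit.head())
print(submit['label'].value_counts())  # both NC and MCI should appear
assert submit['uuid'].is_unique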
Configuration Issues I Was Asked About
Set-ExecutionPolicy
On Windows 11, if the following command appears to have no effect:
Set-ExecutionPolicy -scope CurrentUser Remotesigned
try running
Get-ExecutionPolicy -List
to check whether the ExecutionPolicy was actually changed.
Jupyter Notebook not responding, or conda init failing
OneDrive auto-backup, or a Chinese Windows username appearing in system paths, can introduce Chinese characters into paths, which is a common cause of these failures.
GBK encoding problems
Likely also caused by Chinese characters; set the console output encoding. Open your PowerShell profile with
code $Profile
and add [System.Console]::OutputEncoding=[System.Text.Encoding]::GetEncoding(65001) to switch the console to code page 65001 (UTF-8).
A Few Small Tricks
Test-Time Augmentation (TTA)
import ttach as tta

model = ResNet_3d(Bottleneck, [3, 8, 36, 3], num_classes=2, shortcut_type='B', no_cuda=False, include_top=True)
model = model.to('cuda')
model.load_state_dict(bestDict)
# tta.Compose only accepts ttach transforms; albumentations transforms
# (e.g. A.RandomContrast) operate on numpy images and cannot be mixed in here
transforms = tta.Compose(
    [
        tta.HorizontalFlip(),
        tta.VerticalFlip(),
        # tta.Scale(scales=[1, 2, 4]),
        # tta.Multiply(factors=[0.9, 1, 1.1]),
    ]
)
model_tta = tta.ClassificationTTAWrapper(model, transforms)
def predict(test_loader, model, criterion):
    model.eval()
    test_pred = []
    with torch.no_grad():
        for i, (input, target) in enumerate(test_loader):
            # add the grayscale channel the 3D model expects: (N, 1, D, H, W)
            input = input.unsqueeze(1).cuda()
            target = target.cuda()
            output = model(input)
            test_pred.append(output.data.cpu().numpy())
    return np.vstack(test_pred)

pred = None
for _ in range(10):
    # pass the TTA wrapper, not the bare model, so the augmentations are applied
    if pred is None:
        pred = predict(test_loader, model_tta, criterion)
    else:
        pred += predict(test_loader, model_tta, criterion)
submit = pd.DataFrame(
    {
        'uuid': [int(x.split('/')[-1].split("\\")[-1][:-4]) for x in testPath],
        'label': pred.argmax(1)
    })
submit['label'] = submit['label'].map({1: 'NC', 0: 'MCI'})
submit = submit.sort_values(by='uuid')
submit.to_csv('submit-tta-pre.csv', index=None)
TTA is an engineering-style trick, similar in spirit to model ensembling: at test time each input is run through several augmentations and the resulting predictions are aggregated, which makes the output more robust.
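The same idea works without the library: run the model on a few deterministic variants of each batch and average the logits. A minimal sketch (assuming the last two axes are spatial, which holds for both the NCHW and NCDHW layouts used here):

# Manual TTA: average logits over deterministic flips of the input
def tta_predict(model, x):
    views = [x, torch.flip(x, dims=[-1]), torch.flip(x, dims=[-2])]
    with torch.no_grad():
        logits = [model(v) for v in views]
    return torch.stack(logits).mean(dim=0)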
Appendix A
import torch
import torch.nn as nn
import torch.nn.functional as F
from functools import partial

def conv3x3x3(in_planes, out_planes, stride=1, dilation=1):
    # 3x3x3 convolution with padding
    return nn.Conv3d(
        in_planes,
        out_planes,
        kernel_size=3,
        dilation=dilation,
        stride=stride,
        padding=dilation,
        bias=False)

def downsample_basic_block(x, planes, stride, no_cuda=False):
    # Type-A shortcut: spatially downsample, then zero-pad the extra channels
    out = F.avg_pool3d(x, kernel_size=1, stride=stride)
    zero_pads = torch.zeros(
        out.size(0), planes - out.size(1), out.size(2), out.size(3),
        out.size(4))
    if not no_cuda and out.is_cuda:
        zero_pads = zero_pads.cuda()
    out = torch.cat([out, zero_pads], dim=1)
    return out
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, dilation=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = conv3x3x3(inplanes, planes, stride=stride, dilation=dilation)
        self.bn1 = nn.BatchNorm3d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3x3(planes, planes, dilation=dilation)
        self.bn2 = nn.BatchNorm3d(planes)
        self.downsample = downsample
        self.stride = stride
        self.dilation = dilation

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsample is not None:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out
class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, inplanes, planes, stride=1, dilation=1, downsample=None):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv3d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm3d(planes)
        self.conv2 = nn.Conv3d(
            planes, planes, kernel_size=3, stride=stride, dilation=dilation, padding=dilation, bias=False)
        self.bn2 = nn.BatchNorm3d(planes)
        self.conv3 = nn.Conv3d(planes, planes * 4, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm3d(planes * 4)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride
        self.dilation = dilation

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)
        out = self.conv3(out)
        out = self.bn3(out)
        if self.downsample is not None:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out
class ResNet_3d(nn.Module):
    def __init__(self,
                 block,
                 layers,
                 num_classes=1000,
                 shortcut_type='B',
                 no_cuda=False,
                 include_top=True):
        super(ResNet_3d, self).__init__()
        self.inplanes = 64
        self.no_cuda = no_cuda
        self.include_top = include_top
        self.conv1 = nn.Conv3d(
            1,
            64,
            kernel_size=7,
            stride=(2, 2, 2),
            padding=(3, 3, 3),
            bias=False)
        self.bn1 = nn.BatchNorm3d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool3d(kernel_size=(3, 3, 3), stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0], shortcut_type)
        self.layer2 = self._make_layer(
            block, 128, layers[1], shortcut_type, stride=2)
        self.layer3 = self._make_layer(
            block, 256, layers[2], shortcut_type, stride=2)
        self.layer4 = self._make_layer(
            block, 512, layers[3], shortcut_type, stride=2)
        if self.include_top:
            self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))  # adaptive pooling to output size (1, 1, 1)
            self.fc = nn.Linear(512 * block.expansion, num_classes)
        for m in self.modules():
            if isinstance(m, nn.Conv3d):
                # kaiming_normal_ is the in-place initializer (the old
                # kaiming_normal was removed from recent PyTorch versions)
                nn.init.kaiming_normal_(m.weight, mode='fan_out')
            elif isinstance(m, nn.BatchNorm3d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

    def _make_layer(self, block, planes, blocks, shortcut_type, stride=1, dilation=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            if shortcut_type == 'A':
                downsample = partial(
                    downsample_basic_block,
                    planes=planes * block.expansion,
                    stride=stride,
                    no_cuda=self.no_cuda)
            else:
                downsample = nn.Sequential(
                    nn.Conv3d(
                        self.inplanes,
                        planes * block.expansion,
                        kernel_size=1,
                        stride=stride,
                        bias=False), nn.BatchNorm3d(planes * block.expansion))
        layers = []
        layers.append(block(self.inplanes, planes, stride=stride, downsample=downsample))
        self.inplanes = planes * block.expansion
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        if self.include_top:
            x = self.avgpool(x)
            x = torch.flatten(x, 1)
            x = self.fc(x)
        return x
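For reference, the [3, 8, 36, 3] layer configuration with Bottleneck is the ResNet-152 layout. A quick instantiation and shape check (a sketch; run on CPU here, and slow at this depth):

# [3, 8, 36, 3] with Bottleneck = ResNet-152; verify the output shape
net = ResNet_3d(Bottleneck, [3, 8, 36, 3], num_classes=2,
                shortcut_type='B', no_cuda=True, include_top=True)
net.eval()
x = torch.randn(1, 1, 50, 120, 120)
with torch.no_grad():
    print(net(x).shape)  # expected: torch.Size([1, 2])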