全卷积网络(FCN,Fully Convolutional Networks)于2015年由加州大学伯克利分校的 Jonathan Long、Evan Shelhamer 和 Trevor Darrell({jonlong,shelhamer,trevor}@cs.berkeley.edu)提出,是深度学习在图像分割领域的开山之作,奠定了使用深度网络解决图像语义分割问题的基础框架。参考论文:《Fully Convolutional Networks for Semantic Segmentation》。
它的意义在于:提出了非常简单易懂的全卷积模型结构——先用卷积网络(例如MobileNetV2的特征提取层)提取图像特征,并移除最后的全连接层;接着使用上采样的转置卷积层(Transpose Convolution)将多次下采样的特征图恢复到和原图一样的大小;最后对每个像素生成一个分类标签。转置卷积层(Transpose Convolution)可以理解为一个可以被训练的自动插值器,实现将低分辨率的图片转换为高分辨率的图片。常见的插值算法,例如:
- 最近邻插值(Nearest neighbor interpolation)
- 双线性插值(Bi-Linear interpolation)
- 双立方插值(Bi-Cubic interpolation)
已经很成熟了,这些插值算法类似于手动特征工程,依据设计者的经验,不可以被训练,并没有给神经网络学习的余地。
转置卷积则不使用预先定义的插值方法,它具有可以学习的参数。
参考《Up-sampling with Transposed Convolution》
图像语义分割的目标是给每个像素赋予一个类别标签,目前经典语义分割模型在PASCAL VOC2012数据集上的测试结果如下:

下面是基于TensorFlow实现的语义分割模型源代码。在运行过程中,可以感受模型的预测是怎样随着训练而改善的。
import tensorflow as tf
from tensorflow_examples.models.pix2pix import pix2pix
import tensorflow_datasets as tfds
tfds.disable_progress_bar()
import matplotlib.pyplot as plt
# Load the Oxford-IIIT Pet dataset together with its metadata.
dataset, info = tfds.load('oxford_iiit_pet:3.*.*', with_info=True)

# Number of training examples, as reported by the dataset metadata.
TRAIN_LENGTH = info.splits['train'].num_examples
print(TRAIN_LENGTH)

# Batching / shuffling configuration used by the input pipeline.
BATCH_SIZE = 64
BUFFER_SIZE = 1000

# Number of full batches that make up one pass over the training split.
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE
def normalize(input_image, input_mask):
    """Scale the image to [0, 1] and shift the mask labels down by one.

    Args:
        input_image: Image tensor; it is cast to float32 and divided by 255.
        input_mask: Segmentation mask tensor; 1 is subtracted from every
            value (per the original author's note this yields labels
            {0, 1, 2}).

    Returns:
        A tuple ``(image, mask)`` holding the scaled image and shifted mask.
    """
    scaled_image = tf.cast(input_image, tf.float32) / 255.0
    shifted_mask = input_mask - 1
    return scaled_image, shifted_mask
@tf.function
def load_image_train(datapoint):
    """Resize one training example to 128x128, randomly flip it, normalize it.

    Args:
        datapoint: A dataset element providing the 'image' and
            'segmentation_mask' entries.

    Returns:
        The ``(input_image, input_mask)`` pair returned by ``normalize``.
    """
    input_image = tf.image.resize(datapoint['image'], (128, 128))
    # The mask stores discrete class ids. The default bilinear resize would
    # blend neighbouring ids into fractional values that are not valid
    # labels, so the mask must be resized with nearest-neighbour sampling.
    input_mask = tf.image.resize(
        datapoint['segmentation_mask'],
        (128, 128),
        method=tf.image.ResizeMethod.NEAREST_NEIGHBOR,
    )

    # Augmentation: flip image and mask together with probability 0.5 so
    # that they stay pixel-aligned.
    if tf.random.uniform(()) > 0.5:
        input_image = tf.image.flip_left_right(input_image)
        input_mask = tf.image.flip_left_right(input_mask)

    input_image, input_mask = normalize(input_image, input_mask)
    return input_image, input_mask
def load_image_test(datapoint):
    """Resize one test example to 128x128 and normalize it (no augmentation).

    Args:
        datapoint: A dataset element providing the 'image' and
            'segmentation_mask' entries.

    Returns:
        The ``(input_image, input_mask)`` pair returned by ``normalize``.
    """
    # Test data gets the same resize + normalize treatment as training data.
    input_image = tf.image.resize(datapoint['image'], (128, 128))
    # The mask stores discrete class ids. The default bilinear resize would
    # blend neighbouring ids into fractional values that are not valid
    # labels, so the mask must be resized with nearest-neighbour sampling.
    input_mask = tf.image.resize(
        datapoint['segmentation_mask'],
        (128, 128),
        method=tf.image.ResizeMethod.NEAREST_NEIGHBOR,
    )

    # Test data needs no image augmentation.
    input_image, input_mask = normalize(input_image, input_mask)
    return input_image, input_mask
# Preprocess both splits in parallel; the element order is still preserved.
train = dataset['train'].map(load_image_train, num_parallel_calls=tf.data.AUTOTUNE)
test = dataset['test'].map(load_image_test, num_parallel_calls=tf.data.AUTOTUNE)

# If you wish to randomize the iteration order, make sure to call shuffle
# after calling cache.
# NOTE(review): cache() sits after the map that applies the random flip, so
# the flips chosen during the first pass are replayed on every later epoch.
# Moving the augmentation after cache() would re-randomize it each epoch.
train_dataset = (
    train.cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .repeat()
    # Overlap input preprocessing with model execution.
    .prefetch(tf.data.AUTOTUNE)
)
test_dataset = test.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
def display(display_list):
    """Plot the given images side by side in a single row.

    Args:
        display_list: Sequence of image-like objects accepted by
            ``plt.imshow``. Panels are titled, in order, 'Input Image',
            'True Mask' and 'Predicted Mask'.
    """
    title = ['Input Image', 'True Mask', 'Predicted Mask']
    panel_count = len(display_list)

    plt.figure(figsize=(9, 9))
    for index, picture in enumerate(display_list):
        plt.subplot(1, panel_count, index + 1)
        plt.title(title[index])
        plt.imshow(picture)
        plt.axis('off')
    plt.show()
# Pull a single batch and keep its first example as a fixed sample that is
# reused for visualising predictions during training.
for batch_images, batch_masks in train_dataset.take(1):
    print(batch_images.shape, batch_masks.shape)
    sample_image = batch_images[0].numpy()
    sample_mask = batch_masks[0].numpy()
    print(type(sample_image), type(sample_mask))
# To preview the sample, call: display([sample_image, sample_mask])
# Three output channels because every pixel has three possible labels.
# Think of it as multi-class classification performed per pixel: each pixel
# belongs to exactly one of three classes.
OUTPUT_CHANNELS = 3

# Use a pre-trained model directly: MobileNetV2 without its classifier head.
base_model = tf.keras.applications.MobileNetV2(
    input_shape=[128, 128, 3], include_top=False
)
# base_model.summary()

# Activations of these layers are used as encoder outputs (the trailing
# comments give the spatial size noted by the original author).
layer_names = [
    'block_1_expand_relu',   # 64x64
    'block_3_expand_relu',   # 32x32
    'block_6_expand_relu',   # 16x16
    'block_13_expand_relu',  # 8x8
    'block_16_project',      # 4x4
]
layers = [base_model.get_layer(layer_name).output for layer_name in layer_names]

# Feature-extraction model: one input, one output per selected layer.
# Its weights are frozen.
down_stack = tf.keras.Model(inputs=base_model.input, outputs=layers)
down_stack.trainable = False
# down_stack.summary()

# Decoder / upsampler blocks taken from the TensorFlow examples package.
# Successive stages, per the original notes:
# 4x4 -> 8x8 -> 16x16 -> 32x32 -> 64x64.
up_stack = [
    pix2pix.upsample(filter_count, 3) for filter_count in (512, 256, 128, 64)
]
def unet_model(output_channels):
    """Assemble the U-Net style model from ``down_stack`` and ``up_stack``.

    Args:
        output_channels: Number of filters of the final transposed
            convolution, i.e. the number of channels in the model output.

    Returns:
        A ``tf.keras.Model`` taking a (128, 128, 3) input.
    """
    inputs = tf.keras.Input(shape=(128, 128, 3))

    # Downsampling: the last encoder output feeds the decoder, the earlier
    # ones become skip connections.
    *skip_features, x = down_stack(inputs)

    # Upsampling: after every decoder stage, concatenate the matching skip
    # feature, walking the skips from the deepest to the shallowest.
    for up, skip in zip(up_stack, reversed(skip_features)):
        upsampled = up(x)
        x = tf.keras.layers.Concatenate()([upsampled, skip])

    # Final layer of the model (64x64 -> 128x128 per the original note).
    # Output shape: (batch_size, new_rows, new_cols, filters).
    outputs = tf.keras.layers.Conv2DTranspose(
        output_channels, 3, strides=2, padding='same'
    )(x)

    return tf.keras.Model(inputs=inputs, outputs=outputs)
# Build the segmentation network and configure it for training.
print("build unet...")
model = unet_model(OUTPUT_CHANNELS)
# model.summary()

# The model outputs raw scores (no softmax), hence from_logits=True.
model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer='adam',
    metrics=['accuracy'],
)
def create_mask(pred_mask):
    """Turn a batch of per-class scores into a label mask for its first item.

    The shape is printed after every step as a debugging aid.

    Args:
        pred_mask: Batched prediction whose last axis holds the class scores.

    Returns:
        The first element of the batch after taking the argmax over the last
        axis and appending a trailing axis of size one.
    """
    print(pred_mask.shape)
    class_ids = tf.argmax(pred_mask, axis=-1)
    print(class_ids.shape)
    class_ids = tf.expand_dims(class_ids, axis=-1)
    print(class_ids.shape)
    first_mask = class_ids[0]
    return first_mask
def show_predictions(dataset=None, num=1):
    """Display model predictions next to the input image and the true mask.

    Args:
        dataset: Optional batched dataset of ``(image, mask)`` pairs. When it
            is falsy, the module-level ``sample_image`` / ``sample_mask``
            pair is used instead.
        num: Number of batches to take from ``dataset``; the first example of
            each batch is displayed.
    """
    if not dataset:
        # No dataset supplied: predict on the single stored sample.
        sample_batch = sample_image[tf.newaxis, ...]
        pred_mask = model.predict(sample_batch)
        print(sample_image.shape, sample_mask.shape, pred_mask.shape)
        display([sample_image, sample_mask, create_mask(pred_mask)])
        return

    for image, mask in dataset.take(num):
        pred_mask = model.predict(image)
        print("pred_mask shape:", pred_mask.shape, pred_mask[0, 0, 0])
        predicted = create_mask(pred_mask)
        display([image[0], mask[0], predicted])
#print("show_predictions...")
#show_predictions()
class DisplayCallback(tf.keras.callbacks.Callback):
    """Keras callback that shows a sample prediction after every epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """Display the sample prediction, then print ``epoch + 1``."""
        # clear_output(wait=True) could be called here in a notebook.
        show_predictions()
        finished_epoch = epoch + 1
        print('\nSample Prediction after epoch {}\n'.format(finished_epoch))
# Training schedule: validation runs over a fifth of the full test batches.
EPOCHS = 20
VAL_SUBSPLITS = 5
VALIDATION_STEPS = info.splits['test'].num_examples // BATCH_SIZE // VAL_SUBSPLITS

# Train, showing a sample prediction after every epoch.
model_history = model.fit(
    train_dataset,
    epochs=EPOCHS,
    steps_per_epoch=STEPS_PER_EPOCH,
    validation_data=test_dataset,
    validation_steps=VALIDATION_STEPS,
    callbacks=[DisplayCallback()],
)

# Per-epoch loss curves recorded by fit().
loss = model_history.history['loss']
val_loss = model_history.history['val_loss']
epochs = range(EPOCHS)

# Plot training loss against validation loss.
plt.figure()
plt.plot(epochs, loss, 'r', label='Training loss')
plt.plot(epochs, val_loss, 'bo', label='Validation loss')
plt.xlabel('Epoch')
plt.ylabel('Loss Value')
plt.title('Training and Validation Loss')
plt.ylim([0, 1])
plt.legend()
plt.show()

# Visualise predictions on three test batches, then save the trained model.
show_predictions(test_dataset, 3)
model.save("unet.h5")