Announcement

👇Official Account👇

图片

Welcome to join the group & private message

Article first/tail QR code

Skip to content

Python TensorFlow 实战:Keras 入门到精通

TensorFlow 是 Google 开发的开源深度学习框架,Keras 是其高级 API,使深度学习变得简单直观。本文将带你全面掌握 TensorFlow/Keras 的核心技术和实践方法。

🚀 TensorFlow 简介

1. TensorFlow vs PyTorch

python
"""
TensorFlow vs PyTorch 对比

TensorFlow 优势:
- 生产部署成熟:TF Serving、TF Lite、TF.js
- 跨平台支持:CPU、GPU、TPU、移动设备
- 完整生态系统:TensorBoard、TFX、Hub
- 企业级支持:Google 官方支持

Keras 优势:
- 简洁易用:高层 API,几行代码构建模型
- 快速原型:适合研究和实验
- 模块化设计:易于组合和扩展
- 内置实用工具:预处理、数据增强、回调函数

选择建议:
- 生产部署:TensorFlow
- 快速原型:Keras
- 研究项目:PyTorch
- 移动部署:TensorFlow Lite
"""

2. 安装 TensorFlow

bash
# CPU 版本
pip install tensorflow

# GPU 版本(CUDA 11.2)
pip install tensorflow-gpu

# 验证安装
python -c "import tensorflow as tf; print(tf.__version__); print('GPU Available:', tf.config.list_physical_devices('GPU'))"

# 安装额外的工具
pip install tensorflow-datasets  # 标准数据集
pip install tensorboard          # 可视化工具
pip install tensorflow-hub       # 预训练模型

🧠 Keras 基础

1. Sequential 模型

python
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# 创建 Sequential 模型
model = keras.Sequential([
    layers.Dense(128, activation='relu', input_shape=(784,)),
    layers.Dropout(0.2),
    layers.Dense(64, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(10, activation='softmax')
])

# 查看模型摘要
model.summary()

# 或者逐步添加层
model = keras.Sequential()
model.add(layers.Dense(128, activation='relu', input_shape=(784,)))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(10, activation='softmax'))

2. Functional API

python
# Functional API 更灵活,可以构建复杂模型
inputs = keras.Input(shape=(784,))

x = layers.Dense(128, activation='relu')(inputs)
x = layers.Dropout(0.2)(x)
x = layers.Dense(64, activation='relu')(x)
x = layers.Dropout(0.2)(x)
outputs = layers.Dense(10, activation='softmax')(x)

model = keras.Model(inputs=inputs, outputs=outputs)

# 查看模型结构
keras.utils.plot_model(model, show_shapes=True)

# 多输入多输出模型
input1 = keras.Input(shape=(28, 28, 1), name='image')
input2 = keras.Input(shape=(10,), name='metadata')

x1 = layers.Conv2D(32, 3, activation='relu')(input1)
x1 = layers.MaxPooling2D()(x1)
x1 = layers.Flatten()(x1)

x2 = layers.Dense(16, activation='relu')(input2)

x = layers.Concatenate()([x1, x2])
x = layers.Dense(64, activation='relu')(x)
output1 = layers.Dense(10, activation='softmax', name='classification')(x)
output2 = layers.Dense(1, activation='sigmoid', name='regression')(x)

model = keras.Model(inputs=[input1, input2], outputs=[output1, output2])

3. 自定义模型

python
# 继承 Model 类创建自定义模型
class CustomModel(keras.Model):
    def __init__(self, num_classes=10):
        super(CustomModel, self).__init__()
        self.dense1 = layers.Dense(128, activation='relu')
        self.dropout1 = layers.Dropout(0.2)
        self.dense2 = layers.Dense(64, activation='relu')
        self.dropout2 = layers.Dropout(0.2)
        self.dense3 = layers.Dense(num_classes, activation='softmax')
    
    def call(self, inputs, training=False):
        x = self.dense1(inputs)
        if training:
            x = self.dropout1(x, training=training)
        x = self.dense2(x)
        if training:
            x = self.dropout2(x, training=training)
        return self.dense3(x)

# 创建模型
model = CustomModel(num_classes=10)

📊 数据处理

1. 数据加载和预处理

python
import tensorflow as tf
from tensorflow.keras import datasets, layers, models, preprocessing

# 加载内置数据集
(train_images, train_labels), (test_images, test_labels) = datasets.mnist.load_data()

# 数据预处理
train_images = train_images.reshape((60000, 28, 28, 1)).astype('float32') / 255
test_images = test_images.reshape((10000, 28, 28, 1)).astype('float32') / 255

train_labels = train_labels.astype('float32')
test_labels = test_labels.astype('float32')

# 使用 ImageDataGenerator 进行数据增强
train_datagen = preprocessing.image.ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

test_datagen = preprocessing.image.ImageDataGenerator()

# 使用 tf.data API
def preprocess(image, label):
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# 创建数据集
train_ds = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_ds = train_ds.map(preprocess).shuffle(10000).batch(32).prefetch(tf.data.AUTOTUNE)

test_ds = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
test_ds = test_ds.map(preprocess).batch(32).prefetch(tf.data.AUTOTUNE)

# 从目录加载数据
train_ds = preprocessing.image_dataset_from_directory(
    'data/train',
    image_size=(224, 224),
    batch_size=32
)

val_ds = preprocessing.image_dataset_from_directory(
    'data/val',
    image_size=(224, 224),
    batch_size=32
)

2. 自定义数据生成器

python
import numpy as np
import tensorflow as tf

class CustomDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, data, labels, batch_size=32, shuffle=True):
        self.data = data
        self.labels = labels
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indexes = np.arange(len(self.data))
        if self.shuffle:
            np.random.shuffle(self.indexes)
    
    def __len__(self):
        return int(np.ceil(len(self.data) / self.batch_size))
    
    def __getitem__(self, idx):
        indexes = self.indexes[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_data = self.data[indexes]
        batch_labels = self.labels[indexes]
        return batch_data, batch_labels
    
    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indexes)

# 使用自定义生成器
train_generator = CustomDataGenerator(train_images, train_labels, batch_size=32)

🖼️ 卷积神经网络

1. 构建 CNN

python
from tensorflow.keras import layers, models

def create_cnn_model(input_shape=(28, 28, 1), num_classes=10):
    model = models.Sequential([
        # 第一个卷积块
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
        layers.BatchNormalization(),
        layers.Conv2D(32, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),
        
        # 第二个卷积块
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),
        
        # 第三个卷积块
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.Dropout(0.25),
        
        # 全连接层
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(num_classes, activation='softmax')
    ])
    
    return model

model = create_cnn_model()
model.summary()

2. 训练 CNN

python
from tensorflow.keras import optimizers, losses, metrics, callbacks

# 编译模型
model.compile(
    optimizer=optimizers.Adam(learning_rate=0.001),
    loss=losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy']
)

# 学习率调度器
lr_scheduler = callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=5,
    min_lr=1e-7
)

# 早停
early_stopping = callbacks.EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True
)

# ModelCheckpoint
checkpoint = callbacks.ModelCheckpoint(
    'best_model.h5',
    monitor='val_accuracy',
    save_best_only=True,
    mode='max'
)

# TensorBoard
tensorboard = callbacks.TensorBoard(
    log_dir='./logs',
    histogram_freq=1
)

# 训练模型
history = model.fit(
    train_images, train_labels,
    epochs=50,
    batch_size=32,
    validation_split=0.2,
    callbacks=[lr_scheduler, early_stopping, checkpoint, tensorboard]
)

# 评估模型
test_loss, test_acc = model.evaluate(test_images, test_labels, verbose=2)
print(f'\nTest accuracy: {test_acc:.4f}')

3. 可视化训练过程

python
import matplotlib.pyplot as plt

# 绘制训练和验证的损失和准确率
def plot_training_history(history):
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # 损失曲线
    axes[0].plot(history.history['loss'], label='Train Loss')
    axes[0].plot(history.history['val_loss'], label='Val Loss')
    axes[0].set_title('Model Loss')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    # 准确率曲线
    axes[1].plot(history.history['accuracy'], label='Train Accuracy')
    axes[1].plot(history.history['val_accuracy'], label='Val Accuracy')
    axes[1].set_title('Model Accuracy')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('Accuracy')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

plot_training_history(history)

🔄 迁移学习

1. 使用预训练模型

python
from tensorflow.keras.applications import VGG16, ResNet50, EfficientNetB0
from tensorflow.keras import layers, models

# 加载预训练模型(不包括顶层)
def create_transfer_model(base_model_name='VGG16', num_classes=10):
    if base_model_name == 'VGG16':
        base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    elif base_model_name == 'ResNet50':
        base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    elif base_model_name == 'EfficientNetB0':
        base_model = EfficientNetB0(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    
    # 冻结基础模型
    base_model.trainable = False
    
    # 添加自定义层
    model = models.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(512, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(num_classes, activation='softmax')
    ])
    
    return model

# 创建模型
model = create_transfer_model('ResNet50', num_classes=10)
model.summary()

# 编译和训练
model.compile(
    optimizer=optimizers.Adam(learning_rate=0.001),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=10
)

# 微调:解冻部分层进行训练
base_model = model.layers[0]
base_model.trainable = True

# 只训练最后几层
for layer in base_model.layers[:-10]:
    layer.trainable = False

model.compile(
    optimizer=optimizers.Adam(learning_rate=1e-5),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

history_fine = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=20
)

🌐 循环神经网络

1. 构建 RNN/LSTM

python
from tensorflow.keras import layers, models

# Simple RNN
def create_rnn_model(vocab_size, embedding_dim, max_length):
    model = models.Sequential([
        layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
        layers.SimpleRNN(64, return_sequences=True),
        layers.SimpleRNN(32),
        layers.Dense(1, activation='sigmoid')
    ])
    return model

# LSTM
def create_lstm_model(vocab_size, embedding_dim, max_length):
    model = models.Sequential([
        layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
        layers.LSTM(64, return_sequences=True),
        layers.LSTM(32),
        layers.Dense(1, activation='sigmoid')
    ])
    return model

# Bi-LSTM(双向 LSTM)
def create_bilstm_model(vocab_size, embedding_dim, max_length):
    model = models.Sequential([
        layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
        layers.Bidirectional(layers.LSTM(64, return_sequences=True)),
        layers.Bidirectional(layers.LSTM(32)),
        layers.Dense(1, activation='sigmoid')
    ])
    return model

# GRU
def create_gru_model(vocab_size, embedding_dim, max_length):
    model = models.Sequential([
        layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
        layers.GRU(64, return_sequences=True),
        layers.GRU(32),
        layers.Dense(1, activation='sigmoid')
    ])
    return model

2. 文本分类示例

python
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import numpy as np

# 示例数据
texts = [
    "This is a positive review",
    "I love this product",
    "Best movie ever",
    "Terrible experience",
    "I hate this",
    "Worst service"
]
labels = [1, 1, 1, 0, 0, 0]

# 文本预处理
tokenizer = Tokenizer(num_words=5000, oov_token='<OOV>')
tokenizer.fit_on_texts(texts)

sequences = tokenizer.texts_to_sequences(texts)
padded_sequences = pad_sequences(sequences, maxlen=20, padding='post', truncating='post')

# 转换为 NumPy 数组
X = np.array(padded_sequences)
y = np.array(labels)

# 创建模型
vocab_size = 5000
embedding_dim = 64
max_length = 20

model = models.Sequential([
    layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
    layers.Bidirectional(layers.LSTM(64)),
    layers.Dense(24, activation='relu'),
    layers.Dense(1, activation='sigmoid')
])

# 编译和训练
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy']
)

history = model.fit(
    X, y,
    epochs=10,
    batch_size=2,
    validation_split=0.2
)

# 预测
new_text = ["This is amazing"]
new_sequence = tokenizer.texts_to_sequences(new_text)
new_padded = pad_sequences(new_sequence, maxlen=max_length, padding='post')
prediction = model.predict(new_padded)
print(f"Prediction: {prediction[0][0]:.4f}")

🎯 实战案例:图像分类

python
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
import matplotlib.pyplot as plt

def train_image_classifier():
    # 加载数据集
    (train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.cifar10.load_data()
    
    # 数据预处理
    train_images = train_images.astype('float32') / 255.0
    test_images = test_images.astype('float32') / 255.0
    
    train_labels = train_labels.reshape(-1)
    test_labels = test_labels.reshape(-1)
    
    # 数据增强
    data_augmentation = tf.keras.Sequential([
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.1),
        layers.RandomZoom(0.1),
    ])
    
    # 创建模型
    def create_model():
        inputs = tf.keras.Input(shape=(32, 32, 3))
        
        # 数据增强
        x = data_augmentation(inputs)
        
        # 卷积层
        x = layers.Rescaling(1./255)(x)
        x = layers.Conv2D(32, 3, activation='relu')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling2D()(x)
        
        x = layers.Conv2D(64, 3, activation='relu')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling2D()(x)
        
        x = layers.Conv2D(128, 3, activation='relu')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling2D()(x)
        
        x = layers.Flatten()(x)
        x = layers.Dense(256, activation='relu')(x)
        x = layers.Dropout(0.5)(x)
        outputs = layers.Dense(10, activation='softmax')(x)
        
        model = tf.keras.Model(inputs, outputs)
        return model
    
    model = create_model()
    
    # 学习率调度器
    lr_schedule = optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.001,
        decay_steps=10000,
        decay_rate=0.9
    )
    
    # 编译模型
    model.compile(
        optimizer=optimizers.Adam(learning_rate=lr_schedule),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    
    # 回调函数
    callbacks_list = [
        tf.keras.callbacks.EarlyStopping(
            monitor='val_accuracy',
            patience=10,
            restore_best_weights=True
        ),
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5
        )
    ]
    
    # 训练模型
    history = model.fit(
        train_images, train_labels,
        batch_size=64,
        epochs=50,
        validation_split=0.2,
        callbacks=callbacks_list
    )
    
    # 评估模型
    test_loss, test_acc = model.evaluate(test_images, test_labels)
    print(f'\nTest accuracy: {test_acc:.4f}')
    
    # 可视化训练过程
    plot_training_history(history)
    
    return model

# 运行训练
model = train_image_classifier()

📚 最佳实践

1. 模型保存和加载

python
# 保存整个模型
model.save('my_model.h5')

# 加载模型
loaded_model = tf.keras.models.load_model('my_model.h5')

# 只保存权重
model.save_weights('model_weights.h5')

# 加载权重
model.load_weights('model_weights.h5')

# 保存为 SavedModel 格式(推荐用于生产)
model.save('saved_model/my_model')

# 加载 SavedModel
loaded_model = tf.keras.models.load_model('saved_model/my_model')

2. 自定义回调函数

python
class CustomCallback(tf.keras.callbacks.Callback):
    def __init__(self, patience=5):
        super(CustomCallback, self).__init__()
        self.patience = patience
        self.best_accuracy = 0
        self.counter = 0
    
    def on_epoch_end(self, epoch, logs=None):
        current_accuracy = logs.get('val_accuracy')
        if current_accuracy > self.best_accuracy:
            self.best_accuracy = current_accuracy
            self.counter = 0
            # 保存最佳模型
            self.model.save('best_model.h5')
            print(f'\nNew best accuracy: {current_accuracy:.4f}')
        else:
            self.counter += 1
            if self.counter >= self.patience:
                print(f'\nEarly stopping at epoch {epoch}')
                self.model.stop_training = True
    
    def on_train_begin(self, logs=None):
        print('Starting training')
    
    def on_train_end(self, logs=None):
        print('Training completed')

# 使用自定义回调
custom_callback = CustomCallback(patience=5)
history = model.fit(
    train_images, train_labels,
    epochs=50,
    validation_split=0.2,
    callbacks=[custom_callback]
)

3. 使用 TensorBoard

python
# 启动 TensorBoard
# tensorboard --logdir=./logs

# 在训练时使用 TensorBoard
tensorboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir='./logs',
    histogram_freq=1,
    write_graph=True,
    write_images=True
)

history = model.fit(
    train_images, train_labels,
    epochs=50,
    validation_split=0.2,
    callbacks=[tensorboard_callback]
)

4. 混合精度训练

python
from tensorflow.keras import mixed_precision

# 设置混合精度策略
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

# 创建模型
model = create_model()

# 输出层使用 float32
output_layer = model.layers[-1]
output_layer = mixed_precision.LossScaleOptimizer(output_layer)

# 编译模型
optimizer = optimizers.Adam()
optimizer = mixed_precision.LossScaleOptimizer(optimizer)

model.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

🎓 学习路径

  1. Keras 基础(2周)

    • Sequential 和 Functional API
    • 数据加载和预处理
    • 简单神经网络
  2. 深度学习模型(3-4周)

    • CNN
    • RNN/LSTM/GRU
    • 迁移学习
  3. 高级技术(3-4周)

    • 自定义层和模型
    • 数据增强
    • 模型优化
  4. 实战项目(4-6周)

    • 图像分类
    • 目标检测
    • 文本生成

📖 推荐资源

  • 官方文档

  • 推荐书籍

    • 《Deep Learning with Python》- François Chollet
    • 《Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow》- Aurélien Géron
  • 在线课程

    • TensorFlow 官方教程
    • Coursera - Deep Learning Specialization

掌握 TensorFlow/Keras,构建强大的深度学习应用!

上次更新于: