Skip to content

6.3 视频生成

学习时长:2-3 周

视频生成是多模态 AI 的前沿领域,涵盖文本生成视频、图像动画化、视频编辑等技术。本节介绍从基础的视频处理到最新的 AI 视频生成模型的完整技术栈。


6.3.1 视频处理基础

核心概念

| 概念 | 说明 | 常见值 |
| --- | --- | --- |
| 帧率(FPS) | 每秒帧数 | 24(电影)、30(视频)、60(游戏) |
| 分辨率 | 视频尺寸(宽×高) | 720p (1280×720)、1080p (1920×1080)、4K (3840×2160) |
| 编码格式 | 视频压缩方式 | H.264、H.265/HEVC、VP9、AV1 |
| 容器格式 | 视频文件格式 | MP4、AVI、MOV、MKV、WebM |
| 比特率 | 数据传输速率 | 1-10 Mbps(标清)、10-50 Mbps(高清) |

视频生成任务分类

文本生成视频(Text-to-Video)
├── 短视频生成(<10秒)
├── 长视频生成(>10秒)
└── 可控生成(风格、动作、镜头)

图像动画化(Image Animation)
├── 单图转视频
├── 人物动画(如数字人)
└── 物体运动

视频编辑(Video Editing)
├── 视频修复/增强
├── 风格迁移
├── 对象替换/移除
└── 时序编辑

视频理解(Video Understanding)
├── 视频描述生成
├── 视频问答
└── 动作识别

1. 视频处理工具库

python
# pip install opencv-python moviepy pillow numpy

import cv2
import numpy as np
from moviepy.editor import VideoFileClip, ImageSequenceClip, concatenate_videoclips
from PIL import Image
from pathlib import Path
from typing import List, Tuple

class VideoProcessor:
    """Stateless helpers for common video-file operations.

    Frame-level work (frame extraction, frame assembly, metadata probing)
    uses OpenCV; whole-clip work (resize, trim, concatenate) uses MoviePy and
    re-encodes the result with libx264.
    """

    @staticmethod
    def _sampling_interval(original_fps: float, fps: int = None) -> int:
        """Return how many source frames to advance per saved frame.

        Args:
            original_fps: Frame rate reported for the source video.
            fps: Requested sampling rate; a falsy value keeps every frame.

        Returns:
            An interval of at least 1, so it is always safe as a modulo
            divisor (requesting more FPS than the source has, or a source
            that reports 0 FPS, simply keeps every frame).
        """
        if not fps or original_fps <= 0:
            return 1
        return max(1, int(original_fps / fps))

    @staticmethod
    def extract_frames(
        video_path: str,
        output_dir: str = "./frames",
        fps: int = None,
        max_frames: int = None
    ) -> List[str]:
        """Save frames of a video as JPEG files.

        Args:
            video_path: Path of the video to read.
            output_dir: Directory that receives the frames (created if missing).
            fps: Sampling rate; None keeps every frame of the source.
            max_frames: Stop after this many frames have been saved.

        Returns:
            Paths of the saved frames, in order. The list is empty when no
            frame could be read from the video.
        """
        target_dir = Path(output_dir)
        target_dir.mkdir(parents=True, exist_ok=True)

        cap = cv2.VideoCapture(video_path)
        try:
            original_fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            print(f"📹 视频信息:")
            print(f"  - 总帧数: {total_frames}")
            print(f"  - 原始帧率: {original_fps:.2f} FPS")

            # Clamped to >= 1: the raw quotient is 0 whenever fps > original_fps,
            # which would make the modulo below raise ZeroDivisionError.
            frame_interval = VideoProcessor._sampling_interval(original_fps, fps)

            frame_paths = []
            frame_count = 0

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Keep one frame out of every `frame_interval`.
                if frame_count % frame_interval == 0:
                    frame_path = target_dir / f"frame_{len(frame_paths):06d}.jpg"
                    cv2.imwrite(str(frame_path), frame)
                    frame_paths.append(str(frame_path))

                    if max_frames and len(frame_paths) >= max_frames:
                        break

                frame_count += 1
        finally:
            # Release the capture even if reading or writing a frame fails.
            cap.release()

        print(f"✅ 提取了 {len(frame_paths)} 帧")
        return frame_paths

    @staticmethod
    def frames_to_video(
        frame_paths: List[str],
        output_path: str,
        fps: int = 30,
        codec: str = "mp4v"
    ):
        """Assemble a sequence of image files into a video.

        Args:
            frame_paths: Image paths, in playback order.
            output_path: Path of the video to write.
            fps: Frame rate of the output.
            codec: FourCC code of the encoder ("mp4v", "avc1", "XVID").

        Raises:
            ValueError: If the list is empty or an image cannot be read.
        """
        if not frame_paths:
            raise ValueError("帧列表为空")

        # The first frame defines the output size.
        first_frame = cv2.imread(frame_paths[0])
        if first_frame is None:
            raise ValueError(f"无法读取帧图像: {frame_paths[0]}")
        height, width = first_frame.shape[:2]

        fourcc = cv2.VideoWriter_fourcc(*codec)
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        print(f"🎬 合成视频:{len(frame_paths)} 帧 @ {fps} FPS")

        try:
            for i, frame_path in enumerate(frame_paths):
                frame = cv2.imread(frame_path)
                if frame is None:
                    raise ValueError(f"无法读取帧图像: {frame_path}")

                # The writer was opened for (width, height); bring any
                # differently sized frame to that size before writing it.
                if frame.shape[:2] != (height, width):
                    frame = cv2.resize(frame, (width, height))
                out.write(frame)

                if (i + 1) % 100 == 0:
                    print(f"  进度: {i + 1}/{len(frame_paths)}")
        finally:
            # Always finalize the container, even on a mid-sequence failure.
            out.release()

        print(f"✅ 视频已保存: {output_path}")

    @staticmethod
    def get_video_info(video_path: str) -> dict:
        """Return basic metadata of a video.

        Returns:
            A dict with "width", "height", "fps", "frame_count" and
            "duration" (seconds). "duration" is 0.0 when the reported frame
            rate is not positive, instead of dividing by zero.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            return {
                "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
                "fps": fps,
                "frame_count": frame_count,
                "duration": frame_count / fps if fps > 0 else 0.0,
            }
        finally:
            cap.release()

    @staticmethod
    def resize_video(
        input_path: str,
        output_path: str,
        width: int = None,
        height: int = None,
        scale: float = None
    ):
        """Resize a video and write the result.

        Args:
            input_path: Source video.
            output_path: Destination video.
            width, height: Target size. When both are given they take
                priority over `scale`. When only one is given (and no
                `scale`), it is passed alone to MoviePy's resize.
            scale: Scale factor (0.5 halves both dimensions).

        Raises:
            ValueError: If none of width, height or scale is given.
        """
        # Resolve the resize arguments before opening the file so invalid
        # calls fail fast without touching the input.
        if width and height:
            args, kwargs = ((width, height),), {}
        elif scale:
            args, kwargs = (scale,), {}
        elif width:
            args, kwargs = (), {"width": width}
        elif height:
            args, kwargs = (), {"height": height}
        else:
            raise ValueError("必须指定 width/height 或 scale")

        clip = VideoFileClip(input_path)
        try:
            resized = clip.resize(*args, **kwargs)
            resized.write_videofile(output_path, codec="libx264")
        finally:
            # Close the reader so its file handle is not left open.
            clip.close()

        print(f"✅ 视频已调整尺寸: {output_path}")

    @staticmethod
    def trim_video(
        input_path: str,
        output_path: str,
        start_time: float,
        end_time: float
    ):
        """Cut the [start_time, end_time] range out of a video.

        Args:
            input_path: Source video.
            output_path: Destination video.
            start_time: Start of the kept range, in seconds.
            end_time: End of the kept range, in seconds.
        """
        clip = VideoFileClip(input_path)
        try:
            trimmed = clip.subclip(start_time, end_time)
            trimmed.write_videofile(output_path, codec="libx264")
        finally:
            clip.close()

        print(f"✅ 视频已裁剪: {output_path}")

    @staticmethod
    def concatenate_videos(
        video_paths: List[str],
        output_path: str
    ):
        """Join several videos end to end, in list order.

        Args:
            video_paths: Videos to join.
            output_path: Destination video.

        Raises:
            ValueError: If `video_paths` is empty.
        """
        if not video_paths:
            raise ValueError("视频列表为空")

        clips = []
        try:
            for path in video_paths:
                clips.append(VideoFileClip(path))
            final_clip = concatenate_videoclips(clips)
            final_clip.write_videofile(output_path, codec="libx264")
        finally:
            # Close every reader that was opened, including on failure.
            for clip in clips:
                clip.close()

        print(f"✅ 视频已拼接: {output_path}")

# Usage example
processor = VideoProcessor()

# Probe the source video's metadata
info = processor.get_video_info("input.mp4")
print(f"视频信息: {info}")

# Sample frames at 10 per second, keeping at most 100 of them
frames = processor.extract_frames("input.mp4", output_dir="./frames", fps=10, max_frames=100)

# Reassemble the sampled frames into a video
processor.frames_to_video(frames, "output.mp4", fps=30)

# Resize to 1280x720
processor.resize_video("input.mp4", "resized.mp4", width=1280, height=720)

# Keep only seconds 10 to 20
processor.trim_video("input.mp4", "trimmed.mp4", start_time=10.0, end_time=20.0)

6.3.2 文本生成视频(Text-to-Video)

1. 使用 ModelScope(阿里开源)

python
# pip install modelscope torch diffusers transformers accelerate

from modelscope.pipelines import pipeline
from modelscope.outputs import OutputKeys
import torch

class TextToVideo:
    """Text-to-video generation through a ModelScope pipeline."""

    def __init__(self):
        """Create the ModelScope text-to-video pipeline.

        The pipeline is placed on CUDA when torch reports it as available and
        on the CPU otherwise. The original author recommends a GPU with 16 GB+
        of memory.
        """
        print("🔧 加载 ModelScope 文生视频模型...")

        target_device = 'cpu'
        if torch.cuda.is_available():
            target_device = 'cuda'

        self.pipe = pipeline(
            task='text-to-video-synthesis',
            model='damo/text-to-video-synthesis',
            device=target_device
        )

        print("✅ 模型加载完成")

    def generate(
        self,
        prompt: str,
        output_path: str = "generated_video.mp4",
        num_frames: int = 16,
        fps: int = 8
    ):
        """Generate a video from a text prompt and write it to disk.

        Args:
            prompt: Text prompt describing the video.
            output_path: Path of the video file to write.
            num_frames: Number of frames to request (the original author
                suggests 16 or 32).
            fps: Frame rate used when encoding the output.
        """
        print(f"🎬 生成视频:{prompt}")

        # Run the pipeline and pick the video entry out of its result.
        result = self.pipe(prompt, num_frames=num_frames)
        generated = result[OutputKeys.OUTPUT_VIDEO]

        # NOTE(review): this assumes the pipeline returns an iterable of frame
        # images under OUTPUT_VIDEO — confirm against the ModelScope version
        # in use.
        from moviepy.editor import ImageSequenceClip
        sequence = ImageSequenceClip(list(generated), fps=fps)
        sequence.write_videofile(output_path, codec="libx264")

        print(f"✅ 视频已保存: {output_path}")

# Usage example
t2v = TextToVideo()

# Generate a short clip from a text prompt
t2v.generate(
    "A cat playing with a ball in the garden, sunny day, high quality",
    output_path="cat_playing.mp4",
    num_frames=16,
    fps=8,
)

2. 使用 Stable Video Diffusion

python
# pip install diffusers transformers accelerate

from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
import torch

class StableVideoDiffusion:
    """Image-to-video generation with Stable Video Diffusion."""

    def __init__(self):
        """Load the stabilityai/stable-video-diffusion-img2vid-xt pipeline.

        Weights are loaded in float16 (fp16 variant) and model CPU offload is
        enabled to reduce memory use.
        """
        print("🔧 加载 Stable Video Diffusion 模型...")

        svd_pipe = StableVideoDiffusionPipeline.from_pretrained(
            "stabilityai/stable-video-diffusion-img2vid-xt",
            torch_dtype=torch.float16,
            variant="fp16",
        )
        svd_pipe.enable_model_cpu_offload()
        self.pipe = svd_pipe

        print("✅ 模型加载完成")

    def image_to_video(
        self,
        image_path: str,
        output_path: str = "animated.mp4",
        num_frames: int = 25,
        fps: int = 7,
        motion_bucket_id: int = 127,
        noise_aug_strength: float = 0.02
    ):
        """Animate a still image and write the result as a video.

        Args:
            image_path: Path of the input image.
            output_path: Path of the video file to write.
            num_frames: Number of frames to generate (the original author
                suggests 14-25).
            fps: Frame rate used when exporting.
            motion_bucket_id: Motion amount; per the original author, 0-255
                with larger values producing stronger motion.
            noise_aug_strength: Noise augmentation strength.
        """
        print(f"🎬 图像转视频:{image_path}")

        # The input is forced to 1024x576, the size the original author
        # recommends for SVD; the aspect ratio is not preserved.
        conditioning = load_image(image_path).resize((1024, 576))

        result = self.pipe(
            conditioning,
            decode_chunk_size=8,
            num_frames=num_frames,
            motion_bucket_id=motion_bucket_id,
            noise_aug_strength=noise_aug_strength,
        )

        # The first entry of `frames` holds the generated clip.
        export_to_video(result.frames[0], output_path, fps=fps)

        print(f"✅ 视频已保存: {output_path}")

# Usage example
svd = StableVideoDiffusion()

# Animate a portrait; 127 is described by the author as a medium motion amount
svd.image_to_video(
    "portrait.jpg",
    output_path="animated_portrait.mp4",
    num_frames=25,
    motion_bucket_id=127,
)

3. 使用 AnimateDiff(开源方案)

python
# pip install diffusers transformers accelerate

from diffusers import MotionAdapter, AnimateDiffPipeline, DDIMScheduler
from diffusers.utils import export_to_video
import torch

class AnimateDiffGenerator:
    """Text-to-animation generation with AnimateDiff."""

    def __init__(self):
        """Assemble the AnimateDiff pipeline.

        A motion adapter is attached to the Stable Diffusion v1.5 base model,
        the scheduler is swapped for DDIM, and VAE slicing plus model CPU
        offload are enabled.
        """
        print("🔧 加载 AnimateDiff 模型...")

        # Motion module that is attached to the base model below
        motion_adapter = MotionAdapter.from_pretrained(
            "guoyww/animatediff-motion-adapter-v1-5-2",
            torch_dtype=torch.float16,
        )

        # Base Stable Diffusion model combined with the motion adapter
        animate_pipe = AnimateDiffPipeline.from_pretrained(
            "runwayml/stable-diffusion-v1-5",
            motion_adapter=motion_adapter,
            torch_dtype=torch.float16,
        )

        # Replace the default scheduler with DDIM, reusing its configuration
        animate_pipe.scheduler = DDIMScheduler.from_config(
            animate_pipe.scheduler.config,
            beta_schedule="linear",
            steps_offset=1,
        )

        animate_pipe.enable_vae_slicing()
        animate_pipe.enable_model_cpu_offload()
        self.pipe = animate_pipe

        print("✅ 模型加载完成")

    def generate(
        self,
        prompt: str,
        negative_prompt: str = None,
        output_path: str = "animated.mp4",
        num_frames: int = 16,
        fps: int = 8,
        num_inference_steps: int = 25,
        guidance_scale: float = 7.5
    ):
        """Generate an animation from a prompt and write it as a video.

        Args:
            prompt: Positive prompt.
            negative_prompt: Negative prompt.
            output_path: Path of the video file to write.
            num_frames: Number of frames to generate.
            fps: Frame rate used when exporting.
            num_inference_steps: Number of denoising steps.
            guidance_scale: Guidance scale passed to the pipeline.
        """
        print(f"🎬 生成动画:{prompt}")

        result = self.pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_frames=num_frames,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
        )

        # The first entry of `frames` holds the generated clip.
        export_to_video(result.frames[0], output_path, fps=fps)

        print(f"✅ 视频已保存: {output_path}")

# Usage example
animator = AnimateDiffGenerator()

# Generate a short animation from a prompt
animator.generate(
    "A beautiful sunset over the ocean, waves gently moving, cinematic",
    negative_prompt="static, blurry, low quality",
    output_path="sunset_animation.mp4",
    num_frames=16,
    fps=8,
)

6.3.3 数字人视频生成

1. 使用 SadTalker(数字人口型同步)

python
# pip install sadtalker

from sadtalker import SadTalker
import torch

class DigitalHuman:
    """Talking-head video generation driven by SadTalker."""

    def __init__(self):
        """Create the SadTalker wrapper used to make a still face image talk.

        Checkpoints are read from 'checkpoints' and configuration from
        'src/config'; CUDA is used when torch reports it as available.
        """
        print("🔧 加载 SadTalker 模型...")

        use_cuda = torch.cuda.is_available()
        self.sad_talker = SadTalker(
            checkpoint_path='checkpoints',
            config_path='src/config',
            device='cuda' if use_cuda else 'cpu',
        )

        print("✅ 模型加载完成")

    def generate_talking_video(
        self,
        source_image: str,
        driven_audio: str,
        output_path: str = "talking_video.mp4",
        preprocess: str = "crop",
        still_mode: bool = False,
        use_enhancer: bool = False
    ):
        """Generate a talking video from a face image and an audio track.

        Args:
            source_image: Path of the face image.
            driven_audio: Path of the driving audio.
            output_path: Output location forwarded to SadTalker.
            preprocess: Preprocessing mode ("crop", "resize", "full").
            still_mode: Still mode (per the original author, reduces head
                motion).
            use_enhancer: Whether to enable the face enhancer.
        """
        print(f"🎬 生成数字人视频")
        print(f"  - 图像: {source_image}")
        print(f"  - 音频: {driven_audio}")

        # NOTE(review): output_path is forwarded as `save_dir` — confirm
        # whether SadTalker treats it as a directory rather than a file path.
        options = {
            "source_image": source_image,
            "driven_audio": driven_audio,
            "save_dir": output_path,
            "preprocess": preprocess,
            "still": still_mode,
            "use_enhancer": use_enhancer,
        }
        self.sad_talker.test(**options)

        print(f"✅ 视频已保存: {output_path}")

# 使用示例(需要先下载模型)
# digital_human = DigitalHuman()

# digital_human.generate_talking_video(
#     source_image="portrait.jpg",
#     driven_audio="speech.wav",
#     output_path="talking_portrait.mp4",
#     still_mode=True  # 减少头部运动
# )

2. 使用 Wav2Lip(精确口型同步)

python
# pip install opencv-python librosa

import cv2
import numpy as np
from typing import List

class Wav2LipGenerator:
    """Lip-sync wrapper around a Wav2Lip model."""

    def __init__(self, checkpoint_path: str):
        """Load Wav2Lip weights and put the model in evaluation mode.

        Args:
            checkpoint_path: Path of the model checkpoint; its "state_dict"
                entry is loaded into the network.
        """
        import torch
        from models import Wav2Lip

        print("🔧 加载 Wav2Lip 模型...")

        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

        # NOTE(review): torch.load unpickles the file — only load checkpoints
        # from a trusted source.
        self.model = Wav2Lip()
        saved = torch.load(checkpoint_path, map_location=self.device)
        weights = saved["state_dict"]
        self.model.load_state_dict(weights)
        self.model = self.model.to(self.device)
        self.model.eval()

        print("✅ 模型加载完成")

    def sync_lips(
        self,
        video_path: str,
        audio_path: str,
        output_path: str = "synced_video.mp4",
        face_det_batch_size: int = 16,
        wav2lip_batch_size: int = 128
    ):
        """Interface sketch for lip-syncing a video to an audio track.

        This is a placeholder: no inference is run and no file is written;
        the method only prints the messages below. A real implementation
        needs the full Wav2Lip inference code.

        Args:
            video_path: Path of the input video.
            audio_path: Path of the audio track.
            output_path: Path the result would be written to.
            face_det_batch_size: Face-detection batch size (currently unused).
            wav2lip_batch_size: Wav2Lip batch size (currently unused).
        """
        for message in (
            f"🎬 同步口型",
            f"  - 视频: {video_path}",
            f"  - 音频: {audio_path}",
            f"✅ 视频已保存: {output_path}",
        ):
            print(message)

# 使用示例(需要下载 Wav2Lip 模型)
# wav2lip = Wav2LipGenerator(checkpoint_path="wav2lip.pth")

# wav2lip.sync_lips(
#     video_path="original_video.mp4",
#     audio_path="new_audio.wav",
#     output_path="lip_synced.mp4"
# )

6.3.4 视频编辑与增强

1. 视频风格迁移

python
import torch
import torchvision.transforms as transforms
from PIL import Image
import cv2

class VideoStyleTransfer:
    """Frame-by-frame video style transfer (skeleton implementation).

    The per-frame transfer step is a placeholder that returns the frame
    unchanged; the surrounding extract / process / reassemble flow is real.
    """

    def __init__(self, style_image_path: str):
        """Load the feature extractor and the style image.

        Args:
            style_image_path: Path of the style reference image.
        """
        print("🔧 加载风格迁移模型...")

        # Convolutional feature layers of a pretrained VGG19 from PyTorch Hub.
        self.model = torch.hub.load(
            'pytorch/vision:v0.10.0',
            'vgg19',
            pretrained=True
        ).features

        self.style_image = self._load_image(style_image_path)

        print("✅ 模型加载完成")

    def _load_image(self, image_path: str) -> torch.Tensor:
        """Load an image as a normalized 1x3x512x512 tensor."""
        image = Image.open(image_path).convert('RGB')

        transform = transforms.Compose([
            transforms.Resize((512, 512)),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])

        # unsqueeze adds the leading batch dimension.
        return transform(image).unsqueeze(0)

    def transfer_video(
        self,
        input_video: str,
        output_video: str,
        style_weight: float = 1e6,
        content_weight: float = 1.0
    ):
        """Apply style transfer to every frame of a video.

        The output keeps the frame rate of the input so playback speed is
        unchanged; 30 FPS is used only when the input reports no usable rate.
        Extracted frames are left in ./temp_frames.

        Args:
            input_video: Path of the input video.
            output_video: Path of the video to write.
            style_weight: Style weight (unused by the placeholder transfer).
            content_weight: Content weight (unused by the placeholder transfer).
        """
        print(f"🎨 应用风格迁移: {input_video}")

        # Read the source frame rate so the result is not re-timed to a
        # hard-coded value.
        cap = cv2.VideoCapture(input_video)
        try:
            source_fps = cap.get(cv2.CAP_PROP_FPS)
        finally:
            cap.release()
        if not source_fps or source_fps <= 0:
            source_fps = 30

        # Extract every frame to disk.
        processor = VideoProcessor()
        frames = processor.extract_frames(input_video, output_dir="./temp_frames")

        # Stylize frame by frame.
        styled_frames = []
        for i, frame_path in enumerate(frames):
            print(f"  处理帧 {i+1}/{len(frames)}")
            styled_frames.append(self._transfer_style(frame_path))

        # Reassemble at the source frame rate.
        processor.frames_to_video(styled_frames, output_video, fps=source_fps)

        print(f"✅ 风格迁移完成: {output_video}")

    def _transfer_style(self, frame_path: str) -> str:
        """Stylize one frame (placeholder).

        A real implementation needs a full neural style transfer algorithm;
        this version returns the input path unchanged.
        """
        return frame_path

# 使用示例
# style_transfer = VideoStyleTransfer(style_image_path="starry_night.jpg")
# style_transfer.transfer_video(
#     input_video="original.mp4",
#     output_video="stylized.mp4"
# )

2. 视频超分辨率(增强画质)

python
# pip install basicsr realesrgan

from basicsr.archs.rrdbnet_arch import RRDBNet
from realesrgan import RealESRGANer
import cv2
import numpy as np

class VideoUpscaler:
    """Video super-resolution with Real-ESRGAN, applied frame by frame."""

    # Number of RRDB blocks each supported checkpoint was built with. The
    # network must match the checkpoint or its weights cannot be loaded.
    _NUM_BLOCKS = {
        "RealESRGAN_x4plus": 23,
        "RealESRGAN_x4plus_anime_6B": 6,
    }

    @staticmethod
    def _block_count_for(model_name: str) -> int:
        """Return the RRDB block count for a checkpoint name (default 23)."""
        return VideoUpscaler._NUM_BLOCKS.get(model_name, 23)

    def __init__(self, model_name: str = "RealESRGAN_x4plus"):
        """Build the network and the Real-ESRGAN upsampler.

        Args:
            model_name: Checkpoint name, loaded from weights/<model_name>.pth.
                - RealESRGAN_x4plus: 4x upscaling (general purpose)
                - RealESRGAN_x4plus_anime_6B: 4x upscaling (anime)
        """
        # Local import: this section of the file does not import torch itself.
        import torch

        print(f"🔧 加载 {model_name} 模型...")

        model = RRDBNet(
            num_in_ch=3,
            num_out_ch=3,
            num_feat=64,
            num_block=self._block_count_for(model_name),
            num_grow_ch=32,
            scale=4
        )

        self.upsampler = RealESRGANer(
            scale=4,
            model_path=f'weights/{model_name}.pth',
            model=model,
            tile=0,
            tile_pad=10,
            pre_pad=0,
            # FP16 speeds up GPU inference; fall back to FP32 without CUDA.
            half=torch.cuda.is_available()
        )

        print("✅ 模型加载完成")

    def upscale_video(
        self,
        input_video: str,
        output_video: str,
        scale: int = 4
    ):
        """Upscale every frame of a video and write the result.

        Args:
            input_video: Path of the input video.
            output_video: Path of the video to write.
            scale: Output scale factor.

        Raises:
            IOError: If the input video cannot be opened.
        """
        print(f"📈 视频超分辨率: {input_video}")

        cap = cv2.VideoCapture(input_video)
        out = None
        try:
            if not cap.isOpened():
                raise IOError(f"无法打开视频: {input_video}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # The writer needs integer dimensions even for a fractional scale.
            out_size = (int(width * scale), int(height * scale))
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_video, fourcc, fps, out_size)

            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # enhance() returns (image, mode); only the image is needed.
                output, _ = self.upsampler.enhance(frame, outscale=scale)
                out.write(output)

                frame_count += 1
                if frame_count % 30 == 0:
                    print(f"  进度: {frame_count}/{total_frames}")
        finally:
            # Release both handles even if enhancement fails mid-video.
            cap.release()
            if out is not None:
                out.release()

        print(f"✅ 视频已增强: {output_video}")

# Usage example
upscaler = VideoUpscaler(model_name="RealESRGAN_x4plus")
upscaler.upscale_video("low_res.mp4", "high_res.mp4", scale=4)

6.3.5 视频理解与描述

1. 视频动作识别(视频理解的基础能力)

python
# pip install transformers torch

from transformers import VivitImageProcessor, VivitForVideoClassification
import torch
import cv2
import numpy as np

class VideoUnderstanding:
    """Action recognition on videos with a ViViT classifier."""

    def __init__(self):
        """Load the google/vivit-b-16x2-kinetics400 processor and model."""
        print("🔧 加载视频理解模型...")

        self.processor = VivitImageProcessor.from_pretrained(
            "google/vivit-b-16x2-kinetics400"
        )
        self.model = VivitForVideoClassification.from_pretrained(
            "google/vivit-b-16x2-kinetics400"
        )

        print("✅ 模型加载完成")

    def classify_action(self, video_path: str, top_k: int = 5) -> list:
        """Classify the action shown in a video.

        Args:
            video_path: Path of the video.
            top_k: Number of highest-scoring labels to return; capped at the
                number of labels the model produces.

        Returns:
            [{"action": "running", "score": 0.95}, ...], best first.

        Raises:
            ValueError: If no frame can be read from the video.
        """
        print(f"🎬 分析视频动作: {video_path}")

        # Sample 32 frames spread evenly over the video.
        frames = self._extract_frames(video_path, num_frames=32)
        if len(frames) == 0:
            raise ValueError(f"无法从视频中读取帧: {video_path}")

        inputs = self.processor(list(frames), return_tensors="pt")

        # Inference only: no gradients needed.
        with torch.no_grad():
            logits = self.model(**inputs).logits

        probs = torch.nn.functional.softmax(logits, dim=-1)[0]
        # topk fails when asked for more entries than there are labels.
        k = min(top_k, probs.shape[-1])
        top_probs, top_indices = torch.topk(probs, k)

        return [
            {
                "action": self.model.config.id2label[idx.item()],
                "score": prob.item(),
            }
            for prob, idx in zip(top_probs, top_indices)
        ]

    @staticmethod
    def _sample_indices(total_frames: int, num_frames: int) -> np.ndarray:
        """Return `num_frames` evenly spaced frame indices in [0, total_frames).

        Returns an empty array when either count is not positive, so a video
        that reports no frames never produces negative indices.
        """
        if total_frames <= 0 or num_frames <= 0:
            return np.array([], dtype=int)
        return np.linspace(0, total_frames - 1, num_frames, dtype=int)

    def _extract_frames(
        self,
        video_path: str,
        num_frames: int = 32
    ) -> np.ndarray:
        """Read evenly spaced frames from a video as RGB arrays.

        Frames that fail to decode are skipped and the clip is padded by
        repeating the last good frame, so the result holds `num_frames`
        frames whenever at least one frame could be read. The array is empty
        when none could.
        NOTE(review): padding assumes the classifier needs a fixed-length
        clip — confirm for the checkpoint in use.
        """
        frames = []
        cap = cv2.VideoCapture(video_path)
        try:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            for idx in self._sample_indices(total_frames, num_frames):
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
                ret, frame = cap.read()
                if ret:
                    # OpenCV decodes to BGR; convert to RGB for the processor.
                    frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        finally:
            cap.release()

        if frames and len(frames) < num_frames:
            frames.extend([frames[-1]] * (num_frames - len(frames)))

        return np.array(frames)

# Usage example
video_understanding = VideoUnderstanding()

# Classify the actions shown in the clip
actions = video_understanding.classify_action(video_path="sports_video.mp4")

# Report each label with its score as a percentage
print("🎯 识别到的动作:")
for action in actions:
    print(f"  • {action['action']}: {action['score']:.2%}")

2. 视频问答(Video QA)

python
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
from openai import OpenAI

class VideoQA:
    """Video question answering: action recognition results fed to an LLM."""

    def __init__(self):
        """Create the video analyzer and the OpenAI client."""
        self.video_understanding = VideoUnderstanding()
        self.llm_client = OpenAI()

    def answer_question(
        self,
        video_path: str,
        question: str
    ) -> str:
        """Answer a question about a video.

        The top three recognized actions are listed in the prompt and the
        chat model answers from that context alone.

        Args:
            video_path: Path of the video.
            question: Question to answer.

        Returns:
            The answer text returned by the chat model.
        """
        print(f"❓ 问题: {question}")

        # Step 1: recognize what happens in the video.
        top_actions = self.video_understanding.classify_action(video_path, top_k=3)

        # Step 2: turn the recognition results into a bullet-list context.
        bullet_lines = [
            f"- {item['action']} (置信度: {item['score']:.2%})\n"
            for item in top_actions
        ]
        context = "视频中识别到的动作:\n" + "".join(bullet_lines)

        # Step 3: ask the chat model, grounding it in that context.
        prompt = f"""基于以下视频分析结果,回答问题。

{context}

问题:{question}

请给出简洁明了的答案:"""

        completion = self.llm_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
        )
        answer = completion.choices[0].message.content

        print(f"💡 答案: {answer}")
        return answer

# Usage example
video_qa = VideoQA()

# Ask a question about a video
answer = video_qa.answer_question("basketball_game.mp4", question="视频中的人在做什么运动?")

6.3.6 实战案例:自动视频摘要生成器

python
"""
综合案例:自动视频摘要生成器
功能:
1. 视频场景分割
2. 关键帧提取
3. 场景描述生成
4. 摘要视频合成
"""

from typing import List, Dict
import numpy as np
from pathlib import Path

class VideoSummarizer:
    """Build a shortened summary video plus a text report from a long video.

    Pipeline: split the video into scenes by frame difference, score each
    scene from its middle frame, keep the best-scoring fraction, then cut and
    concatenate those scenes in their original order.
    """

    def __init__(self, scene_threshold: float = 30.0):
        """Create the helpers used by the pipeline.

        Args:
            scene_threshold: Mean absolute pixel difference between two
                consecutive frames above which a scene cut is declared.
        """
        print("🔧 初始化视频摘要生成器...")

        self.processor = VideoProcessor()
        # Kept for callers that use it; the scoring below does not call it.
        self.video_understanding = VideoUnderstanding()
        self.scene_threshold = scene_threshold

        print("✅ 初始化完成")

    def generate_summary(
        self,
        video_path: str,
        output_dir: str = "./video_summary",
        summary_ratio: float = 0.2
    ) -> Dict:
        """Generate the summary video and its text description.

        Args:
            video_path: Path of the input video.
            output_dir: Directory that receives the outputs (created if
                missing).
            summary_ratio: Fraction of scenes to keep (0.2 keeps 20%); at
                least one scene is always kept.

        Returns:
            A dict with "summary_video", "description", "num_scenes",
            "num_key_scenes" and "compression_ratio".

        Raises:
            ValueError: If no scene (that is, no frame) can be read.
        """
        print(f"\n📹 生成视频摘要: {video_path}")
        print(f"摘要比例: {summary_ratio * 100:.0f}%\n")

        Path(output_dir).mkdir(parents=True, exist_ok=True)

        # 1. Scene detection
        print("🎬 步骤 1/4:场景分割")
        scenes = self._detect_scenes(video_path)
        if not scenes:
            # Fail here with a clear message instead of dividing by zero or
            # concatenating an empty clip list further down.
            raise ValueError(f"未检测到任何场景,无法读取视频: {video_path}")
        print(f"✅ 检测到 {len(scenes)} 个场景\n")

        # 2. Scene scoring
        print("⭐ 步骤 2/4:场景评分")
        scored_scenes = self._score_scenes(video_path, scenes)
        print(f"✅ 场景评分完成\n")

        # 3. Key-scene selection
        print("🎯 步骤 3/4:选择关键场景")
        key_scenes = self._select_key_scenes(
            scored_scenes,
            ratio=summary_ratio
        )
        print(f"✅ 选择了 {len(key_scenes)} 个关键场景\n")

        # 4. Summary video
        print("🎥 步骤 4/4:生成摘要视频")
        summary_path = self._create_summary_video(
            video_path,
            key_scenes,
            output_dir
        )
        print(f"✅ 摘要视频已生成: {summary_path}\n")

        # 5. Text description
        print("📝 生成文字描述...")
        description = self._generate_description(key_scenes)

        desc_path = Path(output_dir) / "summary_description.txt"
        with open(desc_path, "w", encoding="utf-8") as f:
            f.write(description)

        print(f"✅ 描述已保存: {desc_path}")

        return {
            "summary_video": summary_path,
            "description": description,
            "num_scenes": len(scenes),
            "num_key_scenes": len(key_scenes),
            "compression_ratio": len(key_scenes) / len(scenes)
        }

    def _detect_scenes(self, video_path: str) -> List[Dict]:
        """Split a video into scenes at large frame-to-frame differences.

        Returns:
            [{"start_frame": 0, "end_frame": 100}, ...] with inclusive frame
            bounds; empty when no frame can be read.
        """
        scenes = []
        prev_frame = None
        scene_start = 0
        frame_idx = 0

        cap = cv2.VideoCapture(video_path)
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if prev_frame is not None:
                    # Mean absolute difference to the previous frame
                    diff_score = np.mean(cv2.absdiff(frame, prev_frame))

                    # A jump above the threshold closes the current scene
                    if diff_score > self.scene_threshold:
                        scenes.append({
                            "start_frame": scene_start,
                            "end_frame": frame_idx - 1
                        })
                        scene_start = frame_idx

                prev_frame = frame
                frame_idx += 1
        finally:
            cap.release()

        # The frames after the last cut form the final scene.
        if scene_start < frame_idx:
            scenes.append({
                "start_frame": scene_start,
                "end_frame": frame_idx - 1
            })

        return scenes

    @staticmethod
    def _scene_duration(scene: Dict, fps: float) -> float:
        """Return a scene's length in seconds (0.0 when fps is not positive).

        Both bounds are inclusive, so the scene holds
        end_frame - start_frame + 1 frames.
        """
        if not fps or fps <= 0:
            return 0.0
        return (scene["end_frame"] - scene["start_frame"] + 1) / fps

    def _score_scenes(
        self,
        video_path: str,
        scenes: List[Dict]
    ) -> List[Dict]:
        """Score each scene from the sharpness and contrast of its middle frame.

        Returns:
            The input scenes with "score" (0.0-1.0; 0.0 when the middle frame
            cannot be read) and "duration" (seconds) added, e.g.
            [{"start_frame": 0, "end_frame": 100, "score": 0.85, ...}, ...]
        """
        scored_scenes = []

        cap = cv2.VideoCapture(video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)

            for scene in scenes:
                # Use the middle frame as the scene's representative.
                mid_frame = (scene["start_frame"] + scene["end_frame"]) // 2
                cap.set(cv2.CAP_PROP_POS_FRAMES, mid_frame)
                ret, frame = cap.read()

                if ret:
                    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    # Variance of the Laplacian as sharpness, grey-level
                    # standard deviation as contrast.
                    sharpness = cv2.Laplacian(gray, cv2.CV_64F).var()
                    contrast = gray.std()

                    score = min((sharpness / 1000 + contrast / 100) / 2, 1.0)
                else:
                    score = 0.0

                scored_scenes.append({
                    **scene,
                    "score": score,
                    "duration": self._scene_duration(scene, fps)
                })
        finally:
            cap.release()

        return scored_scenes

    def _select_key_scenes(
        self,
        scored_scenes: List[Dict],
        ratio: float
    ) -> List[Dict]:
        """Keep the best-scoring fraction of scenes, in chronological order.

        At least one scene is kept whenever any scene is available.
        """
        ranked = sorted(
            scored_scenes,
            key=lambda x: x["score"],
            reverse=True
        )

        num_key_scenes = max(1, int(len(ranked) * ratio))
        key_scenes = ranked[:num_key_scenes]

        # Restore playback order for the final cut.
        key_scenes.sort(key=lambda x: x["start_frame"])

        return key_scenes

    def _create_summary_video(
        self,
        video_path: str,
        key_scenes: List[Dict],
        output_dir: str
    ) -> str:
        """Cut the key scenes out of the video and join them.

        Returns:
            Path of the written summary video.

        Raises:
            ValueError: If no scene yields a clip with positive length.
        """
        from moviepy.editor import VideoFileClip, concatenate_videoclips

        full_clip = VideoFileClip(video_path)
        try:
            fps = full_clip.fps

            clips = []
            for scene in key_scenes:
                start_time = scene["start_frame"] / fps
                # end_frame is inclusive: the scene ends where the next frame
                # would start. Clamp so rounding never passes the video end.
                end_time = min(
                    (scene["end_frame"] + 1) / fps,
                    full_clip.duration
                )
                if end_time <= start_time:
                    continue

                clips.append(full_clip.subclip(start_time, end_time))

            if not clips:
                raise ValueError("没有可用于生成摘要的场景片段")

            final_clip = concatenate_videoclips(clips)

            output_path = Path(output_dir) / "summary_video.mp4"
            final_clip.write_videofile(
                str(output_path),
                codec="libx264",
                audio_codec="aac"
            )
        finally:
            # Close the source reader so its file handle is not left open.
            full_clip.close()

        return str(output_path)

    def _generate_description(self, key_scenes: List[Dict]) -> str:
        """Render a markdown report listing each key scene's duration and score."""
        parts = [
            f"# 视频摘要\n\n",
            f"共选择了 {len(key_scenes)} 个关键场景:\n\n",
        ]

        for i, scene in enumerate(key_scenes, 1):
            parts.append(f"## 场景 {i}\n")
            parts.append(f"- 时间: {scene['duration']:.2f}\n")
            parts.append(f"- 评分: {scene['score']:.2f}\n\n")

        return "".join(parts)

# Usage example
summarizer = VideoSummarizer()

# Build the summary, keeping 20% of the detected scenes
result = summarizer.generate_summary(
    "long_video.mp4",
    output_dir="./summary_output",
    summary_ratio=0.2,
)

# Print the summary statistics between two separator rules
print("\n" + "="*60)
print("📊 摘要统计:")
print(f"原始场景数: {result['num_scenes']}")
print(f"关键场景数: {result['num_key_scenes']}")
print(f"压缩比例: {result['compression_ratio']:.1%}")
print(f"摘要视频: {result['summary_video']}")
print("="*60)

6.3.7 学习资源

推荐论文

开源项目

  • Stable Video Diffusion: Stability AI 官方实现
  • AnimateDiff: 社区活跃的动画生成项目
  • SadTalker: 数字人口型同步
  • Wav2Lip: 精确口型同步

工具与平台

  • Runway ML: 在线视频编辑平台
  • Pika Labs: AI 视频生成服务
  • HeyGen: 数字人视频生成平台

实战练习

  1. 实现视频自动剪辑系统(场景检测 + 关键帧提取)
  2. 构建数字人播报系统(TTS + 口型同步)
  3. 开发视频风格迁移应用(风格化处理)
  4. 实现视频摘要生成器(场景分析 + 智能剪辑)

关键要点

  • 显存需求大:视频生成模型通常需要 16GB+ 显存
  • 生成速度慢:视频生成比图像生成慢得多
  • 质量控制难:时序一致性是主要挑战
  • 分辨率权衡:更高分辨率需要更多资源
  • 帧率选择:8-15 FPS 适合 AI 生成,30 FPS 适合真实视频

性能优化

  • 使用模型量化(FP16、INT8)
  • 批量处理帧(提升吞吐量)
  • GPU 内存管理(enable_model_cpu_offload)
  • 分段处理长视频(避免内存溢出)

常见问题

  • ❌ 时序不连贯(闪烁、跳跃)
  • ❌ 运动模糊(快速运动场景)
  • ❌ 人物变形(数字人生成)
  • ❌ 口型不同步(需要专门的口型同步模型)

技术趋势

  • 更长的视频生成(从几秒到几分钟)
  • 更高的分辨率(4K、8K)
  • 更强的可控性(精确控制运动、镜头)
  • 实时生成(降低延迟)
  • 多模态融合(视频 + 音频 + 文本)

下一步:完成本章学习后,你已经掌握了多模态 AI 的三大核心技术:视觉、语音、视频。建议通过以下综合实战项目巩固所学:

  1. 多模态内容创作平台(文生图 + 文生视频 + 配音)
  2. 智能视频编辑助手(自动剪辑 + 字幕 + 配乐)
  3. 数字人直播系统(实时口型同步 + 语音驱动)

坚持是一种品格