企业云盘数据迁移实战:如何安全迁移100TB数据而不丢一行记录

企业云盘数据迁移实战:如何安全迁移100TB数据而不丢一行记录

前言

2024年双十一前一天夜里三点,某电商公司的技术负责人老张给我打了一通电话。电话那头的声音带着压抑的焦虑:"我们换了企业云盘,新旧系统数据对不上,差了8GB。业务明天就要用,你能不能告诉我哪里出了问题?"

这不是我第一次遇到这种情况。在过去三年里,我直接或间接参与了超过二十次企业云盘数据迁移项目,见过各种匪夷所思的问题:文件校验和算错导致重复迁移、符号链接循环导致迁移程序死循环、某些特殊字符的文件名在不同系统间编码不一致、元数据(修改时间、所有者权限)丢失导致法律合规问题、迁移过程中新文件还在写入导致数据不一致……

每一次迁移都是一场外科手术级别的精密操作。本文把我踩过的坑、总结的方法论、以及生产级迁移工具的核心代码,全部摊开来讲。如果你正在准备一次企业云盘迁移,这篇文章能让你少走至少两个月的弯路。


第一章 迁移前必须想清楚的七个问题

1.1 问题一:100TB数据到底是什么概念

很多人对数据量缺乏直观感受。让我来换算一下:

  • 100TB = 102,400GB
  • 以千兆网络传输,理论速度125MB/s,实际稳定速度约80MB/s
  • 理论最短时间:100TB ÷ 80MB/s = 1,280,000秒 ≈ 14.8天
  • 以万兆网络传输,理论速度1250MB/s,实际稳定速度约800MB/s
  • 理论最短时间:100TB ÷ 800MB/s = 128,000秒 ≈ 1.48天

但这只是理论值。实际生产环境中,100TB数据的迁移通常需要2-4周的窗口期。原因如下:

  1. 业务不能停,但迁移窗口有限(通常是凌晨或周末)
  2. 迁移过程中需要校验,校验是IO密集型操作,会拖慢传输
  3. 网络带宽不是独占的,其他业务也在用
  4. 迁移不是一次性完成,而是增量迁移——先迁历史数据,再增量同步

我见过最夸张的案例是:一家设计公司声称迁移100TB数据,结果清理完重复文件、压缩完历史版本后,实际需要迁移的数据只有35TB,节省了65%的迁移时间和成本。

迁移前第一步:清理数据。 很多企业云盘里存着三五年前的过期文件、重复文件的多个版本、测试环境的垃圾数据。先把仓库清理干净,能省下大量时间和金钱。

1.2 问题二:你的数据是什么类型

企业云盘里的数据不是铁板一块,按特征分:

数据类型 特征 迁移难度
结构化文件(文档、图片、视频) 大小均匀,可并行
小文件集合(代码、配置文件) 数量多,单个<1MB
大文件(原始视频、工程图纸) 单个>10GB
数据库(SQLite/MongoDB元数据) 强一致性强依赖
历史版本快照 版本链复杂,差异存储
符号链接/快捷方式 依赖关系复杂

如果你的云盘里小文件占比超过60%,迁移速度会显著下降——因为每个文件都需要建立连接、认证、传输、确认,大量的TCP握手和ACK确认会吃掉带宽。

1.3 问题三:迁移窗口期怎么定

企业云盘迁移的黄金窗口是业务低峰期。以下几个时间点通常较好:

  • 凌晨2:00-6:00:大多数企业的业务低谷
  • 周五深夜到周六凌晨:配合周末,窗口最长
  • 法定长假前一天:如国庆、春节前,窗口可达72小时

但迁移窗口不是越长越好。时间越长,累积的新数据越多,增量同步压力越大。

我的经验公式:迁移窗口 = (历史数据量 ÷ 实际传输速度) × 1.5 + 4小时缓冲

假设历史数据100TB,实际传输速度800MB/s:
窗口 = (100×1024×1024÷800)秒 × 1.5 + 14400秒 ≈ 228,000秒 ≈ 63小时 ≈ 2.6天

这意味着你需要一个约3天的迁移窗口。

1.4 问题四:要不要停机迁移

这是最关键的技术决策。

停机迁移:在迁移窗口内暂停所有写入操作,确保迁移期间数据静止。优点是数据一致性有保障,缺点是影响业务。

不停机迁移(双写+增量同步):新旧系统同时运行一段时间,增量数据实时同步到新系统。切换时只需要短暂停止写入(通常几十分钟),然后做最终增量同步。优点是对业务影响小,缺点是技术复杂度高。

对于大多数企业,我建议采用混合策略

  1. 先做历史数据迁移(不停机,增量同步运行)
  2. 业务低峰期做短暂停机(2-4小时),完成最终切换

第二章 迁移方案设计:三条路径的利弊分析

2.1 路径一:官方导出/导入工具

大多数企业云盘服务商会提供官方迁移工具。优点是省心,缺点是:

  • 受限于服务商的格式支持度
  • 无法处理自定义字段和特殊权限
  • 迁移进度不可控,遇到问题只能联系客服
  • 无法增量迁移,必须一次性完成

适用场景:数据量小于1TB,迁移窗口充足,且对元数据完整性要求不高。

2.2 路径二:API脚本迁移

通过源云盘和目标云盘的API读写数据,实现程序化迁移。优点是灵活、可控、可定制;缺点是需要开发工作。

这是大多数技术团队的选择,也是本文重点讨论的方案。

2.3 路径三:存储层直接复制

绕过应用层,直接复制底层存储(对象存储S3协议、NFS等)。优点是速度最快(直接走存储协议,不走HTTP),缺点是:

  • 需要对源和目标存储都有管理员权限
  • 元数据和权限信息可能需要额外处理
  • 风险最高,一旦出错没有应用层的保护

适用场景:同一存储后端之间的迁移(如从MinIO集群A迁移到MinIO集群B),或者有专业存储团队支持。


第三章 踩坑实录:五个真实的迁移事故

3.1 事故一:那个把迁移脚本跑在测试环境的夜晚

某制造业客户的技术团队安排了两名工程师做数据迁移。小李负责写脚本,小王负责执行。

脚本写好后,小王准备在测试环境跑一遍脚本逻辑。结果,他复制粘贴命令时,把测试环境的路径粘贴到了生产环境的SSH终端里。脚本开始执行的那一刻,他还没反应过来——直到看到文件列表刷屏才发现不对。

万幸的是,他们用的是"先读取源文件,只下载不删除"的dry run模式,没有对源数据造成破坏。但测试环境的脚本跑通了生产环境的命令,差点把测试数据给覆盖了。

教训:迁移脚本必须做环境隔离检查。在脚本开头加上:

import os
import socket

# 强制要求设置环境变量作为安全确认
ENV_NAME = os.environ.get('MIGRATION_ENV', '')
if ENV_NAME not in ['PRODUCTION', 'PROD']:
    raise RuntimeError(f"安全检查失败:当前环境为'{ENV_NAME}',不是生产环境!\n"
                       f"请设置 export MIGRATION_ENV=PRODUCTION 后再执行。")

TARGET_HOST = os.environ.get('TARGET_HOST', '')
if '192.168.1' not in TARGET_HOST and 'prod' not in TARGET_HOST.lower():
    raise RuntimeError(f"安全检查失败:目标地址'{TARGET_HOST}'不是生产环境!")

print(f"⚠️  确认在生产环境 [{socket.gethostname()}] 上执行")
print(f"⚠️  迁移目标: {TARGET_HOST}")
print(f"⚠️  5秒后开始执行...")
time.sleep(5)

这个检查救了这位工程师无数次。不是防止误操作,是强制让他确认自己在做什么

3.2 事故二:校验和导致的"数据失踪"事件

开头提到的老张遇到的问题,根本原因就在校验和。

他们用的迁移脚本在传输完文件后,用MD5校验文件完整性。问题出在:源云盘在存储视频文件时,会对大于4GB的文件进行分块存储,每块独立计算MD5。导出时,源系统的API返回的MD5是"合并后"的MD5,但实际文件在传输过程中被分块下载后再合并——合并顺序的问题导致最终文件MD5和原文件MD5不一致。

于是校验失败,脚本把不一致的文件标记为"待重试",但重试时又从同样的分块逻辑下载,形成了一个死循环。最后脚本认为"已全部迁移完成",但实际上有8GB的视频文件在合并过程中发生了轻微损坏,校验没通过却没被发现。

排查这个问题花了两天。解决方案是:对于大文件,不要用整体MD5,改用分块校验 + 文件大小双重确认

import hashlib
import os

def verify_file_chunked(local_path, expected_chunksums, chunk_size=8*1024*1024):
    """
    分块校验大文件。
    
    expected_chunksums: List[Dict], 每块的校验信息
        例如: [{'offset': 0, 'size': 8388608, 'md5': 'abc123...'}, ...]
    """
    file_size = os.path.getsize(local_path)
    
    with open(local_path, 'rb') as f:
        for i, chunk_info in enumerate(expected_chunksums):
            offset = chunk_info['offset']
            expected_md5 = chunk_info['md5']
            expected_size = chunk_info['size']
            
            f.seek(offset)
            data = f.read(min(chunk_size, expected_size))
            
            # 超时保护:读取大小不符
            if len(data) != expected_size:
                return False, f"Chunk {i}: expected {expected_size} bytes, got {len(data)}"
            
            actual_md5 = hashlib.md5(data).hexdigest()
            if actual_md5 != expected_md5:
                return False, f"Chunk {i} MD5 mismatch: expected {expected_md5}, got {actual_md5}"
    
    return True, "OK"

同时,脚本还要记录实际传输的文件大小——对于那些压缩后再存储的视频文件,存储系统会压缩原始文件,但API返回的文件大小是压缩前的大小。传输完成后拿实际文件大小和API返回的文件大小对比,如果差异超过1%,就要报警人工介入。

3.3 事故三:增量同步时忽略的"幽灵文件"

不停机迁移最难处理的是增量同步窗口内的数据变化

我们遇到过这个场景:用户A在源系统里有一份文件夹,里面有100个文件。在迁移开始时,这100个文件的文件名、大小、MD5都被记录了。但在增量同步期间,用户A删除了其中5个文件,又重命名了3个文件,还新建了2个文件。

普通的增量同步只会同步"新建"和"修改"的文件,不会处理"删除"操作。如果不单独记录删除事件,迁移后目标系统会多出5个幽灵文件——用户明明在源系统删除了它们,但它们还在目标系统里。

解决方案:增量同步必须支持操作日志(Changelog)模式

from dataclasses import dataclass
from enum import Enum
from typing import List
from datetime import datetime

class ChangeType(Enum):
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    RENAME = "rename"

@dataclass
class FileChange:
    path: str
    change_type: ChangeType
    timestamp: datetime
    # DELETE事件时,old_size和old_md5是必要的,用于验证删除操作
    old_size: int = None
    old_md5: str = None
    new_size: int = None
    new_md5: str = None
    # RENAME事件时的旧路径
    old_path: str = None

class IncrementalSyncEngine:
    def __init__(self, source_client, target_client):
        self.source = source_client
        self.target = target_client
        # 维护一份文件清单快照,用于判断删除事件
        self._snapshot = {}
    
    def create_snapshot(self, root_path="/"):
        """建立文件清单快照"""
        print("创建文件快照...")
        self._snapshot.clear()
        
        def walk(path):
            for item in self.source.list_files(path):
                self._snapshot[item['path']] = {
                    'size': item['size'],
                    'md5': item['md5'],
                    'modified': item['modified_at']
                }
                if item['is_dir']:
                    walk(item['path'])
        
        walk(root_path)
        print(f"快照建立完成,共 {len(self._snapshot)} 个文件")
    
    def diff_and_sync(self, dest_path="/"):
        """对比快照与当前状态,生成差异操作并同步"""
        print("计算差异...")
        changes = []
        
        current_state = {}
        def walk_current(path):
            for item in self.source.list_files(path):
                current_state[item['path']] = {
                    'size': item['size'],
                    'md5': item['md5'],
                    'modified': item['modified_at']
                }
                if item['is_dir']:
                    walk_current(item['path'])
        
        walk_current(dest_path)
        
        # 检测新建和修改
        for path, current_info in current_state.items():
            old_info = self._snapshot.get(path)
            if old_info is None:
                changes.append(FileChange(
                    path=path,
                    change_type=ChangeType.CREATE,
                    timestamp=datetime.now(),
                    new_size=current_info['size'],
                    new_md5=current_info['md5']
                ))
            elif (current_info['size'] != old_info['size'] or 
                  current_info['md5'] != old_info['md5']):
                changes.append(FileChange(
                    path=path,
                    change_type=ChangeType.UPDATE,
                    timestamp=datetime.now(),
                    old_size=old_info['size'],
                    old_md5=old_info['md5'],
                    new_size=current_info['size'],
                    new_md5=current_info['md5']
                ))
        
        # 检测删除(快照中有,但当前没有)
        for path in self._snapshot:
            if path not in current_state:
                changes.append(FileChange(
                    path=path,
                    change_type=ChangeType.DELETE,
                    timestamp=datetime.now(),
                    old_size=self._snapshot[path]['size'],
                    old_md5=self._snapshot[path]['md5']
                ))
        
        print(f"发现 {len(changes)} 个变更: "
              f"新建={sum(1 for c in changes if c.change_type == ChangeType.CREATE)}, "
              f"修改={sum(1 for c in changes if c.change_type == ChangeType.UPDATE)}, "
              f"删除={sum(1 for c in changes if c.change_type == ChangeType.DELETE)}")
        
        # 执行变更
        for change in changes:
            self._apply_change(change)
    
    def _apply_change(self, change: FileChange):
        """应用单个变更"""
        if change.change_type == ChangeType.DELETE:
            print(f"  ️ 删除目标文件: {change.path}")
            self.target.delete(change.path)
        elif change.change_type == ChangeType.CREATE:
            print(f"  ➕ 新建: {change.path} ({change.new_size} bytes)")
            self.source.download(change.path, f"/tmp/{change.path}")
            self.target.upload(f"/tmp/{change.path}", change.path)
        elif change.change_type == ChangeType.UPDATE:
            print(f"   更新: {change.path}")
            self.source.download(change.path, f"/tmp/{change.path}")
            self.target.upload(f"/tmp/{change.path}", change.path)

3.4 事故四:文件名的编码陷阱

一家中日合资企业在迁移数据时,发现从日本合作方收到的文件在目标系统里文件名全部乱码。

根本原因是:日本那边用Shift-JIS编码命名文件,文件名里含有日语字符。迁移脚本跑在Linux环境下(UTF-8),读取目录列表时把文件名用UTF-8解析,编码转换过程中大量字符变成了问号。

这不是个小问题。企业云盘里的文件名编码如果不一致,会导致:

  1. 相同文件被识别为不同文件(文件名编码不同,MD5无法匹配)
  2. 文件无法被正常访问(路径中含有乱码字符)
  3. 搜索引擎无法匹配中文文件名

解决方案:在所有文件操作中强制指定编码,并对编码不一致的文件做标记处理:

import codecs
import os
import chardet

def safe_decode(byte_str: bytes, fallback_encodings=['utf-8', 'gbk', 'shift_jis', 'euc-jp', 'iso-8859-1']) -> str:
    """安全解码字节串,尝试多种编码"""
    if isinstance(byte_str, str):
        return byte_str
    
    # 先尝试检测编码
    detected = chardet.detect(byte_str)
    confidence = detected['confidence']
    encoding = detected['encoding']
    
    if confidence > 0.85 and encoding:
        try:
            return byte_str.decode(encoding)
        except (UnicodeDecodeError, LookupError):
            pass
    
    # 逐一尝试fallback编码
    for enc in fallback_encodings:
        try:
            return byte_str.decode(enc)
        except (UnicodeDecodeError, LookupError):
            continue
    
    # 终极手段:替换无法解码的字符
    return byte_str.decode('utf-8', errors='replace')

def sanitize_filename(filename: str) -> str:
    """清理文件名,移除操作系统不允许的字符"""
    # Windows不允许: \ / : * ? " < > |
    # Linux不允许: \ / \0
    illegal_chars = {
        'windows': '\\/:*?"<>|',
        'linux': '\\/\x00'
    }
    
    sanitized = filename
    for char in illegal_chars['windows'] + illegal_chars['linux']:
        sanitized = sanitized.replace(char, '_')
    
    # 移除控制字符
    sanitized = ''.join(c if (ord(c) >= 32 and ord(c) != 127) else '_' for c in sanitized)
    
    # 限制文件名长度(NTFS最大255,ext4最大255)
    if len(sanitized) > 200:
        name, ext = os.path.splitext(sanitized)
        sanitized = name[:200-len(ext)] + ext
    
    return sanitized

def migrate_file_with_encoding_fallback(src_path, dest_path, src_client, dest_client):
    """带编码处理的文件迁移"""
    
    # 获取源文件信息
    src_info = src_client.get_file_info(src_path)
    src_filename_bytes = src_info['name_bytes']  # 原始字节
    
    # 安全解码
    try:
        decoded_name = safe_decode(src_filename_bytes)
    except Exception as e:
        print(f"警告:文件名解码失败 {src_path}: {e}")
        decoded_name = f"decoded_failed_{hashlib.md5(src_filename_bytes).hexdigest()[:8]}"
    
    # 清理文件名
    safe_name = sanitize_filename(decoded_name)
    
    # 下载
    tmp_path = f"/tmp/migration_{os.getpid()}_{safe_name}"
    src_client.download(src_path, tmp_path)
    
    # 上传
    dest_path_full = os.path.join(dest_path, safe_name)
    dest_client.upload(tmp_path, dest_path_full)
    
    # 清理
    os.remove(tmp_path)
    
    return {'original': decoded_name, 'sanitized': safe_name}

3.5 事故五:迁移完成后权限全乱了

迁移完数据后,客户发现所有文件的权限都变成了"公开可读"。这不是迁移脚本的问题,而是权限模型的差异

源系统用的是"文件夹继承权限"模式:子文件夹自动继承父文件夹权限,不需要单独设置。目标系统用的是"独立权限"模式:每个文件/文件夹的权限是独立存储的,不继承。

迁移时,只迁移了文件内容,没有迁移权限体系。结果:所有文件的权限都变成了目标系统的默认权限(通常是创建者的私人权限),而那些原本共享给团队的文件夹,现在变成了"只有创建者可访问"。

教训:数据迁移和权限迁移必须一起规划。迁移前要搞清楚两个系统的权限模型是否兼容:

def analyze_permission_compatibility(source_system, target_system):
    """分析两个系统的权限模型兼容性"""
    
    compatibility_matrix = {
        ('folder_inheritance', 'folder_inheritance'): 'HIGH',
        ('folder_inheritance', 'independent'): 'MEDIUM',  # 需要额外处理
        ('independent', 'folder_inheritance'): 'LOW',  # 不兼容
        ('rbac', 'rbac'): 'HIGH',
        ('rbac', 'acl'): 'MEDIUM',
        ('acl', 'rbac'): 'MEDIUM',
    }
    
    source_model = source_system.get_permission_model()
    target_model = target_system.get_permission_model()
    
    score = compatibility_matrix.get((source_model, target_model), 'LOW')
    
    if score == 'LOW':
        raise MigrationError(
            "权限模型不兼容,请使用ACL映射方案,详见下方说明。"
        )
    
    return score

# ACL映射表:从源权限到目标权限的映射
PERMISSION_MAPPING = {
    # 源系统权限 -> 目标系统权限
    'owner': 'full_control',        # 所有者
    'edit': 'read_write',            # 编辑
    'view': 'read_only',            # 查看
    'comment': 'read_only',          # 评论(降级为只读)
    'share': 'read_write',          # 分享权限在目标系统需要单独处理
}

第四章 生产级迁移工具:核心架构与代码实现

4.1 整体架构

一个生产级的数据迁移工具,通常包含以下组件:

┌─────────────────────────────────────────────────────────────┐
│                    Migration Orchestrator                     │
│  (状态机:IDLE → SNAPSHOT → MIGRATION → VERIFY → SWITCHOVER) │
└────────────────┬────────────────────────────────────────────┘
                 │
    ┌────────────┼────────────┬────────────┐
    ▼            ▼            ▼            ▼
┌────────┐  ┌────────┐  ┌────────┐  ┌────────────┐
│Scanner │  │Transfer│  │Verifier│  │Changelog   │
│ Agent  │  │ Agent  │  │ Agent  │  │ Processor  │
└────────┘  └────────┘  └────────┘  └────────────┘
   │            │            │            │
   └────────────┴────────────┴────────────┘
                 │
         ┌──────▼──────┐
         │  State Store │
         │  (Redis/SQLite)│
         └─────────────┘

4.2 状态机实现

from enum import Enum, auto
from dataclasses import dataclass, field
from datetime import datetime
import json
import os

class MigrationState(Enum):
    IDLE = auto()
    SNAPSHOT = auto()
    MIGRATION = auto()
    VERIFY = auto()
    SWITCHOVER = auto()
    COMPLETED = auto()
    FAILED = auto()
    ABORTED = auto()

@dataclass
class MigrationJob:
    job_id: str
    source_system: str
    target_system: str
    root_path: str
    state: MigrationState = MigrationState.IDLE
    total_files: int = 0
    migrated_files: int = 0
    failed_files: int = 0
    total_bytes: int = 0
    migrated_bytes: int = 0
    errors: list = field(default_factory=list)
    started_at: datetime = None
    completed_at: datetime = None
    state_history: list = field(default_factory=list)
    
    def transition_to(self, new_state: MigrationState, reason: str = ""):
        old_state = self.state
        self.state = new_state
        self.state_history.append({
            'from': old_state.name,
            'to': new_state.name,
            'reason': reason,
            'at': datetime.now().isoformat()
        })
        print(f"[状态机] {old_state.name} → {new_state.name} ({reason})")
    
    def to_dict(self):
        return {
            'job_id': self.job_id,
            'state': self.state.name,
            'progress': f"{self.migrated_files}/{self.total_files} ({self.migrated_files/max(self.total_files,1)*100:.1f}%)",
            'bytes': f"{self.migrated_bytes}/{self.total_bytes} ({self.migrated_bytes/max(self.total_bytes,1)*100:.1f}%)",
            'errors': len(self.errors),
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'completed_at': self.completed_at.isoformat() if self.completed_at else None,
        }

class MigrationOrchestrator:
    """迁移协调器,主状态机"""
    
    def __init__(self, job: MigrationJob, state_store_path: str):
        self.job = job
        self.state_store_path = state_store_path
        self._load_state()
    
    def _load_state(self):
        """从持久化存储加载状态"""
        if os.path.exists(self.state_store_path):
            with open(self.state_store_path, 'r') as f:
                data = json.load(f)
                self.job = MigrationJob(**data)
    
    def _save_state(self):
        """持久化状态"""
        with open(self.state_store_path, 'w') as f:
            json.dump(self.job.__dict__, f, default=str)
    
    def run_snapshot(self):
        """阶段一:建立文件快照"""
        self.job.transition_to(MigrationState.SNAPSHOT, "开始扫描文件")
        scanner = FileScanner(self.job.source_system)
        
        files = scanner.scan(self.job.root_path, progress_callback=self._on_scan_progress)
        self.job.total_files = len(files)
        self.job.total_bytes = sum(f['size'] for f in files)
        
        # 保存快照
        snapshot_path = f"/tmp/snapshot_{self.job.job_id}.json"
        with open(snapshot_path, 'w') as f:
            json.dump(files, f)
        
        self._save_state()
        self.job.transition_to(MigrationState.IDLE, f"快照完成: {self.job.total_files} 文件, {self.job.total_bytes} 字节")
        return snapshot_path
    
    def run_migration(self, snapshot_path: str, parallel_workers: int = 4):
        """阶段二:执行迁移"""
        self.job.started_at = datetime.now()
        self.job.transition_to(MigrationState.MIGRATION, f"启动{parallel_workers}个并发worker")
        
        with open(snapshot_path, 'r') as f:
            files = json.load(f)
        
        # 断点续传:跳过已完成的文件
        already_done = set()
        if self.job.migrated_files > 0:
            checkpoint = f"/tmp/checkpoint_{self.job.job_id}.json"
            if os.path.exists(checkpoint):
                with open(checkpoint, 'r') as f:
                    already_done = set(json.load(f))
                print(f"断点续传:已跳过 {len(already_done)} 个已迁移文件")
        
        transfer_pool = TransferPool(
            source=self.job.source_system,
            target=self.job.target_system,
            workers=parallel_workers
        )
        
        for i, file_info in enumerate(files):
            if file_info['path'] in already_done:
                continue
            
            try:
                transfer_pool.transfer(file_info)
                self.job.migrated_files += 1
                self.job.migrated_bytes += file_info['size']
            except Exception as e:
                self.job.failed_files += 1
                self.job.errors.append({
                    'file': file_info['path'],
                    'error': str(e),
                    'time': datetime.now().isoformat()
                })
                # 连续失败超过10次,暂停迁移,人工介入
                if self.job.failed_files >= 10:
                    self.job.transition_to(MigrationState.FAILED, f"连续失败超过10次: {e}")
                    return
            
            # 每迁移100个文件保存一次checkpoint
            if i % 100 == 0:
                self._save_checkpoint(already_done)
                self._save_state()
        
        self.job.transition_to(MigrationState.VERIFY, "迁移完成,开始校验")
        self._save_checkpoint(already_done)
    
    def run_verification(self):
        """阶段三:校验"""
        verifier = IntegrityVerifier(
            source=self.job.source_system,
            target=self.job.target_system
        )
        
        report = verifier.verify_all(progress_callback=self._on_verify_progress)
        
        if report['mismatch_count'] > 0:
            print(f"⚠️ 校验发现 {report['mismatch_count']} 个不匹配文件")
            for mismatch in report['mismatches'][:10]:
                print(f"  - {mismatch['path']}: {mismatch['reason']}")
            
            # 重新传输不匹配的文件
            self.job.transition_to(MigrationState.MIGRATION, "重传不匹配文件")
            for mismatch in report['mismatches']:
                verifier.retransfer(mismatch['path'])
        
        self.job.transition_to(MigrationState.SWITCHOVER, "校验通过,准备切换")
    
    def run_switchover(self, downtime_minutes: int = 30):
        """阶段四:最终切换(停机窗口)"""
        print(f"⚠️ 进入停机切换窗口,预计需要 {downtime_minutes} 分钟")
        
        # 1. 通知用户系统即将维护
        # 2. 锁定源系统写入
        # 3. 执行最终增量同步
        # 4. 再次校验关键文件
        # 5. 切换DNS/入口
        # 6. 验证新系统可访问
        # 7. 通知用户切换完成
        
        self.job.completed_at = datetime.now()
        self.job.transition_to(MigrationState.COMPLETED, "迁移完成")

4.3 多线程传输引擎

import threading
import queue
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Optional
import hashlib

@dataclass
class TransferTask:
    file_id: str
    source_path: str
    dest_path: str
    size: int
    expected_md5: Optional[str]
    priority: int = 0

@dataclass
class TransferResult:
    task: TransferTask
    success: bool
    bytes_transferred: int = 0
    duration_ms: int = 0
    error: Optional[str] = None

class TransferPool:
    """并发传输池"""
    
    def __init__(self, source, target, workers: int = 4, 
                 chunk_size: int = 8*1024*1024,
                 max_queue_size: int = 1000):
        self.source = source
        self.target = target
        self.workers = workers
        self.chunk_size = chunk_size
        self.task_queue = queue.PriorityQueue(maxsize=max_queue_size)
        self.results = []
        self._lock = threading.Lock()
        
        # 流量控制
        self._rate_limiter = threading.Semaphore(workers)
        self._current_speed = 0  # bytes/s
    
    def transfer(self, file_info: dict) -> TransferResult:
        """单文件传输(带重试)"""
        task = TransferTask(
            file_id=file_info.get('id', file_info['path']),
            source_path=file_info['path'],
            dest_path=file_info['path'],
            size=file_info['size'],
            expected_md5=file_info.get('md5')
        )
        
        max_retries = 3
        for attempt in range(max_retries):
            try:
                result = self._do_transfer(task)
                
                # 校验
                if result.success and task.expected_md5:
                    actual_md5 = hashlib.md5(
                        open(result.bytes_transferred, 'rb').read()
                    ).hexdigest()
                    if actual_md5 != task.expected_md5:
                        result.success = False
                        result.error = f"MD5校验失败: 期望{task.expected_md5}, 实际{actual_md5}"
                        continue  # 重试
                
                return result
            except Exception as e:
                if attempt == max_retries - 1:
                    return TransferResult(
                        task=task,
                        success=False,
                        error=f"重试{MAX_RETRIES}次后失败: {e}"
                    )
                time.sleep(2 ** attempt)  # 指数退避
        
        return TransferResult(task=task, success=False, error="重试耗尽")
    
    def _do_transfer(self, task: TransferTask) -> TransferResult:
        """执行单次传输"""
        start = time.time()
        
        tmp_path = f"/tmp/migration_{os.getpid()}_{task.file_id}"
        
        try:
            # 下载
            self.source.download(task.source_path, tmp_path)
            
            # 上传
            self.target.upload(tmp_path, task.dest_path)
            
            duration = time.time() - start
            bytes_transferred = task.size
            speed = bytes_transferred / duration if duration > 0 else 0
            
            return TransferResult(
                task=task,
                success=True,
                bytes_transferred=bytes_transferred,
                duration_ms=int(duration * 1000)
            )
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
    
    def batch_transfer(self, file_list: list, priority_mode: bool = False):
        """批量传输(带优先级)"""
        
        def worker():
            while True:
                try:
                    if priority_mode:
                        task = self.task_queue.get(timeout=5)
                    else:
                        task = self.task_queue.get_nowait()
                except queue.Empty:
                    break
                
                self._rate_limiter.acquire()
                try:
                    result = self.transfer(task)
                    with self._lock:
                        self.results.append(result)
                finally:
                    self._rate_limiter.release()
                    self.task_queue.task_done()
        
        # 填充队列
        for file_info in file_list:
            task = TransferTask(
                file_id=file_info.get('id', file_info['path']),
                source_path=file_info['path'],
                dest_path=file_info['path'],
                size=file_info['size'],
                expected_md5=file_info.get('md5'),
                priority=file_info.get('priority', 0)
            )
            self.task_queue.put((task.priority, task))
        
        # 启动worker
        threads = []
        for _ in range(self.workers):
            t = threading.Thread(target=worker)
            t.start()
            threads.append(t)
        
        for t in threads:
            t.join()
        
        return self.results

第五章 迁移后的收尾工作

5.1 必须做的十项检查

迁移完成后,不要急着庆祝。以下十项检查必须逐项确认:

  1. 文件数量对比:源和目标的文件总数是否一致?
  2. 文件大小对比:总字节数是否一致?
  3. 随机抽样校验:随机抽取5%的文件,逐一比对MD5
  4. 权限验证:抽查10个不同文件夹的权限是否正确
  5. 目录结构验证:目录树是否完整,有没有文件夹丢失?
  6. 符号链接验证:符号链接是否仍然有效?
  7. 共享链接验证:原有共享链接是否还能访问?
  8. 搜索功能验证:搜索几个文件名,看能否找到
  9. 版本历史验证:历史版本数量是否完整?
  10. 最后写入时间验证:文件修改时间是否保留?

5.2 切换后的监控

切换完成后,还要持续监控72小时:

- 错误率:是否有大量404或500?
- 延迟:新增文件的访问延迟是否正常?
- 带宽:用户访问是否造成带宽异常?
- 存储:目标系统存储是否正常增长(新增文件正常写入)?

第六章 成本优化:怎么把迁移费用砍掉一半

6.1 迁移前:数据清理

前面提到,迁移前先做数据清理。这不是可选项,而是必选项。

我建议用以下脚本做预迁移扫描:

def pre_migration_audit(root_path, client, retention_days=365):
    """
    预迁移审计:找出可清理的数据
    返回:可清理文件列表和建议清理量
    """
    large_files = []     # 大于1GB的文件
    duplicate_files = [] # 重复文件
    old_files = []      # 超过保留期的文件
    
    md5_map = {}
    
    for file_info in client.walk_files(root_path):
        # 过期文件
        if file_info['modified_at'] < datetime.now() - timedelta(days=retention_days):
            old_files.append(file_info)
        
        # 大文件
        if file_info['size'] > 1024**3:
            large_files.append(file_info)
        
        # 重复文件(根据MD5)
        if file_info.get('md5'):
            if file_info['md5'] in md5_map:
                duplicate_files.append({
                    'original': md5_map[file_info['md5']],
                    'duplicate': file_info
                })
            else:
                md5_map[file_info['md5']] = file_info
    
    total_cleanable = sum(f['size'] for f in old_files)
    total_cleanable += sum(f['size'] for f in large_files if f in old_files)
    
    return {
        'old_files_count': len(old_files),
        'old_files_size': sum(f['size'] for f in old_files),
        'duplicate_files_count': len(duplicate_files),
        'duplicate_files_size': sum(f['size'] for f in duplicate_files),
        'large_files_count': len(large_files),
        'estimated_reduction_pct': total_cleanable / sum(f['size'] for f in [old_files[0]] if old_files) * 100
            if old_files else 0
    }

6.2 传输中:带宽优化

  • 启用压缩传输:在网络带宽有限时,开启传输压缩可节省30-50%带宽,但会增加CPU消耗
  • 分时段传输:利用夜间低峰期迁移,避开业务高峰
  • 增量优先:先迁移增量部分(通常是近期数据),历史数据可以慢慢迁

结语

数据迁移不是搬家,而是做一次器官移植。搬家的东西可以扔,器官移植的每一条血管都要接上。

本文的核心教训可以归纳为三条:

  1. 迁移前要审计:清理数据、摸清家底、制定方案。三件事没做完就动手,早晚出事。
  2. 迁移中要校验:不要相信任何"传输成功"的默认结论,每一个字节都要验证。
  3. 迁移后要监控:切换完成不是终点,72小时内的异常监控才是验收标准。

如果你正在准备一次迁移,建议把本文的代码模块作为基础框架,根据你的实际环境做调整。迁移工具本身要经过充分测试——至少在测试环境里完整跑通一次,再上生产。

最后一句话:永远保持对数据的敬畏之心。


本文档由虾皮维护 | 写作日期:2026-05-08 | 字数:约24000字

发表评论

电子邮件地址不会被公开。 必填项已用*标注