企业云盘数据迁移实战:如何安全迁移100TB数据而不丢一行记录
前言
2024年双十一前一天夜里三点,某电商公司的技术负责人老张给我打了一通电话。电话那头的声音带着压抑的焦虑:"我们换了企业云盘,新旧系统数据对不上,差了8GB。业务明天就要用,你能不能告诉我哪里出了问题?"
这不是我第一次遇到这种情况。在过去三年里,我直接或间接参与了超过二十次企业云盘数据迁移项目,见过各种匪夷所思的问题:文件校验和算错导致重复迁移、符号链接循环导致迁移程序死循环、某些特殊字符的文件名在不同系统间编码不一致、元数据(修改时间、所有者权限)丢失导致法律合规问题、迁移过程中新文件还在写入导致数据不一致……
每一次迁移都是一场外科手术级别的精密操作。本文把我踩过的坑、总结的方法论、以及生产级迁移工具的核心代码,全部摊开来讲。如果你正在准备一次企业云盘迁移,这篇文章能让你少走至少两个月的弯路。
第一章 迁移前必须想清楚的七个问题
1.1 问题一:100TB数据到底是什么概念
很多人对数据量缺乏直观感受。让我来换算一下:
- 100TB = 102,400GB
- 以千兆网络传输,理论速度125MB/s,实际稳定速度约80MB/s
- 理论最短时间:100TB ÷ 80MB/s = 1,280,000秒 ≈ 14.8天
- 以万兆网络传输,理论速度1250MB/s,实际稳定速度约800MB/s
- 理论最短时间:100TB ÷ 800MB/s = 128,000秒 ≈ 1.48天
但这只是理论值。实际生产环境中,100TB数据的迁移通常需要2-4周的窗口期。原因如下:
- 业务不能停,但迁移窗口有限(通常是凌晨或周末)
- 迁移过程中需要校验,校验是IO密集型操作,会拖慢传输
- 网络带宽不是独占的,其他业务也在用
- 迁移不是一次性完成,而是增量迁移——先迁历史数据,再增量同步
我见过最夸张的案例是:一家设计公司声称迁移100TB数据,结果清理完重复文件、压缩完历史版本后,实际需要迁移的数据只有35TB,节省了65%的迁移时间和成本。
迁移前第一步:清理数据。 很多企业云盘里存着三五年前的过期文件、重复文件的多个版本、测试环境的垃圾数据。先把仓库清理干净,能省下大量时间和金钱。
1.2 问题二:你的数据是什么类型
企业云盘里的数据不是铁板一块,按特征分:
| 数据类型 | 特征 | 迁移难度 |
|---|---|---|
| 结构化文件(文档、图片、视频) | 大小均匀,可并行 | 低 |
| 小文件集合(代码、配置文件) | 数量多,单个<1MB | 中 |
| 大文件(原始视频、工程图纸) | 单个>10GB | 中 |
| 数据库(SQLite/MongoDB元数据) | 强一致性强依赖 | 高 |
| 历史版本快照 | 版本链复杂,差异存储 | 高 |
| 符号链接/快捷方式 | 依赖关系复杂 | 中 |
如果你的云盘里小文件占比超过60%,迁移速度会显著下降——因为每个文件都需要建立连接、认证、传输、确认,大量的TCP握手和ACK确认会吃掉带宽。
1.3 问题三:迁移窗口期怎么定
企业云盘迁移的黄金窗口是业务低峰期。以下几个时间点通常较好:
- 凌晨2:00-6:00:大多数企业的业务低谷
- 周五深夜到周六凌晨:配合周末,窗口最长
- 法定长假前一天:如国庆、春节前,窗口可达72小时
但迁移窗口不是越长越好。时间越长,累积的新数据越多,增量同步压力越大。
我的经验公式:迁移窗口 = (历史数据量 ÷ 实际传输速度) × 1.5 + 4小时缓冲
假设历史数据100TB,实际传输速度800MB/s:
窗口 = (100×1024×1024÷800)秒 × 1.5 + 14400秒 ≈ 228,000秒 ≈ 63小时 ≈ 2.6天
这意味着你需要一个约3天的迁移窗口。
1.4 问题四:要不要停机迁移
这是最关键的技术决策。
停机迁移:在迁移窗口内暂停所有写入操作,确保迁移期间数据静止。优点是数据一致性有保障,缺点是影响业务。
不停机迁移(双写+增量同步):新旧系统同时运行一段时间,增量数据实时同步到新系统。切换时只需要短暂停止写入(通常几十分钟),然后做最终增量同步。优点是对业务影响小,缺点是技术复杂度高。
对于大多数企业,我建议采用混合策略:
- 先做历史数据迁移(不停机,增量同步运行)
- 业务低峰期做短暂停机(2-4小时),完成最终切换
第二章 迁移方案设计:三条路径的利弊分析
2.1 路径一:官方导出/导入工具
大多数企业云盘服务商会提供官方迁移工具。优点是省心,缺点是:
- 受限于服务商的格式支持度
- 无法处理自定义字段和特殊权限
- 迁移进度不可控,遇到问题只能联系客服
- 无法增量迁移,必须一次性完成
适用场景:数据量小于1TB,迁移窗口充足,且对元数据完整性要求不高。
2.2 路径二:API脚本迁移
通过源云盘和目标云盘的API读写数据,实现程序化迁移。优点是灵活、可控、可定制;缺点是需要开发工作。
这是大多数技术团队的选择,也是本文重点讨论的方案。
2.3 路径三:存储层直接复制
绕过应用层,直接复制底层存储(对象存储S3协议、NFS等)。优点是速度最快(直接走存储协议,不走HTTP),缺点是:
- 需要对源和目标存储都有管理员权限
- 元数据和权限信息可能需要额外处理
- 风险最高,一旦出错没有应用层的保护
适用场景:同一存储后端之间的迁移(如从MinIO集群A迁移到MinIO集群B),或者有专业存储团队支持。
第三章 踩坑实录:五个真实的迁移事故
3.1 事故一:那个把迁移脚本跑在测试环境的夜晚
某制造业客户的技术团队安排了两名工程师做数据迁移。小李负责写脚本,小王负责执行。
脚本写好后,小王准备在测试环境跑一遍脚本逻辑。结果,他复制粘贴命令时,把测试环境的路径粘贴到了生产环境的SSH终端里。脚本开始执行的那一刻,他还没反应过来——直到看到文件列表刷屏才发现不对。
万幸的是,他们用的是"先读取源文件,只下载不删除"的dry run模式,没有对源数据造成破坏。但测试环境的脚本跑通了生产环境的命令,差点把测试数据给覆盖了。
教训:迁移脚本必须做环境隔离检查。在脚本开头加上:
import os
import socket
# 强制要求设置环境变量作为安全确认
ENV_NAME = os.environ.get('MIGRATION_ENV', '')
if ENV_NAME not in ['PRODUCTION', 'PROD']:
raise RuntimeError(f"安全检查失败:当前环境为'{ENV_NAME}',不是生产环境!\n"
f"请设置 export MIGRATION_ENV=PRODUCTION 后再执行。")
TARGET_HOST = os.environ.get('TARGET_HOST', '')
if '192.168.1' not in TARGET_HOST and 'prod' not in TARGET_HOST.lower():
raise RuntimeError(f"安全检查失败:目标地址'{TARGET_HOST}'不是生产环境!")
print(f"⚠️ 确认在生产环境 [{socket.gethostname()}] 上执行")
print(f"⚠️ 迁移目标: {TARGET_HOST}")
print(f"⚠️ 5秒后开始执行...")
time.sleep(5)
这个检查救了这位工程师无数次。不是防止误操作,是强制让他确认自己在做什么。
3.2 事故二:校验和导致的"数据失踪"事件
开头提到的老张遇到的问题,根本原因就在校验和。
他们用的迁移脚本在传输完文件后,用MD5校验文件完整性。问题出在:源云盘在存储视频文件时,会对大于4GB的文件进行分块存储,每块独立计算MD5。导出时,源系统的API返回的MD5是"合并后"的MD5,但实际文件在传输过程中被分块下载后再合并——合并顺序的问题导致最终文件MD5和原文件MD5不一致。
于是校验失败,脚本把不一致的文件标记为"待重试",但重试时又从同样的分块逻辑下载,形成了一个死循环。最后脚本认为"已全部迁移完成",但实际上有8GB的视频文件在合并过程中发生了轻微损坏,校验没通过却没被发现。
排查这个问题花了两天。解决方案是:对于大文件,不要用整体MD5,改用分块校验 + 文件大小双重确认:
import hashlib
import os
def verify_file_chunked(local_path, expected_chunksums, chunk_size=8*1024*1024):
"""
分块校验大文件。
expected_chunksums: List[Dict], 每块的校验信息
例如: [{'offset': 0, 'size': 8388608, 'md5': 'abc123...'}, ...]
"""
file_size = os.path.getsize(local_path)
with open(local_path, 'rb') as f:
for i, chunk_info in enumerate(expected_chunksums):
offset = chunk_info['offset']
expected_md5 = chunk_info['md5']
expected_size = chunk_info['size']
f.seek(offset)
data = f.read(min(chunk_size, expected_size))
# 超时保护:读取大小不符
if len(data) != expected_size:
return False, f"Chunk {i}: expected {expected_size} bytes, got {len(data)}"
actual_md5 = hashlib.md5(data).hexdigest()
if actual_md5 != expected_md5:
return False, f"Chunk {i} MD5 mismatch: expected {expected_md5}, got {actual_md5}"
return True, "OK"
同时,脚本还要记录实际传输的文件大小——对于那些压缩后再存储的视频文件,存储系统会压缩原始文件,但API返回的文件大小是压缩前的大小。传输完成后拿实际文件大小和API返回的文件大小对比,如果差异超过1%,就要报警人工介入。
3.3 事故三:增量同步时忽略的"幽灵文件"
不停机迁移最难处理的是增量同步窗口内的数据变化。
我们遇到过这个场景:用户A在源系统里有一份文件夹,里面有100个文件。在迁移开始时,这100个文件的文件名、大小、MD5都被记录了。但在增量同步期间,用户A删除了其中5个文件,又重命名了3个文件,还新建了2个文件。
普通的增量同步只会同步"新建"和"修改"的文件,不会处理"删除"操作。如果不单独记录删除事件,迁移后目标系统会多出5个幽灵文件——用户明明在源系统删除了它们,但它们还在目标系统里。
解决方案:增量同步必须支持操作日志(Changelog)模式:
from dataclasses import dataclass
from enum import Enum
from typing import List
from datetime import datetime
class ChangeType(Enum):
CREATE = "create"
UPDATE = "update"
DELETE = "delete"
RENAME = "rename"
@dataclass
class FileChange:
path: str
change_type: ChangeType
timestamp: datetime
# DELETE事件时,old_size和old_md5是必要的,用于验证删除操作
old_size: int = None
old_md5: str = None
new_size: int = None
new_md5: str = None
# RENAME事件时的旧路径
old_path: str = None
class IncrementalSyncEngine:
def __init__(self, source_client, target_client):
self.source = source_client
self.target = target_client
# 维护一份文件清单快照,用于判断删除事件
self._snapshot = {}
def create_snapshot(self, root_path="/"):
"""建立文件清单快照"""
print("创建文件快照...")
self._snapshot.clear()
def walk(path):
for item in self.source.list_files(path):
self._snapshot[item['path']] = {
'size': item['size'],
'md5': item['md5'],
'modified': item['modified_at']
}
if item['is_dir']:
walk(item['path'])
walk(root_path)
print(f"快照建立完成,共 {len(self._snapshot)} 个文件")
def diff_and_sync(self, dest_path="/"):
"""对比快照与当前状态,生成差异操作并同步"""
print("计算差异...")
changes = []
current_state = {}
def walk_current(path):
for item in self.source.list_files(path):
current_state[item['path']] = {
'size': item['size'],
'md5': item['md5'],
'modified': item['modified_at']
}
if item['is_dir']:
walk_current(item['path'])
walk_current(dest_path)
# 检测新建和修改
for path, current_info in current_state.items():
old_info = self._snapshot.get(path)
if old_info is None:
changes.append(FileChange(
path=path,
change_type=ChangeType.CREATE,
timestamp=datetime.now(),
new_size=current_info['size'],
new_md5=current_info['md5']
))
elif (current_info['size'] != old_info['size'] or
current_info['md5'] != old_info['md5']):
changes.append(FileChange(
path=path,
change_type=ChangeType.UPDATE,
timestamp=datetime.now(),
old_size=old_info['size'],
old_md5=old_info['md5'],
new_size=current_info['size'],
new_md5=current_info['md5']
))
# 检测删除(快照中有,但当前没有)
for path in self._snapshot:
if path not in current_state:
changes.append(FileChange(
path=path,
change_type=ChangeType.DELETE,
timestamp=datetime.now(),
old_size=self._snapshot[path]['size'],
old_md5=self._snapshot[path]['md5']
))
print(f"发现 {len(changes)} 个变更: "
f"新建={sum(1 for c in changes if c.change_type == ChangeType.CREATE)}, "
f"修改={sum(1 for c in changes if c.change_type == ChangeType.UPDATE)}, "
f"删除={sum(1 for c in changes if c.change_type == ChangeType.DELETE)}")
# 执行变更
for change in changes:
self._apply_change(change)
def _apply_change(self, change: FileChange):
"""应用单个变更"""
if change.change_type == ChangeType.DELETE:
print(f" ️ 删除目标文件: {change.path}")
self.target.delete(change.path)
elif change.change_type == ChangeType.CREATE:
print(f" ➕ 新建: {change.path} ({change.new_size} bytes)")
self.source.download(change.path, f"/tmp/{change.path}")
self.target.upload(f"/tmp/{change.path}", change.path)
elif change.change_type == ChangeType.UPDATE:
print(f" 更新: {change.path}")
self.source.download(change.path, f"/tmp/{change.path}")
self.target.upload(f"/tmp/{change.path}", change.path)
3.4 事故四:文件名的编码陷阱
一家中日合资企业在迁移数据时,发现从日本合作方收到的文件在目标系统里文件名全部乱码。
根本原因是:日本那边用Shift-JIS编码命名文件,文件名里含有日语字符。迁移脚本跑在Linux环境下(UTF-8),读取目录列表时把文件名用UTF-8解析,编码转换过程中大量字符变成了问号。
这不是个小问题。企业云盘里的文件名编码如果不一致,会导致:
- 相同文件被识别为不同文件(文件名编码不同,MD5无法匹配)
- 文件无法被正常访问(路径中含有乱码字符)
- 搜索引擎无法匹配中文文件名
解决方案:在所有文件操作中强制指定编码,并对编码不一致的文件做标记处理:
import codecs
import os
import chardet
def safe_decode(byte_str: bytes, fallback_encodings=['utf-8', 'gbk', 'shift_jis', 'euc-jp', 'iso-8859-1']) -> str:
"""安全解码字节串,尝试多种编码"""
if isinstance(byte_str, str):
return byte_str
# 先尝试检测编码
detected = chardet.detect(byte_str)
confidence = detected['confidence']
encoding = detected['encoding']
if confidence > 0.85 and encoding:
try:
return byte_str.decode(encoding)
except (UnicodeDecodeError, LookupError):
pass
# 逐一尝试fallback编码
for enc in fallback_encodings:
try:
return byte_str.decode(enc)
except (UnicodeDecodeError, LookupError):
continue
# 终极手段:替换无法解码的字符
return byte_str.decode('utf-8', errors='replace')
def sanitize_filename(filename: str) -> str:
"""清理文件名,移除操作系统不允许的字符"""
# Windows不允许: \ / : * ? " < > |
# Linux不允许: \ / \0
illegal_chars = {
'windows': '\\/:*?"<>|',
'linux': '\\/\x00'
}
sanitized = filename
for char in illegal_chars['windows'] + illegal_chars['linux']:
sanitized = sanitized.replace(char, '_')
# 移除控制字符
sanitized = ''.join(c if (ord(c) >= 32 and ord(c) != 127) else '_' for c in sanitized)
# 限制文件名长度(NTFS最大255,ext4最大255)
if len(sanitized) > 200:
name, ext = os.path.splitext(sanitized)
sanitized = name[:200-len(ext)] + ext
return sanitized
def migrate_file_with_encoding_fallback(src_path, dest_path, src_client, dest_client):
"""带编码处理的文件迁移"""
# 获取源文件信息
src_info = src_client.get_file_info(src_path)
src_filename_bytes = src_info['name_bytes'] # 原始字节
# 安全解码
try:
decoded_name = safe_decode(src_filename_bytes)
except Exception as e:
print(f"警告:文件名解码失败 {src_path}: {e}")
decoded_name = f"decoded_failed_{hashlib.md5(src_filename_bytes).hexdigest()[:8]}"
# 清理文件名
safe_name = sanitize_filename(decoded_name)
# 下载
tmp_path = f"/tmp/migration_{os.getpid()}_{safe_name}"
src_client.download(src_path, tmp_path)
# 上传
dest_path_full = os.path.join(dest_path, safe_name)
dest_client.upload(tmp_path, dest_path_full)
# 清理
os.remove(tmp_path)
return {'original': decoded_name, 'sanitized': safe_name}
3.5 事故五:迁移完成后权限全乱了
迁移完数据后,客户发现所有文件的权限都变成了"公开可读"。这不是迁移脚本的问题,而是权限模型的差异。
源系统用的是"文件夹继承权限"模式:子文件夹自动继承父文件夹权限,不需要单独设置。目标系统用的是"独立权限"模式:每个文件/文件夹的权限是独立存储的,不继承。
迁移时,只迁移了文件内容,没有迁移权限体系。结果:所有文件的权限都变成了目标系统的默认权限(通常是创建者的私人权限),而那些原本共享给团队的文件夹,现在变成了"只有创建者可访问"。
教训:数据迁移和权限迁移必须一起规划。迁移前要搞清楚两个系统的权限模型是否兼容:
def analyze_permission_compatibility(source_system, target_system):
"""分析两个系统的权限模型兼容性"""
compatibility_matrix = {
('folder_inheritance', 'folder_inheritance'): 'HIGH',
('folder_inheritance', 'independent'): 'MEDIUM', # 需要额外处理
('independent', 'folder_inheritance'): 'LOW', # 不兼容
('rbac', 'rbac'): 'HIGH',
('rbac', 'acl'): 'MEDIUM',
('acl', 'rbac'): 'MEDIUM',
}
source_model = source_system.get_permission_model()
target_model = target_system.get_permission_model()
score = compatibility_matrix.get((source_model, target_model), 'LOW')
if score == 'LOW':
raise MigrationError(
"权限模型不兼容,请使用ACL映射方案,详见下方说明。"
)
return score
# ACL映射表:从源权限到目标权限的映射
PERMISSION_MAPPING = {
# 源系统权限 -> 目标系统权限
'owner': 'full_control', # 所有者
'edit': 'read_write', # 编辑
'view': 'read_only', # 查看
'comment': 'read_only', # 评论(降级为只读)
'share': 'read_write', # 分享权限在目标系统需要单独处理
}
第四章 生产级迁移工具:核心架构与代码实现
4.1 整体架构
一个生产级的数据迁移工具,通常包含以下组件:
┌─────────────────────────────────────────────────────────────┐
│ Migration Orchestrator │
│ (状态机:IDLE → SNAPSHOT → MIGRATION → VERIFY → SWITCHOVER) │
└────────────────┬────────────────────────────────────────────┘
│
┌────────────┼────────────┬────────────┐
▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────────┐
│Scanner │ │Transfer│ │Verifier│ │Changelog │
│ Agent │ │ Agent │ │ Agent │ │ Processor │
└────────┘ └────────┘ └────────┘ └────────────┘
│ │ │ │
└────────────┴────────────┴────────────┘
│
┌──────▼──────┐
│ State Store │
│ (Redis/SQLite)│
└─────────────┘
4.2 状态机实现
from enum import Enum, auto
from dataclasses import dataclass, field
from datetime import datetime
import json
import os
class MigrationState(Enum):
IDLE = auto()
SNAPSHOT = auto()
MIGRATION = auto()
VERIFY = auto()
SWITCHOVER = auto()
COMPLETED = auto()
FAILED = auto()
ABORTED = auto()
@dataclass
class MigrationJob:
job_id: str
source_system: str
target_system: str
root_path: str
state: MigrationState = MigrationState.IDLE
total_files: int = 0
migrated_files: int = 0
failed_files: int = 0
total_bytes: int = 0
migrated_bytes: int = 0
errors: list = field(default_factory=list)
started_at: datetime = None
completed_at: datetime = None
state_history: list = field(default_factory=list)
def transition_to(self, new_state: MigrationState, reason: str = ""):
old_state = self.state
self.state = new_state
self.state_history.append({
'from': old_state.name,
'to': new_state.name,
'reason': reason,
'at': datetime.now().isoformat()
})
print(f"[状态机] {old_state.name} → {new_state.name} ({reason})")
def to_dict(self):
return {
'job_id': self.job_id,
'state': self.state.name,
'progress': f"{self.migrated_files}/{self.total_files} ({self.migrated_files/max(self.total_files,1)*100:.1f}%)",
'bytes': f"{self.migrated_bytes}/{self.total_bytes} ({self.migrated_bytes/max(self.total_bytes,1)*100:.1f}%)",
'errors': len(self.errors),
'started_at': self.started_at.isoformat() if self.started_at else None,
'completed_at': self.completed_at.isoformat() if self.completed_at else None,
}
class MigrationOrchestrator:
"""迁移协调器,主状态机"""
def __init__(self, job: MigrationJob, state_store_path: str):
self.job = job
self.state_store_path = state_store_path
self._load_state()
def _load_state(self):
"""从持久化存储加载状态"""
if os.path.exists(self.state_store_path):
with open(self.state_store_path, 'r') as f:
data = json.load(f)
self.job = MigrationJob(**data)
def _save_state(self):
"""持久化状态"""
with open(self.state_store_path, 'w') as f:
json.dump(self.job.__dict__, f, default=str)
def run_snapshot(self):
"""阶段一:建立文件快照"""
self.job.transition_to(MigrationState.SNAPSHOT, "开始扫描文件")
scanner = FileScanner(self.job.source_system)
files = scanner.scan(self.job.root_path, progress_callback=self._on_scan_progress)
self.job.total_files = len(files)
self.job.total_bytes = sum(f['size'] for f in files)
# 保存快照
snapshot_path = f"/tmp/snapshot_{self.job.job_id}.json"
with open(snapshot_path, 'w') as f:
json.dump(files, f)
self._save_state()
self.job.transition_to(MigrationState.IDLE, f"快照完成: {self.job.total_files} 文件, {self.job.total_bytes} 字节")
return snapshot_path
def run_migration(self, snapshot_path: str, parallel_workers: int = 4):
"""阶段二:执行迁移"""
self.job.started_at = datetime.now()
self.job.transition_to(MigrationState.MIGRATION, f"启动{parallel_workers}个并发worker")
with open(snapshot_path, 'r') as f:
files = json.load(f)
# 断点续传:跳过已完成的文件
already_done = set()
if self.job.migrated_files > 0:
checkpoint = f"/tmp/checkpoint_{self.job.job_id}.json"
if os.path.exists(checkpoint):
with open(checkpoint, 'r') as f:
already_done = set(json.load(f))
print(f"断点续传:已跳过 {len(already_done)} 个已迁移文件")
transfer_pool = TransferPool(
source=self.job.source_system,
target=self.job.target_system,
workers=parallel_workers
)
for i, file_info in enumerate(files):
if file_info['path'] in already_done:
continue
try:
transfer_pool.transfer(file_info)
self.job.migrated_files += 1
self.job.migrated_bytes += file_info['size']
except Exception as e:
self.job.failed_files += 1
self.job.errors.append({
'file': file_info['path'],
'error': str(e),
'time': datetime.now().isoformat()
})
# 连续失败超过10次,暂停迁移,人工介入
if self.job.failed_files >= 10:
self.job.transition_to(MigrationState.FAILED, f"连续失败超过10次: {e}")
return
# 每迁移100个文件保存一次checkpoint
if i % 100 == 0:
self._save_checkpoint(already_done)
self._save_state()
self.job.transition_to(MigrationState.VERIFY, "迁移完成,开始校验")
self._save_checkpoint(already_done)
def run_verification(self):
"""阶段三:校验"""
verifier = IntegrityVerifier(
source=self.job.source_system,
target=self.job.target_system
)
report = verifier.verify_all(progress_callback=self._on_verify_progress)
if report['mismatch_count'] > 0:
print(f"⚠️ 校验发现 {report['mismatch_count']} 个不匹配文件")
for mismatch in report['mismatches'][:10]:
print(f" - {mismatch['path']}: {mismatch['reason']}")
# 重新传输不匹配的文件
self.job.transition_to(MigrationState.MIGRATION, "重传不匹配文件")
for mismatch in report['mismatches']:
verifier.retransfer(mismatch['path'])
self.job.transition_to(MigrationState.SWITCHOVER, "校验通过,准备切换")
def run_switchover(self, downtime_minutes: int = 30):
"""阶段四:最终切换(停机窗口)"""
print(f"⚠️ 进入停机切换窗口,预计需要 {downtime_minutes} 分钟")
# 1. 通知用户系统即将维护
# 2. 锁定源系统写入
# 3. 执行最终增量同步
# 4. 再次校验关键文件
# 5. 切换DNS/入口
# 6. 验证新系统可访问
# 7. 通知用户切换完成
self.job.completed_at = datetime.now()
self.job.transition_to(MigrationState.COMPLETED, "迁移完成")
4.3 多线程传输引擎
import threading
import queue
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Optional
import hashlib
@dataclass
class TransferTask:
file_id: str
source_path: str
dest_path: str
size: int
expected_md5: Optional[str]
priority: int = 0
@dataclass
class TransferResult:
task: TransferTask
success: bool
bytes_transferred: int = 0
duration_ms: int = 0
error: Optional[str] = None
class TransferPool:
"""并发传输池"""
def __init__(self, source, target, workers: int = 4,
chunk_size: int = 8*1024*1024,
max_queue_size: int = 1000):
self.source = source
self.target = target
self.workers = workers
self.chunk_size = chunk_size
self.task_queue = queue.PriorityQueue(maxsize=max_queue_size)
self.results = []
self._lock = threading.Lock()
# 流量控制
self._rate_limiter = threading.Semaphore(workers)
self._current_speed = 0 # bytes/s
def transfer(self, file_info: dict) -> TransferResult:
"""单文件传输(带重试)"""
task = TransferTask(
file_id=file_info.get('id', file_info['path']),
source_path=file_info['path'],
dest_path=file_info['path'],
size=file_info['size'],
expected_md5=file_info.get('md5')
)
max_retries = 3
for attempt in range(max_retries):
try:
result = self._do_transfer(task)
# 校验
if result.success and task.expected_md5:
actual_md5 = hashlib.md5(
open(result.bytes_transferred, 'rb').read()
).hexdigest()
if actual_md5 != task.expected_md5:
result.success = False
result.error = f"MD5校验失败: 期望{task.expected_md5}, 实际{actual_md5}"
continue # 重试
return result
except Exception as e:
if attempt == max_retries - 1:
return TransferResult(
task=task,
success=False,
error=f"重试{MAX_RETRIES}次后失败: {e}"
)
time.sleep(2 ** attempt) # 指数退避
return TransferResult(task=task, success=False, error="重试耗尽")
def _do_transfer(self, task: TransferTask) -> TransferResult:
"""执行单次传输"""
start = time.time()
tmp_path = f"/tmp/migration_{os.getpid()}_{task.file_id}"
try:
# 下载
self.source.download(task.source_path, tmp_path)
# 上传
self.target.upload(tmp_path, task.dest_path)
duration = time.time() - start
bytes_transferred = task.size
speed = bytes_transferred / duration if duration > 0 else 0
return TransferResult(
task=task,
success=True,
bytes_transferred=bytes_transferred,
duration_ms=int(duration * 1000)
)
finally:
if os.path.exists(tmp_path):
os.remove(tmp_path)
def batch_transfer(self, file_list: list, priority_mode: bool = False):
"""批量传输(带优先级)"""
def worker():
while True:
try:
if priority_mode:
task = self.task_queue.get(timeout=5)
else:
task = self.task_queue.get_nowait()
except queue.Empty:
break
self._rate_limiter.acquire()
try:
result = self.transfer(task)
with self._lock:
self.results.append(result)
finally:
self._rate_limiter.release()
self.task_queue.task_done()
# 填充队列
for file_info in file_list:
task = TransferTask(
file_id=file_info.get('id', file_info['path']),
source_path=file_info['path'],
dest_path=file_info['path'],
size=file_info['size'],
expected_md5=file_info.get('md5'),
priority=file_info.get('priority', 0)
)
self.task_queue.put((task.priority, task))
# 启动worker
threads = []
for _ in range(self.workers):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for t in threads:
t.join()
return self.results
第五章 迁移后的收尾工作
5.1 必须做的十项检查
迁移完成后,不要急着庆祝。以下十项检查必须逐项确认:
- 文件数量对比:源和目标的文件总数是否一致?
- 文件大小对比:总字节数是否一致?
- 随机抽样校验:随机抽取5%的文件,逐一比对MD5
- 权限验证:抽查10个不同文件夹的权限是否正确
- 目录结构验证:目录树是否完整,有没有文件夹丢失?
- 符号链接验证:符号链接是否仍然有效?
- 共享链接验证:原有共享链接是否还能访问?
- 搜索功能验证:搜索几个文件名,看能否找到
- 版本历史验证:历史版本数量是否完整?
- 最后写入时间验证:文件修改时间是否保留?
5.2 切换后的监控
切换完成后,还要持续监控72小时:
- 错误率:是否有大量404或500?
- 延迟:新增文件的访问延迟是否正常?
- 带宽:用户访问是否造成带宽异常?
- 存储:目标系统存储是否正常增长(新增文件正常写入)?
第六章 成本优化:怎么把迁移费用砍掉一半
6.1 迁移前:数据清理
前面提到,迁移前先做数据清理。这不是可选项,而是必选项。
我建议用以下脚本做预迁移扫描:
def pre_migration_audit(root_path, client, retention_days=365):
"""
预迁移审计:找出可清理的数据
返回:可清理文件列表和建议清理量
"""
large_files = [] # 大于1GB的文件
duplicate_files = [] # 重复文件
old_files = [] # 超过保留期的文件
md5_map = {}
for file_info in client.walk_files(root_path):
# 过期文件
if file_info['modified_at'] < datetime.now() - timedelta(days=retention_days):
old_files.append(file_info)
# 大文件
if file_info['size'] > 1024**3:
large_files.append(file_info)
# 重复文件(根据MD5)
if file_info.get('md5'):
if file_info['md5'] in md5_map:
duplicate_files.append({
'original': md5_map[file_info['md5']],
'duplicate': file_info
})
else:
md5_map[file_info['md5']] = file_info
total_cleanable = sum(f['size'] for f in old_files)
total_cleanable += sum(f['size'] for f in large_files if f in old_files)
return {
'old_files_count': len(old_files),
'old_files_size': sum(f['size'] for f in old_files),
'duplicate_files_count': len(duplicate_files),
'duplicate_files_size': sum(f['size'] for f in duplicate_files),
'large_files_count': len(large_files),
'estimated_reduction_pct': total_cleanable / sum(f['size'] for f in [old_files[0]] if old_files) * 100
if old_files else 0
}
6.2 传输中:带宽优化
- 启用压缩传输:在网络带宽有限时,开启传输压缩可节省30-50%带宽,但会增加CPU消耗
- 分时段传输:利用夜间低峰期迁移,避开业务高峰
- 增量优先:先迁移增量部分(通常是近期数据),历史数据可以慢慢迁
结语
数据迁移不是搬家,而是做一次器官移植。搬家的东西可以扔,器官移植的每一条血管都要接上。
本文的核心教训可以归纳为三条:
- 迁移前要审计:清理数据、摸清家底、制定方案。三件事没做完就动手,早晚出事。
- 迁移中要校验:不要相信任何"传输成功"的默认结论,每一个字节都要验证。
- 迁移后要监控:切换完成不是终点,72小时内的异常监控才是验收标准。
如果你正在准备一次迁移,建议把本文的代码模块作为基础框架,根据你的实际环境做调整。迁移工具本身要经过充分测试——至少在测试环境里完整跑通一次,再上生产。
最后一句话:永远保持对数据的敬畏之心。
本文档由虾皮维护 | 写作日期:2026-05-08 | 字数:约24000字