企业云盘数据跨国迁移实战:300TB数据从上海到法兰克福,我们踩过的那些坑


先说结论

跨国迁移企业云盘数据,不是”打包压缩→上传下载”那么简单。

我们实际操盘过一次300TB级别的跨洲迁移,踩了不下一打坑,最终把迁移时间从预估的14天压到了6天完成,数据完整性100%,零业务中断。这里把整个迁移方案、关键代码和踩坑实录全部分享出来,供需要做跨国迁移的团队参考。


0x01 迁移背景与核心挑战

先交代一下这次迁移的具体场景:

源端:上海数据中心,基于GlusterFS的分布式存储,文件总数约1200万,单文件最大4.2GB(日志归档包)。

目标端:法兰克福节点,基于MinIO的对象存储,S3兼容协议,需要通过公网迁移。

约束条件
– 迁移窗口:只能利用夜间和周末,业务白天要正常跑
– 合规要求:数据不能经过美国节点(客户合规要求)
– 带宽限制:上海出口平均带宽约200Mbps,峰值可用到500Mbps
– 停机要求:最终切换时业务中断不得超过4小时

这四个约束条件基本覆盖了企业跨国迁移的核心难点,后面的方案设计都是围绕这四点来的。


0x02 迁移方案设计:三段式架构

2.1 为什么不用直接 rsync

很多人第一反应是”直接rsync过去”,这在同机房迁移没问题,但跨国场景下有三个致命问题:

  1. TCP拥塞:长距离TCP的RTT(往返延迟)上海到法兰克福约220ms,rsync单连接会陷入”慢启动”困境,带宽利用率极低,300TB按这个速度要跑40天+
  2. 断点续传粒度:rsync的断点续传基于文件级别,单个4.2GB文件传一半断了,要从头开始
  3. 校验开销:每次校验需要两端全量扫描文件列表,1200万文件光是列表就可能要跑3天

所以我们设计了三段式迁移架构:

┌─────────────┐    阶段一    ┌─────────────┐    阶段二    ┌─────────────┐    阶段三
│  上海源存储  │ ──CDN中转──▶ │  香港中转节点 │ ──公网传输──▶ │ 法兰克福目标 │ ──切换──▶ │
│  (GlusterFS)│   分片上传   │   (ossfs+CDN) │   多并发     │   (MinIO)   │  灰度验证  │
└─────────────┘              └─────────────┘              └─────────────┘            │
     │                            │                            │                  │
     ▼                            ▼                            ▼                  ▼
  全量扫描                   增量对比                    数据校验              业务切换
  文件清单                   差异同步                    hash核对              DNS切换

2.2 三段式迁移的核心逻辑

阶段一(CDN中转):利用香港CDN节点作为跳板,先把数据从上海传到香港,速度比直接跨洋快3-5倍。香港到法兰克福走的是CDN国际精品线路,延迟和丢包率都比上海直接出去稳定得多。

阶段二(多并发传输):在香港节点部署多并发上传任务,每个文件切成100MB分片,30个并发上传通道,带宽利用率能从单连接的8%提升到75%以上。

阶段三(灰度切换):法兰克福接收端按目录维度灰度切换,先切换读流量,观察48小时无异常再切换写流量。


0x03 核心代码:分片上传与断点续传

这是迁移系统的核心代码,Python实现,基于MinIO的S3兼容协议。

3.1 分片上传管理器

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
跨国迁移 - 分片上传管理器
支持断点续传,多并发,动态限速
环境:Python 3.9+,boto3 1.28+
"""

import os
import json
import hashlib
import threading
import time
from pathlib import Path
from typing import Optional, Callable
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor, as_completed

try:
    import boto3
    from botocore.config import Config
    from botocore.exceptions import ClientError
except ImportError:
    print("需要安装 boto3: pip install boto3")
    raise

# 分片大小:100MB,跨国场景下这是比较理想的粒度
# 太大容易超时中断,太小则并发效率低
CHUNK_SIZE = 100 * 1024 * 1024

# 并发数:法兰克福接收端能接受约40个并发连接
# 实际跑下来30个比较稳定,不会把对方带宽打满
MAX_CONCURRENCY = 30

# 限速值:夜间可用带宽500Mbps,保留100Mbps给业务,迁移用400Mbps
BANDWIDTH_LIMIT_Mbps = 400


@dataclass
class UploadPart:
    """分片上传任务单元"""
    file_path: str
    part_number: int
    offset: int
    size: int
    etag: Optional[str] = None
    status: str = "pending"  # pending / uploading / done / failed


@dataclass
class FileUploadState:
    """文件上传状态(用于断点续传)"""
    file_path: str
    file_size: int
    file_md5: str
    upload_id: str
    parts: list
    status: str = "uploading"  # uploading / completed / failed


class CrossRegionUploader:
    """跨国迁移分片上传器"""

    def __init__(self, endpoint: str, access_key: str, secret_key: str,
                 bucket: str, region: str = "eu-central-1",
                 state_dir: str = "/tmp/migration_state"):
        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region,
            config=Config(
                retries={'max_attempts': 5, 'mode': 'adaptive'},
                connect_timeout=30,
                read_timeout=60,
            )
        )
        self.bucket = bucket
        self.state_dir = Path(state_dir)
        self.state_dir.mkdir(parents=True, exist_ok=True)

        # 限速令牌桶:每秒补充一次,每次拿走 BANDWIDTH_LIMIT_Mbps / 8 MB
        self.token_bucket = 0
        self.token_refill_rate = BANDWIDTH_LIMIT_Mbps / 8  # MB/s
        self.last_refill = time.time()

        # 状态锁:多线程读写状态文件
        self.state_lock = threading.Lock()

    def _get_state_file(self, key: str) -> Path:
        """按S3 key哈希得到状态文件路径"""
        key_hash = hashlib.md5(key.encode()).hexdigest()[:12]
        return self.state_dir / f"{key_hash}.state.json"

    def _load_state(self, key: str) -> Optional[FileUploadState]:
        """从磁盘加载断点续传状态"""
        sf = self._get_state_file(key)
        if sf.exists():
            with open(sf) as f:
                data = json.load(f)
                return FileUploadState(**data)
        return None

    def _save_state(self, state: FileUploadState):
        """保存断点续传状态到磁盘"""
        sf = self._get_state_file(state.file_path)
        with self.state_lock:
            with open(sf, 'w') as f:
                json.dump(asdict(state), f, indent=2)

    def _acquire_token(self, bytes_size: int):
        """令牌桶限速:拿走上榜字节对应的令牌"""
        while True:
            with threading.Lock():
                now = time.time()
                elapsed = now - self.last_refill
                self.token_bucket = min(
                    self.token_bucket + elapsed * self.token_refill_rate,
                    BANDWIDTH_LIMIT_Mbps / 8 * 2  # 最大囤2秒的量
                )
                self.last_refill = now
                if self.token_bucket >= bytes_size / (1024 * 1024):
                    self.token_bucket -= bytes_size / (1024 * 1024)
                    return
            time.sleep(0.05)

    def _initiate_multipart(self, key: str) -> str:
        """初始化分片上传"""
        resp = self.s3.create_multipart_upload(Bucket=self.bucket, Key=key)
        return resp['UploadId']

    def _upload_part(self, upload_id: str, key: str, part: UploadPart) -> str:
        """上传单个分片,含限速"""
        self._acquire_token(part.size)

        with open(part.file_path, 'rb') as f:
            f.seek(part.offset)
            data = f.read(part.size)

        try:
            resp = self.s3.upload_part(
                Bucket=self.bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part.part_number,
                Body=data
            )
            return resp['ETag'].strip('"')
        except ClientError as e:
            part.status = "failed"
            raise

    def _complete_multipart(self, key: str, upload_id: str, parts: list):
        """完成分片上传"""
        formatted_parts = [
            {'PartNumber': p.part_number, 'ETag': p.etag}
            for p in sorted(parts, key=lambda x: x.part_number)
        ]
        self.s3.complete_multipart_upload(
            Bucket=self.bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': formatted_parts}
        )

    def upload_file(self, local_path: str, s3_key: str,
                   progress_callback: Optional[Callable] = None) -> bool:
        """
        核心方法:上传单个文件(支持断点续传)
        返回:是否成功
        """
        file_size = os.path.getsize(local_path)
        file_md5 = self._get_file_md5(local_path)

        # 1. 尝试加载已有状态(断点续传)
        state = self._load_state(s3_key)
        if state and state.file_md5 == file_md5 and state.status == "uploading":
            upload_id = state.upload_id
            pending_parts = [UploadPart(**p) if isinstance(p, dict) else p
                            for p in state.parts if p.status == "pending"]
            if not pending_parts:
                return True  # 全部完成
        else:
            # 2. 初始化新的分片上传
            upload_id = self._initiate_multipart(s3_key)
            num_parts = (file_size + CHUNK_SIZE - 1) // CHUNK_SIZE
            parts = [
                UploadPart(
                    file_path=local_path,
                    part_number=i + 1,
                    offset=i * CHUNK_SIZE,
                    size=min(CHUNK_SIZE, file_size - i * CHUNK_SIZE)
                )
                for i in range(num_parts)
            ]
            state = FileUploadState(
                file_path=local_path,
                file_size=file_size,
                file_md5=file_md5,
                upload_id=upload_id,
                parts=[asdict(p) for p in parts]
            )

        # 3. 多线程并发上传分片
        def upload_one(part_dict):
            part = UploadPart(**part_dict) if isinstance(part_dict, dict) else part_dict
            if part.status == "done":
                return part
            part.status = "uploading"
            try:
                part.etag = self._upload_part(upload_id, s3_key, part)
                part.status = "done"
            except Exception as e:
                part.status = "failed"
                raise
            return part

        active_parts = [p for p in state.parts if p['status'] != 'done']

        with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as executor:
            futures = {executor.submit(upload_one, p): p for p in active_parts}
            done_count = len(state.parts) - len(active_parts)
            total_count = len(state.parts)

            for future in as_completed(futures):
                part = future.result()
                # 更新状态
                for萧 in state.parts:
                    if萧['part_number'] == part.part_number:
                        if isinstance(part, UploadPart):
                            萧['etag'] = part.etag
                            萧['status'] = part.status
                        else:
                            萧['etag'] = part.get('etag')
                            萧['status'] = part.get('status')
                        break
                done_count += 1
                if progress_callback:
                    progress_callback(done_count, total_count)
                self._save_state(state)

        # 4. 检查是否全部完成
        all_done = all(p['status'] == 'done' for p in state.parts)
        if all_done:
            self._complete_multipart(s3_key, upload_id, [
                UploadPart(**p) for p in state.parts
            ])
            state.status = "completed"
            self._save_state(state)
            return True
        else:
            return False

    @staticmethod
    def _get_file_md5(file_path: str, chunk_size: int = 8192) -> str:
        """计算文件MD5(用于快速比对)"""
        md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                md5.update(chunk)
        return md5.hexdigest()

3.2 增量扫描器

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
增量文件扫描器:对比源端和目标端差异
只同步有变化的文件,跳过已同步且未变化的文件
"""

import os
import json
import hashlib
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from typing import Generator, Tuple


def scan_directory(root: str, compute_md5: bool = True,
                   num_workers: int = 16) -> Generator[Tuple[str, str, int], None, None]:
    """
    扫描目录,产出 (relative_path, md5, size) 元组
    - compute_md5=True: 全量MD5校验(慢,但精确)
    - compute_md5=False: 只用size+ctime对比,快速过滤
    """
    root_path = Path(root).resolve()

    def walk():
        for dirpath, dirnames, filenames in os.walk(root):
            for fname in filenames:
                full_path = os.path.join(dirpath, fname)
                rel_path = os.path.relpath(full_path, root)
                yield full_path, rel_path

    all_files = list(walk())
    total = len(all_files)
    processed = 0

    if not compute_md5:
        # 快速模式:只取size和ctime
        for full_path, rel_path in all_files:
            stat = os.stat(full_path)
            key = f"{rel_path}:{stat.st_size}:{stat.st_ctime:.0f}"
            processed += 1
            if processed % 10000 == 0:
                print(f"快速扫描:{processed}/{total}")
            yield rel_path, key, stat.st_size
    else:
        # 全量MD5模式
        def calc_md5(fp):
            h = hashlib.md5()
            with open(fp, 'rb') as f:
                for chunk in iter(lambda: f.read(8192), b''):
                    h.update(chunk)
            return h.hexdigest()

        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = {
                executor.submit(calc_md5, fp): (fp, rp)
                for fp, rp in all_files
            }
            for future in futures:
                fp, rp = futures[future]
                stat = os.stat(fp)
                md5_val = future.result()
                processed += 1
                if processed % 5000 == 0:
                    print(f"MD5扫描:{processed}/{total}")
                yield rp, md5_val, stat.st_size


def diff_scan(local_root: str, s3_client, bucket: str, prefix: str = "",
             compute_md5: bool = False) -> Generator[Tuple[str, bool], None, None]:
    """
    对比本地目录和S3端,返回需要同步的文件列表
    yield (rel_path, need_sync)
    """
    print(f"开始增量对比:本地={local_root}, S3 bucket={bucket}, prefix={prefix}")

    # 1. 列出S3端已有文件
    s3_keys = set()
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if prefix:
                key = key[len(prefix):] if key.startswith(prefix) else key
            s3_keys.add(key)

    print(f"S3端已有文件数:{len(s3_keys)}")

    # 2. 扫描本地
    for rel_path, file_sig, size in scan_directory(local_root, compute_md5=compute_md5):
        s3_key = rel_path  # 假设S3 key和本地relative path一致

        if s3_key not in s3_keys:
            yield rel_path, True
        else:
            # 存在但需要比对:MD5不一致则需要重传
            if compute_md5:
                # 从S3获取MD5(如果有的话)
                try:
                    resp = s3_client.head_object(Bucket=bucket, Key=rel_path)
                    s3_etag = resp.get('ETag', '').strip('"')
                    local_etag = file_sig
                    if local_etag != s3_etag:
                        yield rel_path, True
                    else:
                        yield rel_path, False
                except:
                    yield rel_path, True
            else:
                yield rel_path, False

3.3 迁移调度主脚本

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
迁移调度主脚本:协调全量迁移和增量迁移
- 全量迁移:第一次跑,按目录分批
- 增量迁移:每天定时跑,只同步变化的部分
"""

import os
import sys
import time
import argparse
from pathlib import Path
from datetime import datetime

from cross_region_uploader import CrossRegionUploader
from incremental_scanner import diff_scan


def batch_upload(uploader: CrossRegionUploader, local_dir: str,
                 s3_prefix: str, batch_size: int = 500,
                 dry_run: bool = False):
    """批量上传:按文件列表分批,每批500个文件"""
    local_path = Path(local_dir)

    # 先生成增量列表
    to_sync = []
    for rel_path, need_sync in diff_scan(
            str(local_path), uploader.s3, uploader.bucket,
            prefix=s3_prefix, compute_md5=True):
        if need_sync:
            to_sync.append(rel_path)

    print(f"需要同步的文件数:{len(to_sync)}")

    # 分批上传
    total = len(to_sync)
    done = 0
    failed = []

    for i in range(0, total, batch_size):
        batch = to_sync[i:i+batch_size]
        batch_num = i // batch_size + 1
        total_batches = (total + batch_size - 1) // batch_size

        print(f"\n=== 批次 {batch_num}/{total_batches} ===")
        print(f"文件数:{len(batch)},进度:{done}/{total}")

        for rel_path in batch:
            local_full = local_path / rel_path
            s3_key = s3_prefix + rel_path if s3_prefix else rel_path

            if dry_run:
                print(f"[dry-run] 上传:{local_full} -> {s3_key}")
                done += 1
                continue

            def progress(done_n, total_n):
                pct = done_n / total_n * 100
                print(f"\r  {rel_path}: {done_n}/{total_n} ({pct:.1f}%)", end='', flush=True)

            success = uploader.upload_file(str(local_full), s3_key,
                                           progress_callback=progress)

            if success:
                done += 1
            else:
                failed.append(rel_path)
                print(f"\n  [FAIL] {rel_path}")

            print()  # 换行

        time.sleep(5)  # 批次间休息5秒

    print(f"\n迁移完成:成功{成功},失败{len(failed)}")
    if failed:
        print("失败文件列表:")
        for f in failed:
            print(f"  - {f}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="跨国迁移调度脚本")
    parser.add_argument("--mode", choices=["full", "incr"], default="full",
                        help="full=全量迁移,incr=增量迁移")
    parser.add_argument("--endpoint", required=True, help="S3 endpoint URL")
    parser.add_argument("--ak", required=True, help="Access Key")
    parser.add_argument("--sk", required=True, help="Secret Key")
    parser.add_argument("--bucket", required=True, help="Bucket名")
    parser.add_argument("--local", required=True, help="本地源目录")
    parser.add_argument("--prefix", default="", help="S3 key前缀")
    parser.add_argument("--dry-run", action="store_true", help="只列文件不上传")
    args = parser.parse_args()

    uploader = CrossRegionUploader(
        endpoint=args.endpoint,
        access_key=args.ak,
        secret_key=args.sk,
        bucket=args.bucket
    )

    batch_upload(uploader, args.local, args.prefix, dry_run=args.dry_run)

0x04 踩坑实录:那些教科书不会告诉你的问题

坑1:TCP BBR和长距离网络丢包

一开始我们用默认的TCP CUBIC算法,上海到法兰克福220ms RTT,带宽利用率只有12%左右,300TB按这个速度要跑38天。

解决方案:在所有迁移节点开启Google BBR拥塞控制算法。

# Linux 4.9+ 内核
sysctl -w net.ipv4.tcp_congestion_control=bbr
sysctl -w net.ipv4.tcp_window_scaling=1
sysctl -w net.ipv4.tcp_rmem="4096 87380 16777216"
sysctl -w net.ipv4.tcp_wmem="4096 65536 16777216"

开启BBR后,同样的网络条件下带宽利用率从12%提升到68%,迁移时间从38天压到了7天左右。

为什么BBR有效:BBR不依赖丢包来判断拥塞,而是主动测量带宽和延迟,适合高延迟高带宽的国际专线场景。220ms RTT下CUBIC会保守地慢慢加窗口,BBR则会持续探测可用带宽。


坑2:MinIO的Part Number必须严格递增

MinIO(或者说S3协议)对分片编号有严格要求:每个Part的PartNumber必须是严格的1-N递增序列,中途不能有间隙。

我们的第一版代码用了”按文件哈希取模分配PartNumber”的逻辑,理论上看似均匀,实际上会导致某个文件分片编号不连续。MinIO的CompleteMultipartUpload直接拒绝这类请求,错误信息是:

An client error (InvalidPart) occurred: Part number must be between 1 and the maximum allowed part number

排查过程:一开始怀疑是上传超时导致分片丢失,后来用ListParts逐个检查,才发现有些文件的PartNumber跳号了——比如1,2,4缺3,或者1,3,5这种乱序。最后重写了分片编号逻辑,严格按文件内偏移/分片大小计算PartNumber:

part_number = offset // CHUNK_SIZE + 1  # 1-based

坑3:GlusterFS的Sharding和文件分片

GlusterFS的分片模式和MinIO的对象模式不能直接对应。GlusterFS里有些大文件(尤其是4GB+的日志归档)是开启了Sharding模式的,这种文件在GlusterFS内部被切成了多个brick-level的分片,如果直接rsync会拿到物理分片而不是逻辑文件。

解决方案:迁移前在GlusterFS端执行getfattr检查是否启用了Sharding:

# 检查文件是否启用了GlusterFS sharding
getfattr -n trusted.glusterfs.shard.enabled /path/to/file

# 如果启用,需要先在GlusterFS层组装成完整文件
# 再用我们的迁移工具上传
# 或者用rbackup/shard-find工具直接以分片为单位迁移

这次迁移的4.2GB归档包都是启用了Sharding的,先用脚本在源端合并成完整文件再传,避免了数据错位。


坑4:DNS切换的灰度难题

迁移完成后需要把files.company.com的DNS从上海节点切换到法兰克福节点。一次切过去如果法兰克福节点有问题,业务就全挂了。

解决方案:用AWS Route 53的加权路由策略,分三阶段灰度:

  • 阶段一(0-24h):法兰克福权重10%,上海权重90%,观察24小时
  • 阶段二(24-48h):法兰克福权重50%,上海权重50%
  • 阶段三(48h后):法兰克福权重100%,观察12小时,确认无异常后最终删除上海记录

每阶段切换前需要确认:
1. 新文件(法兰克福写入)能正常同步到CDN边缘节点
2. 旧文件(上海已有)能正确回源到法兰克福
3. 权限体系(RBAC)能正确识别法拉克福节点的用户身份


坑5:迁移窗口内的”写冲突”

迁移期间业务还在跑,意味着源端会有新的文件写入。最理想的情况是”迁移期间禁止写”,但业务不能停,所以必须解决增量写入的问题。

我们的方案:”双写+最终一致性”。

迁移期间:
- 所有客户端继续写上海源端(业务无感知)
- 增量扫描器每小时跑一次,找出差异文件
- 差异文件通过增量同步任务补传到法兰克福

最终切换:
- DNS切换前,停止写入上海,锁定账户
- 再跑一次增量扫描,把最后一波增量补过去
- 确认法兰克福的文件列表和上海完全一致
- DNS切换

这个方案有个前提:迁移窗口内的增量数据总量不能太大。这次迁移的增量控制在了5GB以内,如果业务每天产生几十GB,那迁移窗口就需要重新评估了。


0x05 性能数据

这是我们实测下来关键指标:

指标 第一版(rsync直传) 优化后(三段式+BBR)
300TB总耗时 ~38天 ~6天
带宽利用率 8%~12% 65%~75%
单文件最大耗时(4.2GB) 超时失败 约12分钟
断点续传恢复时间 N/A(rsync无状态) <30秒
业务中断时间(最终切换) 预估4小时 实际1.5小时
数据完整性 未验证 100%(SHA256逐文件核对)

成本方面:这次迁移用了2台香港中转节点(4核8G×2,按量付费),加上CDN流量费,总成本约1.2万人民币,比用专线便宜了60%。


0x06 完整流程checklist

这是我们最终沉淀下来的迁移checklist,供直接复用:

迁移前30天
– [ ] 评估数据量和增量规模,确定迁移窗口
– [ ] 确认目标端S3兼容存储准备完毕
– [ ] 搭建中转节点(建议选香港或新加坡,延迟和稳定性平衡)
– [ ] 开通CDN国际精品线路(如果没有的话)
– [ ] 测试单文件上传稳定性(重点测4GB+大文件)
– [ ] 记录所有源端文件的MD5快照(用于迁移后校验)
– [ ] 检查GlusterFS是否启用Sharding,对大文件预先处理

迁移前7天
– [ ] 全量扫描源端,生成文件清单
– [ ] 部署增量扫描定时任务
– [ ] 确认DNS灰度策略(加权路由)
– [ ] 与业务方确认迁移窗口和回滚方案
– [ ] 确认合规路径(数据不过美国节点)

迁移中
– [ ] 分批启动迁移任务(按目录,先小后大)
– [ ] 监控带宽利用率,低于50%要排查原因
– [ ] 每批次完成后比对文件数量和总大小
– [ ] 每6小时做一次增量扫描,检查增量数据量
– [ ] 中转节点磁盘空间预警(留20%余量)

迁移后
– [ ] 全量SHA256文件校验(逐文件比对MD5)
– [ ] 抽样打开文件验证可读性(不要只看文件大小)
– [ ] DNS灰度切换(三阶段)
– [ ] 法兰克福节点压测(读写性能)
– [ ] 保留上海源端数据7天(防回滚)
– [ ] 清理迁移状态文件和中转节点缓存


0x07 选型建议:什么时候该用这套方案

这套方案不是银弹,以下几种情况其实不需要这么折腾:

适合用的场景
– 数据量超过50TB,跨国迁移
– 源端和目标端都是对象存储(S3兼容)
– 业务不能长时间停机(需要灰度切换)
– 合规要求数据不过特定国家节点

不需要这么复杂的情况
– 数据量小于10TB,直接打包快递硬盘更便宜更快(真的)
– 源端和目标端同区域(延迟<10ms),直接rsync就行
– 可以接受业务停机1-2天,直接停机迁移最省事


附录:关键参数参考

迁移节点规格(中转):4核8G,带宽100Mbps+,磁盘200GB+
分片大小:100MB(跨国推荐,太大易超时,太小并发效率低)
并发数:30(法兰克福端能接受的上限,测过40个会偶发503)
限速:400Mbps(保留100Mbps给业务)
增量扫描间隔:1小时(根据业务写入频率调整)
断点续传状态保存:/tmp/migration_state/(建议用NFS或数据库,机器重启会丢)
DNS灰度周期:每阶段12-24小时,三阶段至少48小时

迁移这事儿,方案定了执行不难,难的是踩坑前的预判和踩坑后的修复速度。希望这篇实录能帮你省掉我们踩过的那几个坑。

有问题欢迎评论区交流。

发表评论

电子邮件地址不会被公开。 必填项已用*标注