Designing Mobile Push Notification Systems: From APNs/FCM to Large-Scale Delivery Architecture


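Introduction: Why Push Notifications Matter

In the mobile era, push notifications are a key channel for staying connected with users. Instant messages, transaction alerts, and marketing campaigns all depend on a push system to reach users promptly.

Designing an efficient push system means weighing:

  • Timeliness: keep message latency within seconds
  • Reliability: make sure messages get delivered
  • Scalability: support hundreds of millions of devices
  • Efficiency: use system resources sensibly

This article works from the underlying principles up to system design to explain how a push notification system is implemented.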


Chapter 1: Push Notification Fundamentals

1.1 Push Notification Flow

sequenceDiagram
    participant App as Mobile App
    participant Device as Device OS
    participant PNS as Push Notification Service<br/>(APNs/FCM)
    participant Server as App Server
    participant DB as Token Storage

    App->>Device: Register for push
    Device->>PNS: Request device token
    PNS-->>Device: Return device token
    Device-->>App: Pass token
    App->>Server: Upload token
    Server->>DB: Store token

    Note over Server, PNS: Sending a push

    Server->>PNS: Push message + token
    PNS->>Device: Deliver message
    Device->>App: Show notification

1.2 Major Push Services

graph TB
    subgraph "iOS Ecosystem"
        APNS[Apple Push Notification Service<br/>APNs]
    end

    subgraph "Android Ecosystem"
        FCM[Firebase Cloud Messaging<br/>FCM]
        HMS[Huawei Mobile Services<br/>HMS Push]
        XIAOMI[Mi Push]
        OPPO[OPPO Push]
        VIVO[VIVO Push]
    end

    subgraph "Cross-Platform"
        WEB[Web Push<br/>VAPID]
    end

    subgraph "Third-Party Services"
        JPUSH[JPush]
        GETUI[Getui]
        UMENG[Umeng Push]
    end

Chapter 2: Apple Push Notification Service (APNs)

2.1 APNs Architecture

graph TB
    subgraph "Provider (Your Server)"
        PS[Push Service]
        TLS[TLS Connection]
        HTTP2[HTTP/2 Protocol]
    end

    subgraph "APNs"
        GW[APNs Gateway]
        QUEUE[Message Queue]
        ROUTER[Device Router]
    end

    subgraph "Apple Device"
        DAEMON[apsd Daemon]
        APP[Your App]
    end

    PS --> TLS
    TLS --> HTTP2
    HTTP2 --> GW
    GW --> QUEUE
    QUEUE --> ROUTER
    ROUTER --> DAEMON
    DAEMON --> APP

2.2 APNs Connection and Authentication

There are two authentication methods:

graph LR
    subgraph "Certificate-Based (legacy)"
        CERT[.p12 Certificate]
        CERT --> |renew annually| APNS1[APNs]
    end

    subgraph "Token-Based (recommended)"
        KEY[.p8 Private Key]
        KEY --> |generates| JWT[JWT Token]
        JWT --> |refresh at least hourly| APNS2[APNs]
    end

Token authentication implementation:

import jwt
import time
from datetime import datetime, timedelta

class APNsTokenManager:
    def __init__(self, key_id: str, team_id: str, private_key_path: str):
        self.key_id = key_id
        self.team_id = team_id
        with open(private_key_path, 'r') as f:
            self.private_key = f.read()
        self.token = None
        self.token_time = None

    def get_token(self) -> str:
        # The token is valid for 1 hour; refresh it 10 minutes early
        if (self.token is None or
            self.token_time is None or
            datetime.now() - self.token_time > timedelta(minutes=50)):
            self._generate_token()
        return self.token

    def _generate_token(self):
        now = int(time.time())
        payload = {
            'iss': self.team_id,    # Issuer (Team ID)
            'iat': now,              # Issued At
        }
        headers = {
            'alg': 'ES256',          # Algorithm
            'kid': self.key_id,      # Key ID
        }
        self.token = jwt.encode(
            payload,
            self.private_key,
            algorithm='ES256',
            headers=headers
        )
        self.token_time = datetime.now()

2.3 Sending Pushes over APNs HTTP/2

import httpx
import asyncio
from dataclasses import dataclass
from typing import Optional
import json

@dataclass
class APNsPayload:
    alert: Optional[str] = None
    title: Optional[str] = None
    body: Optional[str] = None
    badge: Optional[int] = None
    sound: str = "default"
    category: Optional[str] = None
    thread_id: Optional[str] = None
    custom_data: Optional[dict] = None

    def to_dict(self) -> dict:
        aps = {}

        if self.alert:
            aps['alert'] = self.alert
        elif self.title or self.body:
            aps['alert'] = {}
            if self.title:
                aps['alert']['title'] = self.title
            if self.body:
                aps['alert']['body'] = self.body

        if self.badge is not None:
            aps['badge'] = self.badge
        if self.sound:
            aps['sound'] = self.sound
        if self.category:
            aps['category'] = self.category
        if self.thread_id:
            aps['thread-id'] = self.thread_id

        payload = {'aps': aps}

        if self.custom_data:
            payload.update(self.custom_data)

        return payload


class APNsClient:
    SANDBOX_URL = "https://api.sandbox.push.apple.com"
    PRODUCTION_URL = "https://api.push.apple.com"

    def __init__(self, token_manager: APNsTokenManager, sandbox: bool = False):
        self.token_manager = token_manager
        self.base_url = self.SANDBOX_URL if sandbox else self.PRODUCTION_URL
        self.client = httpx.AsyncClient(http2=True)

    async def send(
        self,
        device_token: str,
        payload: APNsPayload,
        topic: str,  # Bundle ID
        priority: int = 10,
        expiration: int = 0,
        collapse_id: Optional[str] = None,
        push_type: str = "alert"
    ) -> dict:
        url = f"{self.base_url}/3/device/{device_token}"

        headers = {
            'authorization': f'bearer {self.token_manager.get_token()}',
            'apns-topic': topic,
            'apns-push-type': push_type,
            'apns-priority': str(priority),
            'apns-expiration': str(expiration),
        }

        if collapse_id:
            headers['apns-collapse-id'] = collapse_id

        body = json.dumps(payload.to_dict())

        response = await self.client.post(url, headers=headers, content=body)

        if response.status_code == 200:
            return {
                'success': True,
                'apns_id': response.headers.get('apns-id')
            }
        else:
            error_body = response.json() if response.content else {}
            return {
                'success': False,
                'status_code': response.status_code,
                'reason': error_body.get('reason'),
                'timestamp': error_body.get('timestamp')
            }

    async def send_batch(
        self,
        notifications: list[tuple[str, APNsPayload]],
        topic: str
    ) -> list[dict]:
        """Send multiple notifications concurrently"""
        tasks = [
            self.send(token, payload, topic)
            for token, payload in notifications
        ]
        return await asyncio.gather(*tasks)

    async def close(self):
        await self.client.aclose()


# Usage example
async def main():
    token_manager = APNsTokenManager(
        key_id="ABC123DEFG",
        team_id="TEAM123456",
        private_key_path="./AuthKey_ABC123DEFG.p8"
    )

    client = APNsClient(token_manager, sandbox=True)

    payload = APNsPayload(
        title="New message",
        body="You have a new message",
        badge=1,
        sound="default",
        custom_data={'message_id': '12345'}
    )

    result = await client.send(
        device_token="a1b2c3d4e5f6...",
        payload=payload,
        topic="com.example.app"
    )

    print(result)
    await client.close()

asyncio.run(main())

2.4 APNs Error Handling

class APNsError:
    """APNs error code reference"""
    ERRORS = {
        400: {
            'BadCollapseId': 'Invalid collapse ID',
            'BadDeviceToken': 'Invalid device token',
            'BadExpirationDate': 'Invalid expiration date',
            'BadMessageId': 'Invalid message ID',
            'BadPriority': 'Invalid priority',
            'BadTopic': 'Invalid topic',
            'DeviceTokenNotForTopic': 'Token does not match the topic',
            'DuplicateHeaders': 'Duplicate HTTP headers',
            'IdleTimeout': 'Connection idle timeout',
            'InvalidPushType': 'Invalid push type',
            'MissingDeviceToken': 'Missing device token',
            'MissingTopic': 'Missing topic',
            'PayloadEmpty': 'Payload is empty',
            'TopicDisallowed': 'Topic is not allowed',
        },
        403: {
            'BadCertificate': 'Invalid certificate',
            'BadCertificateEnvironment': 'Certificate does not match the environment',
            'ExpiredProviderToken': 'Provider token has expired',
            'Forbidden': 'Access forbidden',
            'InvalidProviderToken': 'Invalid provider token',
            'MissingProviderToken': 'Missing provider token',
        },
        404: {
            'BadPath': 'Invalid path',
        },
        405: {
            'MethodNotAllowed': 'Method not allowed',
        },
        410: {
            'Unregistered': 'Device is unregistered',  # Important: the token must be removed
        },
        413: {
            'PayloadTooLarge': 'Payload too large (max 4 KB)',
        },
        429: {
            'TooManyProviderTokenUpdates': 'Provider token updated too frequently',
            'TooManyRequests': 'Too many requests',
        },
        500: {
            'InternalServerError': 'APNs internal error',
        },
        503: {
            'ServiceUnavailable': 'Service temporarily unavailable',
            'Shutdown': 'APNs is shutting down the connection',
        },
    }

    @classmethod
    def should_retry(cls, status_code: int, reason: str) -> bool:
        """Decide whether the request should be retried"""
        if status_code in (429, 500, 503):
            return True
        return False

    @classmethod
    def should_remove_token(cls, status_code: int, reason: str) -> bool:
        """Decide whether the token should be removed"""
        if status_code == 410:  # Unregistered
            return True
        if status_code == 400 and reason in ('BadDeviceToken', 'DeviceTokenNotForTopic'):
            return True
        return False
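
The classification above says which responses are worth retrying, but not how to space the retries. A minimal sketch of one common approach, exponential backoff with full jitter, follows. The names `send_with_retry`, `backoff_delay`, and `RETRYABLE_STATUS`, as well as the base delay, cap, and attempt count, are assumptions made for illustration and do not come from APNs itself.

```python
import asyncio
import random

# Same status codes that should_retry() treats as retryable (assumed to stay in sync)
RETRYABLE_STATUS = {429, 500, 503}


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))


async def send_with_retry(send_once, max_attempts: int = 4, sleep=asyncio.sleep) -> dict:
    """Call send_once() until it succeeds, fails permanently, or attempts run out.

    send_once is any coroutine function returning a dict shaped like the
    result of APNsClient.send(): {'success': bool, 'status_code': int, ...}.
    """
    result: dict = {}
    for attempt in range(max_attempts):
        result = await send_once()
        if result.get('success'):
            return result
        if result.get('status_code') not in RETRYABLE_STATUS:
            return result  # permanent failure, retrying will not help
        if attempt < max_attempts - 1:
            await sleep(backoff_delay(attempt))
    return result
```

With the client from section 2.3, a call could look like `await send_with_retry(lambda: client.send(token, payload, topic))`. The `sleep` parameter is injectable only so that the delay can be replaced in tests.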

Chapter 3: Firebase Cloud Messaging (FCM)

3.1 FCM Architecture

graph TB
    subgraph "Your Backend"
        SERVER[App Server]
        FCM_SDK[Firebase Admin SDK]
    end

    subgraph "Firebase"
        FCM_BE[FCM Backend]
        TOPIC[Topic Messaging]
        DEVICE[Device Groups]
    end

    subgraph "Client"
        ANDROID[Android App]
        IOS[iOS App]
        WEB[Web App]
    end

    SERVER --> FCM_SDK
    FCM_SDK --> FCM_BE
    FCM_BE --> TOPIC
    FCM_BE --> DEVICE
    FCM_BE --> ANDROID
    FCM_BE --> IOS
    FCM_BE --> WEB

3.2 FCM Message Types

graph TB
    subgraph "FCM Message Types"
        subgraph "Notification Message"
            NM[Notification shown by the system]
            NM_BG[App in background<br/>shown automatically]
            NM_FG[App in foreground<br/>delivered to the app]
        end

        subgraph "Data Message"
            DM[Data-only message]
            DM_HANDLE[Handled by the app]
            DM_CONTROL[Full control over display]
        end

        subgraph "Combined"
            BOTH[Notification + Data]
            BOTH_USE[Fits most scenarios]
        end
    end
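
To make the three message types concrete, here is a small sketch that builds request bodies in the shape used by the FCM HTTP v1 API, where `notification` and `data` are sibling fields of `message`. The helper names `build_fcm_message` and `message_kind` are hypothetical and exist only for this illustration.

```python
from typing import Optional


def build_fcm_message(
    token: str,
    title: Optional[str] = None,
    body: Optional[str] = None,
    data: Optional[dict] = None,
) -> dict:
    """Build an FCM HTTP v1 style request body for a single device token."""
    message: dict = {'token': token}
    if title is not None or body is not None:
        # Notification part: displayed by the system when the app is in the background
        notification = {}
        if title is not None:
            notification['title'] = title
        if body is not None:
            notification['body'] = body
        message['notification'] = notification
    if data:
        # Data part: delivered to the app; all values must be strings
        message['data'] = {k: str(v) for k, v in data.items()}
    return {'message': message}


def message_kind(payload: dict) -> str:
    """Classify a body as 'notification', 'data', 'combined', or 'empty'."""
    message = payload['message']
    has_notification = 'notification' in message
    has_data = 'data' in message
    if has_notification and has_data:
        return 'combined'
    if has_notification:
        return 'notification'
    if has_data:
        return 'data'
    return 'empty'
```

A data-only message built this way carries no `notification` key at all, which is what leaves display decisions entirely to the app.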

3.3 FCM Implementation

import firebase_admin
from firebase_admin import credentials, messaging
from typing import Optional, List
from dataclasses import dataclass
from datetime import timedelta  # used for AndroidConfig.ttl below
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Initialize the Firebase Admin SDK
cred = credentials.Certificate("./firebase-adminsdk.json")
firebase_admin.initialize_app(cred)

@dataclass
class FCMNotification:
    title: Optional[str] = None
    body: Optional[str] = None
    image: Optional[str] = None

@dataclass
class FCMAndroidConfig:
    priority: str = "high"  # "high" or "normal"
    ttl: int = 86400  # seconds
    collapse_key: Optional[str] = None
    channel_id: Optional[str] = None

@dataclass
class FCMAPNsConfig:
    badge: Optional[int] = None
    sound: str = "default"
    category: Optional[str] = None

class FCMClient:
    def __init__(self, max_workers: int = 10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def _build_message(
        self,
        token: str,
        notification: Optional[FCMNotification] = None,
        data: Optional[dict] = None,
        android_config: Optional[FCMAndroidConfig] = None,
        apns_config: Optional[FCMAPNsConfig] = None,
    ) -> messaging.Message:
        message_kwargs = {'token': token}

        # Notification
        if notification:
            message_kwargs['notification'] = messaging.Notification(
                title=notification.title,
                body=notification.body,
                image=notification.image,
            )

        # Data
        if data:
            # FCM data values must be strings
            message_kwargs['data'] = {k: str(v) for k, v in data.items()}

        # Android config
        if android_config:
            message_kwargs['android'] = messaging.AndroidConfig(
                priority=android_config.priority,
                ttl=timedelta(seconds=android_config.ttl),
                collapse_key=android_config.collapse_key,
                notification=messaging.AndroidNotification(
                    channel_id=android_config.channel_id,
                ) if android_config.channel_id else None,
            )

        # APNs config
        if apns_config:
            message_kwargs['apns'] = messaging.APNSConfig(
                payload=messaging.APNSPayload(
                    aps=messaging.Aps(
                        badge=apns_config.badge,
                        sound=apns_config.sound,
                        category=apns_config.category,
                    )
                )
            )

        return messaging.Message(**message_kwargs)

    def send(
        self,
        token: str,
        notification: Optional[FCMNotification] = None,
        data: Optional[dict] = None,
        android_config: Optional[FCMAndroidConfig] = None,
        apns_config: Optional[FCMAPNsConfig] = None,
    ) -> dict:
        """Send a single message"""
        message = self._build_message(
            token, notification, data, android_config, apns_config
        )

        try:
            message_id = messaging.send(message)
            return {'success': True, 'message_id': message_id}
        except messaging.UnregisteredError:
            return {'success': False, 'error': 'unregistered', 'remove_token': True}
        except messaging.SenderIdMismatchError:
            return {'success': False, 'error': 'sender_id_mismatch', 'remove_token': True}
        except messaging.QuotaExceededError:
            return {'success': False, 'error': 'quota_exceeded', 'retry': True}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def send_multicast(
        self,
        tokens: List[str],
        notification: Optional[FCMNotification] = None,
        data: Optional[dict] = None,
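    ) -> dict:
        """Send to multiple devices (up to 500 tokens per call)"""
        message = messaging.MulticastMessage(
            tokens=tokens,
            notification=messaging.Notification(
                title=notification.title,
                body=notification.body,
            ) if notification else None,
            data={k: str(v) for k, v in data.items()} if data else None,
        )

        # messaging.send_multicast() is deprecated in recent firebase-admin
        # releases; send_each_for_multicast() returns the same BatchResponse shape
        response = messaging.send_each_for_multicast(message)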

        results = []
        for idx, send_response in enumerate(response.responses):
            if send_response.success:
                results.append({
                    'token': tokens[idx],
                    'success': True,
                    'message_id': send_response.message_id
                })
            else:
                error = send_response.exception
                results.append({
                    'token': tokens[idx],
                    'success': False,
                    'error': str(error),
                    'remove_token': isinstance(error, (
                        messaging.UnregisteredError,
                        messaging.SenderIdMismatchError
                    ))
                })

        return {
            'success_count': response.success_count,
            'failure_count': response.failure_count,
            'results': results
        }

    def send_to_topic(
        self,
        topic: str,
        notification: Optional[FCMNotification] = None,
        data: Optional[dict] = None,
    ) -> dict:
        """Send to every device subscribed to a topic"""
        message = messaging.Message(
            topic=topic,
            notification=messaging.Notification(
                title=notification.title,
                body=notification.body,
            ) if notification else None,
            data={k: str(v) for k, v in data.items()} if data else None,
        )

        try:
            message_id = messaging.send(message)
            return {'success': True, 'message_id': message_id}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def subscribe_to_topic(self, tokens: List[str], topic: str) -> dict:
        """Subscribe tokens to a topic"""
        response = messaging.subscribe_to_topic(tokens, topic)
        return {
            'success_count': response.success_count,
            'failure_count': response.failure_count,
        }

    def unsubscribe_from_topic(self, tokens: List[str], topic: str) -> dict:
        """Unsubscribe tokens from a topic"""
        response = messaging.unsubscribe_from_topic(tokens, topic)
        return {
            'success_count': response.success_count,
            'failure_count': response.failure_count,
        }

    async def send_async(self, *args, **kwargs) -> dict:
        """Send without blocking the event loop by running send() in the thread pool"""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, lambda: self.send(*args, **kwargs))


# Usage example
client = FCMClient()

# Send a notification
result = client.send(
    token="eXaMpLe_ToKeN...",
    notification=FCMNotification(
        title="New message",
        body="You have a new message"
    ),
    data={
        'message_id': '12345',
        'sender': 'John'
    },
    android_config=FCMAndroidConfig(
        priority="high",
        channel_id="messages"
    )
)

# Send to multiple devices
multicast_result = client.send_multicast(
    tokens=["token1", "token2", "token3"],
    notification=FCMNotification(title="Announcement", body="Scheduled system maintenance")
)

# Send to a topic
topic_result = client.send_to_topic(
    topic="news",
    notification=FCMNotification(title="Breaking news", body="...")
)

Chapter 4: Large-Scale Push System Design

4.1 Overall Architecture

graph TB
    subgraph "API Layer"
        API[API Gateway]
        AUTH[Authentication]
    end

    subgraph "Business Layer"
        PUSH_SVC[Push Service]
        TEMPLATE[Template Engine]
        SEGMENT[User Segmentation]
        SCHEDULER[Scheduler]
    end

    subgraph "Message Queue"
        KAFKA[Kafka/RabbitMQ]
        PRIORITY[Priority Queue]
    end

    subgraph "Dispatch Layer"
        DISPATCHER[Dispatcher]
        APNS_WORKER[APNs Workers]
        FCM_WORKER[FCM Workers]
        HMS_WORKER[HMS Workers]
    end

    subgraph "Storage"
        REDIS[(Redis<br/>Token Cache)]
        DB[(PostgreSQL<br/>Token Store)]
        CLICKHOUSE[(ClickHouse<br/>Analytics)]
    end

    subgraph "Push Services"
        APNS[APNs]
        FCM[FCM]
        HMS[HMS]
    end

    API --> AUTH
    AUTH --> PUSH_SVC
    PUSH_SVC --> TEMPLATE
    PUSH_SVC --> SEGMENT
    PUSH_SVC --> SCHEDULER

    SCHEDULER --> KAFKA
    PUSH_SVC --> KAFKA

    KAFKA --> DISPATCHER
    DISPATCHER --> PRIORITY

    PRIORITY --> APNS_WORKER
    PRIORITY --> FCM_WORKER
    PRIORITY --> HMS_WORKER

    APNS_WORKER --> APNS
    FCM_WORKER --> FCM
    HMS_WORKER --> HMS

    APNS_WORKER --> REDIS
    FCM_WORKER --> REDIS

    REDIS --> DB
    APNS_WORKER --> CLICKHOUSE
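
The Dispatcher and Priority Queue boxes in the diagram can be pictured with a small in-memory sketch: messages are popped in priority order and then grouped by the platform worker that should handle them. The class `PriorityDispatcher` and its methods are hypothetical names for illustration. A production system would rely on the message broker rather than a local heap.

```python
import heapq
import itertools
from collections import defaultdict


class PriorityDispatcher:
    """Toy dispatcher: lower priority number means the message is sent sooner."""

    def __init__(self):
        self._heap = []
        # Monotonic counter as tie-breaker, so equal priorities stay FIFO
        self._seq = itertools.count()

    def submit(self, priority: int, platform: str, message_id: str) -> None:
        heapq.heappush(self._heap, (priority, next(self._seq), platform, message_id))

    def drain(self) -> dict:
        """Pop everything in priority order and group message ids by platform."""
        routed = defaultdict(list)
        while self._heap:
            _, _, platform, message_id = heapq.heappop(self._heap)
            routed[platform].append(message_id)
        return dict(routed)


dispatcher = PriorityDispatcher()
dispatcher.submit(2, 'ios', 'marketing-1')
dispatcher.submit(0, 'ios', 'chat-1')
dispatcher.submit(1, 'android', 'notice-1')
dispatcher.submit(0, 'android', 'chat-2')
routed = dispatcher.drain()
```

Within each platform list the chat messages come out ahead of the lower-priority ones, which is the behavior the priority queue is meant to guarantee for the workers.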

4.2 Token Management

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
from enum import Enum
import redis
import asyncpg

class Platform(Enum):
    IOS = "ios"
    ANDROID = "android"
    WEB = "web"

class TokenStatus(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    UNREGISTERED = "unregistered"

@dataclass
class DeviceToken:
    user_id: str
    token: str
    platform: Platform
    app_version: str
    device_model: str
    os_version: str
    status: TokenStatus = TokenStatus.ACTIVE
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    last_active_at: Optional[datetime] = None

class TokenRepository:
    def __init__(self, db_pool: asyncpg.Pool, redis_client: redis.Redis):
        self.db = db_pool
        self.redis = redis_client
        self.cache_ttl = 3600  # 1 hour

    async def save_token(self, token: DeviceToken) -> None:
        """Save or update a token"""
        query = """
            INSERT INTO device_tokens
            (user_id, token, platform, app_version, device_model,
             os_version, status, created_at, updated_at)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ON CONFLICT (token) DO UPDATE SET
                user_id = $1,
                platform = $3,
                app_version = $4,
                device_model = $5,
                os_version = $6,
                status = $7,
                updated_at = $9
        """
        now = datetime.utcnow()
        await self.db.execute(
            query,
            token.user_id, token.token, token.platform.value,
            token.app_version, token.device_model, token.os_version,
            token.status.value, now, now
        )

        # Invalidate the cached token list for this user
        cache_key = f"tokens:{token.user_id}"
        self.redis.delete(cache_key)

    async def get_user_tokens(
        self,
        user_id: str,
        platform: Optional[Platform] = None
    ) -> list[DeviceToken]:
        """Get all active tokens for a user"""
        # Check the cache first
        cache_key = f"tokens:{user_id}"
        cached = self.redis.get(cache_key)
        if cached:
            tokens = self._deserialize_tokens(cached)
            if platform:
                tokens = [t for t in tokens if t.platform == platform]
            return tokens

        # Query the database
        query = """
            SELECT * FROM device_tokens
            WHERE user_id = $1 AND status = 'active'
        """
        rows = await self.db.fetch(query, user_id)
        tokens = [self._row_to_token(row) for row in rows]

        # Write to the cache
        self.redis.setex(
            cache_key,
            self.cache_ttl,
            self._serialize_tokens(tokens)
        )

        if platform:
            tokens = [t for t in tokens if t.platform == platform]

        return tokens

    async def mark_unregistered(self, token: str) -> None:
        """Mark a token as unregistered"""
        query = """
            UPDATE device_tokens
            SET status = 'unregistered', updated_at = $2
            WHERE token = $1
            RETURNING user_id
        """
        # RETURNING gives back the owner in the same round trip
        row = await self.db.fetchrow(query, token, datetime.utcnow())

        # Invalidate the owner's cached token list
        if row:
            self.redis.delete(f"tokens:{row['user_id']}")

    async def cleanup_inactive_tokens(self, days: int = 90) -> int:
        """Mark tokens that have been inactive for a long time as inactive"""
        query = """
            UPDATE device_tokens
            SET status = 'inactive', updated_at = $2
            WHERE last_active_at < $1 AND status = 'active'
            RETURNING token
        """
        cutoff = datetime.utcnow() - timedelta(days=days)
        result = await self.db.fetch(query, cutoff, datetime.utcnow())
        return len(result)
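
The repository above calls `_serialize_tokens` and `_deserialize_tokens` without showing them. One possible shape for those helpers is sketched below as standalone functions, assuming the cache stores a JSON array and that enum and datetime fields are encoded as plain strings. The dataclass is repeated only so the sketch runs on its own; the encoding choices are assumptions, not the article's actual implementation.

```python
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
from typing import Optional, Union


class Platform(Enum):
    IOS = "ios"
    ANDROID = "android"
    WEB = "web"


class TokenStatus(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    UNREGISTERED = "unregistered"


@dataclass
class DeviceToken:
    user_id: str
    token: str
    platform: Platform
    app_version: str
    device_model: str
    os_version: str
    status: TokenStatus = TokenStatus.ACTIVE
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    last_active_at: Optional[datetime] = None


_DATETIME_FIELDS = ('created_at', 'updated_at', 'last_active_at')


def serialize_tokens(tokens: list) -> str:
    """Encode tokens as a JSON array; enums become values, datetimes ISO strings."""
    rows = []
    for t in tokens:
        row = asdict(t)
        row['platform'] = t.platform.value
        row['status'] = t.status.value
        for name in _DATETIME_FIELDS:
            value = getattr(t, name)
            row[name] = value.isoformat() if value else None
        rows.append(row)
    return json.dumps(rows)


def deserialize_tokens(raw: Union[str, bytes]) -> list:
    """Inverse of serialize_tokens; accepts the bytes that Redis returns."""
    if isinstance(raw, bytes):
        raw = raw.decode('utf-8')
    tokens = []
    for row in json.loads(raw):
        row['platform'] = Platform(row['platform'])
        row['status'] = TokenStatus(row['status'])
        for name in _DATETIME_FIELDS:
            if row[name]:
                row[name] = datetime.fromisoformat(row[name])
        tokens.append(DeviceToken(**row))
    return tokens
```

JSON keeps the cached value readable from other services and tools, at the cost of converting enum and datetime fields by hand.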

4.3 Message Queue Design

from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
from typing import Optional
import json
import asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

class MessagePriority(Enum):
    HIGH = 0      # Real-time messages (e.g. chat)
    NORMAL = 1    # General notifications
    LOW = 2       # Marketing messages

@dataclass
class PushMessage:
    message_id: str
    user_id: str
    title: str
    body: str
    data: dict
    priority: MessagePriority
    created_at: datetime
    expire_at: datetime
    platform: Optional[Platform] = None
    retry_count: int = 0

class MessageQueue:
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers
        self.producer = None
        self.consumers = {}

    async def connect(self):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
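            # asdict() leaves Enum and datetime values as objects, so tell
            # json.dumps how to encode them
            value_serializer=lambda v: json.dumps(
                v, default=lambda o: o.value if isinstance(o, Enum) else o.isoformat()
            ).encode('utf-8'),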
            key_serializer=lambda k: k.encode('utf-8') if k else None,
        )
        await self.producer.start()

    async def publish(self, message: PushMessage) -> None:
        """Publish a push message"""
        topic = self._get_topic_by_priority(message.priority)

        await self.producer.send_and_wait(
            topic=topic,
            key=message.user_id,
            value=asdict(message),
            headers=[
                ('priority', str(message.priority.value).encode()),
                ('created_at', message.created_at.isoformat().encode()),
            ]
        )

    async def publish_batch(self, messages: list[PushMessage]) -> None:
        """Publish a batch of messages"""
        for message in messages:
            topic = self._get_topic_by_priority(message.priority)
            # send() only enqueues the record; the producer batches
            # records per partition internally
            await self.producer.send(
                topic=topic,
                key=message.user_id,
                value=asdict(message)
            )

        # Wait until every buffered record has been delivered
        await self.producer.flush()

    def _get_topic_by_priority(self, priority: MessagePriority) -> str:
        return f"push_messages_{priority.name.lower()}"

    async def consume(
        self,
        priority: MessagePriority,
        handler: callable,
        group_id: str
    ):
        """Consume messages"""
        topic = self._get_topic_by_priority(priority)

        consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda v: json.loads(v.decode('utf-8')),
            auto_offset_reset='earliest',
            enable_auto_commit=False,
        )

        await consumer.start()
        self.consumers[priority] = consumer

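        try:
            async for msg in consumer:
                data = msg.value
                try:
                    # JSON carries primitives only, so rebuild the typed fields
                    message = PushMessage(**{
                        **data,
                        'priority': MessagePriority(data['priority']),
                        'created_at': datetime.fromisoformat(data['created_at']),
                        'expire_at': datetime.fromisoformat(data['expire_at']),
                        'platform': Platform(data['platform']) if data.get('platform') else None,
                    })
                except (KeyError, TypeError, ValueError) as e:
                    # Malformed record: skip it so it cannot block the partition
                    print(f"Dropping malformed message: {e}")
                    await consumer.commit()
                    continue

                try:
                    await handler(message)
                except Exception as e:
                    # Retry the message or route it to the dead-letter queue
                    await self._handle_error(message, e)
                # Commit either way: a failed message has been re-published
                await consumer.commit()
        finally:
            await consumer.stop()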

    async def _handle_error(self, message: PushMessage, error: Exception):
        """Retry a failed message, or move it to the dead-letter queue"""
        if message.retry_count < 3:
            message.retry_count += 1
            await self.publish(message)
        else:
            # Send to the dead-letter queue
            await self.producer.send(
                'push_messages_dlq',
                key=message.user_id,
                value=asdict(message)
            )
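
`PushMessage` carries an `expire_at` field, yet the queue code above never consults it. The sketch below shows one way a consumer could decide what to do with a failed or stale message: drop it once expired, re-queue it while retries remain, otherwise send it to the dead-letter topic. The function name `next_step` and the `"drop"` sentinel are assumptions. The topic names follow `_get_topic_by_priority` and the retry limit follows `_handle_error`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MAX_RETRIES = 3  # mirrors the `retry_count < 3` check in _handle_error
DLQ_TOPIC = "push_messages_dlq"


def next_step(
    priority_name: str,
    retry_count: int,
    expire_at: datetime,
    now: Optional[datetime] = None,
) -> str:
    """Return "drop", a retry topic name, or the dead-letter topic name.

    Both datetimes are assumed to be timezone-aware.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    if expire_at <= now:
        return "drop"  # delivering a stale notification helps nobody
    if retry_count < MAX_RETRIES:
        return f"push_messages_{priority_name.lower()}"
    return DLQ_TOPIC


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
fresh = now + timedelta(minutes=5)
retry_topic = next_step("HIGH", 0, fresh, now)
dead_letter = next_step("LOW", 3, fresh, now)
stale = next_step("NORMAL", 0, now - timedelta(seconds=1), now)
```

Checking expiry before the retry count keeps a message that has already expired from consuming further retry attempts.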

4.4 Push Scheduler

import asyncio
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from typing import Dict, List
import aioredis  # newer redis-py releases ship the same client as redis.asyncio

@dataclass
class ScheduledPush:
    push_id: str
    message: PushMessage
    scheduled_time: datetime
    target_users: List[str]
    status: str = "pending"

class PushScheduler:
    def __init__(
        self,
        redis: aioredis.Redis,
        message_queue: MessageQueue,
        token_repo: TokenRepository
    ):
        self.redis = redis
        self.mq = message_queue
        self.token_repo = token_repo
        self.running = False

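    async def schedule(self, push: ScheduledPush) -> str:
        """Schedule a push"""
        # Store the schedule entry, scored by its due time
        await self.redis.zadd(
            'scheduled_pushes',
            {push.push_id: push.scheduled_time.timestamp()}
        )

        # Store the task details. asdict() leaves Enum and datetime fields
        # as objects, so they need explicit JSON encoding.
        message_json = json.dumps(
            asdict(push.message),
            default=lambda o: o.isoformat() if isinstance(o, datetime) else o.value
        )
        await self.redis.hset(
            f'push_details:{push.push_id}',
            mapping={
                'message': message_json,
                'target_users': json.dumps(push.target_users),
                'status': push.status,
            }
        )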

        return push.push_id

    async def cancel(self, push_id: str) -> bool:
        """Cancel a scheduled push"""
        removed = await self.redis.zrem('scheduled_pushes', push_id)
        if removed:
            await self.redis.hset(f'push_details:{push_id}', 'status', 'cancelled')
        return removed > 0

    async def start(self):
        """Start the scheduler"""
        self.running = True
        while self.running:
            try:
                await self._process_due_pushes()
            except Exception as e:
                print(f"Scheduler error: {e}")

            await asyncio.sleep(1)  # check every second

    async def stop(self):
        self.running = False

    async def _process_due_pushes(self):
        """Process pushes that are due"""
        # A naive utcnow().timestamp() is read as local time and skews the
        # epoch value on non-UTC hosts; now().timestamp() is a true epoch
        now = datetime.now().timestamp()

        # Fetch every push that is due
        due_pushes = await self.redis.zrangebyscore(
            'scheduled_pushes',
            min=0,
            max=now,
            withscores=True
        )

        for push_id, score in due_pushes:
            push_id = push_id.decode() if isinstance(push_id, bytes) else push_id

            # Claim the push: ZREM returns 1 for only one caller
            removed = await self.redis.zrem('scheduled_pushes', push_id)
            if not removed:
                continue  # already claimed by another worker

            # Load the task details
            details = await self.redis.hgetall(f'push_details:{push_id}')
            if not details:
                continue

            # Send the push
            await self._execute_push(push_id, details)

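    async def _execute_push(self, push_id: str, details: dict):
        """Execute a push"""
        message_data = json.loads(details[b'message'].decode())
        target_users = json.loads(details[b'target_users'].decode())

        # JSON carries primitives only, so rebuild the typed fields
        message_data['priority'] = MessagePriority(message_data['priority'])
        message_data['created_at'] = datetime.fromisoformat(message_data['created_at'])
        message_data['expire_at'] = datetime.fromisoformat(message_data['expire_at'])
        if message_data.get('platform'):
            message_data['platform'] = Platform(message_data['platform'])

        # Update status
        await self.redis.hset(f'push_details:{push_id}', 'status', 'sending')

        # Publish one message per user. The worker resolves each user's
        # tokens, so fanning out per token here would deliver duplicates.
        for user_id in target_users:
            # Override the stored ids instead of passing them twice,
            # which would raise a duplicate keyword argument error
            message = PushMessage(**{
                **message_data,
                'message_id': f"{push_id}_{user_id}",
                'user_id': user_id,
            })
            await self.mq.publish(message)

        # Update status
        await self.redis.hset(f'push_details:{push_id}', 'status', 'completed')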
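
The scheduler stores each due time as a sorted-set score, so the score must be a true Unix epoch value. A naive `datetime` is interpreted as local time by `.timestamp()`, which shifts the score on hosts that do not run in UTC. The sketch below shows one way to guard against that, under the assumption that naive datetimes in this service always mean UTC. `to_score` and `due_ids` are hypothetical helpers, and the dict stands in for the Redis sorted set.

```python
from datetime import datetime, timezone


def to_score(dt: datetime) -> float:
    """Convert a datetime to an epoch score, treating naive values as UTC."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.timestamp()


def due_ids(schedule: dict, now_score: float) -> list:
    """Return the ids whose score is at or before now_score, earliest first."""
    due = [push_id for push_id, score in schedule.items() if score <= now_score]
    return sorted(due, key=schedule.get)


# In-memory stand-in for the 'scheduled_pushes' sorted set
schedule = {
    'promo': to_score(datetime(2024, 1, 1, 9, 0)),
    'digest': to_score(datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc)),
    'later': to_score(datetime(2024, 1, 2, 0, 0)),
}
ready = due_ids(schedule, to_score(datetime(2024, 1, 1, 9, 0)))
```

Storing timezone-aware datetimes everywhere removes the need for the assumption, but the explicit conversion makes the intent visible at the point where the score is computed.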

4.5 Push Worker

import asyncio
from collections import defaultdict
from dataclasses import dataclass
import time

@dataclass
class PushStats:
    total: int = 0
    success: int = 0
    failed: int = 0
    invalid_tokens: int = 0

class PushWorker:
    def __init__(
        self,
        apns_client: APNsClient,
        fcm_client: FCMClient,
        token_repo: TokenRepository,
        message_queue: MessageQueue,
        batch_size: int = 100,
        batch_timeout: float = 0.1
    ):
        self.apns = apns_client
        self.fcm = fcm_client
        self.token_repo = token_repo
        self.mq = message_queue
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout

        self.stats = PushStats()
        self.pending_ios: list = []
        self.pending_android: list = []
        self.last_flush_time = time.time()

    async def process_message(self, message: PushMessage):
        """Process a single push message"""
        # Look up the user's tokens
        tokens = await self.token_repo.get_user_tokens(
            message.user_id,
            message.platform
        )

        if not tokens:
            return

        for token in tokens:
            if token.platform == Platform.IOS:
                self.pending_ios.append((token, message))
            elif token.platform == Platform.ANDROID:
                self.pending_android.append((token, message))

        # Flush if a batch is ready
        await self._maybe_flush()

    async def _maybe_flush(self):
        """Flush when a batch is full or the batch timeout has elapsed"""
        now = time.time()
        should_flush = (
            len(self.pending_ios) >= self.batch_size or
            len(self.pending_android) >= self.batch_size or
            now - self.last_flush_time > self.batch_timeout
        )

        if should_flush:
            await self._flush()

    async def _flush(self):
        """Send the pending batches"""
        ios_batch = self.pending_ios[:self.batch_size]
        android_batch = self.pending_android[:self.batch_size]

        self.pending_ios = self.pending_ios[self.batch_size:]
        self.pending_android = self.pending_android[self.batch_size:]

        # Send both platforms concurrently
        await asyncio.gather(
            self._send_ios_batch(ios_batch),
            self._send_android_batch(android_batch),
        )

        self.last_flush_time = time.time()

    async def _send_ios_batch(self, batch: list):
        """Send a batch of iOS pushes"""
        if not batch:
            return

        notifications = []
        for token, message in batch:
            payload = APNsPayload(
                title=message.title,
                body=message.body,
                custom_data=message.data
            )
            notifications.append((token.token, payload))

        results = await self.apns.send_batch(
            notifications,
            topic="com.example.app"
        )

        # 處理結果
        for (token, _), result in zip(batch, results):
            self.stats.total += 1
            if result['success']:
                self.stats.success += 1
            else:
                self.stats.failed += 1
                if APNsError.should_remove_token(
                    result.get('status_code', 0),
                    result.get('reason', '')
                ):
                    self.stats.invalid_tokens += 1
                    await self.token_repo.mark_unregistered(token.token)

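    async def _send_android_batch(self, batch: list):
        """Send Android pushes in bulk via FCM multicast."""
        if not batch:
            return

        # A multicast call carries a single payload, so group tokens by
        # message content first. Messages for different users must not be
        # merged into one call just because they share a batch.
        groups: dict = {}
        for token, message in batch:
            key = (
                message.title,
                message.body,
                repr(sorted((message.data or {}).items())),
            )
            groups.setdefault(key, (message, []))[1].append(token.token)

        chunk_size = 500  # FCM accepts at most 500 tokens per multicast
        for message, tokens in groups.values():
            for i in range(0, len(tokens), chunk_size):
                chunk = tokens[i:i + chunk_size]

                result = self.fcm.send_multicast(
                    tokens=chunk,
                    notification=FCMNotification(
                        title=message.title,
                        body=message.body
                    ),
                    data=message.data
                )

                self.stats.total += len(chunk)
                self.stats.success += result['success_count']
                self.stats.failed += result['failure_count']

                # Mark tokens that FCM reported as invalid
                for r in result['results']:
                    if r.get('remove_token'):
                        self.stats.invalid_tokens += 1
                        await self.token_repo.mark_unregistered(r['token'])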

    async def run(self, priority: MessagePriority):
        """啟動 worker"""
        await self.mq.consume(
            priority=priority,
            handler=self.process_message,
            group_id=f"push_worker_{priority.name}"
        )
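
The worker above only evaluates its flush condition when a new message arrives, so a partly filled batch can sit unsent during a quiet period. The following is a minimal sketch of one way to add a time-based flush. `TimedBatcher`, `flush_fn`, and `run_timer` are hypothetical names introduced for illustration and are not part of the original worker.

```python
import asyncio
from typing import Awaitable, Callable


class TimedBatcher:
    """Hypothetical helper: flush on size, or after a timeout, whichever comes first."""

    def __init__(
        self,
        flush_fn: Callable[[list], Awaitable[None]],
        batch_size: int = 100,
        batch_timeout: float = 0.5,
    ):
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.pending: list = []
        self._lock = asyncio.Lock()

    async def add(self, item) -> None:
        """Queue one item and flush immediately if the batch is full."""
        async with self._lock:
            self.pending.append(item)
            if len(self.pending) >= self.batch_size:
                await self._flush_locked()

    async def _flush_locked(self) -> None:
        # The caller must already hold self._lock.
        if not self.pending:
            return
        batch, self.pending = self.pending, []
        await self.flush_fn(batch)

    async def run_timer(self) -> None:
        """Background task: flush leftovers every batch_timeout seconds."""
        while True:
            await asyncio.sleep(self.batch_timeout)
            async with self._lock:
                await self._flush_locked()
```

In this sketch `run_timer` would be started once with `asyncio.create_task` next to the consumer loop and cancelled on shutdown. The lock keeps the timer and the size-triggered flush from sending the same items twice.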

Chapter 5: Delivery Rate Optimization

5.1 Delivery Rate Monitoring

graph TB
    subgraph "Push Funnel"
        TOTAL[Total pushes<br/>100%]
        SENT[Sent successfully<br/>98%]
        DELIVERED[Delivered to device<br/>95%]
        DISPLAYED[Notification displayed<br/>90%]
        CLICKED[Notification clicked<br/>5%]
    end

    TOTAL --> SENT
    SENT --> DELIVERED
    DELIVERED --> DISPLAYED
    DISPLAYED --> CLICKED
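
The percentages in the funnel are all relative to the total number of pushes. When diagnosing where messages are lost, the step-to-step conversion is often more useful. A small sketch that computes both from raw counts follows. The function name `funnel_rates` and the stage keys are assumptions made for illustration.

```python
def funnel_rates(
    counts: dict[str, int], stages: list[str]
) -> list[tuple[str, float, float]]:
    """Return (stage, % of first stage, % of previous stage) for each stage."""
    rows = []
    total = counts[stages[0]]
    prev = total
    for stage in stages:
        n = counts[stage]
        # Guard against empty funnels so we never divide by zero
        overall = 100.0 * n / total if total else 0.0
        step = 100.0 * n / prev if prev else 0.0
        rows.append((stage, round(overall, 2), round(step, 2)))
        prev = n
    return rows


# Illustrative counts matching the funnel above (per 1000 pushes)
example_counts = {
    "total": 1000,
    "sent": 980,
    "delivered": 950,
    "displayed": 900,
    "clicked": 50,
}
example_stages = ["total", "sent", "delivered", "displayed", "clicked"]

for row in funnel_rates(example_counts, example_stages):
    print(row)
```

With these illustrative counts, the click stage is 5% of all pushes but about 5.56% of displayed notifications, which is the figure to compare across campaigns.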

5.2 Optimization Strategies

class DeliveryOptimizer:
    """送達率優化器"""

    def __init__(self, analytics_client):
        self.analytics = analytics_client

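    async def get_optimal_send_time(self, user_id: str) -> datetime:
        """Return the user's next best send time."""
        # Historical activity, assumed to map hour of day -> activity score
        activity = await self.analytics.get_user_activity_pattern(user_id)

        # No history yet: fall back to sending now (an assumed default)
        if not activity:
            return datetime.now()

        # Pick the most active hour
        peak_hour = max(activity.items(), key=lambda x: x[1])[0]

        # Return the next occurrence of that hour
        return self._next_occurrence(peak_hour)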

    async def should_send_now(self, user_id: str, message: PushMessage) -> bool:
        """判斷是否應該立即發送"""
        # 高優先級訊息立即發送
        if message.priority == MessagePriority.HIGH:
            return True

        # 檢查使用者勿擾設定
        dnd = await self.get_dnd_settings(user_id)
        if dnd.is_active():
            return False

        # Enforce the frequency cap: at most 10 pushes per hour
        recent_count = await self.get_recent_push_count(user_id)
        if recent_count >= 10:
            return False

        return True

    async def prioritize_channels(
        self,
        user_id: str
    ) -> list[Platform]:
        """根據使用者偏好排序推送渠道"""
        # 獲取各渠道的點擊率
        channel_ctr = await self.analytics.get_channel_ctr(user_id)

        # 按點擊率排序
        sorted_channels = sorted(
            channel_ctr.items(),
            key=lambda x: x[1],
            reverse=True
        )

        return [Platform(ch) for ch, _ in sorted_channels]

class RetryStrategy:
    """重試策略"""

    MAX_RETRIES = 3
    RETRY_DELAYS = [1, 5, 30]  # 秒

    async def should_retry(
        self,
        message: PushMessage,
        error: Exception
    ) -> tuple[bool, int]:
        """判斷是否應該重試"""
        # 不可重試的錯誤
        if isinstance(error, (InvalidTokenError, UnregisteredError)):
            return False, 0

        # 超過最大重試次數
        if message.retry_count >= self.MAX_RETRIES:
            return False, 0

        # RETRY_DELAYS already encodes the backoff schedule (1s, 5s, 30s),
        # so use it directly instead of scaling it a second time
        delay = self.RETRY_DELAYS[message.retry_count]

        return True, delay
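
The retry delays above are deterministic, so messages that fail together (for example during a brief APNs or FCM outage) also retry together. A common mitigation is to add random jitter. The sketch below shows capped exponential backoff with "full jitter". `backoff_delay` and its parameters are illustrative assumptions, not part of the original `RetryStrategy`.

```python
import random
from typing import Optional


def backoff_delay(
    retry_count: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Capped exponential backoff with full jitter.

    The upper bound doubles with each retry (base * 2**retry_count) up to
    `cap`; the actual delay is drawn uniformly from [0, upper bound].
    """
    rng = rng or random
    upper = min(cap, base * (2 ** retry_count))
    return rng.uniform(0.0, upper)


# Example: bounds for the first few retries are 1s, 2s, 4s, 8s
delays = [backoff_delay(n, rng=random.Random(n)) for n in range(4)]
```

Passing a seeded `random.Random` makes the delays reproducible in tests. In production the default module-level generator is usually enough.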

Chapter 6: Web Push

6.1 How Web Push Works

sequenceDiagram
    participant Browser as Browser
    participant SW as Service Worker
    participant Server as App Server
    participant Push as Push Service<br/>(Browser Vendor)

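    Browser->>Browser: Request notification permission
    Browser->>Push: Subscribe to push
    Push-->>Browser: Push Subscription
    Browser->>Server: Send subscription info

    Note over Server, Push: Sending a push

    Server->>Push: Encrypted message (VAPID)
    Push->>SW: Deliver message
    SW->>Browser: Show notification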

6.2 Web Push Implementation

from pywebpush import webpush, WebPushException
import json
from dataclasses import dataclass

@dataclass
class PushSubscription:
    endpoint: str
    p256dh: str
    auth: str
    user_id: str

class WebPushClient:
    def __init__(
        self,
        vapid_private_key: str,
        vapid_claims: dict
    ):
        self.vapid_private_key = vapid_private_key
        self.vapid_claims = vapid_claims

    def send(
        self,
        subscription: PushSubscription,
        title: str,
        body: str,
        data: dict = None,
        icon: str = None,
        url: str = None,
        ttl: int = 86400
    ) -> dict:
        """發送 Web Push"""
        payload = {
            'title': title,
            'body': body,
            'icon': icon,
            'data': data or {},
            'url': url,
        }

        subscription_info = {
            'endpoint': subscription.endpoint,
            'keys': {
                'p256dh': subscription.p256dh,
                'auth': subscription.auth,
            }
        }

        try:
            response = webpush(
                subscription_info=subscription_info,
                data=json.dumps(payload),
                vapid_private_key=self.vapid_private_key,
                vapid_claims=self.vapid_claims,
                ttl=ttl
            )
            return {'success': True, 'status_code': response.status_code}

        except WebPushException as e:
            # e.response is None when no HTTP response was received
            status = e.response.status_code if e.response is not None else None
            if status in (404, 410):  # Subscription expired or no longer valid
                return {
                    'success': False,
                    'error': 'subscription_expired',
                    'remove': True
                }
            elif status == 429:  # Too Many Requests
                return {
                    'success': False,
                    'error': 'rate_limited',
                    'retry': True
                }
            else:
                return {
                    'success': False,
                    'error': str(e),
                    'status_code': status
                }


# Service Worker (前端)
"""
// service-worker.js
self.addEventListener('push', function(event) {
    const data = event.data.json();

    const options = {
        body: data.body,
        icon: data.icon,
        badge: '/badge.png',
        data: {
            url: data.url,
            ...data.data
        },
        actions: [
            { action: 'open', title: '查看' },
            { action: 'close', title: '關閉' }
        ]
    };

    event.waitUntil(
        self.registration.showNotification(data.title, options)
    );
});

self.addEventListener('notificationclick', function(event) {
    event.notification.close();

    if (event.action === 'open' || !event.action) {
        const url = event.notification.data.url || '/';
        event.waitUntil(
            clients.openWindow(url)
        );
    }
});
"""

Chapter 7: Monitoring and Analytics

7.1 Monitoring Metrics

from prometheus_client import Counter, Histogram, Gauge
import time

# Prometheus 指標
push_total = Counter(
    'push_notifications_total',
    'Total push notifications',
    ['platform', 'priority', 'status']
)

push_latency = Histogram(
    'push_notification_latency_seconds',
    'Push notification latency',
    ['platform'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0]
)

pending_messages = Gauge(
    'push_pending_messages',
    'Pending messages in queue',
    ['priority']
)

invalid_tokens = Counter(
    'push_invalid_tokens_total',
    'Invalid tokens detected',
    ['platform', 'reason']
)

class PushMetrics:
    @staticmethod
    def record_push(platform: str, priority: str, success: bool, latency: float):
        status = 'success' if success else 'failed'
        push_total.labels(platform=platform, priority=priority, status=status).inc()
        push_latency.labels(platform=platform).observe(latency)

    @staticmethod
    def record_invalid_token(platform: str, reason: str):
        invalid_tokens.labels(platform=platform, reason=reason).inc()

    @staticmethod
    def update_pending(priority: str, count: int):
        pending_messages.labels(priority=priority).set(count)

7.2 Analytics Dashboard

-- ClickHouse analytics queries

-- Hourly push statistics
SELECT
    toStartOfHour(created_at) as hour,
    platform,
    countIf(status = 'sent') as sent,
    countIf(status = 'delivered') as delivered,
    countIf(status = 'clicked') as clicked,
    round(delivered / sent * 100, 2) as delivery_rate,
    round(clicked / delivered * 100, 2) as click_rate
FROM push_events
WHERE created_at >= now() - INTERVAL 24 HOUR
GROUP BY hour, platform
ORDER BY hour DESC;

-- Push latency distribution
SELECT
    platform,
    quantile(0.50)(latency_ms) as p50,
    quantile(0.90)(latency_ms) as p90,
    quantile(0.99)(latency_ms) as p99,
    max(latency_ms) as max_latency
FROM push_events
WHERE created_at >= now() - INTERVAL 1 HOUR
GROUP BY platform;

-- Failure reason breakdown
SELECT
    platform,
    error_code,
    error_reason,
    count() as count,
    round(count() / sum(count()) OVER () * 100, 2) as percentage
FROM push_events
WHERE status = 'failed'
    AND created_at >= now() - INTERVAL 24 HOUR
GROUP BY platform, error_code, error_reason
ORDER BY count DESC
LIMIT 20;

-- Per-user reach analysis
SELECT
    user_id,
    countIf(status = 'sent') as total_sent,
    countIf(status = 'delivered') as total_delivered,
    countIf(status = 'clicked') as total_clicked,
    round(total_clicked / total_delivered * 100, 2) as engagement_rate
FROM push_events
WHERE created_at >= now() - INTERVAL 7 DAY
GROUP BY user_id
HAVING total_sent > 10
ORDER BY engagement_rate DESC
LIMIT 100;

Chapter 8: Best Practices

8.1 Designing Push Content

# 好的推送通知
good_examples:
  - title: "您的訂單已發貨"
    body: "訂單 #12345 已由順豐快遞攬收,預計明天送達"
    action: "查看物流"

  - title: "新訊息來自 John"
    body: "John: 嗨,週末有空嗎?"
    action: "回覆"

  - title: "限時優惠"
    body: "您關注的商品降價 30%,僅剩 2 小時"
    action: "立即查看"

# 不好的推送通知
bad_examples:
  - title: "重要通知"
    body: "請查看 App 了解詳情"  # 內容不明確

  - title: "🔥🔥🔥 超級優惠 🔥🔥🔥"
    body: "快來看看!!!"  # 過度使用符號

  - title: "Application Update"
    body: "New version available"  # 非用戶語言

8.2 Frequency Control

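import redis.asyncio as aioredis  # async client: the methods below await its calls


class FrequencyController:
    """Push frequency control."""

    def __init__(self, redis: aioredis.Redis):
        self.redis = redis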

    async def check_and_increment(
        self,
        user_id: str,
        push_type: str
    ) -> bool:
        """檢查並增加計數"""
        limits = {
            'marketing': {'hourly': 1, 'daily': 3, 'weekly': 10},
            'transactional': {'hourly': 10, 'daily': 50},
            'social': {'hourly': 20, 'daily': 100},
        }

        type_limits = limits.get(push_type, limits['transactional'])

        # 檢查各時間窗口
        for window, limit in type_limits.items():
            key = f"push_freq:{user_id}:{push_type}:{window}"
            count = await self.redis.get(key)

            if count and int(count) >= limit:
                return False

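        # Increment the counters. Set the TTL only when a key is first
        # created; refreshing it on every push would keep extending the
        # window so the counter never resets for an active user.
        for window in type_limits:
            key = f"push_freq:{user_id}:{push_type}:{window}"
            if await self.redis.incr(key) == 1:
                await self.redis.expire(key, self._get_ttl(window))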

        return True

    def _get_ttl(self, window: str) -> int:
        return {
            'hourly': 3600,
            'daily': 86400,
            'weekly': 604800,
        }[window]
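
The Redis-backed controller above checks every window first and only then increments, so a push rejected by one window does not consume quota in the others. The same logic can be exercised without Redis. The sketch below is an in-memory, fixed-window variant with an injectable clock, intended for unit tests. `InMemoryFrequencyLimiter` is a hypothetical name, and old buckets are never evicted, so it is not meant for production use.

```python
import time
from typing import Callable

WINDOW_SECONDS = {"hourly": 3600, "daily": 86400, "weekly": 604800}


class InMemoryFrequencyLimiter:
    """Fixed-window limiter: every window must have quota left before any is charged."""

    def __init__(self, limits: dict[str, int], clock: Callable[[], float] = time.time):
        self.limits = limits  # e.g. {"hourly": 1, "daily": 3}
        self.clock = clock
        self.counts: dict[tuple[str, str, int], int] = {}

    def _keys(self, user_id: str) -> list[tuple[str, str, int]]:
        now = self.clock()
        # Bucket index = which fixed window the current time falls into
        return [
            (user_id, window, int(now // WINDOW_SECONDS[window]))
            for window in self.limits
        ]

    def check_and_increment(self, user_id: str) -> bool:
        keys = self._keys(user_id)
        # Phase 1: reject if any window is already at its limit
        for key in keys:
            if self.counts.get(key, 0) >= self.limits[key[1]]:
                return False
        # Phase 2: charge all windows together
        for key in keys:
            self.counts[key] = self.counts.get(key, 0) + 1
        return True
```

Because this version is synchronous and single-threaded, the two phases cannot interleave. With Redis, the equivalent guarantee would typically need a Lua script or a transaction.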

8.3 A/B Testing

class PushABTest:
    """推送 A/B 測試"""

    def __init__(self, db, analytics):
        self.db = db
        self.analytics = analytics

    async def create_test(
        self,
        name: str,
        variants: list[dict],
        traffic_split: list[float],
        target_audience: str
    ) -> str:
        """創建 A/B 測試"""
        test_id = generate_uuid()

        await self.db.insert('ab_tests', {
            'id': test_id,
            'name': name,
            'variants': json.dumps(variants),
            'traffic_split': traffic_split,
            'target_audience': target_audience,
            'status': 'running',
            'created_at': datetime.utcnow(),
        })

        return test_id

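    async def get_variant(self, test_id: str, user_id: str) -> dict:
        """Assign a variant to a user."""
        test = await self.db.get('ab_tests', test_id)

        # create_test stores variants as a JSON string, so decode them if the
        # DB layer returns the column unparsed
        variants = test['variants']
        if isinstance(variants, str):
            variants = json.loads(variants)

        # Hash test_id and user_id so the assignment is stable per user
        hash_value = int(hashlib.md5(
            f"{test_id}:{user_id}".encode()
        ).hexdigest(), 16) % 100

        cumulative = 0
        for i, split in enumerate(test['traffic_split']):
            cumulative += split * 100
            if hash_value < cumulative:
                return variants[i]

        return variants[-1]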

    async def analyze_results(self, test_id: str) -> dict:
        """分析測試結果"""
        query = """
            SELECT
                variant,
                count() as total,
                countIf(clicked = 1) as clicks,
                round(clicks / total * 100, 2) as ctr
            FROM push_events
            WHERE ab_test_id = %(test_id)s
            GROUP BY variant
        """

        results = await self.analytics.query(query, {'test_id': test_id})

        # 計算統計顯著性
        return self._calculate_significance(results)
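
The significance calculation is left undefined in the class above. One common choice for comparing click-through rates between two variants is a two-proportion z-test. The following is a minimal sketch using only the standard library. The function name and signature are assumptions, and the original `_calculate_significance` may well use a different method.

```python
import math


def two_proportion_z_test(
    clicks_a: int, total_a: int, clicks_b: int, total_b: int
) -> tuple[float, float]:
    """Return (z, two-sided p-value) for the difference in CTR between B and A.

    Uses the pooled-proportion standard error, which assumes both samples
    are large enough for the normal approximation to hold.
    """
    p_a = clicks_a / total_a
    p_b = clicks_b / total_b
    pooled = (clicks_a + clicks_b) / (total_a + total_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / total_a + 1 / total_b))
    if se == 0:
        # No variance at all (e.g. zero clicks everywhere): no evidence of a difference
        return 0.0, 1.0
    z = (p_b - p_a) / se
    # Two-sided p-value: 2 * (1 - Phi(|z|)) == erfc(|z| / sqrt(2))
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value


# Example: variant A gets 50 clicks in 1000 sends, variant B gets 80 in 1000
z, p = two_proportion_z_test(50, 1000, 80, 1000)
```

A p-value below a threshold chosen in advance (0.05 is conventional) would suggest the CTR difference is unlikely to be noise. Checking results repeatedly while the test is still running inflates the false-positive rate.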

Summary

Designing an efficient push notification system means addressing several layers:

graph TB
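    subgraph "Core Elements"
        PLATFORM[Platform adaptation<br/>APNs/FCM/HMS]
        DELIVERY[Delivery guarantees<br/>Retry/Fallback]
        SCALE[Scalability<br/>Distributed architecture]
        MONITOR[Monitoring and analytics<br/>Metrics/Alerts]
    end

    subgraph "Optimization Goals"
        LATENCY[Lower latency]
        RATE[Higher delivery rate]
        ENGAGE[Higher engagement]
        COST[Cost control]
    end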

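Key takeaways:

  1. Platform adaptation: implement the APNs and FCM protocols correctly
  2. Token management: remove invalid tokens promptly
  3. Message queue: decouple producers from senders and absorb traffic spikes
  4. Batching: raise throughput
  5. Error handling: apply a sensible retry strategy
  6. Frequency control: avoid pestering users
  7. Monitoring and analytics: keep optimizing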
