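Mobile Push Notification System Design: From APNs/FCM to Large-Scale Distribution Architecture
Introduction: Why Push Notifications Matter
In the mobile era, push notifications are a key channel for staying connected with users. Instant messages, transaction alerts and marketing campaigns all depend on a push system to reach users promptly.
Designing an efficient push system means balancing:
- Timeliness: keep delivery latency within seconds
- Reliability: maximize delivery (APNs and FCM are themselves best-effort and do not guarantee delivery)
- Scalability: support hundreds of millions of devices
- Efficiency: use system resources sensibly
This article covers push notification systems end to end, from the underlying mechanics to system design.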
Chapter 1: How Push Notifications Work
1.1 Push notification flow
sequenceDiagram
participant App as Mobile App
participant Device as Device OS
participant PNS as Push Notification Service<br/>(APNs/FCM)
participant Server as App Server
participant DB as Token Storage
App->>Device: Register for push
Device->>PNS: Request device token
PNS-->>Device: Return device token
Device-->>App: Deliver token
App->>Server: Upload token
Server->>DB: Store token
Note over Server, PNS: Sending a push
Server->>PNS: Message + token
PNS->>Device: Deliver message
Device->>App: Show notification
1.2 Major push services
graph TB
subgraph "iOS Ecosystem"
APNS[Apple Push Notification Service<br/>APNs]
end
subgraph "Android Ecosystem"
FCM[Firebase Cloud Messaging<br/>FCM]
HMS[Huawei Mobile Services<br/>HMS Push]
XIAOMI[Mi Push]
OPPO[OPPO Push]
VIVO[VIVO Push]
end
subgraph "Cross-Platform"
WEB[Web Push<br/>VAPID]
end
subgraph "Third-Party Services"
JPUSH[JPush<br/>極光推送]
GETUI[Getui<br/>個推]
UMENG[Umeng Push<br/>友盟推送]
end
Chapter 2: Apple Push Notification Service (APNs)
2.1 APNs architecture
graph TB
subgraph "Provider (Your Server)"
PS[Push Service]
TLS[TLS Connection]
HTTP2[HTTP/2 Protocol]
end
subgraph "APNs"
GW[APNs Gateway]
QUEUE[Message Queue]
ROUTER[Device Router]
end
subgraph "Apple Device"
DAEMON[apsd Daemon]
APP[Your App]
end
PS --> TLS
TLS --> HTTP2
HTTP2 --> GW
GW --> QUEUE
QUEUE --> ROUTER
ROUTER --> DAEMON
DAEMON --> APP
2.2 APNs connection and authentication
APNs supports two authentication methods:
graph LR
subgraph "Certificate-Based (legacy)"
CERT[.p12 Certificate]
CERT --> |renewed yearly| APNS1[APNs]
end
subgraph "Token-Based (recommended)"
KEY[.p8 Private Key]
KEY --> |signs| JWT[JWT Token]
JWT --> |refreshed at least hourly| APNS2[APNs]
end
Token-based authentication implementation (PyJWT; ES256 signing also requires the cryptography package):
import time
from datetime import datetime, timedelta

import jwt


class APNsTokenManager:
    def __init__(self, key_id: str, team_id: str, private_key_path: str):
        self.key_id = key_id
        self.team_id = team_id
        with open(private_key_path, 'r') as f:
            self.private_key = f.read()
        self.token = None
        self.token_time = None

    def get_token(self) -> str:
        # A provider token is valid for 1 hour; regenerate it 10 minutes early.
        # APNs also rejects refreshes more often than every 20 minutes
        # (TooManyProviderTokenUpdates), so the token is cached, not signed per request.
        if (self.token is None or
                self.token_time is None or
                datetime.now() - self.token_time > timedelta(minutes=50)):
            self._generate_token()
        return self.token

    def _generate_token(self):
        now = int(time.time())
        payload = {
            'iss': self.team_id,  # Issuer (Team ID)
            'iat': now,           # Issued At
        }
        headers = {
            'alg': 'ES256',       # Algorithm
            'kid': self.key_id,   # Key ID
        }
        self.token = jwt.encode(
            payload,
            self.private_key,
            algorithm='ES256',
            headers=headers
        )
        self.token_time = datetime.now()
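For reference, the token that jwt.encode produces is a JWS in compact form: base64url(header), a dot, base64url(claims), a dot, and the signature. The standard-library sketch below builds only the first two parts (the signing input) plus the 50-minute refresh check, so the structure can be inspected without a .p8 key. The function names are illustrative, and the ES256 signing step is intentionally left out; this is not a replacement for jwt.encode.

```python
import base64
import json
from typing import Optional


def b64url(data: bytes) -> str:
    """Base64url without '=' padding, as JWS compact serialization requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def compact_json(obj: dict) -> bytes:
    return json.dumps(obj, separators=(",", ":")).encode("utf-8")


def apns_signing_input(key_id: str, team_id: str, issued_at: int) -> str:
    """Header and claims of an APNs provider token; ES256 signs exactly this string."""
    header = {"alg": "ES256", "kid": key_id}
    claims = {"iss": team_id, "iat": issued_at}
    return f"{b64url(compact_json(header))}.{b64url(compact_json(claims))}"


def needs_refresh(issued_at: Optional[int], now: int, refresh_after: int = 50 * 60) -> bool:
    """True when no token is cached or the cached one is at least `refresh_after` seconds old."""
    return issued_at is None or now - issued_at >= refresh_after
```

Because needs_refresh takes the current time as an argument, the refresh policy can be unit-tested without waiting or mocking the clock.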
2.3 Sending pushes over APNs HTTP/2
import asyncio
import json
from dataclasses import dataclass
from typing import Optional

import httpx  # HTTP/2 support needs the extra: pip install "httpx[http2]"


@dataclass
class APNsPayload:
    alert: Optional[str] = None
    title: Optional[str] = None
    body: Optional[str] = None
    badge: Optional[int] = None
    sound: str = "default"
    category: Optional[str] = None
    thread_id: Optional[str] = None
    custom_data: Optional[dict] = None

    def to_dict(self) -> dict:
        aps = {}
        if self.alert:
            aps['alert'] = self.alert
        elif self.title or self.body:
            aps['alert'] = {}
            if self.title:
                aps['alert']['title'] = self.title
            if self.body:
                aps['alert']['body'] = self.body
        if self.badge is not None:
            aps['badge'] = self.badge
        if self.sound:
            aps['sound'] = self.sound
        if self.category:
            aps['category'] = self.category
        if self.thread_id:
            aps['thread-id'] = self.thread_id
        payload = {'aps': aps}
        if self.custom_data:
            # Custom keys sit next to "aps" at the top level of the payload
            payload.update(self.custom_data)
        return payload


class APNsClient:
    SANDBOX_URL = "https://api.sandbox.push.apple.com"
    PRODUCTION_URL = "https://api.push.apple.com"

    def __init__(self, token_manager: APNsTokenManager, sandbox: bool = False):
        self.token_manager = token_manager
        self.base_url = self.SANDBOX_URL if sandbox else self.PRODUCTION_URL
        # One long-lived HTTP/2 connection multiplexes many concurrent requests
        self.client = httpx.AsyncClient(http2=True)

    async def send(
        self,
        device_token: str,
        payload: APNsPayload,
        topic: str,               # Bundle ID
        priority: int = 10,       # 10 = deliver immediately, 5 = power-considerate
        expiration: int = 0,      # 0 = try once, do not store for later delivery
        collapse_id: Optional[str] = None,
        push_type: str = "alert"
    ) -> dict:
        url = f"{self.base_url}/3/device/{device_token}"
        headers = {
            'authorization': f'bearer {self.token_manager.get_token()}',
            'apns-topic': topic,
            'apns-push-type': push_type,
            'apns-priority': str(priority),
            'apns-expiration': str(expiration),
        }
        if collapse_id:
            headers['apns-collapse-id'] = collapse_id
        # ensure_ascii=False keeps CJK text at 3 bytes per character instead of
        # 6-byte \uXXXX escapes; the 4 KB payload limit counts bytes
        body = json.dumps(payload.to_dict(), ensure_ascii=False).encode('utf-8')
        response = await self.client.post(url, headers=headers, content=body)
        if response.status_code == 200:
            return {
                'success': True,
                'apns_id': response.headers.get('apns-id')
            }
        error_body = response.json() if response.content else {}
        return {
            'success': False,
            'status_code': response.status_code,
            'reason': error_body.get('reason'),
            'timestamp': error_body.get('timestamp')
        }

    async def send_batch(
        self,
        notifications: list[tuple[str, APNsPayload]],
        topic: str
    ) -> list[dict]:
        """Send several notifications concurrently over the shared connection"""
        tasks = [
            self.send(token, payload, topic)
            for token, payload in notifications
        ]
        return await asyncio.gather(*tasks)

    async def close(self):
        await self.client.aclose()


# Usage example
async def main():
    token_manager = APNsTokenManager(
        key_id="ABC123DEFG",
        team_id="TEAM123456",
        private_key_path="./AuthKey_ABC123DEFG.p8"
    )
    client = APNsClient(token_manager, sandbox=True)
    payload = APNsPayload(
        title="New message",
        body="You have a new message",
        badge=1,
        sound="default",
        custom_data={'message_id': '12345'}
    )
    try:
        result = await client.send(
            device_token="a1b2c3d4e5f6...",
            payload=payload,
            topic="com.example.app"
        )
        print(result)
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())
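APNs rejects regular notification payloads larger than 4096 bytes with 413 PayloadTooLarge, and the limit is measured in UTF-8 bytes, so Chinese text (3 bytes per character) runs out much sooner than its character count suggests. Below is a minimal sketch of a pre-send guard that shortens the body until the payload fits. The helper names and the truncate-with-ellipsis policy are assumptions for illustration, not part of the client above.

```python
import json

APNS_MAX_PAYLOAD_BYTES = 4096  # regular remote notifications (VoIP pushes allow 5120)


def encode_payload(payload: dict) -> bytes:
    # Compact separators and raw UTF-8: every byte counts toward the limit
    return json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8")


def fit_body(title: str, body: str, custom: dict, limit: int = APNS_MAX_PAYLOAD_BYTES) -> bytes:
    """Return the encoded payload, shortening `body` (with an ellipsis) if it exceeds `limit`."""
    def build(text: str) -> bytes:
        return encode_payload({"aps": {"alert": {"title": title, "body": text}}, **custom})

    encoded = build(body)
    if len(encoded) <= limit:
        return encoded
    # Binary search for the longest prefix that fits; the encoded size grows with prefix length
    lo, hi = 0, len(body)
    while lo < hi:
        mid = (lo + hi + 1) // 2
        if len(build(body[:mid] + "…")) <= limit:
            lo = mid
        else:
            hi = mid - 1
    encoded = build(body[:lo] + "…")
    if len(encoded) > limit:
        raise ValueError("payload exceeds the limit even with an empty body")
    return encoded
```

Truncating on the server keeps the title and custom data intact; the app can fetch the full text using a message_id from custom data.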
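2.4 APNs error handling
class APNsError:
    """APNs error reasons, keyed by HTTP status code"""
    ERRORS = {
        400: {
            'BadCollapseId': 'Invalid collapse ID',
            'BadDeviceToken': 'Invalid device token',
            'BadExpirationDate': 'Invalid expiration date',
            'BadMessageId': 'Invalid message ID',
            'BadPriority': 'Invalid priority',
            'BadTopic': 'Invalid topic',
            'DeviceTokenNotForTopic': 'Device token does not match the topic',
            'DuplicateHeaders': 'Duplicate HTTP headers',
            'IdleTimeout': 'Idle connection timed out',
            'InvalidPushType': 'Invalid push type',
            'MissingDeviceToken': 'Missing device token',
            'MissingTopic': 'Missing topic',
            'PayloadEmpty': 'Empty payload',
            'TopicDisallowed': 'Topic not allowed',
        },
        403: {
            'BadCertificate': 'Invalid certificate',
            'BadCertificateEnvironment': 'Certificate environment mismatch',
            'ExpiredProviderToken': 'Provider token has expired',
            'Forbidden': 'Forbidden',
            'InvalidProviderToken': 'Invalid provider token',
            'MissingProviderToken': 'Missing provider token',
        },
        404: {
            'BadPath': 'Invalid request path',
        },
        405: {
            'MethodNotAllowed': 'Method not allowed',
        },
        410: {
            'Unregistered': 'Device token is no longer active for the topic',  # Important: delete the token
        },
        413: {
            'PayloadTooLarge': 'Payload too large (max 4 KB)',
        },
        429: {
            'TooManyProviderTokenUpdates': 'Provider token updated too often',
            'TooManyRequests': 'Too many requests to the same device token',
        },
        500: {
            'InternalServerError': 'APNs internal error',
        },
        503: {
            'ServiceUnavailable': 'Service temporarily unavailable',
            'Shutdown': 'APNs is shutting down the connection',
        },
    }

    @classmethod
    def should_retry(cls, status_code: int, reason: str) -> bool:
        """Whether the request is worth retrying (with backoff)"""
        if status_code in (429, 500, 503):
            return True
        # Recoverable: regenerate the provider JWT, then resend
        if status_code == 403 and reason == 'ExpiredProviderToken':
            return True
        return False

    @classmethod
    def should_remove_token(cls, status_code: int, reason: str) -> bool:
        """Whether the device token should be removed"""
        if status_code == 410:  # Unregistered
            return True
        # BadDeviceToken is also returned when a sandbox token is sent to production
        # (or vice versa); verify the environment before deleting tokens in bulk
        if status_code == 400 and reason in ('BadDeviceToken', 'DeviceTokenNotForTopic'):
            return True
        return False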
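In a worker it is usually simpler to turn each response into one action than to call several predicates. The sketch below folds the retry and removal rules into a single decision function. The Action names are hypothetical, and mapping ExpiredProviderToken to "regenerate the JWT, then resend" is an assumption in line with the token refresh logic in 2.2.

```python
from enum import Enum


class Action(Enum):
    DONE = "done"
    RETRY = "retry"                # transient failure: back off and resend
    REFRESH_AUTH = "refresh_auth"  # regenerate the provider JWT, then resend
    REMOVE_TOKEN = "remove_token"  # stop sending to this device token
    DROP = "drop"                  # request bug: fix payload/headers, do not resend


def classify(status_code: int, reason: str = "") -> Action:
    """Map an APNs status code and reason to the single action a worker should take."""
    if status_code == 200:
        return Action.DONE
    if status_code == 403 and reason == "ExpiredProviderToken":
        return Action.REFRESH_AUTH
    if status_code == 410 or (
        status_code == 400 and reason in ("BadDeviceToken", "DeviceTokenNotForTopic")
    ):
        return Action.REMOVE_TOKEN
    if status_code in (429, 500, 503):
        return Action.RETRY
    # Everything else (BadTopic, PayloadTooLarge, ...) will fail again unchanged
    return Action.DROP
```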
Chapter 3: Firebase Cloud Messaging (FCM)
3.1 FCM architecture
graph TB
subgraph "Your Backend"
SERVER[App Server]
FCM_SDK[Firebase Admin SDK]
end
subgraph "Firebase"
FCM_BE[FCM Backend]
TOPIC[Topic Messaging]
DEVICE[Device Groups]
end
subgraph "Client"
ANDROID[Android App]
IOS[iOS App]
WEB[Web App]
end
SERVER --> FCM_SDK
FCM_SDK --> FCM_BE
FCM_BE --> TOPIC
FCM_BE --> DEVICE
FCM_BE --> ANDROID
FCM_BE --> IOS
FCM_BE --> WEB
3.2 FCM message types
graph TB
subgraph "FCM Message Types"
subgraph "Notification Message"
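NM[Displayed by the system]
NM_BG[App in background<br/>shown automatically]
NM_FG[App in foreground<br/>passed to the app]
end
subgraph "Data Message"
DM[Data-only message]
DM_HANDLE[Handled by the app]
DM_CONTROL[Full control over display]
end
subgraph "Combined"
BOTH[Notification + Data]
BOTH_USE[Fits most use cases]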
end
end
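The three types differ only in which fields the request carries. Here is a sketch of the JSON body accepted by the FCM HTTP v1 endpoint (POST https://fcm.googleapis.com/v1/projects/{project_id}/messages:send). The helper name is hypothetical; authentication (an OAuth 2.0 access token) and the HTTP call itself are left out.

```python
from typing import Optional


def build_fcm_message(token: str, title: Optional[str] = None, body: Optional[str] = None,
                      data: Optional[dict] = None) -> dict:
    """Build an FCM HTTP v1 request body: notification, data, or both."""
    message: dict = {"token": token}
    if title is not None or body is not None:
        # Notification message: shown by the system tray while the app is in the background
        message["notification"] = {k: v for k, v in (("title", title), ("body", body))
                                   if v is not None}
    if data:
        # Data message: delivered to the app's own handler; every value must be a string
        message["data"] = {str(k): str(v) for k, v in data.items()}
    if len(message) == 1:
        raise ValueError("a message needs a notification, data, or both")
    return {"message": message}
```

Because data values are stringified, the receiving app has to parse numbers and booleans back from strings.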
3.3 FCM implementation
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from typing import List, Optional

import firebase_admin
from firebase_admin import credentials, messaging

# Initialize the Firebase Admin SDK
cred = credentials.Certificate("./firebase-adminsdk.json")
firebase_admin.initialize_app(cred)


@dataclass
class FCMNotification:
    title: Optional[str] = None
    body: Optional[str] = None
    image: Optional[str] = None


@dataclass
class FCMAndroidConfig:
    priority: str = "high"  # "high" or "normal"
    ttl: int = 86400        # seconds
    collapse_key: Optional[str] = None
    channel_id: Optional[str] = None


@dataclass
class FCMAPNsConfig:
    badge: Optional[int] = None
    sound: str = "default"
    category: Optional[str] = None


class FCMClient:
    def __init__(self, max_workers: int = 10):
        # The Admin SDK is synchronous; the pool keeps its calls off the event loop
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def _build_message(
        self,
        token: str,
        notification: Optional[FCMNotification] = None,
        data: Optional[dict] = None,
        android_config: Optional[FCMAndroidConfig] = None,
        apns_config: Optional[FCMAPNsConfig] = None,
    ) -> messaging.Message:
        message_kwargs = {'token': token}
        # Notification
        if notification:
            message_kwargs['notification'] = messaging.Notification(
                title=notification.title,
                body=notification.body,
                image=notification.image,
            )
        # Data
        if data:
            # FCM data values must be strings
            message_kwargs['data'] = {k: str(v) for k, v in data.items()}
        # Android configuration
        if android_config:
            message_kwargs['android'] = messaging.AndroidConfig(
                priority=android_config.priority,
                ttl=timedelta(seconds=android_config.ttl),
                collapse_key=android_config.collapse_key,
                notification=messaging.AndroidNotification(
                    channel_id=android_config.channel_id,
                ) if android_config.channel_id else None,
            )
        # APNs configuration (for iOS devices reached through FCM)
        if apns_config:
            message_kwargs['apns'] = messaging.APNSConfig(
                payload=messaging.APNSPayload(
                    aps=messaging.Aps(
                        badge=apns_config.badge,
                        sound=apns_config.sound,
                        category=apns_config.category,
                    )
                )
            )
        return messaging.Message(**message_kwargs)

    def send(
        self,
        token: str,
        notification: Optional[FCMNotification] = None,
        data: Optional[dict] = None,
        android_config: Optional[FCMAndroidConfig] = None,
        apns_config: Optional[FCMAPNsConfig] = None,
    ) -> dict:
        """Send a single message"""
        message = self._build_message(
            token, notification, data, android_config, apns_config
        )
        try:
            message_id = messaging.send(message)
            return {'success': True, 'message_id': message_id}
        except messaging.UnregisteredError:
            return {'success': False, 'error': 'unregistered', 'remove_token': True}
        except messaging.SenderIdMismatchError:
            return {'success': False, 'error': 'sender_id_mismatch', 'remove_token': True}
        except messaging.QuotaExceededError:
            return {'success': False, 'error': 'quota_exceeded', 'retry': True}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def send_multicast(
        self,
        tokens: List[str],
        notification: Optional[FCMNotification] = None,
        data: Optional[dict] = None,
    ) -> dict:
        """Send to multiple devices (at most 500 tokens per call)"""
        message = messaging.MulticastMessage(
            tokens=tokens,
            notification=messaging.Notification(
                title=notification.title,
                body=notification.body,
            ) if notification else None,
            data={k: str(v) for k, v in data.items()} if data else None,
        )
        # messaging.send_multicast() relied on FCM's retired batch endpoint and is
        # deprecated; send_each_for_multicast() sends each message via the HTTP v1 API
        response = messaging.send_each_for_multicast(message)
        results = []
        for idx, send_response in enumerate(response.responses):
            if send_response.success:
                results.append({
                    'token': tokens[idx],
                    'success': True,
                    'message_id': send_response.message_id
                })
            else:
                error = send_response.exception
                results.append({
                    'token': tokens[idx],
                    'success': False,
                    'error': str(error),
                    'remove_token': isinstance(error, (
                        messaging.UnregisteredError,
                        messaging.SenderIdMismatchError
                    ))
                })
        return {
            'success_count': response.success_count,
            'failure_count': response.failure_count,
            'results': results
        }

    def send_to_topic(
        self,
        topic: str,
        notification: Optional[FCMNotification] = None,
        data: Optional[dict] = None,
    ) -> dict:
        """Send to every device subscribed to a topic"""
        message = messaging.Message(
            topic=topic,
            notification=messaging.Notification(
                title=notification.title,
                body=notification.body,
            ) if notification else None,
            data={k: str(v) for k, v in data.items()} if data else None,
        )
        try:
            message_id = messaging.send(message)
            return {'success': True, 'message_id': message_id}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def subscribe_to_topic(self, tokens: List[str], topic: str) -> dict:
        """Subscribe tokens to a topic"""
        response = messaging.subscribe_to_topic(tokens, topic)
        return {
            'success_count': response.success_count,
            'failure_count': response.failure_count,
        }

    def unsubscribe_from_topic(self, tokens: List[str], topic: str) -> dict:
        """Unsubscribe tokens from a topic"""
        response = messaging.unsubscribe_from_topic(tokens, topic)
        return {
            'success_count': response.success_count,
            'failure_count': response.failure_count,
        }

    async def send_async(self, *args, **kwargs) -> dict:
        """Asynchronous send: runs the blocking SDK call in the thread pool"""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, lambda: self.send(*args, **kwargs))


# Usage example
client = FCMClient()

# Send a notification
result = client.send(
    token="eXaMpLe_ToKeN...",
    notification=FCMNotification(
        title="New message",
        body="You have a new message"
    ),
    data={
        'message_id': '12345',
        'sender': 'John'
    },
    android_config=FCMAndroidConfig(
        priority="high",
        channel_id="messages"
    )
)

# Send to many devices
multicast_result = client.send_multicast(
    tokens=["token1", "token2", "token3"],
    notification=FCMNotification(title="Announcement", body="Scheduled maintenance notice")
)

# Send to a topic
topic_result = client.send_to_topic(
    topic="news",
    notification=FCMNotification(title="Breaking news", body="...")
)
Chapter 4: Large-Scale Push System Design
4.1 Overall architecture
graph TB
subgraph "API Layer"
API[API Gateway]
AUTH[Authentication]
end
subgraph "Business Layer"
PUSH_SVC[Push Service]
TEMPLATE[Template Engine]
SEGMENT[User Segmentation]
SCHEDULER[Scheduler]
end
subgraph "Message Queue"
KAFKA[Kafka/RabbitMQ]
PRIORITY[Priority Queue]
end
subgraph "Dispatch Layer"
DISPATCHER[Dispatcher]
APNS_WORKER[APNs Workers]
FCM_WORKER[FCM Workers]
HMS_WORKER[HMS Workers]
end
subgraph "Storage"
REDIS[(Redis<br/>Token Cache)]
DB[(PostgreSQL<br/>Token Store)]
CLICKHOUSE[(ClickHouse<br/>Analytics)]
end
subgraph "Push Services"
APNS[APNs]
FCM[FCM]
HMS[HMS]
end
API --> AUTH
AUTH --> PUSH_SVC
PUSH_SVC --> TEMPLATE
PUSH_SVC --> SEGMENT
PUSH_SVC --> SCHEDULER
SCHEDULER --> KAFKA
PUSH_SVC --> KAFKA
KAFKA --> DISPATCHER
DISPATCHER --> PRIORITY
PRIORITY --> APNS_WORKER
PRIORITY --> FCM_WORKER
PRIORITY --> HMS_WORKER
APNS_WORKER --> APNS
FCM_WORKER --> FCM
HMS_WORKER --> HMS
APNS_WORKER --> REDIS
FCM_WORKER --> REDIS
REDIS --> DB
APNS_WORKER --> CLICKHOUSE
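4.2 Token management
import json
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional

import asyncpg
import redis.asyncio as redis  # async client, so cache calls do not block the event loop


class Platform(Enum):
    IOS = "ios"
    ANDROID = "android"
    WEB = "web"


class TokenStatus(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    UNREGISTERED = "unregistered"


@dataclass
class DeviceToken:
    user_id: str
    token: str
    platform: Platform
    app_version: str
    device_model: str
    os_version: str
    status: TokenStatus = TokenStatus.ACTIVE
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    last_active_at: Optional[datetime] = None


class TokenRepository:
    # The cache keeps only the fields needed for sending; timestamps are omitted
    _CACHED_FIELDS = ('user_id', 'token', 'app_version', 'device_model', 'os_version')

    def __init__(self, db_pool: asyncpg.Pool, redis_client: redis.Redis):
        self.db = db_pool
        self.redis = redis_client
        self.cache_ttl = 3600  # 1 hour

    async def save_token(self, token: DeviceToken) -> None:
        """Insert or update a token (requires a UNIQUE constraint on device_tokens.token)"""
        # The app re-uploads its token on launch, so saving also refreshes last_active_at
        query = """
            INSERT INTO device_tokens
                (user_id, token, platform, app_version, device_model,
                 os_version, status, created_at, updated_at, last_active_at)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $8, $8)
            ON CONFLICT (token) DO UPDATE SET
                user_id = EXCLUDED.user_id,
                platform = EXCLUDED.platform,
                app_version = EXCLUDED.app_version,
                device_model = EXCLUDED.device_model,
                os_version = EXCLUDED.os_version,
                status = EXCLUDED.status,
                updated_at = EXCLUDED.updated_at,
                last_active_at = EXCLUDED.last_active_at
        """
        now = datetime.utcnow()
        await self.db.execute(
            query,
            token.user_id, token.token, token.platform.value,
            token.app_version, token.device_model, token.os_version,
            token.status.value, now
        )
        # Invalidate the cache; the next read repopulates it
        await self.redis.delete(f"tokens:{token.user_id}")

    async def get_user_tokens(
        self,
        user_id: str,
        platform: Optional[Platform] = None
    ) -> list[DeviceToken]:
        """Return the user's active tokens, optionally filtered by platform"""
        # Check the cache first
        cache_key = f"tokens:{user_id}"
        cached = await self.redis.get(cache_key)
        if cached:
            tokens = self._deserialize_tokens(cached)
        else:
            # Cache miss: query the database
            query = """
                SELECT * FROM device_tokens
                WHERE user_id = $1 AND status = 'active'
            """
            rows = await self.db.fetch(query, user_id)
            tokens = [self._row_to_token(row) for row in rows]
            # Populate the cache
            await self.redis.setex(cache_key, self.cache_ttl, self._serialize_tokens(tokens))
        if platform:
            tokens = [t for t in tokens if t.platform == platform]
        return tokens

    async def mark_unregistered(self, token: str) -> None:
        """Mark a token as unregistered and clear its owner's cache"""
        row = await self.db.fetchrow(
            """
            UPDATE device_tokens
            SET status = 'unregistered', updated_at = $2
            WHERE token = $1
            RETURNING user_id
            """,
            token, datetime.utcnow()
        )
        if row:
            await self.redis.delete(f"tokens:{row['user_id']}")

    async def cleanup_inactive_tokens(self, days: int = 90) -> int:
        """Deactivate tokens that have not been active for `days` days"""
        query = """
            UPDATE device_tokens
            SET status = 'inactive', updated_at = $2
            WHERE last_active_at < $1 AND status = 'active'
            RETURNING user_id
        """
        cutoff = datetime.utcnow() - timedelta(days=days)
        rows = await self.db.fetch(query, cutoff, datetime.utcnow())
        user_ids = {row['user_id'] for row in rows}
        if user_ids:
            await self.redis.delete(*(f"tokens:{uid}" for uid in user_ids))
        return len(rows)

    def _serialize_tokens(self, tokens: list[DeviceToken]) -> str:
        return json.dumps([
            {**{f: getattr(t, f) for f in self._CACHED_FIELDS},
             'platform': t.platform.value, 'status': t.status.value}
            for t in tokens
        ])

    def _deserialize_tokens(self, raw) -> list[DeviceToken]:
        return [
            DeviceToken(**{f: d[f] for f in self._CACHED_FIELDS},
                        platform=Platform(d['platform']),
                        status=TokenStatus(d['status']))
            for d in json.loads(raw)
        ]

    def _row_to_token(self, row) -> DeviceToken:
        return DeviceToken(
            user_id=row['user_id'],
            token=row['token'],
            platform=Platform(row['platform']),
            app_version=row['app_version'],
            device_model=row['device_model'],
            os_version=row['os_version'],
            status=TokenStatus(row['status']),
            created_at=row['created_at'],
            updated_at=row['updated_at'],
            last_active_at=row['last_active_at'],
        )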
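One subtlety in this cache-aside setup: when a device logs in as a different user, ON CONFLICT (token) moves the token to the new owner, but save_token only invalidates the new owner's tokens:{user_id} key, so the previous owner's cached list can keep the token until the TTL expires. The dictionary-backed sketch below shows an invalidation rule that evicts both owners. The class is a hypothetical stand-in for PostgreSQL plus Redis, not a drop-in replacement for TokenRepository.

```python
from typing import Dict, List, Tuple


class InMemoryTokenStore:
    """Cache-aside sketch: `db` stands in for PostgreSQL, `cache` for Redis."""

    def __init__(self) -> None:
        self.db: Dict[str, Tuple[str, str]] = {}  # token -> (user_id, status)
        self.cache: Dict[str, List[str]] = {}     # user_id -> cached active tokens

    def save(self, user_id: str, token: str) -> None:
        previous = self.db.get(token)
        self.db[token] = (user_id, "active")
        self.cache.pop(user_id, None)
        if previous and previous[0] != user_id:
            # The token changed owners (logout/login on the same device)
            self.cache.pop(previous[0], None)

    def mark_unregistered(self, token: str) -> None:
        if token in self.db:
            user_id, _ = self.db[token]
            self.db[token] = (user_id, "unregistered")
            self.cache.pop(user_id, None)

    def active_tokens(self, user_id: str) -> List[str]:
        if user_id not in self.cache:  # miss: load from the "database", then cache
            self.cache[user_id] = sorted(
                t for t, (owner, status) in self.db.items()
                if owner == user_id and status == "active"
            )
        return self.cache[user_id]
```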
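4.3 Message queue design
import asyncio
import json
from dataclasses import asdict, dataclass
from datetime import datetime
from enum import Enum
from typing import Awaitable, Callable, Optional

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

# Platform is the enum defined in section 4.2


class MessagePriority(Enum):
    HIGH = 0    # real-time messages (e.g. chat)
    NORMAL = 1  # regular notifications
    LOW = 2     # marketing messages


@dataclass
class PushMessage:
    message_id: str
    user_id: str
    title: str
    body: str
    data: dict
    priority: MessagePriority
    created_at: datetime
    expire_at: datetime
    platform: Optional[Platform] = None
    retry_count: int = 0

    def to_dict(self) -> dict:
        """JSON-safe dict; asdict() alone keeps Enum and datetime values, which json.dumps rejects"""
        d = asdict(self)
        d['priority'] = self.priority.value
        d['created_at'] = self.created_at.isoformat()
        d['expire_at'] = self.expire_at.isoformat()
        d['platform'] = self.platform.value if self.platform else None
        return d

    @classmethod
    def from_dict(cls, d: dict) -> 'PushMessage':
        return cls(**{
            **d,
            'priority': MessagePriority(d['priority']),
            'created_at': datetime.fromisoformat(d['created_at']),
            'expire_at': datetime.fromisoformat(d['expire_at']),
            'platform': Platform(d['platform']) if d.get('platform') else None,
        })


class MessageQueue:
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers
        self.producer = None
        self.consumers = {}

    async def connect(self):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
        )
        await self.producer.start()

    async def publish(self, message: PushMessage) -> None:
        """Publish one message; keying by user_id keeps a user's messages ordered within a partition"""
        topic = self._get_topic_by_priority(message.priority)
        await self.producer.send_and_wait(
            topic=topic,
            key=message.user_id,
            value=message.to_dict(),
            headers=[
                ('priority', str(message.priority.value).encode()),
                ('created_at', message.created_at.isoformat().encode()),
            ]
        )

    async def publish_batch(self, messages: list[PushMessage]) -> None:
        """Publish many messages; the producer groups them into fewer broker requests"""
        futures = []
        for message in messages:
            topic = self._get_topic_by_priority(message.priority)
            # send() only enqueues the record and returns a delivery future
            futures.append(await self.producer.send(
                topic=topic,
                key=message.user_id,
                value=message.to_dict()
            ))
        # Wait until the broker has acknowledged every record
        await asyncio.gather(*futures)

    def _get_topic_by_priority(self, priority: MessagePriority) -> str:
        return f"push_messages_{priority.name.lower()}"

    async def consume(
        self,
        priority: MessagePriority,
        handler: Callable[[PushMessage], Awaitable[None]],
        group_id: str
    ):
        """Consume the topic for one priority level"""
        topic = self._get_topic_by_priority(priority)
        consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda v: json.loads(v.decode('utf-8')),
            auto_offset_reset='earliest',
            enable_auto_commit=False,
        )
        await consumer.start()
        self.consumers[priority] = consumer
        try:
            async for msg in consumer:
                try:
                    message = PushMessage.from_dict(msg.value)
                except (KeyError, TypeError, ValueError):
                    # Malformed record: skip it rather than block the partition
                    await consumer.commit()
                    continue
                try:
                    await handler(message)
                except Exception as e:
                    # Retry or dead-letter; the offset is committed either way
                    await self._handle_error(message, e)
                await consumer.commit()
        finally:
            await consumer.stop()

    async def _handle_error(self, message: PushMessage, error: Exception):
        """Republish for retry (immediately, without backoff) or dead-letter after 3 attempts"""
        if message.retry_count < 3:
            message.retry_count += 1
            await self.publish(message)
        else:
            # Send to the dead-letter queue
            await self.producer.send_and_wait(
                'push_messages_dlq',
                key=message.user_id,
                value=message.to_dict()
            )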
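Kafka has no per-message priority, which is why this design gives each priority level its own topic and consumer group. When a single process drains several sources, the same "highest priority first, FIFO within a level" rule can be sketched with heapq. The class below is illustrative and not part of MessageQueue; note that strict priority can starve LOW traffic under sustained HIGH load, which separate worker pools per topic avoid.

```python
import heapq
import itertools
from typing import Any, List, Tuple


class PriorityBuffer:
    """Pop the lowest priority value first (0 = HIGH); FIFO within the same level."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, Any]] = []
        self._seq = itertools.count()  # tie-breaker: insertion order, so items are never compared

    def push(self, priority: int, item: Any) -> None:
        heapq.heappush(self._heap, (priority, next(self._seq), item))

    def pop(self) -> Any:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)
```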
4.4 Push scheduler
import asyncio
import json
import time
from dataclasses import dataclass
from datetime import datetime
from typing import List

import redis.asyncio as redis  # aioredis has been merged into redis-py as redis.asyncio

# PushMessage, MessagePriority and MessageQueue come from section 4.3


@dataclass
class ScheduledPush:
    push_id: str
    message: PushMessage      # template; message_id and user_id are filled in per user
    scheduled_time: datetime  # timezone-aware: .timestamp() of a naive datetime assumes local time
    target_users: List[str]
    status: str = "pending"


class PushScheduler:
    def __init__(self, redis_client: redis.Redis, message_queue: MessageQueue):
        self.redis = redis_client
        self.mq = message_queue
        self.running = False

    async def schedule(self, push: ScheduledPush) -> str:
        """Schedule a push"""
        # Store only the template fields; per-user fields are filled in at send time
        template = {
            'title': push.message.title,
            'body': push.message.body,
            'data': push.message.data,
            'priority': push.message.priority.value,
            'expire_at': push.message.expire_at.isoformat(),
        }
        # Store the details first, so a due entry never points at missing data
        await self.redis.hset(
            f'push_details:{push.push_id}',
            mapping={
                'message': json.dumps(template),
                'target_users': json.dumps(push.target_users),
                'status': push.status,
            }
        )
        # Index the push by due time in a sorted set
        await self.redis.zadd(
            'scheduled_pushes',
            {push.push_id: push.scheduled_time.timestamp()}
        )
        return push.push_id

    async def cancel(self, push_id: str) -> bool:
        """Cancel a scheduled push"""
        removed = await self.redis.zrem('scheduled_pushes', push_id)
        if removed:
            await self.redis.hset(f'push_details:{push_id}', 'status', 'cancelled')
        return removed > 0

    async def start(self):
        """Run the scheduler loop"""
        self.running = True
        while self.running:
            try:
                await self._process_due_pushes()
            except Exception as e:
                print(f"Scheduler error: {e}")
            await asyncio.sleep(1)  # check once per second

    async def stop(self):
        self.running = False

    async def _process_due_pushes(self):
        """Process pushes whose scheduled time has passed"""
        now = time.time()
        due_pushes = await self.redis.zrangebyscore('scheduled_pushes', min=0, max=now)
        for push_id in due_pushes:
            push_id = push_id.decode() if isinstance(push_id, bytes) else push_id
            # ZREM is the atomic claim: only one scheduler instance gets removed == 1
            removed = await self.redis.zrem('scheduled_pushes', push_id)
            if not removed:
                continue  # already claimed by another instance
            details = await self.redis.hgetall(f'push_details:{push_id}')
            if not details:
                continue
            await self._execute_push(push_id, details)

    async def _execute_push(self, push_id: str, details: dict):
        """Publish one message per target user"""
        # Hash keys are bytes because the client was created without decode_responses=True
        template = json.loads(details[b'message'])
        target_users = json.loads(details[b'target_users'])
        await self.redis.hset(f'push_details:{push_id}', 'status', 'sending')
        # One message per user: PushWorker (4.5) expands it to all of the user's
        # tokens, so publishing once per token here would deliver duplicates
        for user_id in target_users:
            message = PushMessage(
                message_id=f"{push_id}:{user_id}",
                user_id=user_id,
                title=template['title'],
                body=template['body'],
                data=template['data'],
                priority=MessagePriority(template['priority']),
                created_at=datetime.utcnow(),
                expire_at=datetime.fromisoformat(template['expire_at']),
            )
            await self.mq.publish(message)
        await self.redis.hset(f'push_details:{push_id}', 'status', 'completed')
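The scheduler relies on three sorted-set operations: ZADD indexes a push by due time, ZRANGEBYSCORE finds due entries, and ZREM acts as a claim that only one caller can win. An in-memory analogue makes those semantics easy to unit-test. It is a single-process sketch (no cross-process claiming), and the names are hypothetical.

```python
import heapq
from typing import Dict, List, Tuple


class DelayQueue:
    """In-memory analogue of the `scheduled_pushes` sorted set."""

    def __init__(self) -> None:
        self._heap: List[Tuple[float, str]] = []  # (due_ts, push_id); may hold stale entries
        self._live: Dict[str, float] = {}         # push_id -> current due_ts

    def schedule(self, push_id: str, due_ts: float) -> None:
        # Like ZADD: scheduling an existing id again replaces its due time
        self._live[push_id] = due_ts
        heapq.heappush(self._heap, (due_ts, push_id))

    def cancel(self, push_id: str) -> bool:
        # Like ZREM: True only if the id was still pending
        return self._live.pop(push_id, None) is not None

    def pop_due(self, now: float) -> List[str]:
        """Claim and return every push due at or before `now`, earliest first."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            ts, push_id = heapq.heappop(self._heap)
            if self._live.get(push_id) == ts:  # skip cancelled or rescheduled entries
                del self._live[push_id]
                due.append(push_id)
        return due
```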
4.5 Push worker
import asyncio
import json
import time
from collections import defaultdict
from dataclasses import dataclass

# APNsClient, APNsPayload, APNsError (chapter 2), FCMClient, FCMNotification (3.3),
# TokenRepository, Platform (4.2) and MessageQueue, PushMessage, MessagePriority (4.3)


@dataclass
class PushStats:
    total: int = 0
    success: int = 0
    failed: int = 0
    invalid_tokens: int = 0


class PushWorker:
    def __init__(
        self,
        apns_client: APNsClient,
        fcm_client: FCMClient,
        token_repo: TokenRepository,
        message_queue: MessageQueue,
        batch_size: int = 100,
        batch_timeout: float = 0.1,
        apns_topic: str = "com.example.app"  # the app's bundle ID
    ):
        self.apns = apns_client
        self.fcm = fcm_client
        self.token_repo = token_repo
        self.mq = message_queue
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.apns_topic = apns_topic
        self.stats = PushStats()
        self.pending_ios: list = []
        self.pending_android: list = []
        self.last_flush_time = time.time()

    async def process_message(self, message: PushMessage):
        """Expand one message to all of the user's devices and buffer it"""
        tokens = await self.token_repo.get_user_tokens(
            message.user_id,
            message.platform
        )
        if not tokens:
            return
        for token in tokens:
            if token.platform == Platform.IOS:
                self.pending_ios.append((token, message))
            elif token.platform == Platform.ANDROID:
                self.pending_android.append((token, message))
        # Caveat: MessageQueue.consume commits the Kafka offset as soon as this
        # returns, so buffered notifications are lost if the process crashes first
        await self._maybe_flush()

    async def _maybe_flush(self):
        """Flush when a batch is full or batch_timeout has elapsed"""
        now = time.time()
        should_flush = (
            len(self.pending_ios) >= self.batch_size or
            len(self.pending_android) >= self.batch_size or
            now - self.last_flush_time > self.batch_timeout
        )
        if should_flush:
            await self._flush()

    async def _flush(self):
        """Send up to one batch per platform"""
        # Slice before awaiting, so concurrent flushes never take the same items
        ios_batch = self.pending_ios[:self.batch_size]
        android_batch = self.pending_android[:self.batch_size]
        self.pending_ios = self.pending_ios[self.batch_size:]
        self.pending_android = self.pending_android[self.batch_size:]
        # Send both platforms concurrently
        await asyncio.gather(
            self._send_ios_batch(ios_batch),
            self._send_android_batch(android_batch),
        )
        self.last_flush_time = time.time()

    async def _flush_periodically(self):
        """Apply the timeout even when no new messages arrive"""
        while True:
            await asyncio.sleep(self.batch_timeout)
            if self.pending_ios or self.pending_android:
                await self._maybe_flush()

    async def _send_ios_batch(self, batch: list):
        """Send a batch of iOS pushes"""
        if not batch:
            return
        notifications = []
        for token, message in batch:
            payload = APNsPayload(
                title=message.title,
                body=message.body,
                custom_data=message.data
            )
            notifications.append((token.token, payload))
        results = await self.apns.send_batch(notifications, topic=self.apns_topic)
        # Record outcomes and deactivate invalid tokens
        for (token, _), result in zip(batch, results):
            self.stats.total += 1
            if result['success']:
                self.stats.success += 1
            else:
                self.stats.failed += 1
                if APNsError.should_remove_token(
                    result.get('status_code', 0),
                    result.get('reason', '')
                ):
                    self.stats.invalid_tokens += 1
                    await self.token_repo.mark_unregistered(token.token)

    async def _send_android_batch(self, batch: list):
        """Send Android pushes with FCM multicast"""
        if not batch:
            return
        # A multicast carries one payload, so group tokens by message content first
        groups = defaultdict(list)
        contents = {}
        for token, message in batch:
            key = (message.title, message.body, json.dumps(message.data, sort_keys=True))
            groups[key].append(token.token)
            contents[key] = message
        chunk_size = 500  # FCM accepts at most 500 tokens per multicast
        for key, tokens in groups.items():
            message = contents[key]
            for i in range(0, len(tokens), chunk_size):
                chunk = tokens[i:i + chunk_size]
                # The Admin SDK call blocks, so run it in a worker thread
                result = await asyncio.to_thread(
                    self.fcm.send_multicast,
                    tokens=chunk,
                    notification=FCMNotification(title=message.title, body=message.body),
                    data=message.data,
                )
                self.stats.total += len(chunk)
                self.stats.success += result['success_count']
                self.stats.failed += result['failure_count']
                # Deactivate invalid tokens
                for r in result['results']:
                    if r.get('remove_token'):
                        self.stats.invalid_tokens += 1
                        await self.token_repo.mark_unregistered(r['token'])

    async def run(self, priority: MessagePriority):
        """Start the worker for one priority level"""
        flusher = asyncio.create_task(self._flush_periodically())
        try:
            await self.mq.consume(
                priority=priority,
                handler=self.process_message,
                group_id=f"push_worker_{priority.name}"
            )
        finally:
            flusher.cancel()
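When the size and timeout checks only run on arrival, as _maybe_flush does, a message that arrives just before the queue goes quiet can wait until something else triggers a flush. A common alternative is a batcher that owns the clock: block for the first item, then keep collecting until the batch is full or the deadline passes. This is a standalone asyncio sketch; MicroBatcher and its parameters are hypothetical names, not part of PushWorker.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


class MicroBatcher:
    """Collect items and flush when `max_size` is reached or `max_wait` seconds pass."""

    def __init__(self, flush: Callable[[List[Any]], Awaitable[None]],
                 max_size: int = 100, max_wait: float = 0.1) -> None:
        self._flush = flush
        self.max_size = max_size
        self.max_wait = max_wait
        self._queue: asyncio.Queue = asyncio.Queue()

    async def add(self, item: Any) -> None:
        await self._queue.put(item)

    async def run(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            batch = [await self._queue.get()]  # block until there is work
            deadline = loop.time() + self.max_wait
            while len(batch) < self.max_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self._queue.get(), remaining))
                except asyncio.TimeoutError:
                    break  # deadline reached with a partial batch
            await self._flush(batch)
```

Create the batcher inside a running event loop and start run() as a background task; items added after a flush begin a new batch with a fresh deadline.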
Chapter 5: Delivery Rate Optimization
5.1 Delivery monitoring
graph TB
subgraph "Push funnel"
TOTAL[Total pushes<br/>100%]
SENT[Sent successfully<br/>98%]
DELIVERED[Delivered to device<br/>95%]
DISPLAYED[Notification displayed<br/>90%]
CLICKED[Notification tapped<br/>5%]
end
TOTAL --> SENT
SENT --> DELIVERED
DELIVERED --> DISPLAYED
DISPLAYED --> CLICKED
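Each funnel stage is worth tracking both as a share of the total and as conversion from the previous stage, since a drop between two adjacent stages points at a specific layer. Note that an HTTP 200 from APNs or a message ID from FCM only means the provider accepted the message; the delivered, displayed and tapped stages usually come from client-side receipts. A small sketch, using the diagram's illustrative percentages as hypothetical counts:

```python
from typing import List, Tuple


def funnel_rates(stages: List[Tuple[str, int]]) -> List[Tuple[str, float, float]]:
    """For ordered (stage, count) pairs, return (stage, share of total, share of previous stage)."""
    total = stages[0][1]
    rows = []
    previous = None
    for name, count in stages:
        of_total = count / total if total else 0.0
        if previous is None:
            of_previous = 1.0
        else:
            of_previous = count / previous if previous else 0.0
        rows.append((name, of_total, of_previous))
        previous = count
    return rows


example = [("total", 10000), ("sent", 9800), ("delivered", 9500),
           ("displayed", 9000), ("clicked", 500)]
for name, of_total, of_previous in funnel_rates(example):
    print(f"{name:<10} {of_total:7.2%} {of_previous:7.2%}")
```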
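5.2 Optimization strategies
from datetime import datetime, timedelta

# PushMessage, MessagePriority (4.3) and Platform (4.2) come from earlier sections


class DeliveryOptimizer:
    """Delivery-rate optimizer; analytics_client is an application-specific service"""

    def __init__(self, analytics_client):
        self.analytics = analytics_client

    async def get_optimal_send_time(self, user_id: str) -> datetime:
        """Pick the user's best send time (UTC)"""
        # Historical activity as {hour_of_day: activity_count}
        activity = await self.analytics.get_user_activity_pattern(user_id)
        if not activity:
            return datetime.utcnow()  # no history: send now
        # Most active hours first
        peak_hours = sorted(
            activity.items(),
            key=lambda x: x[1],
            reverse=True
        )[:3]
        # Next occurrence of the busiest hour
        return self._next_occurrence(peak_hours[0][0])

    @staticmethod
    def _next_occurrence(hour: int) -> datetime:
        """Next time the clock reaches hour:00 (today if still ahead, otherwise tomorrow)"""
        now = datetime.utcnow()
        candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
        return candidate if candidate > now else candidate + timedelta(days=1)

    async def should_send_now(self, user_id: str, message: PushMessage) -> bool:
        """Decide whether to send immediately"""
        # High-priority messages go out immediately
        if message.priority == MessagePriority.HIGH:
            return True
        # Respect the user's do-not-disturb settings
        dnd = await self.get_dnd_settings(user_id)
        if dnd.is_active():
            return False
        # Frequency cap: at most 10 pushes per hour
        recent_count = await self.get_recent_push_count(user_id)
        if recent_count >= 10:
            return False
        return True

    async def get_dnd_settings(self, user_id: str):
        """Hook: return an object with is_active(), e.g. loaded from user settings"""
        raise NotImplementedError

    async def get_recent_push_count(self, user_id: str) -> int:
        """Hook: number of pushes sent to the user in the past hour"""
        raise NotImplementedError

    async def prioritize_channels(
        self,
        user_id: str
    ) -> list[Platform]:
        """Order push channels by the user's click-through rate"""
        # CTR per channel, e.g. {"ios": 0.07, "web": 0.02}
        channel_ctr = await self.analytics.get_channel_ctr(user_id)
        # Highest CTR first
        sorted_channels = sorted(
            channel_ctr.items(),
            key=lambda x: x[1],
            reverse=True
        )
        return [Platform(ch) for ch, _ in sorted_channels]


class InvalidTokenError(Exception):
    """Application-defined: the provider rejected the token as malformed"""


class UnregisteredError(Exception):
    """Application-defined: the device is no longer registered (APNs 410, FCM UNREGISTERED)"""


class RetryStrategy:
    """Retry policy"""
    MAX_RETRIES = 3
    RETRY_DELAYS = [1, 5, 30]  # base delay per attempt, in seconds

    async def should_retry(
        self,
        message: PushMessage,
        error: Exception
    ) -> tuple[bool, int]:
        """Return (whether to retry, delay in seconds)"""
        # Errors that retrying cannot fix
        if isinstance(error, (InvalidTokenError, UnregisteredError)):
            return False, 0
        # Retry budget exhausted
        if message.retry_count >= self.MAX_RETRIES:
            return False, 0
        # Base delay for this attempt
        delay = self.RETRY_DELAYS[message.retry_count]
        # Exponential backoff on top of it: 1 s, 10 s, 120 s for attempts 0, 1, 2
        delay *= (2 ** message.retry_count)
        return True, delay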
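RetryStrategy gives every failing message the same delay for a given attempt, so when a provider outage ends, all of the queued retries fire together. A widely used refinement is "full jitter": pick a uniformly random delay between 0 and the capped exponential value. In this sketch the base and cap defaults are illustrative, and rng is injectable only to make the function testable.

```python
import random
from typing import Callable


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  rng: Callable[[], float] = random.random) -> float:
    """Full-jitter backoff: a uniform delay in [0, min(cap, base * 2**attempt))."""
    return rng() * min(cap, base * (2 ** attempt))


for attempt in range(4):
    print(attempt, round(backoff_delay(attempt), 2))
```

The cap bounds the worst-case wait, and the randomness spreads retries from many workers across the whole interval instead of synchronizing them.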
Chapter 6: Web Push
6.1 How Web Push works
sequenceDiagram
participant Browser as Browser
participant SW as Service Worker
participant Server as App Server
participant Push as Push Service<br/>(Browser Vendor)
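Browser->>Browser: Request notification permission
Browser->>Push: Subscribe to push
Push-->>Browser: Push subscription
Browser->>Server: Send subscription info
Note over Server, Push: Sending a push
Server->>Push: Encrypted message, VAPID-authenticated
Push->>SW: Deliver message
SW->>Browser: Show notification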
6.2 Web Push implementation
import json
from dataclasses import dataclass
from typing import Optional

from pywebpush import WebPushException, webpush


@dataclass
class PushSubscription:
    endpoint: str
    p256dh: str
    auth: str
    user_id: str


class WebPushClient:
    def __init__(
        self,
        vapid_private_key: str,
        vapid_claims: dict  # e.g. {"sub": "mailto:ops@example.com"}
    ):
        self.vapid_private_key = vapid_private_key
        self.vapid_claims = vapid_claims

    def send(
        self,
        subscription: PushSubscription,
        title: str,
        body: str,
        data: Optional[dict] = None,
        icon: Optional[str] = None,
        url: Optional[str] = None,
        ttl: int = 86400
    ) -> dict:
        """Send a Web Push notification"""
        payload = {
            'title': title,
            'body': body,
            'icon': icon,
            'data': data or {},
            'url': url,
        }
        subscription_info = {
            'endpoint': subscription.endpoint,
            'keys': {
                'p256dh': subscription.p256dh,
                'auth': subscription.auth,
            }
        }
        try:
            response = webpush(
                subscription_info=subscription_info,
                data=json.dumps(payload),
                vapid_private_key=self.vapid_private_key,
                # Pass a copy: pywebpush may fill in "aud"/"exp" on the dict it receives,
                # and a shared dict would keep the first endpoint's audience
                vapid_claims=dict(self.vapid_claims),
                ttl=ttl
            )
            return {'success': True, 'status_code': response.status_code}
        except WebPushException as e:
            # e.response is None when the request never got an HTTP response
            status = e.response.status_code if e.response is not None else None
            if status in (404, 410):  # Not Found / Gone: the subscription has expired
                return {
                    'success': False,
                    'error': 'subscription_expired',
                    'remove': True
                }
            if status == 429:  # Too Many Requests
                return {
                    'success': False,
                    'error': 'rate_limited',
                    'retry': True
                }
            return {
                'success': False,
                'error': str(e),
                'status_code': status
            }

Service Worker (frontend, service-worker.js):
// service-worker.js
self.addEventListener('push', function(event) {
  // A push can arrive without a payload
  const data = event.data ? event.data.json() : {};
  const options = {
    body: data.body,
    icon: data.icon,
    badge: '/badge.png',
    data: {
      url: data.url,
      ...data.data
    },
    actions: [
      { action: 'open', title: 'View' },
      { action: 'close', title: 'Dismiss' }
    ]
  };
  event.waitUntil(
    self.registration.showNotification(data.title || 'Notification', options)
  );
});

self.addEventListener('notificationclick', function(event) {
  event.notification.close();
  if (event.action === 'open' || !event.action) {
    const url = event.notification.data.url || '/';
    event.waitUntil(
      clients.openWindow(url)
    );
  }
});
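The browser's PushSubscription.toJSON() returns an object shaped like {"endpoint": ..., "expirationTime": ..., "keys": {"p256dh": ..., "auth": ...}}, which the server stores as the PushSubscription dataclass. Validating it at registration time catches bad data before pywebpush fails at send time: per RFC 8291, p256dh is an uncompressed P-256 public key (65 bytes, starting with 0x04) and auth is a 16-byte secret, both base64url-encoded. A standard-library sketch; parse_subscription is a hypothetical helper.

```python
import base64
from dataclasses import dataclass


@dataclass
class PushSubscription:
    endpoint: str
    p256dh: str
    auth: str
    user_id: str


def _b64url_decode(value: str) -> bytes:
    # Browsers send unpadded base64url; restore the padding before decoding
    return base64.urlsafe_b64decode(value + "=" * (-len(value) % 4))


def parse_subscription(sub_json: dict, user_id: str) -> PushSubscription:
    """Validate the JSON produced by PushSubscription.toJSON() and convert it."""
    endpoint = sub_json["endpoint"]
    keys = sub_json["keys"]
    if not endpoint.startswith("https://"):
        raise ValueError("push endpoint must be an HTTPS URL")
    public_key = _b64url_decode(keys["p256dh"])
    if len(public_key) != 65 or public_key[0] != 0x04:
        raise ValueError("p256dh must be an uncompressed P-256 point (65 bytes)")
    if len(_b64url_decode(keys["auth"])) != 16:
        raise ValueError("auth secret must be 16 bytes")
    return PushSubscription(endpoint, keys["p256dh"], keys["auth"], user_id)
```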
Chapter 7: Monitoring and Analytics
7.1 Monitoring metrics
from prometheus_client import Counter, Gauge, Histogram

# Prometheus metrics
push_total = Counter(
    'push_notifications_total',
    'Total push notifications',
    ['platform', 'priority', 'status']
)

push_latency = Histogram(
    'push_notification_latency_seconds',
    'Push notification latency',
    ['platform'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0]
)

pending_messages = Gauge(
    'push_pending_messages',
    'Pending messages in queue',
    ['priority']
)

invalid_tokens = Counter(
    'push_invalid_tokens_total',
    'Invalid tokens detected',
    ['platform', 'reason']
)


class PushMetrics:
    @staticmethod
    def record_push(platform: str, priority: str, success: bool, latency: float):
        status = 'success' if success else 'failed'
        push_total.labels(platform=platform, priority=priority, status=status).inc()
        push_latency.labels(platform=platform).observe(latency)

    @staticmethod
    def record_invalid_token(platform: str, reason: str):
        invalid_tokens.labels(platform=platform, reason=reason).inc()

    @staticmethod
    def update_pending(priority: str, count: int):
        pending_messages.labels(priority=priority).set(count)
7.2 Analytics dashboard
-- ClickHouse analytics queries
-- Hourly push statistics
SELECT
toStartOfHour(created_at) as hour,
platform,
countIf(status = 'sent') as sent,
countIf(status = 'delivered') as delivered,
countIf(status = 'clicked') as clicked,
round(delivered / sent * 100, 2) as delivery_rate,
round(clicked / delivered * 100, 2) as click_rate
FROM push_events
WHERE created_at >= now() - INTERVAL 24 HOUR
GROUP BY hour, platform
ORDER BY hour DESC;
-- Push latency distribution
SELECT
platform,
quantile(0.50)(latency_ms) as p50,
quantile(0.90)(latency_ms) as p90,
quantile(0.99)(latency_ms) as p99,
max(latency_ms) as max_latency
FROM push_events
WHERE created_at >= now() - INTERVAL 1 HOUR
GROUP BY platform;
-- Failure reason breakdown
SELECT
platform,
error_code,
error_reason,
count() as count,
round(count() / sum(count()) OVER () * 100, 2) as percentage
FROM push_events
WHERE status = 'failed'
AND created_at >= now() - INTERVAL 24 HOUR
GROUP BY platform, error_code, error_reason
ORDER BY count DESC
LIMIT 20;
-- Per-user reach and engagement
SELECT
user_id,
countIf(status = 'sent') as total_sent,
countIf(status = 'delivered') as total_delivered,
countIf(status = 'clicked') as total_clicked,
round(total_clicked / total_delivered * 100, 2) as engagement_rate
FROM push_events
WHERE created_at >= now() - INTERVAL 7 DAY
GROUP BY user_id
HAVING total_sent > 10
ORDER BY engagement_rate DESC
LIMIT 100;
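Chapter 8: Best Practices
8.1 Notification content design
# Good push notifications
good_examples:
  - title: "Your order has shipped"
    body: "Order #12345 was picked up by SF Express and should arrive tomorrow"
    action: "Track package"
  - title: "New message from John"
    body: "John: Hi, are you free this weekend?"
    action: "Reply"
  - title: "Limited-time offer"
    body: "An item you follow is 30% off, only 2 hours left"
    action: "View now"
# Bad push notifications
bad_examples:
  - title: "Important notice"
    body: "Open the app for details"  # Says nothing specific
  - title: "🔥🔥🔥 SUPER DEAL 🔥🔥🔥"
    body: "Come and look!!!"  # Overuses symbols
  - title: "Application Update"
    body: "New version available"  # Not in the user's language (English copy shown to Chinese-speaking users)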
8.2 Frequency control
import redis.asyncio as redis


class FrequencyController:
    """Per-user push frequency limits, kept as fixed-window counters in Redis"""

    LIMITS = {
        'marketing': {'hourly': 1, 'daily': 3, 'weekly': 10},
        'transactional': {'hourly': 10, 'daily': 50},
        'social': {'hourly': 20, 'daily': 100},
    }

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def check_and_increment(
        self,
        user_id: str,
        push_type: str
    ) -> bool:
        """Count this push if every window is under its limit; otherwise return False"""
        type_limits = self.LIMITS.get(push_type, self.LIMITS['transactional'])
        incremented = []
        for window, limit in type_limits.items():
            key = f"push_freq:{user_id}:{push_type}:{window}"
            # INCR first, then check: a separate GET followed by INCR lets
            # concurrent workers both pass the check
            count = await self.redis.incr(key)
            incremented.append(key)
            if count == 1:
                # Set the TTL only when the window opens; resetting it on every
                # push would keep extending the window indefinitely
                await self.redis.expire(key, self._get_ttl(window))
            if count > limit:
                # Over the limit: undo this push's increments
                for k in incremented:
                    await self.redis.decr(k)
                return False
        return True

    def _get_ttl(self, window: str) -> int:
        return {
            'hourly': 3600,
            'daily': 86400,
            'weekly': 604800,
        }[window]
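Fixed-window counters allow bursts around a reset: a user can receive nearly the whole hourly allowance just before the counter expires and the full allowance again right after it. A sliding-window log avoids that by keeping the timestamps of recent pushes. Here is an in-memory sketch of the idea (in Redis the same structure is commonly a sorted set per user); the class and parameter names are hypothetical, and now is passed in to keep it deterministic.

```python
from collections import defaultdict, deque
from typing import Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `limit` pushes per user in any `window_seconds` span."""

    def __init__(self, limit: int, window_seconds: float) -> None:
        self.limit = limit
        self.window = window_seconds
        self._events: Dict[str, Deque[float]] = defaultdict(deque)  # user -> allowed push times

    def allow(self, user_id: str, now: float) -> bool:
        events = self._events[user_id]
        while events and events[0] <= now - self.window:
            events.popleft()  # drop pushes that have left the window
        if len(events) >= self.limit:
            return False      # rejected pushes are not recorded
        events.append(now)
        return True
```

Memory grows with the limit per active user rather than staying at one counter per window, which is usually acceptable for small per-user caps like these.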
8.3 A/B testing
import hashlib
import json
import uuid
from datetime import datetime


class PushABTest:
    """Push A/B testing; `db` and `analytics` are application-specific clients"""

    def __init__(self, db, analytics):
        self.db = db
        self.analytics = analytics

    async def create_test(
        self,
        name: str,
        variants: list[dict],
        traffic_split: list[float],
        target_audience: str
    ) -> str:
        """Create an A/B test"""
        if len(traffic_split) != len(variants) or abs(sum(traffic_split) - 1.0) > 1e-9:
            raise ValueError("traffic_split needs one share per variant, summing to 1")
        test_id = str(uuid.uuid4())
        await self.db.insert('ab_tests', {
            'id': test_id,
            'name': name,
            'variants': json.dumps(variants),
            'traffic_split': traffic_split,
            'target_audience': target_audience,
            'status': 'running',
            'created_at': datetime.utcnow(),
        })
        return test_id

    async def get_variant(self, test_id: str, user_id: str) -> dict:
        """Assign a user to a variant"""
        test = await self.db.get('ab_tests', test_id)
        variants = json.loads(test['variants'])  # stored as a JSON string
        # Hash test_id:user_id so a user always lands in the same variant
        hash_value = int(hashlib.md5(
            f"{test_id}:{user_id}".encode()
        ).hexdigest(), 16) % 100
        cumulative = 0
        for i, split in enumerate(test['traffic_split']):
            cumulative += split * 100
            if hash_value < cumulative:
                return variants[i]
        return variants[-1]

    async def analyze_results(self, test_id: str) -> dict:
        """Analyze test results"""
        query = """
            SELECT
                variant,
                count() as total,
                countIf(clicked = 1) as clicks,
                round(clicks / total * 100, 2) as ctr
            FROM push_events
            WHERE ab_test_id = %(test_id)s
            GROUP BY variant
        """
        # The placeholder style depends on the ClickHouse client in use
        results = await self.analytics.query(query, {'test_id': test_id})
        # Statistical significance (hook, e.g. a two-proportion z-test on CTR)
        return self._calculate_significance(results)
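_calculate_significance is left undefined above. One standard choice for comparing click-through rates between two variants is a two-sided two-proportion z-test; the sketch below pairs it with the same MD5 bucketing idea, using 10,000 buckets instead of 100 so splits finer than 1% work. The function names are hypothetical, and the normal approximation assumes each variant has enough clicks and non-clicks for it to hold.

```python
import hashlib
import math
from typing import List, Tuple


def assign_variant(test_id: str, user_id: str, splits: List[float]) -> int:
    """Deterministically map a user to a variant index according to `splits`."""
    bucket = int(hashlib.md5(f"{test_id}:{user_id}".encode()).hexdigest(), 16) % 10000
    cumulative = 0.0
    for i, share in enumerate(splits):
        cumulative += share * 10000
        if bucket < cumulative:
            return i
    return len(splits) - 1  # guard against floating-point shortfall


def two_proportion_z(clicks_a: int, total_a: int, clicks_b: int, total_b: int) -> Tuple[float, float]:
    """Two-sided z-test for a difference in CTR between A and B; returns (z, p_value)."""
    p_a, p_b = clicks_a / total_a, clicks_b / total_b
    pooled = (clicks_a + clicks_b) / (total_a + total_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / total_a + 1 / total_b))
    if se == 0:
        return 0.0, 1.0
    z = (p_b - p_a) / se
    p_value = math.erfc(abs(z) / math.sqrt(2))  # equals 2 * (1 - Phi(|z|))
    return z, p_value
```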
Summary
Designing an efficient push notification system involves several layers:
graph TB
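subgraph "Core elements"
PLATFORM[Platform adapters<br/>APNs/FCM/HMS]
DELIVERY[Delivery assurance<br/>retries/fallback]
SCALE[Scalability<br/>distributed architecture]
MONITOR[Monitoring and analytics<br/>metrics/alerts]
end
subgraph "Optimization goals"
LATENCY[Lower latency]
RATE[Higher delivery rate]
ENGAGE[Higher engagement]
COST[Cost control]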
end
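Key takeaways:
- Platform adapters: implement the APNs/FCM protocols correctly
- Token management: clean up invalid tokens promptly
- Message queue: decouple producers from senders and absorb traffic spikes
- Batching: raise throughput
- Error handling: retry only retryable errors, with backoff
- Frequency control: avoid overwhelming users
- Monitoring and analytics: keep optimizing based on the data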