Redis 技術細節深度剖析:從源碼理解高性能記憶體數據庫的設計哲學
執行摘要
Redis (Remote Dictionary Server) 是一個開源的高性能記憶體鍵值數據庫,以其極致的性能(單機 10 萬+ QPS)和豐富的數據結構聞名。本文將深入 Redis 源碼(基於 redis/redis 最新版本),從底層實現角度剖析:
- 事件驅動模型:基於 epoll/kqueue 的高效 I/O 多路複用
- 核心數據結構:SDS、Dict、QuickList、SkipList 等的精妙設計
- 持久化機制:RDB 快照與 AOF 日誌的實現細節
- 高可用架構:主從複製與 Redis Cluster 的工程實現
目錄
1. Redis 整體架構
1.1 系統架構概覽
flowchart TB
subgraph Client["客戶端層"]
C1[redis-cli]
C2[Application SDK]
C3[Jedis/Lettuce/redis-py]
end
subgraph Network["網路層"]
N1[TCP Socket]
N2[RESP 協議解析]
N3[連接管理]
end
subgraph EventLoop["事件驅動層"]
E1[ae.c 事件循環]
E2[epoll/kqueue/select]
E3[File Events]
E4[Time Events]
end
subgraph Command["命令處理層"]
CMD1[命令解析]
CMD2[命令路由]
CMD3[命令執行]
CMD4[回覆生成]
end
subgraph DataStore["數據存儲層"]
DS1[redisDb 數據庫]
DS2[Dict 哈希表]
DS3[各類數據結構]
end
subgraph Persistence["持久化層"]
P1[RDB 快照]
P2[AOF 日誌]
P3[混合持久化]
end
C1 --> N1
C2 --> N1
C3 --> N1
N1 --> N2 --> N3
N3 --> E1
E1 --> E2
E2 --> E3
E2 --> E4
E3 --> CMD1
CMD1 --> CMD2 --> CMD3 --> CMD4
CMD3 --> DS1
DS1 --> DS2 --> DS3
DS1 --> P1
DS1 --> P2
1.2 源碼目錄結構
redis/src/
├── server.c # 主服務器邏輯、初始化、主循環
├── server.h # 核心數據結構定義
├── ae.c # 事件驅動庫(Event Loop)
├── ae_epoll.c # Linux epoll 實現
├── ae_kqueue.c # macOS/BSD kqueue 實現
├── networking.c # 網路連接處理
├── dict.c # 哈希表實現
├── sds.c # Simple Dynamic String
├── adlist.c # 雙向鏈表
├── quicklist.c # 快速列表(List 底層)
├── t_string.c # String 類型命令
├── t_list.c # List 類型命令
├── t_set.c # Set 類型命令
├── t_zset.c # Sorted Set 類型命令
├── t_hash.c # Hash 類型命令
├── rdb.c # RDB 持久化
├── aof.c # AOF 持久化
├── cluster.c # 集群模式
├── replication.c # 主從複製
└── module.c # 模組系統
1.3 核心數據結構:redisServer
// server.h - Redis 服務器全局狀態
struct redisServer {
// 基本配置
pid_t pid; // 進程 ID
char *configfile; // 配置文件路徑
int hz; // serverCron 調用頻率(默認 10)
// 網路
int port; // 監聽端口(默認 6379)
int tcp_backlog; // TCP backlog 大小
list *clients; // 客戶端鏈表
int maxclients; // 最大客戶端數
// 事件循環
aeEventLoop *el; // 事件循環結構
// 數據庫
redisDb *db; // 數據庫數組
int dbnum; // 數據庫數量(默認 16)
// 持久化
char *rdb_filename; // RDB 文件名
char *aof_filename; // AOF 文件名
int aof_state; // AOF 狀態
// 複製
char *masterhost; // 主節點地址
int masterport; // 主節點端口
int repl_state; // 複製狀態
// 集群
int cluster_enabled; // 是否啟用集群
clusterState *cluster; // 集群狀態
// 統計
long long stat_numcommands; // 執行命令總數
long long stat_numconnections; // 連接總數
// ... 更多字段
};
2. 事件驅動模型深度解析
Redis 的高性能核心在於其精巧的事件驅動模型,由 ae.c 實現。
2.1 事件類型
flowchart LR
subgraph Events["Redis 事件類型"]
FE[File Events<br/>文件事件]
TE[Time Events<br/>時間事件]
end
subgraph FileEvents["文件事件"]
FE1[AE_READABLE<br/>可讀事件]
FE2[AE_WRITABLE<br/>可寫事件]
end
subgraph TimeEvents["時間事件"]
TE1[serverCron<br/>定時任務]
TE2[過期鍵清理]
TE3[統計更新]
end
FE --> FE1
FE --> FE2
TE --> TE1
TE --> TE2
TE --> TE3
FE1 --> |客戶端連接/請求| Handler1[acceptTcpHandler<br/>readQueryFromClient]
FE2 --> |回覆客戶端| Handler2[sendReplyToClient]
2.2 事件循環核心結構
// ae.h - 事件循環結構
typedef struct aeEventLoop {
int maxfd; // 當前註冊的最大 fd
int setsize; // 事件數組大小
long long timeEventNextId; // 下一個時間事件 ID
aeFileEvent *events; // 文件事件數組
aeFiredEvent *fired; // 已觸發事件數組
aeTimeEvent *timeEventHead; // 時間事件鏈表頭
int stop; // 停止標誌
void *apidata; // 多路複用 API 私有數據
aeBeforeSleepProc *beforesleep; // 睡眠前回調
aeBeforeSleepProc *aftersleep; // 睡眠後回調
int flags; // 事件循環標誌
} aeEventLoop;
// 文件事件結構
typedef struct aeFileEvent {
int mask; // 事件類型 (AE_READABLE | AE_WRITABLE)
aeFileProc *rfileProc; // 讀事件處理器
aeFileProc *wfileProc; // 寫事件處理器
void *clientData; // 客戶端私有數據
} aeFileEvent;
// 時間事件結構
typedef struct aeTimeEvent {
long long id; // 時間事件 ID
monotime when; // 觸發時間
aeTimeProc *timeProc; // 時間事件處理器
aeEventFinalizerProc *finalizerProc; // 清理函數
void *clientData; // 客戶端私有數據
struct aeTimeEvent *prev; // 前驅
struct aeTimeEvent *next; // 後繼
int refcount; // 引用計數
} aeTimeEvent;
2.3 事件循環主流程
// ae.c - 核心事件循環
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 執行事件處理
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int processed = 0, numevents;
// 1. 計算最近的時間事件,決定阻塞時間
if (flags & AE_TIME_EVENTS) {
// 尋找最近的時間事件
aeTimeEvent *shortest = aeSearchNearestTimer(eventLoop);
// 計算等待時間
}
// 2. 調用 beforesleep 回調
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
// 3. 調用底層多路複用 API (epoll_wait/kevent/select)
numevents = aeApiPoll(eventLoop, tvp);
// 4. 調用 aftersleep 回調
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// 5. 處理文件事件
for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
int mask = eventLoop->fired[j].mask;
aeFileEvent *fe = &eventLoop->events[fd];
// 處理讀事件
if (fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
}
// 處理寫事件
if (fe->mask & mask & AE_WRITABLE) {
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
}
processed++;
}
// 6. 處理時間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed;
}
2.4 多路複用後端選擇
// ae.c - 根據系統選擇最佳後端
#ifdef HAVE_EVPORT
#include "ae_evport.c" // Solaris
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c" // Linux (最常用)
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c" // macOS/BSD
#else
#include "ae_select.c" // 通用後備
#endif
#endif
#endif
2.5 epoll 實現細節
// ae_epoll.c - Linux epoll 封裝
typedef struct aeApiState {
int epfd; // epoll 文件描述符
struct epoll_event *events; // epoll 事件數組
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
state->events = zmalloc(sizeof(struct epoll_event) * eventLoop->setsize);
state->epfd = epoll_create(1024); // 創建 epoll 實例
eventLoop->apidata = state;
return 0;
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// 調用 epoll_wait 等待事件
retval = epoll_wait(state->epfd, state->events,
eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
numevents = retval;
for (int j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events + j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE | AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE | AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
3. 核心數據結構實現
3.1 SDS (Simple Dynamic String)
SDS 是 Redis 的基礎字串實現,相比 C 標準字串有顯著優勢:
flowchart LR
subgraph CString["C 標準字串"]
CS[R][e][d][i][s][\0]
end
subgraph SDS["Redis SDS"]
direction TB
Header[sdshdr]
Content[R][e][d][i][s][\0]
Header --> Content
end
subgraph SDSAdvantages["SDS 優勢"]
A1["O(1) 獲取長度"]
A2["二進位安全"]
A3["防止緩衝區溢出"]
A4["減少記憶體重分配"]
end
SDS 結構定義
// sds.h - SDS 類型定義
typedef char *sds; // sds 就是 char*,指向實際字串內容
// 根據字串長度選擇不同的 header 類型以節省記憶體
struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; // 低 3 位是類型,高 5 位是長度
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; // 已使用長度
uint8_t alloc; // 分配的總長度
unsigned char flags; // 類型標誌
char buf[]; // 實際字串內容
};
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len;
uint16_t alloc;
unsigned char flags;
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len;
uint32_t alloc;
unsigned char flags;
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len;
uint64_t alloc;
unsigned char flags;
char buf[];
};
// 類型標誌
#define SDS_TYPE_5 0
#define SDS_TYPE_8 1
#define SDS_TYPE_16 2
#define SDS_TYPE_32 3
#define SDS_TYPE_64 4
SDS 記憶體佈局
sdshdr8 記憶體佈局(字串 "Redis"):
┌──────┬──────┬───────┬───────────────────────┐
│ len │alloc │ flags │ buf[] │
│ 5 │ 10 │ 1 │ R│e│d│i│s│\0│ │ │ │ │ │
├──────┴──────┴───────┼───────────────────────┤
│ 3 bytes │ 11 bytes │
└─────────────────────┴───────────────────────┘
↑
sds 指針指向這裡
核心操作實現
// sds.c - 創建新 SDS
sds sdsnewlen(const void *init, size_t initlen) {
void *sh;
sds s;
// 根據長度選擇類型
char type = sdsReqType(initlen);
if (type == SDS_TYPE_5 && initlen == 0) type = SDS_TYPE_8;
int hdrlen = sdsHdrSize(type);
// 分配記憶體:header + 字串內容 + '\0'
sh = s_malloc(hdrlen + initlen + 1);
if (sh == NULL) return NULL;
s = (char*)sh + hdrlen; // sds 指向 buf 開始
// 設置 header
switch(type) {
case SDS_TYPE_8: {
SDS_HDR_VAR(8, s);
sh->len = initlen;
sh->alloc = initlen;
*fp = type; // flags
break;
}
// ... 其他類型
}
// 複製內容
if (initlen && init)
memcpy(s, init, initlen);
s[initlen] = '\0';
return s;
}
// O(1) 獲取長度
static inline size_t sdslen(const sds s) {
unsigned char flags = s[-1]; // flags 在 buf 前一個字節
switch(flags & SDS_TYPE_MASK) {
case SDS_TYPE_5:
return SDS_TYPE_5_LEN(flags);
case SDS_TYPE_8:
return SDS_HDR(8, s)->len;
// ...
}
return 0;
}
// 字串拼接(自動擴容)
sds sdscatlen(sds s, const void *t, size_t len) {
size_t curlen = sdslen(s);
// 確保有足夠空間
s = sdsMakeRoomFor(s, len);
if (s == NULL) return NULL;
// 拼接
memcpy(s + curlen, t, len);
sdssetlen(s, curlen + len);
s[curlen + len] = '\0';
return s;
}
// 空間預分配策略
sds sdsMakeRoomFor(sds s, size_t addlen) {
size_t avail = sdsavail(s);
if (avail >= addlen) return s; // 空間足夠
size_t len = sdslen(s);
size_t newlen = len + addlen;
// 預分配策略:
// < 1MB: 分配 2 倍
// >= 1MB: 額外分配 1MB
if (newlen < SDS_MAX_PREALLOC)
newlen *= 2;
else
newlen += SDS_MAX_PREALLOC;
// 重新分配
// ...
}
3.2 Dict (哈希表)
Dict 是 Redis 最重要的數據結構,所有 Key-Value 存儲的底層都是 Dict。
flowchart TB
subgraph Dict["dict 結構"]
DictStruct[dict<br/>type<br/>ht_table[2]<br/>ht_used[2]<br/>rehashidx]
end
subgraph HashTables["兩個哈希表"]
HT0[ht_table[0]<br/>正常使用]
HT1[ht_table[1]<br/>rehash 時使用]
end
subgraph Buckets0["ht_table[0] buckets"]
B0[bucket 0]
B1[bucket 1]
B2[bucket 2]
BN[bucket n]
end
subgraph Entries["dictEntry 鏈表"]
E1[entry: key1->val1]
E2[entry: key2->val2]
E3[entry: key3->val3]
end
Dict --> HashTables
HT0 --> Buckets0
B1 --> E1 --> E2
B2 --> E3
Dict 結構定義
// dict.h
typedef struct dictEntry {
void *key; // 鍵
union {
void *val; // 值(指針)
uint64_t u64; // 值(無符號整數)
int64_t s64; // 值(有符號整數)
double d; // 值(浮點數)
} v;
struct dictEntry *next; // 鏈表下一個(解決衝突)
} dictEntry;
typedef struct dict {
dictType *type; // 類型特定函數
dictEntry **ht_table[2]; // 兩個哈希表
unsigned long ht_used[2]; // 已使用條目數
long rehashidx; // rehash 索引,-1 表示未進行
int16_t pauserehash; // rehash 暫停計數
signed char ht_size_exp[2]; // 哈希表大小的 2 的冪次
void *metadata[]; // 元數據
} dict;
// 類型函數
typedef struct dictType {
uint64_t (*hashFunction)(const void *key); // 哈希函數
void *(*keyDup)(dict *d, const void *key); // 鍵複製
void *(*valDup)(dict *d, const void *obj); // 值複製
int (*keyCompare)(dict *d, const void *key1, const void *key2); // 鍵比較
void (*keyDestructor)(dict *d, void *key); // 鍵銷毀
void (*valDestructor)(dict *d, void *obj); // 值銷毀
} dictType;
漸進式 Rehash
Redis 使用漸進式 rehash 避免一次性遷移大量數據導致的阻塞:
// dict.c - 漸進式 rehash
int dictRehash(dict *d, int n) {
int empty_visits = n * 10; // 最多訪問空 bucket 數
if (!dictIsRehashing(d)) return 0;
while (n-- && d->ht_used[0] != 0) {
dictEntry *de, *nextde;
// 跳過空 bucket
while (d->ht_table[0][d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
// 遷移當前 bucket 的所有 entry
de = d->ht_table[0][d->rehashidx];
while (de) {
uint64_t h;
nextde = de->next;
// 計算在新表中的索引
h = dictHashKey(d, de->key) & DICTHT_SIZE_MASK(d->ht_size_exp[1]);
// 插入新表頭部
de->next = d->ht_table[1][h];
d->ht_table[1][h] = de;
d->ht_used[0]--;
d->ht_used[1]++;
de = nextde;
}
d->ht_table[0][d->rehashidx] = NULL;
d->rehashidx++;
}
// 檢查是否完成
if (d->ht_used[0] == 0) {
zfree(d->ht_table[0]);
d->ht_table[0] = d->ht_table[1];
d->ht_used[0] = d->ht_used[1];
d->ht_size_exp[0] = d->ht_size_exp[1];
_dictReset(d, 1);
d->rehashidx = -1;
return 0; // 完成
}
return 1; // 未完成
}
sequenceDiagram
participant Client
participant Redis
participant HT0 as ht_table[0]
participant HT1 as ht_table[1]
Note over Redis: rehashidx = 0
Client->>Redis: SET key1 value1
Redis->>HT0: 遷移 bucket[0] 到 ht[1]
Redis->>HT0: 查找或寫入
Redis->>HT1: 查找或寫入
Note over Redis: rehashidx++
Redis->>Client: OK
Client->>Redis: GET key2
Redis->>HT0: 遷移 bucket[1] 到 ht[1]
Redis->>HT0: 查找 key2
Redis->>HT1: 查找 key2
Note over Redis: rehashidx++
Redis->>Client: value2
Note over Redis: 逐步遷移完成
Note over Redis: rehashidx = -1
3.3 QuickList (List 底層實現)
QuickList 是 Redis 3.2 引入的 List 實現,結合了雙向鏈表和壓縮列表的優點:
flowchart LR
subgraph QuickList
QL[quicklist<br/>head<br/>tail<br/>count<br/>len]
end
subgraph Nodes["quicklistNode 鏈表"]
N1[node1<br/>listpack]
N2[node2<br/>listpack]
N3[node3<br/>listpack]
end
subgraph LP1["listpack (壓縮存儲)"]
E1[elem1]
E2[elem2]
E3[elem3]
end
QL --> N1
N1 <--> N2 <--> N3
N1 --> LP1
// quicklist.h
typedef struct quicklistNode {
struct quicklistNode *prev; // 前驅
struct quicklistNode *next; // 後繼
unsigned char *entry; // listpack 數據
size_t sz; // listpack 大小
unsigned int count : 16; // listpack 中的條目數
unsigned int encoding : 2; // 編碼方式
unsigned int container : 2; // 容器類型
unsigned int recompress : 1; // 是否需要重新壓縮
unsigned int attempted_compress : 1;
unsigned int dont_compress : 1;
unsigned int extra : 9;
} quicklistNode;
typedef struct quicklist {
quicklistNode *head; // 頭節點
quicklistNode *tail; // 尾節點
unsigned long count; // 所有 listpack 的條目總數
unsigned long len; // quicklistNode 數量
signed int fill : QL_FILL_BITS; // listpack 大小限制
unsigned int compress : QL_COMP_BITS; // 壓縮深度
unsigned int bookmark_count : QL_BM_BITS;
quicklistBookmark bookmarks[];
} quicklist;
3.4 SkipList (Sorted Set 底層實現)
flowchart TB
subgraph SkipList["跳躍表結構"]
H[header<br/>level=32]
L4["Level 4: "]
L3["Level 3: "]
L2["Level 2: "]
L1["Level 1: "]
N1[node1<br/>score=1.0]
N2[node2<br/>score=2.0]
N3[node3<br/>score=3.0]
N4[node4<br/>score=4.0]
T[tail]
end
H --> |L4| N4 --> |L4| T
H --> |L3| N2 --> |L3| N4
H --> |L2| N1 --> |L2| N2 --> |L2| N4
H --> |L1| N1 --> |L1| N2 --> |L1| N3 --> |L1| N4 --> |L1| T
// server.h - 跳躍表結構
typedef struct zskiplistNode {
sds ele; // 元素值
double score; // 分數
struct zskiplistNode *backward; // 後退指針
struct zskiplistLevel {
struct zskiplistNode *forward; // 前進指針
unsigned long span; // 跨度(用於計算排名)
} level[]; // 層級數組
} zskiplistNode;
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length; // 節點數量
int level; // 最大層級
} zskiplist;
// Sorted Set = SkipList + Dict
typedef struct zset {
dict *dict; // 用於 O(1) 查找
zskiplist *zsl; // 用於範圍查詢和排序
} zset;
4. 命令處理流程
4.1 RESP 協議
Redis 使用 RESP (Redis Serialization Protocol) 進行客戶端-服務器通訊:
請求格式(Inline / Bulk):
*3\r\n # 3 個參數
$3\r\n # 第 1 個參數長度 3
SET\r\n # 第 1 個參數內容
$3\r\n # 第 2 個參數長度 3
key\r\n # 第 2 個參數內容
$5\r\n # 第 3 個參數長度 5
value\r\n # 第 3 個參數內容
回覆格式:
+OK\r\n # 簡單字串 (Simple String)
-ERR message\r\n # 錯誤 (Error)
:1000\r\n # 整數 (Integer)
$5\r\nhello\r\n # 批量字串 (Bulk String)
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n # 數組 (Array)
4.2 命令執行流程
sequenceDiagram
participant Client
participant Network as networking.c
participant Parser as 協議解析
participant Lookup as 命令查找
participant Execute as 命令執行
participant Reply as 回覆生成
Client->>Network: TCP 數據包
Network->>Network: readQueryFromClient()
Network->>Parser: processInputBuffer()
Parser->>Parser: 解析 RESP 協議
Parser->>Lookup: lookupCommand()
Lookup->>Lookup: 在命令表中查找
Lookup->>Execute: call(client, cmd)
Execute->>Execute: 執行命令函數
Execute->>Reply: addReply()
Reply->>Network: 寫入輸出緩衝區
Network->>Client: sendReplyToClient()
4.3 命令表定義
// 命令定義 (commands.def 自動生成)
struct redisCommand {
char *name; // 命令名
redisCommandProc *proc; // 命令處理函數
int arity; // 參數數量 (-N 表示至少 N 個)
char *sflags; // 標誌字串
uint64_t flags; // 標誌位
redisGetKeysProc *getkeys_proc; // 獲取鍵的函數
int firstkey; // 第一個鍵的位置
int lastkey; // 最後一個鍵的位置
int keystep; // 鍵的步長
long long microseconds; // 執行總時間(統計)
long long calls; // 調用次數
// ...
};
// 示例:SET 命令
{"set", setCommand, -3,
"write denyoom @string",
CMD_WRITE | CMD_DENYOOM,
// ...
}
5. 持久化機制
5.1 RDB (Redis Database)
RDB 是 Redis 數據的二進制快照:
flowchart TB
subgraph Trigger["觸發方式"]
T1[SAVE 命令<br/>阻塞]
T2[BGSAVE 命令<br/>後台]
T3[自動觸發<br/>save 配置]
end
subgraph Process["BGSAVE 流程"]
P1[fork() 子進程]
P2[子進程遍歷數據]
P3[寫入臨時文件]
P4[重命名為 dump.rdb]
end
subgraph COW["Copy-On-Write"]
C1[父子共享內存頁]
C2[寫時複製]
C3[節省記憶體]
end
T1 --> P2
T2 --> P1 --> P2 --> P3 --> P4
T3 --> P1
P1 --> COW
// rdb.c - BGSAVE 實現
int rdbSaveBackground(char *filename) {
pid_t childpid;
if (hasActiveChildProcess()) return C_ERR;
server.dirty_before_bgsave = server.dirty;
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
// 子進程
int retval;
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename, &error);
if (retval == C_OK) {
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
// 父進程
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
}
return C_OK;
}
// RDB 文件格式
/*
+-------+-------------+----+----+----+----+----+----+-----+---------+
| REDIS | RDB-VERSION | FA | AUX| DB | DB | DB | ... | EOF | CRC64 |
| 5字節 | 4字節 | | | | | | | 1B | 8字節 |
+-------+-------------+----+----+----+----+----+----+-----+---------+
DB 結構:
+------------+-----------+--------+--------+-----+
| SELECTDB | db_number | resize | expire | KV |
| 0xFE | encoded | info | info | ... |
+------------+-----------+--------+--------+-----+
*/
5.2 AOF (Append Only File)
AOF 記錄所有寫操作,重啟時重放恢復數據:
flowchart LR
subgraph Write["寫入流程"]
W1[執行寫命令]
W2[追加到 aof_buf]
W3[事件循環前刷新]
end
subgraph Sync["同步策略"]
S1[always<br/>每次寫入 fsync]
S2[everysec<br/>每秒 fsync]
S3[no<br/>由 OS 決定]
end
subgraph Rewrite["AOF 重寫"]
R1[BGREWRITEAOF]
R2[fork 子進程]
R3[遍歷當前數據]
R4[生成新 AOF]
end
W1 --> W2 --> W3
W3 --> S1
W3 --> S2
W3 --> S3
R1 --> R2 --> R3 --> R4
// aof.c - AOF 追加
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
// 如果需要,添加 SELECT 命令
if (dictid != server.aof_selected_db) {
buf = sdscatprintf(buf, "*2\r\n$6\r\nSELECT\r\n$%d\r\n%d\r\n",
digits, dictid);
server.aof_selected_db = dictid;
}
// 將命令轉換為 RESP 格式
buf = catAppendOnlyGenericCommand(buf, argc, argv);
// 追加到 AOF 緩衝區
server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
sdsfree(buf);
}
// AOF 同步策略
void flushAppendOnlyFile(int force) {
if (sdslen(server.aof_buf) == 0) return;
// 寫入文件
nwritten = aofWrite(server.aof_fd, server.aof_buf, sdslen(server.aof_buf));
// 根據策略 fsync
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
redis_fsync(server.aof_fd);
} else if (server.aof_fsync == AOF_FSYNC_EVERYSEC) {
if (time(NULL) - server.aof_last_fsync >= 1) {
aof_background_fsync(server.aof_fd);
server.aof_last_fsync = time(NULL);
}
}
}
5.3 混合持久化
Redis 4.0 引入混合持久化,AOF 重寫時使用 RDB 格式:
AOF 混合格式:
+------------------+-------------------+
| RDB 格式數據 | 增量 AOF 命令 |
| (全量快照) | (重寫後的寫入) |
+------------------+-------------------+
6. 記憶體管理
6.1 記憶體分配器
Redis 支持多種記憶體分配器:
// zmalloc.c - 記憶體分配封裝
#if defined(USE_TCMALLOC)
#include <google/tcmalloc.h>
#elif defined(USE_JEMALLOC)
#include <jemalloc/jemalloc.h>
#else
// 使用 libc malloc
#endif
void *zmalloc(size_t size) {
void *ptr = malloc(size + PREFIX_SIZE);
if (!ptr) zmalloc_oom_handler(size);
// 記錄分配大小(用於統計)
*((size_t*)ptr) = size;
update_zmalloc_stat_alloc(size + PREFIX_SIZE);
return (char*)ptr + PREFIX_SIZE;
}
void zfree(void *ptr) {
if (ptr == NULL) return;
realptr = (char*)ptr - PREFIX_SIZE;
oldsize = *((size_t*)realptr);
update_zmalloc_stat_free(oldsize + PREFIX_SIZE);
free(realptr);
}
6.2 記憶體淘汰策略
// evict.c - 記憶體淘汰
int performEvictions(void) {
while (mem_used > server.maxmemory) {
// 根據策略選擇要淘汰的 key
if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_LRU) {
// LRU 淘汰
bestkey = approximateLRU();
} else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LFU ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_LFU) {
// LFU 淘汰
bestkey = approximateLFU();
} else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM) {
// 隨機淘汰
bestkey = randomKey();
}
// 刪除 key
if (bestkey) {
deleteKey(bestkey);
delta = mem_used - zmalloc_used_memory();
mem_freed += delta;
}
}
return EVICT_OK;
}
7. 主從複製
7.1 複製流程
sequenceDiagram
participant Slave
participant Master
Slave->>Master: PING
Master->>Slave: PONG
Slave->>Master: REPLCONF listening-port 6380
Master->>Slave: OK
Slave->>Master: REPLCONF capa eof capa psync2
Master->>Slave: OK
Slave->>Master: PSYNC ? -1 (首次全量)
Master->>Slave: +FULLRESYNC <runid> <offset>
Master->>Slave: 發送 RDB 文件
Master->>Slave: 發送緩衝區積累的命令
loop 增量同步
Master->>Slave: 傳播寫命令
Slave->>Master: REPLCONF ACK <offset>
end
7.2 部分重同步
// replication.c
void syncCommand(client *c) {
// 嘗試部分重同步
if (!strcasecmp(c->argv[0]->ptr, "psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
// 部分重同步成功
return;
}
}
// 全量重同步
// 1. 發送 +FULLRESYNC
// 2. BGSAVE 生成 RDB
// 3. 發送 RDB
// 4. 發送積壓緩衝區命令
}
// 積壓緩衝區
typedef struct replBacklog {
char *buf; // 環形緩衝區
size_t size; // 緩衝區大小
size_t histlen; // 有效數據長度
size_t offset; // 複製偏移量
// ...
} replBacklog;
8. Redis Cluster
8.1 集群架構
flowchart TB
subgraph Cluster["Redis Cluster (16384 slots)"]
subgraph Node1["Node 1 (Master)"]
S1[Slots 0-5460]
end
subgraph Node2["Node 2 (Master)"]
S2[Slots 5461-10922]
end
subgraph Node3["Node 3 (Master)"]
S3[Slots 10923-16383]
end
subgraph Replicas["Replicas"]
R1[Node 4<br/>Replica of Node 1]
R2[Node 5<br/>Replica of Node 2]
R3[Node 6<br/>Replica of Node 3]
end
end
Node1 <--> Node2 <--> Node3
Node1 <--> Node3
Node1 --> R1
Node2 --> R2
Node3 --> R3
Client[Client] --> |CRC16(key) mod 16384| Cluster
8.2 Slot 計算
// cluster.c - 計算 key 所屬 slot
unsigned int keyHashSlot(char *key, int keylen) {
int s, e;
// 尋找 hash tag: {xxx}
for (s = 0; s < keylen; s++)
if (key[s] == '{') break;
if (s == keylen) return crc16(key, keylen) & 0x3FFF;
for (e = s + 1; e < keylen; e++)
if (key[e] == '}') break;
if (e == keylen || e == s + 1) return crc16(key, keylen) & 0x3FFF;
// 使用 {} 內的內容計算
return crc16(key + s + 1, e - s - 1) & 0x3FFF;
}
8.3 Gossip 協議
// cluster.c - Gossip 消息
typedef struct {
char sig[4]; // "RCmb" 簽名
uint32_t totlen; // 消息總長度
uint16_t ver; // 版本
uint16_t port; // 端口
uint16_t type; // 消息類型
uint16_t count; // 附帶的節點信息數量
uint64_t currentEpoch;
uint64_t configEpoch;
uint64_t offset;
char sender[CLUSTER_NAMELEN];
unsigned char myslots[CLUSTER_SLOTS/8];
char slaveof[CLUSTER_NAMELEN];
// ...
clusterMsgDataGossip gossip[]; // Gossip 數據
} clusterMsg;
// 消息類型
#define CLUSTERMSG_TYPE_PING 0 // Ping
#define CLUSTERMSG_TYPE_PONG 1 // Pong
#define CLUSTERMSG_TYPE_MEET 2 // 加入集群
#define CLUSTERMSG_TYPE_FAIL 3 // 節點失效
#define CLUSTERMSG_TYPE_UPDATE 4 // 配置更新
9. 性能優化策略
9.1 Pipeline
sequenceDiagram
participant Client
participant Redis
Note over Client,Redis: 普通模式
Client->>Redis: SET key1 value1
Redis->>Client: OK
Client->>Redis: SET key2 value2
Redis->>Client: OK
Note over Client,Redis: Pipeline 模式
Client->>Redis: SET key1 value1
Client->>Redis: SET key2 value2
Client->>Redis: SET key3 value3
Redis->>Client: OK
Redis->>Client: OK
Redis->>Client: OK
9.2 對象編碼優化
// object.c - 智能編碼選擇
#define OBJ_ENCODING_RAW 0 // 原始字串
#define OBJ_ENCODING_INT 1 // 整數編碼
#define OBJ_ENCODING_HT 2 // 哈希表
#define OBJ_ENCODING_ZIPLIST 5 // 壓縮列表 (已棄用)
#define OBJ_ENCODING_INTSET 6 // 整數集合
#define OBJ_ENCODING_SKIPLIST 7 // 跳躍表
#define OBJ_ENCODING_EMBSTR 8 // 嵌入式字串
#define OBJ_ENCODING_QUICKLIST 9 // 快速列表
#define OBJ_ENCODING_LISTPACK 11 // listpack
// 字串編碼選擇
robj *tryObjectEncodingEx(robj *o, int try_int) {
if (o->encoding != OBJ_ENCODING_RAW) return o;
len = sdslen(o->ptr);
// 嘗試整數編碼
if (try_int && len <= 20 && string2l(o->ptr, len, &value)) {
if (value >= 0 && value < OBJ_SHARED_INTEGERS) {
return shared.integers[value]; // 共享整數對象
}
o->encoding = OBJ_ENCODING_INT;
o->ptr = (void*) value;
return o;
}
// 嵌入式字串 (<=44 字節)
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {
return createEmbeddedStringObject(o->ptr, len);
}
return o;
}
9.3 單線程 vs 多線程 I/O
Redis 6.0 引入了多線程 I/O:
// networking.c - 多線程 I/O
void initThreadedIO(void) {
server.io_threads_num = server.io_threads;
for (int i = 0; i < server.io_threads_num; i++) {
pthread_create(&tid, NULL, IOThreadMain, (void*)(long)i);
}
}
void *IOThreadMain(void *myid) {
while (1) {
// 等待主線程分配任務
while (getIOPendingCount(id) == 0) {
// busy waiting
}
// 處理讀/寫任務
listIter li;
listNode *ln;
listRewind(io_threads_list[id], &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
writeToClient(c, 0);
}
}
// 標記完成
setIOPendingCount(id, 0);
}
}
總結
Redis 的高性能來自於多個精心設計的組件:
| 組件 | 設計決策 | 性能影響 |
|---|---|---|
| 事件循環 | 單線程 + I/O 多路複用 | 避免鎖開銷,簡化並發 |
| SDS | 預分配 + O(1) 長度獲取 | 減少記憶體分配次數 |
| Dict | 漸進式 rehash | 避免阻塞 |
| 編碼優化 | 智能選擇底層結構 | 節省記憶體 |
| 持久化 | fork + COW | 後台執行,不影響服務 |
| 複製 | 部分重同步 | 減少網路傳輸 |
參考資料
本文基於 Redis 最新源碼分析,部分 API 可能隨版本更新而變化。