Hummingbot 深度剖析:從交易連接器到策略執行引擎的完整架構解析
生成提示詞
請深入分析 Hummingbot 開源量化交易框架的完整架構:
1. Clone hummingbot 源代碼並閱讀核心模組
2. 分析連接器系統如何對接多個交易所
3. 研究訂單管理與事件驅動架構
4. 解析 Strategy V1 與 V2 的差異與演進
5. 繪製 Mermaid 流程圖說明數據流與交易執行
6. 包含關鍵源碼片段與文件路徑
7. 以繁體中文撰寫工程級深度文章
執行摘要
Hummingbot 是一個開源的加密貨幣量化交易框架,支援多交易所連接、自動做市(AMM)、套利策略與自定義交易邏輯。本文將深入剖析其核心技術:
- 模組化連接器架構:如何統一不同交易所的 API 差異
- Cython 高效能核心:OrderBook、Clock、PubSub 的 C++ 優化
- 事件驅動系統:弱引用 Pub/Sub 模式與記憶體管理
- Strategy V2 演進:Controller + Executor 的現代化策略架構
- 訂單生命週期管理:從下單到成交的完整追蹤機制
目錄
- Hummingbot 架構總覽
- 連接器系統深度解析
- 訂單簿與數據結構
- 事件驅動架構
- Strategy V1:腳本策略基礎
- Strategy V2:控制器與執行器
- 訂單管理與生命週期
- 時鐘與時間迭代系統
- 連接器實作範例:OKX
- 效能優化與 Cython
1. Hummingbot 架構總覽
1.1 系統組件架構
flowchart TB
subgraph Client["客戶端層"]
CLI[Hummingbot CLI]
MQTT[MQTT Bridge]
Headless[Headless Mode]
end
subgraph Application["應用層"]
HBApp[HummingbotApplication]
TradingCore[TradingCore]
ConnMgr[ConnectorManager]
end
subgraph Strategy["策略層"]
StratV1[Script Strategy V1]
StratV2[Strategy V2]
Controller[Controllers]
Executor[Executors]
end
subgraph Connector["連接器層"]
ExchangeBase[ExchangeBase]
OKX[OKX Connector]
Binance[Binance Connector]
Uniswap[Uniswap Connector]
end
subgraph Data["數據層"]
OrderBook[OrderBook Tracker]
UserStream[User Stream Tracker]
OrderTracker[Client Order Tracker]
end
subgraph Core["核心層 - Cython"]
Clock[Clock]
PubSub[PubSub Event System]
TimeIterator[TimeIterator]
end
CLI --> HBApp
MQTT --> HBApp
Headless --> HBApp
HBApp --> TradingCore
TradingCore --> ConnMgr
TradingCore --> StratV1
TradingCore --> StratV2
StratV2 --> Controller
Controller --> Executor
ConnMgr --> ExchangeBase
ExchangeBase --> OKX
ExchangeBase --> Binance
ExchangeBase --> Uniswap
OKX --> OrderBook
OKX --> UserStream
OKX --> OrderTracker
Clock --> TimeIterator
TimeIterator --> StratV1
TimeIterator --> ExchangeBase
PubSub --> StratV1
PubSub --> Executor
1.2 核心模組結構
hummingbot/
├── __init__.py # 初始化、日誌、路徑設定
├── client/ # CLI 客戶端
│ ├── hummingbot_application.py # 主應用程式
│ └── config/ # 配置管理
├── connector/ # 交易所連接器
│ ├── connector_base.pyx # Cython 基類
│ ├── exchange_py_base.py # Python 交易所基類
│ └── exchange/ # 各交易所實作
├── core/ # 核心組件
│ ├── clock.pyx # 時鐘系統
│ ├── pubsub.pyx # 事件發布訂閱
│ ├── trading_core.py # 交易核心
│ ├── connector_manager.py # 連接器管理
│ └── data_type/ # 數據結構
├── strategy/ # V1 策略
│ └── script_strategy_base.py
└── strategy_v2/ # V2 策略
├── controllers/ # 控制器
└── executors/ # 執行器
1.3 TradingCore:核心交易引擎
檔案路徑: hummingbot/core/trading_core.py
class TradingCore:
"""
核心交易功能的模組化架構
提供:
- 連接器管理(創建、添加、移除連接器)
- 市場數據訪問(訂單簿、餘額等)
- 策略管理(可選 - 可無策略運行)
- 直接交易能力
- 時鐘管理用於即時操作
"""
def __init__(self, client_config):
self.connector_manager = ConnectorManager(self.client_config_map)
self.clock: Optional[Clock] = None
self.strategy: Optional[StrategyBase] = None
self.markets_recorder: Optional[MarketsRecorder] = None
self.trade_fill_db: Optional[SQLConnectionManager] = None
self._metrics_collectors: Dict[str, MetricsCollector] = {}
def create_connector(self, connector_name: str,
trading_pairs: List[str]) -> ExchangeBase:
"""動態創建交易所連接器"""
return self.connector_manager.create_connector(
connector_name, trading_pairs
)
async def start_clock(self):
"""啟動時鐘系統進行即時交易"""
if self.clock is None:
self.clock = Clock(ClockMode.REALTIME, tick_size=1.0)
await self.clock.run()
def start_strategy(self, strategy: StrategyBase):
"""啟動交易策略"""
self.strategy = strategy
self.clock.add_iterator(strategy)
2. 連接器系統深度解析
2.1 連接器架構層次
classDiagram
class NetworkIterator {
<<Cython>>
+start_network()
+stop_network()
+check_network()
}
class ConnectorBase {
<<Cython>>
+_event_reporter
+_event_logger
+_account_balances
+buy()
+sell()
+cancel()
}
class ExchangeBase {
<<Cython>>
+order_book_tracker
+user_stream_tracker
+trading_rules
+get_order_book()
}
class ExchangePyBase {
<<Python>>
+_throttler
+_time_synchronizer
+_order_tracker
+_place_order()
+_execute_cancel()
}
class OkxExchange {
+okx_api_key
+authenticator
+supported_order_types()
}
NetworkIterator <|-- ConnectorBase
ConnectorBase <|-- ExchangeBase
ExchangeBase <|-- ExchangePyBase
ExchangePyBase <|-- OkxExchange
2.2 連接器基類 (Cython 實作)
檔案路徑: hummingbot/connector/connector_base.pyx
cdef class ConnectorBase(NetworkIterator):
# 市場事件類型定義
MARKET_EVENTS = [
MarketEvent.ReceivedAsset,
MarketEvent.BuyOrderCompleted,
MarketEvent.SellOrderCompleted,
MarketEvent.OrderCancelled,
MarketEvent.OrderFilled,
MarketEvent.OrderExpired,
MarketEvent.OrderUpdate,
MarketEvent.TradeUpdate,
MarketEvent.OrderFailure,
MarketEvent.TransactionFailure,
MarketEvent.BuyOrderCreated,
MarketEvent.SellOrderCreated,
MarketEvent.FundingPaymentCompleted,
]
def __init__(self, balance_asset_limit=None):
super().__init__()
self._event_reporter = EventReporter(event_source=self.display_name)
self._event_logger = EventLogger(event_source=self.display_name)
self._account_balances: Dict[str, Decimal] = {}
self._account_available_balances: Dict[str, Decimal] = {}
self._current_trade_fills: Set[str] = set()
self._trade_fee_schema: Optional[TradeFeeSchema] = None
@property
def tracking_states(self) -> Dict[str, Any]:
"""返回當前追蹤的訂單狀態"""
return {}
def get_balance(self, currency: str) -> Decimal:
"""獲取特定幣種餘額"""
return self._account_balances.get(currency, Decimal(0))
def get_available_balance(self, currency: str) -> Decimal:
"""獲取可用餘額(扣除凍結部分)"""
return self._account_available_balances.get(currency, Decimal(0))
2.3 Python 交易所基類
檔案路徑: hummingbot/connector/exchange_py_base.py
class ExchangePyBase(ExchangeBase, ABC):
SHORT_POLL_INTERVAL = 5.0 # 短輪詢間隔
LONG_POLL_INTERVAL = 120.0 # 長輪詢間隔
def __init__(self, balance_asset_limit=None,
rate_limits_share_pct=Decimal("100")):
super().__init__()
self._trading_rules: Dict[str, TradingRule] = {}
self._trading_fees: Dict[str, TradeFeeSchema] = {}
# 時間同步器 - 處理伺服器時間偏差
self._time_synchronizer = TimeSynchronizer()
# 限速器 - 遵守 API 調用限制
self._throttler = AsyncThrottler(
rate_limits=self.rate_limits_rules,
limits_share_percentage=rate_limits_share_pct
)
# 訂單簿數據源與追蹤器
self._orderbook_ds = self._create_order_book_data_source()
self._set_order_book_tracker(OrderBookTracker(
data_source=self._orderbook_ds,
trading_pairs=self._trading_pairs
))
# 用戶流追蹤器 - 餘額與訂單更新
self._user_stream_tracker = self._create_user_stream_tracker()
# 訂單追蹤器 - 管理活躍訂單
self._order_tracker: ClientOrderTracker = self._create_order_tracker()
@property
def status_dict(self) -> Dict[str, bool]:
"""連接器就緒狀態檢查"""
return {
"symbols_mapping_initialized": self.trading_pair_symbol_map_ready(),
"order_books_initialized": self.order_book_tracker.ready,
"account_balance": len(self._account_balances) > 0,
"trading_rule_initialized": len(self._trading_rules) > 0,
"user_stream_initialized": self._is_user_stream_initialized(),
}
@property
def ready(self) -> bool:
"""所有組件是否就緒"""
return all(self.status_dict.values())
@abstractmethod
async def _place_order(self, order_id: str, trading_pair: str,
amount: Decimal, trade_type: TradeType,
order_type: OrderType, price: Decimal) -> str:
"""下單抽象方法 - 各交易所實作"""
raise NotImplementedError
2.4 連接器管理器
檔案路徑: hummingbot/core/connector_manager.py
class ConnectorManager:
"""
動態管理交易所連接器
提供:
- 動態創建與初始化連接器
- 添加/移除連接器
- 無策略情況下訪問市場數據
- 直接通過連接器下單
- 獨立於策略管理連接器生命週期
"""
def create_connector(self, connector_name: str,
trading_pairs: List[str],
trading_required: bool = True,
api_keys: Optional[Dict] = None) -> ConnectorBase:
"""
動態創建連接器實例
Args:
connector_name: 交易所名稱 (如 "okx", "binance")
trading_pairs: 交易對列表 ["BTC-USDT", "ETH-USDT"]
trading_required: 是否需要交易權限
api_keys: 可選的 API 金鑰覆蓋
"""
# 獲取連接器設定
connector_setting = AllConnectorSettings.get_connector_settings()[connector_name]
# 處理模擬交易模式
if self._client_config.paper_trade.paper_trade_enabled:
connector_name = f"{connector_name}_paper_trade"
# 載入 API 金鑰
if api_keys is None:
api_keys = self._load_api_keys(connector_name)
# 創建初始化參數
init_params = connector_setting.conn_init_parameters(
trading_pairs=trading_pairs,
trading_required=trading_required,
api_keys=api_keys,
client_config_map=self._client_config
)
# 實例化連接器
connector_class = connector_setting.connector_class()
connector = connector_class(**init_params)
return connector
3. 訂單簿與數據結構
3.1 訂單簿 (Cython 高效能實作)
檔案路徑: hummingbot/core/data_type/order_book.pyx
cdef class OrderBook(PubSub):
"""
高效能訂單簿實作
使用 C++ std::set 實現 O(log n) 的插入與刪除
"""
ORDER_BOOK_TRADE_EVENT_TAG = OrderBookEvent.TradeEvent.value
def __init__(self, dex: bool = False):
super().__init__()
self._snapshot_uid: int64_t = 0
self._last_diff_uid: int64_t = 0
self._best_bid: double = float("NaN")
self._best_ask: double = float("NaN")
self._last_trade_price: double = float("NaN")
self._dex = dex
cdef c_apply_diffs(self, vector[OrderBookEntry] bids,
vector[OrderBookEntry] asks,
int64_t update_id):
"""
應用增量更新
- 移除已撤銷的掛單
- 插入新掛單
- 處理價格重疊
- 更新最佳買賣價
"""
cdef:
OrderBookEntry entry
set[OrderBookEntry] *bid_book = address(self._bid_book)
set[OrderBookEntry] *ask_book = address(self._ask_book)
# 處理買單更新
for entry in bids:
if entry.amount == 0:
# 數量為 0 表示移除
bid_book.erase(entry)
else:
bid_book.insert(entry)
# 處理賣單更新
for entry in asks:
if entry.amount == 0:
ask_book.erase(entry)
else:
ask_book.insert(entry)
self._last_diff_uid = update_id
self._update_best_bid_ask()
cdef c_apply_snapshot(self, vector[OrderBookEntry] bids,
vector[OrderBookEntry] asks,
int64_t update_id):
"""應用完整快照 - 用於初始化或同步"""
self._bid_book.clear()
self._ask_book.clear()
for entry in bids:
self._bid_book.insert(entry)
for entry in asks:
self._ask_book.insert(entry)
self._snapshot_uid = update_id
self._update_best_bid_ask()
@property
def get_price(self) -> float:
"""獲取中間價"""
return (self._best_bid + self._best_ask) / 2.0
def get_price_for_volume(self, is_buy: bool, volume: Decimal) -> Decimal:
"""計算特定交易量的加權平均價格"""
book = self._ask_book if is_buy else self._bid_book
remaining_volume = float(volume)
total_cost = 0.0
for entry in book:
if remaining_volume <= 0:
break
fill_amount = min(entry.amount, remaining_volume)
total_cost += fill_amount * entry.price
remaining_volume -= fill_amount
return Decimal(str(total_cost / float(volume)))
3.2 訂單狀態機
檔案路徑: hummingbot/core/data_type/in_flight_order.py
class OrderState(Enum):
"""訂單狀態枚舉"""
PENDING_CREATE = 0 # 等待創建
OPEN = 1 # 已開立
PENDING_CANCEL = 2 # 等待取消
CANCELED = 3 # 已取消
PARTIALLY_FILLED = 4 # 部分成交
FILLED = 5 # 完全成交
FAILED = 6 # 失敗
PENDING_APPROVAL = 7 # 等待批准 (DEX)
APPROVED = 8 # 已批准 (DEX)
CREATED = 9 # 已創建
COMPLETED = 10 # 已完成
stateDiagram-v2
[*] --> PENDING_CREATE: 提交訂單
PENDING_CREATE --> CREATED: 交易所確認
PENDING_CREATE --> FAILED: 提交失敗
CREATED --> OPEN: 掛單成功
OPEN --> PARTIALLY_FILLED: 部分成交
PARTIALLY_FILLED --> PARTIALLY_FILLED: 繼續成交
PARTIALLY_FILLED --> FILLED: 完全成交
OPEN --> FILLED: 完全成交
OPEN --> PENDING_CANCEL: 請求取消
PARTIALLY_FILLED --> PENDING_CANCEL: 請求取消
PENDING_CANCEL --> CANCELED: 取消成功
PENDING_CANCEL --> FILLED: 取消前成交
FILLED --> COMPLETED: 結算完成
CANCELED --> COMPLETED: 清理完成
COMPLETED --> [*]
FAILED --> [*]
3.3 InFlightOrder:活躍訂單追蹤
class InFlightOrder:
"""
追蹤從下單到完成的訂單完整生命週期
"""
def __init__(self,
client_order_id: str,
trading_pair: str,
order_type: OrderType,
trade_type: TradeType,
amount: Decimal,
creation_timestamp: float,
price: Optional[Decimal] = None,
exchange_order_id: Optional[str] = None,
initial_state: OrderState = OrderState.PENDING_CREATE,
leverage: int = 1,
position: PositionAction = PositionAction.NIL):
self.client_order_id = client_order_id
self.trading_pair = trading_pair
self.order_type = order_type
self.trade_type = trade_type
self.amount = amount
self.price = price
self.creation_timestamp = creation_timestamp
self.exchange_order_id = exchange_order_id
self.current_state = initial_state
# 成交記錄
self.order_fills: Dict[str, TradeUpdate] = {}
# 異步事件 - 用於等待特定狀態
self.exchange_order_id_update_event = asyncio.Event()
self.completely_filled_event = asyncio.Event()
self.processed_by_exchange_event = asyncio.Event()
@property
def executed_amount_base(self) -> Decimal:
"""已成交的基礎資產數量"""
return sum(fill.fill_base_amount for fill in self.order_fills.values())
@property
def executed_amount_quote(self) -> Decimal:
"""已成交的計價資產數量"""
return sum(fill.fill_quote_amount for fill in self.order_fills.values())
@property
def average_executed_price(self) -> Decimal:
"""平均成交價格"""
if self.executed_amount_base == Decimal(0):
return Decimal(0)
return self.executed_amount_quote / self.executed_amount_base
def update_with_trade_update(self, trade_update: TradeUpdate) -> bool:
"""
使用成交更新訂單
Returns: 是否為新的成交記錄
"""
trade_id = trade_update.trade_id
if trade_id in self.order_fills:
return False # 重複的成交
self.order_fills[trade_id] = trade_update
# 檢查是否完全成交
if self.executed_amount_base >= self.amount:
self.current_state = OrderState.FILLED
self.completely_filled_event.set()
elif self.executed_amount_base > Decimal(0):
self.current_state = OrderState.PARTIALLY_FILLED
return True
3.4 OrderCandidate:訂單候選計算
檔案路徑: hummingbot/core/data_type/order_candidate.py
@dataclass
class OrderCandidate:
"""
訂單候選 - 計算訂單對帳戶的完整影響
包含:抵押品、手續費、潛在收益
可根據可用餘額調整訂單大小
"""
trading_pair: str
is_maker: bool
order_type: OrderType
order_side: TradeType
amount: Decimal
price: Decimal
# 計算後填充的欄位
order_collateral: Optional[TokenAmount] = None
percent_fee_collateral: Optional[TokenAmount] = None
percent_fee_value: Optional[TokenAmount] = None
fixed_fee_collaterals: List[TokenAmount] = field(default_factory=list)
potential_returns: Optional[TokenAmount] = None
resized: bool = False
def populate_collateral_entries(self, exchange: ExchangeBase):
"""計算所有抵押品與手續費項目"""
self._populate_order_collateral_entry(exchange)
fee = self._get_fee(exchange)
self._populate_percent_fee_collateral_entry(exchange, fee)
self._populate_fixed_fee_collateral_entries(fee)
self._populate_potential_returns_entry(exchange)
def adjust_from_balances(self,
available_balances: Dict[str, Decimal]) -> bool:
"""
根據可用餘額調整訂單大小
Returns: 是否進行了調整
"""
collateral_token = self.order_collateral.token
required = self.order_collateral.amount
available = available_balances.get(collateral_token, Decimal(0))
if available < required:
# 計算可負擔的最大數量
reduction_ratio = available / required
self.amount *= reduction_ratio
self.resized = True
# 重新計算所有欄位
self.populate_collateral_entries(exchange)
return True
return False
4. 事件驅動架構
4.1 事件類型定義
檔案路徑: hummingbot/core/event/events.py
class MarketEvent(Enum):
"""市場事件類型"""
ReceivedAsset = 101
BuyOrderCompleted = 102
SellOrderCompleted = 103
OrderCancelled = 106
OrderFilled = 107
OrderExpired = 108
OrderUpdate = 109
TradeUpdate = 110
OrderFailure = 198
TransactionFailure = 199
BuyOrderCreated = 200
SellOrderCreated = 201
FundingPaymentCompleted = 202 # 永續合約資金費率
FundingInfo = 203
RangePositionLiquidityAdded = 300 # DEX LP
RangePositionLiquidityRemoved = 301
RangePositionUpdate = 302
@dataclass
class OrderFilledEvent:
"""訂單成交事件"""
timestamp: float
order_id: str
trading_pair: str
trade_type: TradeType # BUY / SELL
order_type: OrderType # LIMIT / MARKET
price: Decimal
amount: Decimal
trade_fee: TradeFeeBase
exchange_trade_id: str = ""
exchange_order_id: str = ""
leverage: Optional[int] = 1
position: Optional[str] = PositionAction.NIL.value
@dataclass
class BuyOrderCompletedEvent:
"""買單完成事件"""
timestamp: float
order_id: str
base_asset: str
quote_asset: str
base_asset_amount: Decimal
quote_asset_amount: Decimal
order_type: OrderType
exchange_order_id: Optional[str] = None
4.2 PubSub 事件系統 (Cython)
檔案路徑: hummingbot/core/pubsub.pyx
cdef class PubSub:
"""
高效能發布訂閱系統
特性:
- 使用弱引用避免監聽器記憶體洩漏
- 概率性垃圾回收清理死亡監聽器
- C++ 容器提供高效能事件分發
"""
# 每次添加監聽器時 0.5% 機率觸發 GC
ADD_LISTENER_GC_PROBABILITY = 0.005
def __init__(self):
# C++ map: event_tag -> set<listener_weakref>
self._events = Events()
cdef c_add_listener(self, int64_t event_tag, EventListener listener):
"""添加事件監聽器(使用弱引用)"""
# 創建弱引用避免循環引用
cdef object listener_weakref = PyWeakref_NewRef(listener, None)
cdef EventListenerWeakRefWrapper listener_wrapper = \
EventListenerWeakRefWrapper(listener_weakref)
# 查找或創建事件標籤的監聽器集合
cdef map[int64_t, set[EventListenerWeakRefWrapper]].iterator it = \
self._events.find(event_tag)
if it != self._events.end():
deref(it).second.insert(listener_wrapper)
else:
self._events[event_tag] = set[EventListenerWeakRefWrapper]()
self._events[event_tag].insert(listener_wrapper)
# 概率性清理死亡監聽器
if random.random() < PubSub.ADD_LISTENER_GC_PROBABILITY:
self.c_remove_dead_listeners(event_tag)
cdef c_trigger_event(self, int64_t event_tag, object message):
"""觸發事件通知所有監聽器"""
cdef:
map[int64_t, set[EventListenerWeakRefWrapper]].iterator it
set[EventListenerWeakRefWrapper] *listeners_ptr
EventListenerWeakRefWrapper wrapper
object listener_ref
EventListener listener
it = self._events.find(event_tag)
if it == self._events.end():
return
listeners_ptr = address(deref(it).second)
for wrapper in deref(listeners_ptr):
listener_ref = wrapper.listener_weakref
# 解引用弱引用
if listener_ref is not None:
listener = <EventListener>PyWeakref_GetObject(listener_ref)
if listener is not None:
# 調用監聽器
listener.c_call(message)
cdef c_remove_dead_listeners(self, int64_t event_tag):
"""清理已死亡的監聽器弱引用"""
cdef:
set[EventListenerWeakRefWrapper] *listeners_ptr
set[EventListenerWeakRefWrapper] alive_listeners
EventListenerWeakRefWrapper wrapper
it = self._events.find(event_tag)
if it == self._events.end():
return
listeners_ptr = address(deref(it).second)
for wrapper in deref(listeners_ptr):
if PyWeakref_GetObject(wrapper.listener_weakref) is not None:
alive_listeners.insert(wrapper)
deref(listeners_ptr).swap(alive_listeners)
4.3 事件流程圖
sequenceDiagram
participant Strategy as 策略
participant Connector as 連接器
participant OrderTracker as 訂單追蹤器
participant Exchange as 交易所 API
participant UserStream as 用戶流
participant PubSub as 事件系統
Strategy->>Connector: buy(pair, amount, price)
Connector->>OrderTracker: start_tracking(InFlightOrder)
Connector->>PubSub: trigger(BuyOrderCreatedEvent)
Connector->>Exchange: POST /order
Exchange-->>Connector: order_id
UserStream->>Connector: WebSocket 訂單更新
Connector->>OrderTracker: update_order_state()
UserStream->>Connector: WebSocket 成交更新
Connector->>OrderTracker: process_trade_update()
Connector->>PubSub: trigger(OrderFilledEvent)
PubSub->>Strategy: on_order_filled()
alt 完全成交
Connector->>PubSub: trigger(BuyOrderCompletedEvent)
PubSub->>Strategy: on_order_completed()
Connector->>OrderTracker: stop_tracking()
end
5. Strategy V1:腳本策略基礎
5.1 ScriptStrategyBase
檔案路徑: hummingbot/strategy/script_strategy_base.py
class ScriptStrategyBase(StrategyPyBase):
"""
簡化的策略基類
用於腳本式策略開發
"""
# 定義需要的連接器與交易對
markets: Dict[str, Set[str]] # {"okx": {"BTC-USDT", "ETH-USDT"}}
@classmethod
def init_markets(cls, config: BaseModel) -> Dict[str, Set[str]]:
"""
初始化市場配置
子類實作此方法定義所需的交易所與交易對
"""
raise NotImplementedError
def __init__(self,
connectors: Dict[str, ConnectorBase],
config: Optional[BaseModel] = None):
super().__init__()
self.connectors: Dict[str, ConnectorBase] = connectors
self.ready_to_trade: bool = False
self.config = config
# 將連接器添加為市場
self.add_markets(list(connectors.values()))
def tick(self, timestamp: float):
"""
每個時鐘週期調用
檢查就緒狀態後調用 on_tick()
"""
if not self.ready_to_trade:
# 檢查所有連接器是否就緒
self.ready_to_trade = all(
connector.ready for connector in self.connectors.values()
)
if not self.ready_to_trade:
for name, conn in self.connectors.items():
if not conn.ready:
self.logger().warning(
f"{name} 尚未就緒。狀態: {conn.status_dict}"
)
return
# 所有連接器就緒,執行策略邏輯
self.on_tick()
def on_tick(self):
"""
用戶實作的每週期邏輯
子類覆寫此方法實現交易策略
"""
pass
# ============ 交易便捷方法 ============
def buy(self, connector_name: str, trading_pair: str,
amount: Decimal, order_type: OrderType,
price: Decimal = Decimal("NaN"),
position_action: PositionAction = PositionAction.OPEN) -> str:
"""
提交買單
Returns: client_order_id
"""
market_pair = self._market_trading_pair_tuple(
connector_name, trading_pair
)
return self.buy_with_specific_market(
market_pair, amount, order_type, price,
position_action=position_action
)
def sell(self, connector_name: str, trading_pair: str,
amount: Decimal, order_type: OrderType,
price: Decimal = Decimal("NaN")) -> str:
"""提交賣單"""
market_pair = self._market_trading_pair_tuple(
connector_name, trading_pair
)
return self.sell_with_specific_market(
market_pair, amount, order_type, price
)
def cancel(self, connector_name: str, trading_pair: str,
order_id: str):
"""取消訂單"""
self.cancel_order(
self._market_trading_pair_tuple(connector_name, trading_pair),
order_id
)
# ============ 市場數據便捷方法 ============
def get_mid_price(self, connector_name: str,
trading_pair: str) -> Decimal:
"""獲取中間價"""
connector = self.connectors[connector_name]
return connector.get_mid_price(trading_pair)
def get_order_book(self, connector_name: str,
trading_pair: str) -> OrderBook:
"""獲取訂單簿"""
connector = self.connectors[connector_name]
return connector.get_order_book(trading_pair)
def get_balance(self, connector_name: str, asset: str) -> Decimal:
"""獲取資產餘額"""
connector = self.connectors[connector_name]
return connector.get_balance(asset)
5.2 策略範例:簡單做市
from hummingbot.strategy.script_strategy_base import ScriptStrategyBase
from decimal import Decimal
class SimpleMarketMaker(ScriptStrategyBase):
"""
簡單的做市策略
在買賣價差兩側掛單
"""
markets = {"okx": {"BTC-USDT"}}
# 策略參數
order_amount = Decimal("0.01") # 每單數量
spread = Decimal("0.002") # 價差 0.2%
order_refresh_time = 15.0 # 訂單刷新時間
def __init__(self, connectors):
super().__init__(connectors)
self._last_order_time = 0
self._buy_order_id = None
self._sell_order_id = None
def on_tick(self):
current_time = self.current_timestamp
# 檢查是否需要刷新訂單
if current_time - self._last_order_time < self.order_refresh_time:
return
# 取消現有訂單
self._cancel_existing_orders()
# 獲取中間價
mid_price = self.get_mid_price("okx", "BTC-USDT")
# 計算買賣價格
buy_price = mid_price * (1 - self.spread / 2)
sell_price = mid_price * (1 + self.spread / 2)
# 下買單
self._buy_order_id = self.buy(
"okx", "BTC-USDT",
self.order_amount,
OrderType.LIMIT,
buy_price
)
# 下賣單
self._sell_order_id = self.sell(
"okx", "BTC-USDT",
self.order_amount,
OrderType.LIMIT,
sell_price
)
self._last_order_time = current_time
self.logger().info(
f"掛單完成: 買 {buy_price:.2f} / 賣 {sell_price:.2f}"
)
def _cancel_existing_orders(self):
if self._buy_order_id:
self.cancel("okx", "BTC-USDT", self._buy_order_id)
if self._sell_order_id:
self.cancel("okx", "BTC-USDT", self._sell_order_id)
def did_fill_order(self, event: OrderFilledEvent):
"""訂單成交回調"""
self.logger().info(
f"訂單成交: {event.trade_type.name} "
f"{event.amount} @ {event.price}"
)
6. Strategy V2:控制器與執行器
6.1 V2 架構概覽
flowchart TB
subgraph StrategyV2["Strategy V2 架構"]
MarketData[MarketDataProvider]
Controller[Controller]
ActionQueue[Actions Queue]
Executor[Executor]
end
subgraph Controllers["控制器類型"]
MMController[MarketMakingController]
DirController[DirectionalController]
GenController[GenericController]
end
subgraph Executors["執行器類型"]
OrderExec[OrderExecutor]
PositionExec[PositionExecutor]
DCAExec[DCAExecutor]
GridExec[GridExecutor]
ArbExec[ArbitrageExecutor]
TWAPExec[TWAPExecutor]
XEMMExec[XEMMExecutor]
end
MarketData --> Controller
Controller --> ActionQueue
ActionQueue --> Executor
Controller --> MMController
Controller --> DirController
Controller --> GenController
Executor --> OrderExec
Executor --> PositionExec
Executor --> DCAExec
Executor --> GridExec
Executor --> ArbExec
Executor --> TWAPExec
Executor --> XEMMExec
6.2 RunnableBase:可運行組件基類
檔案路徑: hummingbot/strategy_v2/runnable_base.py
class RunnableStatus(Enum):
NOT_STARTED = "not_started"
RUNNING = "running"
TERMINATED = "terminated"
class RunnableBase(ABC):
"""
智能組件的基類
為需要定期執行任務的組件提供基本結構
"""
def __init__(self, update_interval: float = 0.5):
self.update_interval = update_interval
self._status: RunnableStatus = RunnableStatus.NOT_STARTED
self.terminated = asyncio.Event()
@property
def status(self) -> RunnableStatus:
return self._status
def start(self):
"""啟動組件"""
if self._status == RunnableStatus.NOT_STARTED:
self.terminated.clear()
self._status = RunnableStatus.RUNNING
safe_ensure_future(self.control_loop())
def stop(self):
"""停止組件"""
if self._status != RunnableStatus.TERMINATED:
self._status = RunnableStatus.TERMINATED
self.terminated.set()
async def control_loop(self):
"""主控制循環"""
await self.on_start()
while not self.terminated.is_set():
try:
await self.control_task()
except Exception as e:
self.logger().error(f"控制任務錯誤: {e}", exc_info=True)
finally:
await asyncio.sleep(self.update_interval)
self.on_stop()
async def on_start(self):
"""啟動時回調 - 子類可覆寫"""
pass
@abstractmethod
async def control_task(self):
"""
主控制任務 - 子類必須實作
每個 update_interval 執行一次
"""
raise NotImplementedError
def on_stop(self):
"""停止時回調 - 子類可覆寫"""
pass
6.3 ControllerBase:控制器基類
檔案路徑: hummingbot/strategy_v2/controllers/controller_base.py
@dataclass
class ControllerConfigBase(BaseClientModel):
"""控制器配置基類"""
id: str = Field(default=None)
controller_name: str
controller_type: str = "generic"
total_amount_quote: Decimal = Field(default=Decimal("100"))
manual_kill_switch: bool = Field(default=False)
candles_config: List[CandlesConfig] = Field(default=[])
initial_positions: List[InitialPositionConfig] = Field(default=[])
class ControllerBase(RunnableBase):
"""
控制器基類
管理策略執行、協調執行器
"""
def __init__(self,
config: ControllerConfigBase,
market_data_provider: MarketDataProvider,
actions_queue: asyncio.Queue,
update_interval: float = 1.0):
super().__init__(update_interval=update_interval)
self.config = config
self.market_data_provider = market_data_provider
self.actions_queue = actions_queue
# 執行器狀態追蹤
self.executors_info: List[ExecutorInfo] = []
self.positions_held: List[PositionSummary] = []
async def control_task(self):
"""控制任務 - 子類實作具體邏輯"""
if self.config.manual_kill_switch:
await self._stop_all_executors()
return
# 更新市場數據
await self._update_market_data()
# 執行策略邏輯
actions = await self.determine_actions()
# 將動作發送到隊列
for action in actions:
await self.actions_queue.put(action)
@abstractmethod
async def determine_actions(self) -> List[ExecutorAction]:
"""
確定需要執行的動作
子類實作此方法定義策略邏輯
"""
raise NotImplementedError
def get_executor_by_id(self, executor_id: str) -> Optional[ExecutorInfo]:
"""根據 ID 獲取執行器信息"""
for info in self.executors_info:
if info.id == executor_id:
return info
return None
6.4 ExecutorBase:執行器基類
檔案路徑: hummingbot/strategy_v2/executors/executor_base.py
class ExecutorBase(RunnableBase):
"""
執行器基類
負責執行具體的訂單操作
"""
def __init__(self,
strategy: ScriptStrategyBase,
connectors: List[str],
config: ExecutorConfigBase,
update_interval: float = 0.5):
super().__init__(update_interval)
self.config = config
self._strategy = strategy
self.close_type: Optional[CloseType] = None
self.close_timestamp: Optional[float] = None
# 獲取所需的連接器
self.connectors = {
name: connector
for name, connector in strategy.connectors.items()
if name in connectors
}
# 事件轉發器 - 訂閱訂單事件
self._create_buy_order_forwarder = SourceInfoEventForwarder(
self.process_order_created_event
)
self._fill_order_forwarder = SourceInfoEventForwarder(
self.process_order_filled_event
)
self._cancel_order_forwarder = SourceInfoEventForwarder(
self.process_order_canceled_event
)
self._failed_order_forwarder = SourceInfoEventForwarder(
self.process_order_failed_event
)
# 註冊事件監聽
self._register_events()
def _register_events(self):
"""註冊訂單事件監聽"""
for connector in self.connectors.values():
connector.add_listener(
MarketEvent.BuyOrderCreated,
self._create_buy_order_forwarder
)
connector.add_listener(
MarketEvent.OrderFilled,
self._fill_order_forwarder
)
connector.add_listener(
MarketEvent.OrderCancelled,
self._cancel_order_forwarder
)
connector.add_listener(
MarketEvent.OrderFailure,
self._failed_order_forwarder
)
@property
def executor_info(self) -> ExecutorInfo:
"""返回執行器當前狀態與指標"""
return ExecutorInfo(
id=self.config.id,
timestamp=self.config.timestamp,
type=self.config.type,
status=self.status,
close_type=self.close_type,
net_pnl_pct=self.net_pnl_pct,
net_pnl_quote=self.net_pnl_quote,
cum_fees_quote=self.cum_fees_quote,
filled_amount_quote=self.filled_amount_quote,
is_active=self.is_active,
is_trading=self.is_trading,
)
@property
def net_pnl_pct(self) -> Decimal:
"""淨利潤百分比"""
raise NotImplementedError
@property
def net_pnl_quote(self) -> Decimal:
"""淨利潤(計價貨幣)"""
raise NotImplementedError
def process_order_filled_event(self, event_tag: int,
market: ConnectorBase,
event: OrderFilledEvent):
"""處理訂單成交事件"""
if event.order_id in self._tracked_orders:
self._update_position(event)
self._calculate_pnl()
6.5 執行器類型一覽
| 執行器 | 功能 | 使用場景 |
|---|---|---|
| OrderExecutor | 基本訂單執行 | 簡單買賣操作 |
| PositionExecutor | 持倉管理 | 帶止損止盈的完整交易 |
| DCAExecutor | 定投執行 | 分批建倉 |
| GridExecutor | 網格交易 | 區間震盪獲利 |
| ArbitrageExecutor | 套利執行 | 跨交易所價差捕捉 |
| TWAPExecutor | 時間加權 | 大單分拆減少衝擊 |
| XEMMExecutor | 跨所做市 | 在多個交易所同時做市 |
7. 訂單管理與生命週期
7.1 ClientOrderTracker
檔案路徑: hummingbot/connector/client_order_tracker.py
class ClientOrderTracker:
"""
客戶端訂單追蹤器
管理活躍訂單、緩存已完成訂單、處理遺失訂單
"""
MAX_CACHE_SIZE = 1000
CACHED_ORDER_TTL = 30.0 # 緩存訂單 TTL (秒)
def __init__(self, connector: ConnectorBase,
lost_order_count_limit: int = 3):
self._connector = connector
self._lost_order_count_limit = lost_order_count_limit
# 活躍訂單 - 正在交易所上的訂單
self._in_flight_orders: Dict[str, InFlightOrder] = {}
# 緩存訂單 - 剛完成的訂單,保留以處理延遲的成交
self._cached_orders: TTLCache = TTLCache(
maxsize=self.MAX_CACHE_SIZE,
ttl=self.CACHED_ORDER_TTL
)
# 遺失訂單 - 在交易所上找不到但可能有成交的訂單
self._lost_orders: Dict[str, InFlightOrder] = {}
# 訂單未找到計數 - 超過閾值視為遺失
self._order_not_found_records: Dict[str, int] = defaultdict(int)
@property
def active_orders(self) -> Dict[str, InFlightOrder]:
"""當前活躍的訂單"""
return self._in_flight_orders.copy()
@property
def cached_orders(self) -> Dict[str, InFlightOrder]:
"""緩存中的已完成訂單"""
return dict(self._cached_orders)
@property
def all_fillable_orders(self) -> Dict[str, InFlightOrder]:
"""
所有可能有成交的訂單
包含:活躍 + 緩存 + 遺失訂單
"""
return {
**self.active_orders,
**self.cached_orders,
**self.lost_orders
}
def start_tracking_order(self, order: InFlightOrder):
"""開始追蹤新訂單"""
if order.client_order_id in self._in_flight_orders:
self.logger().warning(
f"訂單 {order.client_order_id} 已在追蹤中"
)
return
self._in_flight_orders[order.client_order_id] = order
self.logger().info(
f"開始追蹤訂單: {order.client_order_id} "
f"{order.trade_type.name} {order.amount} {order.trading_pair}"
)
def stop_tracking_order(self, client_order_id: str):
"""
停止追蹤訂單
將訂單移至緩存,保留一段時間以處理延遲事件
"""
if client_order_id not in self._in_flight_orders:
return
order = self._in_flight_orders.pop(client_order_id)
# 移入緩存而非直接刪除
self._cached_orders[client_order_id] = order
self.logger().info(
f"停止追蹤訂單: {client_order_id}, "
f"最終狀態: {order.current_state.name}"
)
def process_order_update(self, update: OrderUpdate):
"""處理訂單狀態更新"""
order = self._get_order(update.client_order_id)
if order is None:
return False
# 更新交易所訂單 ID
if update.exchange_order_id and not order.exchange_order_id:
order.exchange_order_id = update.exchange_order_id
order.exchange_order_id_update_event.set()
# 更新狀態
order.current_state = update.new_state
# 檢查是否完成
if update.new_state in (OrderState.FILLED, OrderState.CANCELED,
OrderState.FAILED, OrderState.EXPIRED):
self.stop_tracking_order(update.client_order_id)
return True
def process_trade_update(self, trade: TradeUpdate):
"""處理成交更新"""
order = self.all_fillable_orders.get(trade.client_order_id)
if order is None:
self.logger().warning(
f"收到未知訂單的成交: {trade.client_order_id}"
)
return False
is_new = order.update_with_trade_update(trade)
if is_new:
self.logger().info(
f"訂單成交: {trade.client_order_id} "
f"成交 {trade.fill_base_amount} @ {trade.fill_price}"
)
return is_new
def _get_order(self, client_order_id: str) -> Optional[InFlightOrder]:
"""從所有集合中獲取訂單"""
return (
self._in_flight_orders.get(client_order_id) or
self._cached_orders.get(client_order_id) or
self._lost_orders.get(client_order_id)
)
7.2 訂單生命週期流程
sequenceDiagram
participant User as 用戶/策略
participant Tracker as OrderTracker
participant Connector as 連接器
participant Exchange as 交易所
participant Events as 事件系統
User->>Connector: buy(pair, amount, price)
Connector->>Connector: 生成 client_order_id
Connector->>Tracker: start_tracking(InFlightOrder)
Note over Tracker: 狀態: PENDING_CREATE
Connector->>Events: emit(BuyOrderCreatedEvent)
Connector->>Exchange: POST /api/order
alt 提交成功
Exchange-->>Connector: {order_id, status}
Connector->>Tracker: process_order_update(CREATED)
Note over Tracker: 狀態: CREATED → OPEN
else 提交失敗
Exchange-->>Connector: {error}
Connector->>Tracker: process_order_update(FAILED)
Connector->>Events: emit(OrderFailureEvent)
Tracker->>Tracker: stop_tracking → 緩存
end
loop 等待成交
Exchange-->>Connector: WebSocket 成交通知
Connector->>Tracker: process_trade_update(TradeUpdate)
Note over Tracker: 累計成交數量
Connector->>Events: emit(OrderFilledEvent)
alt 部分成交
Note over Tracker: 狀態: PARTIALLY_FILLED
else 完全成交
Note over Tracker: 狀態: FILLED
Connector->>Events: emit(BuyOrderCompletedEvent)
Tracker->>Tracker: stop_tracking → 緩存
end
end
Note over Tracker: 緩存 30 秒後自動清理
8. 時鐘與時間迭代系統
8.1 Clock:時鐘系統 (Cython)
檔案路徑: hummingbot/core/clock.pyx
class ClockMode(Enum):
REALTIME = 1 # 即時模式 - 使用實際時間
BACKTEST = 2 # 回測模式 - 模擬時間推進
cdef class Clock:
"""
時鐘系統
管理時間推進與子迭代器調度
"""
def __init__(self, clock_mode: ClockMode,
tick_size: float = 1.0,
start_time: float = 0.0,
end_time: float = 0.0):
self._clock_mode = clock_mode
self._tick_size = tick_size
if clock_mode is ClockMode.BACKTEST:
self._start_time = start_time
else:
# 即時模式:對齊到 tick_size 邊界
self._start_time = (time.time() // tick_size) * tick_size
self._current_tick = self._start_time
self._child_iterators: List[TimeIterator] = []
def add_iterator(self, iterator: TimeIterator):
"""添加時間迭代器(策略、連接器等)"""
self._child_iterators.append(iterator)
iterator._clock = self
def remove_iterator(self, iterator: TimeIterator):
"""移除時間迭代器"""
if iterator in self._child_iterators:
self._child_iterators.remove(iterator)
iterator._clock = None
async def run(self):
"""無限運行直到外部停止"""
await self.run_til(float("nan"))
async def run_til(self, timestamp: float):
"""
即時模式:運行直到指定時間戳
等待實際時間推進,然後執行所有迭代器
"""
while True:
now = time.time()
if not math.isnan(timestamp) and now >= timestamp:
return
# 計算下一個 tick 時間
next_tick_time = ((now // self._tick_size) + 1) * self._tick_size
# 等待到下一個 tick
await asyncio.sleep(next_tick_time - now)
# 更新當前 tick
self._current_tick = next_tick_time
# 執行所有子迭代器
for iterator in self._child_iterators:
try:
iterator.c_tick(self._current_tick)
except Exception as e:
self.logger().error(
f"迭代器 {type(iterator).__name__} 執行錯誤: {e}"
)
def backtest_til(self, timestamp: float):
"""
回測模式:立即推進到指定時間
不等待實際時間,立即執行所有 tick
"""
while self._current_tick < timestamp:
self._current_tick += self._tick_size
for iterator in self._child_iterators:
iterator.c_tick(self._current_tick)
@property
def current_timestamp(self) -> float:
"""當前時間戳"""
if self._clock_mode is ClockMode.REALTIME:
return time.time()
return self._current_tick
8.2 TimeIterator:時間迭代器
檔案路徑: hummingbot/core/time_iterator.pyx
cdef class TimeIterator(NetworkIterator):
"""
所有需要時鐘驅動的組件的基類
包括:策略、連接器、追蹤器等
"""
def __init__(self):
super().__init__()
self._clock: Optional[Clock] = None
self._current_timestamp: float = 0.0
@property
def current_timestamp(self) -> float:
"""
獲取當前時間戳
優先使用時鐘時間,否則使用系統時間
"""
if self._clock is not None:
return self._clock.current_timestamp
return time.time()
cdef c_tick(self, double timestamp):
"""
時鐘每 tick 調用
子類覆寫以實現週期性邏輯
"""
self._current_timestamp = timestamp
cdef c_start(self, Clock clock, double timestamp):
"""組件啟動時調用"""
self._clock = clock
self._current_timestamp = timestamp
cdef c_stop(self, Clock clock):
"""組件停止時調用"""
self._clock = None
8.3 時鐘調度流程
sequenceDiagram
participant Clock as 時鐘
participant Connector as 連接器
participant Strategy as 策略
participant OrderBook as 訂單簿
Note over Clock: tick_size = 1.0s
loop 每秒執行
Clock->>Clock: 等待下一個 tick
Clock->>Connector: c_tick(timestamp)
Connector->>OrderBook: 更新訂單簿
Connector->>Connector: 檢查訂單狀態
Clock->>Strategy: c_tick(timestamp)
Strategy->>Strategy: tick(timestamp)
alt 就緒檢查
Strategy->>Connector: ready?
alt 未就緒
Strategy->>Strategy: 記錄警告,跳過
else 已就緒
Strategy->>Strategy: on_tick()
Strategy->>Connector: buy/sell/cancel
end
end
end
9. 連接器實作範例:OKX
9.1 OKX 連接器結構
connector/exchange/okx/
├── okx_exchange.py # 主連接器類
├── okx_auth.py # 認證處理
├── okx_api_order_book_data_source.py # 訂單簿數據源
├── okx_api_user_stream_data_source.py # 用戶流數據源
├── okx_constants.py # 常量定義(端點、限制)
├── okx_utils.py # 工具函數
└── okx_web_utils.py # HTTP 客戶端工廠
9.2 OKX 交易所實作
檔案路徑: hummingbot/connector/exchange/okx/okx_exchange.py
class OkxExchange(ExchangePyBase):
"""OKX 交易所連接器"""
def __init__(self,
okx_api_key: str,
okx_secret_key: str,
okx_passphrase: str,
balance_asset_limit: Optional[str] = None,
rate_limits_share_pct: Decimal = Decimal("100"),
trading_pairs: Optional[List[str]] = None,
trading_required: bool = True):
super().__init__(balance_asset_limit, rate_limits_share_pct)
self._okx_api_key = okx_api_key
self._okx_secret_key = okx_secret_key
self._okx_passphrase = okx_passphrase
self._trading_pairs = trading_pairs or []
self._trading_required = trading_required
@property
def authenticator(self) -> OkxAuth:
"""認證器"""
return OkxAuth(
api_key=self._okx_api_key,
secret_key=self._okx_secret_key,
passphrase=self._okx_passphrase
)
@property
def name(self) -> str:
return "okx"
@property
def rate_limits_rules(self) -> List[RateLimit]:
"""API 限速規則"""
return CONSTANTS.RATE_LIMITS
def supported_order_types(self) -> List[OrderType]:
"""支援的訂單類型"""
return [
OrderType.LIMIT,
OrderType.LIMIT_MAKER,
OrderType.MARKET
]
async def _place_order(self,
order_id: str,
trading_pair: str,
amount: Decimal,
trade_type: TradeType,
order_type: OrderType,
price: Decimal,
**kwargs) -> str:
"""
下單實作
將 Hummingbot 格式轉換為 OKX API 格式
"""
# 獲取交易所交易對符號
symbol = await self.exchange_symbol_associated_to_pair(trading_pair)
# 構建請求數據
data = {
"clOrdId": order_id, # 客戶端訂單 ID
"tdMode": "cash", # 現貨模式
"ordType": CONSTANTS.ORDER_TYPE_MAP[order_type],
"side": trade_type.name.lower(),
"instId": symbol,
"sz": str(amount),
}
# 限價單需要價格
if order_type != OrderType.MARKET:
data["px"] = str(price)
# 發送請求
response = await self._api_post(
path_url=CONSTANTS.ORDER_PATH,
data=data,
is_auth_required=True
)
# 解析回應
order_result = response["data"][0]
if order_result["sCode"] != "0":
raise IOError(
f"下單失敗: {order_result['sMsg']}"
)
return order_result["ordId"] # 返回交易所訂單 ID
async def _execute_cancel(self,
order_id: str,
trading_pair: str) -> bool:
"""取消訂單實作"""
symbol = await self.exchange_symbol_associated_to_pair(trading_pair)
data = {
"instId": symbol,
"clOrdId": order_id
}
response = await self._api_post(
path_url=CONSTANTS.CANCEL_ORDER_PATH,
data=data,
is_auth_required=True
)
result = response["data"][0]
return result["sCode"] == "0"
def _create_order_book_data_source(self) -> OrderBookTrackerDataSource:
"""創建訂單簿數據源"""
return OkxAPIOrderBookDataSource(
trading_pairs=self._trading_pairs,
connector=self,
api_factory=self._web_assistants_factory
)
def _create_user_stream_tracker(self) -> UserStreamTracker:
"""創建用戶流追蹤器"""
return UserStreamTracker(
data_source=OkxAPIUserStreamDataSource(
auth=self.authenticator,
trading_pairs=self._trading_pairs,
connector=self,
api_factory=self._web_assistants_factory
)
)
9.3 OKX 認證實作
檔案路徑: hummingbot/connector/exchange/okx/okx_auth.py
class OkxAuth(AuthBase):
"""OKX API 認證"""
def __init__(self, api_key: str, secret_key: str, passphrase: str):
self._api_key = api_key
self._secret_key = secret_key
self._passphrase = passphrase
def generate_signature(self,
timestamp: str,
method: str,
path: str,
body: str = "") -> str:
"""
生成 OKX API 簽名
簽名格式:timestamp + method + path + body
使用 HMAC-SHA256 + Base64
"""
message = f"{timestamp}{method.upper()}{path}{body}"
signature = hmac.new(
self._secret_key.encode("utf-8"),
message.encode("utf-8"),
hashlib.sha256
).digest()
return base64.b64encode(signature).decode("utf-8")
async def rest_authenticate(self, request: RESTRequest) -> RESTRequest:
"""為 REST 請求添加認證標頭"""
timestamp = datetime.utcnow().isoformat(timespec="milliseconds") + "Z"
# 獲取請求體
body = ""
if request.data is not None:
body = json.dumps(request.data)
# 生成簽名
signature = self.generate_signature(
timestamp=timestamp,
method=request.method.value,
path=request.url.path,
body=body
)
# 設置認證標頭
request.headers.update({
"OK-ACCESS-KEY": self._api_key,
"OK-ACCESS-SIGN": signature,
"OK-ACCESS-TIMESTAMP": timestamp,
"OK-ACCESS-PASSPHRASE": self._passphrase,
"Content-Type": "application/json"
})
return request
async def ws_authenticate(self, request: WSRequest) -> WSRequest:
"""為 WebSocket 連接添加認證"""
timestamp = str(int(time.time()))
signature = self.generate_signature(
timestamp=timestamp,
method="GET",
path="/users/self/verify"
)
# WebSocket 認證消息
auth_message = {
"op": "login",
"args": [{
"apiKey": self._api_key,
"passphrase": self._passphrase,
"timestamp": timestamp,
"sign": signature
}]
}
request.payload = auth_message
return request
10. 效能優化與 Cython
10.1 Cython 優化策略
Hummingbot 在效能關鍵路徑使用 Cython 編譯:
| 模組 | 優化原因 |
|---|---|
| OrderBook | 高頻訂單簿更新,使用 C++ std::set |
| PubSub | 事件分發頻繁,弱引用管理 |
| Clock | 精確時間控制,高頻 tick |
| LimitOrder | 訂單數據結構,內存佈局優化 |
| ConnectorBase | 連接器基類,事件處理熱路徑 |
10.2 OrderBook C++ 後端
# 使用 C++ 標準庫容器
from libcpp.set cimport set as cpp_set
from libcpp.vector cimport vector
cdef class OrderBook:
cdef:
# C++ set 提供 O(log n) 插入/刪除
cpp_set[OrderBookEntry] _bid_book
cpp_set[OrderBookEntry] _ask_book
# 快取最佳價格
double _best_bid
double _best_ask
cdef inline void _update_best_bid_ask(self):
"""內聯函數優化最佳價格更新"""
if not self._bid_book.empty():
self._best_bid = deref(self._bid_book.rbegin()).price
if not self._ask_book.empty():
self._best_ask = deref(self._ask_book.begin()).price
10.3 效能特徵
| 操作 | 複雜度 | 實作方式 |
|---|---|---|
| 訂單簿更新 | O(log n) | C++ std::set |
| 事件分發 | O(m) | 弱引用列表遍歷 |
| 訂單查詢 | O(1) | Python dict |
| 成交匹配 | O(k) | 遍歷成交記錄 |
| 價格更新 | O(1) | 快取最佳價格 |
10.4 記憶體管理
# 弱引用防止記憶體洩漏
class PubSub:
ADD_LISTENER_GC_PROBABILITY = 0.005 # 0.5% 機率觸發 GC
def c_add_listener(self, event_tag, listener):
# 使用弱引用
listener_weakref = PyWeakref_NewRef(listener, None)
# 概率性清理
if random.random() < self.ADD_LISTENER_GC_PROBABILITY:
self.c_remove_dead_listeners(event_tag)
# 訂單緩存使用 TTL
class ClientOrderTracker:
CACHED_ORDER_TTL = 30.0 # 30 秒後自動清理
def __init__(self):
self._cached_orders = TTLCache(
maxsize=1000,
ttl=self.CACHED_ORDER_TTL
)
總結
架構優勢
-
模組化設計
- 連接器系統統一多交易所介面
- 策略與執行分離,易於擴展
- 事件驅動解耦組件依賴
-
效能優化
- Cython 編譯熱路徑
- C++ 容器提供高效資料結構
- 弱引用防止記憶體洩漏
-
靈活性
- V1 腳本策略快速開發
- V2 控制器+執行器支援複雜策略
- 支援即時與回測模式
-
可靠性
- 完整的訂單生命週期追蹤
- 緩存機制處理延遲事件
- 遺失訂單恢復機制
適用場景
| 場景 | 推薦架構 |
|---|---|
| 簡單做市 | Script Strategy V1 |
| 網格交易 | GridExecutor V2 |
| 跨所套利 | ArbitrageExecutor V2 |
| 定投策略 | DCAExecutor V2 |
| 複雜多腿策略 | Controller + 多 Executor |
關鍵文件路徑
核心模組:
├── hummingbot/core/trading_core.py # 交易核心
├── hummingbot/core/clock.pyx # 時鐘系統
├── hummingbot/core/pubsub.pyx # 事件系統
├── hummingbot/core/data_type/order_book.pyx # 訂單簿
連接器:
├── hummingbot/connector/connector_base.pyx
├── hummingbot/connector/exchange_py_base.py
├── hummingbot/connector/client_order_tracker.py
策略:
├── hummingbot/strategy/script_strategy_base.py
├── hummingbot/strategy_v2/controllers/controller_base.py
├── hummingbot/strategy_v2/executors/executor_base.py
Hummingbot 展示了如何構建一個專業級的量化交易框架,其模組化架構、效能優化策略與完善的事件系統值得深入學習。