Hummingbot 深度剖析:從交易連接器到策略執行引擎的完整架構解析


生成提示詞

請深入分析 Hummingbot 開源量化交易框架的完整架構:
1. Clone hummingbot 源代碼並閱讀核心模組
2. 分析連接器系統如何對接多個交易所
3. 研究訂單管理與事件驅動架構
4. 解析 Strategy V1 與 V2 的差異與演進
5. 繪製 Mermaid 流程圖說明數據流與交易執行
6. 包含關鍵源碼片段與文件路徑
7. 以繁體中文撰寫工程級深度文章

執行摘要

Hummingbot 是一個開源的加密貨幣量化交易框架,支援多交易所連接、自動做市(AMM)、套利策略與自定義交易邏輯。本文將深入剖析其核心技術:

  • 模組化連接器架構:如何統一不同交易所的 API 差異
  • Cython 高效能核心:OrderBook、Clock、PubSub 的 C++ 優化
  • 事件驅動系統:弱引用 Pub/Sub 模式與記憶體管理
  • Strategy V2 演進:Controller + Executor 的現代化策略架構
  • 訂單生命週期管理:從下單到成交的完整追蹤機制

目錄

  1. Hummingbot 架構總覽
  2. 連接器系統深度解析
  3. 訂單簿與數據結構
  4. 事件驅動架構
  5. Strategy V1:腳本策略基礎
  6. Strategy V2:控制器與執行器
  7. 訂單管理與生命週期
  8. 時鐘與時間迭代系統
  9. 連接器實作範例:OKX
  10. 效能優化與 Cython

1. Hummingbot 架構總覽

1.1 系統組件架構

flowchart TB
    subgraph Client["客戶端層"]
        CLI[Hummingbot CLI]
        MQTT[MQTT Bridge]
        Headless[Headless Mode]
    end

    subgraph Application["應用層"]
        HBApp[HummingbotApplication]
        TradingCore[TradingCore]
        ConnMgr[ConnectorManager]
    end

    subgraph Strategy["策略層"]
        StratV1[Script Strategy V1]
        StratV2[Strategy V2]
        Controller[Controllers]
        Executor[Executors]
    end

    subgraph Connector["連接器層"]
        ExchangeBase[ExchangeBase]
        OKX[OKX Connector]
        Binance[Binance Connector]
        Uniswap[Uniswap Connector]
    end

    subgraph Data["數據層"]
        OrderBook[OrderBook Tracker]
        UserStream[User Stream Tracker]
        OrderTracker[Client Order Tracker]
    end

    subgraph Core["核心層 - Cython"]
        Clock[Clock]
        PubSub[PubSub Event System]
        TimeIterator[TimeIterator]
    end

    CLI --> HBApp
    MQTT --> HBApp
    Headless --> HBApp

    HBApp --> TradingCore
    TradingCore --> ConnMgr
    TradingCore --> StratV1
    TradingCore --> StratV2

    StratV2 --> Controller
    Controller --> Executor

    ConnMgr --> ExchangeBase
    ExchangeBase --> OKX
    ExchangeBase --> Binance
    ExchangeBase --> Uniswap

    OKX --> OrderBook
    OKX --> UserStream
    OKX --> OrderTracker

    Clock --> TimeIterator
    TimeIterator --> StratV1
    TimeIterator --> ExchangeBase

    PubSub --> StratV1
    PubSub --> Executor

1.2 核心模組結構

hummingbot/
├── __init__.py              # 初始化、日誌、路徑設定
├── client/                  # CLI 客戶端
│   ├── hummingbot_application.py  # 主應用程式
│   └── config/              # 配置管理
├── connector/               # 交易所連接器
│   ├── connector_base.pyx   # Cython 基類
│   ├── exchange_py_base.py  # Python 交易所基類
│   └── exchange/            # 各交易所實作
├── core/                    # 核心組件
│   ├── clock.pyx           # 時鐘系統
│   ├── pubsub.pyx          # 事件發布訂閱
│   ├── trading_core.py     # 交易核心
│   ├── connector_manager.py # 連接器管理
│   └── data_type/          # 數據結構
├── strategy/                # V1 策略
│   └── script_strategy_base.py
└── strategy_v2/             # V2 策略
    ├── controllers/         # 控制器
    └── executors/           # 執行器

1.3 TradingCore:核心交易引擎

檔案路徑: hummingbot/core/trading_core.py

class TradingCore:
    """
    核心交易功能的模組化架構
    提供:
    - 連接器管理(創建、添加、移除連接器)
    - 市場數據訪問(訂單簿、餘額等)
    - 策略管理(可選 - 可無策略運行)
    - 直接交易能力
    - 時鐘管理用於即時操作
    """

    def __init__(self, client_config):
        self.connector_manager = ConnectorManager(self.client_config_map)
        self.clock: Optional[Clock] = None
        self.strategy: Optional[StrategyBase] = None
        self.markets_recorder: Optional[MarketsRecorder] = None
        self.trade_fill_db: Optional[SQLConnectionManager] = None
        self._metrics_collectors: Dict[str, MetricsCollector] = {}

    def create_connector(self, connector_name: str,
                         trading_pairs: List[str]) -> ExchangeBase:
        """動態創建交易所連接器"""
        return self.connector_manager.create_connector(
            connector_name, trading_pairs
        )

    async def start_clock(self):
        """啟動時鐘系統進行即時交易"""
        if self.clock is None:
            self.clock = Clock(ClockMode.REALTIME, tick_size=1.0)
        await self.clock.run()

    def start_strategy(self, strategy: StrategyBase):
        """啟動交易策略"""
        self.strategy = strategy
        self.clock.add_iterator(strategy)

2. 連接器系統深度解析

2.1 連接器架構層次

classDiagram
    class NetworkIterator {
        <<Cython>>
        +start_network()
        +stop_network()
        +check_network()
    }

    class ConnectorBase {
        <<Cython>>
        +_event_reporter
        +_event_logger
        +_account_balances
        +buy()
        +sell()
        +cancel()
    }

    class ExchangeBase {
        <<Cython>>
        +order_book_tracker
        +user_stream_tracker
        +trading_rules
        +get_order_book()
    }

    class ExchangePyBase {
        <<Python>>
        +_throttler
        +_time_synchronizer
        +_order_tracker
        +_place_order()
        +_execute_cancel()
    }

    class OkxExchange {
        +okx_api_key
        +authenticator
        +supported_order_types()
    }

    NetworkIterator <|-- ConnectorBase
    ConnectorBase <|-- ExchangeBase
    ExchangeBase <|-- ExchangePyBase
    ExchangePyBase <|-- OkxExchange

2.2 連接器基類 (Cython 實作)

檔案路徑: hummingbot/connector/connector_base.pyx

cdef class ConnectorBase(NetworkIterator):
    # 市場事件類型定義
    MARKET_EVENTS = [
        MarketEvent.ReceivedAsset,
        MarketEvent.BuyOrderCompleted,
        MarketEvent.SellOrderCompleted,
        MarketEvent.OrderCancelled,
        MarketEvent.OrderFilled,
        MarketEvent.OrderExpired,
        MarketEvent.OrderUpdate,
        MarketEvent.TradeUpdate,
        MarketEvent.OrderFailure,
        MarketEvent.TransactionFailure,
        MarketEvent.BuyOrderCreated,
        MarketEvent.SellOrderCreated,
        MarketEvent.FundingPaymentCompleted,
    ]

    def __init__(self, balance_asset_limit=None):
        super().__init__()
        self._event_reporter = EventReporter(event_source=self.display_name)
        self._event_logger = EventLogger(event_source=self.display_name)
        self._account_balances: Dict[str, Decimal] = {}
        self._account_available_balances: Dict[str, Decimal] = {}
        self._current_trade_fills: Set[str] = set()
        self._trade_fee_schema: Optional[TradeFeeSchema] = None

    @property
    def tracking_states(self) -> Dict[str, Any]:
        """返回當前追蹤的訂單狀態"""
        return {}

    def get_balance(self, currency: str) -> Decimal:
        """獲取特定幣種餘額"""
        return self._account_balances.get(currency, Decimal(0))

    def get_available_balance(self, currency: str) -> Decimal:
        """獲取可用餘額(扣除凍結部分)"""
        return self._account_available_balances.get(currency, Decimal(0))

2.3 Python 交易所基類

檔案路徑: hummingbot/connector/exchange_py_base.py

class ExchangePyBase(ExchangeBase, ABC):
    SHORT_POLL_INTERVAL = 5.0    # 短輪詢間隔
    LONG_POLL_INTERVAL = 120.0   # 長輪詢間隔

    def __init__(self, balance_asset_limit=None,
                 rate_limits_share_pct=Decimal("100")):
        super().__init__()
        self._trading_rules: Dict[str, TradingRule] = {}
        self._trading_fees: Dict[str, TradeFeeSchema] = {}

        # 時間同步器 - 處理伺服器時間偏差
        self._time_synchronizer = TimeSynchronizer()

        # 限速器 - 遵守 API 調用限制
        self._throttler = AsyncThrottler(
            rate_limits=self.rate_limits_rules,
            limits_share_percentage=rate_limits_share_pct
        )

        # 訂單簿數據源與追蹤器
        self._orderbook_ds = self._create_order_book_data_source()
        self._set_order_book_tracker(OrderBookTracker(
            data_source=self._orderbook_ds,
            trading_pairs=self._trading_pairs
        ))

        # 用戶流追蹤器 - 餘額與訂單更新
        self._user_stream_tracker = self._create_user_stream_tracker()

        # 訂單追蹤器 - 管理活躍訂單
        self._order_tracker: ClientOrderTracker = self._create_order_tracker()

    @property
    def status_dict(self) -> Dict[str, bool]:
        """連接器就緒狀態檢查"""
        return {
            "symbols_mapping_initialized": self.trading_pair_symbol_map_ready(),
            "order_books_initialized": self.order_book_tracker.ready,
            "account_balance": len(self._account_balances) > 0,
            "trading_rule_initialized": len(self._trading_rules) > 0,
            "user_stream_initialized": self._is_user_stream_initialized(),
        }

    @property
    def ready(self) -> bool:
        """所有組件是否就緒"""
        return all(self.status_dict.values())

    @abstractmethod
    async def _place_order(self, order_id: str, trading_pair: str,
                          amount: Decimal, trade_type: TradeType,
                          order_type: OrderType, price: Decimal) -> str:
        """下單抽象方法 - 各交易所實作"""
        raise NotImplementedError

2.4 連接器管理器

檔案路徑: hummingbot/core/connector_manager.py

class ConnectorManager:
    """
    動態管理交易所連接器
    提供:
    - 動態創建與初始化連接器
    - 添加/移除連接器
    - 無策略情況下訪問市場數據
    - 直接通過連接器下單
    - 獨立於策略管理連接器生命週期
    """

    def create_connector(self, connector_name: str,
                        trading_pairs: List[str],
                        trading_required: bool = True,
                        api_keys: Optional[Dict] = None) -> ConnectorBase:
        """
        動態創建連接器實例

        Args:
            connector_name: 交易所名稱 (如 "okx", "binance")
            trading_pairs: 交易對列表 ["BTC-USDT", "ETH-USDT"]
            trading_required: 是否需要交易權限
            api_keys: 可選的 API 金鑰覆蓋
        """
        # 獲取連接器設定
        connector_setting = AllConnectorSettings.get_connector_settings()[connector_name]

        # 處理模擬交易模式
        if self._client_config.paper_trade.paper_trade_enabled:
            connector_name = f"{connector_name}_paper_trade"

        # 載入 API 金鑰
        if api_keys is None:
            api_keys = self._load_api_keys(connector_name)

        # 創建初始化參數
        init_params = connector_setting.conn_init_parameters(
            trading_pairs=trading_pairs,
            trading_required=trading_required,
            api_keys=api_keys,
            client_config_map=self._client_config
        )

        # 實例化連接器
        connector_class = connector_setting.connector_class()
        connector = connector_class(**init_params)

        return connector

3. 訂單簿與數據結構

3.1 訂單簿 (Cython 高效能實作)

檔案路徑: hummingbot/core/data_type/order_book.pyx

cdef class OrderBook(PubSub):
    """
    高效能訂單簿實作
    使用 C++ std::set 實現 O(log n) 的插入與刪除
    """
    ORDER_BOOK_TRADE_EVENT_TAG = OrderBookEvent.TradeEvent.value

    def __init__(self, dex: bool = False):
        super().__init__()
        self._snapshot_uid: int64_t = 0
        self._last_diff_uid: int64_t = 0
        self._best_bid: double = float("NaN")
        self._best_ask: double = float("NaN")
        self._last_trade_price: double = float("NaN")
        self._dex = dex

    cdef c_apply_diffs(self, vector[OrderBookEntry] bids,
                       vector[OrderBookEntry] asks,
                       int64_t update_id):
        """
        應用增量更新
        - 移除已撤銷的掛單
        - 插入新掛單
        - 處理價格重疊
        - 更新最佳買賣價
        """
        cdef:
            OrderBookEntry entry
            set[OrderBookEntry] *bid_book = address(self._bid_book)
            set[OrderBookEntry] *ask_book = address(self._ask_book)

        # 處理買單更新
        for entry in bids:
            if entry.amount == 0:
                # 數量為 0 表示移除
                bid_book.erase(entry)
            else:
                bid_book.insert(entry)

        # 處理賣單更新
        for entry in asks:
            if entry.amount == 0:
                ask_book.erase(entry)
            else:
                ask_book.insert(entry)

        self._last_diff_uid = update_id
        self._update_best_bid_ask()

    cdef c_apply_snapshot(self, vector[OrderBookEntry] bids,
                          vector[OrderBookEntry] asks,
                          int64_t update_id):
        """應用完整快照 - 用於初始化或同步"""
        self._bid_book.clear()
        self._ask_book.clear()

        for entry in bids:
            self._bid_book.insert(entry)
        for entry in asks:
            self._ask_book.insert(entry)

        self._snapshot_uid = update_id
        self._update_best_bid_ask()

    @property
    def get_price(self) -> float:
        """獲取中間價"""
        return (self._best_bid + self._best_ask) / 2.0

    def get_price_for_volume(self, is_buy: bool, volume: Decimal) -> Decimal:
        """計算特定交易量的加權平均價格"""
        book = self._ask_book if is_buy else self._bid_book
        remaining_volume = float(volume)
        total_cost = 0.0

        for entry in book:
            if remaining_volume <= 0:
                break
            fill_amount = min(entry.amount, remaining_volume)
            total_cost += fill_amount * entry.price
            remaining_volume -= fill_amount

        return Decimal(str(total_cost / float(volume)))

3.2 訂單狀態機

檔案路徑: hummingbot/core/data_type/in_flight_order.py

class OrderState(Enum):
    """訂單狀態枚舉"""
    PENDING_CREATE = 0     # 等待創建
    OPEN = 1               # 已開立
    PENDING_CANCEL = 2     # 等待取消
    CANCELED = 3           # 已取消
    PARTIALLY_FILLED = 4   # 部分成交
    FILLED = 5             # 完全成交
    FAILED = 6             # 失敗
    PENDING_APPROVAL = 7   # 等待批准 (DEX)
    APPROVED = 8           # 已批准 (DEX)
    CREATED = 9            # 已創建
    COMPLETED = 10         # 已完成
stateDiagram-v2
    [*] --> PENDING_CREATE: 提交訂單
    PENDING_CREATE --> CREATED: 交易所確認
    PENDING_CREATE --> FAILED: 提交失敗

    CREATED --> OPEN: 掛單成功
    OPEN --> PARTIALLY_FILLED: 部分成交
    PARTIALLY_FILLED --> PARTIALLY_FILLED: 繼續成交
    PARTIALLY_FILLED --> FILLED: 完全成交
    OPEN --> FILLED: 完全成交

    OPEN --> PENDING_CANCEL: 請求取消
    PARTIALLY_FILLED --> PENDING_CANCEL: 請求取消
    PENDING_CANCEL --> CANCELED: 取消成功
    PENDING_CANCEL --> FILLED: 取消前成交

    FILLED --> COMPLETED: 結算完成
    CANCELED --> COMPLETED: 清理完成

    COMPLETED --> [*]
    FAILED --> [*]

3.3 InFlightOrder:活躍訂單追蹤

class InFlightOrder:
    """
    追蹤從下單到完成的訂單完整生命週期
    """

    def __init__(self,
                 client_order_id: str,
                 trading_pair: str,
                 order_type: OrderType,
                 trade_type: TradeType,
                 amount: Decimal,
                 creation_timestamp: float,
                 price: Optional[Decimal] = None,
                 exchange_order_id: Optional[str] = None,
                 initial_state: OrderState = OrderState.PENDING_CREATE,
                 leverage: int = 1,
                 position: PositionAction = PositionAction.NIL):

        self.client_order_id = client_order_id
        self.trading_pair = trading_pair
        self.order_type = order_type
        self.trade_type = trade_type
        self.amount = amount
        self.price = price
        self.creation_timestamp = creation_timestamp
        self.exchange_order_id = exchange_order_id
        self.current_state = initial_state

        # 成交記錄
        self.order_fills: Dict[str, TradeUpdate] = {}

        # 異步事件 - 用於等待特定狀態
        self.exchange_order_id_update_event = asyncio.Event()
        self.completely_filled_event = asyncio.Event()
        self.processed_by_exchange_event = asyncio.Event()

    @property
    def executed_amount_base(self) -> Decimal:
        """已成交的基礎資產數量"""
        return sum(fill.fill_base_amount for fill in self.order_fills.values())

    @property
    def executed_amount_quote(self) -> Decimal:
        """已成交的計價資產數量"""
        return sum(fill.fill_quote_amount for fill in self.order_fills.values())

    @property
    def average_executed_price(self) -> Decimal:
        """平均成交價格"""
        if self.executed_amount_base == Decimal(0):
            return Decimal(0)
        return self.executed_amount_quote / self.executed_amount_base

    def update_with_trade_update(self, trade_update: TradeUpdate) -> bool:
        """
        使用成交更新訂單
        Returns: 是否為新的成交記錄
        """
        trade_id = trade_update.trade_id
        if trade_id in self.order_fills:
            return False  # 重複的成交

        self.order_fills[trade_id] = trade_update

        # 檢查是否完全成交
        if self.executed_amount_base >= self.amount:
            self.current_state = OrderState.FILLED
            self.completely_filled_event.set()
        elif self.executed_amount_base > Decimal(0):
            self.current_state = OrderState.PARTIALLY_FILLED

        return True

3.4 OrderCandidate:訂單候選計算

檔案路徑: hummingbot/core/data_type/order_candidate.py

@dataclass
class OrderCandidate:
    """
    訂單候選 - 計算訂單對帳戶的完整影響
    包含:抵押品、手續費、潛在收益
    可根據可用餘額調整訂單大小
    """
    trading_pair: str
    is_maker: bool
    order_type: OrderType
    order_side: TradeType
    amount: Decimal
    price: Decimal

    # 計算後填充的欄位
    order_collateral: Optional[TokenAmount] = None
    percent_fee_collateral: Optional[TokenAmount] = None
    percent_fee_value: Optional[TokenAmount] = None
    fixed_fee_collaterals: List[TokenAmount] = field(default_factory=list)
    potential_returns: Optional[TokenAmount] = None
    resized: bool = False

    def populate_collateral_entries(self, exchange: ExchangeBase):
        """計算所有抵押品與手續費項目"""
        self._populate_order_collateral_entry(exchange)

        fee = self._get_fee(exchange)
        self._populate_percent_fee_collateral_entry(exchange, fee)
        self._populate_fixed_fee_collateral_entries(fee)
        self._populate_potential_returns_entry(exchange)

    def adjust_from_balances(self,
                             available_balances: Dict[str, Decimal]) -> bool:
        """
        根據可用餘額調整訂單大小
        Returns: 是否進行了調整
        """
        collateral_token = self.order_collateral.token
        required = self.order_collateral.amount
        available = available_balances.get(collateral_token, Decimal(0))

        if available < required:
            # 計算可負擔的最大數量
            reduction_ratio = available / required
            self.amount *= reduction_ratio
            self.resized = True

            # 重新計算所有欄位
            self.populate_collateral_entries(exchange)
            return True

        return False

4. 事件驅動架構

4.1 事件類型定義

檔案路徑: hummingbot/core/event/events.py

class MarketEvent(Enum):
    """市場事件類型"""
    ReceivedAsset = 101
    BuyOrderCompleted = 102
    SellOrderCompleted = 103
    OrderCancelled = 106
    OrderFilled = 107
    OrderExpired = 108
    OrderUpdate = 109
    TradeUpdate = 110
    OrderFailure = 198
    TransactionFailure = 199
    BuyOrderCreated = 200
    SellOrderCreated = 201
    FundingPaymentCompleted = 202  # 永續合約資金費率
    FundingInfo = 203
    RangePositionLiquidityAdded = 300  # DEX LP
    RangePositionLiquidityRemoved = 301
    RangePositionUpdate = 302

@dataclass
class OrderFilledEvent:
    """訂單成交事件"""
    timestamp: float
    order_id: str
    trading_pair: str
    trade_type: TradeType       # BUY / SELL
    order_type: OrderType       # LIMIT / MARKET
    price: Decimal
    amount: Decimal
    trade_fee: TradeFeeBase
    exchange_trade_id: str = ""
    exchange_order_id: str = ""
    leverage: Optional[int] = 1
    position: Optional[str] = PositionAction.NIL.value

@dataclass
class BuyOrderCompletedEvent:
    """買單完成事件"""
    timestamp: float
    order_id: str
    base_asset: str
    quote_asset: str
    base_asset_amount: Decimal
    quote_asset_amount: Decimal
    order_type: OrderType
    exchange_order_id: Optional[str] = None

4.2 PubSub 事件系統 (Cython)

檔案路徑: hummingbot/core/pubsub.pyx

cdef class PubSub:
    """
    高效能發布訂閱系統

    特性:
    - 使用弱引用避免監聽器記憶體洩漏
    - 概率性垃圾回收清理死亡監聽器
    - C++ 容器提供高效能事件分發
    """

    # 每次添加監聽器時 0.5% 機率觸發 GC
    ADD_LISTENER_GC_PROBABILITY = 0.005

    def __init__(self):
        # C++ map: event_tag -> set<listener_weakref>
        self._events = Events()

    cdef c_add_listener(self, int64_t event_tag, EventListener listener):
        """添加事件監聽器(使用弱引用)"""
        # 創建弱引用避免循環引用
        cdef object listener_weakref = PyWeakref_NewRef(listener, None)

        cdef EventListenerWeakRefWrapper listener_wrapper = \
            EventListenerWeakRefWrapper(listener_weakref)

        # 查找或創建事件標籤的監聽器集合
        cdef map[int64_t, set[EventListenerWeakRefWrapper]].iterator it = \
            self._events.find(event_tag)

        if it != self._events.end():
            deref(it).second.insert(listener_wrapper)
        else:
            self._events[event_tag] = set[EventListenerWeakRefWrapper]()
            self._events[event_tag].insert(listener_wrapper)

        # 概率性清理死亡監聽器
        if random.random() < PubSub.ADD_LISTENER_GC_PROBABILITY:
            self.c_remove_dead_listeners(event_tag)

    cdef c_trigger_event(self, int64_t event_tag, object message):
        """觸發事件通知所有監聽器"""
        cdef:
            map[int64_t, set[EventListenerWeakRefWrapper]].iterator it
            set[EventListenerWeakRefWrapper] *listeners_ptr
            EventListenerWeakRefWrapper wrapper
            object listener_ref
            EventListener listener

        it = self._events.find(event_tag)
        if it == self._events.end():
            return

        listeners_ptr = address(deref(it).second)

        for wrapper in deref(listeners_ptr):
            listener_ref = wrapper.listener_weakref

            # 解引用弱引用
            if listener_ref is not None:
                listener = <EventListener>PyWeakref_GetObject(listener_ref)
                if listener is not None:
                    # 調用監聽器
                    listener.c_call(message)

    cdef c_remove_dead_listeners(self, int64_t event_tag):
        """清理已死亡的監聽器弱引用"""
        cdef:
            set[EventListenerWeakRefWrapper] *listeners_ptr
            set[EventListenerWeakRefWrapper] alive_listeners
            EventListenerWeakRefWrapper wrapper

        it = self._events.find(event_tag)
        if it == self._events.end():
            return

        listeners_ptr = address(deref(it).second)

        for wrapper in deref(listeners_ptr):
            if PyWeakref_GetObject(wrapper.listener_weakref) is not None:
                alive_listeners.insert(wrapper)

        deref(listeners_ptr).swap(alive_listeners)

4.3 事件流程圖

sequenceDiagram
    participant Strategy as 策略
    participant Connector as 連接器
    participant OrderTracker as 訂單追蹤器
    participant Exchange as 交易所 API
    participant UserStream as 用戶流
    participant PubSub as 事件系統

    Strategy->>Connector: buy(pair, amount, price)
    Connector->>OrderTracker: start_tracking(InFlightOrder)
    Connector->>PubSub: trigger(BuyOrderCreatedEvent)
    Connector->>Exchange: POST /order
    Exchange-->>Connector: order_id

    UserStream->>Connector: WebSocket 訂單更新
    Connector->>OrderTracker: update_order_state()

    UserStream->>Connector: WebSocket 成交更新
    Connector->>OrderTracker: process_trade_update()
    Connector->>PubSub: trigger(OrderFilledEvent)
    PubSub->>Strategy: on_order_filled()

    alt 完全成交
        Connector->>PubSub: trigger(BuyOrderCompletedEvent)
        PubSub->>Strategy: on_order_completed()
        Connector->>OrderTracker: stop_tracking()
    end

5. Strategy V1:腳本策略基礎

5.1 ScriptStrategyBase

檔案路徑: hummingbot/strategy/script_strategy_base.py

class ScriptStrategyBase(StrategyPyBase):
    """
    簡化的策略基類
    用於腳本式策略開發
    """

    # 定義需要的連接器與交易對
    markets: Dict[str, Set[str]]  # {"okx": {"BTC-USDT", "ETH-USDT"}}

    @classmethod
    def init_markets(cls, config: BaseModel) -> Dict[str, Set[str]]:
        """
        初始化市場配置
        子類實作此方法定義所需的交易所與交易對
        """
        raise NotImplementedError

    def __init__(self,
                 connectors: Dict[str, ConnectorBase],
                 config: Optional[BaseModel] = None):
        super().__init__()
        self.connectors: Dict[str, ConnectorBase] = connectors
        self.ready_to_trade: bool = False
        self.config = config

        # 將連接器添加為市場
        self.add_markets(list(connectors.values()))

    def tick(self, timestamp: float):
        """
        每個時鐘週期調用
        檢查就緒狀態後調用 on_tick()
        """
        if not self.ready_to_trade:
            # 檢查所有連接器是否就緒
            self.ready_to_trade = all(
                connector.ready for connector in self.connectors.values()
            )

            if not self.ready_to_trade:
                for name, conn in self.connectors.items():
                    if not conn.ready:
                        self.logger().warning(
                            f"{name} 尚未就緒。狀態: {conn.status_dict}"
                        )
                return

        # 所有連接器就緒,執行策略邏輯
        self.on_tick()

    def on_tick(self):
        """
        用戶實作的每週期邏輯
        子類覆寫此方法實現交易策略
        """
        pass

    # ============ 交易便捷方法 ============

    def buy(self, connector_name: str, trading_pair: str,
            amount: Decimal, order_type: OrderType,
            price: Decimal = Decimal("NaN"),
            position_action: PositionAction = PositionAction.OPEN) -> str:
        """
        提交買單
        Returns: client_order_id
        """
        market_pair = self._market_trading_pair_tuple(
            connector_name, trading_pair
        )
        return self.buy_with_specific_market(
            market_pair, amount, order_type, price,
            position_action=position_action
        )

    def sell(self, connector_name: str, trading_pair: str,
             amount: Decimal, order_type: OrderType,
             price: Decimal = Decimal("NaN")) -> str:
        """提交賣單"""
        market_pair = self._market_trading_pair_tuple(
            connector_name, trading_pair
        )
        return self.sell_with_specific_market(
            market_pair, amount, order_type, price
        )

    def cancel(self, connector_name: str, trading_pair: str,
               order_id: str):
        """取消訂單"""
        self.cancel_order(
            self._market_trading_pair_tuple(connector_name, trading_pair),
            order_id
        )

    # ============ 市場數據便捷方法 ============

    def get_mid_price(self, connector_name: str,
                      trading_pair: str) -> Decimal:
        """獲取中間價"""
        connector = self.connectors[connector_name]
        return connector.get_mid_price(trading_pair)

    def get_order_book(self, connector_name: str,
                       trading_pair: str) -> OrderBook:
        """獲取訂單簿"""
        connector = self.connectors[connector_name]
        return connector.get_order_book(trading_pair)

    def get_balance(self, connector_name: str, asset: str) -> Decimal:
        """獲取資產餘額"""
        connector = self.connectors[connector_name]
        return connector.get_balance(asset)

5.2 策略範例:簡單做市

from hummingbot.strategy.script_strategy_base import ScriptStrategyBase
from decimal import Decimal

class SimpleMarketMaker(ScriptStrategyBase):
    """
    簡單的做市策略
    在買賣價差兩側掛單
    """

    markets = {"okx": {"BTC-USDT"}}

    # 策略參數
    order_amount = Decimal("0.01")   # 每單數量
    spread = Decimal("0.002")        # 價差 0.2%
    order_refresh_time = 15.0        # 訂單刷新時間

    def __init__(self, connectors):
        super().__init__(connectors)
        self._last_order_time = 0
        self._buy_order_id = None
        self._sell_order_id = None

    def on_tick(self):
        current_time = self.current_timestamp

        # 檢查是否需要刷新訂單
        if current_time - self._last_order_time < self.order_refresh_time:
            return

        # 取消現有訂單
        self._cancel_existing_orders()

        # 獲取中間價
        mid_price = self.get_mid_price("okx", "BTC-USDT")

        # 計算買賣價格
        buy_price = mid_price * (1 - self.spread / 2)
        sell_price = mid_price * (1 + self.spread / 2)

        # 下買單
        self._buy_order_id = self.buy(
            "okx", "BTC-USDT",
            self.order_amount,
            OrderType.LIMIT,
            buy_price
        )

        # 下賣單
        self._sell_order_id = self.sell(
            "okx", "BTC-USDT",
            self.order_amount,
            OrderType.LIMIT,
            sell_price
        )

        self._last_order_time = current_time
        self.logger().info(
            f"掛單完成: 買 {buy_price:.2f} / 賣 {sell_price:.2f}"
        )

    def _cancel_existing_orders(self):
        if self._buy_order_id:
            self.cancel("okx", "BTC-USDT", self._buy_order_id)
        if self._sell_order_id:
            self.cancel("okx", "BTC-USDT", self._sell_order_id)

    def did_fill_order(self, event: OrderFilledEvent):
        """訂單成交回調"""
        self.logger().info(
            f"訂單成交: {event.trade_type.name} "
            f"{event.amount} @ {event.price}"
        )

6. Strategy V2:控制器與執行器

6.1 V2 架構概覽

flowchart TB
    subgraph StrategyV2["Strategy V2 架構"]
        MarketData[MarketDataProvider]
        Controller[Controller]
        ActionQueue[Actions Queue]
        Executor[Executor]
    end

    subgraph Controllers["控制器類型"]
        MMController[MarketMakingController]
        DirController[DirectionalController]
        GenController[GenericController]
    end

    subgraph Executors["執行器類型"]
        OrderExec[OrderExecutor]
        PositionExec[PositionExecutor]
        DCAExec[DCAExecutor]
        GridExec[GridExecutor]
        ArbExec[ArbitrageExecutor]
        TWAPExec[TWAPExecutor]
        XEMMExec[XEMMExecutor]
    end

    MarketData --> Controller
    Controller --> ActionQueue
    ActionQueue --> Executor

    Controller --> MMController
    Controller --> DirController
    Controller --> GenController

    Executor --> OrderExec
    Executor --> PositionExec
    Executor --> DCAExec
    Executor --> GridExec
    Executor --> ArbExec
    Executor --> TWAPExec
    Executor --> XEMMExec

6.2 RunnableBase:可運行組件基類

檔案路徑: hummingbot/strategy_v2/runnable_base.py

class RunnableStatus(Enum):
    NOT_STARTED = "not_started"
    RUNNING = "running"
    TERMINATED = "terminated"

class RunnableBase(ABC):
    """
    智能組件的基類
    為需要定期執行任務的組件提供基本結構
    """

    def __init__(self, update_interval: float = 0.5):
        self.update_interval = update_interval
        self._status: RunnableStatus = RunnableStatus.NOT_STARTED
        self.terminated = asyncio.Event()

    @property
    def status(self) -> RunnableStatus:
        return self._status

    def start(self):
        """啟動組件"""
        if self._status == RunnableStatus.NOT_STARTED:
            self.terminated.clear()
            self._status = RunnableStatus.RUNNING
            safe_ensure_future(self.control_loop())

    def stop(self):
        """停止組件"""
        if self._status != RunnableStatus.TERMINATED:
            self._status = RunnableStatus.TERMINATED
            self.terminated.set()

    async def control_loop(self):
        """主控制循環"""
        await self.on_start()

        while not self.terminated.is_set():
            try:
                await self.control_task()
            except Exception as e:
                self.logger().error(f"控制任務錯誤: {e}", exc_info=True)
            finally:
                await asyncio.sleep(self.update_interval)

        self.on_stop()

    async def on_start(self):
        """啟動時回調 - 子類可覆寫"""
        pass

    @abstractmethod
    async def control_task(self):
        """
        主控制任務 - 子類必須實作
        每個 update_interval 執行一次
        """
        raise NotImplementedError

    def on_stop(self):
        """停止時回調 - 子類可覆寫"""
        pass

6.3 ControllerBase:控制器基類

檔案路徑: hummingbot/strategy_v2/controllers/controller_base.py

@dataclass
class ControllerConfigBase(BaseClientModel):
    """控制器配置基類"""
    id: str = Field(default=None)
    controller_name: str
    controller_type: str = "generic"
    total_amount_quote: Decimal = Field(default=Decimal("100"))
    manual_kill_switch: bool = Field(default=False)
    candles_config: List[CandlesConfig] = Field(default=[])
    initial_positions: List[InitialPositionConfig] = Field(default=[])

class ControllerBase(RunnableBase):
    """
    控制器基類
    管理策略執行、協調執行器
    """

    def __init__(self,
                 config: ControllerConfigBase,
                 market_data_provider: MarketDataProvider,
                 actions_queue: asyncio.Queue,
                 update_interval: float = 1.0):
        super().__init__(update_interval=update_interval)
        self.config = config
        self.market_data_provider = market_data_provider
        self.actions_queue = actions_queue

        # 執行器狀態追蹤
        self.executors_info: List[ExecutorInfo] = []
        self.positions_held: List[PositionSummary] = []

    async def control_task(self):
        """控制任務 - 子類實作具體邏輯"""
        if self.config.manual_kill_switch:
            await self._stop_all_executors()
            return

        # 更新市場數據
        await self._update_market_data()

        # 執行策略邏輯
        actions = await self.determine_actions()

        # 將動作發送到隊列
        for action in actions:
            await self.actions_queue.put(action)

    @abstractmethod
    async def determine_actions(self) -> List[ExecutorAction]:
        """
        確定需要執行的動作
        子類實作此方法定義策略邏輯
        """
        raise NotImplementedError

    def get_executor_by_id(self, executor_id: str) -> Optional[ExecutorInfo]:
        """根據 ID 獲取執行器信息"""
        for info in self.executors_info:
            if info.id == executor_id:
                return info
        return None

6.4 ExecutorBase:執行器基類

檔案路徑: hummingbot/strategy_v2/executors/executor_base.py

class ExecutorBase(RunnableBase):
    """
    執行器基類
    負責執行具體的訂單操作
    """

    def __init__(self,
                 strategy: ScriptStrategyBase,
                 connectors: List[str],
                 config: ExecutorConfigBase,
                 update_interval: float = 0.5):
        super().__init__(update_interval)
        self.config = config
        self._strategy = strategy
        self.close_type: Optional[CloseType] = None
        self.close_timestamp: Optional[float] = None

        # 獲取所需的連接器
        self.connectors = {
            name: connector
            for name, connector in strategy.connectors.items()
            if name in connectors
        }

        # 事件轉發器 - 訂閱訂單事件
        self._create_buy_order_forwarder = SourceInfoEventForwarder(
            self.process_order_created_event
        )
        self._fill_order_forwarder = SourceInfoEventForwarder(
            self.process_order_filled_event
        )
        self._cancel_order_forwarder = SourceInfoEventForwarder(
            self.process_order_canceled_event
        )
        self._failed_order_forwarder = SourceInfoEventForwarder(
            self.process_order_failed_event
        )

        # 註冊事件監聽
        self._register_events()

    def _register_events(self):
        """註冊訂單事件監聽"""
        for connector in self.connectors.values():
            connector.add_listener(
                MarketEvent.BuyOrderCreated,
                self._create_buy_order_forwarder
            )
            connector.add_listener(
                MarketEvent.OrderFilled,
                self._fill_order_forwarder
            )
            connector.add_listener(
                MarketEvent.OrderCancelled,
                self._cancel_order_forwarder
            )
            connector.add_listener(
                MarketEvent.OrderFailure,
                self._failed_order_forwarder
            )

    @property
    def executor_info(self) -> ExecutorInfo:
        """返回執行器當前狀態與指標"""
        return ExecutorInfo(
            id=self.config.id,
            timestamp=self.config.timestamp,
            type=self.config.type,
            status=self.status,
            close_type=self.close_type,
            net_pnl_pct=self.net_pnl_pct,
            net_pnl_quote=self.net_pnl_quote,
            cum_fees_quote=self.cum_fees_quote,
            filled_amount_quote=self.filled_amount_quote,
            is_active=self.is_active,
            is_trading=self.is_trading,
        )

    @property
    def net_pnl_pct(self) -> Decimal:
        """淨利潤百分比"""
        raise NotImplementedError

    @property
    def net_pnl_quote(self) -> Decimal:
        """淨利潤(計價貨幣)"""
        raise NotImplementedError

    def process_order_filled_event(self, event_tag: int,
                                   market: ConnectorBase,
                                   event: OrderFilledEvent):
        """處理訂單成交事件"""
        if event.order_id in self._tracked_orders:
            self._update_position(event)
            self._calculate_pnl()

6.5 執行器類型一覽

執行器功能使用場景
OrderExecutor基本訂單執行簡單買賣操作
PositionExecutor持倉管理帶止損止盈的完整交易
DCAExecutor定投執行分批建倉
GridExecutor網格交易區間震盪獲利
ArbitrageExecutor套利執行跨交易所價差捕捉
TWAPExecutor時間加權大單分拆減少衝擊
XEMMExecutor跨所做市在多個交易所同時做市

7. 訂單管理與生命週期

7.1 ClientOrderTracker

檔案路徑: hummingbot/connector/client_order_tracker.py

class ClientOrderTracker:
    """
    客戶端訂單追蹤器
    管理活躍訂單、緩存已完成訂單、處理遺失訂單
    """

    MAX_CACHE_SIZE = 1000
    CACHED_ORDER_TTL = 30.0  # 緩存訂單 TTL (秒)

    def __init__(self, connector: ConnectorBase,
                 lost_order_count_limit: int = 3):
        self._connector = connector
        self._lost_order_count_limit = lost_order_count_limit

        # 活躍訂單 - 正在交易所上的訂單
        self._in_flight_orders: Dict[str, InFlightOrder] = {}

        # 緩存訂單 - 剛完成的訂單,保留以處理延遲的成交
        self._cached_orders: TTLCache = TTLCache(
            maxsize=self.MAX_CACHE_SIZE,
            ttl=self.CACHED_ORDER_TTL
        )

        # 遺失訂單 - 在交易所上找不到但可能有成交的訂單
        self._lost_orders: Dict[str, InFlightOrder] = {}

        # 訂單未找到計數 - 超過閾值視為遺失
        self._order_not_found_records: Dict[str, int] = defaultdict(int)

    @property
    def active_orders(self) -> Dict[str, InFlightOrder]:
        """當前活躍的訂單"""
        return self._in_flight_orders.copy()

    @property
    def cached_orders(self) -> Dict[str, InFlightOrder]:
        """緩存中的已完成訂單"""
        return dict(self._cached_orders)

    @property
    def all_fillable_orders(self) -> Dict[str, InFlightOrder]:
        """
        所有可能有成交的訂單
        包含:活躍 + 緩存 + 遺失訂單
        """
        return {
            **self.active_orders,
            **self.cached_orders,
            **self.lost_orders
        }

    def start_tracking_order(self, order: InFlightOrder):
        """開始追蹤新訂單"""
        if order.client_order_id in self._in_flight_orders:
            self.logger().warning(
                f"訂單 {order.client_order_id} 已在追蹤中"
            )
            return

        self._in_flight_orders[order.client_order_id] = order
        self.logger().info(
            f"開始追蹤訂單: {order.client_order_id} "
            f"{order.trade_type.name} {order.amount} {order.trading_pair}"
        )

    def stop_tracking_order(self, client_order_id: str):
        """
        停止追蹤訂單
        將訂單移至緩存,保留一段時間以處理延遲事件
        """
        if client_order_id not in self._in_flight_orders:
            return

        order = self._in_flight_orders.pop(client_order_id)

        # 移入緩存而非直接刪除
        self._cached_orders[client_order_id] = order

        self.logger().info(
            f"停止追蹤訂單: {client_order_id}, "
            f"最終狀態: {order.current_state.name}"
        )

    def process_order_update(self, update: OrderUpdate):
        """處理訂單狀態更新"""
        order = self._get_order(update.client_order_id)
        if order is None:
            return False

        # 更新交易所訂單 ID
        if update.exchange_order_id and not order.exchange_order_id:
            order.exchange_order_id = update.exchange_order_id
            order.exchange_order_id_update_event.set()

        # 更新狀態
        order.current_state = update.new_state

        # 檢查是否完成
        if update.new_state in (OrderState.FILLED, OrderState.CANCELED,
                                OrderState.FAILED, OrderState.EXPIRED):
            self.stop_tracking_order(update.client_order_id)

        return True

    def process_trade_update(self, trade: TradeUpdate):
        """處理成交更新"""
        order = self.all_fillable_orders.get(trade.client_order_id)
        if order is None:
            self.logger().warning(
                f"收到未知訂單的成交: {trade.client_order_id}"
            )
            return False

        is_new = order.update_with_trade_update(trade)

        if is_new:
            self.logger().info(
                f"訂單成交: {trade.client_order_id} "
                f"成交 {trade.fill_base_amount} @ {trade.fill_price}"
            )

        return is_new

    def _get_order(self, client_order_id: str) -> Optional[InFlightOrder]:
        """從所有集合中獲取訂單"""
        return (
            self._in_flight_orders.get(client_order_id) or
            self._cached_orders.get(client_order_id) or
            self._lost_orders.get(client_order_id)
        )

7.2 訂單生命週期流程

sequenceDiagram
    participant User as 用戶/策略
    participant Tracker as OrderTracker
    participant Connector as 連接器
    participant Exchange as 交易所
    participant Events as 事件系統

    User->>Connector: buy(pair, amount, price)
    Connector->>Connector: 生成 client_order_id
    Connector->>Tracker: start_tracking(InFlightOrder)
    Note over Tracker: 狀態: PENDING_CREATE

    Connector->>Events: emit(BuyOrderCreatedEvent)
    Connector->>Exchange: POST /api/order

    alt 提交成功
        Exchange-->>Connector: {order_id, status}
        Connector->>Tracker: process_order_update(CREATED)
        Note over Tracker: 狀態: CREATED → OPEN
    else 提交失敗
        Exchange-->>Connector: {error}
        Connector->>Tracker: process_order_update(FAILED)
        Connector->>Events: emit(OrderFailureEvent)
        Tracker->>Tracker: stop_tracking → 緩存
    end

    loop 等待成交
        Exchange-->>Connector: WebSocket 成交通知
        Connector->>Tracker: process_trade_update(TradeUpdate)
        Note over Tracker: 累計成交數量
        Connector->>Events: emit(OrderFilledEvent)

        alt 部分成交
            Note over Tracker: 狀態: PARTIALLY_FILLED
        else 完全成交
            Note over Tracker: 狀態: FILLED
            Connector->>Events: emit(BuyOrderCompletedEvent)
            Tracker->>Tracker: stop_tracking → 緩存
        end
    end

    Note over Tracker: 緩存 30 秒後自動清理

8. 時鐘與時間迭代系統

8.1 Clock:時鐘系統 (Cython)

檔案路徑: hummingbot/core/clock.pyx

class ClockMode(Enum):
    REALTIME = 1     # 即時模式 - 使用實際時間
    BACKTEST = 2     # 回測模式 - 模擬時間推進

cdef class Clock:
    """
    時鐘系統
    管理時間推進與子迭代器調度
    """

    def __init__(self, clock_mode: ClockMode,
                 tick_size: float = 1.0,
                 start_time: float = 0.0,
                 end_time: float = 0.0):
        self._clock_mode = clock_mode
        self._tick_size = tick_size

        if clock_mode is ClockMode.BACKTEST:
            self._start_time = start_time
        else:
            # 即時模式:對齊到 tick_size 邊界
            self._start_time = (time.time() // tick_size) * tick_size

        self._current_tick = self._start_time
        self._child_iterators: List[TimeIterator] = []

    def add_iterator(self, iterator: TimeIterator):
        """添加時間迭代器(策略、連接器等)"""
        self._child_iterators.append(iterator)
        iterator._clock = self

    def remove_iterator(self, iterator: TimeIterator):
        """移除時間迭代器"""
        if iterator in self._child_iterators:
            self._child_iterators.remove(iterator)
            iterator._clock = None

    async def run(self):
        """無限運行直到外部停止"""
        await self.run_til(float("nan"))

    async def run_til(self, timestamp: float):
        """
        即時模式:運行直到指定時間戳
        等待實際時間推進,然後執行所有迭代器
        """
        while True:
            now = time.time()

            if not math.isnan(timestamp) and now >= timestamp:
                return

            # 計算下一個 tick 時間
            next_tick_time = ((now // self._tick_size) + 1) * self._tick_size

            # 等待到下一個 tick
            await asyncio.sleep(next_tick_time - now)

            # 更新當前 tick
            self._current_tick = next_tick_time

            # 執行所有子迭代器
            for iterator in self._child_iterators:
                try:
                    iterator.c_tick(self._current_tick)
                except Exception as e:
                    self.logger().error(
                        f"迭代器 {type(iterator).__name__} 執行錯誤: {e}"
                    )

    def backtest_til(self, timestamp: float):
        """
        回測模式:立即推進到指定時間
        不等待實際時間,立即執行所有 tick
        """
        while self._current_tick < timestamp:
            self._current_tick += self._tick_size

            for iterator in self._child_iterators:
                iterator.c_tick(self._current_tick)

    @property
    def current_timestamp(self) -> float:
        """當前時間戳"""
        if self._clock_mode is ClockMode.REALTIME:
            return time.time()
        return self._current_tick

8.2 TimeIterator:時間迭代器

檔案路徑: hummingbot/core/time_iterator.pyx

cdef class TimeIterator(NetworkIterator):
    """
    所有需要時鐘驅動的組件的基類
    包括:策略、連接器、追蹤器等
    """

    def __init__(self):
        super().__init__()
        self._clock: Optional[Clock] = None
        self._current_timestamp: float = 0.0

    @property
    def current_timestamp(self) -> float:
        """
        獲取當前時間戳
        優先使用時鐘時間,否則使用系統時間
        """
        if self._clock is not None:
            return self._clock.current_timestamp
        return time.time()

    cdef c_tick(self, double timestamp):
        """
        時鐘每 tick 調用
        子類覆寫以實現週期性邏輯
        """
        self._current_timestamp = timestamp

    cdef c_start(self, Clock clock, double timestamp):
        """組件啟動時調用"""
        self._clock = clock
        self._current_timestamp = timestamp

    cdef c_stop(self, Clock clock):
        """組件停止時調用"""
        self._clock = None

8.3 時鐘調度流程

sequenceDiagram
    participant Clock as 時鐘
    participant Connector as 連接器
    participant Strategy as 策略
    participant OrderBook as 訂單簿

    Note over Clock: tick_size = 1.0s

    loop 每秒執行
        Clock->>Clock: 等待下一個 tick
        Clock->>Connector: c_tick(timestamp)
        Connector->>OrderBook: 更新訂單簿
        Connector->>Connector: 檢查訂單狀態

        Clock->>Strategy: c_tick(timestamp)
        Strategy->>Strategy: tick(timestamp)

        alt 就緒檢查
            Strategy->>Connector: ready?
            alt 未就緒
                Strategy->>Strategy: 記錄警告,跳過
            else 已就緒
                Strategy->>Strategy: on_tick()
                Strategy->>Connector: buy/sell/cancel
            end
        end
    end

9. 連接器實作範例:OKX

9.1 OKX 連接器結構

connector/exchange/okx/
├── okx_exchange.py              # 主連接器類
├── okx_auth.py                  # 認證處理
├── okx_api_order_book_data_source.py  # 訂單簿數據源
├── okx_api_user_stream_data_source.py # 用戶流數據源
├── okx_constants.py             # 常量定義(端點、限制)
├── okx_utils.py                 # 工具函數
└── okx_web_utils.py             # HTTP 客戶端工廠

9.2 OKX 交易所實作

檔案路徑: hummingbot/connector/exchange/okx/okx_exchange.py

class OkxExchange(ExchangePyBase):
    """OKX 交易所連接器"""

    def __init__(self,
                 okx_api_key: str,
                 okx_secret_key: str,
                 okx_passphrase: str,
                 balance_asset_limit: Optional[str] = None,
                 rate_limits_share_pct: Decimal = Decimal("100"),
                 trading_pairs: Optional[List[str]] = None,
                 trading_required: bool = True):
        super().__init__(balance_asset_limit, rate_limits_share_pct)

        self._okx_api_key = okx_api_key
        self._okx_secret_key = okx_secret_key
        self._okx_passphrase = okx_passphrase
        self._trading_pairs = trading_pairs or []
        self._trading_required = trading_required

    @property
    def authenticator(self) -> OkxAuth:
        """認證器"""
        return OkxAuth(
            api_key=self._okx_api_key,
            secret_key=self._okx_secret_key,
            passphrase=self._okx_passphrase
        )

    @property
    def name(self) -> str:
        return "okx"

    @property
    def rate_limits_rules(self) -> List[RateLimit]:
        """API 限速規則"""
        return CONSTANTS.RATE_LIMITS

    def supported_order_types(self) -> List[OrderType]:
        """支援的訂單類型"""
        return [
            OrderType.LIMIT,
            OrderType.LIMIT_MAKER,
            OrderType.MARKET
        ]

    async def _place_order(self,
                          order_id: str,
                          trading_pair: str,
                          amount: Decimal,
                          trade_type: TradeType,
                          order_type: OrderType,
                          price: Decimal,
                          **kwargs) -> str:
        """
        下單實作
        將 Hummingbot 格式轉換為 OKX API 格式
        """
        # 獲取交易所交易對符號
        symbol = await self.exchange_symbol_associated_to_pair(trading_pair)

        # 構建請求數據
        data = {
            "clOrdId": order_id,        # 客戶端訂單 ID
            "tdMode": "cash",           # 現貨模式
            "ordType": CONSTANTS.ORDER_TYPE_MAP[order_type],
            "side": trade_type.name.lower(),
            "instId": symbol,
            "sz": str(amount),
        }

        # 限價單需要價格
        if order_type != OrderType.MARKET:
            data["px"] = str(price)

        # 發送請求
        response = await self._api_post(
            path_url=CONSTANTS.ORDER_PATH,
            data=data,
            is_auth_required=True
        )

        # 解析回應
        order_result = response["data"][0]

        if order_result["sCode"] != "0":
            raise IOError(
                f"下單失敗: {order_result['sMsg']}"
            )

        return order_result["ordId"]  # 返回交易所訂單 ID

    async def _execute_cancel(self,
                             order_id: str,
                             trading_pair: str) -> bool:
        """取消訂單實作"""
        symbol = await self.exchange_symbol_associated_to_pair(trading_pair)

        data = {
            "instId": symbol,
            "clOrdId": order_id
        }

        response = await self._api_post(
            path_url=CONSTANTS.CANCEL_ORDER_PATH,
            data=data,
            is_auth_required=True
        )

        result = response["data"][0]
        return result["sCode"] == "0"

    def _create_order_book_data_source(self) -> OrderBookTrackerDataSource:
        """創建訂單簿數據源"""
        return OkxAPIOrderBookDataSource(
            trading_pairs=self._trading_pairs,
            connector=self,
            api_factory=self._web_assistants_factory
        )

    def _create_user_stream_tracker(self) -> UserStreamTracker:
        """創建用戶流追蹤器"""
        return UserStreamTracker(
            data_source=OkxAPIUserStreamDataSource(
                auth=self.authenticator,
                trading_pairs=self._trading_pairs,
                connector=self,
                api_factory=self._web_assistants_factory
            )
        )

9.3 OKX 認證實作

檔案路徑: hummingbot/connector/exchange/okx/okx_auth.py

class OkxAuth(AuthBase):
    """OKX API 認證"""

    def __init__(self, api_key: str, secret_key: str, passphrase: str):
        self._api_key = api_key
        self._secret_key = secret_key
        self._passphrase = passphrase

    def generate_signature(self,
                          timestamp: str,
                          method: str,
                          path: str,
                          body: str = "") -> str:
        """
        生成 OKX API 簽名
        簽名格式:timestamp + method + path + body
        使用 HMAC-SHA256 + Base64
        """
        message = f"{timestamp}{method.upper()}{path}{body}"

        signature = hmac.new(
            self._secret_key.encode("utf-8"),
            message.encode("utf-8"),
            hashlib.sha256
        ).digest()

        return base64.b64encode(signature).decode("utf-8")

    async def rest_authenticate(self, request: RESTRequest) -> RESTRequest:
        """為 REST 請求添加認證標頭"""
        timestamp = datetime.utcnow().isoformat(timespec="milliseconds") + "Z"

        # 獲取請求體
        body = ""
        if request.data is not None:
            body = json.dumps(request.data)

        # 生成簽名
        signature = self.generate_signature(
            timestamp=timestamp,
            method=request.method.value,
            path=request.url.path,
            body=body
        )

        # 設置認證標頭
        request.headers.update({
            "OK-ACCESS-KEY": self._api_key,
            "OK-ACCESS-SIGN": signature,
            "OK-ACCESS-TIMESTAMP": timestamp,
            "OK-ACCESS-PASSPHRASE": self._passphrase,
            "Content-Type": "application/json"
        })

        return request

    async def ws_authenticate(self, request: WSRequest) -> WSRequest:
        """為 WebSocket 連接添加認證"""
        timestamp = str(int(time.time()))

        signature = self.generate_signature(
            timestamp=timestamp,
            method="GET",
            path="/users/self/verify"
        )

        # WebSocket 認證消息
        auth_message = {
            "op": "login",
            "args": [{
                "apiKey": self._api_key,
                "passphrase": self._passphrase,
                "timestamp": timestamp,
                "sign": signature
            }]
        }

        request.payload = auth_message
        return request

10. 效能優化與 Cython

10.1 Cython 優化策略

Hummingbot 在效能關鍵路徑使用 Cython 編譯:

模組優化原因
OrderBook高頻訂單簿更新,使用 C++ std::set
PubSub事件分發頻繁,弱引用管理
Clock精確時間控制,高頻 tick
LimitOrder訂單數據結構,內存佈局優化
ConnectorBase連接器基類,事件處理熱路徑

10.2 OrderBook C++ 後端

# 使用 C++ 標準庫容器
from libcpp.set cimport set as cpp_set
from libcpp.vector cimport vector

cdef class OrderBook:
    cdef:
        # C++ set 提供 O(log n) 插入/刪除
        cpp_set[OrderBookEntry] _bid_book
        cpp_set[OrderBookEntry] _ask_book

        # 快取最佳價格
        double _best_bid
        double _best_ask

    cdef inline void _update_best_bid_ask(self):
        """內聯函數優化最佳價格更新"""
        if not self._bid_book.empty():
            self._best_bid = deref(self._bid_book.rbegin()).price
        if not self._ask_book.empty():
            self._best_ask = deref(self._ask_book.begin()).price

10.3 效能特徵

操作複雜度實作方式
訂單簿更新O(log n)C++ std::set
事件分發O(m)弱引用列表遍歷
訂單查詢O(1)Python dict
成交匹配O(k)遍歷成交記錄
價格更新O(1)快取最佳價格

10.4 記憶體管理

# 弱引用防止記憶體洩漏
class PubSub:
    ADD_LISTENER_GC_PROBABILITY = 0.005  # 0.5% 機率觸發 GC

    def c_add_listener(self, event_tag, listener):
        # 使用弱引用
        listener_weakref = PyWeakref_NewRef(listener, None)

        # 概率性清理
        if random.random() < self.ADD_LISTENER_GC_PROBABILITY:
            self.c_remove_dead_listeners(event_tag)

# 訂單緩存使用 TTL
class ClientOrderTracker:
    CACHED_ORDER_TTL = 30.0  # 30 秒後自動清理

    def __init__(self):
        self._cached_orders = TTLCache(
            maxsize=1000,
            ttl=self.CACHED_ORDER_TTL
        )

總結

架構優勢

  1. 模組化設計

    • 連接器系統統一多交易所介面
    • 策略與執行分離,易於擴展
    • 事件驅動解耦組件依賴
  2. 效能優化

    • Cython 編譯熱路徑
    • C++ 容器提供高效資料結構
    • 弱引用防止記憶體洩漏
  3. 靈活性

    • V1 腳本策略快速開發
    • V2 控制器+執行器支援複雜策略
    • 支援即時與回測模式
  4. 可靠性

    • 完整的訂單生命週期追蹤
    • 緩存機制處理延遲事件
    • 遺失訂單恢復機制

適用場景

場景推薦架構
簡單做市Script Strategy V1
網格交易GridExecutor V2
跨所套利ArbitrageExecutor V2
定投策略DCAExecutor V2
複雜多腿策略Controller + 多 Executor

關鍵文件路徑

核心模組:
├── hummingbot/core/trading_core.py        # 交易核心
├── hummingbot/core/clock.pyx              # 時鐘系統
├── hummingbot/core/pubsub.pyx             # 事件系統
├── hummingbot/core/data_type/order_book.pyx # 訂單簿

連接器:
├── hummingbot/connector/connector_base.pyx
├── hummingbot/connector/exchange_py_base.py
├── hummingbot/connector/client_order_tracker.py

策略:
├── hummingbot/strategy/script_strategy_base.py
├── hummingbot/strategy_v2/controllers/controller_base.py
├── hummingbot/strategy_v2/executors/executor_base.py

Hummingbot 展示了如何構建一個專業級的量化交易框架,其模組化架構、效能優化策略與完善的事件系統值得深入學習。