Strategies¶
The core of the nexustrader user experience lies in creating and managing trading strategies. A trading strategy is defined by subclassing the Strategy class and implementing the methods necessary for the user’s trading logic.
Note
See the Strategy API Reference for a complete description of all available methods.
Strategy Implementation¶
from nexustrader.strategy import Strategy
class Demo(Strategy):
def __init__(self):
super().__init__() # <-- the super class must be called to initialize the strategy
Lifecycle Hooks¶
Two special hooks are called by the engine automatically:
class Demo(Strategy):
def on_start(self):
"""Called once after the engine has connected to all exchanges.
Use this to subscribe to market data and register indicators."""
self.subscribe_bookl1(symbols=["BTCUSDT-PERP.OKX"])
def on_stop(self):
"""Called when the engine is shutting down."""
self.log.info("Strategy stopping")
Important
Always perform subscriptions inside on_start (not __init__), because connectors are
not ready until the engine has started.
Handlers¶
Handlers are methods within the Strategy class which may perform actions based on different types
of events or on state changes. These methods are named with the prefix on_*. You can choose to
implement any or all of them depending on your strategy.
Live Data Handlers¶
These handlers receive data updates from the exchange.
from nexustrader.schema import BookL1, BookL2, Trade, Kline, FundingRate, MarkPrice, IndexPrice
from nexustrader.constants import KlineInterval, BookLevel
class Demo(Strategy):
def on_start(self):
self.subscribe_bookl1(symbols=["BTCUSDT-PERP.OKX"])
self.subscribe_bookl2(symbols=["BTCUSDT-PERP.OKX"], level=BookLevel.L20)
self.subscribe_trade(symbols=["BTCUSDT-PERP.OKX"])
self.subscribe_kline(symbols=["BTCUSDT-PERP.OKX"], interval=KlineInterval.MINUTE_1)
self.subscribe_funding_rate(symbols=["BTCUSDT-PERP.OKX"])
self.subscribe_mark_price(symbols=["BTCUSDT-PERP.OKX"])
self.subscribe_index_price(symbols=["BTCUSDT-PERP.OKX"])
def on_bookl1(self, bookl1: BookL1):
# bookl1.bid, bookl1.ask, bookl1.mid, bookl1.weighted_mid
...
def on_bookl2(self, bookl2: BookL2):
# bookl2.bids, bookl2.asks (list of [price, size])
...
def on_trade(self, trade: Trade):
# trade.price, trade.size, trade.side
...
def on_kline(self, kline: Kline):
# kline.open, kline.high, kline.low, kline.close, kline.volume
# kline.confirm == True means the bar has closed
...
def on_funding_rate(self, rate: FundingRate):
# rate.rate, rate.next_funding_time
...
def on_mark_price(self, price: MarkPrice):
# price.price
...
def on_index_price(self, price: IndexPrice):
# price.price
...
Unsubscribing from Data¶
You can unsubscribe at runtime:
self.unsubscribe_bookl1(symbols=["BTCUSDT-PERP.OKX"])
self.unsubscribe_trade(symbols=["BTCUSDT-PERP.OKX"])
self.unsubscribe_kline(symbols=["BTCUSDT-PERP.OKX"], interval=KlineInterval.MINUTE_1)
self.unsubscribe_bookl2(symbols=["BTCUSDT-PERP.OKX"], level=BookLevel.L20)
Order Management Handlers¶
These handlers receive order status updates from the exchange.
from nexustrader.schema import Order
class Demo(Strategy):
def on_pending_order(self, order: Order):
"""Order sent to exchange; awaiting confirmation."""
...
def on_accepted_order(self, order: Order):
"""Exchange accepted the order (open on the book)."""
...
def on_partially_filled_order(self, order: Order):
"""Order partially filled; ``order.filled`` shows how much."""
...
def on_filled_order(self, order: Order):
"""Order completely filled."""
...
def on_canceling_order(self, order: Order):
"""Cancel request sent; awaiting exchange confirmation."""
...
def on_canceled_order(self, order: Order):
"""Order successfully cancelled."""
...
def on_failed_order(self, order: Order):
"""Order creation failed."""
...
def on_cancel_failed_order(self, order: Order):
"""Order cancellation failed."""
...
Private WebSocket Handlers¶
These handlers are called when the private WebSocket connection changes state, when a post-reconnect reconciliation diff is available, or when a WS order/cancel request produces a structured ACK outcome.
class Demo(Strategy):
def on_private_ws_status(self, status: dict):
"""Called on private WS connect / disconnect / reconnect / resynced events.
status keys:
- exchange (str): e.g. "okx", "binance", "bybit"
- account_type (str): e.g. "live", "demo"
- event (str): "connected" | "disconnected" | "reconnected" | "resynced"
- timestamp (int): milliseconds since epoch
"""
self.log.info(f"Private WS {status['event']} on {status['exchange']}")
def on_private_ws_resync_diff(self, payload: dict):
"""Called after a successful post-reconnect reconciliation.
payload keys:
- exchange, account_type, timestamp (same as above)
- diff (dict):
- positions_opened (list[str]): symbols with newly detected positions
- positions_closed (list[str]): symbols whose positions disappeared
- open_orders_added (list[str]): OIDs newly found open
- open_orders_removed (list[str]): OIDs no longer open
"""
diff = payload.get("diff", {})
if diff.get("open_orders_removed"):
self.log.warning(f"Orders closed during disconnect: {diff['open_orders_removed']}")
def on_ws_order_request_result(self, result: dict):
"""Called when a background WS order/cancel task produces a structured ACK result.
Fired for every non-success outcome of a ``create_order_ws()`` or
``cancel_order_ws()`` call — the background task catches the exception and
publishes it here so you do not need to wrap individual calls in try/except.
result keys:
- oid (str): client order ID
- symbol (str): e.g. "BTCUSDT-PERP.OKX"
- exchange (str): exchange name
- result_type (WsOrderResultType):
REQUEST_NOT_SENT — socket was down; request never left
ACK_REJECTED — exchange explicitly rejected the request
ACK_TIMEOUT — no ACK received; REST also could not confirm
ACK_TIMEOUT_CONFIRMED — no ACK received, but REST confirmed the order
- reason (str): human-readable description
- timestamp (int): milliseconds since epoch
"""
from nexustrader.constants import WsOrderResultType
rt = result["result_type"]
oid = result["oid"]
symbol = result["symbol"]
if rt == WsOrderResultType.ACK_TIMEOUT_CONFIRMED:
# order actually went through — no action needed
self.log.info(f"[{symbol}] WS ACK timeout but REST confirmed oid={oid}")
elif rt == WsOrderResultType.ACK_REJECTED:
self.log.warning(f"[{symbol}] WS order rejected oid={oid}: {result['reason']}")
elif rt == WsOrderResultType.ACK_TIMEOUT:
# order state unknown — consider REST query or retry logic
self.log.error(f"[{symbol}] WS ACK timeout, state unknown oid={oid}")
elif rt == WsOrderResultType.REQUEST_NOT_SENT:
# ws_fallback=False was used and the socket was down
self.log.error(f"[{symbol}] WS request not sent oid={oid}: {result['reason']}")
Order Query Methods¶
You can query live order state from within any strategy method:
class Demo(Strategy):
def on_bookl1(self, bookl1: BookL1):
# Fetch a specific order by OID (checks cache first, then REST)
order = self.fetch_order(symbol="BTCUSDT-PERP.OKX", oid="my-order-id")
# Bypass the local cache and query the exchange REST API directly.
# Use this when authoritative exchange state is required, e.g. after
# an ACK timeout or after reconnecting.
order = self.fetch_order(
symbol="BTCUSDT-PERP.OKX",
oid="my-order-id",
force_refresh=True,
)
# Fetch all currently open orders for a symbol
open_orders = self.fetch_open_orders(symbol="BTCUSDT-PERP.OKX")
# Fetch recent filled/partially-filled orders from cache
recent = self.fetch_recent_trades(symbol="BTCUSDT-PERP.OKX", limit=10)
Reconnect Reconciliation Grace Period¶
After a private WebSocket reconnects, NexusTrader automatically re-fetches balances, positions, and open orders to detect any changes that occurred during the outage. To avoid false order closures from delayed exchange snapshots, a configurable grace window (default 700 ms) is applied before confirming that a missing order is truly closed.
You can adjust this window per exchange in on_start:
from nexustrader.constants import ExchangeType
class Demo(Strategy):
def on_start(self):
# Wait 1.5 s before confirming missing orders as closed
self.set_reconnect_reconcile_grace_ms(ExchangeType.OKX, grace_ms=1500)
Waiting for Data Readiness¶
Use DataReady to guard your trading logic until all required
data feeds have received at least one update:
from nexustrader.core.entity import DataReady
class Demo(Strategy):
def __init__(self):
super().__init__()
self._ready = DataReady(keys=["BTCUSDT-PERP.OKX", "ETHUSDT-PERP.OKX"])
def on_bookl1(self, bookl1: BookL1):
self._ready.mark(bookl1.symbol)
if not self._ready:
return
# safe to trade now
...
Multi-Mode Support¶
nexustrader supports multiple modes of operation to cater to different trading strategies and requirements. Each mode allows for flexibility in how trading logic is executed based on market conditions or specific triggers.
Event-Driven Mode¶
In this mode, trading logic is executed in response to real-time market events. The methods on_bookl1, on_trade, and on_kline are triggered whenever relevant data is updated, allowing for immediate reaction to market changes.
class Demo(Strategy):
def __init__(self):
super().__init__()
self.subscribe_bookl1(symbols=["BTCUSDT-PERP.BINANCE"])
def on_bookl1(self, bookl1: BookL1):
# implement the trading logic Here
pass
Timer Mode¶
This mode allows you to schedule trading logic to run at specific intervals. You can use the schedule method to define when your trading algorithm should execute, making it suitable for strategies that require periodic checks or actions.
class Demo2(Strategy):
def __init__(self):
super().__init__()
self.schedule(self.algo, trigger="interval", seconds=1)
def algo(self):
# run every 1 second
# implement the trading logic Here
pass
Custom Signal Mode¶
In this mode, trading logic is executed based on custom signals. You can define your own signals and use the on_custom_signal method to trigger trading actions when these signals are received. This is particularly useful for integrating with external systems or custom event sources.
class Demo3(Strategy):
def __init__(self):
super().__init__()
self.signal = True
def on_custom_signal(self, signal: object):
# implement the trading logic Here,
# signal can be any object, it is up to you to define the signal
pass