Reliable WS Order Flow¶
This guide shows how to combine NexusTrader’s WS reliability features into a production-grade strategy that handles duplicate signals, connection drops, and delayed exchange ACKs without manual bookkeeping.
The four pillars covered here are:
Idempotent order creation — idempotency_key prevents duplicate submissions
WS fallback — automatic REST retry when the WS socket is down
Structured ACK error handling — on_ws_order_request_result() callback
Reconnect reconciliation — on_private_ws_resync_diff() and on_private_ws_status()
Idempotent Order Creation¶
Problem¶
A fast signal loop may call create_order_ws() several times before the
first exchange ACK arrives (e.g. every tick while price is in range). Without
a deduplication mechanism this floods the exchange with identical orders.
Solution¶
Pass an idempotency_key that is stable for the logical order. The cache
maps every key to a single canonical OID; repeated calls with the same key are
silently no-ops.
from decimal import Decimal
from nexustrader.strategy import Strategy
from nexustrader.constants import OrderSide, OrderType
from nexustrader.schema import BookL1
SYMBOL = "BTCUSDT-PERP.OKX"
class IdempotentEntry(Strategy):
    """Submit a single limit buy for SYMBOL, however many ticks match.

    Every submission reuses the same ``idempotency_key``, so repeated calls
    for the same logical order are no-ops after the first one.
    """

    def on_bookl1(self, bookl1: BookL1):
        """Request the entry order while the ask is below 90,000.

        Args:
            bookl1: Level-1 book update; only ``bookl1.ask`` is read.
        """
        if bookl1.ask < 90_000:
            # No matter how many ticks fire, only one order is submitted
            self.create_order_ws(
                symbol=SYMBOL,
                side=OrderSide.BUY,
                type=OrderType.LIMIT,
                amount=Decimal("0.001"),
                price=self.price_to_precision(SYMBOL, bookl1.ask),
                idempotency_key="entry:btc:long",  # stable key for this signal
            )
You can also supply a deterministic client_oid (shown to the exchange as
the client order ID) alongside the idempotency key:
# Fragment: runs inside a Strategy method, where ``self`` is the strategy
# instance and SYMBOL is the module-level instrument id.
self.create_order_ws(
    symbol=SYMBOL,
    side=OrderSide.BUY,
    type=OrderType.LIMIT,
    amount=Decimal("0.001"),
    price=Decimal("89500"),
    client_oid="entry_btc_001",  # stable exchange-visible ID
    idempotency_key="entry:btc:long",  # deduplication key
)
Tip
idempotency_key and client_oid are independent. You can use one or
both. idempotency_key controls local deduplication; client_oid
controls the ID sent to the exchange.
WS Fallback on Connection Failure¶
Both create_order_ws() and cancel_order_ws() accept a ws_fallback
keyword argument (default True):
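| ws_fallback | Behaviour when the WS socket is unavailable |
|---|---|
| True (default) | Transparently retries the same request via REST. The order lifecycle proceeds normally — no extra handler code needed. |
| False | Immediately marks the order as failed without sending it (no REST retry); on_ws_order_request_result() receives REQUEST_NOT_SENT. |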
# NOTE: these lines are illustrative fragments from inside a Strategy method.
# The literal `...` stands for the remaining order arguments (side, type,
# amount, price) and must be replaced with real keyword arguments: a
# positional `...` after a keyword argument is a syntax error.
# Default: if WS is down, retry via REST automatically
self.create_order_ws(symbol=SYMBOL, ..., ws_fallback=True)
# Strict: fail fast if WS is unavailable, never fall back
self.create_order_ws(symbol=SYMBOL, ..., ws_fallback=False)
# Cancel with explicit fail-fast
self.cancel_order_ws(symbol=SYMBOL, oid=oid, ws_fallback=False)
Handling WS ACK Errors¶
Even when the WS socket is healthy, the exchange may be slow to acknowledge
or may reject the request outright. NexusTrader categorises every non-success
outcome with WsOrderResultType and delivers it to the
on_ws_order_request_result() callback.
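| WsOrderResultType | When it fires |
|---|---|
| REQUEST_NOT_SENT | Socket was down and ws_fallback=False, so the request was never sent. |
| ACK_REJECTED | Exchange explicitly rejected the request (e.g. bad price, insufficient margin). |
| ACK_TIMEOUT | No ACK within 5 s and REST could not confirm the order — state truly unknown. |
| ACK_TIMEOUT_CONFIRMED | No ACK within 5 s but REST found the order — treat as success. |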
from nexustrader.constants import WsOrderResultType
class RobustStrategy(Strategy):
    """Strategy fragment that reacts to every non-success WS order outcome."""

    def on_ws_order_request_result(self, result: dict):
        """Log, and where needed verify, a non-success WS order result.

        Args:
            result: Outcome payload. The keys read here are
                ``"result_type"`` (compared against ``WsOrderResultType``
                members), ``"oid"``, ``"symbol"`` and ``"reason"``.
        """
        rt = result["result_type"]
        oid = result["oid"]
        symbol = result["symbol"]
        reason = result["reason"]

        if rt == WsOrderResultType.ACK_TIMEOUT_CONFIRMED:
            # Order exists on exchange — nothing to do
            self.log.info(f"[{symbol}] {oid} confirmed via REST after ACK timeout")
        elif rt == WsOrderResultType.ACK_REJECTED:
            # Exchange said no — log and optionally resubmit with corrected params
            self.log.warning(f"[{symbol}] order rejected: {reason}")
        elif rt == WsOrderResultType.ACK_TIMEOUT:
            # Unknown state — query REST directly to decide
            order = self.fetch_order(symbol, oid, force_refresh=True)
            if order is None:
                self.log.error(
                    f"[{symbol}] {oid} not found after ACK timeout — may need resubmit"
                )
            else:
                self.log.info(f"[{symbol}] {oid} found on exchange: {order.status}")
        elif rt == WsOrderResultType.REQUEST_NOT_SENT:
            # ws_fallback=False was used and the socket was down
            self.log.error(f"[{symbol}] WS was disconnected — {oid} was never sent")
Reconnect Reconciliation¶
When the private WebSocket drops and reconnects, NexusTrader automatically re-fetches balances, positions, and open orders, then emits a diff of what changed during the outage.
class RobustStrategy(Strategy):
    """Strategy fragment that tracks private-WS state and resync diffs."""

    def on_private_ws_status(self, status: dict):
        """Fires on every connection state change.

        Args:
            status: Payload whose ``"event"`` and ``"exchange"`` keys are
                read here.
        """
        event = status["event"]  # "connected"|"disconnected"|"reconnected"|"resynced"
        exchange = status["exchange"]
        self.log.info(f"[{exchange}] WS {event}")

        if event == "disconnected":
            # Optionally suspend new order submissions until reconnected
            self._ws_ready = False
        elif event == "resynced":
            self._ws_ready = True

    def on_private_ws_resync_diff(self, payload: dict):
        """Fires once after the post-reconnect snapshot is reconciled.

        Args:
            payload: Resync payload; its optional ``"diff"`` mapping is
                inspected for ``"open_orders_removed"``,
                ``"open_orders_added"`` and ``"positions_closed"``. A
                missing key is treated as an empty list.
        """
        diff = payload.get("diff", {})

        closed = diff.get("open_orders_removed", [])
        if closed:
            self.log.warning(
                f"Orders closed during disconnect: {closed}"
            )
            # Optionally resubmit market orders for filled/expired positions

        opened = diff.get("open_orders_added", [])
        if opened:
            self.log.info(f"New open orders detected after reconnect: {opened}")

        positions_closed = diff.get("positions_closed", [])
        if positions_closed:
            self.log.warning(f"Positions closed during outage: {positions_closed}")
Adjusting the reconciliation grace window
To avoid false order closures from delayed exchange snapshots, a grace period (default 700 ms) is applied before an order missing from the snapshot is confirmed as closed. Increase it on high-latency connections:
from nexustrader.constants import ExchangeType
class RobustStrategy(Strategy):
    """Strategy fragment that widens the reconnect reconciliation grace window."""

    def on_start(self):
        """Set a per-exchange grace period (in ms) for reconnect reconciliation."""
        # Longer than the 700 ms default, for high-latency connections.
        self.set_reconnect_reconcile_grace_ms(ExchangeType.OKX, grace_ms=1500)
        self.set_reconnect_reconcile_grace_ms(ExchangeType.BINANCE, grace_ms=1000)
Full Example¶
The snippet below combines all four features into a single strategy skeleton.
from decimal import Decimal
from nexustrader.strategy import Strategy
from nexustrader.constants import OrderSide, OrderType, ExchangeType, WsOrderResultType
from nexustrader.schema import BookL1, Order
SYMBOL = "BTCUSDT-PERP.OKX"
class RobustEntry(Strategy):
    """Strategy skeleton combining the four WS reliability features.

    It submits one idempotent limit buy over WS (with REST fallback), handles
    non-success ACK outcomes, pauses submissions while the private WS is
    disconnected, and logs orders that disappeared during an outage.
    """

    def __init__(self):
        super().__init__()
        # Gate for on_bookl1: False between "disconnected" and recovery.
        self._ws_ready = True

    def on_start(self):
        """Subscribe to the L1 book and widen the reconcile grace window."""
        self.subscribe_bookl1(symbols=[SYMBOL])
        self.set_reconnect_reconcile_grace_ms(ExchangeType.OKX, grace_ms=1200)

    # ── 1. Idempotent WS order submission ──────────────────────────────
    def on_bookl1(self, bookl1: BookL1):
        """Request the entry order while the ask is below 90,000.

        Does nothing while the private WS is flagged as not ready.

        Args:
            bookl1: Level-1 book update; only ``bookl1.ask`` is read.
        """
        if not self._ws_ready:
            return
        if bookl1.ask < 90_000:
            self.create_order_ws(
                symbol=SYMBOL,
                side=OrderSide.BUY,
                type=OrderType.LIMIT,
                amount=Decimal("0.001"),
                price=self.price_to_precision(SYMBOL, bookl1.ask),
                idempotency_key="entry:btc:long",
                ws_fallback=True,  # REST retry if WS drops mid-flight
            )

    # ── 2. Structured ACK error handling ───────────────────────────────
    def on_ws_order_request_result(self, result: dict):
        """Log a non-success WS order outcome and verify timed-out orders.

        Args:
            result: Outcome payload; ``"result_type"``, ``"oid"`` and
                ``"reason"`` are required, ``"symbol"`` is used when present.
        """
        rt = result["result_type"]
        oid = result["oid"]
        reason = result["reason"]
        # Prefer the symbol reported with the result so the REST lookup
        # targets the order's own instrument; fall back to the module constant.
        symbol = result.get("symbol", SYMBOL)

        if rt == WsOrderResultType.ACK_TIMEOUT_CONFIRMED:
            self.log.info(f"Order {oid} confirmed via REST after ACK timeout")
        elif rt == WsOrderResultType.ACK_REJECTED:
            self.log.warning(f"Order {oid} rejected: {reason}")
        elif rt == WsOrderResultType.ACK_TIMEOUT:
            order = self.fetch_order(symbol, oid, force_refresh=True)
            if order is None:
                self.log.error(f"Order {oid} missing after timeout — resubmit?")

    # ── 3. Reconnect awareness ─────────────────────────────────────────
    def on_private_ws_status(self, status: dict):
        """Pause submissions on disconnect and resume after recovery.

        Args:
            status: Payload whose ``"event"`` key is read here.
        """
        # NOTE(review): the standalone reconnect example resumes only on
        # "resynced"; resuming on "reconnected" as well may allow submissions
        # before reconciliation has finished — confirm this is intended.
        if status["event"] == "disconnected":
            self._ws_ready = False
        elif status["event"] in ("reconnected", "resynced"):
            self._ws_ready = True

    def on_private_ws_resync_diff(self, payload: dict):
        """Warn about open orders that vanished during the outage.

        Args:
            payload: Resync payload; ``payload["diff"]["open_orders_removed"]``
                is read, with missing keys treated as empty.
        """
        diff = payload.get("diff", {})
        removed = diff.get("open_orders_removed", [])
        if removed:
            self.log.warning(f"Orders gone during outage: {removed}")

    # ── 4. Standard order lifecycle callbacks ──────────────────────────
    def on_filled_order(self, order: Order):
        """Log the order id and average fill price of a filled order."""
        self.log.info(f"Filled {order.oid} @ {order.avg_price}")

    def on_failed_order(self, order: Order):
        """Log the order id and failure reason of a failed order."""
        self.log.error(f"Failed {order.oid}: {order.reason}")