Reliable WS Order Flow

This guide shows how to combine NexusTrader’s WS reliability features into a production-grade strategy that handles duplicate signals, connection drops, and delayed exchange ACKs without manual bookkeeping.

The four pillars covered here are:

  1. Idempotent order creation — idempotency_key prevents duplicate submissions

  2. WS fallback — automatic REST retry when the WS socket is down

  3. Structured ACK error handling — on_ws_order_request_result() callback

  4. Reconnect reconciliation — on_private_ws_resync_diff() and on_private_ws_status()


Idempotent Order Creation

Problem

A fast signal loop may call create_order_ws() several times before the first exchange ACK arrives (e.g. every tick while price is in range). Without a deduplication mechanism this floods the exchange with identical orders.

Solution

Pass an idempotency_key that is stable for the logical order. The cache maps every key to a single canonical OID; repeated calls with the same key are silent no-ops.

from decimal import Decimal
from nexustrader.strategy import Strategy
from nexustrader.constants import OrderSide, OrderType
from nexustrader.schema import BookL1

SYMBOL = "BTCUSDT-PERP.OKX"

class IdempotentEntry(Strategy):
    """Place a single deduplicated limit buy while the best ask is below 90,000."""

    def on_bookl1(self, bookl1: BookL1):
        """Submit the entry order on every qualifying L1 book update.

        The idempotency key is identical on each call, so repeated ticks
        map to the same logical order.
        """
        if bookl1.ask < 90_000:
            entry_price = self.price_to_precision(SYMBOL, bookl1.ask)
            # The key below is stable for this signal: however many ticks
            # fire, only one order is submitted.
            self.create_order_ws(
                symbol=SYMBOL,
                side=OrderSide.BUY,
                type=OrderType.LIMIT,
                amount=Decimal("0.001"),
                price=entry_price,
                idempotency_key="entry:btc:long",
            )

You can also supply a deterministic client_oid (shown to the exchange as the client order ID) alongside the idempotency key:

# Fragment of a Strategy method body: ``self`` is the strategy instance.
self.create_order_ws(
    symbol=SYMBOL,
    side=OrderSide.BUY,
    type=OrderType.LIMIT,
    amount=Decimal("0.001"),
    price=Decimal("89500"),
    client_oid="entry_btc_001",       # stable exchange-visible ID
    idempotency_key="entry:btc:long", # local deduplication key
)

Tip

idempotency_key and client_oid are independent. You can use one or both. idempotency_key controls local deduplication; client_oid controls the ID sent to the exchange.


WS Fallback on Connection Failure

Both create_order_ws() and cancel_order_ws() accept a ws_fallback keyword argument (default True):

ws_fallback

Behaviour when the WS socket is unavailable

True (default)

Transparently retries the same request via REST. The order lifecycle proceeds normally — no extra handler code needed.

False

Immediately marks the order FAILED / CANCEL_FAILED with reason="WS_REQUEST_NOT_SENT: …" and fires on_failed_order / on_cancel_failed_order. Use this for strict latency-critical strategies that must not silently fall back to a slower path.

# NOTE: "..." is a placeholder for the remaining order arguments
# (side, type, amount, price); fill them in before running.

# Default: if WS is down, retry via REST automatically
self.create_order_ws(symbol=SYMBOL, ..., ws_fallback=True)

# Strict: fail fast if WS is unavailable, never fall back
self.create_order_ws(symbol=SYMBOL, ..., ws_fallback=False)

# Cancel with explicit fail-fast
self.cancel_order_ws(symbol=SYMBOL, oid=oid, ws_fallback=False)

Handling WS ACK Errors

Even when the WS socket is healthy, the exchange may be slow to acknowledge or may reject the request outright. NexusTrader categorises every non-success outcome with WsOrderResultType and delivers it to the on_ws_order_request_result() callback.

result_type

When it fires

REQUEST_NOT_SENT

Socket was down and ws_fallback=False.

ACK_REJECTED

Exchange explicitly rejected the request (e.g. bad price, insufficient margin).

ACK_TIMEOUT

No ACK within 5 s and REST could not confirm the order — state truly unknown.

ACK_TIMEOUT_CONFIRMED

No ACK within 5 s but REST found the order — treat as success.

from nexustrader.constants import WsOrderResultType

class RobustStrategy(Strategy):
    """Example strategy that reacts to each non-success WS order outcome."""

    def on_ws_order_request_result(self, result: dict):
        """Dispatch on the ``result_type`` of a WS order request outcome.

        ``result`` is read for the keys ``result_type``, ``oid``,
        ``symbol`` and ``reason``.
        """
        outcome = result["result_type"]
        oid = result["oid"]
        symbol = result["symbol"]
        reason = result["reason"]

        if outcome == WsOrderResultType.ACK_TIMEOUT_CONFIRMED:
            # The order exists on the exchange, so there is nothing to do.
            self.log.info(f"[{symbol}] {oid} confirmed via REST after ACK timeout")
            return

        if outcome == WsOrderResultType.ACK_REJECTED:
            # The exchange refused the request: log it, and optionally
            # resubmit with corrected parameters.
            self.log.warning(f"[{symbol}] order rejected: {reason}")
            return

        if outcome == WsOrderResultType.ACK_TIMEOUT:
            # State is unknown, so ask REST directly before deciding.
            order = self.fetch_order(symbol, oid, force_refresh=True)
            if order is not None:
                self.log.info(f"[{symbol}] {oid} found on exchange: {order.status}")
            else:
                self.log.error(
                    f"[{symbol}] {oid} not found after ACK timeout — may need resubmit"
                )
            return

        if outcome == WsOrderResultType.REQUEST_NOT_SENT:
            # Only happens with ws_fallback=False while the socket was down.
            self.log.error(f"[{symbol}] WS was disconnected — {oid} was never sent")

Reconnect Reconciliation

When the private WebSocket drops and reconnects, NexusTrader automatically re-fetches balances, positions, and open orders, then emits a diff of what changed during the outage.

class RobustStrategy(Strategy):
    """Example strategy that tracks private-WS state and post-reconnect diffs."""

    def on_private_ws_status(self, status: dict):
        """Log each connection state change and gate order flow on it.

        ``status["event"]`` is one of "connected", "disconnected",
        "reconnected" or "resynced".
        """
        event = status["event"]
        exchange = status["exchange"]
        self.log.info(f"[{exchange}] WS {event}")

        if event == "resynced":
            # Resync finished: allow new submissions again.
            self._ws_ready = True
        elif event == "disconnected":
            # Optionally hold back new submissions until the resync completes.
            self._ws_ready = False

    def on_private_ws_resync_diff(self, payload: dict):
        """Report what changed during the outage.

        Fires once after the post-reconnect snapshot is reconciled.
        """
        diff = payload.get("diff", {})
        closed = diff.get("open_orders_removed", [])
        opened = diff.get("open_orders_added", [])
        positions_closed = diff.get("positions_closed", [])

        if closed:
            # Optionally re-enter here if these orders still matter.
            self.log.warning(f"Orders closed during disconnect: {closed}")

        if opened:
            self.log.info(f"New open orders detected after reconnect: {opened}")

        if positions_closed:
            self.log.warning(f"Positions closed during outage: {positions_closed}")

Adjusting the reconciliation grace window

To avoid false order closures from delayed exchange snapshots, a grace period (default 700 ms) is applied before an order missing from the snapshot is confirmed as closed. Increase it on high-latency connections:

from nexustrader.constants import ExchangeType

class RobustStrategy(Strategy):
    """Example strategy that tunes the reconcile grace window per exchange."""

    def on_start(self):
        """Set a longer reconcile grace period for each exchange in use."""
        grace_by_exchange = (
            (ExchangeType.OKX, 1500),
            (ExchangeType.BINANCE, 1000),
        )
        for exchange, grace in grace_by_exchange:
            self.set_reconnect_reconcile_grace_ms(exchange, grace_ms=grace)

Full Example

The snippet below combines all four features into a single strategy skeleton.

from decimal import Decimal
from nexustrader.strategy import Strategy
from nexustrader.constants import OrderSide, OrderType, ExchangeType, WsOrderResultType
from nexustrader.schema import BookL1, Order

SYMBOL = "BTCUSDT-PERP.OKX"

class RobustEntry(Strategy):
    """Strategy skeleton combining the four WS reliability features.

    Covers idempotent WS order submission, REST fallback, structured ACK
    error handling and reconnect reconciliation.
    """

    def __init__(self):
        super().__init__()
        # Gate for new submissions; cleared while the private WS is down.
        self._ws_ready = True

    def on_start(self):
        """Subscribe to L1 book data and raise the reconcile grace window."""
        self.subscribe_bookl1(symbols=[SYMBOL])
        self.set_reconnect_reconcile_grace_ms(ExchangeType.OKX, grace_ms=1200)

    # ── 1. Idempotent WS order submission ──────────────────────────────
    def on_bookl1(self, bookl1: BookL1):
        """Submit the deduplicated entry order while the ask is below 90,000."""
        if not self._ws_ready:
            return
        if bookl1.ask < 90_000:
            self.create_order_ws(
                symbol=SYMBOL,
                side=OrderSide.BUY,
                type=OrderType.LIMIT,
                amount=Decimal("0.001"),
                price=self.price_to_precision(SYMBOL, bookl1.ask),
                idempotency_key="entry:btc:long",
                ws_fallback=True,   # REST retry when the WS socket is unavailable
            )

    # ── 2. Structured ACK error handling ───────────────────────────────
    def on_ws_order_request_result(self, result: dict):
        """Handle non-success WS order outcomes.

        REQUEST_NOT_SENT is not handled here because every order in this
        strategy is sent with ws_fallback=True.
        """
        rt     = result["result_type"]
        oid    = result["oid"]
        symbol = result["symbol"]
        reason = result["reason"]

        if rt == WsOrderResultType.ACK_TIMEOUT_CONFIRMED:
            self.log.info(f"Order {oid} confirmed via REST after ACK timeout")
        elif rt == WsOrderResultType.ACK_REJECTED:
            self.log.warning(f"Order {oid} rejected: {reason}")
        elif rt == WsOrderResultType.ACK_TIMEOUT:
            # Query with the symbol reported in the result rather than the
            # module constant, so this keeps working with more symbols.
            order = self.fetch_order(symbol, oid, force_refresh=True)
            if order is None:
                self.log.error(f"Order {oid} missing after timeout — resubmit?")

    # ── 3. Reconnect awareness ─────────────────────────────────────────
    def on_private_ws_status(self, status: dict):
        """Pause order flow on disconnect and resume once resynced."""
        event = status["event"]
        if event == "disconnected":
            self._ws_ready = False
        elif event == "resynced":
            # Resume only after the resync: local orders and positions may
            # still be stale right after "reconnected".
            self._ws_ready = True

    def on_private_ws_resync_diff(self, payload: dict):
        """Warn about open orders that disappeared during the outage."""
        diff = payload.get("diff", {})
        removed = diff.get("open_orders_removed", [])
        if removed:
            self.log.warning(f"Orders gone during outage: {removed}")

    # ── 4. Standard order lifecycle callbacks ──────────────────────────
    def on_filled_order(self, order: Order):
        """Log the fill with its average price."""
        self.log.info(f"Filled {order.oid} @ {order.avg_price}")

    def on_failed_order(self, order: Order):
        """Log the failed order with the reported reason."""
        self.log.error(f"Failed {order.oid}: {order.reason}")