Source code for nexustrader.error
[docs]
class NexusTraderError(Exception):
    """Common base class for the exceptions defined in this module.

    Attributes:
        message: The text supplied at construction time. The same text is
            handed to ``Exception``, so ``str(err)`` and ``err.args[0]``
            both yield it.
    """

    def __init__(self, message: str):
        """Initialise the exception and remember ``message`` on the instance."""
        super().__init__(message)
        self.message = message
[docs]
class EngineBuildError(NexusTraderError):
    """``NexusTraderError`` subtype; going by its name, for engine build failures.

    Adds no state or behaviour of its own. Where it is raised is not visible
    from this module.
    """

    def __init__(self, message: str):
        """Pass ``message`` through to ``NexusTraderError`` unchanged."""
        super().__init__(message)
[docs]
class SubscriptionError(NexusTraderError):
    """``NexusTraderError`` subtype; going by its name, for subscription failures.

    Adds no state or behaviour of its own. Where it is raised is not visible
    from this module.
    """

    def __init__(self, message: str):
        """Pass ``message`` through to ``NexusTraderError`` unchanged."""
        super().__init__(message)
[docs]
class KlineSupportedError(NexusTraderError):
    """``NexusTraderError`` subtype related, by its name, to kline support.

    Adds no state or behaviour of its own. The exact condition it signals is
    decided at the raise sites, which are not visible from this module.
    """

    def __init__(self, message: str):
        """Pass ``message`` through to ``NexusTraderError`` unchanged."""
        super().__init__(message)
[docs]
class StrategyBuildError(NexusTraderError):
    """``NexusTraderError`` subtype; going by its name, for strategy build failures.

    Adds no state or behaviour of its own. Where it is raised is not visible
    from this module.
    """

    def __init__(self, message: str):
        """Pass ``message`` through to ``NexusTraderError`` unchanged."""
        super().__init__(message)
[docs]
class OrderError(NexusTraderError):
    """``NexusTraderError`` subtype; going by its name, for order-related failures.

    Adds no state or behaviour of its own. Where it is raised is not visible
    from this module.
    """

    def __init__(self, message: str):
        """Pass ``message`` through to ``NexusTraderError`` unchanged."""
        super().__init__(message)
[docs]
class PositionModeError(NexusTraderError):
    """``NexusTraderError`` subtype; going by its name, for position-mode problems.

    Adds no state or behaviour of its own. Where it is raised is not visible
    from this module.
    """

    def __init__(self, message: str):
        """Pass ``message`` through to ``NexusTraderError`` unchanged."""
        super().__init__(message)
[docs]
class WsRequestNotSentError(NexusTraderError):
    """Signals that a WebSocket request was never sent because the socket is not connected.

    The constructor can be called with no arguments, in which case a generic
    "connection unavailable" message is used.
    """

    def __init__(
        self,
        message: str = "WebSocket request not sent: connection unavailable",
    ):
        """Build the error from ``message``, or from the stock text when it is omitted."""
        super().__init__(message)
[docs]
class WsAckTimeoutError(NexusTraderError):
    """Raised when no ACK is received from the exchange within the timeout window.

    Attributes:
        oid: The identifier interpolated into the message as ``oid=...``.
        timeout: The wait limit; the message renders it with an ``s``
            suffix, so it is presumably expressed in seconds.
    """

    def __init__(self, oid: str, timeout: float):
        """Compose the error message from ``oid`` and ``timeout`` and keep both."""
        super().__init__(f"No WS ACK received for oid={oid} within {timeout}s")
        self.oid = oid
        self.timeout = timeout

    def __reduce__(self):
        """Describe how to rebuild this exception for ``pickle`` and ``copy``.

        The inherited ``BaseException.__reduce__`` re-invokes the class with
        ``self.args``, which here holds only the formatted message. That does
        not match ``__init__(oid, timeout)``, so unpickling or copying would
        fail with ``TypeError``. Supplying the real constructor arguments
        fixes the round trip; the instance ``__dict__`` is included as state
        so any attributes attached after construction are carried over too.
        """
        return (type(self), (self.oid, self.timeout), vars(self))
[docs]
class WsAckRejectedError(NexusTraderError):
    """Raised when the exchange explicitly rejects a WebSocket order/cancel request.

    Attributes:
        oid: The identifier interpolated into the message as ``oid=...``.
        reason: The rejection text appended to the message.
    """

    def __init__(self, oid: str, reason: str):
        """Compose the error message from ``oid`` and ``reason`` and keep both."""
        super().__init__(f"WS ACK rejected for oid={oid}: {reason}")
        self.oid = oid
        self.reason = reason

    def __reduce__(self):
        """Describe how to rebuild this exception for ``pickle`` and ``copy``.

        The inherited ``BaseException.__reduce__`` re-invokes the class with
        ``self.args``, which here holds only the formatted message. That does
        not match ``__init__(oid, reason)``, so unpickling or copying would
        fail with ``TypeError``. Supplying the real constructor arguments
        fixes the round trip; the instance ``__dict__`` is included as state
        so any attributes attached after construction are carried over too.
        """
        return (type(self), (self.oid, self.reason), vars(self))