WebSocket Transaction Streaming
The Unified Transaction APIs support real-time transaction streaming via WebSocket connections, using the same authentication pattern as HTTP APIs.
Connection URL
wss://data.quantcite.com/api/v1/ws
Authentication Flow
WebSocket connections use the same authentication pattern as HTTP APIs:
- Connect to WebSocket endpoint
- Authenticate with API key
- Use authenticated session for all subsequent commands
Authentication
Step 1: Connect and Authenticate
import asyncio
import websockets
import json
async def connect_and_authenticate():
uri = "wss://data.quantcite.com/api/v1/ws"
async with websockets.connect(uri) as websocket:
# Send authentication message
auth_message = {
"type": "authenticate",
"api_key": "YOUR_API_KEY"
}
await websocket.send(json.dumps(auth_message))
# Wait for authentication response
response = await websocket.recv()
auth_data = json.loads(response)
if auth_data.get("type") == "welcome":
print(f"Authenticated successfully: {auth_data}")
return websocket
else:
print(f"Authentication failed: {auth_data}")
return None
# Run the connection
websocket = asyncio.run(connect_and_authenticate())
Authentication Request
{
"type": "authenticate",
"api_key": "YOUR_API_KEY"
}
Authentication Response
{
"type": "welcome",
"session_id": "860a5a96-0da4-4154-9d13-a79d07121758",
"user_id": "cb88461f-421a-4ac8-9722-afe248a40ae6",
"user_tier": "premium",
"message": "Authenticated successfully as premium user",
"authentication": {
"api_key": "demo_key...",
"status": "authenticated"
}
}
Available Actions
After authentication, you can use these WebSocket actions:
1. Subscribe to Transactions
Subscribe to real-time transaction updates for a specific exchange.
Request
{
"action": "subscribe_transactions",
"exchange": "bybit",
"testnet": false,
"transaction_types": ["deposits", "withdrawals", "trades"]
}
2. Subscribe to Transactions (after authentication) - Your Own Credentials
{
"action": "subscribe_transactions",
"exchange": "bybit",
"testnet": false,
"transaction_types": ["deposits", "withdrawals", "trades"],
"credentials": {
"api_key": "YOUR_BYBIT_API_KEY",
"secret": "YOUR_BYBIT_SECRET"
}
}
Example: OKX WebSocket with Passphrase
{
"action": "subscribe_transactions",
"exchange": "okx",
"testnet": false,
"transaction_types": ["deposits", "withdrawals", "spot_trades"],
"credentials": {
"api_key": "YOUR_OKX_API_KEY",
"secret": "YOUR_OKX_SECRET",
"passphrase": "YOUR_OKX_PASSPHRASE"
}
}
Parameters
| Parameter | Type | Required | Description |
|---|
action | string | Yes | Must be “subscribe_transactions” |
exchange | string | Yes | Exchange name (e.g., “bybit”) |
testnet | boolean | No | Use testnet environment (default: false) |
transaction_types | array | No | Array of transaction types to monitor |
Response
{
"type": "transaction_response",
"success": true,
"exchange": "bybit",
"total_transactions": 9,
"transactions": [
{
"transactionId": "0x4165ad97c0...",
"timestamp": 1693526400000,
"label": "Deposit",
"receivedCurrency": {
"currency": "USDT",
"amount": 1000.0
},
"description": {
"title": "Bybit Deposit",
"desc": "USDT deposit to account"
}
}
],
"summary": {
"total_count": 9,
"by_type": {
"deposit": 8,
"withdrawal": 1
}
},
"processing_time": 2.05
}
2. Validate Credentials
Validate exchange credentials without fetching transactions.
Request (Internal Credentials)
{
"action": "validate_credentials",
"exchange": "bybit",
"testnet": false
}
Request (Your Own Credentials)
{
"action": "validate_credentials",
"exchange": "bybit",
"testnet": false,
"credentials": {
"api_key": "YOUR_BYBIT_API_KEY",
"secret": "YOUR_BYBIT_SECRET"
}
}
Request (OKX with Passphrase)
{
"action": "validate_credentials",
"exchange": "okx",
"testnet": false,
"credentials": {
"api_key": "YOUR_OKX_API_KEY",
"secret": "YOUR_OKX_SECRET",
"passphrase": "YOUR_OKX_PASSPHRASE"
}
}
Response (Success)
{
"type": "validation_response",
"success": true,
"message": "Credentials are valid (using internal credentials)",
"exchange": "bybit",
"credentials_source": "internal",
"account_info": {
"account_type": "UNIFIED",
"status": "active"
}
}
Response (Error)
{
"type": "validation_response",
"success": false,
"message": "Invalid credentials provided",
"exchange": "bybit",
"error": "authentication_failed"
}
3. Get Exchanges
Retrieve list of supported exchanges and their capabilities.
Request
{
"action": "get_exchanges"
}
Response
{
"type": "exchanges_response",
"exchanges": [
{
"exchange_name": "bybit",
"display_name": "Bybit",
"supported_transaction_types": [
"holdings",
"deposits",
"withdrawals",
"spot_orders",
"futures_linear",
"futures_inverse",
"ledger",
"leverage_token_trades",
"transfers"
],
"requires_passphrase": false,
"testnet_available": true,
"documentation_url": "/docs#tag-bybit"
}
]
}
Complete WebSocket Example
import asyncio
import websockets
import json
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class UnifiedTransactionClient:
def __init__(self, api_key):
self.api_key = api_key
self.uri = "wss://data.quantcite.com/api/v1/ws"
self.websocket = None
async def connect(self):
"""Connect to WebSocket and authenticate"""
try:
self.websocket = await websockets.connect(self.uri)
logger.info("Connected to QuantCite WebSocket")
# Authenticate
auth_msg = {
"type": "authenticate",
"api_key": self.api_key
}
await self.websocket.send(json.dumps(auth_msg))
# Wait for auth confirmation
auth_response = await self.websocket.recv()
auth_data = json.loads(auth_response)
if auth_data.get("type") == "welcome":
logger.info(f"Authentication successful")
return True
else:
logger.error(f"Authentication failed: {auth_data}")
return False
except Exception as e:
logger.error(f"Connection failed: {e}")
return False
async def subscribe_to_transactions(self, exchange, transaction_types=None):
"""Subscribe to transaction updates"""
if not self.websocket:
logger.error("Not connected")
return
subscribe_msg = {
"action": "subscribe_transactions",
"exchange": exchange,
"testnet": False,
"transaction_types": transaction_types or ["deposits", "withdrawals", "trades"]
}
await self.websocket.send(json.dumps(subscribe_msg))
logger.info(f"Subscribed to {exchange} transactions")
async def validate_credentials(self, exchange):
"""Validate exchange credentials"""
if not self.websocket:
logger.error("Not connected")
return
validate_msg = {
"action": "validate_credentials",
"exchange": exchange,
"testnet": False
}
await self.websocket.send(json.dumps(validate_msg))
async def get_exchanges(self):
"""Get supported exchanges"""
if not self.websocket:
logger.error("Not connected")
return
exchanges_msg = {
"action": "get_exchanges"
}
await self.websocket.send(json.dumps(exchanges_msg))
async def listen_for_updates(self):
"""Listen for real-time updates"""
if not self.websocket:
logger.error("Not connected")
return
try:
async for message in self.websocket:
data = json.loads(message)
await self.handle_message(data)
except websockets.exceptions.ConnectionClosed:
logger.info("Connection closed")
except Exception as e:
logger.error(f"Error listening for updates: {e}")
async def handle_message(self, data):
"""Handle incoming WebSocket messages"""
msg_type = data.get("type")
if msg_type == "transaction_response":
logger.info(f"Received {data.get('total_transactions', 0)} transactions")
elif msg_type == "validation_response":
if data.get("success"):
logger.info(f"Credentials valid for {data.get('exchange')}")
else:
logger.error(f"Credentials invalid: {data.get('message')}")
elif msg_type == "exchanges_response":
exchanges = data.get("exchanges", [])
logger.info(f"Supported exchanges: {[ex['exchange_name'] for ex in exchanges]}")
elif msg_type == "error":
logger.error(f"Error: {data.get('error')} - {data.get('message')}")
else:
logger.info(f"Received {msg_type}: {data}")
async def main():
# Initialize client
client = UnifiedTransactionClient("YOUR_API_KEY")
# Connect and authenticate
if await client.connect():
# Get supported exchanges
await client.get_exchanges()
# Validate credentials
await client.validate_credentials("bybit")
# Subscribe to transactions
await client.subscribe_to_transactions("bybit", ["deposits", "withdrawals"])
# Listen for updates
await client.listen_for_updates()
# Run the client
if __name__ == "__main__":
asyncio.run(main())
Error Handling
Common WebSocket Errors
| Error Code | Error | Description |
|---|
4001 | authentication_failed | Invalid API key or authentication error |
4002 | invalid_action | Unknown or invalid action specified |
4003 | missing_parameters | Required parameters missing from request |
4004 | exchange_error | Error communicating with exchange |
4005 | rate_limit_exceeded | Rate limit exceeded for WebSocket messages |
{
"type": "error",
"error": "invalid_action",
"message": "Unknown action: invalid_action_name",
"code": 4002,
"timestamp": "2025-09-18T10:30:00.000Z"
}
Implement proper error handling and reconnection logic in your WebSocket clients for production use.