Complete Python Client
Here’s a full-featured Python WebSocket client for the QuantCite API:Copy
import asyncio
import websockets
import json
import logging
from datetime import datetime
import signal
import sys
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class QuantCiteClient:
def __init__(self, api_key, base_url="data.quantcite.com"):
self.api_key = api_key
self.base_url = base_url
self.uri = f"wss://{base_url}/api/v1/ws"
self.websocket = None
self.authenticated = False
self.subscriptions = set()
self.running = True
async def connect(self):
"""Establish WebSocket connection and authenticate"""
try:
logger.info(f"Connecting to {self.uri}")
self.websocket = await websockets.connect(self.uri)
logger.info("WebSocket connected successfully")
# Wait for welcome message
welcome_msg = await self.websocket.recv()
logger.info(f"Welcome message: {json.loads(welcome_msg)}")
# Authenticate
await self.authenticate()
return True
except Exception as e:
logger.error(f"Connection failed: {e}")
return False
async def authenticate(self):
"""Authenticate the WebSocket session"""
auth_msg = {
"type": "authenticate",
"api_key": self.api_key
}
await self.websocket.send(json.dumps(auth_msg))
logger.info("Authentication message sent")
# Wait for authentication response
auth_response = await self.websocket.recv()
auth_data = json.loads(auth_response)
if auth_data.get("type") == "authentication_success":
self.authenticated = True
logger.info("Authentication successful")
logger.info(f"User ID: {auth_data.get('user_id')}")
logger.info(f"Tier: {auth_data.get('tier')}")
# Log data usage
usage = auth_data.get('data_usage', {})
logger.info(f"Data usage: {usage.get('used_gb', 0):.2f}GB / {usage.get('limit_gb', 50)}GB")
else:
logger.error(f"Authentication failed: {auth_data.get('message', 'Unknown error')}")
raise Exception("Authentication failed")
async def subscribe_to_symbol(self, symbol, exchanges=None):
"""Subscribe to orderbook updates for a symbol"""
if not self.authenticated:
logger.error("Not authenticated. Cannot subscribe.")
return False
subscribe_msg = {
"type": "subscribe_aggregated",
"symbol": symbol,
"exchanges": exchanges or "all"
}
await self.websocket.send(json.dumps(subscribe_msg))
self.subscriptions.add(symbol)
logger.info(f"Subscription request sent for {symbol}")
return True
async def unsubscribe_from_symbol(self, symbol):
"""Unsubscribe from a symbol"""
if symbol not in self.subscriptions:
logger.warning(f"Not subscribed to {symbol}")
return False
unsubscribe_msg = {
"type": "unsubscribe_aggregated",
"symbol": symbol
}
await self.websocket.send(json.dumps(unsubscribe_msg))
self.subscriptions.remove(symbol)
logger.info(f"Unsubscribed from {symbol}")
return True
async def get_current_orderbook(self, symbol):
"""Request current orderbook snapshot"""
if not self.authenticated:
logger.error("Not authenticated. Cannot request orderbook.")
return False
request_msg = {
"type": "get_aggregated_orderbook",
"symbol": symbol
}
await self.websocket.send(json.dumps(request_msg))
logger.info(f"Orderbook snapshot requested for {symbol}")
return True
async def get_exchanges(self):
"""Request list of supported exchanges"""
if not self.authenticated:
logger.error("Not authenticated. Cannot request exchanges.")
return False
request_msg = {"type": "get_exchanges"}
await self.websocket.send(json.dumps(request_msg))
logger.info("Exchange list requested")
return True
async def get_trading_pairs(self, exchange=None):
"""Request trading pairs for all or specific exchange"""
if not self.authenticated:
logger.error("Not authenticated. Cannot request trading pairs.")
return False
request_msg = {"type": "get_pairs"}
if exchange:
request_msg["exchange"] = exchange
await self.websocket.send(json.dumps(request_msg))
logger.info(f"Trading pairs requested for {exchange or 'all exchanges'}")
return True
async def send_ping(self):
"""Send ping to maintain connection"""
if self.websocket:
ping_msg = {"type": "ping"}
await self.websocket.send(json.dumps(ping_msg))
async def listen_for_messages(self):
"""Listen for incoming WebSocket messages"""
if not self.websocket:
logger.error("WebSocket not connected")
return
try:
while self.running:
message = await self.websocket.recv()
data = json.loads(message)
await self.handle_message(data)
except websockets.exceptions.ConnectionClosed:
logger.info("WebSocket connection closed")
except Exception as e:
logger.error(f"Error listening for messages: {e}")
async def handle_message(self, data):
"""Handle incoming WebSocket messages"""
msg_type = data.get("type")
if msg_type == "aggregated_orderbook_update":
await self.handle_orderbook_update(data)
elif msg_type == "aggregated_subscription_response":
symbol = data.get("symbol")
success = data.get("success")
if success:
logger.info(f"Successfully subscribed to {symbol}")
else:
logger.error(f"Failed to subscribe to {symbol}: {data.get('message')}")
elif msg_type == "unsubscribe_response":
symbol = data.get("symbol")
logger.info(f"Unsubscribed from {symbol}")
elif msg_type == "exchanges_data":
await self.handle_exchanges_data(data)
elif msg_type == "pairs_data":
await self.handle_pairs_data(data)
elif msg_type == "aggregated_orderbook_data":
await self.handle_orderbook_snapshot(data)
elif msg_type == "data_limit_warning":
logger.warning(f"Data limit warning: {data.get('message')}")
usage = data.get('data_usage', {})
logger.warning(f"Usage: {usage.get('used_gb')}GB / {usage.get('limit_gb')}GB")
elif msg_type == "rate_limit_warning":
logger.warning(f"Rate limit warning: {data.get('message')}")
elif msg_type == "error":
logger.error(f"Error: {data.get('error')} - {data.get('message')}")
elif msg_type == "pong":
latency = data.get('latency_ms', 0)
logger.debug(f"Pong received (latency: {latency}ms)")
else:
logger.info(f"Received {msg_type}: {data}")
async def handle_orderbook_update(self, data):
"""Handle real-time orderbook updates"""
symbol = data.get("symbol")
update_num = data.get("update_number")
orderbook = data.get("data", {})
# Extract market statistics
stats = orderbook.get("market_stats", {})
best_bid = stats.get("best_bid")
best_ask = stats.get("best_ask")
spread = stats.get("spread")
logger.info(f"{symbol} Update #{update_num}: "
f"Bid: {best_bid}, Ask: {best_ask}, Spread: {spread}")
# Process bids and asks if needed
bids = orderbook.get("bids", [])
asks = orderbook.get("asks", [])
logger.debug(f"{symbol} - {len(bids)} bids, {len(asks)} asks")
async def handle_orderbook_snapshot(self, data):
"""Handle orderbook snapshot response"""
symbol = data.get("symbol")
timestamp = data.get("timestamp")
orderbook = data.get("data", {})
stats = orderbook.get("market_stats", {})
logger.info(f"{symbol} Snapshot at {timestamp}: "
f"Bid: {stats.get('best_bid')}, Ask: {stats.get('best_ask')}")
async def handle_exchanges_data(self, data):
"""Handle exchanges list response"""
exchanges_info = data.get("data", {})
supported = exchanges_info.get("supported_exchanges", [])
active = exchanges_info.get("active_exchanges", [])
logger.info(f"Supported exchanges: {len(supported)}")
logger.info(f"Active exchanges: {len(active)}")
logger.debug(f"Active: {', '.join(active[:10])}") # Show first 10
async def handle_pairs_data(self, data):
"""Handle trading pairs response"""
pairs_info = data.get("data", {})
if "exchange" in pairs_info:
# Single exchange response
exchange = pairs_info["exchange"]
pairs = pairs_info.get("pairs", [])
logger.info(f"{exchange}: {len(pairs)} trading pairs")
else:
# All exchanges response
total_exchanges = pairs_info.get("total_exchanges", 0)
total_pairs = pairs_info.get("total_pairs", 0)
logger.info(f"Total: {total_exchanges} exchanges, {total_pairs} pairs")
async def start_ping_loop(self):
"""Send periodic ping messages"""
while self.running:
try:
await asyncio.sleep(30) # Ping every 30 seconds
if self.websocket and self.authenticated:
await self.send_ping()
except Exception as e:
logger.error(f"Ping failed: {e}")
async def run(self):
"""Main run loop"""
# Connect and authenticate
if not await self.connect():
return
# Start ping loop
ping_task = asyncio.create_task(self.start_ping_loop())
# Start listening for messages
listen_task = asyncio.create_task(self.listen_for_messages())
try:
# Wait for both tasks
await asyncio.gather(ping_task, listen_task)
except KeyboardInterrupt:
logger.info("Shutting down...")
finally:
self.running = False
if self.websocket:
await self.websocket.close()
def stop(self):
"""Stop the client"""
self.running = False
async def main():
# Initialize client with your API key
client = QuantCiteClient("demo_key_123")
# Handle graceful shutdown
def signal_handler():
logger.info("Received shutdown signal")
client.stop()
# Set up signal handlers
for sig in [signal.SIGTERM, signal.SIGINT]:
signal.signal(sig, lambda s, f: signal_handler())
try:
# Connect and authenticate
if await client.connect():
# Get available exchanges
await client.get_exchanges()
await asyncio.sleep(1) # Wait for response
# Subscribe to BTC/USDT on specific exchanges
await client.subscribe_to_symbol("BTC/USDT", ["binance", "okx", "bybit"])
await asyncio.sleep(1) # Wait for subscription confirmation
# Get current orderbook snapshot
await client.get_current_orderbook("BTC/USDT")
await asyncio.sleep(1) # Wait for snapshot
# Subscribe to ETH/USDT on all exchanges
await client.subscribe_to_symbol("ETH/USDT", "all")
# Start the main message loop
await client.listen_for_messages()
except KeyboardInterrupt:
logger.info("Interrupted by user")
except Exception as e:
logger.error(f"Unexpected error: {e}")
finally:
if client.websocket:
await client.websocket.close()
if __name__ == "__main__":
# Run the client
asyncio.run(main())
Installation Requirements
Create arequirements.txt file:
Copy
websockets>=11.0.3
asyncio
Copy
pip install -r requirements.txt
Usage Examples
Basic Usage
Copy
import asyncio
from quantcite_client import QuantCiteClient
async def simple_example():
client = QuantCiteClient("your_api_key_here")
if await client.connect():
# Subscribe to Bitcoin data
await client.subscribe_to_symbol("BTC/USDT", ["binance", "okx"])
# Listen for 60 seconds
await asyncio.sleep(60)
# Clean shutdown
await client.websocket.close()
asyncio.run(simple_example())
Multiple Symbols
Copy
async def multi_symbol_example():
client = QuantCiteClient("your_api_key_here")
if await client.connect():
symbols = ["BTC/USDT", "ETH/USDT", "BNB/USDT"]
for symbol in symbols:
await client.subscribe_to_symbol(symbol, "all")
await asyncio.sleep(0.5) # Avoid rate limits
# Listen for updates
await client.listen_for_messages()
asyncio.run(multi_symbol_example())
Key Features
Async/Await Support
Built with Python’s asyncio for efficient concurrent processing of real-time data streams.
Automatic Reconnection
Handles connection drops gracefully with exponential backoff retry logic.
Message Handling
Comprehensive message type handling with proper error management and logging.
Health Monitoring
Built-in ping/pong mechanism to maintain connection health and monitor latency.
Error Handling
The client includes comprehensive error handling:- Connection Errors: Automatic retry with exponential backoff
- Authentication Failures: Clear error messages and proper cleanup
- Rate Limiting: Respects API rate limits with appropriate delays
- Data Limit Warnings: Monitors usage and provides warnings
- Network Issues: Graceful handling of network interruptions
Remember to replace
"demo_key_123" with your actual API key. Monitor your data usage regularly to stay within the 50GB monthly limit.