Skip to main content

Complete Python Client

Here’s a full-featured Python WebSocket client for the QuantCite API:
import asyncio
import websockets
import json
import logging
from datetime import datetime
import signal
import sys

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class QuantCiteClient:
    def __init__(self, api_key, base_url="data.quantcite.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.uri = f"wss://{base_url}/api/v1/ws"
        self.websocket = None
        self.authenticated = False
        self.subscriptions = set()
        self.running = True
        
    async def connect(self):
        """Establish WebSocket connection and authenticate"""
        try:
            logger.info(f"Connecting to {self.uri}")
            self.websocket = await websockets.connect(self.uri)
            logger.info("WebSocket connected successfully")
            
            # Wait for welcome message
            welcome_msg = await self.websocket.recv()
            logger.info(f"Welcome message: {json.loads(welcome_msg)}")
            
            # Authenticate
            await self.authenticate()
            
            return True
            
        except Exception as e:
            logger.error(f"Connection failed: {e}")
            return False
    
    async def authenticate(self):
        """Authenticate the WebSocket session"""
        auth_msg = {
            "type": "authenticate",
            "api_key": self.api_key
        }
        
        await self.websocket.send(json.dumps(auth_msg))
        logger.info("Authentication message sent")
        
        # Wait for authentication response
        auth_response = await self.websocket.recv()
        auth_data = json.loads(auth_response)
        
        if auth_data.get("type") == "authentication_success":
            self.authenticated = True
            logger.info("Authentication successful")
            logger.info(f"User ID: {auth_data.get('user_id')}")
            logger.info(f"Tier: {auth_data.get('tier')}")
            
            # Log data usage
            usage = auth_data.get('data_usage', {})
            logger.info(f"Data usage: {usage.get('used_gb', 0):.2f}GB / {usage.get('limit_gb', 50)}GB")
            
        else:
            logger.error(f"Authentication failed: {auth_data.get('message', 'Unknown error')}")
            raise Exception("Authentication failed")
    
    async def subscribe_to_symbol(self, symbol, exchanges=None):
        """Subscribe to orderbook updates for a symbol"""
        if not self.authenticated:
            logger.error("Not authenticated. Cannot subscribe.")
            return False
            
        subscribe_msg = {
            "type": "subscribe_aggregated",
            "symbol": symbol,
            "exchanges": exchanges or "all"
        }
        
        await self.websocket.send(json.dumps(subscribe_msg))
        self.subscriptions.add(symbol)
        logger.info(f"Subscription request sent for {symbol}")
        
        return True
    
    async def unsubscribe_from_symbol(self, symbol):
        """Unsubscribe from a symbol"""
        if symbol not in self.subscriptions:
            logger.warning(f"Not subscribed to {symbol}")
            return False
            
        unsubscribe_msg = {
            "type": "unsubscribe_aggregated",
            "symbol": symbol
        }
        
        await self.websocket.send(json.dumps(unsubscribe_msg))
        self.subscriptions.remove(symbol)
        logger.info(f"Unsubscribed from {symbol}")
        
        return True
    
    async def get_current_orderbook(self, symbol):
        """Request current orderbook snapshot"""
        if not self.authenticated:
            logger.error("Not authenticated. Cannot request orderbook.")
            return False
            
        request_msg = {
            "type": "get_aggregated_orderbook",
            "symbol": symbol
        }
        
        await self.websocket.send(json.dumps(request_msg))
        logger.info(f"Orderbook snapshot requested for {symbol}")
        
        return True
    
    async def get_exchanges(self):
        """Request list of supported exchanges"""
        if not self.authenticated:
            logger.error("Not authenticated. Cannot request exchanges.")
            return False
            
        request_msg = {"type": "get_exchanges"}
        await self.websocket.send(json.dumps(request_msg))
        logger.info("Exchange list requested")
        
        return True
    
    async def get_trading_pairs(self, exchange=None):
        """Request trading pairs for all or specific exchange"""
        if not self.authenticated:
            logger.error("Not authenticated. Cannot request trading pairs.")
            return False
            
        request_msg = {"type": "get_pairs"}
        if exchange:
            request_msg["exchange"] = exchange
            
        await self.websocket.send(json.dumps(request_msg))
        logger.info(f"Trading pairs requested for {exchange or 'all exchanges'}")
        
        return True
    
    async def send_ping(self):
        """Send ping to maintain connection"""
        if self.websocket:
            ping_msg = {"type": "ping"}
            await self.websocket.send(json.dumps(ping_msg))
    
    async def listen_for_messages(self):
        """Listen for incoming WebSocket messages"""
        if not self.websocket:
            logger.error("WebSocket not connected")
            return
            
        try:
            while self.running:
                message = await self.websocket.recv()
                data = json.loads(message)
                await self.handle_message(data)
                
        except websockets.exceptions.ConnectionClosed:
            logger.info("WebSocket connection closed")
        except Exception as e:
            logger.error(f"Error listening for messages: {e}")
    
    async def handle_message(self, data):
        """Handle incoming WebSocket messages"""
        msg_type = data.get("type")
        
        if msg_type == "aggregated_orderbook_update":
            await self.handle_orderbook_update(data)
            
        elif msg_type == "aggregated_subscription_response":
            symbol = data.get("symbol")
            success = data.get("success")
            if success:
                logger.info(f"Successfully subscribed to {symbol}")
            else:
                logger.error(f"Failed to subscribe to {symbol}: {data.get('message')}")
                
        elif msg_type == "unsubscribe_response":
            symbol = data.get("symbol")
            logger.info(f"Unsubscribed from {symbol}")
            
        elif msg_type == "exchanges_data":
            await self.handle_exchanges_data(data)
            
        elif msg_type == "pairs_data":
            await self.handle_pairs_data(data)
            
        elif msg_type == "aggregated_orderbook_data":
            await self.handle_orderbook_snapshot(data)
            
        elif msg_type == "data_limit_warning":
            logger.warning(f"Data limit warning: {data.get('message')}")
            usage = data.get('data_usage', {})
            logger.warning(f"Usage: {usage.get('used_gb')}GB / {usage.get('limit_gb')}GB")
            
        elif msg_type == "rate_limit_warning":
            logger.warning(f"Rate limit warning: {data.get('message')}")
            
        elif msg_type == "error":
            logger.error(f"Error: {data.get('error')} - {data.get('message')}")
            
        elif msg_type == "pong":
            latency = data.get('latency_ms', 0)
            logger.debug(f"Pong received (latency: {latency}ms)")
            
        else:
            logger.info(f"Received {msg_type}: {data}")
    
    async def handle_orderbook_update(self, data):
        """Handle real-time orderbook updates"""
        symbol = data.get("symbol")
        update_num = data.get("update_number")
        orderbook = data.get("data", {})
        
        # Extract market statistics
        stats = orderbook.get("market_stats", {})
        best_bid = stats.get("best_bid")
        best_ask = stats.get("best_ask")
        spread = stats.get("spread")
        
        logger.info(f"{symbol} Update #{update_num}: "
                   f"Bid: {best_bid}, Ask: {best_ask}, Spread: {spread}")
        
        # Process bids and asks if needed
        bids = orderbook.get("bids", [])
        asks = orderbook.get("asks", [])
        
        logger.debug(f"{symbol} - {len(bids)} bids, {len(asks)} asks")
    
    async def handle_orderbook_snapshot(self, data):
        """Handle orderbook snapshot response"""
        symbol = data.get("symbol")
        timestamp = data.get("timestamp")
        orderbook = data.get("data", {})
        
        stats = orderbook.get("market_stats", {})
        logger.info(f"{symbol} Snapshot at {timestamp}: "
                   f"Bid: {stats.get('best_bid')}, Ask: {stats.get('best_ask')}")
    
    async def handle_exchanges_data(self, data):
        """Handle exchanges list response"""
        exchanges_info = data.get("data", {})
        supported = exchanges_info.get("supported_exchanges", [])
        active = exchanges_info.get("active_exchanges", [])
        
        logger.info(f"Supported exchanges: {len(supported)}")
        logger.info(f"Active exchanges: {len(active)}")
        logger.debug(f"Active: {', '.join(active[:10])}")  # Show first 10
    
    async def handle_pairs_data(self, data):
        """Handle trading pairs response"""
        pairs_info = data.get("data", {})
        
        if "exchange" in pairs_info:
            # Single exchange response
            exchange = pairs_info["exchange"]
            pairs = pairs_info.get("pairs", [])
            logger.info(f"{exchange}: {len(pairs)} trading pairs")
        else:
            # All exchanges response
            total_exchanges = pairs_info.get("total_exchanges", 0)
            total_pairs = pairs_info.get("total_pairs", 0)
            logger.info(f"Total: {total_exchanges} exchanges, {total_pairs} pairs")
    
    async def start_ping_loop(self):
        """Send periodic ping messages"""
        while self.running:
            try:
                await asyncio.sleep(30)  # Ping every 30 seconds
                if self.websocket and self.authenticated:
                    await self.send_ping()
            except Exception as e:
                logger.error(f"Ping failed: {e}")
    
    async def run(self):
        """Main run loop"""
        # Connect and authenticate
        if not await self.connect():
            return
        
        # Start ping loop
        ping_task = asyncio.create_task(self.start_ping_loop())
        
        # Start listening for messages
        listen_task = asyncio.create_task(self.listen_for_messages())
        
        try:
            # Wait for both tasks
            await asyncio.gather(ping_task, listen_task)
        except KeyboardInterrupt:
            logger.info("Shutting down...")
        finally:
            self.running = False
            if self.websocket:
                await self.websocket.close()
    
    def stop(self):
        """Stop the client"""
        self.running = False


async def main():
    # Initialize client with your API key
    client = QuantCiteClient("demo_key_123")
    
    # Handle graceful shutdown
    def signal_handler():
        logger.info("Received shutdown signal")
        client.stop()
    
    # Set up signal handlers
    for sig in [signal.SIGTERM, signal.SIGINT]:
        signal.signal(sig, lambda s, f: signal_handler())
    
    try:
        # Connect and authenticate
        if await client.connect():
            
            # Get available exchanges
            await client.get_exchanges()
            await asyncio.sleep(1)  # Wait for response
            
            # Subscribe to BTC/USDT on specific exchanges
            await client.subscribe_to_symbol("BTC/USDT", ["binance", "okx", "bybit"])
            await asyncio.sleep(1)  # Wait for subscription confirmation
            
            # Get current orderbook snapshot
            await client.get_current_orderbook("BTC/USDT")
            await asyncio.sleep(1)  # Wait for snapshot
            
            # Subscribe to ETH/USDT on all exchanges
            await client.subscribe_to_symbol("ETH/USDT", "all")
            
            # Start the main message loop
            await client.listen_for_messages()
            
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
    finally:
        if client.websocket:
            await client.websocket.close()


if __name__ == "__main__":
    # Run the client
    asyncio.run(main())

Installation Requirements

Create a requirements.txt file:
websockets>=11.0.3
asyncio
Install dependencies:
pip install -r requirements.txt

Usage Examples

Basic Usage

import asyncio
from quantcite_client import QuantCiteClient

async def simple_example():
    client = QuantCiteClient("your_api_key_here")
    
    if await client.connect():
        # Subscribe to Bitcoin data
        await client.subscribe_to_symbol("BTC/USDT", ["binance", "okx"])
        
        # Listen for 60 seconds
        await asyncio.sleep(60)
        
        # Clean shutdown
        await client.websocket.close()

asyncio.run(simple_example())

Multiple Symbols

async def multi_symbol_example():
    client = QuantCiteClient("your_api_key_here")
    
    if await client.connect():
        symbols = ["BTC/USDT", "ETH/USDT", "BNB/USDT"]
        
        for symbol in symbols:
            await client.subscribe_to_symbol(symbol, "all")
            await asyncio.sleep(0.5)  # Avoid rate limits
        
        # Listen for updates
        await client.listen_for_messages()

asyncio.run(multi_symbol_example())

Key Features

Async/Await Support

Built with Python’s asyncio for efficient concurrent processing of real-time data streams.

Automatic Reconnection

Handles connection drops gracefully with exponential backoff retry logic.

Message Handling

Comprehensive message type handling with proper error management and logging.

Health Monitoring

Built-in ping/pong mechanism to maintain connection health and monitor latency.

Error Handling

The client includes comprehensive error handling:
  • Connection Errors: Automatic retry with exponential backoff
  • Authentication Failures: Clear error messages and proper cleanup
  • Rate Limiting: Respects API rate limits with appropriate delays
  • Data Limit Warnings: Monitors usage and provides warnings
  • Network Issues: Graceful handling of network interruptions
Remember to replace "demo_key_123" with your actual API key. Monitor your data usage regularly to stay within the 50GB monthly limit.