
Building a High-Performance AI Service with WebSocket and Async Python


In modern web applications, especially those involving AI services, handling concurrent requests efficiently while maintaining real-time communication is crucial. This article explores how to build a high-performance AI service using WebSocket and async Python, focusing on scalability and resource management.

The Challenge

When building AI-powered applications, we often face several challenges:

  • Handling multiple concurrent requests
  • Managing long-running AI model inference
  • Providing real-time responses
  • Efficiently utilizing system resources
  • Keeping the code maintainable

Let's see how we can address these challenges using WebSocket and async Python.

Architecture Overview

Our AI service uses a WebSocket-based architecture with async request processing:

Client (Node.js) <--WebSocket--> Python AI Service
                                    |
                                    |--> Model Managers
                                    |     |- DeepSeek
                                    |     |- Claude
                                    |     |- GPT
                                    |
                                    |--> Connection Manager
                                          |- Request Queue
                                          |- Processor Pool

This architecture enables:

  • Real-time bidirectional communication
  • Efficient request queuing
  • Dynamic processor scaling
  • Resource isolation
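
Each provider in the diagram (DeepSeek, Claude, GPT) sits behind its own model manager. The concrete manager classes aren't shown in this post; as a rough sketch of the shared contract they would need to satisfy (the names and the astream signature here are assumptions, not the production code):

from typing import AsyncIterator, Dict, List, Protocol

class ModelManager(Protocol):
    """Minimal streaming contract each provider manager is assumed to expose."""

    def astream(self, messages: List[dict], **kwargs) -> AsyncIterator[str]:
        """Yield response text chunks as the provider produces them."""
        ...

# Hypothetical registry keyed by model name; the concrete managers are omitted here
model_managers: Dict[str, ModelManager] = {}

Keeping the provider-specific code behind one streaming interface is what lets the connection manager and processors stay model-agnostic.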

Key Components

Connection Manager

The heart of our system is the Connection Manager, which handles WebSocket connections and request processing:

import asyncio
from typing import Dict, List

from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        # Request queue for each WebSocket connection
        self.active_connections: Dict[WebSocket, asyncio.Queue] = {}
        # Processor task pool for each connection
        self.processors: Dict[WebSocket, List[asyncio.Task]] = {}
        self.initial_processors = 10  # Initial processor count
        self.max_processors = 1000    # Maximum processor limit
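
The later snippets also refer to AIRequest and AIResponse, the Pydantic models exchanged over the WebSocket. Their real field lists live elsewhere in the codebase; a minimal illustrative sketch (the field names below are assumptions) could look like this:

from typing import Optional

from pydantic import BaseModel

class AIRequest(BaseModel):
    # Illustrative fields only; the real model carries whatever the client sends
    model: str
    prompt: str
    transcript_id: Optional[str] = None

class AIResponse(BaseModel):
    # type might be "chunk" for streamed partial output or "error" for failures
    type: str
    content: str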

Request Processing Flow

The request processing involves several steps:

  1. Connection Establishment
async def connect(self, websocket: WebSocket):
    await websocket.accept()
    self.active_connections[websocket] = asyncio.Queue()
    self.processors[websocket] = []
    
    # Initialize processor pool
    for _ in range(self.initial_processors):
        processor = asyncio.create_task(self.request_processor(websocket))
        self.processors[websocket].append(processor)
  2. Dynamic Processor Scaling
async def request_processor(self, websocket: WebSocket):
    queue = self.active_connections[websocket]
    while True:
        try:
            # Scale processors based on queue size
            if (queue.qsize() > len(self.processors[websocket]) and 
                len(self.processors[websocket]) < self.max_processors):
                new_processor = asyncio.create_task(
                    self.request_processor(websocket)
                )
                self.processors[websocket].append(new_processor)
 
            # Process request
            data = await queue.get()
            try:
                async for response in ai_service.generate_response(...):
                    await websocket.send_text(response.json())
            finally:
                queue.task_done()
        except asyncio.CancelledError:
            break
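
Tying the flow together, the WebSocket endpoint itself stays thin: it accepts the connection, validates each incoming message, and pushes it onto the queue, leaving the actual work to the processors. A rough sketch of such an endpoint with FastAPI (the route path and variable names are illustrative; the error-handling section below revisits the disconnect path):

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
connection_manager = ConnectionManager()

@app.websocket("/ws/ai")
async def ai_endpoint(websocket: WebSocket):
    # Accept the connection and spin up its processor pool
    await connection_manager.connect(websocket)
    queue = connection_manager.active_connections[websocket]
    try:
        while True:
            # Validate incoming messages and enqueue them; processors handle the rest
            message = await websocket.receive_text()
            data = AIRequest.model_validate_json(message)
            await queue.put(data)
    except WebSocketDisconnect:
        await connection_manager.disconnect(websocket)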

Performance Optimizations

1. Processor Pool Management

We implement several optimizations for processor pool management:

  • Start with a small number of processors (10)
  • Dynamically scale based on queue length
  • Set maximum processor limit (1000)
  • Clean up resources on disconnection (shown in the disconnect method below)
async def disconnect(self, websocket: WebSocket):
    # Cancel all processors
    if websocket in self.processors:
        for processor in self.processors[websocket]:
            processor.cancel()
        await asyncio.gather(*self.processors[websocket], 
                           return_exceptions=True)
        del self.processors[websocket]
    
    # Cleanup connection and queue
    if websocket in self.active_connections:
        del self.active_connections[websocket]

2. Stream Response Support

For better user experience, we implement streaming responses from AI models:

async def _stream_response(self, messages, request, last_transcript_id):
    async for chunk in model_manager.astream(messages, ...):
        yield AIResponse(
            type="chunk",
            content=chunk,
            ...
        )
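
The generate_response coroutine that the processors consume is essentially a thin wrapper that picks the right model manager, builds the prompt, and delegates to _stream_response. A simplified sketch of that shape (get_model_manager, build_messages, and the "end" marker are hypothetical; the real implementation may differ):

async def generate_response(self, request: AIRequest):
    # Hypothetical helpers: choose the provider manager and assemble the messages
    model_manager = self.get_model_manager(request.model)
    messages = self.build_messages(request)

    async for response in self._stream_response(messages, request, last_transcript_id=None):
        yield response

    # Signal the client that the stream is complete
    yield AIResponse(type="end", content="")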

3. Timeout Control

We implement timeout protection for API calls:

done, pending = await asyncio.wait(
    [api_task],
    timeout=10.0,
    return_when=asyncio.FIRST_COMPLETED
)
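
One detail worth calling out: asyncio.wait does not cancel tasks that are still pending when the timeout expires, so the timed-out call has to be cancelled explicitly. A sketch of how the timeout branch could be handled (the exception type and message are illustrative):

if api_task in pending:
    # Timed out: cancel the in-flight API call before surfacing an error
    api_task.cancel()
    try:
        await api_task
    except asyncio.CancelledError:
        pass
    raise TimeoutError("AI provider did not respond within 10 seconds")

result = api_task.result()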

Error Handling

Robust error handling is crucial for a production system:

1. Request Level Errors

try:
    async for response in ai_service.generate_response(...):
        await websocket.send_text(response.json())
except Exception as e:
    logger.error(f"Error processing request: {e}")
    # Return error response to client
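
What that error response looks like depends on the protocol agreed with the client. One minimal option, reusing the AIResponse shape sketched earlier, is a typed error message (the field values here are illustrative):

except Exception as e:
    logger.error(f"Error processing request: {e}")
    # Hypothetical error payload; the real field names may differ
    error = AIResponse(type="error", content="Failed to process request")
    await websocket.send_text(error.json())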

2. Connection Level Errors

try:
    while True:
        message = await websocket.receive_text()
        data = AIRequest.model_validate_json(message)
        await queue.put(data)
except WebSocketDisconnect:
    # Handle disconnection
    await connection_manager.disconnect(websocket)

Benefits and Limitations

Benefits

  1. High Concurrency

    • Multiple concurrent requests per WebSocket connection
    • Dynamic processor pool scaling
  2. Resource Efficiency

    • Coroutines instead of threads
    • Ordered request processing through queues
  3. Error Isolation

    • Request-level error handling
    • No impact on other requests
  4. Real-time Response

    • AI model streaming support
    • Enhanced user experience

Limitations

  1. Resource Constraints

    • Processor pool limit (1000)
    • API timeout (10 seconds)
  2. Memory Usage

    • Processor pool size needs monitoring (a simple stats helper is sketched below)
    • Resource cleanup must be reliable to avoid leaked tasks and queues
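
A lightweight way to watch this is to expose the pool and queue sizes from the connection manager, for example behind a health or metrics endpoint. A sketch of such a helper (the method name is illustrative):

def stats(self) -> dict:
    # Per-connection processor pool and queue sizes, e.g. for a /metrics endpoint
    return {
        "connections": len(self.active_connections),
        "processors": sum(len(tasks) for tasks in self.processors.values()),
        "queued_requests": sum(q.qsize() for q in self.active_connections.values()),
    }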

Future Improvements

Here are some potential improvements for the system:

  1. Monitoring Metrics

    • Add processor pool usage monitoring
    • Request processing time statistics
  2. Adaptive Scaling

    • Auto-adjust processor count based on load
    • Implement smarter resource management
  3. Retry Mechanism

    • Add failed request retry
    • Implement more reliable error recovery (one possible shape is sketched below)
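
For the retry mechanism, one possible shape is a small backoff wrapper around the model call. This is a sketch of the idea rather than implemented behavior (call_with_retry and its parameters are hypothetical):

import asyncio

async def call_with_retry(make_call, max_attempts: int = 3, base_delay: float = 1.0):
    # Hypothetical wrapper: retry a failed model call with exponential backoff
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_call()
        except Exception:
            if attempt == max_attempts:
                raise
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))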

Conclusion

Building a high-performance AI service requires careful consideration of concurrency, resource management, and error handling. By using WebSocket with async Python and implementing a processor pool architecture, we've created a scalable system that can handle multiple concurrent requests efficiently while keeping the code maintainable and stable.

The combination of queues and processor pools provides a robust foundation for handling AI requests, while features like streaming responses and dynamic scaling ensure a good user experience. While there are limitations to consider, the system provides a solid base that can be further improved with monitoring and adaptive scaling.
