Rui Tao's Portfolio

Implementing AI Service with WebSocket

Published on
Published on
/6 mins read/---

Core Service Implementation

First, let's implement the core AI service:

// services/ai.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';
import { AIRequest, AIResponse, ModelType, Role } from '../interfaces';
import { AIServiceError } from '../errors/ai-service.error';
 
@Injectable()
export class AIService implements OnModuleInit {
  @WebSocketServer()
  private server: Server;
 
  private activeConnections: Map<string, Set<string>> = new Map();
  private modelInstances: Map<string, any> = new Map();
 
  async onModuleInit() {
    // Initialize AI models
    await this.initializeModels();
  }
 
  private async initializeModels() {
    // Initialize different AI models
    const models = {
      'gpt-4': await this.initializeGPT4(),
      'claude-3-5-sonnet': await this.initializeClaude(),
    };
 
    Object.entries(models).forEach(([key, instance]) => {
      this.modelInstances.set(key, instance);
    });
  }
 
  async handleRequest(request: AIRequest): Promise<void> {
    try {
      // Validate request
      this.validateRequest(request);
 
      // Get AI model instance
      const model = this.modelInstances.get(request.model);
      if (!model) {
        throw new AIServiceError('MODEL_NOT_FOUND', 'AI model not available');
      }
 
      // Process request based on type
      switch (request.promptType) {
        case 'CODE':
          await this.handleCodeInterview(request, model);
          break;
        case 'BQ':
          await this.handleBehavioralInterview(request, model);
          break;
        case 'HR':
          await this.handleHRInterview(request, model);
          break;
        default:
          throw new AIServiceError('INVALID_TYPE', 'Invalid interview type');
      }
    } catch (error) {
      this.handleError(request.sessionId, error);
    }
  }
 
  private async handleCodeInterview(request: AIRequest, model: any) {
    const prompt = this.buildCodeInterviewPrompt(request);
    await this.streamResponse(request.sessionId, model, prompt);
  }
 
  private async handleBehavioralInterview(request: AIRequest, model: any) {
    const prompt = this.buildBehavioralPrompt(request);
    await this.streamResponse(request.sessionId, model, prompt);
  }
 
  private async handleHRInterview(request: AIRequest, model: any) {
    const prompt = this.buildHRPrompt(request);
    await this.streamResponse(request.sessionId, model, prompt);
  }
 
  private async streamResponse(
    sessionId: string,
    model: any,
    prompt: string
  ): Promise<void> {
    try {
      const stream = await model.createStream(prompt);
      
      for await (const chunk of stream) {
        const response: AIResponse = {
          type: 'chunk',
          responseType: 'mock',
          content: chunk,
          originalTranscriptId: sessionId,
          sessionId
        };
        
        this.server.to(sessionId).emit('response', response);
      }
 
      // Send completion message
      this.server.to(sessionId).emit('response', {
        type: 'done',
        responseType: 'mock',
        sessionId,
        originalTranscriptId: sessionId
      });
    } catch (error) {
      this.handleError(sessionId, error);
    }
  }
 
  private validateRequest(request: AIRequest): void {
    const requiredFields = [
      'transcripts',
      'model',
      'promptType',
      'role',
      'company',
      'position',
      'sessionId'
    ];
 
    const missingFields = requiredFields.filter(
      field => !request[field]
    );
 
    if (missingFields.length > 0) {
      throw new AIServiceError(
        'INVALID_REQUEST',
        `Missing required fields: ${missingFields.join(', ')}`
      );
    }
  }
 
  private handleError(sessionId: string, error: any): void {
    const response: AIResponse = {
      type: 'error',
      responseType: 'mock',
      error: error.message,
      sessionId,
      originalTranscriptId: sessionId
    };
 
    this.server.to(sessionId).emit('response', response);
  }
 
  private buildCodeInterviewPrompt(request: AIRequest): string {
    return `
      Role: ${request.role === Role.INTERVIEWER ? 'Technical Interviewer' : 'Candidate'}
      Company: ${request.company}
      Position: ${request.position}
      Interview Type: Technical/Coding
      
      Previous Conversation:
      ${request.transcripts.map(t => `${t.role}: ${t.content}`).join('\n')}
      
      Instructions:
      ${this.getCodeInterviewInstructions(request.role)}
    `;
  }
 
  private buildBehavioralPrompt(request: AIRequest): string {
    return `
      Role: ${request.role === Role.INTERVIEWER ? 'Behavioral Interviewer' : 'Candidate'}
      Company: ${request.company}
      Position: ${request.position}
      Interview Type: Behavioral
      
      Previous Conversation:
      ${request.transcripts.map(t => `${t.role}: ${t.content}`).join('\n')}
      
      Candidate Stories:
      ${request.stories?.map(s => `- ${s.title}: ${s.content}`).join('\n')}
      
      Instructions:
      ${this.getBehavioralInstructions(request.role)}
    `;
  }
 
  private buildHRPrompt(request: AIRequest): string {
    return `
      Role: ${request.role === Role.INTERVIEWER ? 'HR Interviewer' : 'Candidate'}
      Company: ${request.company}
      Position: ${request.position}
      Interview Type: HR
      
      Previous Conversation:
      ${request.transcripts.map(t => `${t.role}: ${t.content}`).join('\n')}
      
      Personal Information:
      ${this.formatPersonalInfo(request.personalInfo)}
      
      Instructions:
      ${this.getHRInstructions(request.role)}
    `;
  }
 
  private getCodeInterviewInstructions(role: Role): string {
    return role === Role.INTERVIEWER
      ? 'Evaluate technical skills, problem-solving approach, and code quality.'
      : 'Demonstrate technical knowledge, explain approach, and write clean code.';
  }
 
  private getBehavioralInstructions(role: Role): string {
    return role === Role.INTERVIEWER
      ? 'Assess past experiences, leadership skills, and cultural fit.'
      : 'Share specific examples using the STAR method.';
  }
 
  private getHRInstructions(role: Role): string {
    return role === Role.INTERVIEWER
      ? 'Evaluate career goals, motivation, and company fit.'
      : 'Express interest in the role and company culture.';
  }
 
  private formatPersonalInfo(info: any): string {
    if (!info) return '';
    
    return `
      Name: ${info.name}
      Location: ${info.location}
      Education: ${info.education?.map(e => 
        `${e.degree} from ${e.school} (${e.date})`
      ).join(', ')}
      Experience: ${info.workExperiences?.map(w =>
        `${w.jobTitle} at ${w.company}`
      ).join(', ')}
    `;
  }
}

Gateway Implementation

Next, let's implement the WebSocket gateway:

// gateways/ai.gateway.ts
import {
  WebSocketGateway,
  WebSocketServer,
  SubscribeMessage,
  OnGatewayConnection,
  OnGatewayDisconnect,
} from '@nestjs/websockets';
import { UseGuards } from '@nestjs/common';
import { Server, Socket } from 'socket.io';
import { AIService } from '../services/ai.service';
import { WsAuthGuard } from '../guards/ws-auth.guard';
import { AIRequest } from '../interfaces';
 
@WebSocketGateway({
  cors: {
    origin: process.env.CLIENT_URL,
    credentials: true
  },
  namespace: '/ai'
})
@UseGuards(WsAuthGuard)
export class AIGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;
 
  constructor(private readonly aiService: AIService) {}
 
  async handleConnection(client: Socket) {
    try {
      const userId = client.handshake.auth.userId;
      if (!userId) {
        client.disconnect();
        return;
      }
 
      await client.join(`user:${userId}`);
      console.log(`Client connected: ${userId}`);
    } catch (error) {
      console.error('Connection error:', error);
      client.disconnect();
    }
  }
 
  handleDisconnect(client: Socket) {
    const userId = client.handshake.auth.userId;
    if (userId) {
      console.log(`Client disconnected: ${userId}`);
    }
  }
 
  @SubscribeMessage('interview')
  async handleInterview(client: Socket, request: AIRequest) {
    try {
      // Validate user access
      const userId = client.handshake.auth.userId;
      if (!userId) {
        throw new Error('Unauthorized');
      }
 
      // Add session room
      await client.join(request.sessionId);
 
      // Process interview request
      await this.aiService.handleRequest(request);
    } catch (error) {
      client.emit('error', {
        message: error.message,
        sessionId: request.sessionId
      });
    }
  }
}

Module Configuration

Finally, configure the AI module:

// ai.module.ts
import { Module } from '@nestjs/common';
import { AIService } from './services/ai.service';
import { AIGateway } from './gateways/ai.gateway';
import { AuthModule } from '../auth/auth.module';
 
@Module({
  imports: [AuthModule],
  providers: [AIService, AIGateway],
  exports: [AIService]
})
export class AIModule {}

Client Usage Example

Here's how to use the AI service from a client:

import { io, Socket } from 'socket.io-client';
 
class AIClient {
  private socket: Socket;
  private handlers: Map<string, (response: any) => void> = new Map();
 
  constructor(url: string, auth: { userId: string; token: string }) {
    this.socket = io(`${url}/ai`, {
      auth,
      transports: ['websocket'],
      reconnection: true
    });
 
    this.setupEventHandlers();
  }
 
  private setupEventHandlers() {
    this.socket.on('connect', () => {
      console.log('Connected to AI service');
    });
 
    this.socket.on('response', (response) => {
      const handler = this.handlers.get(response.sessionId);
      if (handler) {
        handler(response);
      }
    });
 
    this.socket.on('error', (error) => {
      console.error('AI service error:', error);
    });
  }
 
  async startInterview(request: AIRequest, onResponse: (response: any) => void) {
    this.handlers.set(request.sessionId, onResponse);
    this.socket.emit('interview', request);
  }
 
  stopInterview(sessionId: string) {
    this.handlers.delete(sessionId);
  }
 
  disconnect() {
    this.socket.disconnect();
  }
}
 
// Usage example
const client = new AIClient('http://localhost:3000', {
  userId: 'user123',
  token: 'jwt-token'
});
 
const request = {
  transcripts: [],
  model: 'claude-3-5-sonnet',
  promptType: 'CODE',
  role: 'candidate',
  company: 'Tech Corp',
  position: 'Senior Developer',
  sessionId: 'session123'
};
 
client.startInterview(request, (response) => {
  if (response.type === 'chunk') {
    console.log('Received chunk:', response.content);
  } else if (response.type === 'done') {
    console.log('Interview complete');
  } else if (response.type === 'error') {
    console.error('Error:', response.error);
  }
});

Notes

  • Implement proper error handling
  • Use authentication guards
  • Handle reconnection scenarios
  • Manage WebSocket connections
  • Process responses in real-time
  • Clean up resources properly
  • Monitor service health
  • Implement rate limiting