Implementing AI Service with WebSocket
- Published on
- Published on
- /6 mins read/---
Core Service Implementation
First, let's implement the core AI service:
// services/ai.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';
import { AIRequest, AIResponse, ModelType, Role } from '../interfaces';
import { AIServiceError } from '../errors/ai-service.error';
@Injectable()
export class AIService implements OnModuleInit {
@WebSocketServer()
private server: Server;
private activeConnections: Map<string, Set<string>> = new Map();
private modelInstances: Map<string, any> = new Map();
async onModuleInit() {
// Initialize AI models
await this.initializeModels();
}
private async initializeModels() {
// Initialize different AI models
const models = {
'gpt-4': await this.initializeGPT4(),
'claude-3-5-sonnet': await this.initializeClaude(),
};
Object.entries(models).forEach(([key, instance]) => {
this.modelInstances.set(key, instance);
});
}
async handleRequest(request: AIRequest): Promise<void> {
try {
// Validate request
this.validateRequest(request);
// Get AI model instance
const model = this.modelInstances.get(request.model);
if (!model) {
throw new AIServiceError('MODEL_NOT_FOUND', 'AI model not available');
}
// Process request based on type
switch (request.promptType) {
case 'CODE':
await this.handleCodeInterview(request, model);
break;
case 'BQ':
await this.handleBehavioralInterview(request, model);
break;
case 'HR':
await this.handleHRInterview(request, model);
break;
default:
throw new AIServiceError('INVALID_TYPE', 'Invalid interview type');
}
} catch (error) {
this.handleError(request.sessionId, error);
}
}
private async handleCodeInterview(request: AIRequest, model: any) {
const prompt = this.buildCodeInterviewPrompt(request);
await this.streamResponse(request.sessionId, model, prompt);
}
private async handleBehavioralInterview(request: AIRequest, model: any) {
const prompt = this.buildBehavioralPrompt(request);
await this.streamResponse(request.sessionId, model, prompt);
}
private async handleHRInterview(request: AIRequest, model: any) {
const prompt = this.buildHRPrompt(request);
await this.streamResponse(request.sessionId, model, prompt);
}
private async streamResponse(
sessionId: string,
model: any,
prompt: string
): Promise<void> {
try {
const stream = await model.createStream(prompt);
for await (const chunk of stream) {
const response: AIResponse = {
type: 'chunk',
responseType: 'mock',
content: chunk,
originalTranscriptId: sessionId,
sessionId
};
this.server.to(sessionId).emit('response', response);
}
// Send completion message
this.server.to(sessionId).emit('response', {
type: 'done',
responseType: 'mock',
sessionId,
originalTranscriptId: sessionId
});
} catch (error) {
this.handleError(sessionId, error);
}
}
private validateRequest(request: AIRequest): void {
const requiredFields = [
'transcripts',
'model',
'promptType',
'role',
'company',
'position',
'sessionId'
];
const missingFields = requiredFields.filter(
field => !request[field]
);
if (missingFields.length > 0) {
throw new AIServiceError(
'INVALID_REQUEST',
`Missing required fields: ${missingFields.join(', ')}`
);
}
}
private handleError(sessionId: string, error: any): void {
const response: AIResponse = {
type: 'error',
responseType: 'mock',
error: error.message,
sessionId,
originalTranscriptId: sessionId
};
this.server.to(sessionId).emit('response', response);
}
private buildCodeInterviewPrompt(request: AIRequest): string {
return `
Role: ${request.role === Role.INTERVIEWER ? 'Technical Interviewer' : 'Candidate'}
Company: ${request.company}
Position: ${request.position}
Interview Type: Technical/Coding
Previous Conversation:
${request.transcripts.map(t => `${t.role}: ${t.content}`).join('\n')}
Instructions:
${this.getCodeInterviewInstructions(request.role)}
`;
}
private buildBehavioralPrompt(request: AIRequest): string {
return `
Role: ${request.role === Role.INTERVIEWER ? 'Behavioral Interviewer' : 'Candidate'}
Company: ${request.company}
Position: ${request.position}
Interview Type: Behavioral
Previous Conversation:
${request.transcripts.map(t => `${t.role}: ${t.content}`).join('\n')}
Candidate Stories:
${request.stories?.map(s => `- ${s.title}: ${s.content}`).join('\n')}
Instructions:
${this.getBehavioralInstructions(request.role)}
`;
}
private buildHRPrompt(request: AIRequest): string {
return `
Role: ${request.role === Role.INTERVIEWER ? 'HR Interviewer' : 'Candidate'}
Company: ${request.company}
Position: ${request.position}
Interview Type: HR
Previous Conversation:
${request.transcripts.map(t => `${t.role}: ${t.content}`).join('\n')}
Personal Information:
${this.formatPersonalInfo(request.personalInfo)}
Instructions:
${this.getHRInstructions(request.role)}
`;
}
private getCodeInterviewInstructions(role: Role): string {
return role === Role.INTERVIEWER
? 'Evaluate technical skills, problem-solving approach, and code quality.'
: 'Demonstrate technical knowledge, explain approach, and write clean code.';
}
private getBehavioralInstructions(role: Role): string {
return role === Role.INTERVIEWER
? 'Assess past experiences, leadership skills, and cultural fit.'
: 'Share specific examples using the STAR method.';
}
private getHRInstructions(role: Role): string {
return role === Role.INTERVIEWER
? 'Evaluate career goals, motivation, and company fit.'
: 'Express interest in the role and company culture.';
}
private formatPersonalInfo(info: any): string {
if (!info) return '';
return `
Name: ${info.name}
Location: ${info.location}
Education: ${info.education?.map(e =>
`${e.degree} from ${e.school} (${e.date})`
).join(', ')}
Experience: ${info.workExperiences?.map(w =>
`${w.jobTitle} at ${w.company}`
).join(', ')}
`;
}
}
Gateway Implementation
Next, let's implement the WebSocket gateway:
// gateways/ai.gateway.ts
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
OnGatewayConnection,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import { UseGuards } from '@nestjs/common';
import { Server, Socket } from 'socket.io';
import { AIService } from '../services/ai.service';
import { WsAuthGuard } from '../guards/ws-auth.guard';
import { AIRequest } from '../interfaces';
@WebSocketGateway({
cors: {
origin: process.env.CLIENT_URL,
credentials: true
},
namespace: '/ai'
})
@UseGuards(WsAuthGuard)
export class AIGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
constructor(private readonly aiService: AIService) {}
async handleConnection(client: Socket) {
try {
const userId = client.handshake.auth.userId;
if (!userId) {
client.disconnect();
return;
}
await client.join(`user:${userId}`);
console.log(`Client connected: ${userId}`);
} catch (error) {
console.error('Connection error:', error);
client.disconnect();
}
}
handleDisconnect(client: Socket) {
const userId = client.handshake.auth.userId;
if (userId) {
console.log(`Client disconnected: ${userId}`);
}
}
@SubscribeMessage('interview')
async handleInterview(client: Socket, request: AIRequest) {
try {
// Validate user access
const userId = client.handshake.auth.userId;
if (!userId) {
throw new Error('Unauthorized');
}
// Add session room
await client.join(request.sessionId);
// Process interview request
await this.aiService.handleRequest(request);
} catch (error) {
client.emit('error', {
message: error.message,
sessionId: request.sessionId
});
}
}
}
Module Configuration
Finally, configure the AI module:
// ai.module.ts
import { Module } from '@nestjs/common';
import { AIService } from './services/ai.service';
import { AIGateway } from './gateways/ai.gateway';
import { AuthModule } from '../auth/auth.module';
@Module({
imports: [AuthModule],
providers: [AIService, AIGateway],
exports: [AIService]
})
export class AIModule {}
Client Usage Example
Here's how to use the AI service from a client:
import { io, Socket } from 'socket.io-client';
class AIClient {
private socket: Socket;
private handlers: Map<string, (response: any) => void> = new Map();
constructor(url: string, auth: { userId: string; token: string }) {
this.socket = io(`${url}/ai`, {
auth,
transports: ['websocket'],
reconnection: true
});
this.setupEventHandlers();
}
private setupEventHandlers() {
this.socket.on('connect', () => {
console.log('Connected to AI service');
});
this.socket.on('response', (response) => {
const handler = this.handlers.get(response.sessionId);
if (handler) {
handler(response);
}
});
this.socket.on('error', (error) => {
console.error('AI service error:', error);
});
}
async startInterview(request: AIRequest, onResponse: (response: any) => void) {
this.handlers.set(request.sessionId, onResponse);
this.socket.emit('interview', request);
}
stopInterview(sessionId: string) {
this.handlers.delete(sessionId);
}
disconnect() {
this.socket.disconnect();
}
}
// Usage example
const client = new AIClient('http://localhost:3000', {
userId: 'user123',
token: 'jwt-token'
});
const request = {
transcripts: [],
model: 'claude-3-5-sonnet',
promptType: 'CODE',
role: 'candidate',
company: 'Tech Corp',
position: 'Senior Developer',
sessionId: 'session123'
};
client.startInterview(request, (response) => {
if (response.type === 'chunk') {
console.log('Received chunk:', response.content);
} else if (response.type === 'done') {
console.log('Interview complete');
} else if (response.type === 'error') {
console.error('Error:', response.error);
}
});
Notes
- Implement proper error handling
- Use authentication guards
- Handle reconnection scenarios
- Manage WebSocket connections
- Process responses in real-time
- Clean up resources properly
- Monitor service health
- Implement rate limiting