실시간 AI 채팅 서비스 개요
사용자는 AI 챗봇과 1:1로 대화
- WebSocket 기반 실시간 통신
- 사용자가 연속으로 여러 요청을 보냄
- AI 응답을 토큰 단위로 스트리밍
- 하나의 연결이 수백 ms~수 초 동안 유지

상황: AI 추론이 늘어날수록 서버가 느려짐
기존 구조
Blocking I/O 기반 구조
Client
↔ WebSocket
Spring MVC (Tomcat)
→ RestTemplate (Blocking)
→ FastAPI (AI Inference)
- Spring MVC + Tomcat 환경에서 WebSocket으로 클라이언트와 실시간 채팅
- AI 응답은 `RestTemplate`을 통해 FastAPI 서버에 동기 HTTP(JSON) 요청으로 받아옴
=> AI 응답 기다리는 동안 WebSocket 연결 하나가 서버 리소스를 오래 점유
문제
비동기처럼 보이지만 Blocking 구조
응답 지연 줄이기 위해 `CompletableFuture.runAsync` 사용
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(500); // 불필요한 블로킹 호출
ChatMessage aiResponse = ChatMessage.builder()
.sender("AI")
.message(chatService.generateResponse(chatMessage.getMessage()))
.type(MessageType.CHAT)
.build();
messagingTemplate.convertAndSend("/topic/public", aiResponse);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
- `Thread.sleep`으로 명시적인 블로킹 발생
- 별도의 스레드 풀 관리 없이 요청 수만큼 작업 생성
- 내부에서 호출하는 AI 응답 생성 로직은 여전히 동기 HTTP 호출
=> 스레드 사용량 감소 X, 컨텍스트 스위칭 비용만 증가
AI 응답 지연이 서버 병목으로 이어짐
`RestTemplate`을 사용한 동기 HTTP 호출
AI 응답 시간 길어질수록,
- 요청을 처리하던 Tomcat worker thread가 점점 고갈
- 새로운 WebSocket 요청은 대기 상태에 빠짐
- 동시에 여러 사용자가 접속 -> 서버 전체가 눈에 띄게 느려짐
해결: 논블로킹 + 스트리밍 구조
Client
↔ WebSocket
Spring WebFlux (Netty)
→ gRPC
→ FastAPI (AI Inference)
WebFlux + Netty 도입
Spring MVC + Tomcat에서 Spring WebFlux + Netty 기반 논블로킹 구조로 변경
- Worker thread 점유 제거
- 요청 대기 중 스레드 점유 X
- 이벤트 루프 기반 처리
- WebSocket 연결 수 증가에도 안정적인 처리
*Spring Webflux, WebClient 참고
Spring Webflux, WebClient
Blocking 방식요청한 작업이 끝날 때까지 다른 작업 X 기다림= 하나의 Thread 요청 처리 → 처리함수 실행하면 제어권 함께 넘김⇒ 해당 함수가 끝날 때까지 다른 함수 호출 XNon-Blocking 방식요청한 작
debug.tistory.com
AI 통신을 gRPC 스트리밍으로 변경
HTTP 통신에서 gRPC 기반 비동기 스트리밍으로 변경
- `RestTemplate` 제거
- HTTP/2 기반 스트리밍
- JSON -> Protocol Buffers 사용 => 페이로드 크기 ↓
- 토큰 단위 스트리밍 응답
*gRPC 참고
gRPC
등장배경1. Server-Client Model하나의 메인 프레임에서 동작하는 Monolothic 구조로 설계 시절 네트워크 통신 중요 X고가인 메인 프레임워크 → 비교적 저가의 워크스테이션 서버로 대체하고 싶어함But,
debug.tistory.com
gRPC 실습(연습)
파일 구조build.gradlebuildscript { ext { protobufVersion = '3.25.1' protobufPluginVersion = '0.8.14' grpcVersion = '1.58.1' }}plugins { id 'java' id 'org.springframework.boot' version '3.3.4' id 'io.spring.dependency-management' version '1.1.6' id 'com
debug.tistory.com
시퀀스 다이어그램

코드 확인
`ChatMessage`
ChatMessasge
- sender
- message
- type(CHAT, JOIN, LEAVE, ERROR)
`GrpcConfig`
@Configuration
public class GrpcConfig {
@Value("${grpc.server.address}")
private String grpcServerAddress;
private ManagedChannel channel;
@Bean
public ManagedChannel managedChannel() {
channel = ManagedChannelBuilder.forTarget(grpcServerAddress)
.usePlaintext()
.enableRetry()
.maxRetryAttempts(3)
.build();
return channel;
}
@Bean
public ChatServiceGrpc.ChatServiceStub chatServiceStub(ManagedChannel channel) {
return ChatServiceGrpc.newStub(channel);
}
@PreDestroy
public void shutdown() {
if (channel != null && !channel.isShutdown()) {
channel.shutdown();
}
}
}
`ChatServiceImpl` 중 `streamResponse` 메서드
gRPC 스트리밍 -> Flux 변환해 WebSocket으로 바로 전달할 수 있는 리액티브 스트림
=> AI 응답을 모두 생성한 뒤 전달하는 방식 X 토큰이 생성되는 즉시 클라이언트 전송 O
-> AI 추론 시간이 길어지더라도 응답 생성과 전송이 분리되어 서버 요청 흐름이 지연 X
@Slf4j
@Service
@RequiredArgsConstructor
public class ChatServiceImpl implements ChatService {
private final ChatServiceGrpc.ChatServiceStub chatServiceStub;
@Override
public Flux<String> streamResponse(String userMessage) {
ChatRequest request = ChatRequest.newBuilder()
.setMessage(userMessage)
.setSender(SenderType.USER.name())
.build();
return Flux.create(sink -> {
StreamObserver<ChatResponse> responseObserver = new StreamObserver<>() {
@Override
public void onNext(ChatResponse response) {
if (!sink.isCancelled()) {
sink.next(response.getReply());
}
}
@Override
public void onError(Throwable t) {
if (!sink.isCancelled()) {
log.error("gRPC on error: {}", t.getMessage());
sink.error(t);
}
}
@Override
public void onCompleted() {
log.info("gRPC stream completed");
sink.complete();
}
};
chatServiceStub
.withDeadlineAfter(5, TimeUnit.SECONDS)
.streamMessage(request, responseObserver);
}, FluxSink.OverflowStrategy.BUFFER);
}
}
`ChatWebSocketHandler`
- 클라이언트가 보낸 각 WebSocket 메시지마다 AI gRPC 스트리밍 응답 생성
/ `flatMap` ~> 여러 응답 스트림을 비동기로 처리 - 메시지 파싱 or AI 응답 처리 중 발생한 예외 -> 메시지 단위로 처리
- WebSocket 연결 유지한 채 에러 메시지를 클라이언트로 전달
- 단, WebSocket 스트림 자체에서 발생한 예외 -> 에러 메시지 전송한 뒤 연결 종료
- `ChatMessageMapper`: ChatMessage <-> JSON
- `WebSocketMessageFactor`: ChatMessage -> WebSocketMessage
- `ChatWebSocketHandler`: 흐름만
<`flatMap`>
각 메시지에 대한 비동기 스트림 동시에 활성화, 그 결과를 순서 상관 X downstream으로 보냄
IF. 순서 보장 필요 -> `concatMap` or `flatMapSequential` 사용
@Slf4j
@Component
@RequiredArgsConstructor
public class ChatWebSocketHandler implements WebSocketHandler {
private final ChatService chatService;
private final ChatMessageMapper messageMapper;
private final WebSocketMessageFactory messageFactory;
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> outgoing =
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(payload -> handleMessage(payload, session))
.onErrorResume(e ->
Flux.just(
messageFactory.error(
session,
"WebSocket error occurred"
)
)
);
return session.send(outgoing);
}
private Flux<WebSocketMessage> handleMessage(String payload, WebSocketSession session) {
return Mono.fromCallable(() -> messageMapper.fromJson(payload))
.flatMapMany(userMessage ->
chatService.streamResponse(userMessage.getMessage())
.map(reply ->
messageFactory.chat(
session,
SenderType.AI,
reply
)
)
)
.onErrorResume(e -> {
log.error("Message handling error", e);
return Flux.just(
messageFactory.error(
session,
"Invalid message or AI service error"
)
);
});
}
}
Proto 파일
`msg`
syntax = "proto3";
package common;
option java_package = "com.plantify.pb.unit.common";
option java_outer_classname = "MsgProto";
option java_multiple_files = true;
message Status {
int32 code = 1;
string message = 2;
}
`chat`
syntax = "proto3";
package chat;
option java_package = "com.plantify.pb.unit.chat";
option java_outer_classname = "ChatProto";
option java_multiple_files = true;
import "pb/svc/unit/common/msg.proto";
service ChatService {
rpc StreamMessage (ChatRequest) returns (stream ChatResponse);
}
message ChatRequest {
string message = 1;
string sender = 2;
}
message ChatResponse {
string reply = 1;
common.Status status = 2;
}
참고: Kafka를 사용하지 않은 이유
Kafka도 비동기 처리지만,
- 메시지 브로커를 경유하며 발생하는 지연
- polling 기반 consumer 관리 필요
- 요청-응답 기반 채팅 UX에는 부적합
=> 수십만 명 규모의 오픈 채팅 or 로그성 메시지라면 Kafka가 적합
But, AI 채팅처럼 즉각적인 응답이 필요한 경우에는 gRPC가 더 적합
깃허브
플랜티파이
플랜티파이 has 21 repositories available. Follow their code on GitHub.
github.com
GitHub - hk-plantify/chat-service: [Java] 채팅 AI 모델 ←gRPC→ SpringBoot 서버 ←WebSocket→ 클라이언트
[Java] 채팅 AI 모델 ←gRPC→ SpringBoot 서버 ←WebSocket→ 클라이언트 - hk-plantify/chat-service
github.com