728x90
반응형
Chat Service: 실시간 AI 채팅 서비스
채팅 AI 모델 ←gRPC→ SpringBoot 서버 ←WebSocket→ 클라이언트
- 채팅 AI 모델: Python, FastAPI
- 서버: Java, SpringBoot
- 클라이언트: React, TypeScript
기술 사용 이유
gRPC
- 언어 중립적이며, Protocol Buffers로 직렬화하므로 데이터 전송 속도가 빠름
- AI 모델과의 스트리밍 통신을 비동기로 처리하고, 응답을 실시간으로 전송
WebFlux + Netty
- 논블로킹 I/O 기반으로 많은 요청을 동시에 처리할 수 있음
- 요청을 비동기로 처리하므로, 사용자가 여러 요청을 보내도 각 요청이 동시에 처리되어 응답 속도가 빨라짐
참고
채팅 내역은 데이터베이스에 저장하지 않음
AWS EKS에 배포
파일 구조
build.gradle
// Version constants referenced later in this script (protobuf {} block and grpc dependencies).
buildscript {
    ext {
        // protoc compiler version used in protobuf { protoc { ... } }.
        protobufVersion = '3.25.1'
        // NOTE(review): not referenced anywhere in this script; the protobuf Gradle plugin
        // version is pinned in plugins {} (0.9.4) instead. Confirm before removing.
        protobufPluginVersion = '0.8.14'
        // grpc-java runtime artifacts and the protoc-gen-grpc-java code generator.
        grpcVersion = '1.58.1'
    }
}
// Build plugins: Java compilation, Spring Boot packaging, Spring's BOM-based dependency
// management (supplies versions for the unversioned starters below), and protobuf code generation.
plugins {
    id 'java'
    id 'org.springframework.boot' version '3.3.5'
    id 'io.spring.dependency-management' version '1.1.6'
    // Generates Java message classes and gRPC stubs from the .proto files.
    id 'com.google.protobuf' version '0.9.4'
}
// Project coordinates.
group = 'com.plantify'
version = '0.0.1-SNAPSHOT'

// Compile and run with a Java 17 toolchain.
java.toolchain.languageVersion = JavaLanguageVersion.of(17)

// compileOnly inherits everything declared on annotationProcessor (e.g. Lombok),
// so the annotations are also visible at compile time.
configurations.compileOnly.extendsFrom(configurations.annotationProcessor)

repositories.mavenCentral()

// NOTE(review): set but not referenced elsewhere in this script (no Spring Cloud BOM import is visible).
ext.springCloudVersion = "2023.0.3"
dependencies {
    // NOTE(review): servlet-based starter (pulls in Spring MVC + Tomcat); WebFlux already provides
    // reactive WebSocket support. Remove once no class depends on spring-websocket/spring-messaging types.
    implementation 'org.springframework.boot:spring-boot-starter-websocket'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    // Already a transitive dependency of spring-boot-starter-webflux; declared explicitly.
    implementation 'org.springframework.boot:spring-boot-starter-reactor-netty'
    testImplementation 'io.projectreactor:reactor-test'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    // NOTE(review): presumably kept for the javax.annotation.* annotations emitted into generated
    // gRPC stubs; Spring Boot 3 itself only ships jakarta.annotation.
    implementation 'javax.annotation:javax.annotation-api:1.3.2'

    // grpc
    // The protobuf runtime must match the protoc version that generates the message classes
    // (protobufVersion). Previously protobuf-java was forced to 4.28.2 while protoc,
    // protobuf-java-util and grpc 1.58.x are all on protobuf 3.x, which risks runtime
    // incompatibilities between generated code and the runtime library.
    implementation "com.google.protobuf:protobuf-java:${protobufVersion}"
    implementation "com.google.protobuf:protobuf-java-util:${protobufVersion}"
    implementation 'net.devh:grpc-client-spring-boot-starter:2.15.0.RELEASE'
    runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}"
    implementation "io.grpc:grpc-protobuf:${grpcVersion}"
    implementation "io.grpc:grpc-stub:${grpcVersion}"
}
// Protobuf / gRPC code generation settings for the com.google.protobuf plugin.
protobuf {
    // protoc compiler that generates the Java message classes.
    protoc {
        artifact = "com.google.protobuf:protoc:${protobufVersion}"
    }
    // NOTE(review): this appears to configure the project's `clean` task so it also deletes the
    // plugin's generated-sources directory. `generatedFilesBaseDir` looks deprecated in plugin 0.9.x,
    // so confirm it is still needed.
    clean {
        delete generatedFilesBaseDir
    }
    // Register the grpc-java code generator as a protoc plugin named "grpc".
    plugins {
        grpc {
            artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
        }
    }
    // Apply the "grpc" generator to every generateProto task so service stubs
    // (e.g. ChatServiceGrpc) are produced alongside the message classes.
    generateProtoTasks {
        all()*.plugins {
            grpc {}
        }
    }
}
// Run the `test` task on the JUnit Platform (configured lazily via the task provider).
tasks.named('test') { testTask ->
    testTask.useJUnitPlatform()
}
application.yml
# Nesting restored: with every key at column 0 the file parses as a flat map, and
# Spring cannot resolve `grpc.server.address` or the spring.* settings.
spring:
  application:
    name: chat
  main:
    # Force the reactive (WebFlux/Netty) stack; servlet classes are also on the classpath
    # via spring-boot-starter-websocket.
    web-application-type: reactive
  web:
    resources:
      add-mappings: true
      static-locations: classpath:/static/

# Custom property injected into GrpcConfig via @Value("${grpc.server.address}").
grpc:
  server:
    # Kubernetes in-cluster DNS name of the chat model Service (namespace "chat"), port 50052.
    address: chat-model.chat.svc.cluster.local:50052
proto
chat.proto
syntax = "proto3";

package chat;

import "pb/svc/unit/common/msg.proto";

option java_multiple_files = true;
option java_package = "com.plantify.pb.unit.chat";
option java_outer_classname = "ChatProto";

// Chat model service: one user message in, the model's reply streamed back in chunks.
service ChatService {
  // Server-streaming RPC.
  rpc StreamMessage(ChatRequest) returns (stream ChatResponse);
}

// A single user message forwarded to the model.
message ChatRequest {
  string message = 1;  // the user's text
  string sender = 2;   // sender label, e.g. "User"
}

// One chunk of the model's reply.
message ChatResponse {
  string reply = 1;          // reply text chunk
  common.Status status = 2;  // status attached to the chunk (see msg.proto)
}
msg.proto
syntax = "proto3";

package common;

option java_multiple_files = true;
option java_outer_classname = "MsgProto";
option java_package = "com.plantify.pb.unit.common";

// Generic status message shared by other protos (imported by chat.proto).
message Status {
  int32 code = 1;      // status code; semantics defined by the producing service (TODO confirm)
  string message = 2;  // human-readable detail
}
config
GrpcConfig
/**
 * Creates the gRPC channel to the chat model service and the async stub built on top of it.
 *
 * <p>The target comes from the {@code grpc.server.address} property.
 */
@Configuration
public class GrpcConfig {

    /** How long to wait for in-flight calls to finish before forcing the channel closed. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5;

    @Value("${grpc.server.address}")
    private String grpcServerAddress;

    /** Channel created by {@link #managedChannel()}, kept so it can be shut down on destroy. */
    private ManagedChannel channel;

    /**
     * Builds a plaintext (no TLS) channel to the chat model.
     *
     * <p>NOTE(review): {@code enableRetry()}/{@code maxRetryAttempts(3)} only enable and cap retries.
     * Non-transparent retries also need a retry policy in the service config, which is not set here.
     *
     * @return the shared channel
     */
    @Bean
    public ManagedChannel managedChannel() {
        channel = ManagedChannelBuilder.forTarget(grpcServerAddress)
                .usePlaintext()
                .enableRetry()
                .maxRetryAttempts(3)
                .build();
        return channel;
    }

    /**
     * Async stub for {@code ChatService}; used for the server-streaming {@code StreamMessage} call.
     *
     * @param channel the channel bean from {@link #managedChannel()}
     * @return a new async stub bound to {@code channel}
     */
    @Bean
    public ChatServiceGrpc.ChatServiceStub chatServiceStub(ManagedChannel channel) {
        return ChatServiceGrpc.newStub(channel);
    }

    /**
     * Shuts the channel down gracefully, then forcibly if calls do not finish in time.
     *
     * <p>Checks {@code isTerminated()} rather than {@code isShutdown()}. Spring may already have called
     * the channel's own {@code shutdown()} as an inferred destroy method. In that case the channel is
     * shut down but may still have in-flight calls, and waiting for them is still required.
     */
    @PreDestroy
    public void shutdown() {
        if (channel == null || channel.isTerminated()) {
            return;
        }
        channel.shutdown(); // idempotent; stops new calls, lets existing ones finish
        try {
            if (!channel.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)) {
                channel.shutdownNow();
            }
        } catch (InterruptedException e) {
            channel.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
ObjectMapperConfig
@Configuration
public class ObjectMapperConfig {
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper();
}
}
WebSocketConfig
/**
 * Registers the reactive WebSocket endpoint {@code /chat}, served by {@link ChatWebSocketHandler}.
 *
 * <p>{@code @EnableWebFlux} is intentionally not used. In Spring Boot it switches off WebFlux
 * auto-configuration, which also silently ignores the {@code spring.web.resources.*} static-resource
 * settings. The servlet STOMP interface {@code WebSocketMessageBrokerConfigurer} is not implemented
 * either: it only applies with a servlet message broker and has no effect on a WebFlux
 * {@link WebSocketHandler}.
 */
@Configuration
@RequiredArgsConstructor
public class WebSocketConfig {

    private final ChatWebSocketHandler chatWebSocketHandler;

    /**
     * Maps {@code /chat} to the chat handler.
     *
     * <p>Order {@code -1} places this mapping ahead of the annotated-controller mappings, so the
     * WebSocket upgrade request reaches the handler.
     *
     * @return the URL handler mapping for the WebSocket endpoint
     */
    @Bean
    public HandlerMapping webSocketMapping() {
        Map<String, WebSocketHandler> urlMap = Map.of("/chat", chatWebSocketHandler);
        return new SimpleUrlHandlerMapping(urlMap, -1);
    }

    /**
     * Adapter that lets the dispatcher invoke {@link WebSocketHandler} beans.
     *
     * @return a default {@link WebSocketHandlerAdapter}
     */
    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}
domain
ChatMessage
/**
 * JSON message exchanged with the client over the {@code /chat} WebSocket, in both directions.
 *
 * <p>Lombok generates accessors, equals/hashCode/toString, a builder, and no-arg plus all-args
 * constructors. The no-arg constructor and setters allow JSON deserialization.
 */
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessage {

    /** Originator of the message; server-sent messages use "AI" or "System". */
    private String sender;

    /** Message text. */
    private String message;

    /** Kind of message. */
    private MessageType type;
}
MessageType
/**
 * Kind of a chat message sent over the WebSocket.
 */
public enum MessageType {
    /** Normal chat content (used for AI reply chunks). */
    CHAT,
    /** A participant joined; not produced by the visible server code. */
    JOIN,
    /** A participant left; not produced by the visible server code. */
    LEAVE,
    /** A server-side error reported to the client. */
    ERROR
}
service
ChatService
/**
 * Streams replies from the chat AI model.
 */
public interface ChatService {

    /**
     * Sends one user message to the model and streams back its reply.
     *
     * @param userMessage the text entered by the user
     * @return the reply, emitted chunk by chunk as the model produces it
     */
    Flux<String> streamResponse(String userMessage);
}
ChatServiceImpl
@Slf4j
@Service
@RequiredArgsConstructor
public class ChatServiceImpl implements ChatService {
private final ChatServiceGrpc.ChatServiceStub chatServiceStub;
@Override
public Flux<String> streamResponse(String userMessage) {
ChatRequest request = ChatRequest.newBuilder()
.setMessage(userMessage)
.setSender("User")
.build();
log.info("gRPC request: {}", userMessage);
return Flux.create(sink -> {
StreamObserver<ChatResponse> responseObserver = new StreamObserver<>() {
@Override
public void onNext(ChatResponse response) {
if (!sink.isCancelled()) {
log.info("gRPC response: {}", response);
sink.next(response.getReply());
}
}
@Override
public void onError(Throwable t) {
if (!sink.isCancelled()) {
log.error("gRPC on error: {}", t.getMessage());
sink.error(t);
}
}
@Override
public void onCompleted() {
if (!sink.isCancelled()) {
log.info("gRPC onCompleted");
sink.complete();
}
}
};
try {
chatServiceStub.streamMessage(request, responseObserver);
log.info("gRPC request sent");
} catch (Exception e) {
log.error("Error sending gRPC request", e);
sink.error(e);
}
}, FluxSink.OverflowStrategy.BUFFER);
}
}
webSocket
ChatWebSocketHandler
@Slf4j
@Component
@RequiredArgsConstructor
public class ChatWebSocketHandler implements WebSocketHandler {
private final ChatService chatService;
private final ObjectMapper objectMapper;
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> incomingMessages = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.flatMap(payload -> handleMessage(payload, session))
.onErrorResume(e -> handleWebSocketError(session, e));
return session.send(incomingMessages);
}
private Flux<WebSocketMessage> handleMessage(String payload, WebSocketSession session) {
try {
log.info("Received payload: {}", payload);
ChatMessage userMessage = objectMapper.readValue(payload, ChatMessage.class);
return chatService.streamResponse(userMessage.getMessage())
.map(reply -> createWebSocketMessage("AI", reply, MessageType.CHAT, session))
.onErrorResume(e -> Flux.just(createWebSocketMessage("System", "Error in AI service", MessageType.ERROR, session)));
} catch (JsonProcessingException e) {
log.error("Invalid message format", e);
return Flux.just(createWebSocketMessage("System", "Invalid message format", MessageType.ERROR, session));
}
}
private Flux<WebSocketMessage> handleWebSocketError(WebSocketSession session, Throwable e) {
log.error("WebSocket error: ", e);
return Flux.just(createWebSocketMessage("System", "An error occurred: " + e.getMessage(), MessageType.ERROR, session));
}
private WebSocketMessage createWebSocketMessage(String sender, String message, MessageType type, WebSocketSession session) {
try {
ChatMessage chatMessage = ChatMessage.builder()
.sender(sender)
.message(message)
.type(type)
.build();
return session.textMessage(objectMapper.writeValueAsString(chatMessage));
} catch (JsonProcessingException e) {
log.error("Error serializing message", e);
return session.textMessage("{\"sender\":\"System\",\"message\":\"Critical error\",\"type\":\"ERROR\"}");
}
}
}
728x90
반응형
'💻 > Spring | SpringBoot' 카테고리의 다른 글
Spring Validation (0) | 2025.02.23 |
---|---|
Spring Webflux, WebClient (0) | 2025.02.23 |
스프링부트 개념정리(이론) (0) | 2024.04.29 |
[Spring Boot 3.x 를 이용한 RESTful Web Services 개발] 섹션 6. RESTful API 설계 가이드 (0) | 2024.04.29 |
[Spring Boot 3.x 를 이용한 RESTful Web Services 개발] 섹션 5. Java Persistence API 사용 (0) | 2024.04.29 |