TokyoAJ

도쿄아재

SPRINGBOOT 2025.04.10

[Kafka] Kafka + Spring Boot + TimescaleDB 연동 및 테스트 환경 구축

개요

대량 이메일 발송 시스템에서 “언제, 누구에게, 어떤 템플릿으로 메일이 전송되었는지” 로그를 남기는 건 매우 중요합니다.

이번 글에서는 Kafka를 통해 이메일 전송 결과 메시지를 비동기 처리하고,

Spring Boot + TimescaleDB로 시계열 기반 전송 로그를 저장하는 시스템을 구축해봅니다.



목차

  1. 프로젝트 구조
  2. Docker로 TimescaleDB 실행
  3. Kafka 연동 구조 및 메시지 예시
  4. Spring Boot + TimescaleDB + Kafka 설정
  5. 전체 소스코드 설명 (Entity, Repository, Service, KafkaListener 등)
  6. 테스트 방법 (curl + Kafka 메시지 시뮬레이션)
  7. 마무리 및 확장 아이디어


1. 프로젝트 구조

src
├── main
│ ├── java
│ │ └── com.example.email
│ │ ├── controller
│ │ │ └── EmailLogController.java # 테스트용 REST API (선택)
│ │ ├── domain
│ │ │ └── EmailLog.java # Entity (JPA + TimescaleDB)
│ │ ├── kafka
│ │ │ └── EmailLogConsumer.java # Kafka Consumer 리스너
│ │ ├── repository
│ │ │ └── EmailLogRepository.java # JPA Repository
│ │ ├── service
│ │ │ └── EmailLogService.java # 비즈니스 로직
│ │ └── EmailApplication.java # SpringBootApplication 진입점
│ └── resources
│ └── application.yml # Redis, Kafka, DB 설정


2. Docker로 TimescaleDB 실행

docker-compose.yml

version: '3.8'

services:
timescaledb:
image: timescale/timescaledb:latest-pg14
container_name: timescaledb
ports:
- "5432:5432"
environment:
POSTGRES_DB: email_logs
POSTGRES_USER: user
POSTGRES_PASSWORD: password
volumes:
- timescale_data:/var/lib/postgresql/data

volumes:
timescale_data:

실행

docker-compose up -d



3. Kafka 연동 구조

시스템 아키텍처


Kafka 메시지 예시 (JSON)

{
"templateId": "welcome_email",
"email": "user@example.com",
"status": "SUCCESS"
}


4. Spring Boot 설정

build.gradle 의존성

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.postgresql:postgresql'
}


application.yml

spring:
datasource:
url: jdbc:postgresql://localhost:5432/email_logs
username: user
password: password
jpa:
hibernate:
ddl-auto: update
properties:
hibernate.dialect: org.hibernate.dialect.PostgreSQLDialect
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: email-log-consumer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer



5. 전체 소스코드 및 주석 설명

5.1. Entity – EmailLog.java

package com.example.email.domain;

import jakarta.persistence.*;
import java.time.ZonedDateTime;

/**
* 이메일 전송 로그를 기록할 TimescaleDB Entity
*/
@Entity
@Table(name = "email_log")
public class EmailLog {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

private String templateId;

private String email;

private ZonedDateTime sentAt;

@Enumerated(EnumType.STRING)
private Status status;

public enum Status {
SUCCESS,
FAILURE
}

// Getters & Setters 생략
}


5.2. Repository – EmailLogRepository.java

package com.example.email.repository;

import com.example.email.domain.EmailLog;
import org.springframework.data.jpa.repository.JpaRepository;

public interface EmailLogRepository extends JpaRepository<EmailLog, Long> {
}


5.3. Service – EmailLogService.java

package com.example.email.service;

import com.example.email.domain.EmailLog;
import com.example.email.repository.EmailLogRepository;
import org.springframework.stereotype.Service;

import java.time.ZonedDateTime;

@Service
public class EmailLogService {

private final EmailLogRepository repository;

public EmailLogService(EmailLogRepository repository) {
this.repository = repository;
}

public EmailLog log(String templateId, String email, boolean isSuccess) {
EmailLog log = new EmailLog();
log.setTemplateId(templateId);
log.setEmail(email);
log.setSentAt(ZonedDateTime.now());
log.setStatus(isSuccess ? EmailLog.Status.SUCCESS : EmailLog.Status.FAILURE);
return repository.save(log);
}
}


5.4. Kafka Consumer – EmailLogConsumer.java

package com.example.email.kafka;

import com.example.email.service.EmailLogService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

/**
* Kafka로부터 이메일 전송 결과 메시지를 수신하여 로그 저장
*/
@Component
public class EmailLogConsumer {

private final EmailLogService service;
private final ObjectMapper objectMapper = new ObjectMapper();

public EmailLogConsumer(EmailLogService service) {
this.service = service;
}

@KafkaListener(topics = "email-sent-log")
public void listen(ConsumerRecord<String, String> record) {
try {
Map<String, String> message = objectMapper.readValue(record.value(), Map.class);
String templateId = message.get("templateId");
String email = message.get("email");
String status = message.get("status");

boolean isSuccess = "SUCCESS".equalsIgnoreCase(status);
service.log(templateId, email, isSuccess);

} catch (Exception e) {
e.printStackTrace(); // 예외 로깅
}
}
}


5.5. Controller (테스트용) – EmailLogController.java

package com.example.email.controller;

import com.example.email.domain.EmailLog;
import com.example.email.repository.EmailLogRepository;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
@RequestMapping("/logs")
public class EmailLogController {

private final EmailLogRepository repository;

public EmailLogController(EmailLogRepository repository) {
this.repository = repository;
}

@GetMapping
public List<EmailLog> getAll() {
return repository.findAll();
}
}



6. 테이블 구조 및 하이퍼테이블 생성 (TimescaleDB 전용)

TimescaleDB는 PostgreSQL 기반이지만, 시간 기반 데이터 처리를 위해 hypertable이라는 시계열 전용 테이블을 사용합니다.

이메일 전송 로그를 저장하려면 다음 SQL을 사용하여 기본 테이블을 생성하고, 하이퍼테이블로 전환해야 합니다.


SQL 예시

-- TimescaleDB 확장 모듈 활성화 (DB에서 1회만 수행)
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- email_log 테이블 생성
CREATE TABLE email_log (
id SERIAL PRIMARY KEY,
template_id TEXT NOT NULL,
email TEXT NOT NULL,
sent_at TIMESTAMPTZ NOT NULL,
status TEXT
);

-- 하이퍼테이블로 전환 (sent_at 컬럼을 시계열 기준 컬럼으로 설정)
SELECT create_hypertable('email_log', 'sent_at');


설명

컬럼명설명
id기본 키 (자동 증가)
template_id메일 템플릿 구분값
email수신자 이메일 주소
sent_at전송 시각 (TIMESTAMPTZ)
status전송 상태 (SUCCESS, FAILURE)

create_hypertable은 반드시 sent_at과 같은 시간 기반 컬럼을 기준으로 지정해야 합니다.

이 설정을 통해 TimescaleDB는 내부적으로 자동 파티셔닝과 성능 최적화를 수행하게 됩니다.


7. 테스트 방법

7.1. Kafka 메시지 수동 전송

Kafka CLI를 사용해서 테스트 메시지를 전송할 수 있습니다:

kafka-console-producer.sh --topic email-sent-log --bootstrap-server localhost:9092

입력 예시:
{"templateId":"promo_2024", "email":"test@domain.com", "status":"SUCCESS"}


2. 로그 확인 (REST API)

curl http://localhost:8080/logs



8. 마무리 및 확장 아이디어

이 시스템은 Kafka로부터 비동기 이메일 전송 결과를 수신하고,

TimescaleDB에 시계열 로그로 저장하는 확장 가능한 로그 처리 구조입니다.


확장 가능성

  1. Grafana를 연동하여 로그 시각화
  2. Kafka 메시지 유효성 검증 및 필터링 로직 추가
  3. 실패 로그를 별도 DB나 파일에 백업
  4. 하이퍼테이블 기반 집계 및 알람 기능 구현


댓글