밤빵's 개발일지
[TIL]20240827 Spring batch 본문
나는 팀 프로젝트를 이어가고있기 때문에 다른 사람들이 하는 개인프로젝트에 관심이 많아서 이것저것 물어보고 다니고 있다. 그렇게 개인프로젝트 내용들을 들으면서 스케줄러와 같은 여러 키워드를 알게됐는데 Spring batch라는게 대규모 데이터 처리를 위해 설계된 프레임워크란걸 알게되서 오늘 개발일지 소재로 쓰게되었다.
Spring Batch는 대용량 데이터 처리에 최적화된 스프링 프레임워크의 서브 프로젝트로, 배치 애플리케이션을 개발하기 위한 도구이다. 배치 애플리케이션은 대량의 데이터를 백그라운드에서 처리하는 작업으로, 금융 결산, 데이터 마이그레이션, 대규모 데이터 분석 등 다양한 분야에서 사용된다.
▶ Spring Batch란 ?
Spring Batch는 스프링 프레임워크를 기반으로 하는 배치 처리 프레임워크로, 대규모 데이터 처리를 안정적이고 효율적으로 수행할 수 있도록 설계되었다. Spring Batch는 데이터 처리의 시작과 끝을 정의하고, 데이터 읽기/쓰기, 에러 처리, 재시도, 트랜잭션 관리 등의 기능을 제공한다. 이를 통해 배치 작업을 빠르고 쉽게 구현할 수 있다.
▶ Spring Batch의 주요 개념
Spring Batch는 배치 처리를 정의하는 Job(작업)과 Step(단계)이라는 두 가지 기본 개념을 기반으로 동작한다.
→ Job:
Job은 배치 프로세스의 최상위 엔터티로, 배치 애플리케이션의 작업 단위를 정의하고, Job은 여러 개의 Step으로 구성되고, 각 Step은 순차적 또는 병렬적으로 실행된다. Job은 JobBuilder를 통해 정의할 수 있으며, Job의 실행은 JobLauncher를 통해 시작된다.
→ Step:
Step은 배치 Job의 구성 요소로, Job이 실행되는 동안 수행해야 할 특정한 작업을 정의한다. Step은 주로 ItemReader, ItemProcessor, ItemWriter 세 가지 구성 요소로 이루어져 있으며, 각 Step은 트랜잭션 내에서 처리된다.
→ Chunk 기반 처리:
Spring Batch는 대량의 데이터를 처리할 때 메모리 효율성을 높이기 위해 청크(Chunk) 기반 처리를 사용한다. 데이터를 청크 단위로 읽고, 처리하고, 쓰는 방식을 통해 메모리 사용을 최적화한다. 예를 들어, 한 번에 100개의 데이터를 읽고 처리한 후, 트랜잭션을 커밋하는 방식으로 처리량을 조절할 수 있다.
▶ Spring Batch의 주요 구성 요소
Spring Batch는 배치 애플리케이션을 효과적으로 설계하고 관리할 수 있는 다양한 구성 요소를 제공한다.
→ ItemReader:
ItemReader는 배치 처리의 입력을 담당하는 인터페이스로, 데이터를 데이터베이스, 파일, 큐 등에서 읽어온다. JdbcCursorItemReader, FlatFileItemReader, JpaPagingItemReader 등 다양한 구현체가 제공되며, 각 데이터 소스에 맞게 선택하여 사용할 수 있다.
→ ItemProcessor:
ItemProcessor는 읽어온 데이터를 가공하고 변환하는 로직을 처리하는 인터페이스로, 예를 들어, 데이터 유효성 검증, 필터링, 형식 변환 등을 수행할 수 있다. ItemProcessor를 구현하여 필요한 데이터 가공 로직을 작성할 수 있다.
→ ItemWriter:
ItemWriter는 처리된 데이터를 최종적으로 출력하는 인터페이스로, 데이터베이스에 저장하거나 파일로 쓰는 등의 역할을 한다. JdbcBatchItemWriter, FlatFileItemWriter, JpaItemWriter 등의 구현체를 제공하며, 데이터 소스에 맞는 Writer를 사용할 수 있다.
→ JobLauncher:
JobLauncher는 배치 Job을 실행하는 역할을 하는 인터페이스로, 보통 Spring의 스케줄러와 함께 사용되며, 특정 시간이나 이벤트에 따라 Job을 실행할 수 있다.
→ JobRepository:
JobRepository는 배치 작업의 메타데이터와 상태를 저장하고 관리하는 저장소 역할을 하고, Job이 실행될 때마다 실행 정보가 JobRepository에 저장되며, 이를 통해 Job의 상태, 진행 상황, 실패 원인 등을 추적할 수 있다.
→ JobParameters:
JobParameters는 Job 실행 시 동적으로 전달되는 파라미터를 정의하고, 파라미터는 Job의 실행에 필요한 입력값으로, Job마다 서로 다른 파라미터를 설정하여 실행할 수 있다.
▶ Spring Batch의 동작 원리
Spring Batch의 동작 원리는 다음과 같다:
→ Job 실행:
배치 애플리케이션이 시작되면, JobLauncher는 정의된 Job을 실행하고, Job은 JobRepository를 통해 실행 정보를 저장하고서, 각 Step을 순차적으로 실행한다.
→ Step 실행:
Job이 실행되면, 각 Step이 순차적으로 또는 병렬적으로 실행된다. Step은 ItemReader, ItemProcessor, ItemWriter를 순서대로 호출하여 데이터를 처리한다.
→ ItemReader - ItemProcessor - ItemWriter 흐름:
ItemReader가 데이터를 읽어오면, ItemProcessor는 데이터를 가공하고 변환하고, 변환된 데이터는 ItemWriter에 전달되어 최종적으로 데이터베이스나 파일에 기록된다. 각 Step은 트랜잭션 경계 내에서 실행되고, 데이터 처리 도중 오류가 발생할 경우 롤백되어 데이터의 일관성이 유지된다.
→ Job 상태 관리:
Job과 Step의 실행 상태는 JobRepository에 저장되고, 실행 결과(성공, 실패, 재시도 등)는 로그와 함께 관리된다. 이 정보를 통해 배치 작업의 상태를 모니터링하고, 실패한 작업을 재시도할 수 있다.
▶ Spring Batch 설정 예시 ①
▽ Spring Batch를 사용하여 파일의 데이터를 읽고, 처리하고, 데이터베이스에 저장하는 간단한 배치 작업을 설정
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.builder.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
@Configuration
@EnableBatchProcessing // Spring Batch 기능 활성화
public class BatchConfig {
@Bean
public FlatFileItemReader<Person> reader() { // 파일에서 데이터를 읽어오는 Reader 정의
FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("sample-data.csv")); // 읽어올 파일 지정 (클래스 경로)
reader.setLineMapper(new DefaultLineMapper<>() {{ // 파일 데이터를 객체로 매핑
setLineTokenizer(new DelimitedLineTokenizer() {{ // CSV 파일의 각 열을 토큰으로 분리
setNames("firstName", "lastName"); // CSV 파일의 헤더 정의
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<>() {{ // 각 필드를 Person 객체에 매핑
setTargetType(Person.class); // 매핑 대상 객체 타입 설정
}});
}});
return reader;
}
@Bean
public ItemProcessor<Person, Person> processor() { // 데이터를 처리하는 Processor 정의
return person -> {
// 이름과 성을 대문자로 변환 (간단한 처리 로직)
person.setFirstName(person.getFirstName().toUpperCase());
person.setLastName(person.getLastName().toUpperCase());
return person;
};
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) { // 처리된 데이터를 데이터베이스에 쓰는 Writer 정의
JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); // SQL 파라미터 제공
writer.setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)"); // 데이터베이스에 삽입할 SQL 쿼리
writer.setDataSource(dataSource); // 데이터 소스 설정 (JDBC)
return writer;
}
@Bean
public Job importUserJob(JobBuilderFactory jobBuilderFactory, Step step1) { // Job 설정 (Step들의 모음)
return jobBuilderFactory.get("importUserJob") // Job 이름 설정
.incrementer(new RunIdIncrementer()) // Job 실행 ID 자동 증가 설정
.flow(step1) // Job이 실행할 Step 지정
.end() // Job의 끝을 명시
.build();
}
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory, FlatFileItemReader<Person> reader,
ItemProcessor<Person, Person> processor, JdbcBatchItemWriter<Person> writer) {
return stepBuilderFactory.get("step1") // Step 이름 설정
.<Person, Person>chunk(10) // 청크(Chunk) 크기 설정 (한 번에 10개씩 처리)
.reader(reader) // Reader 설정
.processor(processor) // Processor 설정
.writer(writer) // Writer 설정
.build();
}
}
→ FlatFileItemReader는 CSV 파일을 읽는 역할을 하고, DefaultLineMapper를 사용하여 각 줄을 Person 객체로 매핑한다.
→ ItemProcessor는 데이터를 처리하는 로직을 정의하며, 이름을 대문자로 변환하는 간단한 작업을 수행한다.
→ JdbcBatchItemWriter는 처리된 데이터를 데이터베이스에 삽입한다.
→ Job과 Step은 JobBuilderFactory와 StepBuilderFactory를 사용하여 정의되며, Job은 하나 이상의 Step을 포함한다.
→ @EnableBatchProcessing 애너테이션을 사용하여 Spring Batch를 활성화하고, 배치 구성 요소를 자동으로 설정한다.
▶ Spring Batch 사용 시 주의사항
→ 트랜잭션 관리:
Spring Batch는 기본적으로 트랜잭션 경계를 설정하여 데이터의 일관성을 보장한다. 하지만 트랜잭션 범위를 잘못 설정하면 데이터 무결성이 깨질 수 있으므로 주의해야 한다.
→ 성능 최적화:
대량의 데이터를 처리할 때는 Chunk 크기, 스레드 풀 크기, 데이터베이스 커넥션 풀 등을 적절히 조정하여 성능을 최적화해야 한다.필요한 경우 파티셔닝(Partitioning) 기법을 사용하여 병렬 처리를 할 수 있다.
→ 재시도 및 실패 처리:
배치 작업 중 일부 데이터가 실패할 수 있으므로, 재시도 로직과 오류 처리 방식을 설정하는 것이 중요하다. RetryTemplate을 사용하여 특정 조건에서 재시도하거나, SkipPolicy를 통해 오류를 건너뛸 수 있다.
→ JobRepository 설정:
JobRepository는 메타데이터와 실행 상태를 저장하는 중요한 요소이다. 데이터베이스를 설정할 때는 성능과 트랜잭션을 고려하여 적절한 인덱스와 연결 풀을 구성해야 한다.
→ 스케줄링:
배치 작업은 보통 Spring Scheduler나 Quartz와 같은 스케줄러를 사용하여 일정 간격으로 실행된다. 스케줄러 설정을 통해 배치 Job의 실행 시간을 조정하고, 애플리케이션 리소스를 효율적으로 사용할 수 있다.
▶ Spring Batch의 심화 개념
Spring Batch를 사용해 배치 애플리케이션을 개발할 때 고려해야 할 몇 가지 심화된 개념과 설정들이 있다.
▷ 병렬 처리(Parallel Processing)
배치 작업을 병렬로 실행하면 대용량 데이터 처리 속도를 크게 향상시킬 수 있다. Spring Batch는 여러 가지 병렬 처리 기법을 제공하며, 대표적으로 멀티스레딩(Multi-threading), 파티셔닝(Partitioning), 그리드 처리(Grid Processing) 등이 있다.
→멀티스레딩(Multi-threading):
여러 개의 스레드가 하나의 JVM에서 동시에 실행되는 방식이다. 스레드 풀(Thread Pool)을 사용하여 여러 스레드가 동시에 배치 작업을 수행할 수 있다.
→ 파티셔닝(Partitioning):
데이터를 여러 파티션으로 나누어 각각의 파티션을 개별 스레드나 프로세스에서 처리하는 방식이다. 파티셔닝을 통해 멀티노드 환경에서 병렬 처리가 가능하다.
→ 그리드 처리(Grid Processing):
분산 환경에서의 배치 처리 방법으로, 각 노드에서 배치 작업을 병렬로 실행하고, 결과를 모아서 최종 작업을 완료하는 방식이다.
▷ 청크 기반 처리의 단점
Spring Batch는 기본적으로 청크 기반(Chunk-based)으로 데이터를 처리하지만, 모든 상황에 적합하지 않을 수 있다. 예를 들어, 청크 크기가 너무 작으면 트랜잭션 관리와 I/O 비용이 증가할 수 있고, 너무 크면 메모리 부족 현상이 발생할 수 있다. 이러한 문제를 해결하기 위해서는 청크 크기 조정, 스킵 로직(Skip Logic) 및 재시도 로직(Retry Logic)을 적절히 설정해야 한다.
▷ 리소스 관리와 모니터링
대규모 데이터 처리를 위한 배치 애플리케이션은 CPU, 메모리, 네트워크, 디스크 I/O 등 다양한 리소스를 소비한다. 따라서 Spring Batch를 사용할 때는 애플리케이션의 리소스 사용량을 모니터링하고, 적절한 경계를 설정하여 시스템이 과부하 상태에 빠지지 않도록 해야 한다. 또한, Spring Boot Actuator와 같은 도구를 사용하여 배치 작업의 상태와 성능을 모니터링할 수 있다.
▶ Spring Batch 설정 예시 ② - 재시도와 스킵 로직
Spring Batch는 배치 작업 중 일부 데이터 처리에 실패했을 때 자동으로 재시도하거나 실패한 데이터를 건너뛸 수 있는 기능을 제공한다. 전체 배치 작업이 실패하지 않고, 문제 데이터를 처리할 수 있다.
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileParseException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
// 스킵 정책 설정: 특정 예외 발생 시 건너뛰기
@Bean
public SkipPolicy skipPolicy() {
return (throwable, skipCount) -> {
if (throwable instanceof FlatFileParseException && skipCount <= 10) {
return true; // 최대 10개까지 건너뛴다.
}
return false;
};
}
// 재시도 정책 설정: 3회 재시도, 고정 대기 시간 설정
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(2000); // 2초 대기
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
// Processor 예시
@Bean
@StepScope
public ItemProcessor<Person, Person> processor() {
return person -> {
if ("INVALID".equals(person.getLastName())) {
throw new FlatFileParseException("Invalid data found", null); // 예외를 던져 스킵 로직 활성화
}
// 유효한 데이터는 처리
return new Person(person.getFirstName().toUpperCase(), person.getLastName().toUpperCase());
};
}
→ 위 코드예시는 Spring Batch에서 재시도와 스킵 로직을 설정하는 방법을 보여준다. RetryTemplate을 사용하여 일정한 대기 시간과 최대 재시도 횟수를 설정하고, SkipPolicy를 사용하여 특정 조건에서 예외를 무시하고 데이터를 건너뛴다. 이를 통해 애플리케이션은 안정적으로 대량의 데이터를 처리할 수 있다.
▶ Spring Batch와 Spring Integration 연동
Spring Batch는 Spring Integration과 연동하여 배치 작업을 더욱 유연하고 자동화된 방식으로 처리할 수 있다. 예를 들어, 파일이 업로드되면 자동으로 배치 작업이 실행되도록 트리거하거나, 배치 작업 완료 후에 결과를 메시지 큐로 전송하여 다른 서비스와 통신할 수 있다.
import org.springframework.batch.core.Job;
import org.springframework.batch.integration.launch.JobLaunchingGateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.messaging.MessageChannel;
@EnableIntegration
@Configuration
public class BatchIntegrationConfig {
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel(); // 파일 입력 채널 설정
}
@Bean
public FileReadingMessageSource fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File("input")); // 파일을 읽어올 디렉토리 지정
source.setFilter(new SimplePatternFileListFilter("*.csv")); // 읽어올 파일의 패턴 설정
return source;
}
@MessagingGateway(defaultRequestChannel = "fileInputChannel")
public interface JobGateway {
void launchJob(Job job); // 메시지를 통한 Job 실행 트리거
}
@Bean
public JobLaunchingGateway jobLaunchingGateway(JobLauncher jobLauncher) {
return new JobLaunchingGateway(jobLauncher); // Spring Batch의 JobLauncher를 사용한 Job 실행
}
}
→ 위 코드예시는 Spring Integration을 사용하여 파일 시스템에서 CSV 파일을 읽고, 해당 파일을 처리하기 위해 Spring Batch의 Job을 실행하는 방법을 보여준다.
▶ 정리
Spring Batch의 병렬 처리와 청크 기반 처리의 장단점, 리소스 관리 및 모니터링, Spring Integration과의 연동을 통한 자동화된 배치 처리 등 심화된 주제를 정리하면서. Spring Batch를 더욱 효과적으로 활용하는 법을 조금은 이해하게 됐는데,
항상 예시코드들은 간략하게 구현하니까 이렇게 하면 되겠다! 하면서도 실제로 적용해보면 꼭 문제가 생긴다. 개인프로젝트를 했다면 내가 이걸 빨리 구현해낼 수 있었을까😵💫
'개발Article' 카테고리의 다른 글
[TIL]20240829 웹훅(Webhook) (0) | 2024.08.29 |
---|---|
[TIL]20240828 인 컨텍스트 러닝(In-Context Learning) (1) | 2024.08.28 |
[TIL]20240826 파인튜닝(Fine-Tuning) (0) | 2024.08.26 |
[WIL]20240825 결제 기능을 구현할 때 꼭 숙지해야 할 문제점 (0) | 2024.08.25 |
[TIL]20240824 MSA(Microservices Architecture) (0) | 2024.08.24 |