Notice
Recent Posts
Recent Comments
Link
«   2024/10   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31
Tags more
Archives
Today
Total
관리 메뉴

밤빵's 개발일지

[TIL]20241003 청크 기반 데이터 처리 본문

개발Article

[TIL]20241003 청크 기반 데이터 처리

최밤빵 2024. 10. 3. 23:31

대용량 데이터를 한 번에 처리하지 않고, 작은 단위로 나누어 작업하는 청크 기반 처리(Chunk-Based Processing)라는 개념을 알게 되고, 이 방식이 성능과 효율성에 어떤 장점이 있는지, 그리고 어떻게 구현할 수 있는지 궁금해졌다.

 

▶ 청크 기반 처리란?

청크(Chunk)란 데이터의 큰 덩어리를 작은 단위로 나눈 것을 의미한다. 청크 기반 처리는 데이터를 한꺼번에 읽고 처리하는 대신, 일정 크기의 청크로 나누어 처리함으로써 메모리 사용을 절약하고 시스템 자원을 보다 효율적으로 활용할 수 있는 방법이다. 예를 들어, 수십만 건의 데이터가 있을 때 한 번에 모든 데이터를 메모리에 적재하여 처리하면 시스템에 무리가 가기 쉽다. 이 경우 데이터를 청크 단위로 읽고, 처리하고, 저장하여 효율적으로 작업할 수 있다.

 

▶ 청크 기반 처리의 장점

→ 메모리 효율성:

대량의 데이터를 한꺼번에 메모리에 적재하지 않고, 작은 청크 단위로 나누어 메모리를 효율적으로 사용한다.

→ 성능 향상:

I/O 작업에서 지연 시간을 줄이고, 시스템 리소스를 아낄 수 있어 성능이 크게 향상된다.

→ 에러 관리 용이성:

특정 청크에서 문제가 발생해도, 해당 청크만 재처리하면 되기 때문에 복구가 쉽다.

 

▶ 청크 기반 파일 처리 예시 (Java & Spring Batch)

▷ 스프링 배치를 사용해 CSV 파일을 100개 행씩 청크로 나누어 읽고, 데이터를 가공한 후 출력

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class ChunkBasedFileProcessingConfig {

    private final StepBuilderFactory stepBuilderFactory;

    public ChunkBasedFileProcessingConfig(StepBuilderFactory stepBuilderFactory) {
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public FlatFileItemReader<MyEntity> reader() {
        FlatFileItemReader<MyEntity> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("data.csv"));
        reader.setLineMapper(new DefaultLineMapper<>() {{
            setLineTokenizer(new DelimitedLineTokenizer() {{
                setNames("id", "name", "value");
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
                setTargetType(MyEntity.class);
            }});
        }});
        return reader;
    }

    @Bean
    public ItemProcessor<MyEntity, MyEntity> processor() {
        return item -> {
            item.setValue(item.getValue().toUpperCase());  // 예: 값을 대문자로 변환
            return item;
        };
    }

    @Bean
    public ItemWriter<MyEntity> writer() {
        return items -> items.forEach(System.out::println);  // 콘솔 출력
    }

    @Bean
    public Step chunkBasedFileStep() {
        return stepBuilderFactory.get("chunkBasedFileStep")
                .<MyEntity, MyEntity>chunk(100)  // 100개 행을 청크로 처리
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }
}

→ reader: FlatFileItemReader를 사용해 CSV 파일을 읽어온다. data.csv 파일에서 데이터를 읽어오며, 파일 내 각 행은 id, name, value라는 세 필드로 구성되어 있다.

DelimitedLineTokenizer는 CSV 파일의 구분자를 설정하여 각 필드를 나눈다.

BeanWrapperFieldSetMapper를 통해 파일 데이터를 MyEntity 클래스의 필드에 매핑한다.

processor: ItemProcessor 인터페이스를 사용해 데이터 가공 로직을 정의한다. 여기서는 value 필드를 대문자로 변환하는 간단한 처리를 한다.

writer: ItemWriter를 사용해 데이터를 출력한다. 청크 단위로 처리된 데이터를 콘솔에 출력하도록 설정되어 있다.

Step 설정: chunkBasedFileStep에서 chunk(100) 설정을 통해 100개 행을 하나의 청크로 나누어 처리하도록 구성한다. reader, processor, writer를 연결하여 CSV 파일의 데이터를 100개씩 읽고 가공하여 출력하는 작업을 수행한다.

 

▶청크 기반 데이터베이스 처리 예시 (Java & Spring Batch)

▷데이터베이스에서 청크 단위로 데이터를 조회하고, 가공 후 저장하는 방식. JdbcPagingItemReader를 사용하여 1000개 레코드씩 읽고, 특정 처리를 한 후 결과를 저장한다.

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import javax.sql.DataSource;

@Configuration
public class ChunkBasedBatchConfig {

    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    public ChunkBasedBatchConfig(StepBuilderFactory stepBuilderFactory, DataSource dataSource) {
        this.stepBuilderFactory = stepBuilderFactory;
        this.dataSource = dataSource;
    }

    @Bean
    public JdbcPagingItemReader<MyEntity> reader() {
        JdbcPagingItemReader<MyEntity> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        reader.setPageSize(1000);  // 청크 크기를 1000으로 설정
        reader.setRowMapper(new BeanPropertyRowMapper<>(MyEntity.class));

        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource);
        queryProvider.setSelectClause("SELECT *");
        queryProvider.setFromClause("FROM my_table");
        queryProvider.setSortKey("id");

        reader.setQueryProvider(queryProvider.getObject());
        return reader;
    }

    @Bean
    public ItemProcessor<MyEntity, MyProcessedEntity> processor() {
        return item -> new MyProcessedEntity(item.getProcessedField());
    }

    @Bean
    public ItemWriter<MyProcessedEntity> writer() {
        return items -> items.forEach(System.out::println);
    }

    @Bean
    public Step chunkBasedStep() {
        return stepBuilderFactory.get("chunkBasedStep")
                .<MyEntity, MyProcessedEntity>chunk(1000)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }
}

→ reader: JdbcPagingItemReader를 사용해 my_table에서 데이터를 읽어온다. 데이터베이스에서 1000개의 레코드를 한 번에 가져온 후 MyEntity 클래스에 매핑한다.

SqlPagingQueryProviderFactoryBean을 통해 데이터베이스 쿼리를 설정하고, id 필드를 기준으로 정렬하여 데이터를 페이지 단위로 가져온다.

processor: MyEntity의 데이터를 가공하여 MyProcessedEntity 객체로 변환한다. 이 부분에서 원하는 데이터 변환 로직을 정의할 수 있다.

writer: 가공된 데이터를 콘솔에 출력한다. 실제 환경에서는 데이터베이스에 저장하거나 파일로 출력할 수 있다.

Step 설정: chunkBasedStep에서 chunk(1000) 설정을 통해 1000개 레코드를 한 번에 읽고, 가공 후 출력하도록 설정한다.

 

▶청크 기반 처리와 스프링 배치의 장점 비교

청크 기반 처리와 스프링 배치는 모두 대용량 데이터를 효율적으로 처리하기 위한 프레임워크이고, 두 방식은 상호 보완적인 역할을 한다. 특히, 청크 기반 처리는 스프링 배치의 중심 개념 중 하나로, 스프링 배치를 활용해 청크 단위로 데이터를 읽고 가공하고 저장할 수 있다.

 

▷효율적인 메모리 사용:

→ 청크 기반 처리:

데이터를 일정 크기의 청크로 나누어 처리함으로써 메모리 과부하를 방지할 수 있다. 대량의 데이터를 한꺼번에 처리하지 않고, 각 청크 단위로 메모리에 적재하고 처리 후 바로 메모리에서 제거한다.

스프링 배치:

스프링 배치는 이러한 청크 기반 처리 모델을 기본으로 구현되어 있어, 대량 데이터의 효율적인 메모리 사용을 보장한다. 또한, 메모리 사용량을 조절하는 다양한 옵션을 제공하여 대용량 데이터를 안전하게 처리할 수 있다.

▷고가용성과 장애 복구:

청크 기반 처리:

청크 단위로 데이터를 처리하므로 특정 청크에서 문제가 발생해도 해당 청크만 재처리할 수 있어 안정적이다. 예를 들어, 청크 중 하나에서 에러가 발생하더라도 나머지 청크는 정상적으로 처리할 수 있다.

스프링 배치:

스프링 배치는 단계별로 작업을 구성하여 청크 처리 실패 시 재시도, 오류 처리, 중단된 작업의 재시작 기능을 지원한다. 이러한 기능 덕분에 안정적인 대규모 데이터 처리가 가능하다.

▷개발 생산성 향상:

청크 기반 처리:

데이터를 처리하는 단계를 단순화하고, 작은 데이터 묶음으로 작업을 나눔으로써 코드를 이해하고 유지 관리하기 쉬워진다.

스프링 배치:

스프링 배치는 청크 기반 처리를 쉽게 설정할 수 있는 다양한 구성 요소와 API를 제공하여, 개발자가 복잡한 대용량 처리 로직을 일일이 구현하지 않아도 되도록 지원한다. 설정만으로 효율적인 데이터 처리 파이프라인을 구축할 수 있어 개발 생산성이 높아진다.

 

청크 기반 처리는 특히 대용량 데이터를 처리할 때 메모리 효율성을 높이고 성능을 향상시킬 수 있는 유용한 방법이다. 또한, 오류가 발생했을 때 개별 청크만 재처리할 수 있어 복구가 용이하다.