프로젝트/스프링 배치 튜토리얼

Spring Batch 데이터베이스와 함께 사용하기

Junuuu 2023. 10. 11. 00:01
반응형

개요

지난 글에서는 Job을 만들고 메모리에서 값을 생성해 내고 log를 write 해내는 방식으로 작업을 수행하였습니다.

이번글에서는 Database와 연동하여 값을 읽고 쓰는 작업을 수행해보고자 합니다.

 

Gradle Dependencies 추가

dependencies {
	implementation ("org.springframework.boot:spring-boot-starter-data-jpa")
}

jpa 관련 의존성을 추가해주었습니다.

 

TestEntity, TestRepository 세팅 및 데이터 생성

@Table(name = "test_table")
@Entity
class TestJpaEntity(
    @Id
    val id: UUID = UUID.randomUUID(),

    @Enumerated(EnumType.STRING)
    var status: TestStatus = TestStatus.WAITING,

    @Column
    val createdAt: Instant = Instant.now(),
)

enum class TestStatus{
    WAITING,
    READY,
    COMPLETE
}


interface TestJpaEntityRepository: JpaRepository<TestJpaEntity, UUID> {

}

@Component
class InitDatabase(
	private val testJpaEntityRepository: TestJpaEntityRepository,
): ApplicationRunner {
	override fun run(args: ApplicationArguments) {
		repeat(10){
			testJpaEntityRepository.save(TestJpaEntity())
		}
		println()
	}
}

JpaEntity와  Repository를 만들어주었습니다.

WAITING -> READY로 만들어보는 배치를 위해 TestStatus라는 enum을 두었습니다.

ApplicationRunner를 사용하여 SpringBootAppliation이 기동 될 때 정보를 10개 가지도록 세팅해 두었습니다.

 

println()에 디버깅을 걸어두고 DB를 잠깐보고 오면 다음과 같이 데이터세팅이 잘 되어있습니다.

 

실제 배치 시 에러가 발생해서 대응 포인트

BatchApplication의 @Bean이 등록되면서 Reader에서는 Data를 읽어옵니다.

하지만 ApplicationRunner나 @PostContructor를 활용하고자 했을 때 빈이 다 만들어지고 등록되기 전에 수행되었기 때문에 Reader시에 데이터를 읽을 수 없다는 예외가 발생하였습니다.

 

docker를 활용하여 Database는 항시 띄워두었기 때문에 해결하기 위해 하이버네이트 옵션을 변경하였습니다.

(create-drop에서 update로)

ddl-auto: update

 

이제 database에 10건씩 데이터가 지속적으로 쌓이게 됩니다. 

 

Job 만들기 - Reader

@Profile("database-job")
@Configuration
class DataBaseJobConfig(
    private val jobRepository: JobRepository,
    private val batchTransactionManager: PlatformTransactionManager
) {

    @Bean
    fun databaseJob(
        chunkStep: Step
    ): Job {
        return JobBuilder("database job", jobRepository)
            .incrementer(RunIdIncrementer())
            .start(chunkStep)
            .build()
    }

    @Bean
    fun chunkStep(
        reader: ItemReader<out ReaderDto>,
    ): Step {
        return StepBuilder("database step", jobRepository)
            .chunk<ReaderDto, ReaderDto>(BATCH_SIZE, batchTransactionManager)
            .reader(reader)
            .writer(ClassItemWriter())
            .build()
    }

    @StepScope
    @Bean
    fun findTestJpaEntityReader(
        dataSource: DataSource,
    ): JdbcCursorItemReader<ReaderDto> {
        return JdbcCursorItemReaderBuilder<ReaderDto>()
            .name("testDBReader")
            .dataSource(dataSource)
            .sql(findTestSQL)
            .rowMapper(DataClassRowMapper(ReaderDto::class.java))
            .build()
    }

    class ClassItemWriter: ItemWriter<ReaderDto> {
        override fun write(itmes: Chunk<out ReaderDto>) {
            logger.info("Writing item start")
            itmes.forEach {
                logger.info("Writing item : {}", it)
            }
        }
    }

    companion object{
        val findTestSQL = """
            SELECT * FROM test_table
        """.trimIndent()

        const val BATCH_SIZE = 5
    }
}


data class ReaderDto(
    val id: UUID,
    val status: TestStatus,
)

Writer부분은 Log를 출력하도록 그대로 유지하고 Reader부분에는 JdbcCursorItemReader로 Database 정보를 읽어올 수 있도록 구성하였습니다.

 

SQL문을 직접 작성해 주어 해당 조건에 해당하는 데이터들을 읽어옵니다.

 

Job Parameter 설정 - ItemReader

    @StepScope
    @Bean
    fun findTestJpaEntityReader(
        dataSource: DataSource,
        @Value("#{jobParameters['current.date']}") currentDate: String,
    ): JdbcCursorItemReader<ReaderDto> {
        logger.info { "currentDate: $currentDate" }

        return JdbcCursorItemReaderBuilder<ReaderDto>()
            .name("testDBReader")
            .dataSource(dataSource)
            .sql(findTestSQL)
            .rowMapper(DataClassRowMapper(ReaderDto::class.java))
            .build()
    }

reader에서 job parameter를 주입해 볼 수도 있습니다.

 

 

Job 만들기 - ItemWriter

    @StepScope
    @Bean
    fun updateStatusWaitingToReadyItemWriter(): ItemWriter<ReaderDto> {
        return ItemWriter { items ->
            items.run {
                logger.info("Writing item start")
                items.forEach {
                    logger.info("Writing item : {}", it)
                    testJpaEntityRepository.save(TestJpaEntity(id= it.id, status = TestStatus.READY))
                }
            }
        }
    }

Reader에서 읽어온 id의 status의 값을 READY로 update 합니다.

JPA에서는 id를 저장할 때 이미 존재하는 값이라면 save를 수행할 때 update를 수행합니다.

 

 

실행

BatchApplication 실행 후 Waiting이 Ready로 변환됨을 확인하였습니다.

기존에 존재하던 60개의 데이터가 READY상태로 변환되었고, 신규로 10개의 WAITING상태가 만들어졌습니다.