ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring Batch 데이터베이스와 함께 사용하기
    프로젝트/스프링 배치 튜토리얼 2023. 10. 11. 00:01

    개요

    지난 글에서는 Job을 만들고 메모리에서 값을 생성해 내고 log를 write 해내는 방식으로 작업을 수행하였습니다.

    이번글에서는 Database와 연동하여 값을 읽고 쓰는 작업을 수행해보고자 합니다.

     

    Gradle Dependencies 추가

    dependencies {
    	implementation ("org.springframework.boot:spring-boot-starter-data-jpa")
    }

    jpa 관련 의존성을 추가해주었습니다.

     

    TestEntity, TestRepository 세팅 및 데이터 생성

    @Table(name = "test_table")
    @Entity
    class TestJpaEntity(
        @Id
        val id: UUID = UUID.randomUUID(),
    
        @Enumerated(EnumType.STRING)
        var status: TestStatus = TestStatus.WAITING,
    
        @Column
        val createdAt: Instant = Instant.now(),
    )
    
    enum class TestStatus{
        WAITING,
        READY,
        COMPLETE
    }
    
    
    interface TestJpaEntityRepository: JpaRepository<TestJpaEntity, UUID> {
    
    }
    
    @Component
    class InitDatabase(
    	private val testJpaEntityRepository: TestJpaEntityRepository,
    ): ApplicationRunner {
    	override fun run(args: ApplicationArguments) {
    		repeat(10){
    			testJpaEntityRepository.save(TestJpaEntity())
    		}
    		println()
    	}
    }

    JpaEntity와  Repository를 만들어주었습니다.

    WAITING -> READY로 만들어보는 배치를 위해 TestStatus라는 enum을 두었습니다.

    ApplicationRunner를 사용하여 SpringBootAppliation이 기동 될 때 정보를 10개 가지도록 세팅해 두었습니다.

     

    println()에 디버깅을 걸어두고 DB를 잠깐보고 오면 다음과 같이 데이터세팅이 잘 되어있습니다.

     

    실제 배치 시 에러가 발생해서 대응 포인트

    BatchApplication의 @Bean이 등록되면서 Reader에서는 Data를 읽어옵니다.

    하지만 ApplicationRunner나 @PostContructor를 활용하고자 했을 때 빈이 다 만들어지고 등록되기 전에 수행되었기 때문에 Reader시에 데이터를 읽을 수 없다는 예외가 발생하였습니다.

     

    docker를 활용하여 Database는 항시 띄워두었기 때문에 해결하기 위해 하이버네이트 옵션을 변경하였습니다.

    (create-drop에서 update로)

    ddl-auto: update

     

    이제 database에 10건씩 데이터가 지속적으로 쌓이게 됩니다. 

     

    Job 만들기 - Reader

    @Profile("database-job")
    @Configuration
    class DataBaseJobConfig(
        private val jobRepository: JobRepository,
        private val batchTransactionManager: PlatformTransactionManager
    ) {
    
        @Bean
        fun databaseJob(
            chunkStep: Step
        ): Job {
            return JobBuilder("database job", jobRepository)
                .incrementer(RunIdIncrementer())
                .start(chunkStep)
                .build()
        }
    
        @Bean
        fun chunkStep(
            reader: ItemReader<out ReaderDto>,
        ): Step {
            return StepBuilder("database step", jobRepository)
                .chunk<ReaderDto, ReaderDto>(BATCH_SIZE, batchTransactionManager)
                .reader(reader)
                .writer(ClassItemWriter())
                .build()
        }
    
        @StepScope
        @Bean
        fun findTestJpaEntityReader(
            dataSource: DataSource,
        ): JdbcCursorItemReader<ReaderDto> {
            return JdbcCursorItemReaderBuilder<ReaderDto>()
                .name("testDBReader")
                .dataSource(dataSource)
                .sql(findTestSQL)
                .rowMapper(DataClassRowMapper(ReaderDto::class.java))
                .build()
        }
    
        class ClassItemWriter: ItemWriter<ReaderDto> {
            override fun write(itmes: Chunk<out ReaderDto>) {
                logger.info("Writing item start")
                itmes.forEach {
                    logger.info("Writing item : {}", it)
                }
            }
        }
    
        companion object{
            val findTestSQL = """
                SELECT * FROM test_table
            """.trimIndent()
    
            const val BATCH_SIZE = 5
        }
    }
    
    
    data class ReaderDto(
        val id: UUID,
        val status: TestStatus,
    )

    Writer부분은 Log를 출력하도록 그대로 유지하고 Reader부분에는 JdbcCursorItemReader로 Database 정보를 읽어올 수 있도록 구성하였습니다.

     

    SQL문을 직접 작성해 주어 해당 조건에 해당하는 데이터들을 읽어옵니다.

     

    Job Parameter 설정 - ItemReader

        @StepScope
        @Bean
        fun findTestJpaEntityReader(
            dataSource: DataSource,
            @Value("#{jobParameters['current.date']}") currentDate: String,
        ): JdbcCursorItemReader<ReaderDto> {
            logger.info { "currentDate: $currentDate" }
    
            return JdbcCursorItemReaderBuilder<ReaderDto>()
                .name("testDBReader")
                .dataSource(dataSource)
                .sql(findTestSQL)
                .rowMapper(DataClassRowMapper(ReaderDto::class.java))
                .build()
        }

    reader에서 job parameter를 주입해 볼 수도 있습니다.

     

     

    Job 만들기 - ItemWriter

        @StepScope
        @Bean
        fun updateStatusWaitingToReadyItemWriter(): ItemWriter<ReaderDto> {
            return ItemWriter { items ->
                items.run {
                    logger.info("Writing item start")
                    items.forEach {
                        logger.info("Writing item : {}", it)
                        testJpaEntityRepository.save(TestJpaEntity(id= it.id, status = TestStatus.READY))
                    }
                }
            }
        }

    Reader에서 읽어온 id의 status의 값을 READY로 update 합니다.

    JPA에서는 id를 저장할 때 이미 존재하는 값이라면 save를 수행할 때 update를 수행합니다.

     

     

    실행

    BatchApplication 실행 후 Waiting이 Ready로 변환됨을 확인하였습니다.

    기존에 존재하던 60개의 데이터가 READY상태로 변환되었고, 신규로 10개의 WAITING상태가 만들어졌습니다.

     

    댓글

Designed by Tistory.