Spring Batch 데이터베이스와 함께 사용하기
개요
지난 글에서는 Job을 만들고 메모리에서 값을 생성해 내고 log를 write 해내는 방식으로 작업을 수행하였습니다.
이번글에서는 Database와 연동하여 값을 읽고 쓰는 작업을 수행해보고자 합니다.
Gradle Dependencies 추가
dependencies {
implementation ("org.springframework.boot:spring-boot-starter-data-jpa")
}
jpa 관련 의존성을 추가해주었습니다.
TestEntity, TestRepository 세팅 및 데이터 생성
@Table(name = "test_table")
@Entity
class TestJpaEntity(
@Id
val id: UUID = UUID.randomUUID(),
@Enumerated(EnumType.STRING)
var status: TestStatus = TestStatus.WAITING,
@Column
val createdAt: Instant = Instant.now(),
)
enum class TestStatus{
WAITING,
READY,
COMPLETE
}
interface TestJpaEntityRepository: JpaRepository<TestJpaEntity, UUID> {
}
@Component
class InitDatabase(
private val testJpaEntityRepository: TestJpaEntityRepository,
): ApplicationRunner {
override fun run(args: ApplicationArguments) {
repeat(10){
testJpaEntityRepository.save(TestJpaEntity())
}
println()
}
}
JpaEntity와 Repository를 만들어주었습니다.
WAITING -> READY로 만들어보는 배치를 위해 TestStatus라는 enum을 두었습니다.
ApplicationRunner를 사용하여 SpringBootAppliation이 기동 될 때 정보를 10개 가지도록 세팅해 두었습니다.
println()에 디버깅을 걸어두고 DB를 잠깐보고 오면 다음과 같이 데이터세팅이 잘 되어있습니다.
실제 배치 시 에러가 발생해서 대응 포인트
BatchApplication의 @Bean이 등록되면서 Reader에서는 Data를 읽어옵니다.
하지만 ApplicationRunner나 @PostContructor를 활용하고자 했을 때 빈이 다 만들어지고 등록되기 전에 수행되었기 때문에 Reader시에 데이터를 읽을 수 없다는 예외가 발생하였습니다.
docker를 활용하여 Database는 항시 띄워두었기 때문에 해결하기 위해 하이버네이트 옵션을 변경하였습니다.
(create-drop에서 update로)
이제 database에 10건씩 데이터가 지속적으로 쌓이게 됩니다.
Job 만들기 - Reader
@Profile("database-job")
@Configuration
class DataBaseJobConfig(
private val jobRepository: JobRepository,
private val batchTransactionManager: PlatformTransactionManager
) {
@Bean
fun databaseJob(
chunkStep: Step
): Job {
return JobBuilder("database job", jobRepository)
.incrementer(RunIdIncrementer())
.start(chunkStep)
.build()
}
@Bean
fun chunkStep(
reader: ItemReader<out ReaderDto>,
): Step {
return StepBuilder("database step", jobRepository)
.chunk<ReaderDto, ReaderDto>(BATCH_SIZE, batchTransactionManager)
.reader(reader)
.writer(ClassItemWriter())
.build()
}
@StepScope
@Bean
fun findTestJpaEntityReader(
dataSource: DataSource,
): JdbcCursorItemReader<ReaderDto> {
return JdbcCursorItemReaderBuilder<ReaderDto>()
.name("testDBReader")
.dataSource(dataSource)
.sql(findTestSQL)
.rowMapper(DataClassRowMapper(ReaderDto::class.java))
.build()
}
class ClassItemWriter: ItemWriter<ReaderDto> {
override fun write(itmes: Chunk<out ReaderDto>) {
logger.info("Writing item start")
itmes.forEach {
logger.info("Writing item : {}", it)
}
}
}
companion object{
val findTestSQL = """
SELECT * FROM test_table
""".trimIndent()
const val BATCH_SIZE = 5
}
}
data class ReaderDto(
val id: UUID,
val status: TestStatus,
)
Writer부분은 Log를 출력하도록 그대로 유지하고 Reader부분에는 JdbcCursorItemReader로 Database 정보를 읽어올 수 있도록 구성하였습니다.
SQL문을 직접 작성해 주어 해당 조건에 해당하는 데이터들을 읽어옵니다.
Job Parameter 설정 - ItemReader
@StepScope
@Bean
fun findTestJpaEntityReader(
dataSource: DataSource,
@Value("#{jobParameters['current.date']}") currentDate: String,
): JdbcCursorItemReader<ReaderDto> {
logger.info { "currentDate: $currentDate" }
return JdbcCursorItemReaderBuilder<ReaderDto>()
.name("testDBReader")
.dataSource(dataSource)
.sql(findTestSQL)
.rowMapper(DataClassRowMapper(ReaderDto::class.java))
.build()
}
reader에서 job parameter를 주입해 볼 수도 있습니다.
Job 만들기 - ItemWriter
@StepScope
@Bean
fun updateStatusWaitingToReadyItemWriter(): ItemWriter<ReaderDto> {
return ItemWriter { items ->
items.run {
logger.info("Writing item start")
items.forEach {
logger.info("Writing item : {}", it)
testJpaEntityRepository.save(TestJpaEntity(id= it.id, status = TestStatus.READY))
}
}
}
}
Reader에서 읽어온 id의 status의 값을 READY로 update 합니다.
JPA에서는 id를 저장할 때 이미 존재하는 값이라면 save를 수행할 때 update를 수행합니다.
실행
BatchApplication 실행 후 Waiting이 Ready로 변환됨을 확인하였습니다.
기존에 존재하던 60개의 데이터가 READY상태로 변환되었고, 신규로 10개의 WAITING상태가 만들어졌습니다.