프로젝트/redis

[Spring Boot + Kotlin] Redis Transaction 실습

Junuuu 2023. 11. 7. 00:01
728x90

개요

공식문서로 알아보는 Redis Transaction에 이어 Spring Boot+ Kotlin을 활용하여 Redis Transaction 실습을 진행해보려 합니다.

 

 

테스트를 위한 데이터 세팅

@Component
class LocalRedisDateSetter(
    private val stringRedisTemplate: StringRedisTemplate,
) {

    @PostConstruct
    fun localDataSet() {
        stringRedisTemplate.opsForHash<String, String>().put("productItem", "104", "30")
        val valueOperation = stringRedisTemplate.opsForValue()
        valueOperation.set("test","test")
    }
}

@PostConstruct를 활용하여 key가 productItem이고 field가 104(상품 id)이며 value(재고)가 30인 hashOperation을 수행합니다.

 

Redis Transaction 활용

@Component
class StockAdapter(
    private val stringRedisTemplate: StringRedisTemplate,
) {
    fun decrease(){
        val key = PRODUCT_ITEM_ID_KEY
        val stockCount = stringRedisTemplate.opsForHash<String, String>().get(key, STOCK_ID)
            ?: throw NoSuchElementException("해당 id로 조회할 수 없습니다.")
        logger.info { "감소 전 남은 재고 = $stockCount" }

        stringRedisTemplate.execute {
            it.watch(key.toByteArray())
            it.multi()
            if(stockCount.toLong() < 1){
                throw IllegalStateException("재고는 0미만일 수 없습니다. 재고를 채워주세요")
            }
            it.hashCommands().hIncrBy(key.toByteArray(), STOCK_ID.toByteArray(), -1)
            return@execute it.exec()
        }
    }

    companion object{
        const val PRODUCT_ITEM_ID_KEY = "productItem"
        const val STOCK_ID = "104"
    }
}

위의 코드는 재고를 가져와 감소시키는데 CAS연산인 watch를 활용하여 다른 트랜잭션에서 변경된 경우 수행하지 않아 재고가 항상 음수로 떨어지지 않도록 구현된 코드입니다.

 

단순한 테스트를 위해 원래라면 변수로 받아야 할 key와 filed를 상수로 정의해 두었습니다.

해당 값들은 테스트를 위해 세팅했던 데이터들과 동일합니다.

 

 

Redis Transaction 동시성 테스트

	@Test
	fun `30개의 재고가 존재할 때 50개의 요청을 동시에 보내면 0개의 재고가 남아있어야 한다`(){
		val numberOfThreads = 50
		val executorService = newFixedThreadPool(numberOfThreads)
		val latch = CountDownLatch(numberOfThreads)

		repeat(numberOfThreads) {
			executorService.submit {
				try {
					decreaseStockService.decrease()
				} finally {
					latch.countDown()
				}
			}
		}
		latch.await()

		val actual = stringRedisTemplate.opsForHash<String, String>().get("productItem", "104")
		assertEquals(actual, "0")
	}

50개의 ThreadPool을 생성하고 decreaseStockService를 호출한 뒤 재고가 0개인지 확인합니다.

이때 초기 재고는 30개로 세팅해 두었습니다.

테스트를 수행해 보면 잘 통과됩니다.

 

 

Redis Transaction에서 GET을 수행하면 Null이 반환된다.

@Component
class StockAdapter(
    private val stringRedisTemplate: StringRedisTemplate,
) {
    fun decrease(){
        val key = PRODUCT_ITEM_ID_KEY
        val stockCount = stringRedisTemplate.opsForHash<String, String>().get(key, STOCK_ID)
            ?: throw NoSuchElementException("해당 id로 조회할 수 없습니다.")
        logger.info { "감소 전 남은 재고 = $stockCount" }

        stringRedisTemplate.execute {
            it.watch(key.toByteArray())
            it.multi()
            if(stockCount.toLong() < 1){
                throw IllegalStateException("재고는 0미만일 수 없습니다. 재고를 채워주세요")
            }
            it.hashCommands().hIncrBy(key.toByteArray(), STOCK_ID.toByteArray(), -1)
            return@execute it.exec()
        }
    }

    companion object{
        const val PRODUCT_ITEM_ID_KEY = "productItem"
        const val STOCK_ID = "104"
    }
}

처음의 코드를 다시 보면 get을 해오는 부분은 트랜잭션 밖으로 빠져있습니다.

트랜잭션 내부에서 값을 가져오면 될 텐데 왜 이렇게 구현되었을까요?

 

hashOperations의 get 메서드

hashOperations의 get메서드의 javaDocs을 읽어보면 트랜잭션 내부에 활용하는 경우에는 null을 반환한다고 합니다.

 

실제로 그런지 예제 코드를 작성하여 테스트해 보겠습니다.

@Component
class StockAdapter(
    private val stringRedisTemplate: StringRedisTemplate,
) {
    fun decrease(){
        val key = PRODUCT_ITEM_ID_KEY
        val stockCount = stringRedisTemplate.opsForHash<String, String>().get(key, STOCK_ID)
            ?: throw NoSuchElementException("해당 id로 조회할 수 없습니다.")
        logger.info { "감소 전 남은 재고 = $stockCount" }

        stringRedisTemplate.execute {
            it.watch(key.toByteArray())
            it.multi()
            val stockCountInTransaction = it.hashCommands().hMGet(key.toByteArray(), STOCK_ID.toByteArray())
            logger.info { "트랜잭션에서 GET 해오면 Null일까? = $stockCountInTransaction" }
            val testValueInTransaction = stringRedisTemplate.opsForValue().get("test")
            logger.info { "트랜잭션에서 GET을 해오면 Null일까? = $testValueInTransaction" }
            if(stockCount.toLong() < 1){
                throw IllegalStateException("재고는 0미만일 수 없습니다. 재고를 채워주세요")
            }
            it.hashCommands().hIncrBy(key.toByteArray(), STOCK_ID.toByteArray(), -1)
            return@execute it.exec()
        }
    }

    companion object{
        const val PRODUCT_ITEM_ID_KEY = "productItem"
        const val STOCK_ID = "104"
    }
}

2가지 GET을 해오는 방식으로 테스트를 수행해 보았습니다.

  • 트랜잭션에서 GET
  • 트랜잭션내부에서 실행되지만 StringRedisTemplate으로 GET

첫 번째 방식은 Null이 반환되고 두 번째 방식은 실제 값이 반환됩니다.

두번째 방식처럼 트랜잭션의 multi()와 exec 사이에서 실행한다고 해당 명령이 트랜잭션 안으로 묶이지 않습니다.

단지 눈으로 보기에 트랜잭션으로 묶여있는 것처럼 보일 뿐입니다.

 

 

 

Redis Transaction에서 예외가 발생하면?

@Component
class RedisTransactionRollbackTest(
    private val stringRedisTemplate: StringRedisTemplate,
) {

    @PostConstruct
    fun rollbackTest(){
        stringRedisTemplate.opsForHash<String, String>().put(PRODUCT_ITEM_ID_KEY, STOCK_ID_TEST, "1")

        val stockCountBeforeTransaction = stringRedisTemplate.opsForHash<String, String>().get(PRODUCT_ITEM_ID_KEY, STOCK_ID_TEST)
            ?: throw NoSuchElementException("해당 id로 조회할 수 없습니다.")
        logger.info { "트랜잭션 호출 전 남은 재고 = $stockCountBeforeTransaction" }

        stringRedisTemplate.execute{
            it.multi()
            it.hashCommands().hIncrBy(PRODUCT_ITEM_ID_KEY.toByteArray(), STOCK_ID_TEST.toByteArray(), -1)
            throw RuntimeException("트랜잭션 에서 예외가 발생하면 어떻게 될까?")
            it.hashCommands().hIncrBy(PRODUCT_ITEM_ID_KEY.toByteArray(), STOCK_ID_TEST.toByteArray(), -1)

            return@execute it.exec()
        }

        val stockCountAfterTransaction = stringRedisTemplate.opsForHash<String, String>().get(PRODUCT_ITEM_ID_KEY, STOCK_ID_TEST)
            ?: throw NoSuchElementException("해당 id로 조회할 수 없습니다.")
        logger.info { "트랜잭션 호출 후 남은 재고 = $stockCountAfterTransaction" }

    }

    companion object{
        const val PRODUCT_ITEM_ID_KEY = "productItem"
        const val STOCK_ID_TEST = "105"
    }

}

105라는 상품 id를 가진 상품의 재고를 1로 세팅합니다.

이후 트랜잭션을 선언하고 재고를 -1 그리고 예외가 발생하고 -1을 다시 호출합니다.

트랜잭션이 실행되고 예외가 발생하면 어떻게 될까요?

 

exec()를 넘겨주기도 전에 예외가 발생했으므로 모든 트랜잭션 명령어는 전달되지 않습니다.

Redis에 HGET을 활용하여 실제 재고를 조회해 봐도 1개의 수량이 조회됩니다.

 

이번에는 exec()를 넘겨주지만 트랜잭션 명령어를 실행하다가 예외가 발생한 경우를 테스트해보고자 합니다.

@Component
class RedisTransactionRollbackTest(
    private val stringRedisTemplate: StringRedisTemplate,
) {

    fun rollbackTest(){
        stringRedisTemplate.opsForHash<String, String>().put(PRODUCT_ITEM_ID_KEY, STOCK_ID_TEST, "1")
        stringRedisTemplate.opsForHash<String, String>().put(PRODUCT_ITEM_ID_KEY, "1234", "stringValue")

        val stockCountBeforeTransaction = stringRedisTemplate.opsForHash<String, String>().get(PRODUCT_ITEM_ID_KEY, STOCK_ID_TEST)
            ?: throw NoSuchElementException("해당 id로 조회할 수 없습니다.")
        logger.info { "트랜잭션 호출 전 남은 재고 = $stockCountBeforeTransaction" }

        runCatching {
            stringRedisTemplate.execute{
                it.multi()
                //재고가 1인데 -1이면 0이 기대됨
                it.hashCommands().hIncrBy(PRODUCT_ITEM_ID_KEY.toByteArray(), STOCK_ID_TEST.toByteArray(), -1)
                //stringValue에 -1을 수행하면 예외가 발생한다
                it.hashCommands().hIncrBy(PRODUCT_ITEM_ID_KEY.toByteArray(), "1234".toByteArray(), -1)
                //재고가 0인데 -1이면 -1이 기대됨
                it.hashCommands().hIncrBy(PRODUCT_ITEM_ID_KEY.toByteArray(), STOCK_ID_TEST.toByteArray(), -1)

                return@execute it.exec()
            }
        }
        
        val stockCountAfterTransaction = stringRedisTemplate.opsForHash<String, String>().get(PRODUCT_ITEM_ID_KEY, STOCK_ID_TEST)
            ?: throw NoSuchElementException("해당 id로 조회할 수 없습니다.")
        logger.info { "트랜잭션 호출 후 남은 재고 = $stockCountAfterTransaction" }

    }

    companion object{
        const val PRODUCT_ITEM_ID_KEY = "productItem"
        const val STOCK_ID_TEST = "105"
    }

}

의도적으로 StringValue에 -1을 수행하여 예외를 발생시켜 보겠습니다.

  • 재고는 0이 될까요?
  • 재고는 -1이 될까요?
  • 재고는 1일까요?

정답은 -1입니다.

Redis Transaction의 특성 중 하나로 Rollback이 발생하지 않습니다.

이미 전달된 명령어들은 앞의 명령어의 실패와 상관없이 모두 실행됩니다.

(discard 되지 않는다)

 

runCatching을 활용하지 않는다면 다음과 같은 예외가 발생합니다.

Caused by: io.lettuce.core.RedisCommandExecutionException
: ERR hash value is not an integer

 

watch를 하던 중 예외가 발생하면?

watch란 Redis Transaction에서 CAS 연산을 사용하여 Optimistic Lock을 제공하는 명령어입니다.

처음에는 Redis Transaction이 원자성 연산이기 때문에 왜 watch 연산이 필요한지 잘 이해하지 못했습니다.

하지만 get 명령 같은 경우에는 null을 반환하기 때문에 Transaction 내부에서 사용할 수 없습니다.

따라서 같은 값에 대한 연산을 여러 개의 스레드가 Redis에 Transaction으로 요청하더라도 각 연산들은 묶여서 원자적으로 실행되나 이미 도달한 명령들에 의해서 예상치 못한 동작이 수행될 수 있습니다.

 

예를 들어 GET을 통해 재고 1이라는 값을 가져왔고 -1을 감소시키고자 했습니다.

비즈니스 정책상 "재고는 0 미만의 수가 될 수 없다"라고 정의되었고 트랜잭션으로 묶었기 때문에 원자적이라고 착각할 수 있습니다.

 

하지만 1일 때 감소시키는 트랜잭션 10개가 이미 Redis에 도달했다면 -9의 재고가 세팅됩니다.

이런 일을 막기 위해 다른 트랜잭션에 의해 변경되었다면 해당 명령을 수행하지 않음으로써 race condition을 방지할 수 있습니다.

 

watch 활용

stringRedisTemplate.execute{
	it.watch(PRODUCT_ITEM_ID_KEY.toByteArray())
	it.multi()
	//재고가 1인데 -5이면 -4가 기대됨
	it.hashCommands().hIncrBy(PRODUCT_ITEM_ID_KEY.toByteArray(), STOCK_ID_TEST.toByteArray(), -5)
	return@execute it.exec()
}

watch를 활용하면 초기재고가 1이고 여러 개의 스레드에도 동시에 요청하는 경우에도 -4라는 값으로 유지됩니다.

 

이때 watch에서 충돌이 발생하면 하위의 명령들은 몇 번 수행될까요? 또는 상위의 명령들을 몇 번 수행될까요?

stringRedisTemplate.execute{
	it.watch(PRODUCT_ITEM_ID_KEY.toByteArray())
	it.multi()

	//재고가 1이고 5를 증가시키면 몇 번 반영될까?
	it.hashCommands().hIncrBy(PRODUCT_ITEM_ID_KEY2.toByteArray(), "1234".toByteArray(), 5)
    
	//재고가 1인데 -5이면 CAS 연산으로 인해 한번만 적용되어 -4가 나올것으로 기대
	it.hashCommands().hIncrBy(PRODUCT_ITEM_ID_KEY.toByteArray(), STOCK_ID_TEST.toByteArray(), -5)
    
	//재고가 1이고 5를 증가시키면 몇 번 반영될까?
	it.hashCommands().hIncrBy(PRODUCT_ITEM_ID_KEY1.toByteArray(), "1234".toByteArray(), 5)
    
	return@execute it.exec()
}

테스트를 수행해 보면 나머지 연산들도 한 번씩 적용되어 6, 6이라는 결과가 나옵니다.

상위, 하위의 명령들도 각각 한 번씩만 적용되는 모습을 확인할 수 있습니다.

 

 

watch명령어를 활용하고 트랜잭션 내부에서 Key에 대해 2번 갱신이 가능할까?

stringRedisTemplate.execute{
	it.watch(PRODUCT_ITEM_ID_KEY.toByteArray())
	it.multi()
	it.hashCommands().hIncrBy(PRODUCT_ITEM_ID_KEY2.toByteArray(), "1234".toByteArray(), 5)
	//watch를 걸고 트랜잭션 내부에서 2번 갱신이 가능할까?
	it.hashCommands().hIncrBy(PRODUCT_ITEM_ID_KEY2.toByteArray(), "1234".toByteArray(), 100)
	return@execute it.exec()
}

초기 재고가 1일 때 +5, +100이 반영되어 106의 재고가 나오게 됩니다.

즉, 트랜잭션 내부에서 2번의 변경은 가능합니다.

 

마무리

Redis Transaction과 watch 명령어를 활용하여 동시성을 제어하는 것은 좋은 방법 중 하나이지만 한계점도 명확하게 드러납니다.

  • 내부에서 get을 해올 수 없다. 이로 인해 분기처리가 불가능하다
  • Redis Cluster Mode에서는 transaction을 활용할 수 없다. 
  • Transaction시 rollback이 나타나는 부분과 watch CAS 연산을 통해 충돌이 발생했을 때 어떤 명령까지가 반영되고 반영되지 않는지 파악해야 한다.

 

위의 한계점을 극복하기 위해 다음글에서는 LuaScript를 활용하여 동시성 제어를 시도해보겠습니다.

 

 

 

참고자료

https://dev.gmarket.com/69

https://gompangs.tistory.com/entry/Spring-Redis-Template-Transaction

https://willseungh0.tistory.com/201