-
Java Executors - ScheduledExecutorServiceJava/Executor Service 2023. 6. 15. 00:01
개요
Socket을 Connect 하고 5분 후 timeout을 발생시키기 위해 ScheduledExecutorService를 활용하게 되었고, 이에 대해 자세하게 알아보고자 합니다.
Executor 인터페이스
동시에 여러 요청을 처리해야 하는 경우 매번 새로운 쓰레드를 만드는 것은 비효율적입니다.
따라서 쓰레드를 미리 만들어두고 재사용하는 Thread Pool이라는 개념이 등장하게 되었고 Executor는 쓰레드 풀의 구현을 위한 인터페이스입니다.
public interface Executor { void execute(Runnable command); }
쓰레드는 크게 작업의 등록과 실행으로 나누어집니다.
이때 Executor는 등록된 작업(Runnable)을 실행하기 위한 인터페이스이며, 작업의 실행만을 책임집니다.
ExecutorService 인터페이스
ExecutorService는 작업을 등록, 실행을 위한 인터페이스입니다.
Executor를 상속받아 작업 등록뿐만 아니라 실행을 위한 책임도 가집니다.
대표적으로 ThreadPoolExecutor가 ExecutorService의 구현체이며, 내부에 있는 Blocking Queue에 작업들을 등록해 둡니다.
Executors란?
이 패키지에 정의된 Executor, ExecutorService, ScheduledExecutorService, ThreadFactory 및 Callable 클래스에 대한 팩토리 및 유틸리티 메서드입니다. 이 클래스는 다음과 같은 종류의 메서드를 지원합니다:
val executorService = Executors.newSingleThreadScheduledExecutor()
예를 들어 이와 같은 형태로 Executors를 팩토리처럼 활용하여 ScheduledExecutorService를 만들어낼 수 있습니다.
newSingleThreadScheduledExecutor는 1개의 쓰레드만을 갖는 쓰레드 풀을 생성합니다.
ScheduledExecutorService 인터페이스
public interface ScheduledExecutorService extends ExecutorService
ExecutorService 인터페이스를 상속받고 있습니다.
ExecutorService는 작업의 등록, 실행을 담당하고 있습니다.
그러면 ScheduledExecutorSerivce는 여기에 추가적으로 특정 시간 이후에 또는 주기적으로 작업을 실행하는 책임을 가집니다.
ScheduledExecutorService의 메서드
schedule() : 지정된 지연 후에 활성화되는 원샷 액션을 생성하고 실행합니다.
scheduleAtFixedRate() : 특정 시간 이후 처음 작업을 실행시키고, 작업이 실행되고 특정 시간마다 작업을 실행시킵니다.
scheduleWithFixedDelay(): 특정 시간 이후 처음 작업을 실행시키고, 작업이 완료되면 특정 시간마다 작업을 실행시킵니다.
ScheduledExecutorSerivce의 동작과정
ScheduledExecutorSerivce는 Thread Pool위에서 동작합니다.
처음에 ScheduledExecutorSerivce를 생성할 때 Thread Pool에서 사용할 Thread의 개수를 지정할 수 있습니다.
이후 Task들을 스케쥴링하여 관리할 수 있으며 특정시간주기로 지속적으로 실행할 수 있습니다.
이때 task를 예약하게 되면ScheduledFuture객체를 반환받고, 이는 task를 취소하거나, 완료되었는지 감시하고 제어할 수 있습니다.
BlockingQueue가 scheduled된 작업들이 실행되기 전까지 소유하고 있습니다. 이때 ScheduledExecutorService는 BlokcingQueue로 DelayedWorkQueue를 사용합니다.
예를 들어 schedule()이란 메서드를 사용할 때, DelayedWorkQueue는 실행 시간에 따라 예약된 작업의 순서를 정합니다. 스레드 풀의 작업자 스레드는 실행할 작업을 얻기 위해 지속적으로 BlockingQueue를 폴링 합니다. 사용 가능한 작업이 없는 경우 작업자 스레드는 작업이 큐에 추가되거나 다음 작업에 대한 지정된 지연 시간이 지날 때까지 차단됩니다.
ScheduledExecutorService의 주의사항
"ScheduledThreadPoolExecutor는 ThreadPool을 사용하고 있으며, Timer가 설정한 주기에 맞춰 task를 유휴스레드가 수행한다"로 이해할 수 있습니다.
예를 들어 3초가 걸리는 Runnable task가 8개 쓰레드가 있는 ScheduledThreadPoolExecutor에 100ms 마다 수행한다면
0.8초가 지나면 쓰레드 8개가 전부 작업을 수행하고 있어야 합니다.
scheduledExecutorService.scheduleAtFixedRate(task, 0, 100, TimeUnit.MILLISECONDS);
위의 작업을 수행했을 때 0.1초마다 한 번씩 task를 수행시켜 주길 바랍니다.
하지만 실제로 수행해 보면 병렬적으로 수행되지 않고 순차적으로 수행됩니다.
2023-05-27T15:37:00.817+09:00 INFO 2781 --- [pool-2-thread-1] com.example.study.log.Logging : start 2023-05-27T15:37:03.821+09:00 INFO 2781 --- [pool-2-thread-1] com.example.study.log.Logging : finish 2023-05-27T15:37:03.822+09:00 INFO 2781 --- [pool-2-thread-1] com.example.study.log.Logging : start 2023-05-27T15:37:06.829+09:00 INFO 2781 --- [pool-2-thread-1] com.example.study.log.Logging : finish 2023-05-27T15:37:06.831+09:00 INFO 2781 --- [pool-2-thread-1] com.example.study.log.Logging : start 2023-05-27T15:37:09.832+09:00 INFO 2781 --- [pool-2-thread-1] com.example.study.log.Logging : finish
task 별로 따로 큐가 존재하며 주기가 같은 하나의 task는 하나의 스레드만 사용합니다. ThreadPool이 존재하는 이유는 주기가 다양한 task마다 스레드풀의 스레드를 재사용합니다.
공식 문서에서도 다음과 같은 설명이 존재합니다.
scheduleAtFixedRate 또는 scheduleWithFixedDelay를 통해 예약된 주기적 작업의 연속적인 실행은 겹치지 않습니다. 서로 다른 스레드에서 서로 다른 실행이 수행될 수 있지만, 이전 실행의 효과는 후속 실행의 효과보다 먼저 발생합니다.ScheduledExecutorService 병렬적으로 수행시키기
@RestController class ScheduledParallelController( private val scheduledExecutorServiceParallelConfig: ScheduledExecutorService, ) { @PostConstruct fun test(){ val sleepThreadSecondsTest = Runnable { logger.info { "start"} Thread.sleep(3000) logger.info { "finish"} } //0.1초 마다 task 생성을 기대한다. repeat( times = 4, action = {scheduledExecutorServiceParallelConfig.scheduleAtFixedRate( sleepThreadSecondsTest, 0, 100, TimeUnit.MILLISECONDS, )} ) } }
다음과 4개의 task를 실행시키게 된다면 병렬적으로 수행할 수 있습니다.
이제 여러 쓰레드가 병렬적으로 수행됨 확인할 수 있습니다.
2023-05-27T15:48:55.261+09:00 INFO 2839 --- [pool-2-thread-3] com.example.study.log.Logging : finish 2023-05-27T15:48:55.261+09:00 INFO 2839 --- [pool-2-thread-4] com.example.study.log.Logging : finish 2023-05-27T15:48:55.261+09:00 INFO 2839 --- [pool-2-thread-2] com.example.study.log.Logging : finish 2023-05-27T15:48:55.261+09:00 INFO 2839 --- [pool-2-thread-3] com.example.study.log.Logging : start 2023-05-27T15:48:55.261+09:00 INFO 2839 --- [pool-2-thread-1] com.example.study.log.Logging : finish 2023-05-27T15:48:55.261+09:00 INFO 2839 --- [pool-2-thread-2] com.example.study.log.Logging : start 2023-05-27T15:48:55.261+09:00 INFO 2839 --- [pool-2-thread-4] com.example.study.log.Logging : start 2023-05-27T15:48:55.261+09:00 INFO 2839 --- [pool-2-thread-1] com.example.study.log.Logging : start 2023-05-27T15:48:58.265+09:00 INFO 2839 --- [pool-2-thread-3] com.example.study.log.Logging : finish 2023-05-27T15:48:58.265+09:00 INFO 2839 --- [pool-2-thread-3] com.example.study.log.Logging : start 2023-05-27T15:48:58.266+09:00 INFO 2839 --- [pool-2-thread-2] com.example.study.log.Logging : finish 2023-05-27T15:48:58.266+09:00 INFO 2839 --- [pool-2-thread-2] com.example.study.log.Logging : start 2023-05-27T15:48:58.266+09:00 INFO 2839 --- [pool-2-thread-4] com.example.study.log.Logging : finish
ScheduledExecutorService 성능테스트
간단하게 ConcurrentHashMap을 통해 session을 저장하고, 5초 뒤에는 스케쥴링된 task를 통해 삭제하는 SessionManager를 통해 ScheduledExecutorService의 성능에 문제가 없는지 테스트해 보고자 합니다.
@Bean fun scheduledExecutorServicePerformanceConfig(): ScheduledExecutorService{ return Executors.newScheduledThreadPool(PERFORMANCE_SCHEDULED_THREAD_POOL_SIZE) }
Thread Pool Size는 2로 주었습니다.
@RestController class ScheduledPerformanceController( private val scheduledExecutorServicePerformanceConfig: ScheduledExecutorService, private val sessionManager: SessionManager, ) { @PostMapping("/performance") fun test(){ val userId = UUID.randomUUID().toString() sessionManager.registerSession(userId) //요청을 받으면 스케쥴링은 5초뒤에 시작되고, session을 제거한다. scheduledExecutorServicePerformanceConfig.schedule( { sessionManager.removeSession(userId) }, TIMEOUT_SECONDS, TimeUnit.SECONDS ) } companion object{ const val TIMEOUT_SECONDS = 5L } }
Thread Pool이 4개일 때
3개의 프로세스로 33개의 쓰레드로 99명의 가상사용자
에러 0건
평균 12000 TPS
평균 응답시간 3ms
10개의 프로세스로 300개의 쓰레드로 3000명 가상사용자
에러 439건
평균 1600 TPS
평균 응답시간 900ms
Treahd Pool이 50개일 때
3개의 프로세스로 33개의 쓰레드로 99명의 가상사용자
에러 0건
평균 12000 TPS
평균 응답시간 3ms
10개의 프로세스로 300개의 쓰레드로 3000명 가상사용자
에러 851건
평균 1900 TPS
평균 응답시간 1270ms
local 환경에서 진행하였고 위의 결과로 유의미한 판단을 내리기는 어렵지만, 어느 정도 요청은 처리하는데 무리 없음을 확인했습니다.
만약 에러건이 생기면 Session은 어떻게 관리될까?
성능테스트 후 session의 개수를 조회해 보았습니다.
session이 잘 제거되지 않으면 추후 OOM이 발생할 수 있습니다.
233건의 에러가 발생하였고 session-count를 조회해 보았지만 0개로 남아있지 않았습니다.
이를 통해 에러율과 session은 관계없을음 파악하였습니다.
github 링크
https://github.com/Junuu/executors-study
구현한 내용에 대해 정리한 github입니다.
여러 로그가 혼재해서 보기 힘든 경우 관심사가 아닌 Controller의 @RestController를 주석으로 처리하면 Bean으로 등록되지 않게 됩니다.
참고자료
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html
https://mangkyu.tistory.com/259
https://jeong-pro.tistory.com/188
'Java > Executor Service' 카테고리의 다른 글
Future를 활용하여 Timeout 구현하기 (0) 2024.04.07 ScheduledThreadPoolExecutor란? (1) 2024.03.17 ThreadPoolTaskExecutor란? (ThreadPoolTaskExecutor vs ThreadPoolExecutor) (0) 2023.10.21 ThreadPoolExecutor란? (0) 2023.10.20 ExecutorService란? (0) 2023.10.19