Java/Executor Service

ThreadPoolExecutor란?

Junuuu 2023. 10. 20. 00:01
728x90

ThreadPoolExecutor란?

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

Executors의 클래스의 메서드에서 고정된 스레드풀의 개수를 반환하는 newFixedThreadPool에서는 내부적으로 ThreadPoolExecutor를 생성하여 반환합니다.

 

newFixedThreadPool을 호출할 때는 스레드의 개수만 지정해 주면 되지만 내부적으로는 기본적인 값들이 세팅돼서 제공됩니다.

 

// Executors를 사용해서 간편하게 스레드풀 1개 생성
val executorService1: ExecutorService = Executors.newFixedThreadPool(1)

// ThreadPoolExecutor를 사용해서 디테일한 설정을 수행
val executorService: ExecutorService = ThreadPoolExecutor(
	1, // corePoolSize
	2, // maximumPoolSize
	60L, // keepAliveTime
	TimeUnit.MILLISECONDS, // timeUnit
	LinkedBlockingQueue<Runnable>(1), // workQueue
	CallerRunsPolicy() // handler
)

스레드 풀에서 사용할 수 있는 하나의 스레드를 이용하여 각 Task를 실행하는 ExecutorService입니다.

대부분 Executors에서 제공하는 Factory Method를 사용하는 것이 일반적이며 그렇지 않을 경우 수동으로 설정을 선택합니다.

 

수동으로 설정하는 경우 디테일한 설정들을 넣어줄 수 있게 됩니다.

설정에 대한 자세한 소개들은 아래에서 다룹니다.

 

AbstractExecutorService

public abstract class AbstractExecutorService implements ExecutorService {

	...
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }	
    ...
}

AbstractExecutorService는 ExecutorService 인터페이스를 구현하는 추상클래스입니다.

Runnable객체인 task를 받아 RunnableFuture 객체로 만들고 Executor 인터페이스의 execute메서드를 수행하여 반환합니다.

Callable객체인 task를 받는 경우에는 RunnableFuture가 Void가 아닌 객체타입을 가집니다.

 

 

ThreadPoolExecutor 생성시 설정

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
  • corePoolSize : 풀에 유지할 스레드 수
  • maximumPoolSize : 풀에 유지할 최대 스레드 수
  • keepAliveTime : corePoolSize보다 스레드가 큰 경우 유휴 상태가 유지되는 시간보다 더 길면 초과 스레드가 종료됩니다.
  • TimeUnit : keepAliveTime에 사용될 시간의 단위를 설정합니다.
  • BlockingQueue<Runnable> : corePoolSize보다 이상의 스레드가 실행 중이라면 새 스레드를 추가하는 것보다 요청을 큐에 대기시킵니다. 요청을 대기열에 넣을 수 없는 경우 maximumPoolSize을 초과하지 않는 한 새 스레드가 생성됩니다.
    • SynchronousQueue, LinkedBlockingQueue, ArrayBlockingQueue를 활용합니다.
    • SynchronousQueue -> 바로바로 처리
    • LinkedBlockingQueue -> 크기가 동적이고 용량이 무한대
    • ArrayBlockingQueue -> 크기가 고정

 

  • ThreadFactory : 보통 defaultThreadFactory를 사용하며 데몬이 아닌 상태로 우선순위는 5인 스레드를 생성합니다.
  • RejectedExecutionHandler : 모든 스레드가 사용되고 있고, work Queue가 포화상태라면 새 작업은 거부됩니다. 이때 다양한 정책을 설정할 수 있습니다.
    • AbrotPolicy:  refectedExecution 예외발생시키기 (기본 전략)
    • CallerRunsPolicy: 실행을 호출하는 스레드가 스스로 작업을 실행
    • DiscardPolicy: 현재 요청 Task 삭제됨
    • DiscardOldestPolocy: 가장 오래된 Task 삭제

 

 

보통 corePoolSize, maximumPoolSize는 생성 시에 설정되지만 동적으로 변경도 가능합니다.

 

ThreadPoolExecutor 설정 테스트

@Component
class ThreadPoolExecutor: ApplicationRunner {

    override fun run(args: ApplicationArguments) {
        val executorService: ExecutorService = ThreadPoolExecutor(
            1,
            2,
            60L,
            TimeUnit.MILLISECONDS,
            SynchronousQueue<Runnable>(),
//            CallerRunsPolicy()
        )

        val myTaskProcessOneSeconds = {
            logger.info{"1초가 걸립니다."}
            Thread.sleep(1000)
        }

        executorService.submit(myTaskProcessOneSeconds)
        executorService.submit(myTaskProcessOneSeconds)
        executorService.submit(myTaskProcessOneSeconds)

//        executorService.awaitTermination(10L, TimeUnit.SECONDS)
    }
}

corePoolSize = 1

maximumPoolSize = 2

그리고 기본 RejectedExecutionHandler인 AbortPolicy를 적용해 보면 RejectedExecutionException이 발생합니다.

(스레드는 최대 2개인데 Queue의 개수를 초과하여 3개의 스레드를 실행하려 했기 때문입니다.)

 

 

이번에는 AbortPolicy가 아닌 CallerRunsPolicy로 설정해 보았습니다.

2023-09-30T02:41:32.762+09:00  INFO 2455 --- [pool-6-thread-1] com.example.study.log.Logging            : 1초가 걸립니다.
2023-09-30T02:41:32.762+09:00  INFO 2455 --- [pool-6-thread-2] com.example.study.log.Logging            : 1초가 걸립니다.
2023-09-30T02:41:32.762+09:00  INFO 2455 --- [           main] com.example.study.log.Logging            : 1초가 걸립니다.

2개는 ExecutorService에서 실행되었고, 하나의 Task는 사용가능한 스레드풀의 최대개수 초과하여 main스레드에서 실행되었습니다.

 

 

 

다시 AbortPolicy 설정 + LinkedBlockingQueue<Runnable>(1)

이제 초과되는 task가 큐에 저장되기 때문에 3개의 task까지 예외가 발생하지 않습니다.

하지만 task가 4개가 되는 순간 다시 RejectedExecutionException가 발생합니다.

 

마무리

이제 corePoolSize, maximumPoolSize, workQueue 사이즈의 관계를 이해할 수 있게 되었습니다.

1. 요청을 들어오게 되면 corePoolSize까지 스레드의 개수를 늘린다.

2. 만약 corePoolSize가 가득 차면 workQueue 사이즈까지 작업을 채운다.

3. 만약 workQueue 사이즈가 가득 차면 maximumPoolSize까지 스레드의 개수를 늘린다.

4. 그래도 초과하게 되면 RejectedExecutionHandler에서 Task에 대해 예외 또는 작업을 호출하는 스레드에서 처리한다.