Java/Executor Service

ScheduledThreadPoolExecutor란?

Junuuu 2024. 3. 17. 19:29

개요

ScheduledThreadPoolExecutor라는 개념을 알게 되어 이해해보고자 합니다.

 

해당 글을 읽기 전 Future, ThreadPool, ExecutorService, ThreadPoolExecutor 개념에 대해 잘 모른다면 다음 글을 읽고 오시면 좋습니다.

 

ScheduledThreadPoolExecutor란?

val fixedThreadPool = Executors.newFixedThreadPool(2)
val scheduledThreadPool: ExecutorService = Executors.newScheduledThreadPool(2)

//newScheduledThreadPool 내부 구현은 ScheduledThreadPoolExecutor로 이루어져 있다.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
	return new ScheduledThreadPoolExecutor(corePoolSize);
}

만약 어떤 작업을 일정 시간 지연 후 실행하거나, 주기적으로 실행해야 한다면 ScheduledThreadPoolExecutor를 사용해볼 수 있습니다.

 

ThreadPoolExecutor를 활용하여 fixedThreadPool을 만들어내는 것 처럼 ScheduledThreadPoolExecutor를 활용하여 SchedueldExecutorService를 만들어 반환합니다.

 

// corePoolSize, maximumPoolSize가 동일, keepAliveTime은 0초, LinkedBlockingQueue 사용
public static ExecutorService newFixedThreadPool(int nThreads) {
	return new ThreadPoolExecutor(nThreads, nThreads,
		0L, TimeUnit.MILLISECONDS,
		new LinkedBlockingQueue<Runnable>()
	);
}

// corePoolSize, maximumPoolSize가 다름, keepAliveTime은 10초, DelayedWorkQueue 사용
public ScheduledThreadPoolExecutor(int corePoolSize) {
	super(corePoolSize, Integer.MAX_VALUE,
		DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
		new DelayedWorkQueue()
	);
}

 

super를 타고들어가면 내부적으로 ThreadPoolExecutor를 활용한다는 것은 동일하지만  DelayedWorkQueue를 사용하는 점, maximumPoolSize가 corePoolSize와 동일하지 않은 Integer.MAX_VALUE를 활용하는 점, keepAliveTime의 옵션값이 다르다는 차이점이 존재합니다.

 

 

BlockingQueue로 받는 인자가 다르다

public interface Collection<E> extends Iterable<E>
public interface Queue<E> extends Collection<E>
public interface BlockingQueue<E> extends Queue<E>
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>

ScheduledThreadPoolExecutor는 BlockingQueue의 인자로 DelayedWorkQueue를 넘겨주는데 LinkedBlokcingQueue와 어떤 차이점이 있을까요?

 

BlokcingQueue는 Queue를 구현하며, Queue는 Collection을 구현하고 Collection은 Iterable을 구현합니다.

LinkedBlokcingQueue와  DelayedWorkQueue는 모두 BlockingQueue의 인터페이스의 구현체입니다.

 

DelayedWorkQueue는 Delayed 인터페이스를 구현하는 제네릭 객체를 받으며 이를 위해서는 요소의 남은 지연 시간을 반환하는 getDelay 메서드를 제공해야 합니다.

 

public interface ScheduledFuture<V> extends Delayed, Future<V>
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V>
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>

실제로 Delayed 인터페이스에 디버깅을 찍고 살펴보면 java.util.concurrent 패키지의 SchedueldFuture 인터페이스가 Delayed와 Future를 구현하고 있습니다.

 

지연 시간이 가장 짧은 요소가 대기열의 맨 앞에 위치하게 됩니다.

작업은 Queue에 대기하다가 지연시간이 만료되면 실행됩니다.

 

SchedueldExecutorService가 제공하는 4가지 메서드

Delayed 인터페이스를 구현하는 객체를 제네릭으로 받아 DelayedWorkQueue를 활용하기 때문에 SecheduledExecutorService를 이용하는 경우에는 만약 어떤 작업을 일정 시간 지연 후 실행하거나, 주기적으로 실행해야 하는 경우에 활용할 수 있습니다.

 

ScheduledExecutorService는 4가지 메서드를 제공합니다.

  • schedule(Runnable command, long delay, TimeUnit unit) : 작업을 일정 시간 뒤에 한번 실행합니다.
  • schedule(Callable command, long delay, TimeUnit unit) : 작업을 일정 시간 뒤에 한번 실행하고, 그 결과를 리턴합니다.
  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) : 작업을 일정 시간 간격으로 반복적으로 실행시킵니다.
  • scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit) : 작업이 완료되면 일정 시간 뒤에 다시 실행시킵니다. scheduleAtFixedRate()와 다른 점은 작업 종료시점이 기준이라는 것입니다.

 

schedule(Runnable)

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

반환값이 없는 Runnable을 인자로 받고 지연시킬 시간(delay)과 단위(TimeUnit)를 입력합니다.

이후에 ScheduledFuture<?>를 반환합니다.

 

scheduled()는 일정 시간 뒤에 Job을 실행시키는 메서드입니다.

 

@Test
fun `schedule (Runnable) 튜토리얼`(){
	val scheduledThreadPool: ScheduledExecutorService = Executors.newScheduledThreadPool(2)

	// Job을 스케쥴링합니다.
	println("Scheduled task : " + LocalTime.now())
	val runnable = Runnable { println("Runnable task : " + LocalTime.now()) }
	val delay = 3L

	val result: ScheduledFuture<*> = scheduledThreadPool.schedule(runnable, delay, TimeUnit.SECONDS)
	println(result.isDone) // false
	println(result.getDelay(TimeUnit.SECONDS)) //2
	println(result.get()) // null       
}

// 결과
Scheduled task : 13:47:28.120226900
false
2
Runnable task : 13:47:31.134501300
null

3초 뒤에 Runnable Task를 수행하고 ScheduledFuture를 반환받았습니다.

ScheduledFuture는 Future도 같이 구현하기 때문에 해당 작업이 끝났는지 isDone으로 확인할 수 있고 Delayed 인터페이스의 getDelay도 호출해 보면 몇 초가 남았는지도 확인할 수 있습니다.

3초 뒤에 실행된 후 Future 인터페이스의 get으로 결괏값을 가져오지만 Runnable Task는 반환값이 존재하지 않아 null이 반환됩니다.

 

 

schedule(Callable)

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

 

반환값이 존재하는 Callable을 인자로 받고 지연시킬 시간(delay)과 단위(TimeUnit)를 입력합니다.

이후에 ScheduledFuture<V>를 반환합니다.

 

scheduled()는 일정 시간 뒤에 Job을 실행시키는 메서드입니다.

 

@Test
fun `schedule (Callable) 튜토리얼`(){
	val scheduledThreadPool: ScheduledExecutorService = Executors.newScheduledThreadPool(2)

	// Job을 스케쥴링합니다.
	println("Scheduled task : " + LocalTime.now())
	val callable: Callable<String> = Callable { "Callable task : " + LocalTime.now()}
	val delay = 3L

	val result: ScheduledFuture<String> = scheduledThreadPool.schedule(callable, delay, TimeUnit.SECONDS)
	println(result.isDone)
	println(result.getDelay(TimeUnit.SECONDS))
	println(result.get())
}

// 결과
Scheduled task : 15:43:08.157142900
false
2
Callable task : 15:43:11.162979600

3초 뒤에 Callable Task를 수행하고 ScheduledFuture<String>를 반환받았습니다.

위의 Runnable 예제와 비슷하지만 get의 결괏값이 존재하기 때문에 해당 결과값을 출력할 수 있습니다.

또한 하나의 차이점은 Callable Task이서는 출력을 수행하기 않고 결과값 반환만 수행합니다.

(물론 출력을 수행하도록 할 수도 있습니다.)

 

scheduleAtFixedRate()

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit);

scheduleAtFixedRate()는 작업을 일정 시간 간격으로 실행시키는 메서드입니다.

scheduleAtFixedRate()에 전달되는 인자는 다음과 같습니다.

  • Runnable : 리턴 값이 없는 Runnable을 인자로 받습니다.
  • initialDelay : Job이 처음 실행될 때 기다리는 시간입니다.
  • delay : Job이 실행되는 시간 간격입니다.
  • TimeUnit : 시간 단위입니다.
@Test
fun `scheduleAtFixedRate (Runnable) 튜토리얼`(){
	val scheduledThreadPool: ScheduledExecutorService = Executors.newScheduledThreadPool(2)

	// Job을 스케쥴링합니다.
	println("Scheduled task : " + LocalTime.now())
	val runnable = Runnable {
		println("++ Repeat task : " + LocalTime.now())
		sleepSec(4)
		println("-- Repeat task : " + LocalTime.now())
	}
	val delay = 3L

	val result: ScheduledFuture<*> = scheduledThreadPool.scheduleAtFixedRate(runnable, 2L, delay, TimeUnit.SECONDS)
	println(result.isDone)
	println(result.getDelay(TimeUnit.SECONDS))
    
	sleepSec(10)
	result.cancel(true)

}

private fun sleepSec(sec: Int) {
	try {
		TimeUnit.SECONDS.sleep(sec.toLong())
	} catch (e: InterruptedException) {
		e.printStackTrace()
	}
}

설정값인 2초의 지연뒤에 Runnable Task를 수행합니다.

이후에는 3초 간격으로 해당 Task를 수행하는데 Runnable Task에서는 내부적으로 4초의 Sleep이 수행됩니다.

그리고 10초 뒤에 해당 스케쥴링을 cancel 하도록 구현하였습니다.

이런 경우 어떻게 될까요?

 

3초 간격으로 Task가 계속 수행될까요?

Scheduled task : 16:34:46.237058800
false
1
++ Repeat task : 16:34:48.245708800
-- Repeat task : 16:34:52.246435100
++ Repeat task : 16:34:52.246435100
-- Repeat task : 16:34:56.251723100

초기에 시작(46초)되고 2초의 지연(48) 초 후에 4초 간격으로 Task가 순차적으로 수행됩니다.

하나의 Task에 대해서 순차적으로 실행하며 스레드풀내의 유휴스레드를 병렬적으로 사용하지는 않습니다.

이후에 46초 -> 56초로 10초가 지나서 스케쥴링이 취소되어 더 이상 실행되지 않습니다.

만약 이때 Future<?>의 get을 호출해 버린다면 결과를 받아오기 위해 무한하게 실행되면서 하위 로직이 호출되지 않으므로 주의해야 합니다.

 

scheduleWithFixedDelay()

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit);

scheduleWithFixedDelay()는 작업이 완료된 후 일정 시간 뒤에 다시 작업을 실행시키는 메서드입니다.

scheduleWithFixedDelay()에 전달되는 인자는 다음과 같습니다.

  • Runnable : 리턴 값이 없는 Runnable을 인자로 받습니다.
  • initialDelay : Job이 처음 실행될 때 기다리는 시간입니다.
  • delay : Job이 완료된 후 다음 Job이 실행될 때까지 기다리는 시간입니다.
  • TimeUnit : 시간 단위입니다.
@Test
fun `scheduleWithFixedDelay (Runnable) 튜토리얼`(){
	val scheduledThreadPool: ScheduledExecutorService = Executors.newScheduledThreadPool(2)

	// Job을 스케쥴링합니다.
	println("Scheduled task : " + LocalTime.now())
	val runnable = Runnable {
		println("++ Repeat task : " + LocalTime.now())
		sleepSec(4)
		println("-- Repeat task : " + LocalTime.now())
	}
	val delay = 3L

	val result: ScheduledFuture<*> = scheduledThreadPool.scheduleWithFixedDelay(runnable, 2L, delay, TimeUnit.SECONDS)
	println(result.isDone)
	println(result.getDelay(TimeUnit.SECONDS))

	sleepSec(18)
	result.cancel(true)
}

설정값인 2초의 지연뒤에 Runnable Task를 수행합니다.

내부적으로 4초가 걸리는 작업이 끝난 후 3초 간격으로 Task를 수행하도록 설정하였습니다.

그리고 18초 뒤에 해당 스케쥴링을 cancel 하도록 구현하였습니다.

 

 

출력 결과

Scheduled task : 16:58:50.623786300
false
1
++ Repeat task : 16:58:52.635429600
-- Repeat task : 16:58:56.647167700
++ Repeat task : 16:58:59.658711100
-- Repeat task : 16:59:03.665751500
++ Repeat task : 16:59:06.681097200
java.lang.InterruptedException: sleep interrupted
	at java.base/java.lang.Thread.sleep(Native Method)
	at java.base/java.lang.Thread.sleep(Thread.java:337)
	at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
	at com.example.study.ScheduledThreadPoolExecutor.sleepSec(ScheduledThreadPoolExecutor.kt:73)
	at com.example.study.ScheduledThreadPoolExecutor.scheduleWithFixedDelay__Runnable__튜토리얼$lambda$3(ScheduledThreadPoolExecutor.kt:88)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
-- Repeat task : 16:59:08.636173400

초기 시작(50초) 후에 2초 뒤(52초)에 task가 수행됩니다.

이후 작업은 4초 뒤(56초)에 완료되며, 그로부터 3초 후(59초)에 다음 작업이 수행됩니다.

 

scheduleAtFixedRate 예제 작업에서는 예외가 발생하지 않은 이유가 2초 지연 + 4초 지연 + 4초 지연으로 10초에 딱 맞았기 때문에 발생하지 않았고 scheduleWithFixedDelay 예제에서는 cancel이 호출되는 시점에 Thread가 sleep 되고 있어서 예외가 발생하면서 sleep이 취소되며 맨 마지막 작업인 06초에 시작된 작업이 4초 뒤에 마무리되지 않고 2초 뒤인 08초에 마무리됩니다.

 

해당 작업에서는 의도적으로 sleep을 활용하였고 cancel이 호출될 때 InterruptedException이 되었지만 실제로 http 호출 등의 동작과정에서는 해당 요청이 모두 처리하고 스케쥴링된 Task를 종료합니다.

 

 

마무리

ScheduledThreadPoolExecutor에 대해 정리하다가 몇 개월 전에 같은 주제인 ScheduledExecutorService로 글을 정리했다는 사실을 알게 되었습니다..

해당 글에는 다루지 않은 동작과정, 동작 주의사항, 성능테스트를 다루기 때문에 같이 읽어봐도 좋을 것 같습니다.

 

 

참고자료

https://codechacha.com/ko/java-scheduled-thread-pool-executor/