ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [AWS] SQS 사용시 주의사항
    AWS 2023. 6. 4. 00:01

    SQS란?

    대기열(Queue)를 제공하여 분산 소프트웨어 시스템과 구성요소를 통합 및 분리할 수 있게 해주는 도구입니다.

     

     

    SQS 동작과정(기본 아키텍처)

    분산 메시징 시스템으로 이루어져 있습니다.

    https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-basic-architecture.html

    메세지는 한 지역 내의 여러 가용 영역에 걸쳐 분산 방식으로 저장됩니다.

    큐(메시지 A부터 E까지 보유)는 여러 Amazon SQS 서버에 메시지를 중복 저장합니다.

     

    Message Lifecycle

    https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-basic-architecture.html

    1. 생산자(component 1) 메세지 A를 대기열로 전송하고 이 메시지는 SQS 서버에 중복 분산되어 저장됩니다.

    2. 소비자(component 2)는 메시지를 대기열에서 소비합니다. 메시지 A가 처리되는 동안 대기열에는 그대로 존재하고 제한시간초과가 지속되는 동안 후속 수신 요청을 반환하지 않습니다.

    3. 소비자(component 2)는 제한시간초과가 만료되면 이 메시지가 삭제하여 다시 처리되지 못하도록 합니다. 

     

     

    Standard Queue Type을 사용할 시

    Amazon SQS는 중복성과 고가용성을 위해 여러 서버에 메시지 사본을 저장합니다.

    드물지만 메시지를 받거나 삭제할 때 메시지 사본을 저장하는 서버 중 하나를 사용할 수 없는 경우가 있을 수 있습니다.

    이 경우 사용할 수 없는 서버에서 메시지 사본이 삭제되지 않으며, 메시지를 받을 때 해당 메시지 사본을 다시 받을 수 있습니다.

     

    최소 1회의 메시지를 전송을 지원하기 때문에 2개 이상의 메시지 사본이 전송될 수 있습니다.

    즉, 멱등성을 고려해야 합니다.

     

    SQS 가시성 제한

    SQS에는 보기 제한 시간 = 가시성 제한이 존재합니다.

    가시성 제한은 메시지를 받은 뒤 특정 시간 동안 동일한 메시지를 다른 곳에서 꺼내 볼 수 없도록 제한하는 시간입니다.

    예를 들어 2대의 Server Instance가 존재하고 가시성 제한이 15초라고 생각해보겠습니다.

    이때 1대의 Instance에서 @SqsListener를 통해 메시지를 처리하고 있으면 다른 1대에서는 메시지를 처리할 수 없습니다.

    기본 표시 제한시간은 30초이며, 최솟값은 0초, 최대값은 12시간입니다.

     

    SQS를 사용할때는 중복메시지를 고려해야하기 때문에 멱등성을 고려해주어야 합니다.

    하지만 Multi Instance에서 한건만 처리해도 되는경우에는 굳이 메세지를 처리하는 리소스를 낭비할 필요는 없습니다.

     

    만약 여러 인스턴스에서 동시에 받아 동기화해야하는 상황이라면 가시성 제한을 0으로 설정해 모든 서버에서 메시지를 처리할 수 있도록 합니다.

     

    ThreadPoolTaskExecutor

    쓰레드풀을 별도로 설정하지 않으면 Queue마다 기본적으로 쓰레드풀을 생성합니다.

    DEFAULT_WORKER_THREADS의 기본값은 2입니다.

    protected AsyncTaskExecutor createDefaultTaskExecutor() {
    		String beanName = getBeanName();
    		ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    		threadPoolTaskExecutor.setThreadNamePrefix(
    				beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
    		int spinningThreads = this.getRegisteredQueues().size();
    
    		if (spinningThreads > 0) {
    			threadPoolTaskExecutor
    					.setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);
    
    			int maxNumberOfMessagePerBatch = getMaxNumberOfMessages() != null
    					? getMaxNumberOfMessages() : DEFAULT_MAX_NUMBER_OF_MESSAGES;
    			threadPoolTaskExecutor
    					.setMaxPoolSize(spinningThreads * (maxNumberOfMessagePerBatch + 1));
    		}
    
    		// No use of a thread pool executor queue to avoid retaining message to long in
    		// memory
    		threadPoolTaskExecutor.setQueueCapacity(0);
    		threadPoolTaskExecutor.afterPropertiesSet();
    
    		return threadPoolTaskExecutor;
    
    	}

     

    Custom ThreadPool 생성

    @Configuration
    class AwsSqsCommonConfig {
    
      @Value("\${aws.sqs.parallel-size:10}")
      lateinit var parallelSize: String
    
      @Value("\${aws.sqs.long-polling-wait-seconds:10}")
      lateinit var longPollingWaitSeconds: String
    
      private val MAX_TIME_OUT = 20
    
      @Bean
      fun simpleMessageListenerContainerFactory(
        amazonSQSAws: AmazonSQSAsync,
        queueContainerTaskExecutor: AsyncTaskExecutor
      ): SimpleMessageListenerContainerFactory {
        val factory = SimpleMessageListenerContainerFactory()
        factory.setAmazonSqs(amazonSQSAws)
        factory.setMaxNumberOfMessages(parallelSize.toInt())
        factory.setWaitTimeOut(5)
        factory.setAutoStartup(true)
        factory.setTaskExecutor(queueContainerTaskExecutor)
        if (longPollingWaitSeconds.toInt() > 0) {
          factory.setWaitTimeOut(Math.min(MAX_TIME_OUT, longPollingWaitSeconds.toInt()))
        }
        return factory
      }
    
      @Bean
      fun queueContainerTaskExecutor(): AsyncTaskExecutor {
        val processor = Runtime.getRuntime().availableProcessors()
    
        val threadPoolTaskExecutor = ThreadPoolTaskExecutor()
        threadPoolTaskExecutor.setThreadNamePrefix("sqs-pool-task-")
        threadPoolTaskExecutor.corePoolSize = Math.max(processor, 5)
        threadPoolTaskExecutor.maxPoolSize = Math.min(processor * 10, 50)
        // No use of a thread pool executor queue to avoid retaining message to long in memory
        threadPoolTaskExecutor.setQueueCapacity(0)
        threadPoolTaskExecutor.afterPropertiesSet()
        return threadPoolTaskExecutor
      }
    }

     

     

     

     

     

     

     

     

     

    참고자료

    https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html

    'AWS' 카테고리의 다른 글

    Spring Cloud AWS 3.0 사용하기 - SNS, SQS  (0) 2023.06.08
    [AWS] sns, sqs aws와 연동해보기  (0) 2023.06.05
    [AWS] SpringBoot LocalStack으로 SNS, SQS 구성하기  (0) 2023.06.03
    AWS VPC란?  (0) 2023.05.23
    AWS MFA 설정  (0) 2023.05.05

    댓글

Designed by Tistory.