ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring Cloud AWS 3.0 사용하기 - SNS, SQS
    AWS 2023. 6. 8. 00:01

    개요

    AWS는 Amazon Web Server를 잘 이용할 수 있도록 Java SDK를 제공합니다.

    Java SDK는 AWS에서 사용할 수 있는 모든 기능을 제공하지만, 이를 Spring 관용적 방식으로 사용하려면 상당한 양의 로우레벨 코드가 필요합니다.

    Spring Cloud AWS는 애플리케이션 개발자가 가장 많이 사용되는 AWS 서비스를 사용하고 로우레벨 코드를 최대한 피할 수 있도록 이미 통합된 Spring 기반 모듈을 제공합니다.

     

     

    Spring Cloud AWS BOM 사용하기

    dependencies {
      //Spring Cloud AWS를 위한 의존성
      //https://docs.awspring.io/spring-cloud-aws/docs/3.0.0/reference/html/index.html
      implementation(enforcedPlatform("io.awspring.cloud:spring-cloud-aws-dependencies:3.0.0"))
      implementation("io.awspring.cloud:spring-cloud-aws-starter-sqs")
      implementation("io.awspring.cloud:spring-cloud-aws-starter-sns")
      implementation("io.awspring.cloud:spring-cloud-aws-starter")
    
      //Kubernetes AND AWS EKS환경에서 AWS Credentials를 사용
      implementation("software.amazon.awssdk:sts")
    }

    spring-cloud-aws-dependencies를 사용하면 종속성 버전을 직접 지정하고 유지 관리할 필요가 없어집니다.

    sns, sqs를 사용하기 위해 의존성을 추가합니다.

     

    spring cloud aws core(credential, region)을 관리하기 위해 aws-starter 의존성을 추가합니다.

    Spring AWS Cloud는 3가지 방법으로 AwsCredentialsProvider 인터페이스로 자격 증명을 반환합니다.

    1. DefaultCredentialsProvider
    2. StsWebIdentityTokenFileCredentialsProvider - recommended for EKS
    3. Custom AwsCredentialsProvider

     

    이때 2번째 방법을 dev, prod 환경에서 사용하기 위해 sts 의존성을 추가합니다.

     

    SnsClient , SqsAsyncClient 사용하기

    SnsTemaplte, Sns Operations등이 있지만 모든 하위 수준의 작업에 접근하려면 SnsClient를 활용하는것이 좋습니다.
    이전에 aws sdk v1을 사용할때는 직접 sns, sqs를 생성하고 연동하여 로컬에서 테스트하고 github actions가 빌드하도록 하였습니다.
    하지만 spring cloud aws를 사용하게 되었을때는 sns, sqs를 생성하는것 까지는 잘 되었지만 subscribe하는 부분에 권한이 없어 실패하였습니다.
    이에 따라 docker-compose up을 수행하고 sns, sqs를 만들고 구독하는 절차를 수행해주었습니다.
     

    docker-compose-start.sh

    docker-compose stop
    docker-compose up --detach
    ./init-sns-sqs.sh

     

    ./init-sns.sqs.sh

    #!/bin/bash
    
    # -- > Create 'Item Info' SNS topic
    echo Creating SNS topic  \'SignUpMemberTopic\'...
    aws --endpoint-url=http://localhost:4566 sns create-topic --name SignUpMemberTopic
    
    # -- > Create 'Item Info' SQS queue
    echo Creating SQS queue  \'MemberSessionSaveQueue\'...
    aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name MemberSessionSaveQueue
    
    # -- > Subscribing SQS queue 'Item Info' to SNS topic 'Item Info'
    echo Subscribing SQS queue \'MemberSessionSaveQueue\' to SNS topic \'SignUpMemberTopic\'...
    aws --endpoint-url=http://localhost:4566 sns subscribe --topic-arn arn:aws:sns:ap-northeast-2:000000000000:SignUpMemberTopic --protocol sqs --notification-endpoint arn:aws:sqs:ap-northeast-2:000000000000:MemberSessionSaveQueue --region ap-northeast-2
    
    # --> List SNS Topic
    echo Listing topics ...
    aws --endpoint-url=http://localhost:4566 sns list-topics
    
    
    # --> List SQS Queues
    echo Listing queues ...
    aws --endpoint-url=http://localhost:4566 sqs list-queues
    
    # --> List SNS Topic
    echo Listing subscriptions ...
    aws --endpoint-url=http://localhost:4566 sns list-subscriptions

    aws cli를 활용하여 직접 sns topic, sqs queue를 생성하고 subscribe 시켜줍니다.

     

    LocalAwsSnsConfig

    @Configuration
    @Profile("local")
    class LocalAwsSnsConfig {
      @Bean
      fun amazonSNS(): SnsClient {
        return SnsClient.builder()
          .endpointOverride(URI.create("http://localhost:4566"))
          .credentialsProvider(ProfileCredentialsProvider.create())
          .region(Region.AP_NORTHEAST_2)
          .build()
      }
    
    }

     

    LocalPublishSignUpEvent

    @Component
    @Profile("local")
    class LocalPublishSignUpEvent(
      private val snsClient: SnsClient,
    ) {
      @TransactionalEventListener
      fun awsSnsPublishTest(signUpEvent: SignUpEvent): PublishResponse {
        val subject = "TEST 제목"
        val message = mapOf(
          "id" to signUpEvent.id,
          "sessionId" to signUpEvent.sessionId,
        )
        val publishRequest = PublishRequest.builder()
          .topicArn("arn:aws:sns:ap-northeast-2:000000000000:SignUpMemberTopic")
          .message(ObjectMapper().writeValueAsString(message))
          .subject(subject)
          .build()
        val publishResult = snsClient.publish(publishRequest)
        println(publishResult)
        return publishResult
      }
    }

     

    AwsSqsLocalConfig

    @Configuration
    @Profile("local")
    class AwsSqsLocalConfig {
      @Bean
      fun sqsAsyncClient(): SqsAsyncClient {
        return SqsAsyncClient.builder()
          .endpointOverride(URI.create("http://localhost:4566"))
          .credentialsProvider(ProfileCredentialsProvider.create())
          .region(Region.AP_NORTHEAST_2)
          .build()
      }
    }

     

    AwsSqsCommonConfig

     

    @Configuration
    class AwsSqsCommonConfig {
      @Bean
      fun defaultSqsListenerContainerFactory(
        sqsAsyncClient: SqsAsyncClient,
        queueContainerTaskExecutor: AsyncTaskExecutor,
      ): SqsMessageListenerContainerFactory<Any> {
        return SqsMessageListenerContainerFactory
          .builder<Any>()
          .configure { options ->
            options
              .acknowledgementMode(AcknowledgementMode.ON_SUCCESS)
              .maxMessagesPerPoll(10)
              .pollTimeout(Duration.ofSeconds(5))
              .componentsTaskExecutor(queueContainerTaskExecutor)
              .queueNotFoundStrategy(QueueNotFoundStrategy.FAIL)
          }
          .sqsAsyncClient(sqsAsyncClient)
          .build()
      }
    
      @Bean
      fun queueContainerTaskExecutor(): AsyncTaskExecutor {
        val factory = MessageExecutionThreadFactory()
        val processor = Runtime.getRuntime().availableProcessors()
    
        val threadPoolTaskExecutor = ThreadPoolTaskExecutor()
        threadPoolTaskExecutor.setThreadFactory(factory)
        threadPoolTaskExecutor.setThreadNamePrefix("sqs-pool-task-")
        threadPoolTaskExecutor.corePoolSize = Math.max(processor, 5)
        threadPoolTaskExecutor.maxPoolSize = Math.min(processor * 10, 50)
        // No use of a thread pool executor queue to avoid retaining message to long in memory
        threadPoolTaskExecutor.queueCapacity = 0
        threadPoolTaskExecutor.afterPropertiesSet()
        return threadPoolTaskExecutor
      }
    }

    AcknowlegemetMode를 성공했을 때만 삭제하도록 defualtSqsListenerContainerFactory를 만들어서 설정합니다.

    이후 @SqsListener에서 factory로 defaultSqsListenerContainerFactory를 사용하여 listener의 설정과 스레드풀들을 설정합니다.

    특이한 점으로는 MessageExecutionThreadFactory를 통해 Executor를 생성해야만 합니다.

     

    MessagingLocalConfig

    @Configuration
    @Profile("local")
    class MessagingLocalConfig(
      private val sqsAsyncClient: SqsAsyncClient
    ) {
      @PostConstruct
      fun createQueue() {
        val request = CreateQueueRequest.builder()
          .queueName("MemberSessionSaveQueue")
          .build()
        sqsAsyncClient.createQueue(request)
      }
    }

    dev환경에서 build를 수행할 때는 이미 AWS환경에 MemberSessionSaveQueue가 존재하지만 github actions를 수행할 때는 localstack만 수행하도록 하고 코드단에서 MemberSessionSaveQueue를 생성하여 build가 정상적으로 수행하기 위한 용도로 넣어주었습니다.

     

    LocalSqsCoonsumeSignUpEvent

    @Component
    @Profile("local")
    class LocalSqsConsumeSignUpEvent(
      private val applicationEventPublisher: ApplicationEventPublisher,
    ) {
    
      @SqsListener(value = ["MemberSessionSaveQueue"], factory = "defaultSqsListenerContainerFactory")
      fun listen(@Payload message: String) {
        println("MemberSessionSaveQueue Message Consume Message: $message")
        // assuming the message is stored in a variable called "message"
        val mapper = jacksonObjectMapper()
        val json = mapper.readTree(message)
    
        // retrieve the SignUpEvent object
        val signUpEvent = json["Message"].asText()
    
        // parse the SignUpEvent object as a JSON object
        val signUpEventJson = mapper.readTree(signUpEvent)
    
        // retrieve the id and sessionId values
        val id = signUpEventJson["id"].asText()
        val sessionId = signUpEventJson["sessionId"].asText()
    
        println("---------------------------------------------")
        println("id: $id")
        println("sessionId: $sessionId")
        println("---------------------------------------------")
    
        applicationEventPublisher.publishEvent(
          SignUpEvent(
            id = id,
            sessionId = sessionId
          )
        )
        println("Spring 내부 이벤트 발행 성공")
      }
    }

     

    주의사항

    https://docs.awspring.io/spring-cloud-aws/docs/3.0.0/reference/html/appendix.html

    yml 설정이 달라져 신경써야합니다.

    저의 경우에는 test 코드를 위한 yml을 신경쓰지 못하였고 이로인해 github actions test에 계속 실패하였습니다.

     

     

     

     

     

    참고자료

    https://docs.awspring.io/spring-cloud-aws/docs/3.0.0/reference/html/index.html

     

    Spring Cloud AWS

    Secrets Manager helps to protect secrets needed to access your applications, services, and IT resources. The service enables you to easily rotate, manage, and retrieve database credentials, API keys, and other secrets throughout their lifecycle. Spring Clo

    docs.awspring.io

    https://medium.com/dandelion-tutorials/using-localstack-with-spring-cloud-for-amazon-web-services-d1f56744778a

     

    Using LocalStack with Spring Cloud for Amazon Web Services

    In this tutorial we integrate Spring Cloud AWS application with SNS and SQS services provided by LocalStack.

    medium.com

    https://www.hides.kr/1097

     

    Localstack으로 AWS SNS + SQS 로컬에서 도커로 테스트하는 방법

    개요 기본적으로 개발 환경에서 테스트를 진행할 때 dockerize하여 진행하는 방법을 선호한다. 케이스마다 다르겠지만 실제로 존재하는 서비스에 붙어서 진행하는게 아닌 모든걸 도커화하여 진행

    www.hides.kr

     

     

    'AWS' 카테고리의 다른 글

    [AWS] Amazon API Gateway란?  (0) 2023.06.17
    [AWS] SQS DLQ 설정하기  (0) 2023.06.13
    [AWS] sns, sqs aws와 연동해보기  (0) 2023.06.05
    [AWS] SQS 사용시 주의사항  (0) 2023.06.04
    [AWS] SpringBoot LocalStack으로 SNS, SQS 구성하기  (0) 2023.06.03

    댓글

Designed by Tistory.