EMR을 이용한 자동 Spark 배치 만들기

회사에서 많은 AWS 서비스를 사용해보면서 어느 날 번뜩 떠오른 아이디어가 있었습니다. 사실 전문가 분들은 이미 많이 사용하고 있는 것 같았지만 왠지 내가 대단한 발견이라도 한 것 같고 나름의 성취감도 있었지요. AWS의 EMR을 이용하여 Spark 배치 작업을 자동으로 실행하고 작업이 종료되면 EMR도 종료하여 비용 절감까지 가능한 아키텍처를 살펴볼까 합니다.

기본적으로 사용하게 될 서비스는 다음과 같습니다.

    • CloudWatch Events
    • Lambda
    • EMR
    • S3

이제 이것들을 잘 묶기만 하면 됩니다.

Lambda로 EMR 생성

Lambda에서는 원하는 언어로 함수를 작성하여 손쉽게 실행할 수 있습니다. 여기에서 저는 Python과 AWS SDK를 이용하여 EMR을 만들어 보겠습니다. Python 3를 사용하고 boto3 패키지를 불러와서 아래와 같이 EMR 생성 코드를 작성해볼 수 있습니다.

import os
import boto3

aws_key = os.environ['AWS_KEY']
aws_skey = os.environ['AWS_SKEY']

def lambda_handler(event, context):
    session = boto3.session.Session(region_name='ap-northeast-2') 
    emr_client = session.client('emr', aws_access_key_id = aws_key, aws_secret_access_key = aws_skey)
    
    response = emr_client.run_job_flow(
        Name='<EMR Name>',
        LogUri='s3://log-bucket-name/log-folder/',
        ReleaseLabel='emr-5.21.0',
        Instances={
            'InstanceGroups': [
                {
                    'Name': 'master',
                    'Market': 'SPOT',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm4.large',
                    'InstanceCount': 1
                },
                {
                    'Name': 'slave',
                    'Market': 'SPOT',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'r3.xlarge',
                    'InstanceCount': 4
                }],
            'Ec2KeyName': '<Key pair name>',
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
            'Ec2SubnetId': '<subnet ID>',
            'AdditionalMasterSecurityGroups': [
                '<additional security group ID>'
            ]
        },
        Applications=[{
            'Name': 'Spark'
        }],
        VisibleToAllUsers=False,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole'
    )
    return response

위 코드를 넣고 테스트를 진행하면 EMR이 생성되는 것을 손쉽게 확인해볼 수 있습니다. 테스트의 이벤트는 아무거나 넣어주면 됩니다.

Spark 단계 추가

그러면, Spark 작업을 어떻게 자동으로 실행할 수 있을까요? 가장 간단한 방법은 EMR을 생성하면서 Steps (단계)를 추가하는 것입니다. 코드에 단계를 추가하기에 앞서, EMR에서 작업할 Spark jar 파일을 S3에 올려두는 것이 좋습니다.

Steps=[{
            'Name': 'Main',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit',
                    '--master', 'yarn', '--deploy-mode', 'client',
                    '--class', 'main class',
                    's3://bucket-name/folder/file.jar'
                ]
            }
        }],

command-runner.jar은 기본적으로 내장된 파일입니다. 이걸 통해서 spark-submit 단계를 추가할 수 있게 됩니다. 본래 Hadoop jar을 넣어서 자동 실행하는 것이지만 command-runner가 Spark로 명령을 전달해주는 것입니다. Spark jar 위치를 지정할 때, 해당 코드로 EMR을 만드는 계정이 S3에 접근할 수 있는 권한이 있어야 합니다.

단계를 실행하다가 실패할 경우에는 TERMINATE_CLUSTER 옵션을 통해서 EMR을 즉시 종료하게 됩니다. 메인 코드에서 ‘KeepJobFlowAliveWhenNoSteps’: False 옵션이 있어서 단계가 없거나 종료되면 EMR이 유지되지 않고 역시 자동 종료됩니다.

스케쥴

이제 EMR이 Spark 배치 작업을 수행할 수 있습니다. 그러면 스케쥴링만 걸어줘도 자동화를 달성할 수 있게 됩니다. Lambda 코드에 CloudWatch Events를 트리거로 걸어줍니다.

그리고 여기에는 cron 작업이나 rate 함수를 이용하여 주기적으로 이벤트를 발생시켜 Lambda를 실행하도록 할 수 있습니다. 그 외에도 다른 많은 트리거를 추가함으로써 EMR 생성 후에 자동으로 Spark 배치가 실행되도록 하는 것이 가능합니다.

마치며…

자, 이렇게 EMR을 필요할 때에만 생성하여 작업을 진행하고 종료시킴으로써 자동화와 비용 절감을 모두 잡았습니다. 다만 문제점이 몇 가지 있어서 알려드립니다.

    • Spark 배치 작업의 결과로 파일이 생성되면 EMR 종료와 함께 삭제됨.
    • 현재 작업중인 단계를 정확히 파악이 어려움.
    • EMR 생성시에 아이피가 랜덤하여 특정 API에 방화벽 등록이 어려움.

이러한 문제점을 해결하기 위해서는 추가적인 작업이 필요합니다. EMR에서 생성되는 파일은 반드시 S3로 저장하여 손실을 막고, 작업 이력은 EMR의 모니터링 항목에서 yarn application의 실행 상태를 확인함으로써 파악이 가능합니다. 아이피 문제는 Bootstrap 작업을 통해서 EMR이 생성되자마자 미리 준비해둔 EIP를 할당하는 쉘 스크립트를 실행해주면 됩니다.