Use Batch Processing Gateway to automate job management in multi-cluster Amazon EMR on EKS environments

AWS customers often process petabytes of data using Amazon EMR on EKS. In enterprise environments with diverse workloads or varying operational requirements, customers frequently choose a multi-cluster setup due to the following advantages:

  • Better resiliency and no single point of failure – If one cluster fails, other clusters can continue processing critical workloads, maintaining business continuity
  • Better security and isolation – Increased isolation between jobs enhances security and simplifies compliance
  • Better scalability – Distributing workloads across clusters enables horizontal scaling to handle peak demands
  • Performance benefits – Minimizing Kubernetes scheduling delays and network bandwidth contention improves job runtimes
  • Increased flexibility – You can enjoy straightforward experimentation and cost optimization through workload segregation to multiple clusters

However, one of the disadvantages of a multi-cluster setup is that there is no straightforward method to distribute workloads and support effective load balancing across multiple clusters. This post proposes a solution to this challenge by introducing the Batch Processing Gateway (BPG), a centralized gateway that automates job management and routing in multi-cluster environments.

Challenges with multi-cluster environments

In a multi-cluster environment, Spark jobs on Amazon EMR on EKS need to be submitted to different clusters from various clients. This architecture introduces several key challenges:

  • Endpoint management – Clients must maintain and update connections for each target cluster
  • Operational overhead – Managing multiple client connections individually increases the complexity and operational burden
  • Workload distribution – There is no built-in mechanism for job routing across multiple clusters, which impacts configuration, resource allocation, cost transparency, and resilience
  • Resilience and high availability – Without load balancing, the environment lacks fault tolerance and high availability

BPG addresses these challenges by providing a single point of submission for Spark jobs. BPG automates job routing to the appropriate EMR on EKS clusters, providing effective load balancing, simplified endpoint management, and improved resilience. The proposed solution is particularly beneficial for customers with multi-cluster Amazon EMR on EKS setups using the Spark Kubernetes Operator, with or without the YuniKorn scheduler.

However, although BPG offers significant benefits, it is currently designed to work only with the Spark Kubernetes Operator. Additionally, BPG has not been tested with the Volcano scheduler, and the solution does not apply to environments using native Amazon EMR on EKS APIs.

Solution overview

Martin Fowler describes a gateway as an object that encapsulates access to an external system or resource. In this case, the resource is the EMR on EKS clusters running Spark. A gateway acts as a single point of access to this resource: any code or connection interacts only with the gateway's interface, and the gateway translates the incoming API request into the API offered by the resource.

BPG is a gateway specifically designed to provide a seamless interface to Spark on Kubernetes. It's a REST API service that abstracts the details of the underlying Spark on EKS clusters from users. It runs in its own EKS cluster and communicates with the Kubernetes API servers of the different EKS clusters. Spark users submit an application to BPG through clients, and BPG routes the application to one of the underlying EKS clusters.

The process for submitting Spark jobs using BPG for Amazon EMR on EKS is as follows:

  1. The user submits a job to BPG using a client.
  2. BPG parses the request, translates it into a custom resource definition (CRD), and submits the CRD to an EMR on EKS cluster according to predefined rules.
  3. The Spark Kubernetes Operator interprets the job specification and initiates the job on the cluster.
  4. The Kubernetes scheduler schedules and manages the run of the jobs.

The following figure illustrates the high-level details of BPG. You can read more about BPG in the GitHub README.

Image showing the high-level details of Batch Processing Gateway

The proposed solution involves implementing BPG for multiple underlying EMR on EKS clusters, which effectively resolves the drawbacks discussed earlier. The following diagram illustrates the details of the solution.

Image showing the end-to-end architecture of Batch Processing Gateway

Source code

You can find the code base in the AWS Samples and the Batch Processing Gateway GitHub repositories.

In the following sections, we walk through the steps to implement the solution.

Prerequisites

Before you deploy this solution, make sure the following prerequisites are in place:

  • An AWS account with permissions to create the resources used in this post (Amazon EKS, Amazon EMR on EKS, Amazon ECR, Amazon RDS, IAM, and Amazon EC2)
  • The AWS CLI, eksctl, kubectl, Helm, Docker, git, and jq installed and configured on the machine from which you run the commands

Clone the repositories to your local machine

We assume that all repositories are cloned into the home directory (~/). All relative paths provided are based on this assumption. If you have cloned the repositories to a different location, adjust the paths accordingly.

  1. Clone the BPG on EMR on EKS GitHub repo with the following command:
cd ~/
git clone git@github.com:aws-samples/batch-processing-gateway-on-emr-on-eks.git

The BPG repository is currently under active development. To provide a stable deployment experience consistent with the provided instructions, we have pinned the repository to the stable commit hash aa3e5c8be973bee54ac700ada963667e5913c865.

Before cloning the repository, verify any security updates and adhere to your organization’s security practices.

  1. Clone the BPG GitHub repo with the following command:
git clone git@github.com:apple/batch-processing-gateway.git
cd batch-processing-gateway
git checkout aa3e5c8be973bee54ac700ada963667e5913c865
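Checking out a specific hash leaves the repository in a detached HEAD state pinned to that commit. The following self-contained sketch illustrates the pattern with a throwaway repository (it does not touch the BPG repository):

```shell
# Create a throwaway repo with two commits, then pin to the first one.
TMP_REPO=$(mktemp -d)
cd "$TMP_REPO"
git init -q
git -c user.email=demo@example.com -c user.name=demo commit -q --allow-empty -m "first"
PINNED=$(git rev-parse HEAD)
git -c user.email=demo@example.com -c user.name=demo commit -q --allow-empty -m "second"
git checkout -q "$PINNED"   # detached HEAD at the pinned commit
git rev-parse HEAD          # prints the same hash stored in $PINNED
```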

Create two EMR on EKS clusters

The creation of EMR on EKS clusters is not the primary focus of this post. For comprehensive instructions, refer to Running Spark jobs with the Spark operator. However, for your convenience, we have included the steps for setting up the EMR on EKS virtual clusters named spark-cluster-a-v and spark-cluster-b-v in the GitHub repo. Follow these steps to create the clusters.

After successfully completing the steps, you should have two EMR on EKS virtual clusters named spark-cluster-a-v and spark-cluster-b-v running on the EKS clusters spark-cluster-a and spark-cluster-b, respectively.

To verify the successful creation of the clusters, open the Amazon EMR console and choose Virtual clusters under EMR on EKS in the navigation pane.

Image showing the Amazon EMR on EKS setup

Set up BPG on Amazon EKS

To set up BPG on Amazon EKS, complete the following steps:

  1. Change to the appropriate directory:
cd ~/batch-processing-gateway-on-emr-on-eks/bpg/

  1. Set up the AWS Region:
export AWS_REGION="<AWS_REGION>"

  1. Create a key pair. Make sure you follow your organization’s best practices for key pair management.
aws ec2 create-key-pair \
--region "$AWS_REGION" \
--key-name ekskp \
--key-type ed25519 \
--key-format pem \
--query "KeyMaterial" \
--output text > ekskp.pem
chmod 400 ekskp.pem
ssh-keygen -y -f ekskp.pem > eks_publickey.pem
chmod 400 eks_publickey.pem

Now you’re ready to create the EKS cluster.

By default, eksctl creates an EKS cluster in a dedicated virtual private cloud (VPC). To avoid reaching the default soft limit on the number of VPCs in an account, we use the --vpc-public-subnets parameter to create the cluster in an existing VPC. For this post, we use the default VPC for deploying the solution. Modify the following code to deploy the solution in the appropriate VPC in accordance with your organization’s best practices. For official guidance, refer to Create a VPC.

  1. Get the public subnets for your VPC:
export DEFAULT_FOR_AZ_SUBNET=$(aws ec2 describe-subnets --region "$AWS_REGION" --filters "Name=default-for-az,Values=true" --query "Subnets[?AvailabilityZone != 'us-east-1e'].SubnetId" | jq -r '. | map(tostring) | join(",")')
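The jq pipeline above flattens the JSON array returned by describe-subnets into the comma-separated list that eksctl expects. You can see the transformation on a hypothetical sample response:

```shell
# Hypothetical subnet IDs standing in for the describe-subnets query output:
SAMPLE='["subnet-0aaa","subnet-0bbb","subnet-0ccc"]'
# Same jq pipeline as above: JSON array -> comma-separated string
echo "$SAMPLE" | jq -r '. | map(tostring) | join(",")'
# → subnet-0aaa,subnet-0bbb,subnet-0ccc
```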

  1. Create the cluster:
eksctl create cluster \
--name bpg-cluster \
--region "$AWS_REGION" \
--vpc-public-subnets "$DEFAULT_FOR_AZ_SUBNET" \
--with-oidc \
--ssh-access \
--ssh-public-key eks_publickey.pem \
--instance-types=m5.xlarge \
--managed

  1. On the Amazon EKS console, choose Clusters in the navigation pane and check for the successful provisioning of the bpg-cluster.

Image showing the Amazon EKS based BPG cluster setup

In the next steps, we make changes to the existing batch-processing-gateway code base by replacing the POM file, the LogDao Java file, and the Dockerfile.

For your convenience, we have provided the updated files in the batch-processing-gateway-on-emr-on-eks repository. You can copy these files into the batch-processing-gateway repository.

  1. Replace POM xml file:
cp ~/batch-processing-gateway-on-emr-on-eks/bpg/pom.xml ~/batch-processing-gateway/pom.xml

  1. Replace DAO java file:
cp ~/batch-processing-gateway-on-emr-on-eks/bpg/LogDao.java ~/batch-processing-gateway/src/main/java/com/apple/spark/core/LogDao.java

  1. Replace the Dockerfile:
cp ~/batch-processing-gateway-on-emr-on-eks/bpg/Dockerfile ~/batch-processing-gateway/Dockerfile

Now you’re ready to build your Docker image.

  1. Create a private Amazon Elastic Container Registry (Amazon ECR) repository:
aws ecr create-repository --repository-name bpg --region "$AWS_REGION"

  1. Get the AWS account ID:
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)

  1. Authenticate Docker to your ECR registry:
aws ecr get-login-password --region "$AWS_REGION" | docker login --username AWS --password-stdin "$AWS_ACCOUNT_ID".dkr.ecr."$AWS_REGION".amazonaws.com

  1. Build your Docker image:
cd ~/batch-processing-gateway/
docker build \
--platform linux/amd64 \
--build-arg VERSION="1.0.0" \
--build-arg BUILD_TIME=$(date -u +"%Y-%m-%dT%H:%M:%SZ") \
--build-arg GIT_COMMIT=$(git rev-parse HEAD) \
--progress=plain \
--no-cache \
-t bpg:1.0.0 .
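The BUILD_TIME build argument embeds an ISO 8601 UTC timestamp in the image. You can preview the format locally before building:

```shell
# Same format string passed to --build-arg BUILD_TIME above;
# prints a timestamp such as 2024-08-10T16:17:15Z
date -u +"%Y-%m-%dT%H:%M:%SZ"
```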

  1. Tag your image:
docker tag bpg:1.0.0 "$AWS_ACCOUNT_ID".dkr.ecr."$AWS_REGION".amazonaws.com/bpg:1.0.0

  1. Push the image to your ECR repository:
docker push "$AWS_ACCOUNT_ID".dkr.ecr."$AWS_REGION".amazonaws.com/bpg:1.0.0

The ImagePullPolicy in the batch-processing-gateway GitHub repo is set to IfNotPresent. If you rebuild the image, update the image tag as well so the new image is pulled.

  1. To verify the successful creation and upload of the Docker image, open the Amazon ECR console, choose Repositories under Private registry in the navigation pane, and locate the bpg repository:

Image showing the Amazon ECR setup

Set up an Amazon Aurora MySQL database

Complete the following steps to set up an Amazon Aurora MySQL-Compatible Edition database:

  1. List all the default subnets for the Availability Zones as a compact JSON array:
DEFAULT_FOR_AZ_SUBNET_RFMT=$(aws ec2 describe-subnets --region "$AWS_REGION" --filters "Name=default-for-az,Values=true" --query "Subnets[*].SubnetId" | jq -c '.')

  1. Create a subnet group. Refer to create-db-subnet-group for more details.
aws rds create-db-subnet-group \
--db-subnet-group-name bpg-rds-subnetgroup \
--db-subnet-group-description "BPG Subnet Group for RDS" \
--subnet-ids "$DEFAULT_FOR_AZ_SUBNET_RFMT" \
--region "$AWS_REGION"

  1. List the default VPC:
export DEFAULT_VPC=$(aws ec2 describe-vpcs --region "$AWS_REGION" --filters "Name=isDefault,Values=true" --query "Vpcs[0].VpcId" --output text)

  1. Create a security group:
aws ec2 create-security-group \
--group-name bpg-rds-securitygroup \
--description "BPG Security Group for RDS" \
--vpc-id "$DEFAULT_VPC" \
--region "$AWS_REGION"

  1. List the bpg-rds-securitygroup security group ID:
export BPG_RDS_SG=$(aws ec2 describe-security-groups --filters "Name=group-name,Values=bpg-rds-securitygroup" --query "SecurityGroups[*].GroupId" --output text)

  1. Create the Aurora DB Regional cluster. Refer to create-db-cluster for more details.
aws rds create-db-cluster \
--database-name bpg \
--db-cluster-identifier bpg \
--engine aurora-mysql \
--engine-version 8.0.mysql_aurora.3.06.1 \
--master-username admin \
--manage-master-user-password \
--db-subnet-group-name bpg-rds-subnetgroup \
--vpc-security-group-ids "$BPG_RDS_SG" \
--region "$AWS_REGION"

  1. Create a DB Writer instance in the cluster. Refer to create-db-instance for more details.
aws rds create-db-instance \
--db-instance-identifier bpg \
--db-cluster-identifier bpg \
--db-instance-class db.r5.large \
--engine aurora-mysql \
--region "$AWS_REGION"

  1. To verify the successful creation of the RDS Regional cluster and Writer instance, on the Amazon RDS console, choose Databases in the navigation pane and check for the bpg database.

Image showing the RDS setup

Set up network connectivity

Security groups for EKS clusters are typically associated with the nodes and the control plane (if using managed nodes). In this section, we configure the networking to allow the node security group of the bpg-cluster to communicate with spark-cluster-a, spark-cluster-b, and the bpg Aurora RDS cluster.

  1. Identify the security groups of bpg-cluster, spark-cluster-a, spark-cluster-b, and the bpg Aurora RDS cluster:
# Identify Node Security Group of the bpg-cluster
BPG_CLUSTER_NODEGROUP_SG=$(aws ec2 describe-instances \
--filters Name=tag:eks:cluster-name,Values=bpg-cluster \
--query "Reservations[*].Instances[*].SecurityGroups[?contains(GroupName, 'eks-cluster-sg-bpg-cluster-')].GroupId" \
--region "$AWS_REGION" \
--output text | uniq)

# Identify Cluster security group of spark-cluster-a and spark-cluster-b
SPARK_A_CLUSTER_SG=$(aws eks describe-cluster --name spark-cluster-a --query "cluster.resourcesVpcConfig.clusterSecurityGroupId" --output text)
SPARK_B_CLUSTER_SG=$(aws eks describe-cluster --name spark-cluster-b --query "cluster.resourcesVpcConfig.clusterSecurityGroupId" --output text)

# Identify Cluster security group of bpg Aurora RDS cluster Writer Instance
BPG_RDS_WRITER_SG=$(aws ec2 describe-security-groups --filters "Name=group-name,Values=bpg-rds-securitygroup" --query "SecurityGroups[*].GroupId" --output text)

  1. Allow the node security group of the bpg-cluster to communicate with spark-cluster-a, spark-cluster-b, and the bpg Aurora RDS cluster:
# spark-cluster-a
aws ec2 authorize-security-group-ingress --group-id "$SPARK_A_CLUSTER_SG" --protocol tcp --port 443 --source-group "$BPG_CLUSTER_NODEGROUP_SG"

# spark-cluster-b
aws ec2 authorize-security-group-ingress --group-id "$SPARK_B_CLUSTER_SG" --protocol tcp --port 443 --source-group "$BPG_CLUSTER_NODEGROUP_SG"

# bpg-rds
aws ec2 authorize-security-group-ingress --group-id "$BPG_RDS_WRITER_SG" --protocol tcp --port 3306 --source-group "$BPG_CLUSTER_NODEGROUP_SG"

Deploy BPG

We deploy BPG for weight-based cluster selection. spark-cluster-a-v and spark-cluster-b-v are configured with a queue named dev and weight=50. We expect statistically equal distribution of jobs between the two clusters. For more information, refer to Weight Based Cluster Selection.
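Conceptually, weight-based selection routes each job to a cluster with probability proportional to that cluster's weight. The following bash sketch is our illustration of the idea, not BPG's actual implementation; it mimics the 50/50 split between the two queues:

```shell
# Illustrative weight-based pick between the two clusters (not BPG code).
WEIGHT_A=50   # weight of spark-cluster-a-v in queue "dev"
WEIGHT_B=50   # weight of spark-cluster-b-v in queue "dev"
TOTAL=$((WEIGHT_A + WEIGHT_B))
# Draw a pseudo-random number in [0, TOTAL) from /dev/urandom:
PICK=$(( $(od -An -N2 -tu2 /dev/urandom | tr -d ' ') % TOTAL ))
if [ "$PICK" -lt "$WEIGHT_A" ]; then
  CHOSEN="spark-cluster-a-v"
else
  CHOSEN="spark-cluster-b-v"
fi
echo "Job would be routed to: $CHOSEN"
```

Over many submissions, each cluster receives a share of jobs proportional to its weight, which is the statistically equal distribution described above.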

  1. Get the bpg-cluster context:
BPG_CLUSTER_CONTEXT=$(kubectl config view --output=json | jq -r '.contexts[] | select(.name | contains("bpg-cluster")) | .name')
kubectl config use-context "$BPG_CLUSTER_CONTEXT"

  1. Create a Kubernetes namespace for BPG:
kubectl create namespace bpg

The helm chart for BPG requires a values.yaml file. This file includes various key-value pairs for the EMR on EKS clusters, the EKS cluster, and the Aurora cluster. Manually updating the values.yaml file can be cumbersome. To simplify this process, we’ve automated the creation of the values.yaml file.

  1. Run the following script to generate the values.yaml file:
cd ~/batch-processing-gateway-on-emr-on-eks/bpg
chmod 755 create-bpg-values-yaml.sh
./create-bpg-values-yaml.sh

  1. Use the following code to deploy the helm chart. Make sure the tag value in both values.template.yaml and values.yaml matches the Docker image tag specified earlier.
cp ~/batch-processing-gateway/helm/batch-processing-gateway/values.yaml ~/batch-processing-gateway/helm/batch-processing-gateway/values.yaml.$(date +'%Y%m%d%H%M%S') \
&& cp ~/batch-processing-gateway-on-emr-on-eks/bpg/values.yaml ~/batch-processing-gateway/helm/batch-processing-gateway/values.yaml \
&& cd ~/batch-processing-gateway/helm/batch-processing-gateway/

kubectl config use-context "$BPG_CLUSTER_CONTEXT"

helm install batch-processing-gateway . --values values.yaml -n bpg

  1. Verify the deployment by listing the pods and viewing the pod logs:
kubectl get pods --namespace bpg
kubectl logs <BPG-PODNAME> --namespace bpg

  1. Exec into the BPG pod and verify the health check:
kubectl exec -it <BPG-PODNAME> -n bpg -- bash 
curl -u admin:admin localhost:8080/skatev2/healthcheck/status

We get the following output:

{"status":"OK"}

BPG is successfully deployed on the EKS cluster.
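The -u admin:admin flag makes curl send an HTTP Basic Authorization header, which is simply the base64-encoded credentials. For illustration:

```shell
# The value curl places after "Authorization: Basic" when given -u admin:admin:
printf 'admin:admin' | base64
# → YWRtaW46YWRtaW4=
```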

Test the solution

To test the solution, you can submit multiple Spark jobs by running the following sample code multiple times. The code submits the SparkPi sample job to BPG, which in turn submits each job to one of the EMR on EKS clusters based on the configured weights.

  1. Set the kubectl context to the bpg cluster:
kubectl config get-contexts | awk 'NR==1 || /bpg-cluster/'
kubectl config use-context "<CONTEXT_NAME>"

  1. Identify the bpg pod name:
kubectl get pods --namespace bpg

  1. Exec into the bpg pod:

kubectl exec -it "<BPG-PODNAME>" -n bpg -- bash

  1. Submit multiple Spark jobs using curl. Run the following curl command multiple times to submit jobs to spark-cluster-a and spark-cluster-b:
curl -u user:pass localhost:8080/skatev2/spark -i -X POST \
-H 'Content-Type: application/json' \
-d '{
"applicationName": "SparkPiDemo",
"queue": "dev",
"sparkVersion": "3.5.0",
"mainApplicationFile": "local:///usr/lib/spark/examples/jars/spark-examples.jar",
"mainClass":"org.apache.spark.examples.SparkPi",
"driver": {
"cores": 1,
"memory": "2g",
"serviceAccount": "emr-containers-sa-spark",
"labels":{
"version": "3.5.0"
}
},
"executor": {
"instances": 1,
"cores": 1,
"memory": "2g",
"labels":{
"version": "3.5.0"
}
}
}'

After each submission, BPG will inform you of the cluster to which the job was submitted. For example:

HTTP/1.1 200 OK
Date: Sat, 10 Aug 2024 16:17:15 GMT
Content-Type: application/json
Content-Length: 67
{"submissionId":"spark-cluster-a-f72a7ddcfde14f4390194d4027c1e1d6"}
{"submissionId":"spark-cluster-a-d1b359190c7646fa9d704122fbf8c580"}
{"submissionId":"spark-cluster-b-7b61d5d512bb4adeb1dd8a9977d605df"}
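Because each submission ID is prefixed with the name of the chosen cluster, you can recover the routing decision with plain shell. A small illustrative parse (our reading of the ID format, not a BPG API):

```shell
# Strip the trailing job-specific hex suffix to recover the cluster name:
SUBMISSION_ID="spark-cluster-a-f72a7ddcfde14f4390194d4027c1e1d6"
CLUSTER="${SUBMISSION_ID%-*}"   # remove the last "-<suffix>" segment
echo "$CLUSTER"
# → spark-cluster-a
```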

  1. Verify that the jobs are running in the EMR cluster spark-cluster-a and spark-cluster-b:
kubectl config get-contexts | awk 'NR==1 || /spark-cluster-(a|b)/'
kubectl get pods -n spark-operator --context "<CONTEXT_NAME>"
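The awk 'NR==1 || /pattern/' idiom used above keeps the header row (NR==1) plus any lines matching the pattern. A self-contained demonstration on sample kubectl-style output:

```shell
# Sample listing standing in for `kubectl config get-contexts` output:
SAMPLE='CURRENT   NAME              CLUSTER
*         spark-cluster-a   spark-cluster-a
          bpg-cluster       bpg-cluster'
# Keep the header plus rows matching spark-cluster-a or spark-cluster-b:
printf '%s\n' "$SAMPLE" | awk 'NR==1 || /spark-cluster-(a|b)/'
# Prints the header line and the spark-cluster-a row only.
```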

You can view the Spark Driver logs to find the value of Pi as shown below:

kubectl logs <SPARK-DRIVER-POD-NAME> --namespace spark-operator --context "<CONTEXT_NAME>"

After the job completes successfully, you should see the following message in the logs:

Pi is roughly 3.1452757263786317

We have successfully tested the weight-based routing of Spark jobs across multiple clusters.

Clean up

To clean up your resources, complete the following steps:

  1. Delete the EMR on EKS virtual cluster:
VIRTUAL_CLUSTER_ID=$(aws emr-containers list-virtual-clusters --region="$AWS_REGION" --query "virtualClusters[?name=='spark-cluster-a-v' && state=='RUNNING'].id" --output text)
aws emr-containers delete-virtual-cluster --region="$AWS_REGION" --id "$VIRTUAL_CLUSTER_ID"
VIRTUAL_CLUSTER_ID=$(aws emr-containers list-virtual-clusters --region="$AWS_REGION" --query "virtualClusters[?name=='spark-cluster-b-v' && state=='RUNNING'].id" --output text)
aws emr-containers delete-virtual-cluster --region="$AWS_REGION" --id "$VIRTUAL_CLUSTER_ID"

  1. Delete the AWS Identity and Access Management (IAM) role:
aws iam delete-role-policy --role-name sparkjobrole --policy-name EMR-Spark-Job-Execution
aws iam delete-role --role-name sparkjobrole

  1. Delete the RDS DB instance and DB cluster:
aws rds delete-db-instance \
--db-instance-identifier bpg \
--skip-final-snapshot

aws rds delete-db-cluster \
--db-cluster-identifier bpg \
--skip-final-snapshot

  1. Delete the bpg-rds-securitygroup security group and bpg-rds-subnetgroup subnet group:
BPG_SG=$(aws ec2 describe-security-groups --filters "Name=group-name,Values=bpg-rds-securitygroup" --query "SecurityGroups[*].GroupId" --output text)
aws ec2 delete-security-group --group-id "$BPG_SG"
aws rds delete-db-subnet-group --db-subnet-group-name bpg-rds-subnetgroup

  1. Delete the EKS clusters:
eksctl delete cluster --region="$AWS_REGION" --name=bpg-cluster
eksctl delete cluster --region="$AWS_REGION" --name=spark-cluster-a
eksctl delete cluster --region="$AWS_REGION" --name=spark-cluster-b

  1. Delete bpg ECR repository:
aws ecr delete-repository --repository-name bpg --region="$AWS_REGION" --force

  1. Delete the key pairs:
aws ec2 delete-key-pair --key-name ekskp
aws ec2 delete-key-pair --key-name emrkp

Conclusion

In this post, we explored the challenges associated with managing workloads on Amazon EMR on EKS clusters and demonstrated the advantages of adopting a multi-cluster deployment pattern. We introduced Batch Processing Gateway (BPG) as a solution to these challenges, showcasing how it simplifies job management, enhances resilience, and improves horizontal scalability in multi-cluster environments. By implementing BPG, we illustrated the practical application of the gateway architecture pattern for submitting Spark jobs on Amazon EMR on EKS. This post provides a comprehensive understanding of the problem, the benefits of the gateway architecture, and the steps to implement BPG effectively.

We encourage you to evaluate your existing Spark on Amazon EMR on EKS implementation and consider adopting this solution. It allows users to submit, examine, and delete Spark applications on Kubernetes with intuitive API calls, without needing to worry about the underlying complexities.

For this post, we focused on the implementation details of the BPG. As a next step, you can explore integrating BPG with clients such as Apache Airflow, Amazon Managed Workflows for Apache Airflow (Amazon MWAA), or Jupyter notebooks. BPG works well with the Apache Yunikorn scheduler. You can also explore integrating BPG to use Yunikorn queues for job submission.


About the Authors

Image of Author: Umair NawazUmair Nawaz is a Senior DevOps Architect at Amazon Web Services. He works on building secure architectures and advises enterprises on agile software delivery. He is motivated to solve problems strategically by utilizing modern technologies.

Image of Author: Ravikiran RaoRavikiran Rao is a Data Architect at Amazon Web Services and is passionate about solving complex data challenges for various customers. Outside of work, he is a theater enthusiast and amateur tennis player.

Image of Author: Sri PotluriSri Potluri is a Cloud Infrastructure Architect at Amazon Web Services. He is passionate about solving complex problems and delivering well-structured solutions for diverse customers. His expertise spans across a range of cloud technologies, ensuring scalable and reliable infrastructure tailored to each project’s unique challenges.

Image of Author: Suvojit DasguptaSuvojit Dasgupta is a Principal Data Architect at Amazon Web Services. He leads a team of skilled engineers in designing and building scalable data solutions for AWS customers. He specializes in developing and implementing innovative data architectures to address complex business challenges.
