How to Use Apache Spark with Hadoop to Query Data on S3

Apache Spark - Overview

In this tutorial we are going to use several technologies to install an Apache Spark cluster, upload data to Scaleway's S3-compatible Object Storage and query the stored data directly from Spark using the Hadoop S3A connector. We are going to use Terraform to provision the machines and to trigger Ansible playbooks which install and configure Spark. Once Spark is installed, we will log into the master machine using SSH to run Spark commands.

Requirements:

- A Scaleway API token (access key, secret key and Organization ID)
- An Object Storage bucket in the fr-par region
- The AWS-CLI installed and configured for Scaleway Object Storage
- A local computer running macOS, Ubuntu or Debian with an SSH client

Configuring Terraform

1 . Start by downloading and installing Terraform on your local computer.

2 . Export the environment variables that are required to run Terraform:

export SCW_ACCESS_KEY="<ACCESS_KEY>"
export SCW_SECRET_KEY="<SECRET_KEY>"
export SCW_DEFAULT_ORGANIZATION_ID="<ORG_ID>"
export SCALEWAY_REGION=fr-par

Important: Replace <ORG_ID>, <ACCESS_KEY> and <SECRET_KEY> with the credentials of your API token.

Configuring Ansible

1 . Install Ansible on your local computer. Depending on your computer's operating system, the installation procedure differs slightly:

On macOS:

brew install ansible

On Ubuntu or Debian based machines:

apt update && apt install ansible

2 . Test if Ansible is working by running the following command from a terminal window:

$ ansible --version
ansible 2.7.0.dev0
  config file = None
  configured module search path = [u'/Users/sieben/.ansible/plugins/modules', u'/usr/share/ansible/plugins/modules']
  ansible python module location = /usr/local/lib/python2.7/site-packages/ansible-2.7.0.dev0-py2.7.egg/ansible
  executable location = /usr/local/bin/ansible
  python version = 2.7.16 (default, Sep  2 2019, 11:59:44) [GCC 4.2.1 Compatible Apple LLVM 10.0.1 (clang-1001.0.46.4)]

3 . Once Ansible is installed and working, run the following command:

ansible-galaxy install krzyzakp.ansible_spark

Creating a Specific SSH Key to Run Terraform

While it is possible to use the same SSH key for Terraform as for your regular SSH connections, it is not recommended. For security reasons, create and use a dedicated key for Terraform, as described below.

1 . Create a new directory and enter it. The directory name spark-scaleway used below is an example; you can name it as you like:

mkdir spark-scaleway && cd spark-scaleway

2 . Create a new SSH key pair by running the following command. It generates the private key ssh and the public key ssh.pub in the current directory, both of which are referenced by the Terraform configuration later:

ssh-keygen -f ssh

Writing the Ansible Playbooks

1 . Inside the spark-scaleway directory, create a directory named playbooks and change into it:

mkdir playbooks && cd playbooks

2 . Create a new file called master.yml and open it in a text editor, for example nano.

touch master.yml && nano master.yml

The content of the file should look like the following example:

- name: master
  hosts: "bastion"
  roles:
    - { role: 'krzyzakp.ansible_spark',  spark_version: '2.4.4', spark_worker_enabled: 'yes'}

3 . Save the file and exit nano.

4 . Create a new file called worker.yml and open it in a text editor, for example nano.

touch worker.yml && nano worker.yml

The content of the file should look like the following example:

- name: worker
  hosts: "*"
  roles:
    - { role: 'krzyzakp.ansible_spark',  spark_version: '2.4.4', spark_master_enabled: 'no', spark_worker_enabled: 'yes'}

5 . Save the file and exit nano.

Applying It All

1 . Go back to the spark-scaleway directory, then create the Terraform configuration file scaleway.tf and open it in a text editor:

cd .. && touch scaleway.tf && nano scaleway.tf

2 . Copy and paste the following content into the file. Replace <PUBLIC_SSH_KEY> with the content of the ssh.pub file generated earlier and <PATH_TO_PRIVATE_SSH_KEY> with the path to the matching private key (for example ${path.module}/ssh, as used for the workers):

provider "scaleway" {
  zone = "fr-par-1"
}

resource "scaleway_account_ssh_key" "main" {
  name = "terraform"
  public_key = "<PUBLIC_SSH_KEY>"
}

# Flexible (public) IP address attached to the bastion server
resource "scaleway_instance_ip" "bastion" {
  server_id = scaleway_instance_server.bastion.id
}

resource "scaleway_instance_security_group" "bastion" {
  name = "sg-bastion"

  inbound_default_policy  = "drop"
  outbound_default_policy = "accept"

  inbound_rule {
    action = "accept"
    port   = "22"
  }

  inbound_rule {
    action = "accept"
    ip_range = "10.0.0.0/8"
  }

  inbound_rule {
    action = "accept"
    port   = "8080"
  }

}

resource "scaleway_instance_security_group" "worker" {
  name = "sg-worker"

  inbound_default_policy = "drop"
  outbound_default_policy = "accept"

  inbound_rule {
    action = "accept"
    ip     = scaleway_instance_server.bastion.private_ip
  }
}

resource "scaleway_instance_server" "bastion" {
  name  = "bastion"
  type  = "DEV1-S"
  image = "ubuntu-bionic"
  tags  = [ "bastion" ]

  security_group_id =  scaleway_instance_security_group.bastion.id

  connection {
    host         = self.public_ip
    user         = "root"
    private_key  = file("<PATH_TO_PRIVATE_SSH_KEY>")
  }

  provisioner "remote-exec" {
    inline = [
      "apt-get update",
      "DEBIAN_FRONTEND=noninteractive apt-get install -yq python openjdk-8-jre-headless zookeeper zookeeperd",
    ]
  }

  provisioner "local-exec" {
    command = <<EOT
    export ANSIBLE_HOST_KEY_CHECKING=False;
    ansible-playbook -u root --private-key ${path.module}/ssh -i ${self.public_ip}, --extra-vars "spark_master_host='${scaleway_instance_server.bastion.private_ip}' spark_master_url='spark://${scaleway_instance_server.bastion.private_ip}:7077'" ${path.module}/playbooks/master.yml
    EOT
  }
}

resource "scaleway_instance_server" "worker" {
  count = 3

  name  = "worker-${count.index}"
  type  = "DEV1-S"
  image = "ubuntu-bionic"
  tags  = [ "worker" ]

  security_group_id = scaleway_instance_security_group.worker.id
  #disable_public_ip = true

  connection {
    bastion_host = scaleway_instance_ip.bastion.address
    host         = self.private_ip
    user         = "root"
    private_key  = file("${path.module}/ssh")
  }

  provisioner "remote-exec" {
    inline = [
      "apt-get update",
      "DEBIAN_FRONTEND=noninteractive apt-get install -yq python openjdk-8-jre-headless",
    ]
  }

  provisioner "local-exec" {
    command = <<EOT
    export ANSIBLE_HOST_KEY_CHECKING=False;
    ansible-playbook -u root --private-key ${path.module}/ssh -i ${self.private_ip}, --extra-vars "spark_master_host='${scaleway_instance_server.bastion.private_ip}' spark_master_url='spark://${scaleway_instance_server.bastion.private_ip}:7077'" --ssh-common-args '-o ProxyCommand="ssh -W %h:%p -q root@${scaleway_instance_ip.bastion.address} -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no"' ${path.module}/playbooks/worker.yml
    EOT
  }

}

resource "null_resource" "setup_cluster" {
  triggers = {
    worker_ips = "${join("\n", scaleway_instance_server.worker.*.private_ip)}"
  }

  connection {
    host         = scaleway_instance_ip.bastion.address
    user         = "root"
    private_key  = file("${path.module}/ssh")
  }

  provisioner "remote-exec" {
    inline = [
      "echo \"${scaleway_instance_server.bastion.private_ip}\n${join("\n", scaleway_instance_server.worker.*.private_ip)}\" > /opt/spark-2.3.4/conf/slaves",
      "systemctl restart spark-master.service"
    ]
  }

}

3 . Save the file and exit the text editor.

4 . Run the following commands to initialize Terraform and to apply the configuration. Confirm the apply by typing yes when Terraform prompts you:

terraform init
terraform apply

The commands above generate a large output. Check it carefully and make sure no errors appear.
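
If the run completes successfully, Terraform ends with a summary line similar to the following (the exact number of resources depends on your configuration):

Apply complete! Resources: 9 added, 0 changed, 0 destroyed.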

Verifying the Installation

1 . Get the public IP of the bastion machine from your Scaleway console, open your web browser and point it to the following address: http://<BASTION_PUBLIC_IP>:8080/

2 . Check that four workers are registered on the Spark master: the three worker instances plus the bastion itself, which also runs a worker because spark_worker_enabled is set to 'yes' in master.yml.

3 . You can log into the bastion machine using SSH with the following command from a terminal window:

ssh -i ssh root@<BASTION_PUBLIC_IP>

4 . Run the following command to test the cluster:

/opt/spark-2.4.4/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://<BASTION_PRIVATE_IP>:7077   --executor-memory 512M --num-executors  4 --driver-cores 4 --executor-cores 4 --total-executor-cores 16 /opt/spark-2.4.4/examples/jars/spark-examples_2.11-2.4.4.jar 10000

You will see the application running in the Spark web interface at http://<BASTION_PUBLIC_IP>:8080/.
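
The SparkPi example estimates π with a Monte Carlo method: it samples random points in the unit square and multiplies the fraction that falls inside the unit circle by four. For reference, here is a minimal sketch of a comparable standalone job that you could package yourself (for example with sbt) and pass to spark-submit in place of the examples jar; the SimplePi object name and the sampling logic are illustrative and not part of the bundled examples:

import org.apache.spark.sql.SparkSession

import scala.math.random

object SimplePi {
  def main(args: Array[String]): Unit = {
    // Reuse the master URL passed on the spark-submit command line.
    val spark = SparkSession.builder.appName("SimplePi").getOrCreate()

    val slices  = if (args.length > 0) args(0).toInt else 10
    val samples = 100000L * slices

    // Draw random points in the unit square and count those inside the unit circle.
    val inside = spark.sparkContext
      .parallelize(1L to samples, slices)
      .map { _ =>
        val x = random * 2 - 1
        val y = random * 2 - 1
        if (x * x + y * y <= 1) 1 else 0
      }
      .reduce(_ + _)

    println(s"Pi is roughly ${4.0 * inside / samples}")
    spark.stop()
  }
}

Build it into a jar and submit it with the same flags as above, replacing the spark-examples jar with your own.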

Uploading Data to the fr-par S3 Cluster

You need a dataset to upload to the bucket. In case you don't have any real data yet, or you do not want to use your real data for testing, you can generate a sample dataset at https://mockaroo.com

1 . Download the generated dataset and upload it to your bucket using the AWS-CLI, configured for Scaleway's Object Storage endpoint (s3.fr-par.scw.cloud):

aws s3 cp <MY_DATA_SET> s3://<MY_BUCKET>/<OBJECT_NAME>

Querying the Data

1 . Connect to the bastion machine via SSH and start the Spark shell as follows:

/opt/spark-2.4.4/bin/spark-shell --master spark://<BASTION_PRIVATE_IP>:7077 --packages org.apache.hadoop:hadoop-aws:2.7.4

2 . Run the following commands inside the shell:

sc.hadoopConfiguration.set("fs.s3a.endpoint", "https://s3.fr-par.scw.cloud");
sc.hadoopConfiguration.set("fs.s3a.access.key","<ACCESS_KEY>")
sc.hadoopConfiguration.set("fs.s3a.secret.key","<SECRET_KEY>")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.sql.DataFrame
val data = sqlContext.read.format("csv").option("header", "true").load("s3a://<MY_BUCKET>/<OBJECT_NAME>")
data.registerTempTable("data")
sqlContext.sql("select * from data where id = 10").show()

For more information about how to query your data, refer to the official Spark documentation.
