[0] Past, Present, Future - What I've Learned From Packaging Spark Applications


#1

Intro

My company runs a myriad of batch style Spark/Pyspark workloads which I’ve had the privilege of creating the ops and workflows for over the past few years. When I joined up with PeopleDataLabs they were using CDH, and I was really excited about deploying charmed bigtop alongside it and seeing what held up and what didn’t when running our workload. I set forth in speccing out the networks, cabs, servers and setting up some infrastructure to facilitate my juju deployed spark/hadoop, but lets back up a moment before any of that. First, I set out to enumerate and identify the requirements and constraints of our Spark workloads, and couple those requirements with their respective compatible components in the charmed bigtop stack.

Spark Workload Requirements

As someone fairly new to Spark at the time, the top level differentiator that stuck out to me was that there were two primary types of workloads that Spark could facilitate; batch processing, stream processing.

Batch processing is essentially static processing. You have input data in one location, load it, process it, write the output to the output location; batch processing.

Stream processing has less to do with aggregating over large static sources, and more to do with in flight processing on streams of data that never touch disk in their lifetime that Spark is aware of.

My company’s production Spark workloads consist of batch processing, ETL type, semi-automated, headless workloads running on Spark on Yarn on HDFS. Development workloads consist of jupyter notebooks running on Spark/Yarn/HDFS backends (the dev version of our production workloads).

After doing some research and diving into our codebase, I had a few key takeaways:

  1. I was targeting a batch processing architecture.
  2. Physical infra requirements == fat pipe + big disk.
  3. We were using HDFS for our distributed backend (where the input and output data goes).
  4. The only component of Hadoop we used was HDFS, everything else was Spark/Pyspark (minus a few small hadoop libs).
  5. Our codebase was dependent on the Spark 2.2.0 API.
  6. Every node in the cluster needed to have identical dependancies in identical locations on the filesystem in order for a job to run.
  7. Our method of distributing code changes to nodes in the cluster was to zip the ipython notebook work dir and scp it to every node in the cluster.
  8. We had some dependancies on hadoop libs, so we needed hadoop and libs to exist in the filesystem where Spark executors run, but no running hadoop component processes were needed if we replace HDFS with another storage backend.

Assemble the Solution

At this point I had a few basic requirements of our workload to see if I would be able to assemble a demo of our workload running on charmed bigtop.

I started to match things up, what would work together and what wouldn’t, here were my findings:

  • Charmed bigtop only supported Spark 2.1.?, I needed support for > 2.2.0 Spark API.
  • Our use case called for a serious HDFS backend, greater then 1PB of SSD storage was needed across our Hadoop task nodes to facilitate storing the input and output of one of our ETL jobs. Unfortunately the hadoop charms put HDFS on / and have no further support for storage.

Due to these findings, I knew right from the start that the charmed bigtop stack would not work for us and I would need to find another solution to provision our Spark workloads.

Letting this all ruminate, I thought to myself “I can use juju resources to version, package and provision the spark and Hadoop tarballs, juju storage to facilitate the HDFS storage. This is going to be great! I can do it!”

Following further analysis on moving forward with HDFS, it made a lot of sense to look into decoupling the storage infrastructure from the compute nodes.

Decoupling Storage and Compute

The upsides to decoupling storage and compute, and using Ceph/S3A + Spark in place of Yarn/Hadoop/HDFS + Spark for our batch processing jobs were many.

Access and Consumability

Moving the data to Ceph allowed us to have many different use cases accessing the data simultaneously, opposed to when the data was in HDFS, it could only really be used by the cluster that the data was stored on. Previously, if we wanted to have multiple Hadoop clusters access the data, we needed to have the data on the respective HDFS filesystem for each cluster. This started to create contention for access to the data, the ability to process it, and the resources needed to house it. Moving the data to Ceph and accessing it remotely via S3A solved this problem.

Lifecycle and Management

Mitigating the data back and forth from HDFS to long term storage was a pain point. We could process data on HDFS just fine, but getting the data in and out of the distributed filesystem to a proper long term storage system (or any other database/filesystem) on a frequent basis was creating contention in how often we could run jobs because a user would need to wait for data to finish transferring before the cluster was available to run a job.

Consistency and Scale

The S3A protocol can be used with AWS S3 the same as it can be used with Ceph/Radosgw. Decoupling and moving to S3 protocol opened up a whole new world for us as we were already using S3 a fair amount for other storage purposes, just not for our processing backend. This change allowed us to run our jobs in the cloud using Spark deployed on instances pointing at a private S3 VPE in our VPC, the same way we ran the job at the datacenter pointing at Ceph/Radosgw. We could now deploy large and ephemeral processing clusters in the cloud, which alleviated the physical resource contention; users could now deploy clusters of unlimited size on demand and throw them away when the job is finished. This allowed us to scale our workloads immediately.

Simplification

Concerning the simplification of our compute workload, after decoupling the storage to Ceph and dropping the need for HDFS, we only had to account for a single process; Spark. Previously we needed to account for a whole ocean of applications that needed to run in harmony to facilitate what is CDH, and the running of our Pyspark applications. Now we simply needed a Spark standalone deploy with our code on the nodes, and access to an S3 like storage system.

I figured out that we didn’t actually depend on a HDFS backend. We could replace HDFS with S3 and our jobs would not notice the difference given the physical environment was configured to support the high network throughput needed to facilitate a remote storage batch processing environment. Having the data on separate infrastructure allowed us to manage the compute and storage independent of each other, enhanced access, lifecycle, and opened up doorways for us to more easily run our workload in the cloud and on-prem alike.

Implementation

Knowing that I wasn’t going to need to account for HDFS anymore, I took a closer look at what storage requirements Spark has itself. Spark has a need for executor workspace and a cache, I felt they both fit well into Juju’s storage model so I used Juju storage to facilitate Spark’s local storage needs

Provisioning the Spark and Hadoop libs seemed to be right in the Juju resources wheelhouse, so I went ahead with this using the spark and hadoop upstream tarballs attached as charm resources via layer-spark-base and layer-hadoop-base.

Private code deps and workflow was accommodated via layer-conda. This allowed our developers to interface to Juju to get our code dependancies on to the nodes. I packaged our data processing code up as a python package, so that our developers could use this layer-conda config to pull our code deps into the conda env at will, and also provide a more formal interface to managing dependencies.

Combining layer-conda, layer-spark, layer-hadoop, and layer-jupyter-notebook I was able to create a somewhat more manageable version of a code execution environment that featured support for the things our environment needed.

Result

I swapped out the HDFS backend for a Juju deployed Ceph + Radosgw, ramped up our networking, threw some blades in the cabs, deployed my new charms and let’er rip.

In this new architecture we were running Spark standalone clusters that were reading and writing data to the S3 backend using the S3A to communicate with an S3 endpoint from Spark.

This setup was far better than what we previously had in place, but things still weren’t exactly great. I’ll follow up with why this setup wasn’t optimal and how I came up with next iteration of Spark packaging and deployment in the next post in this series.


[1] Past, Present, Future - What I’ve Learned From Packaging Spark Applications
#2

A great read with my afternoon cup of coffee. Looking forward to the next instalment.