Implementing Program Status Scheduling at Cask
As an intern at Cask this summer, I had the opportunity to work on Cask's flagship product: CDAP. CDAP (Cask Data
Application Platform) helps you build big-data applications on a layer of abstraction over a distributed environment.
You can develop CDAP applications and test them locally on your computer, then package the application and run it in a
distributed environment. In short, you can use MapReduce and Spark and take advantage of the resources of a cluster
while knowing almost nothing about the underlying distributed environment. This summer, I implemented program status
scheduling on the platform, which lets users trigger programs based on the status of another program while passing
event data between the program runs.
## Use Case
There are many use cases for this feature. By introducing program dependencies, we can have smaller programs that each
focus on one part of a larger data application. For instance, we can build a CDAP pipeline that downloads a feed from
FTP, performs some initial pre-processing, and stores the result in a CDAP dataset. If this process succeeds, we then
want to run a Spark job that processes the dataset and writes the output to an S3 bucket.
With this feature, the process can be split into two pipelines, where the Spark processing pipeline runs after the FTP
download pipeline completes, and only if it completes successfully.
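To give a feel for what expressing such a dependency might look like, here is a rough sketch in Java. The builder and
method names below are invented for illustration; they are not CDAP's actual scheduling API.

```java
// Hypothetical sketch of declaring a program-status trigger between two
// pipelines. The builder and method names are invented for illustration;
// they are not CDAP's actual scheduling API.
public class ProcessFeedApp {

  public void configureSchedules(ScheduleConfigurer schedules) {
    schedules.schedule("run-spark-after-ftp-download")
        .program("SparkProcessingPipeline")               // the pipeline to start
        .triggerOnProgramStatus("FtpDownloadPipeline",     // the upstream pipeline
                                ProgramStatus.COMPLETED)   // only if it succeeds
        .create();
  }

  /** Minimal stand-in types so the sketch is self-contained. */
  enum ProgramStatus { COMPLETED, FAILED, KILLED }

  interface ScheduleConfigurer {
    ScheduleBuilder schedule(String name);
  }

  interface ScheduleBuilder {
    ScheduleBuilder program(String programName);
    ScheduleBuilder triggerOnProgramStatus(String upstreamProgram, ProgramStatus status);
    void create();
  }
}
```

The key idea is that the trigger is declared on the downstream pipeline: it names the upstream program and the status
that should start it.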
## Designing
One of the unique aspects of my internship at Cask was the design process. Design played a very central role on this
project, and on all projects at Cask. I spent a lot of time understanding the lifecycle of programs on CDAP in both
standalone and distributed environments, as well as how programs can already be scheduled in other ways (by time, or by
the arrival of a new partition in a dataset).
After discussing the design with others, a core piece of the design was that all programs need to be notified of their
program status. When saving a program's status to storage, we should also check whether a schedule exists that depends
on that status. The high-level approach (highly simplified) is pretty straightforward:

- A notification arrives.
- Check a queue to see if it satisfies a schedule.
- Remove it from the queue and check whether any constraints on that schedule are satisfied (time of day, concurrent
  runs, etc.).
- Start the program once those constraints are satisfied, if they are satisfied.
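To make that flow concrete, here is a minimal sketch of the loop in Java. Every type here (Notification, Schedule,
ScheduleStore, ConstraintChecker, ProgramLauncher) is a simplified stand-in I made up for illustration, not one of
CDAP's actual classes.

```java
import java.util.List;
import java.util.Map;

// Minimal sketch of the scheduling loop described above. All of these types
// are illustrative stand-ins, not CDAP's actual classes.
public class ScheduleNotificationLoop {

  /** A program status notification, e.g. "workflow X completed", plus its event data. */
  public record Notification(String programId, String status, Map<String, String> eventData) {}

  /** A schedule that fires when a particular program reaches a particular status. */
  public record Schedule(String triggeringProgramId, String triggeringStatus, String programToStart) {}

  interface ScheduleStore {
    List<Schedule> findTriggeredBy(Notification notification);
  }

  interface ConstraintChecker {
    boolean satisfied(Schedule schedule, Notification notification); // time of day, concurrent runs, ...
  }

  interface ProgramLauncher {
    void start(String programId, Map<String, String> eventData);
  }

  private final ScheduleStore scheduleStore;
  private final ConstraintChecker constraintChecker;
  private final ProgramLauncher launcher;

  public ScheduleNotificationLoop(ScheduleStore store, ConstraintChecker checker, ProgramLauncher launcher) {
    this.scheduleStore = store;
    this.constraintChecker = checker;
    this.launcher = launcher;
  }

  /** Handles one incoming program status notification. */
  public void onNotification(Notification notification) {
    for (Schedule schedule : scheduleStore.findTriggeredBy(notification)) {
      // Only start the downstream program once every constraint on the schedule holds.
      if (constraintChecker.satisfied(schedule, notification)) {
        launcher.start(schedule.programToStart(), notification.eventData());
      }
    }
  }
}
```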
A big part of the design was understanding the entire lifecycle of a program on our platform. All that work was
consolidated into a single program lifecycle diagram.
## The Problems
But it wasn't that simple. We didn't have a way of sending notifications when a program finished. The design told us we
needed to refactor first. The refactoring work served two purposes:
1. We needed to unify how program states were recorded, so that the recorded states accurately reflect the actual state
   of the program. Remember that CDAP runs both on a standalone computer and in a distributed environment; the
   lifecycle of a program is very different in the two, and we needed consistent states in all cases.
2. We needed to use an internal transactional messaging system (think Kafka, but transactional/consistent, and durable
   in case services go down) to guarantee that these notifications arrive exactly once, in order, and are visible to
   any internal part of CDAP. These notifications needed to be sendable from anywhere in CDAP.
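As a rough sketch of the second point, this is what publishing a program status notification through such a messaging
system might look like. The MessagingClient interface and the topic name are assumptions for illustration, not CDAP's
real messaging API.

```java
import java.util.Map;

// Sketch of publishing a program status notification through a transactional
// messaging system. The MessagingClient interface below is a hypothetical
// stand-in for CDAP's internal messaging service, not its real API.
public class ProgramStatusPublisher {

  interface MessagingClient {
    /** Publishes a message to a topic as part of the caller's transaction. */
    void publish(String topic, byte[] payload);
  }

  private static final String TOPIC = "program-status"; // assumed topic name

  private final MessagingClient messaging;

  public ProgramStatusPublisher(MessagingClient messaging) {
    this.messaging = messaging;
  }

  /**
   * Called wherever a program changes state. Because the publish participates
   * in the same transaction as the state update, the notification and the
   * recorded state can never disagree.
   */
  public void publishStatus(String programRunId, String status, Map<String, String> eventData) {
    String payload = programRunId + "|" + status + "|" + eventData;
    messaging.publish(TOPIC, payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
  }
}
```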
### Improving program state recording
Let me talk a bit about the refactoring work, which became a bit more involved than expected:
Unifying program states was tricky. We needed a separate way of recording program states in local mode and in
distributed mode. Local mode was relatively easy: each program has a runner to run it and a controller, so we simply
add a listener to each controller to watch for state changes and react to them. In distributed mode, however, listening
there was inconsistent and often too late. Instead, we needed to record program states in the Application Master, which
is responsible for handing out containers/resources to the programs; it is the most accurate and reasonable place where
the state of any program is known.
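The local-mode idea looks roughly like the sketch below; the controller, listener, and publisher types are simplified
stand-ins, not CDAP's actual ProgramController API.

```java
// Simplified sketch of the local-mode approach: attach a listener to each
// program controller and react to state transitions. These types are
// illustrative stand-ins, not CDAP's actual classes.
public class LocalStateRecorder {

  enum ProgramState { STARTING, RUNNING, COMPLETED, FAILED, KILLED }

  interface StateListener {
    void onStateChange(String programRunId, ProgramState newState);
  }

  interface ProgramController {
    void addListener(StateListener listener);
  }

  interface StatusPublisher {
    // e.g. the transactional publisher from the previous sketch
    void publishStatus(String programRunId, String status);
  }

  private final StatusPublisher publisher;

  public LocalStateRecorder(StatusPublisher publisher) {
    this.publisher = publisher;
  }

  /** Hooks state recording into a newly created program controller. */
  public void watch(ProgramController controller) {
    // Every observed transition is published so the rest of CDAP can react to it.
    controller.addListener((runId, newState) -> publisher.publishStatus(runId, newState.name()));
  }
}
```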
Then, using a transactional messaging system to record program states was also more difficult than we thought, for
other reasons. Publishing messages was easy, but consuming them efficiently was harder. We needed to build a message
fetcher that would consume notifications in batches and persist the program states transactionally. Once we did all
that, we needed to fix all of the tests to correctly check for program status, since these notifications were a
cleaner, more reliable, but slightly slower way of updating program states.
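A sketch of what such a batched fetcher might look like, again with hypothetical interfaces standing in for CDAP's
internals:

```java
import java.util.List;

// Sketch of the batched consumer: fetch a batch of notifications from the
// messaging topic and persist the resulting program states in one transaction.
// The interfaces here are hypothetical simplifications, not CDAP's internals.
public class ProgramStateFetcher implements Runnable {

  interface MessageFetcher {
    /** Fetches up to 'limit' messages after the given offset. */
    List<byte[]> fetch(String topic, long afterOffset, int limit);
  }

  interface StateStore {
    /** Persists the decoded program states and the new offset atomically. */
    void persist(List<byte[]> notifications, long newOffset);
  }

  private static final String TOPIC = "program-status"; // assumed topic name
  private static final int BATCH_SIZE = 100;

  private final MessageFetcher fetcher;
  private final StateStore store;
  private long offset; // last processed message offset, reloaded on restart

  public ProgramStateFetcher(MessageFetcher fetcher, StateStore store, long startOffset) {
    this.fetcher = fetcher;
    this.store = store;
    this.offset = startOffset;
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      List<byte[]> batch = fetcher.fetch(TOPIC, offset, BATCH_SIZE);
      if (!batch.isEmpty()) {
        // Persisting the states and the offset together keeps processing exactly-once:
        // if the service dies mid-batch, it simply re-reads from the old offset.
        store.persist(batch, offset + batch.size());
        offset += batch.size();
      } else {
        try {
          Thread.sleep(1000); // nothing new; poll again shortly
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}
```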
With notifications being published and consumed internally by the platform, it became much easier to implement the
original goal and follow the diagram: scheduling programs.
At this point, one thread can subscribe to the topic on the messaging system and persist program state changes to the
store, while the scheduler subscribes to the same topic and schedules jobs.
## Conclusion
I learned a lot at Cask: about all of the buzzwords around big data, about design, and about open source software.
Dealing with a distributed environment, multi-threaded code, and race conditions made this a much more challenging
internship than, say, classic web development. I definitely have a greater appreciation for Java now.
Check out the design docs I made:
Program status scheduling: https://wiki.cask.co/display/CE/Program+Status+Based+Scheduling
Refining program states: https://wiki.cask.co/display/CE/Refined+Program+States