Data-rich companies (e.g. LinkedIn, Facebook, Google, and Twitter) have historically built custom data pipelines over bare metal in custom-designed data centers. In order to meet strict requirements on data security, fault-tolerance, cost control, job scalability, and uptime, they need to closely manage their core technology. Just as serving systems (e.g. web application servers and OLTP databases) need to be up 24x7 to display content to users the world over, data pipelines need to be up and running in order to pick the most engaging and up-to-date content to display. In other words, updated ranking models, new content recommendations, and the like make data pipelines an integral part of an end user's web experience by selecting engaging, personalized content.
Agari, a data-driven email security company, is no different in its demand for a low-latency, reliable, and scalable data pipeline. It must process a flood of inbound email and email authentication metrics, analyze this data in a timely manner, often enriching it with third-party data and model-driven derived data, and publish its findings. One twist is that Agari, unlike the companies listed above, operates entirely in the cloud, specifically in AWS. This has turned out to be more of a boon than a disadvantage.
Below is one such data pipeline used at Agari.
Data Flow
- An on-premises Collector (i.e. it lives within one of our enterprise customers' data centers) sends customer telemetry data to S3
- An Apache Spark cluster consumes this data and publishes enriched, analyzed data back to S3
- Whenever an S3 object is written by Spark, an SNS (Simple Notification Service) push notification is generated and sent to an SQS (Simple Queue Service) queue -- see the wiring sketch after this list
- An Importer process carries out the ETL (a.k.a. extract, transform, and load) of this enriched data into our analytic database
- A web application queries our DB to display reports and engaging visualizations that our enterprise customers use to draw useful insights
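To make the S3-to-SNS-to-SQS hand-off concrete, here is a rough boto3 sketch of how such a notification chain can be wired up; the bucket, topic, and queue names are illustrative placeholders rather than our actual resources.

```python
# Sketch (boto3): wire S3 object-created events to an SNS topic, fanned out to SQS.
# Bucket, topic, and queue names are placeholders.
import boto3

s3 = boto3.client("s3")
sns = boto3.client("sns")
sqs = boto3.client("sqs")

topic_arn = sns.create_topic(Name="enriched-data-events")["TopicArn"]
queue_url = sqs.create_queue(QueueName="importer-work-queue")["QueueUrl"]
queue_arn = sqs.get_queue_attributes(
    QueueUrl=queue_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# Subscribe the queue to the topic so every SNS push lands in SQS.
sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)

# Have S3 publish a notification to the topic whenever Spark writes an object.
s3.put_bucket_notification_configuration(
    Bucket="enriched-data-bucket",
    NotificationConfiguration={
        "TopicConfigurations": [
            {"TopicArn": topic_arn, "Events": ["s3:ObjectCreated:*"]}
        ]
    },
)
```

In practice, the topic and queue also need resource policies that allow S3 to publish to the topic and SNS to deliver to the queue; those are omitted here for brevity.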
Dealing with a Bottleneck
The architecture above has many advantages. It leverages S3 as the integration data store -- S3 is where both input and derived data live. In non-cloud environments, this is typically HDFS. Since S3 is optimized for high-throughput operation, it is ideal for parallel ingest of data from tens of thousands of Collectors as well as for parallel read and write access via Spark, a massively parallel data processing system. The only problem with this architecture lies in the Importer. The Importer is a single job that turns out to be a bottleneck -- for one customer's data set, we experienced a 30-hour delay at the Importer. Thus, it seemed natural to parallelize the import phase as well.
Parallelizing Importers
SQS
To achieve this, we leveraged both SQS and ASGs (a.k.a. auto-scaling groups). Firstly, SQS is an ideal vehicle for delivering S3 object pointers to a set of parallel Consumers (i.e. Importers). SQS is a queue service that ensures at-least-once delivery of a message. When a message is consumed by an Importer, the message becomes invisible to other consumers for a configurable period of time (i.e. the visibility timeout). Once the Importer commits the message data to the database, it must ACK the message. If it does so within the visibility timeout, SQS removes the message from the queue. If the visibility timeout expires before the ACK arrives, the message becomes visible to other Importers again, and it is possible for it to be consumed more than once. This is common in messaging systems, so it is a well-understood practice to ensure that Importers (consumers) are idempotent in their "commit" logic.
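As a rough sketch of what this looks like inside an Importer, the loop below (boto3) receives messages, imports the referenced S3 objects, and deletes (ACKs) each message only after the commit succeeds; the queue URL and the upsert helper are hypothetical placeholders.

```python
# Sketch of an Importer's consume/ACK loop (boto3). The queue URL and the
# upsert_enriched_records() helper are hypothetical placeholders.
import json
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/importer-work-queue"

def upsert_enriched_records(bucket, key):
    """Idempotent 'commit': an upsert keyed on the S3 object, so replays of
    the same message (at-least-once delivery) do not create duplicate rows."""
    ...

while True:
    resp = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,     # long polling
        VisibilityTimeout=300,  # should exceed worst-case import time per object
    )
    for msg in resp.get("Messages", []):
        # The SQS body wraps an SNS envelope, which wraps the S3 event record.
        s3_event = json.loads(json.loads(msg["Body"])["Message"])
        for record in s3_event.get("Records", []):
            bucket = record["s3"]["bucket"]["name"]
            key = record["s3"]["object"]["key"]
            upsert_enriched_records(bucket, key)
        # ACK: deleting within the visibility timeout prevents redelivery
        # of this message to another Importer.
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])
```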
ASGs
Auto-scaling groups have been in widespread use for a few years now, mostly in concert with ELBs (a.k.a. Elastic Load Balancers) to reliably scale web traffic and to maintain a minimum set of healthy servers. What is not widely discussed is their use in job or message processing. At Agari, we leverage ASGs to auto-scale our Importers. This brought our import time down from 30 hours to 15 minutes for the customer example mentioned previously.
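As an illustration, an Importer fleet can be put behind an ASG along these lines (boto3); the group name, launch configuration, sizes, and subnets are placeholders, not our actual settings.

```python
# Sketch (boto3): an auto-scaling group for the Importer fleet.
# Names, sizes, and subnets are illustrative placeholders.
import boto3

autoscaling = boto3.client("autoscaling")

autoscaling.create_auto_scaling_group(
    AutoScalingGroupName="importer-asg",
    LaunchConfigurationName="importer-launch-config",  # AMI with the Importer baked in
    MinSize=1,           # minimum number of Importers kept running
    MaxSize=20,          # ceiling for a scale-out burst
    DesiredCapacity=1,
    VPCZoneIdentifier="subnet-aaaa1111,subnet-bbbb2222",
)
```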
Auto-Scaling Based on Average CPU
Our first attempt at auto-scaling Importers used average cluster CPU usage as the scaling trigger -- cf. the two graphs below. In the first diagram, the orange trend line depicts the SQS message send rate. The two green lines portray SQS message consumption in the form of SQS receive and delete (ACK) rates. The blue line shows cumulative CPU. As you can see, a flood of data was sent to SQS (orange). To meet this demand, the cluster scaled out linearly, causing the cumulative CPU (blue) to rise linearly. Message consumption also increased, though it is not as smooth a line (green).
The diagram below shows average CPU usage in the cluster during the run. Our triggering criterion for a scale-out event was 40% average CPU sustained for a period of 5 minutes. As you can see, the auto-scaling activity maintained the CPU at 40% throughout the run by adding more machines.
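A trigger of that shape can be expressed as a CloudWatch alarm wired to a simple scaling policy, roughly as follows; the policy and alarm names are illustrative.

```python
# Sketch (boto3) of a CPU-based trigger: scale out when average CPU across
# the ASG stays at or above 40% for a 5-minute period.
import boto3

autoscaling = boto3.client("autoscaling")
cloudwatch = boto3.client("cloudwatch")

policy_arn = autoscaling.put_scaling_policy(
    AutoScalingGroupName="importer-asg",
    PolicyName="importer-scale-out-on-cpu",
    AdjustmentType="ChangeInCapacity",
    ScalingAdjustment=1,   # add one Importer per trigger
    Cooldown=300,
)["PolicyARN"]

cloudwatch.put_metric_alarm(
    AlarmName="importer-avg-cpu-40pct",
    Namespace="AWS/EC2",
    MetricName="CPUUtilization",
    Dimensions=[{"Name": "AutoScalingGroupName", "Value": "importer-asg"}],
    Statistic="Average",
    Period=300,            # one 5-minute evaluation window
    EvaluationPeriods=1,
    Threshold=40.0,
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    AlarmActions=[policy_arn],
)
```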
Auto-Scaling Based on SQS Queue Depth
However, there were two issues with this approach. Firstly, we needed to maintain an ASG with a minimum size of 1, which meant always paying for a machine even when our data loads were not running (we currently run them a few times a day). Secondly, CPU turned out to be an unreliable trigger since the ETL was IO-bound, not CPU-bound. To address both of these problems, we moved to using SQS queue depth as the triggering mechanism. This allowed us to set the ASG minimum size to 0, providing cost savings, and it also proved to be very reliable, as shown below.
In the image above, the orange trend line represents the S3 writes (performed by the Spark job) that result in SQS notifications. The blue line represents the SQS queue depth at any point in time. The auto-scaled Importers must consume all of these messages. In the first "hump", we started off without auto-scaling and with a single node, so the rate of SQS message consumption was low. The sudden change in slope of the blue line reflects our turning on auto-scaling. The second "hump" reflects what happens now, with queue-depth-based auto-scaled Importers. As the Spark job starts writing data to S3, the SQS queue depth grows, and the Importer cluster is scaled out to meet that demand. Now our Importer cluster completes even large data loads in about an hour.
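The queue-depth trigger can be expressed the same way, with the alarm watching the SQS backlog instead of CPU and the group allowed to shrink to zero; the names and thresholds below are illustrative placeholders.

```python
# Sketch (boto3) of a queue-depth trigger: scale out on backlog in the
# importer queue and let the group idle at zero when the queue is empty.
import boto3

autoscaling = boto3.client("autoscaling")
cloudwatch = boto3.client("cloudwatch")

# No CPU-based floor is needed, so the ASG can shrink to zero instances.
autoscaling.update_auto_scaling_group(
    AutoScalingGroupName="importer-asg", MinSize=0
)

scale_out_arn = autoscaling.put_scaling_policy(
    AutoScalingGroupName="importer-asg",
    PolicyName="importer-scale-out-on-backlog",
    AdjustmentType="ChangeInCapacity",
    ScalingAdjustment=2,   # add Importers while messages are waiting
    Cooldown=120,
)["PolicyARN"]

cloudwatch.put_metric_alarm(
    AlarmName="importer-queue-backlog",
    Namespace="AWS/SQS",
    MetricName="ApproximateNumberOfMessagesVisible",
    Dimensions=[{"Name": "QueueName", "Value": "importer-work-queue"}],
    Statistic="Sum",
    Period=60,
    EvaluationPeriods=1,
    Threshold=100.0,       # scale out once a backlog builds up
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    AlarmActions=[scale_out_arn],
)
```

A mirror-image alarm on an empty queue can drive a corresponding scale-in policy that returns the group to zero once the load completes.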
Conclusion
Agari avails itself of cloud and open-source technologies such as S3, Spark, SNS, SQS, and ASGs to build a scalable, low-latency data pipeline. By leveraging S3 as our high-throughput integration store, Spark as our parallel computation engine, SQS as our highly scalable queue, and ASGs at the Importer layer, we are able to scale at every stage of our pipeline. By using SQS queue depth as the auto-scaling trigger, we reduce our EC2 costs by shrinking our cluster to size 0 when idle, while relying on a signal that is more reliable for IO-bound workloads. Another best practice I would like to share is our use of Avro. I mentioned earlier that S3 is our integration store, meaning that all data -- input, intermediate, and output -- is stored in S3. Beyond picking S3 as our integration store, we also needed to pick a common file format. Avro is a serialization format that provides schema evolution and efficient binary serialization, does not require code generation, and is well supported in multiple languages, including Ruby, Java, and Python. It met our needs.
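To illustrate, here is a small Avro round trip in Python using fastavro (one of several Python Avro libraries); the schema is a toy example rather than our actual record format.

```python
# Sketch of an Avro write/read round trip with fastavro.
# The schema and field names are a toy example, not Agari's real records.
from io import BytesIO
from fastavro import parse_schema, reader, writer

schema = parse_schema({
    "type": "record",
    "name": "EnrichedMessageEvent",
    "fields": [
        {"name": "message_id", "type": "string"},
        {"name": "sender_domain", "type": "string"},
        {"name": "auth_result", "type": "string"},
        # New optional fields with defaults can be added later without
        # breaking old readers -- the schema-evolution property noted above.
        {"name": "trust_score", "type": ["null", "double"], "default": None},
    ],
})

records = [
    {"message_id": "abc-123", "sender_domain": "example.com",
     "auth_result": "pass", "trust_score": 0.97},
]

buf = BytesIO()
writer(buf, schema, records)   # compact binary encoding, schema embedded in the file

buf.seek(0)
for rec in reader(buf):        # no generated code needed to read it back
    print(rec["sender_domain"], rec["trust_score"])
```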
Acknowledgements
A special thanks to my colleagues Scot Kennedy and Andrew Flury for their help and leadership in building this pipeline.