Mining Patents in the Cloud (part 3): Design Issues
In parts one and two of this series of blog postings, we’ve introduced SureChemOpen, and described how patent documents are processed to extract interesting chemical compounds for searching. We’ve also described how cloud technologies were employed to implement the pipeline.
The final part of this article digs into the design of the SureChemOpen pipeline, discussing what worked well, what didn’t work quite so well, and describing a few pitfalls you might encounter if you’re thinking of building a cloud-based data mining pipeline of your own. The discussion also touches on common operational and developmental requirements such as scalability, reliability, complexity, data integrity, and performance.
Scaling up and out was one of our top priorities, and it’s one of the key benefits of a cloud-based pipeline. The ability to (relatively) easily add new compute nodes means that additional processing capacity can be added as needed to deal with higher workload, and the use of a queuing system to coordinate work means that new task instances can be added almost without limit. Practically speaking, other bottlenecks will emerge in a pipeline with very large numbers of tasks – for example in accessing shared resources such as a database.
It’s also possible to utilize AWS “spot instances” to expand your processing capacity further – a spot instance is a lower-price compute node made available at off-peak times via a bidding system. While we’re not quite there yet with SureChem, it’s also possible to automate the use of spot instances to add tasks and increase capacity.
The main tradeoff in scaling is complexity.
A single, serialized, data processing pipeline instance is much easier to manage and maintain than a multi-node, multi-process, multi-threaded, highly concurrent system. In the process of scaling out we’ve had to think hard about the consequences of any two units of work being done at the same time, key challenges being database integrity (more on this below) and multi-threading. We’ve found that Java processes aggressively acquire memory, meaning multiple JVM instances on a single system is undesirable. As such, we’ve had to make sure our tasks can run alongside each other in the same JVM process and to partition those tasks that don’t play well together into different processes.
Scaling out also can’t be rushed – if you’re processing a very large volume of data, make sure you leave plenty of time to build up to full capacity, don’t just expect to switch on another twenty nodes and be done. While this is possible, it’s a good idea to take time to build the trust in your system – to work out the bugs, build essential monitoring and configuration infrastructure, etc. Once you’re confident your system will work seamlessly at scale, you can add more capacity in the knowledge that the risk of expensive mistakes has been minimized.
It’s also important to consider the cost of scaling. While small-scale processing is cheap, costs can quickly grow as you add more capacity (i.e. more EC2 nodes) and as you process more data. You’re also at the mercy of Amazon’s pricing policy, changes in which may mean fluctuating or unexpected costs.
Closely related to scaling is optimization – how to get the optimal performance from the resources you have. While it may be tempting to just add more capacity, make sure you’ve performed the easiest and most obvious optimizations first, at least.
The main workflow in data pipeline optimization is to find the bottleneck, remove it, then repeat. CloudWatch is your friend here, because it will show you where the bottleneck is in your system for any particular build or configuration. Look for one particular task which is preventing work from reaching other downstream tasks, then investigate why it’s taking so long. Check if your task is particularly CPU intensive, requires more memory (e.g. for caching), or is blocked on I/O.
For CPU intensive tasks, you may be lucky and find that the cause is unnecessarily complex code – in which case you can rework it to improve performance. More likely though, the complexity may be necessary or very expensive to speed up in which case you can look to scale out. Consider adding more instances of the bottleneck task; run these as new process instances, or as additional threads alongside existing tasks if you’re running JVM instances. In our initial pipeline configuration, we found that even though we had CPU-bound tasks, CPU utilization on our system was very low while memory use was high. By rearranging where different tasks were running, and by adding more instances of tasks running as threads alongside other tasks, we were able to increase throughput and system utilization massively. If CPU and memory resources are fully utilized, add new nodes.
You may find that your tasks are blocked on I/O – i.e. taking a long time to read or write to the filesystem, or similarly blocked on network accesses such as to a remote database or service. Again you may simply need to review your code to see if there are any obvious optimizations – unnecessary or poorly implemented I/O operations. If not, review the granularity of work units your tasks are performing; very fine grained tasks mean more I/O is required for coordination and for sharing of intermediate resources.
If it looks like the bottleneck is the network, make sure you batch as many remote requests as possible – particularly SQS messages and database requests. SQS provides batch reading and writing of messages (limited to 10 per request, unfortunately) which speeds up message handling and reduces costs. Similarly if you’re interacting with an SQL database, one large query or write operation will typically perform better than lots of small operations. In the SureChemOpen pipeline we increased annotation throughput by an order of magnitude, simply by performing a compound INSERT statement – interestingly this bottleneck only appeared once we were running in the cloud, because the pipeline was no longer interacting with the database locally.
Message filtering, Idempotence, and Transactions
Once you move to a distributed pipeline, you’ll need to consider what happens if your pipeline receives duplicate messages, or different messages with equivalent or interacting side effects.
Duplicate messages can occur for several reasons. They may appear because of bugs in your code, or due to bugs in your chosen messaging system – prevention of duplicate messages is a knotty problem and cannot be guaranteed, just made very very unlikely. You may also find duplicate messages passing through your system because a job has been re-submitted.
Whatever the reason, how duplicates are handled is an important consideration. One option is to filter them – for example by performing a check against a database and throwing away messages that have clearly been processed already. This helps ensure data integrity and prevent unnecessary rework; that said you may wish to make filtering conditional based on a flag included in the message. In SureChemOpen we reject messages that would cause documents to be re-processed, unless they are marked as requiring reannotation within the message.
Even if you choose to filter duplicates, it’s a good idea to try to make the processing of each task idempotent. Those familiar with RESTful principles may know that idempotence is a characteristic of the PUT verb in the HTTP protocol. Simply put, it means that state of the system is consistent no matter how many times a particular message is processed. Practically speaking, idempotence may mean resetting stored data prior to message processing, or using insert/update operations rather than purely inserting new records. SQL, for example, provides the INSERT IGNORE operator which can be used to ensure database consistency. In SureChemOpen, we clear out all annotations for a document prior to reannotation – failing to do so would simply mean duplicate records in the database. Similarly when storing new chemical names, we use an INSERT IGNORE operation to ensure we only record each name once.
Another integrity consideration is transactionality. Ideally, the processing of each message will be atomic – meaning that it’s either fully processed (and thus deleted from the input queue) or nothing is processed and there are no side effects. Failing to ensure atomic operations can mean inconsistent or incomplete data, or in some cases downstream messages that cannot be processed because required resources are missing. You may be able to partition your pipeline so that tasks are naturally atomic – e.g. in SureChemOpen, the result of processing an OCR correction message is a single, downstream message. The only side effect of processing failure is that the message hangs around on the queue and the logs are a bit noisy.
If your tasks aren’t naturally atomic, transactionality can be expensive – both in terms of development and processing time. If you’re using a Relational database such as MySQL then you’ll get a certain amount of transactionality for free (except for processing overhead), but make sure you consider message transmission too. One option is to retain a buffer of messages that will only be transmitted once the input message has been successfully processed.
Keeping your development team sane(ish)
While the above topics have focussed on data processing performance and integrity, it’s also important to consider developmental requirements. While your pipeline may be fast and scalable, it should also be easy to develop and maintain. Here are a few suggestions for how to make life easier:
Logs everywhere. We try to log every significant event, often as informational messages but sometimes as debug messages for low level events. The combination of thorough logging with a log collection service such as greylog2 makes tracking and problem determination easier.
Use a task management framework. The AWS SDK for Java makes it easy to interact with AWS services, but it doesn’t provide a framework for task management or message processing. We developed a simple framework for pipeline tasks which handles all SQS related activity such as reading, writing, and deleting from queues. Having a single, unified abstraction for task processing makes it easy to apply wide-reaching, fundamental changes to your core aspects of your pipeline. Some languages (such as Erlang) provide this for free, though choosing a less mainstream language often means integration with third party software will be harder.
Start with a zero-tolerance approach to exceptions – only add more lenient exception handling when your system is running reliably. Most exceptions in SureChemOpen are re-thrown as a runtime exceptions, causing message processing failure. This increases the visibility of issues during development.
Choose third party dependencies carefully. SureChemOpen relies heavily on third party software for name to structure conversion, structure processing, and image analysis. Because our pipeline is fully automated and we process massive numbers of documents, tasks must be highly resilient. We’ve managed to expose numerous bugs in third party tools by hitting edge cases. So either avoid third party software if you can, or limit the number of dependencies.
This article introduced SureChemOpen, and described the text mining pipeline and cloud-based implementation which provide the interesting chemical data at the heart of SureChem. It also discussed several design considerations to bear in mind when developing your own pipeline.
Overall we’ve found the combination of EC2, SQS, S3, and CloudWatch to be a good foundation for data mining in the cloud. By combining these technologies, we were able to build an easily scalable pipeline for generating SureChem data. The main tradeoffs for improved scalability are greater management and development complexity, along with less predictable costs.