
December 17, 2008

The man asked for SQL but he wanted POJO

posted by ari

I was talking to some Terracotta users last week. They had a classic Business Process Management challenge.

An upstream system would create work for downstream systems to execute. But the work was spread across many of these downstream services and could take days to complete before reporting successful results to the upstream server.

The existing system is built on JPA with Oracle underneath. There are several special cases handled in the production system. For example, any job that requires invoking a downstream service that is down ends up timing out and gets stuck in the DB as DEAD (some column in a table somewhere is updated when the job times out). Jobs submitted against services that do not yet exist also end up DEAD in the jobs table in the DB.

So the production operations team and the developers regularly run SQL scripts that find all jobs created within a given date range whose status is DEAD and kick those jobs by resetting them to the INITIAL state. Then the job processor will see them again (a simple SELECT * FROM JOBS_TABLE WHERE JOB_STATUS = 'INITIAL' type of thing).

Now, the developers are annoyed with the constant maintenance of this system and implement a replacement using Terracotta and POJOs and collections. There is now a map of jobs and a job is some sort of POJO (I don't have the details, sorry). Problem is, the production team asked, "how do we kick DEAD jobs without a SQL interface to this thing?" And so the developers came to me and asked, "when will Terracotta have a SQL interface and are there workarounds using, for example, Compass or Lucene, or Hibernate Search that you can help us with?"

My answer: step back a moment. The requirement is not for a SQL interface here. It is for a well-designed system that can restart DEAD jobs that die due to temporary timeout or due to misconfiguration / non-existent service definitions. Well, the system is easily designed without any SQL requirement or search requirement whatsoever! (I said triumphantly...)

I don't think I did a good job explaining myself to the users, but I want to do so now. First, move to an event-driven architecture. Forgo the notion of a simple map and a POJO job and iterating on the map once a minute via a daemon thread. This is very DB-like in its nature: scan all the data over and over again looking for rows or objects that match a pattern.

Instead, I think this use case should be implemented in an event-driven manner. Write each service as sitting behind a clean interface with perhaps an invoke() method or something simple. Build a pipelining system that creates a queue (LinkedBlockingQueue perhaps) of work per service-type. Then have a thread pool (ExecutorService perhaps) that sits at the consuming end of each work queue, take()ing work off that queue and firing UnitOfWork.invoke() (from the interface). If the UnitOfWork times out, re-queue it. If the service-type does not exist, create its queue but do not instantiate its thread pool. Work will just pile up on that queue until you OOME (don't worry about that at the moment... Terracotta will take care of spilling the queue to disk so you don't OOME). Last, write a servlet to update service definitions in the application (turn them on, turn them off, start or pause processing, reroute a service to another service, etc.).
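To make that concrete, here is a rough single-JVM sketch of the queue-per-service idea. Everything beyond LinkedBlockingQueue, ExecutorService and an invoke() method is my own assumption for illustration: the ServicePipeline class, its method names, and the choice to signal a timeout with a TimeoutException are all hypothetical, and nothing here is Terracotta-specific.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;

// Hypothetical interface: the post only asks for "an invoke() method or something simple".
interface UnitOfWork {
    void invoke() throws TimeoutException;
}

// Hypothetical container: one work queue per service type, one optional thread pool per queue.
class ServicePipeline {
    private final Map<String, LinkedBlockingQueue<UnitOfWork>> queues = new ConcurrentHashMap<>();
    private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();

    // Queue work for a service type. The queue is created even if no workers exist yet.
    void submit(String serviceType, UnitOfWork work) {
        queues.computeIfAbsent(serviceType, k -> new LinkedBlockingQueue<>()).add(work);
    }

    // Number of requests currently waiting for a service type.
    int pending(String serviceType) {
        LinkedBlockingQueue<UnitOfWork> queue = queues.get(serviceType);
        return queue == null ? 0 : queue.size();
    }

    // Start workers for a service type. Until this is called, work simply piles up.
    void startService(String serviceType, int threads) {
        LinkedBlockingQueue<UnitOfWork> queue =
                queues.computeIfAbsent(serviceType, k -> new LinkedBlockingQueue<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        if (pools.putIfAbsent(serviceType, pool) != null) {
            pool.shutdown(); // this service already has a pool; discard the new one
            return;
        }
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> drain(queue));
        }
    }

    // Worker loop: take work, invoke it, and re-queue it if it times out.
    private void drain(LinkedBlockingQueue<UnitOfWork> queue) {
        try {
            while (true) {
                UnitOfWork work = queue.take(); // blocks until work is available
                try {
                    work.invoke();
                } catch (TimeoutException e) {
                    queue.put(work); // timed out: back on the queue instead of DEAD
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // pool is shutting down
        }
    }

    // Stop all worker pools by interrupting their threads.
    void shutdown() {
        pools.values().forEach(ExecutorService::shutdownNow);
    }
}
```

Note that this sketch re-queues a timed-out job immediately, so a service that stays down would be retried in a tight loop. A real container would probably want a delay or a pause on that service's pool before retrying.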

Now what have we achieved? We have a single-JVM multi-threaded way using SEDA-type constructs to process many different types of service requests. We have a sorting mechanism that keeps each service request in a queue where only requests of that service reside. We have a way to turn services on and off without touching each task itself but instead updating some metadata about that service. And we have a multi-tasking architecture that can invoke many different services in parallel each in its own thread pool.

Now the only problems with our architecture are:

1. It doesn't scale past 1 JVM without replacing our java.util.Queue interfaces with JMS (ick!)
2. It doesn't persist the jobs in case of JVM failure
3. The status of a service--its metadata--needs to be consistent across all threads across all JVMs so that I can flip a service on / off simply by updating the map of services inside our servlet. We might even want to notify() someone in a thread pool that the service changed. But you can't notify() across JVMs.
4. If a service is defined weeks or months before it is activated, then we would have jobs sitting waiting to execute for that long period of time. Surely a POJO-based job mgmt solution would OOME

Terracotta eliminates these 4 issues for us transparently if we build the event driven system:
1. LinkedBlockingQueue and ExecutorService are transparently clustered with Terracotta. Create jobs on one JVM, take() them and invoke() their invokable interface on another. This allows us to go so far as to create a cluster of JVMs working together on a single service or on any number of services. E.g., Service A is backed by 10 JVMs while Service B is on 2 and Services C, D, and E are all on one JVM.

2. Jobs are POJOs in this UnitOfWork / LBQ model. The queues need to be flagged as Terracotta roots and thus all jobs inside them are persisted to the Terracotta array and to the array's disk transparently.

3. Service status map is another Terracotta root and wait() and notify() will work across threads _across_ JVMs with Terracotta.

4. A queue of service requests can just sit in memory as far as you, the developer, are concerned. Terracotta will transparently flush requests from your JVM to our array and from our array's memory down to disk in case of failure.
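Point 3 is easiest to see in code. Below is a minimal single-JVM sketch of a service status map that workers can wait() on and a servlet can notify. The ServiceRegistry class and its method names are hypothetical, and the Terracotta configuration that would make the map a root and cluster the lock is not shown.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry of service on/off flags, guarded by its own monitor.
class ServiceRegistry {
    private final Map<String, Boolean> enabled = new HashMap<>();

    // Called by the admin servlet to flip a service on or off.
    synchronized void setEnabled(String serviceType, boolean on) {
        enabled.put(serviceType, on);
        notifyAll(); // wake any workers waiting for a status change
    }

    // Unknown services are treated as off.
    synchronized boolean isEnabled(String serviceType) {
        return enabled.getOrDefault(serviceType, false);
    }

    // Called by a worker before it takes work; blocks while the service is off.
    synchronized void awaitEnabled(String serviceType) throws InterruptedException {
        while (!enabled.getOrDefault(serviceType, false)) {
            wait(); // releases the monitor until setEnabled() calls notifyAll()
        }
    }
}
```

The wait() sits in a loop so that a worker re-checks its own service after every wake-up, since notifyAll() also wakes workers waiting on other services.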

So the development team asserted, "Terracotta gives us durable job state and spill-to-disk transactionally underneath our workload processor."

The operations team retorted, "How do I update services and re-run DEAD tasks without SQL?"

I respond to the operators by saying, "Forget the notion of embedding a service's state inside each request. Go to thread pools, work queues, metadata structures, and wait() / notify() all built in a single JVM. It's not easy but it will prove very powerful and scalable...well beyond what a DB can support and without a rewrite once built."

So now I hear they are beginning implementation over the holidays of a LinkedBlockingQueue based solution.

What do you all think? Should I write up such a container's implementation in the Terracotta forge? Would it help you with anything you are working on? And, BTW, would you want to compose multiple services together through an orchestration service like Apache ServiceMix?

Let me know via comments or email.

Cheers,

--Ari


Comments

Ari,
We've started looking into using Terracotta for the next version of our internal software and what you describe here is along the same lines of something we'd end up using. So yeah, I think you should put the implementation on the forge. It'd be interesting to look at.

Posted by: James Asher at December 17, 2008 7:30 AM

Hi Ari -

I have a couple of questions ..

"Terracotta gives us durable job state and spill-to-disk transactionally underneath our workload processor."

What is the definition of transaction here ? Can I define my transaction boundaries ?

In case I need a write behind to database, what happens in case of a system crash ? Do I lose in-memory data ?

Cheers.
- Debasish

Posted by: Debasish Ghosh at December 17, 2008 8:08 AM

Debasish,

Good questions.

1. A Terracotta transaction means that everything you change from lock-acquire to lock-release is pushed as a unit to the Terracotta array or not at all. An ACK from the array means your data is safe on at least 2 array nodes and will, thus, make it to 2 disks on 2 separate machines, safely.

2. You can mark your methods as transactions. You can use synchronization keywords / constructs from the Java language as transactions. You can use util.concurrent lock objects as transaction begin and end markers. If all that fails, there is a utility class in the core OSS kit and you can call its transaction markers directly.

3. For write-behind to a DB, I would use TIM-async for async processing. Everything in memory that you want to flush to the DB is actually under Terracotta's control so you won't lose anything if the JVM or our array or both crash before flushing to the DB.
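As a small illustration of points 1 and 2, here is a plain Java sketch in which a synchronized method marks the lock-acquire to lock-release boundary. The Job class and its fields are hypothetical, and the Terracotta configuration needed to treat these monitors as clustered locks is not shown.

```java
// Hypothetical job POJO; the class and field names are illustrative only.
class Job {
    private String status = "INITIAL";
    private int attempts;

    // Both field updates happen between monitor enter and monitor exit,
    // so they form one unit of change under the model described above.
    synchronized void markTimedOut() {
        status = "TIMED_OUT";
        attempts++;
    }

    // Resets the job for another run; the attempt counter is kept.
    synchronized void requeue() {
        status = "INITIAL";
    }

    // Reads both fields under the same monitor, so it never sees a half-applied update.
    synchronized String describe() {
        return status + " after " + attempts + " attempt(s)";
    }
}
```

Under the transaction model in point 1, the status change and the attempt count in markTimedOut() would reach the array together or not at all.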

--Ari

Posted by: ARI ZILKA at December 17, 2008 10:41 AM

Ari -

This is one of the questions which pops up frequently when I talk to clients about TC ..

"Everything in memory that you want to flush to the DB is actually under Terracotta's control so you won't lose anything if the JVM or our array or both crash before flushing to the DB."

Since you are not doing anything like XA or 2PC, how do you ensure 0 data loss in case of crash ? Any pointer to details will be helpful ..

Cheers.
- Debasish

Posted by: Debasish Ghosh at December 17, 2008 8:35 PM

Debasish,

Ah, ok. I should answer this follow up question in a whole new blog entry, and I will.

But before I write that long-winded thing here's a sneak peek: 1.5PC between Terracotta cache update and the DB. Use a Terracotta sync-write transaction to flag the cache dirty. Then use a regular Terracotta transaction to update the cache and db and flag the cache as clean. If anything fails in the regular transaction, the cache will stay dirty and you will fall back to the DB, thus losing nothing.

--Ari

Posted by: ARI ZILKA at December 17, 2008 9:48 PM

It would be great to have some out-of-the-box solutions in Terracotta. The simplicity is a key to end-users/developers and if you provide simple interfaces to ready-to-use solutions like one you described (TMS like JMS :))))) people will appreciate.

Posted by: Dennis Kharlamov at December 19, 2008 4:27 AM

You definitely should provide this out of the box!

Posted by: Nicolás Cornaglia at December 19, 2008 12:46 PM

I'm interested in this architecture and would love to see an example. In addition, can you help me understand what ports are necessary to enable communications between the queue and worker nodes? My application has a firewall between the queue and worker nodes. We can open ports in the firewall, but we need to know how many there are, and what limitations might exist. For example, does each worker node reserve a port, or can it have multiple worker threads internally to share a port?

Posted by: David I at February 17, 2009 9:31 AM
