
June 20, 2006

Clustering a massive computation. Fun for me and you!

posted by hung

To demonstrate how to use DSO in distributed computing projects, I have written a small demo that computes and displays a Mandelbrot fractal. I assume you have some knowledge of our product; if not, it would help to download it from http://www.terracottatech.com/downloads.jsp and check out our other demos first. Our product guide is also a great place to start:

http://www.terracottatech.com/product-docs/product-guide/toc.html

You can read more about the Mandelbrot set here:

http://en.wikipedia.org/wiki/Mandelbrot_set

To begin, take an (x, y) coordinate and iterate up to a fixed number of times to calculate the color value for that point. Each line of the set can be computed independently, which makes the formula a great candidate for distributed computing. Each computer (node) can compute a portion of the set and forward the result to the viewer.
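The per-point iteration described above can be sketched as follows. This is a minimal illustration of the standard escape-time calculation, not the demo's actual calculatePixel() method. The class name EscapeTime, both method names, and the viewing region (x in [-2, 1], y in [-1.5, 1.5]) are assumptions made for this example.

```java
// Hypothetical sketch; the demo's real calculatePixel() is not shown in this post.
class EscapeTime
{
    // Iterate z = z*z + c starting from z = 0 and count the steps taken
    // before |z| exceeds 2. Points that never escape return maxIter.
    static int iterations(double cx, double cy, int maxIter)
    {
        double zx = 0.0;
        double zy = 0.0;
        int n = 0;
        while (n < maxIter && zx * zx + zy * zy <= 4.0)
        {
            double next = zx * zx - zy * zy + cx;
            zy = 2.0 * zx * zy + cy;
            zx = next;
            n++;
        }
        return n;
    }

    // Map pixel (col, row) onto the assumed region x in [-2, 1], y in [-1.5, 1.5].
    static int pixelIterations(int col, int row, int width, int height, int maxIter)
    {
        double cx = -2.0 + 3.0 * col / width;
        double cy = -1.5 + 3.0 * row / height;
        return iterations(cx, cy, maxIter);
    }

    public static void main(String[] args)
    {
        System.out.println(iterations(0.0, 0.0, 100)); // origin never escapes
        System.out.println(iterations(2.0, 2.0, 100)); // far point escapes at once
    }
}
```

The iteration count is what a viewer would typically map to a color. Because each call depends only on its own coordinates, rows can be handed to different nodes in any order.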

The demo uses a Model-View-Controller (MVC) design. A MandelbrotModel object is declared as a root and shared between the nodes. Each node can optionally have a viewer to display the fractal. The model maintains a list of listeners: the viewers that registered themselves at startup.

The initial design divided the task evenly in advance. For instance, with three nodes, each node would process 1/3 of the image, sliced horizontally. However, for this particular fractal, the calculation is more intense in the middle region (where the pixels are brighter than the rest). As a result, the second node, which handles the mid-section, carries the heaviest load. This is not desirable, since we would like the nodes to have roughly the same amount of work. The new design divides the workload into 20 segments and stores them in a queue; each node picks one segment to work on, then the next, until all the tasks are done. This way, we keep every node busy and improve the overall performance.
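To see why an even three-way split is unbalanced, the sketch below totals the iterations needed for each horizontal third of the image. It is only an illustration: the class name BandCost, the viewing region (x in [-2, 1], y in [-1.5, 1.5]), the 300x300 size, and the iteration cap of 200 are all assumptions, not values taken from the demo.

```java
// Hypothetical sketch comparing the cost of three equal horizontal bands.
class BandCost
{
    // Standard escape-time count for the point c = (cx, cy).
    static int iterations(double cx, double cy, int maxIter)
    {
        double zx = 0.0;
        double zy = 0.0;
        int n = 0;
        while (n < maxIter && zx * zx + zy * zy <= 4.0)
        {
            double next = zx * zx - zy * zy + cx;
            zy = 2.0 * zx * zy + cy;
            zx = next;
            n++;
        }
        return n;
    }

    // Total iterations spent on rows [startRow, endRow) of a width x height image.
    static long cost(int startRow, int endRow, int width, int height, int maxIter)
    {
        long total = 0;
        for (int row = startRow; row < endRow; row++)
        {
            for (int col = 0; col < width; col++)
            {
                double cx = -2.0 + 3.0 * col / width;
                double cy = -1.5 + 3.0 * row / height;
                total += iterations(cx, cy, maxIter);
            }
        }
        return total;
    }

    public static void main(String[] args)
    {
        int size = 300;
        int third = size / 3;
        for (int i = 0; i < 3; i++)
        {
            long c = cost(i * third, (i + 1) * third, size, size, 200);
            System.out.println("band " + i + ": " + c + " iterations");
        }
    }
}
```

Under these assumptions the middle band does the most work, which is the imbalance the 20-segment queue is meant to smooth out.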

A question arises: how do we notify the viewers to update a segment once it has been computed? As mentioned earlier, each node can have its own viewer, and each viewer has a reference to the shared model that holds all the data (pixel values). Terracotta DSO has a distributed method mechanism that turns out to be a great solution to this problem. Whenever a node finishes a segment, it invokes a distributed method on the model to let the model know that the segment is ready for display. The model, in turn, goes through its list of registered viewers and passes along the segment information.

Here is the pseudo code for each node:

While work load is not empty, do

  • Check out one segment from the work load
  • Process the segment for data (calculating pixel color line by line within the segment)
  • Update the model with the new data

End while

And here is the actual code:

public void start()
{
    while (true)
    {
        Segment segment;
        synchronized (workLoad)
        {
            if (workLoad.size() == 0)
            {
                System.out.println("Workload empty. Return");
                return;
            }

            segment = (Segment) workLoad.remove(0);
        }

        int[][] data = new int[segment.getRowCount()][width];
        for (int r = segment.getStart(); r < segment.getEnd(); r++)
        {
            int rowIndex = r - segment.getStart();
            for (int c = 0; c < width; c++)
            {
                int rgb = calculatePixel(c, r);
                data[rowIndex][c] = rgb;
            }
        }

        synchronized (model)
        {
            model.addSegmentData(data, segment);
        }
    }            
}

Note that accessing shared roots in DSO has to be done inside of a synchronized block.
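To try the checkout loop without a DSO server, the sketch below uses plain threads in a single JVM as stand-ins for nodes. It is an assumption-laden simulation, not part of the demo: the class WorkQueueSketch and its methods are hypothetical, and Integer indexes stand in for Segment objects. The locking pattern mirrors start() above: hold the lock only while removing a segment, then do the work outside it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-JVM simulation of the segment checkout loop.
class WorkQueueSketch
{
    // Take segments from the shared list until it is empty.
    // Returns how many segments this worker processed.
    static int drain(List<Integer> workLoad, int[] processed)
    {
        int taken = 0;
        while (true)
        {
            int segment;
            synchronized (workLoad)
            {
                if (workLoad.isEmpty())
                {
                    return taken;
                }
                segment = workLoad.remove(0);
            }

            // Stand-in for the pixel calculation: record that the segment was handled.
            synchronized (processed)
            {
                processed[segment]++;
            }
            taken++;
        }
    }

    // Run the given number of worker threads and report how often each segment was processed.
    static int[] run(int segmentCount, int workers)
    {
        final List<Integer> workLoad = new ArrayList<Integer>();
        for (int i = 0; i < segmentCount; i++)
        {
            workLoad.add(i);
        }
        final int[] processed = new int[segmentCount];

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++)
        {
            threads[i] = new Thread(() -> drain(workLoad, processed));
            threads[i].start();
        }
        for (Thread t : threads)
        {
            try
            {
                t.join();
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return processed;
    }

    public static void main(String[] args)
    {
        int[] counts = run(20, 3);
        for (int i = 0; i < counts.length; i++)
        {
            System.out.println("segment " + i + " processed " + counts[i] + " time(s)");
        }
    }
}
```

Because the emptiness check and the remove happen under the same lock, no two workers can check out the same segment.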

Now we see how the model handles each segment updated by the nodes:

public void addSegmentData(int[][] data, Segment segment)
{
    for (int row = segment.getStart(); row < segment.getEnd(); row++)
    {
        System.arraycopy(data[row - segment.getStart()], 0,
                                    rawData[row], 0, width);
    }

    fireSegmentUpdated(segment);
}


public void fireSegmentUpdated(Segment segment)
{
    if (listeners != null)
    {
        for (Iterator it = listeners.iterator(); it.hasNext();)
        {
            UpdateListener updateListener = (UpdateListener) it.next();
            updateListener.processSegment(segment);
        }
    }
}

The fireSegmentUpdated() call is our distributed call. Just declare it in the tc-config.xml file:


    
    <distributed-methods>
        <method-expression>void tc.demo.mandelbrot.MandelbrotModel.fireSegmentUpdated(..)</method-expression>
    </distributed-methods>
    
 

Once invoked, this method will be called on each JVM. The viewers, which implement the UpdateListener interface, get the pixel values from the model and set them into their own view frame, as shown below:

public void processSegment(Segment r)
{
    int startRow = r.getStart();
    int endRow = r.getEnd();

    synchronized (model)
    {
        int[][] rawData = model.getRawData();

        for (int row = startRow; row < endRow; row++)
        {
            for (int col = 0; col < width; col++)
            {
                frame.setPixel(row, col, rawData[row][col]);
            }
        }
    }

    frame.repaint(0, startRow, width, r.getRowCount());

}

One other note: when you have multiple nodes starting up, you want them to wait until all the nodes are online. The last thing we want is one node sending updates to the model while others are still initializing or not yet online (recognized by the DSO server). DSO supports wait() and notifyAll() calls on shared roots, so these calls are clustered among the JVMs. I declare another root, nodeList, to keep track of the nodes. It is only there to let the current node know when all the other nodes are ready.
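The wait()/notifyAll() handshake can be tried in a single JVM, with threads playing the role of nodes. The sketch below is a simplified stand-in under that assumption; the class StartBarrierSketch and its methods are hypothetical names, and nothing here is clustered unless DSO is configured to share the list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-JVM sketch of the "wait until all nodes are online" handshake.
class StartBarrierSketch
{
    private final List<String> nodeList = new ArrayList<String>();
    private final int expected;

    StartBarrierSketch(int expected)
    {
        this.expected = expected;
    }

    // Register the caller, wake any waiters, then block until enough nodes have registered.
    // Returns the number of registered nodes seen when the caller is released.
    int registerAndAwait(String name) throws InterruptedException
    {
        synchronized (nodeList)
        {
            nodeList.add(name);
            nodeList.notifyAll();
            // Loop guards against wakeups that arrive before the list is full.
            while (nodeList.size() < expected)
            {
                nodeList.wait();
            }
            return nodeList.size();
        }
    }

    // Start one thread per simulated node and collect what each saw on release.
    static int[] runAll(int nodes)
    {
        StartBarrierSketch barrier = new StartBarrierSketch(nodes);
        int[] seen = new int[nodes];
        Thread[] threads = new Thread[nodes];
        for (int i = 0; i < nodes; i++)
        {
            final int id = i;
            threads[i] = new Thread(() -> {
                try
                {
                    seen[id] = barrier.registerAndAwait("node-" + id);
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads)
        {
            try
            {
                t.join();
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen;
    }

    public static void main(String[] args)
    {
        int[] seen = runAll(3);
        for (int i = 0; i < seen.length; i++)
        {
            System.out.println("node-" + i + " released with " + seen[i] + " nodes online");
        }
    }
}
```

The while loop around wait() matters: a waiter is woken every time any node registers, so it has to re-check the count before proceeding.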

Below is the code for our Main class that contains the main method. It has three roots: the nodeList, the workLoad, and the model. These roots are shared among the nodes by the DSO server:

package tc.demo.mandelbrot;

import java.util.ArrayList;
import java.util.List;

public class Main
{
    private static int       NUM_NODES;
    private static int       SEGMENT;
    public static int        SEGMENT_COUNT   = 20;

    /* ROOTS */
    private List             nodeList        = new ArrayList();
    private List             workLoad        = new ArrayList();
    private MandelbrotModel  model;

    private CalculateNode    node;
    private int              width;
    private int              height;

    public Main(int width, int height)
    {
        this.width = width;
        this.height = height;

        model = new MandelbrotModel(width, height);
    }

    public void registerCalculateNode(CalculateNode w)
    {
        synchronized (nodeList)
        {
            w.setWorkLoad(workLoad);
            nodeList.add(w);
            nodeList.notifyAll();
        }
    }

    public void startMandelbrot(boolean hasViewer)
    {
        if (hasViewer)
        {
            Viewer viewer = new Viewer(width, height);
            viewer.setModel(model);

            synchronized (model)
            {
                model.addUpdateListener(viewer);
            }
        }

        // create new node and register it
        node = new CalculateNode(width, height);
        node.setModel(model);
        registerCalculateNode(node);

        // prepare work load
        // do only once, performed by the first node
        synchronized (workLoad)
        {
            if (workLoad.size() == 0)
            {
                for (int i = 0; i < SEGMENT_COUNT; i++)
                {
                    Segment r = new Segment(i * SEGMENT, (i + 1) * SEGMENT);
                    workLoad.add(r);
                }
            }
        }

        // wait till all the nodes are online
        System.out.println("Waiting for other nodes...");

        synchronized (nodeList)
        {
            while (nodeList.size() < NUM_NODES)
            {
                try
                {
                    nodeList.wait();
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }

        // start local calculation
        node.start();

    }

    /**
     * @param args
     */
    public static void main(String[] args)
    {
        if (args.length < 3)
        {
            printUsage();
            System.exit(1);
        }

        int width = Integer.parseInt(args[0]);
        int height = Integer.parseInt(args[1]);
        NUM_NODES = Integer.parseInt(args[2]);
        SEGMENT = height / SEGMENT_COUNT;
        boolean hasViewer = args.length >= 4;

        Main d = new Main(width, height);
        d.startMandelbrot(hasViewer);
    }

    public static void printUsage()
    {
        System.out.println("dso-java Main <width> <height> <numNodes> [v]");
        System.out.println("v: display frame");
    }
}

As you can see, DSO requires no code changes in your application. You just need to write it carefully so that it is properly synchronized across multiple JVMs. The same code runs just fine in plain Java:

%> java -classpath classes tc.demo.mandelbrot.Main 600 600 1 v

To run it with DSO and deploy with two nodes, run the command on each node:

%> run.sh 600 600 2 v

You can have two nodes deployed on the same machine, but that would be no fun. Try running each node on a different machine or a different OS. Just make sure you modify the server field in tc-config.xml so the node knows where the DSO server is located.

Note: You have to restart the DSO server each time you want to try another run, because the nodeList is never reset once a run is finished.
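A related caveat when choosing a height: Main computes the segment size as height / SEGMENT_COUNT using integer division, so a height that is not a multiple of 20 leaves the last few rows outside every segment. One possible way to cover all rows is to hand the remainder out one row at a time to the first segments. The sketch below is a hypothetical helper (SegmentBounds is not part of the demo) and returns plain row pairs rather than Segment objects.

```java
// Hypothetical helper; not part of the demo. Splits rows so that none are dropped.
class SegmentBounds
{
    // Returns {start, end} pairs (end exclusive) that together cover rows [0, height).
    static int[][] split(int height, int segmentCount)
    {
        int[][] bounds = new int[segmentCount][2];
        int base = height / segmentCount;   // rows every segment gets
        int extra = height % segmentCount;  // leftover rows, one each to the first segments
        int start = 0;
        for (int i = 0; i < segmentCount; i++)
        {
            int rows = base + (i < extra ? 1 : 0);
            bounds[i][0] = start;
            bounds[i][1] = start + rows;
            start += rows;
        }
        return bounds;
    }

    public static void main(String[] args)
    {
        int[][] bounds = split(610, 20);
        for (int i = 0; i < bounds.length; i++)
        {
            System.out.println("segment " + i + ": rows " + bounds[i][0] + " to " + bounds[i][1]);
        }
    }
}
```

With a height of 600, as in the commands above, the split is exact and this makes no difference.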

You can download the demo and try it with our Eclipse plug-in, another great supporting feature provided by Terracotta to help development with DSO.

Comments

How well can Terracotta scale under load? Wouldn't the AOP thing make the whole thing succumb under load, due to high network traffic etc?

Posted by: Prat at August 2, 2006 4:27 AM

Hi Prat. You are right that there is some additional network I/O. This is true of any coherent distributed cache - machines have to talk to each other to keep the cache in sync.

However, Terracotta is quite smart about minimizing message size (by sending only changed data) as well as minimizing frequency of messages (by batching changes and sending only when locks are released).

Posted by: Patrick Calahan at August 2, 2006 1:17 PM
