Parallel Data Processing

When a transformation is processed in parallel, the whole graph (or parts of it) runs on multiple cluster nodes, with each node processing just a part of the data.

Consequently, the more nodes the cluster has, the more data can be processed in a given time.

The data may be split (partitioned) before the graph execution or by the graph itself on the fly. The resulting data may be stored in partitions or gathered and stored as one group of data.

The scalability curve may differ according to the type of transformation. It may be almost linear, which is the ideal case. The exception is a single data source that cannot be read by multiple readers in parallel, which limits the speed of all further data transformation. In such cases parallel data processing brings no benefit, since the workers would just wait for input data.
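The effect of a single, non-parallel source can be illustrated with a rough model. This is only a sketch, not part of the product: the function name and the rates are hypothetical, and it assumes every worker transforms records at the same constant rate.

```python
from typing import Optional


def effective_throughput(nodes: int, per_node_rate: float,
                         source_rate: Optional[float] = None) -> float:
    """Records per second the cluster can process under a simple model.

    per_node_rate: records/s one worker can transform (assumed constant).
    source_rate:   records/s a single, non-parallel source can deliver,
                   or None when every worker reads its own partition.
    """
    if nodes < 1:
        raise ValueError("nodes must be at least 1")
    parallel_capacity = nodes * per_node_rate
    if source_rate is None:
        # Independent inputs: capacity grows linearly with the node count.
        return parallel_capacity
    # A single reader caps the throughput, no matter how many nodes wait.
    return min(parallel_capacity, source_rate)
```

With these assumptions, adding nodes beyond the point where the combined worker capacity exceeds the source rate no longer increases throughput.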

Node Allocation

Node allocation specifies which cluster nodes will run the graph and which parts of the graph they will run. Allocation is specified by the partitioned sandboxes used in the graph phase. Each phase may have only one allocation. Each partitioned sandbox has a list of locations, and when some part of the graph runs in parallel, there is one worker for each partitioned sandbox location. See "Partitioned sandbox" in Partitioned and Local Sandboxes for details.

Allocation is specified in the graph either by:

  • sandbox resources pointing to a partitioned sandbox, if workers read/write some partitioned data to/from their own location of this partitioned sandbox, or by

  • the node attribute "node allocation", if workers do not read/write partitioned data but an allocation must still be specified.

If there is a conflict, execution fails with an error message describing the conflict. A conflict may be caused, for example, by using two different allocations in a single phase.
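The rule of one allocation per phase can be sketched as a simple check. This is an illustration only, not the server's implementation: the function name is hypothetical, and an allocation is modeled as a tuple of node IDs.

```python
def resolve_phase_allocation(phase_allocations):
    """Return the single allocation used in a phase, or raise on conflict.

    phase_allocations: allocations declared by the components of one phase,
    each modeled (as an assumption) as a sequence of node IDs.
    """
    distinct = {tuple(allocation) for allocation in phase_allocations}
    if len(distinct) > 1:
        # Two different allocations in one phase: execution must fail.
        raise ValueError(
            f"conflicting node allocations in one phase: {sorted(distinct)}"
        )
    # No allocation at all means the phase runs on the primary worker only.
    return next(iter(distinct), None)
```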

Partitioning/gathering Data

As mentioned before, data may be partitioned and gathered in multiple ways. It may be prepared before the graph is executed or it may be partitioned on the fly.

Partitioning/gathering "on the fly"

There are two special components to consider: ClusterPartitioner and ClusterGather. Both work similarly, but in the opposite way.

ClusterPartitioner works like a common partitioner, but node allocation is applied at the same time to everything behind the ClusterPartitioner component. All components preceding the ClusterPartitioner run on just one node (the so-called primary worker, see below), whereas components behind the ClusterPartitioner run in parallel according to the node allocation. Thus, each of these nodes works with just a part of the data. There are several partitioning types: "round-robin" (default), "by record key", and "by load".
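The first two partitioning types can be sketched as follows. This is a minimal illustration, not the component's actual code: the function names are hypothetical, and the use of CRC32 as the key hash is an assumption chosen only because it is stable between runs. Partitioning "by load" depends on runtime feedback from the workers and is not sketched here.

```python
import zlib


def partition_round_robin(records, workers):
    """Deal records to workers in turn: 1st to worker 0, 2nd to worker 1, ..."""
    parts = {worker: [] for worker in workers}
    for index, record in enumerate(records):
        parts[workers[index % len(workers)]].append(record)
    return parts


def partition_by_key(records, workers, key):
    """Send records with an equal key value to the same worker."""
    parts = {worker: [] for worker in workers}
    for record in records:
        # Assumption: a stable hash of the key's string form picks the worker.
        digest = zlib.crc32(str(record[key]).encode("utf-8"))
        parts[workers[digest % len(workers)]].append(record)
    return parts
```

Round-robin balances the record counts, while key-based partitioning keeps related records together, which matters for components such as aggregations or joins that run behind the partitioner.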

ClusterGather works in the opposite way. Components preceding the gather run in parallel while components behind the gather run on just one node (primary worker). The cluster gather component gathers records in the same way as SimpleGather and its attributes are the same. By default it does not sort input records in any way. It just gathers them in the order they come.

Primary worker node - some parts of a graph designed to run in parallel may run on a single node anyway, for example the part where the graph reads/writes data from/to a single resource. This may be the part preceding ClusterPartitioner or the part behind ClusterGather. It may also apply to all components in a phase that has no node allocation specified at all. Each phase may have its own primary worker. The primary workers are chosen during graph execution, primarily according to the local sandbox datasources used in the phases: the node which has direct (local) access to the sandbox datasources used in the phase is selected as the primary worker. There may be several different local sandbox datasources, or no local sandbox datasources at all, used in the phase. In such cases, the server uses other, minor parameters to choose the primary worker.

Both components may be combined in a single phase in any way, but there must be just one node allocation and just one primary worker in each single phase.

This example shows how data would be processed in 2 different node allocations, on 2 different primary workers.

  • phase 1 starts

    • processing data on primary worker (nodeA)

    • cluster partitioner component

    • processing data in parallel (nodeA, nodeB, nodeC)

    • cluster gatherer component

    • processing data on primary worker (nodeA)

  • phase 1 ends

  • phase 2 starts

    • processing data on primary worker (nodeA)

    • cluster partitioner component

    • processing data in parallel (nodeB, nodeD)

  • phase 2 ends

  • phase 3 starts

    • processing data in parallel (nodeB, nodeD)

    • cluster gatherer component

    • processing data on primary worker (nodeD)

  • phase 3 ends

Results are stored on a different node (nodeD) than the node that read the input (nodeA), and the data is actually repartitioned (from nodeA, nodeB, nodeC to nodeB, nodeD).
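The example above can be written down as plain data to make the repartitioning step explicit. This is only a sketch: the dictionary layout and the function name are hypothetical and do not correspond to any server API.

```python
# The three phases of the example: one primary worker and one allocation each.
EXAMPLE_PHASES = [
    {"primary": "nodeA", "allocation": ("nodeA", "nodeB", "nodeC")},
    {"primary": "nodeA", "allocation": ("nodeB", "nodeD")},
    {"primary": "nodeD", "allocation": ("nodeB", "nodeD")},
]


def allocation_changes(phases):
    """Return (previous, next) allocation pairs where consecutive phases differ."""
    changes = []
    for previous, following in zip(phases, phases[1:]):
        if set(previous["allocation"]) != set(following["allocation"]):
            # Data leaving the previous phase has to be repartitioned here.
            changes.append((previous["allocation"], following["allocation"]))
    return changes
```

Calling `allocation_changes(EXAMPLE_PHASES)` reports a single change, between phase 1 and phase 2, which is where the data moves from three nodes to two.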

Partitioning/gathering data by external tools

Partitioning data on the fly may in some cases be an unnecessary bottleneck. Splitting the data beforehand with low-level tools can scale much better. In the optimal case, each running worker reads data from an independent data source. There does not have to be a ClusterPartitioner component in the first phase, and the graph runs in parallel from the beginning.

  • phase 1 starts

    • processing data in parallel (nodeA, nodeB, nodeC)

    • cluster gatherer component

    • processing data on primary worker (nodeA)

  • phase 1 ends

Alternatively, the whole graph may run in parallel; in that case the results remain partitioned.

  • phase 1 starts

    • processing data in parallel (nodeA, nodeB, nodeC)

  • phase 1 ends

Partitioned and Local Sandboxes

Partitioned and local sandboxes were mentioned in previous sections. These new sandbox types were introduced in version 3.0 and they are vital for parallel data processing.

Together with shared sandboxes, we have three sandbox types in total.

Shared sandbox

This type of sandbox must be used for all data which is supposed to be accessible on all cluster nodes. This includes all graphs, metadata, connections, classes and input/output data for graphs which should support HA, as described above.

Figure 22.1. Dialog form for creating new shared sandbox


As you can see in the screenshot above, you cannot specify any root path on the filesystem. Shared sandboxes are stored in the directory specified by "cluster.shared_sandboxes_path". Each shared sandbox has its own subdirectory in it, which is named by sandbox ID.
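The resulting directory layout can be sketched as follows. The helper name and the example values are hypothetical; only the rule itself (one subdirectory per sandbox ID under the configured path) comes from the description above.

```python
from pathlib import PurePosixPath


def shared_sandbox_dir(shared_sandboxes_path: str, sandbox_id: str) -> PurePosixPath:
    """Directory of one shared sandbox under "cluster.shared_sandboxes_path"."""
    # Each shared sandbox lives in a subdirectory named by its sandbox ID.
    return PurePosixPath(shared_sandboxes_path) / sandbox_id
```

For instance, with a hypothetical setting of `/mnt/shared/sandboxes`, a sandbox with the ID `data` would be stored in `/mnt/shared/sandboxes/data`.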

Local sandbox

This sandbox type is intended for data that only a particular cluster node can access directly, such as massive input/output files. Any cluster node may access the content of this type of sandbox, but only one node has local (fast) access, and this node must be up and running to provide the data. A graph may use resources from multiple sandboxes which are physically stored on different nodes, since cluster nodes are able to create network streams transparently, as if the resource were a local file. See Using a Sandbox Resource as a Component Data Source for details.

Do not use a local sandbox for common project data (graphs, metadata, connections, lookups, properties files, etc.). It would cause odd behavior. Use shared sandboxes instead.

Figure 22.2. Dialog form for creating new local sandbox


Partitioned sandbox

This type of sandbox is actually an abstract wrapper for several physical locations, which typically exist on different cluster nodes. However, there may be multiple locations on the same node. A partitioned sandbox has two purposes, both closely related to parallel data processing.

  1. node allocation specification - locations of a partitioned sandbox define the workers which will run the graph or its parts. Each physical location causes a single worker to run. This worker does not have to actually store any data in "its" location. It is just a way to tell the CloverETL Server: "execute this graph/phase in parallel on these nodes".

  2. storage for part of the data during parallel data processing. Each physical location contains only part of the data. In typical use, the input data is split into several input files, each file is put into a different location, and each worker processes its own file.

As you can see in the screenshot above, for a partitioned sandbox, you can specify one or more physical locations on different cluster nodes.
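The relation between locations and workers can be sketched like this. The `Location` class and the `plan_workers` function are hypothetical names used only for illustration; they model a partitioned sandbox as a list of (node, path) pairs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Location:
    node_id: str  # cluster node that hosts this location
    path: str     # directory on that node


def plan_workers(locations):
    """Return one (worker_index, node_id, path) tuple per location."""
    # One worker per location, even when two locations share a node.
    return [(index, loc.node_id, loc.path) for index, loc in enumerate(locations)]
```

A sandbox with two locations on nodeA and one on nodeB would therefore start three workers, two of them on nodeA.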

Do not use a partitioned sandbox for common project data (graphs, metadata, connections, lookups, properties files, etc.). It would cause odd behavior. Use shared sandboxes instead.

Using a Sandbox Resource as a Component Data Source

A sandbox resource, whether it is in a shared, local or partitioned sandbox (or an ordinary sandbox on a standalone server), is specified in the graph in fileURL attributes as a so-called sandbox URL, like this:

sandbox://data/path/to/file/file.dat

where "data" is the code of the sandbox and "path/to/file/file.dat" is the path to the resource from the sandbox root. The URL is evaluated by CloverETL Server during graph execution, and a component (reader or writer) obtains the opened stream from the server. This may be a stream to a local file or to some other remote resource. Thus, a graph does not have to run on the node which has local access to the resource. There may be several sandbox resources used in the graph, and each of them may be on a different node. In such cases, CloverETL Server chooses the node with the most local resources to minimize remote streams.
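The structure of the URL and the node choice can be sketched as follows. This is not the server's implementation: the function names are hypothetical, and `sandbox_home` is an assumed mapping from a sandbox code to the node that has local access to it.

```python
from collections import Counter
from urllib.parse import urlsplit


def parse_sandbox_url(url):
    """Split a sandbox URL into (sandbox_code, path_from_sandbox_root)."""
    parts = urlsplit(url)
    if parts.scheme != "sandbox" or not parts.netloc:
        raise ValueError(f"not a sandbox URL: {url!r}")
    return parts.netloc, parts.path.lstrip("/")


def pick_node(resource_urls, sandbox_home):
    """Pick the node with local access to the most resources, or None."""
    counts = Counter()
    for url in resource_urls:
        code, _ = parse_sandbox_url(url)
        if code in sandbox_home:  # sandboxes with no single home are skipped
            counts[sandbox_home[code]] += 1
    return counts.most_common(1)[0][0] if counts else None
```

For the URL shown above, `parse_sandbox_url` returns the code `data` and the path `path/to/file/file.dat`.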

The sandbox URL has a specific use for parallel data processing. When a sandbox URL pointing to a resource in a partitioned sandbox is used, that part of the graph/phase runs in parallel, according to the node allocation specified by the list of partitioned sandbox locations. Thus, each worker has its own local sandbox resource. CloverETL Server evaluates the sandbox URL on each worker and provides the component with an open stream to a local resource.
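The per-worker evaluation can be sketched like this. It is an illustration under assumptions: the function name is hypothetical, and the locations of the partitioned sandbox are modeled as (node ID, root directory) pairs.

```python
import posixpath
from urllib.parse import urlsplit


def resolve_on_workers(url, locations):
    """Map one sandbox URL to the local file each worker would open.

    locations: (node_id, root_path) pairs of the partitioned sandbox
    (a hypothetical model of its list of locations).
    """
    relative = urlsplit(url).path.lstrip("/")
    # Same URL everywhere; each worker resolves it against its own location.
    return [(node, posixpath.join(root, relative)) for node, root in locations]
```

The same URL in the graph thus yields a different physical file on every worker, which is how each worker ends up processing only its own part of the data.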

The sandbox URL may be used on a standalone server as well. It is an excellent choice when a graph references resources from different sandboxes, such as metadata, lookup definitions or input/output data. The referenced sandbox must, of course, be accessible to the user who executes the graph.