The Initial Architecture

Initial Architecture

We want to perform an equi-join of two streams (inputs A and B) and then join the result with a third stream (input C).

The initial architecture is implemented directly using two hash join operations.
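As a point of reference, the cascade of two hash joins can be sketched as follows. This is only an illustrative sketch: the row representation (Python dicts), the column names (a_id, b_id) and the hjoin helper are assumptions made for the example, not part of the architecture itself.

```python
from collections import defaultdict

def hjoin(left, right, left_key, right_key):
    """Equi-join two streams of dict rows: build a hash table on `left`, probe with `right`."""
    table = defaultdict(list)
    for row in left:                                  # build phase
        table[row[left_key]].append(row)
    for row in right:                                 # probe phase
        for match in table.get(row[right_key], []):
            yield {**match, **row}

# Hypothetical sample streams; the column names are made up for the example.
A = [{"a_id": 1, "x": "a1"}, {"a_id": 2, "x": "a2"}]
B = [{"a_id": 1, "b_id": 10}, {"a_id": 2, "b_id": 20}, {"a_id": 3, "b_id": 30}]
C = [{"b_id": 10, "z": "c10"}, {"b_id": 30, "z": "c30"}]

ab = hjoin(A, B, "a_id", "a_id")          # first HJOIN: A with B
abc = list(hjoin(ab, C, "b_id", "b_id"))  # second HJOIN: (A join B) with C
print(abc)
```

Only the row with a_id 1 survives both joins, since b_id 20 has no partner in C and a_id 3 has no partner in A.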

Here we show the derivation of an equivalent, parallel implementation of this architecture.

In this derivation we use an array notation for replicated boxes and ports: wherever N boxes or ports would otherwise be drawn, a single one is shown with [N] appended to its name.

Step 1: Hash Join Bloom Filter Refinements

Original Architecture
New Architecture

This step refines both HJOIN boxes with an implementation that first builds a Bloom filter from input stream A, then uses it to filter input stream B, and finally applies the hash join operation to the two streams.

Rules Used: HJOIN → bloomfilterhjoin
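The refinement in this step can be sketched as below. The sketch assumes a Bloom filter stored as an integer bitmap with M bits and K positions per key derived from SHA-256. These parameters, the function names and the sample rows are all illustrative assumptions and are not taken from the architecture.

```python
import hashlib
from collections import defaultdict

M = 64  # bits in the filter (illustrative size)
K = 2   # bit positions set per key (illustrative)

def positions(key):
    """Derive K bit positions for a key from a SHA-256 digest."""
    digest = hashlib.sha256(repr(key).encode()).digest()
    return [int.from_bytes(digest[4 * i:4 * i + 4], "big") % M for i in range(K)]

def bloom(stream_a, key):
    """BLOOM: return A's rows unchanged plus a bitmap summarising their join keys."""
    rows, bitmap = [], 0
    for row in stream_a:
        for p in positions(row[key]):
            bitmap |= 1 << p
        rows.append(row)
    return rows, bitmap

def bfilter(stream_b, bitmap, key):
    """BFILTER: keep only B rows whose key may occur in A (never drops a true match)."""
    return [row for row in stream_b
            if all((bitmap >> p) & 1 for p in positions(row[key]))]

def hjoin(a, b, key):
    """HJOIN: build a hash table on A, probe it with B."""
    table = defaultdict(list)
    for row in a:
        table[row[key]].append(row)
    return [{**match, **row} for row in b for match in table.get(row[key], [])]

def bloomfilter_hjoin(a, b, key):
    """The refined box: BLOOM on A, BFILTER on B, then HJOIN."""
    a_rows, bitmap = bloom(a, key)
    return hjoin(a_rows, bfilter(b, bitmap, key), key)

# Hypothetical sample streams.
A = [{"k": 1, "x": "a1"}, {"k": 2, "x": "a2"}]
B = [{"k": 1, "y": "b1"}, {"k": 3, "y": "b3"}, {"k": 2, "y": "b2"}, {"k": 4, "y": "b4"}]

plain = hjoin(A, B, "k")
refined = bloomfilter_hjoin(A, B, "k")
print(refined == plain)
```

A Bloom filter can let through some B rows that have no partner in A (false positives), but it never drops a row that does have one, so the join result is unchanged. The filter only reduces the number of rows that reach the join.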

Step 2: Parallel Refinements

Original Architecture
New Architecture

This step refines all boxes present in the architecture (BLOOM, BFILTER and HJOIN) with parallel implementations.

There are two instances of each box, and both are refined (although only one instance of each is marked in the figure).

Rules Used: BLOOM → parallelbloom / BFILTER → parallelbfilter / HJOIN → parallelhjoin
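To give an idea of what a parallel refinement looks like, here is a minimal sketch for the HJOIN box only. It assumes the usual partitioned structure: hash-split both inputs on the join key, join matching partitions independently, and merge the results. The function names, the sequential execution and the sample data are assumptions made for illustration.

```python
from collections import defaultdict

def hsplit(stream, key, n):
    """HSPLIT: route each row to one of n sub-streams by hashing its join key."""
    parts = [[] for _ in range(n)]
    for row in stream:
        # Keys are ints in this example, so hash() is deterministic across runs.
        parts[hash(row[key]) % n].append(row)
    return parts

def merge(substreams):
    """MERGE: combine sub-streams into one stream (row order is not significant)."""
    return [row for part in substreams for row in part]

def hjoin(a, b, key):
    """HJOIN: build a hash table on A, probe it with B."""
    table = defaultdict(list)
    for row in a:
        table[row[key]].append(row)
    return [{**match, **row} for row in b for match in table.get(row[key], [])]

def parallel_hjoin(a, b, key, n):
    """HSPLIT both inputs, run one HJOIN per partition pair, MERGE the outputs."""
    a_parts = hsplit(a, key, n)
    b_parts = hsplit(b, key, n)
    # Each pair could run on its own worker; it is done sequentially here.
    return merge(hjoin(pa, pb, key) for pa, pb in zip(a_parts, b_parts))

def canon(rows):
    """Order-insensitive view of a stream, used only to compare results."""
    return sorted(tuple(sorted(r.items())) for r in rows)

# Hypothetical sample streams.
A = [{"k": i, "x": f"a{i}"} for i in range(6)]
B = [{"k": i, "y": f"b{i}"} for i in range(3, 9)]

sequential = hjoin(A, B, "k")
parallel = parallel_hjoin(A, B, "k", 3)
print(canon(parallel) == canon(sequential))
```

Rows with equal keys always land in the same partition, so no match is lost by joining partitions independently.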

Step 3: Optimize Merge Split Compositions

Original Architecture
New Architecture

This step applies two optimizations to the architecture. The first replaces each composition of MERGE and HSPLIT with the identity (there are four instances of this optimization, two marked in the figure). The second replaces each composition of MMERGE and MSPLIT with the identity (there are two instances of this optimization, one marked in the figure).

Rules Used: ms_mergehsplit → MERGEHSPLIT → ms_identity / mms_mmergemsplit → MMERGESPLIT → mms_identity
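The MERGE/HSPLIT case can be checked on a small example. The sketch below assumes that the sub-streams entering MERGE were themselves produced by an HSPLIT on the same key and with the same number of partitions, which is what makes the composition redundant. The helper names and data are illustrative, and the MMERGE/MSPLIT case is not modelled.

```python
def hsplit(stream, key, n):
    """HSPLIT: route each row to one of n sub-streams by hashing its join key."""
    parts = [[] for _ in range(n)]
    for row in stream:
        # Keys are ints in this example, so hash() is deterministic across runs.
        parts[hash(row[key]) % n].append(row)
    return parts

def merge(substreams):
    """MERGE: combine sub-streams into one stream."""
    return [row for part in substreams for row in part]

N = 3
stream = [{"k": i, "x": f"a{i}"} for i in range(10)]  # hypothetical input

# Sub-streams as an upstream parallel box would emit them: already hash-partitioned.
parts = hsplit(stream, "k", N)

# MERGE followed by HSPLIT sends every row back to the partition it came from.
round_trip = hsplit(merge(parts), "k", N)
print(round_trip == parts)
```

Because the round trip reproduces the same sub-streams, the two boxes can be removed and the upstream sub-streams wired directly to the downstream boxes.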

Step 4: Rotation Optimization

Original Architecture
New Architecture

This step applies an optimization to the architecture that replaces each composition of MERGE and HSPLIT with an equivalent implementation that first splits each input stream (in parallel) and then merges all sub-streams that have the same hash value.

This is the last step of the derivation. We now have the optimized parallel implementation of the architecture.

Rules Used: ms_mergesplit → MERGESPLIT → ms_splitmerge
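The equivalence behind this rotation can be sketched as follows. Here the incoming sub-streams are not assumed to be partitioned on the key used by HSPLIT, so the composition cannot simply be removed. Function names, partition counts and data are assumptions made for the example.

```python
def hsplit(stream, key, n):
    """HSPLIT: route each row to one of n sub-streams by hashing its join key."""
    parts = [[] for _ in range(n)]
    for row in stream:
        # Keys are ints in this example, so hash() is deterministic across runs.
        parts[hash(row[key]) % n].append(row)
    return parts

def merge(substreams):
    """MERGE: combine sub-streams into one stream."""
    return [row for part in substreams for row in part]

def merge_then_split(substreams, key, n):
    """Before the rotation: one MERGE feeding one HSPLIT (a sequential bottleneck)."""
    return hsplit(merge(substreams), key, n)

def split_then_merge(substreams, key, n):
    """After the rotation: one HSPLIT per input, then one MERGE per hash value."""
    split = [hsplit(s, key, n) for s in substreams]   # independent, can run in parallel
    return [merge(parts[j] for parts in split) for j in range(n)]

# Hypothetical sub-streams, partitioned by some other attribute than "k".
inputs = [
    [{"k": 5, "v": "p"}, {"k": 0, "v": "q"}, {"k": 4, "v": "r"}],
    [{"k": 1, "v": "s"}, {"k": 3, "v": "t"}, {"k": 2, "v": "u"}],
]

before = merge_then_split(inputs, "k", 3)
after = split_then_merge(inputs, "k", 3)
print(before == after)
```

Both versions deliver the same rows to each output partition. The rotated version avoids funnelling every row through a single MERGE and a single HSPLIT.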