Apache SAMOA is an open source platform for mining big data streams. SAMOA features a Write-Once-Run-Anywhere (WORA) architecture which allows multiple Distributed Stream Processing Engines (DSPEs) to be integrated into the framework. In this blog, we’ll describe the integration with Apache Apex which is a YARN native, unified batch and stream processing engine.
Apache SAMOA – Background
Traditionally, all processing was non-distributed and often used to be done on a single machine. The non-distributed space already has a lot of tools, more so in the Batch space. Several tools have cropped up in this space over the period of time. R, WEKA, Matlab, Octave, Python libraries etc. have dominated this space for quite some time.
These tools worked well, but only on small datasets.
As the data sizes started exploding and the traditional tools seemed unable to cope with this growing demand. To address this scale of data, the paradigm shifted towards distributed processing though still in batch mode (example Apache Mahout). This meant that all of the data had to be available before the processing started. While some applications could work with this restriction, this was considered suboptimal for some applications such as the ones that needed low latency. At the same time, the stream processing options (on non-distributed systems) were also explored. MOA (Massive Online Analysis) project  which is closely related to WEKA  was also developed to address the online space on non-distributed systems.
Slowly, the sizes of data started increasing beyond the capacities of non-distributed systems and the trend started shifting towards real-time decision making and online processing.
Apache SAMOA was one of the tools developed to address the need of online stream mining for big data.
Apache Apex – YARN Native Framework
Background on YARN – Yet Another Resource Negotiator
Hadoop is a tool which is well known in the distributed space and is considered a game changer in the space of big data processing. Until recently, Hadoop was considered as a batch system which was only capable of running MapReduce algorithms on the given data. Even though this was a big deal some time back, the limitations became apparent to the users within a short period of time. It was a tedious job to translate all programming logic into the MapReduce framework; some of the times, it was not even possible. On top of that, it had the limitation of being a batch system which was against the recent trend of online processing and real-time decision making. To address the growing demands of real-time and near-real-time systems, Hadoop 2.0 introduced a resource management framework called YARN . Earlier with MapReduce being the only type of job running on Hadoop, YARN changes it by allowing a multitude of applications to run on a distributed Hadoop cluster. In fact, YARN actually gives up some of the micromanagement,specifically, the type of applications and limits its role to just resource management in the cluster. Hence the acronym YARN – Yet Another Resource Negotiator. It does not even track individual applications. YARN requires that the type of application that wishes to run on Hadoop 2.0, must provide its own Application Master which takes care of the components of its application. It provides an Application Master for MapReduce though, for backward compatibility with Hadoop 1.0. This means any MapReduce jobs which ran on Hadoop 1.0 would also be able to run on Hadoop 2.0.
Apache Apex is a Stream Processing platform that leverages the resource management capabilities provided by YARN. Hence, Apex is a YARN native application framework. Apex processes big data in a streaming fashion and considers batch as a special case of streaming – a limited stream.
Apex splits up the processing into multiple stages similar to a CPU pipeline. These stages are arranged in a directed acyclic graph or DAG. Each node in this DAG is an operator in the processing logic. The DAG starts with some special operators called Input Adapters which fetch data from external sources like file systems or databases and ends with operators called Output Adapters which store data to some external destination. The operators in the middle process the incoming tuples as per the required business logic and pass on the resultant tuples to the next operator. Figure below illustrates a DAG.
It is worth noting that each of these operators runs in a distributed fashion. They may be deployed on different nodes across the cluster. Apex follows a different model where the data goes to the computation as opposed to computation going to data in MapReduce.
Apache SAMOA’s architecture allows for multiple types of integrations. First is the ML- adapter layer which allows other machine learning libraries to integrate and be part of the SAMOA framework. However, our focus in this blog will be the second type of API called SPE – adapter layer.
This layer is provided to allow other stream processing engines (SPEs) to integrate with Apache SAMOA. This integration requires implementation for a set of functions which essentially map the topology in SAMOA to the topology in the target SPE. In the case of Apache Apex, we implemented the mapping from SAMOA topology to an Apex DAG. Doing this gives us the capability to run all SAMOA algorithms onto the target SPE, in our case Apex.
The core API exposed by the SAMOA will create the topology from scratch by adding operators and streams much like the populateDAG method does for Apex.
As the topology is created in SAMOA, we correspondingly add equivalent Apex operators and streams to create the Apex DAG. The following image shows the init() method that creates a Vertical Hoeffding Tree topology. To translate this topology into an Apex DAG, we need to map the calls like addProcessor(), createStream(), connectShuffleInputStream() etc into calls which would do the similar job for an Apex DAG.
Once this mapping is implemented, the code in the image would actually produce an Apex DAG instead of a SAMOA topology. This is when we use an Apex component factory. The purpose of the factory is to identify the specifics of the runner being used and translate the SAMOA topology into the runner specific topology.
In the following sections, we describe the mapping of various SAMOA topology elements into Apex components.
Processing Item and Processor
In SAMOA, a Processor is the basic logical processing unit. Basically, a Processor defines what needs to be done with an incoming tuple. On top of the Processor, is the wrapper called Processing Item which is actually the node in the SAMOA topology. This is depicted in the figure below.
The equivalent for Apex is as follows:
- Processing Item maps to an Operator
- Processor is what defines the process method of the Input Ports of the operator
- Streams in SAMOA map to the Apex stream defined using Input and Output ports
In addition, we have an Entrance Processor which is considered to be the generator of streams in SAMOA. This type of processor maps to an Input Operator in Apex which has a similar responsibility of generating streams.
Content Event is the type of message that flows through the SAMOA topology. It fits directly into the Apex framework as Apex supports any Object to flow through the DAG.
A Stream is a physical unit of SAMOA topology which connects different processors together. We create an Apex stream for each stream in SAMOA. However, in Apex, the stream has endpoints called ports (input and output ports). SAMOA does not have the concepts of ports. For this reason, when we define a stream in Apex with ports, we need to translate the method calls on the SAMOA stream to the calls on a port in Apex.
In addition to this distinction, SAMOA also supports different ways in which a Processing Item can absorb an incoming stream. The following are the connection semantics:
- Shuffle – Upstream data distributed randomly to downstream partitions (instances)
- Key – Upstream data distributed based on the key to downstream partitions. A key goes to exactly one downstream instance.
- All – Upstream data goes to all downstream partitions.
Apex supports a much richer set of such stream mappings. The way to do it is through Stream Codec  and Partitioners . We define custom stream codecs to handle (1) and (2). (3) is handled using a custom Partitioner which can send data to all downstream partitions.
A task is an execution entity in SAMOA. It is much like an execution of a DAG in Apex and actually defines one. However, it differs from an actual DAG at runtime in that it is only part of the DAG. For example, a task can be a Prequential Evaluation task. This task is responsible for running a Prequential Evaluation on the defined input data. However, the actual DAG will be decided based on what algorithm is chosen to be run in the Prequential Evaluation.
A topology builder will populate the SAMOA topology with Processors and Processing Items. In terms of Apex, this is like an Application which defines the populateDAG method.
Loops in the topology
Apex topology is a DAG which by definition means that no cycles can be present. However, a SAMOA topology may contain loops which is quite common in Machine Learning algorithms. To address this and many other use cases, a new feature was introduced in Apex v3.3.0 to support iterations in Apex. This is achieved by means of a new type of operator called Delay Operator.
A delay operator, for the user, is a simple pass through operator. However, it is considered as a special operator by the Apex Engine and which automatically increments the window id of the output tuples. The output port of a Delay Operator must connect to an upstream operator forming a loop in the topology. However, since the window id is incremented at the output, it is not exactly a loop in the physical execution. In other words, it resembles an unrolled loop similar to a stretched spring. Hence, the topology still maintains the nature of a DAG while still allowing loops in the topology.
Once the SAMOA topology is translated to an Apex topology, we walk through the topology to identify any loops and insert a Delay Operator in such a stream.
Serialization of Apex Ports
SAMOA uses streams to emit data to downstream operators. However, Apex uses another abstraction called ports to do the same thing. In normal usage, a user does not need to serialize Input and Output ports declared within an operator. The mapping of ports, however, is maintained in other objects and the ports are re-created when the operators are actually deployed in the target container. However, in the case of SAMOA, the serialization is needed as ports become the state of the streams. Re-creating the ports would not help, as the objects would change. This is because the mapping of streams to ports must be maintained even at run time for SAMOA to be able to co-ordinate the flow of tuples on the topology.
For interested readers, the API is described in detail in  with an example of Apache Storm. Also README.md in  shows how to use the Apex runner in SAMOA.
By Bhupesh Chawda, Engineer at DataTorrent and Committer at Apache Apex
 SAMOA Developers guide https://samoa.incubator.apache.org/documentation/SAMOA-Developers-Guide-0.0.1.pdf
 Apache SAMOA https://samoa.incubator.apache.org/index.html
 Apache Apex https://apex.apache.org/
 Apache Hadoop – YARN https://hadoop.apache.org/docs/r2.7.1/hadoop-yarn/hadoop-yarn-site/YARN.html
 Apache Apex Stream Codecs https://apex.apache.org/docs/apex/application_development/#stream-codec
 Apache Apex Custom Partitioning https://apex.apache.org/docs/apex/development_best_practices/#custom-partitioning
 Massive Online Analysis http://moa.cms.waikato.ac.nz/
 WEKA Project http://www.cs.waikato.ac.nz/ml/weka/
 Apex runner for SAMOA https://github.com/apache/incubator-samoa/tree/master/samoa-apex