How things work

Explore the fundamental concepts of Signal/Collect

Programming Model

The strength of the Signal/Collect programming model lies in its being:

The intuition behind the Signal/Collect programming model is that computations are executed on a graph, where the vertices are the computational units that interact by means of signals that flow along the edges. This is similar to the actor programming model. All computation in the vertices consists of collecting the incoming signals and then signaling the neighbors in the graph.

This means that every vertex, in addition to holding its own id and state, has a collect function that computes the new vertex state from the old vertex state and the signals it has received.

trait Vertex[State] {
  // computes the new vertex state from the old state and the received signals
  def collect: State
}

Every edge has a reference to its source vertex (if it is attached to one), the id of the target vertex and its weight. It implements a signal function which computes the signal sent along this edge.

trait Edge[Signal] {
  // computes the signal that is sent along this edge
  def signal: Signal
}
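To make the interplay of signal and collect concrete, here is a toy model in plain Scala. It does not use the Signal/Collect library; ToyVertex, ToyEdge and shortestPaths are hypothetical names. It computes single-source shortest paths: an edge signals its source's distance plus its weight, and a vertex collects by keeping the minimum of its old state and the received signals. The loop simply stops when no state changes, a simplified stand-in for the framework's convergence detection.

```scala
// Toy model of the signal/collect idea; NOT the Signal/Collect API.
object SignalCollectSketch {
  final case class ToyEdge(sourceId: Int, targetId: Int, weight: Double) {
    // signal: the distance offered to the target vertex via this edge
    def signal(sourceState: Double): Double = sourceState + weight
  }

  final class ToyVertex(val id: Int, var state: Double) {
    // collect: new state computed from the old state and the received signals
    def collect(signals: Seq[Double]): Double = (state +: signals).min
  }

  def shortestPaths(n: Int, source: Int, edges: Seq[ToyEdge]): Map[Int, Double] = {
    val vertices = (0 until n).map { id =>
      id -> new ToyVertex(id, if (id == source) 0.0 else Double.PositiveInfinity)
    }.toMap
    var changed = true
    while (changed) {
      // every edge computes a signal from the current state of its source vertex
      val inbox: Map[Int, Seq[Double]] = edges
        .map(e => e.targetId -> e.signal(vertices(e.sourceId).state))
        .groupBy(_._1)
        .map { case (target, pairs) => target -> pairs.map(_._2) }
      // every vertex that received signals collects them
      changed = false
      for ((id, signals) <- inbox) {
        val v = vertices(id)
        val newState = v.collect(signals)
        if (newState != v.state) { v.state = newState; changed = true }
      }
    }
    vertices.map { case (id, v) => id -> v.state }
  }
}
```

For the edges 0→1 (weight 1.0), 1→2 (2.0) and 0→2 (5.0) with source 0, this yields distances 0.0, 1.0 and 3.0.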

Default Vertex Types

In any Signal/Collect graph, different vertex and edge types can be freely combined. Usually a vertex is implemented by extending one of the default vertex implementations and an edge is implemented by extending DefaultEdge. If the default implementations are not memory-efficient enough for a specific purpose, then it is possible to write custom graph elements.

Signal/Collect has two different default vertex implementations, DataGraphVertex and DataFlowVertex.

DataGraphVertex is suitable for algorithms that iteratively update state associated with vertices and edges. DataFlowVertex is suitable for dataflow computations where data is routed through a network of processing vertices. For the user, the main noticeable difference between these implementations is how received signals are passed to the collect function.

DataGraphVertex

In most iterative computations, old values are superseded by newer ones. This is why the collect function of DataGraphVertex can only access the most recently received signal for each incoming edge that has already received at least one signal.
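The following toy class illustrates this retention policy in plain Scala; LatestSignalVertex, deliver and the summing collect are hypothetical and are not the framework's DataGraphVertex code. Signals are stored per source vertex id, so a second signal from the same neighbor replaces the first before collect runs.

```scala
import scala.collection.mutable

// Toy illustration of "most recent signal per incoming edge"; NOT DataGraphVertex itself.
object DataGraphSketch {
  final class LatestSignalVertex(var state: Double) {
    // one slot per incoming edge, keyed by the source vertex id
    private val mostRecent = mutable.Map.empty[Int, Double]

    // a newer signal from the same source overwrites the older one
    def deliver(sourceId: Int, signal: Double): Unit = {
      mostRecent(sourceId) = signal
    }

    // example collect: sum of the latest signal from each neighbor
    def collect(): Double = {
      state = mostRecent.values.sum
      state
    }
  }
}
```

Delivering 1.0 and then 4.0 from vertex 1, plus 2.0 from vertex 2, makes collect return 6.0: the earlier 1.0 is no longer visible.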

DataFlowVertex

In a dataflow computation, no signal should ever be lost. This is why the collect function of DataFlowVertex receives a single signal parameter, so that signals are collected one at a time as they arrive.
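By contrast, a dataflow-style vertex processes every signal. This toy sketch (plain Scala; CountingVertex and receive are hypothetical names, not the DataFlowVertex code) calls collect once per incoming signal, so three signals of 1 all contribute to the state, even if they come from the same source.

```scala
// Toy illustration of per-signal collection; NOT DataFlowVertex itself.
object DataFlowSketch {
  final class CountingVertex(var state: Int) {
    // collect takes exactly one signal and returns the new state
    def collect(signal: Int): Int = state + signal

    // every arriving signal triggers its own collect, so none is lost
    def receive(signal: Int): Unit = {
      state = collect(signal)
    }
  }
}
```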


Default Edge Types

Analogous to the default vertex implementations, Signal/Collect also provides default implementations for edges. Depending on the algorithm, these edges can either be used directly to construct a graph, or they provide predefined functionality that reduces the amount of code a class extending them has to implement.

Three abstract edge implementations exist (the signal functionality needs to be provided on top of them):

Edges don't necessarily have to be algorithm specific. Sometimes one of the provided concrete edge implementations can be used without any modifications:

The concrete class StateForwarderEdge, which extends the DefaultEdge abstract class, forwards the current state of its source vertex whenever the source vertex invokes its signal method.
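The forwarding behavior can be sketched in plain Scala as follows. SourceVertex and ForwardingEdge are hypothetical stand-ins, not the library's StateForwarderEdge; the point is only that the signal is the source vertex's current state, unchanged.

```scala
// Toy stand-ins illustrating state forwarding; NOT the library classes.
object StateForwarderSketch {
  trait SourceVertex[S] { def state: S }

  final class ForwardingEdge[S](val targetId: Int) {
    // set when the edge is attached to a source vertex
    var source: Option[SourceVertex[S]] = None

    // the signal is simply the source vertex's current state
    def signal: S = source.map(_.state).getOrElse(
      throw new IllegalStateException("edge is not attached to a source vertex"))
  }
}
```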


Signal Collect Operations

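A Signal/Collect computation consists of edges and vertices executing two operations: the signal operation, in which an edge computes a signal from the state of its source vertex, and the collect operation, in which a vertex computes its new state from the signals it has received.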

A Signal/Collect computation is guided by two elements:


Algorithm Termination

There are several different mechanisms to terminate algorithm execution.

Automated Convergence Detection

This is the usual way for a computation to end and it is enabled by default. See the section Continuous Execution below to find out how to explicitly disable it.

A computation ends when it has converged. Convergence is detected with the scoreSignal/scoreCollect functions on vertices and the respective signalThreshold/collectThreshold, which are defined globally. The framework keeps executing signal/collect operations on vertices whose scores are above the respective thresholds, and the computation ends when no score is above its threshold anymore.
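The scheduling rule can be sketched as a simple loop. This is a plain-Scala toy under stated assumptions: the Scored trait, executeSignal, executeCollect and runUntilConverged are hypothetical names, and the loop is sequential, whereas the framework executes operations in parallel and distributed.

```scala
// Toy convergence loop driven by scores and thresholds; NOT the framework's scheduler.
object ConvergenceSketch {
  trait Scored {
    def scoreSignal: Double
    def scoreCollect: Double
    def executeSignal(): Unit
    def executeCollect(): Unit
  }

  // Executes operations while some score exceeds its threshold; returns the number of rounds.
  def runUntilConverged(vertices: Seq[Scored], signalThreshold: Double, collectThreshold: Double): Int = {
    var rounds = 0
    def pending = vertices.exists(v =>
      v.scoreSignal > signalThreshold || v.scoreCollect > collectThreshold)
    while (pending) {
      vertices.filter(_.scoreSignal > signalThreshold).foreach(_.executeSignal())
      vertices.filter(_.scoreCollect > collectThreshold).foreach(_.executeCollect())
      rounds += 1
    }
    rounds
  }
}
```

A vertex whose scores never drop to or below the thresholds would keep this loop running, which is why the score functions matter for termination.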

IMPORTANT: Vertices are only re-scored using scoreCollect if:

Vertices are only re-scored using scoreSignal if:

This is the default implementation of the scoreSignal method in AbstractVertex:

def scoreSignal: Double = {
  if (edgesModifiedSinceSignalOperation) {
    1
  } else {
    lastSignalState match {
      case Some(oldState) if oldState == state => 0
      case noStateOrStateChanged               => 1
    }
  }
}

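The default implementation of the scoreSignal function returns 1, meaning that the vertex should signal along its edges, if its edges were modified since the last signal operation (for example, because a new edge was added) or if its state has changed since then. Otherwise it returns 0.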

IMPORTANT: By default, a changed state is detected by comparing the current state with lastSignalState, the state recorded at the last signal operation. If the state is a mutable collection that is modified in place, both refer to the same object, so no change is detected. In this case, either use immutable collections or override the scoreSignal function.
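The pitfall can be reproduced in plain Scala. The stateChanged helper below is a hypothetical function that mirrors the comparison in the default scoreSignal shown above; it is not part of the framework.

```scala
import scala.collection.mutable.ArrayBuffer

// Reproduces the state-change check outside the framework.
object StateChangeSketch {
  // same comparison as the default scoreSignal: unchanged only if oldState == state
  def stateChanged[S](lastSignalState: Option[S], state: S): Boolean =
    lastSignalState match {
      case Some(oldState) if oldState == state => false
      case _                                   => true
    }

  def demo(): (Boolean, Boolean) = {
    val buffer = ArrayBuffer(1, 2)
    val lastMutable = Some(buffer) // remembers the same object, not a copy
    buffer += 3                    // in-place change
    val mutableDetected = stateChanged(lastMutable, buffer) // false: change missed

    val before = Vector(1, 2)
    val after = before :+ 3        // immutable update creates a new object
    val immutableDetected = stateChanged(Some(before), after) // true
    (mutableDetected, immutableDetected)
  }
}
```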

The scoreSignal function can be overridden with an algorithm-specific implementation, for example to use the residual as a convergence criterion for a vertex whose state is a Double:

override def scoreSignal: Double = {
  lastSignalState match {
    case None => 1
    case Some(oldState) => (state - oldState).abs
  }
}
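The effect of the residual override together with a signal threshold can be checked in isolation. This plain-Scala sketch copies the override's logic into a standalone function; shouldSignal and the threshold value are hypothetical and only illustrate that small state changes stop triggering signal operations.

```scala
// Residual-based signal score, extracted from the override for illustration.
object ResidualSketch {
  def scoreSignal(lastSignalState: Option[Double], state: Double): Double =
    lastSignalState match {
      case None           => 1.0 // never signaled before: always signal
      case Some(oldState) => (state - oldState).abs
    }

  // a vertex signals only if its residual exceeds the signal threshold
  def shouldSignal(lastSignalState: Option[Double], state: Double, signalThreshold: Double): Boolean =
    scoreSignal(lastSignalState, state) > signalThreshold
}
```

With a threshold of 0.01, a change from 0.5 to 0.75 triggers signaling, while a change from 0.5 to 0.5005 does not.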

The default implementations of the scoreCollect function can be found in DataGraphVertex and DataFlowVertex, each of which exists in a Scala and a Java version. The following is the Scala implementation of the scoreCollect function in DataGraphVertex. A collect operation is only scheduled if mostRecentSignalMap is not empty or if edges were modified since the last collect operation.

def scoreCollect: Double = {
    if (!mostRecentSignalMap.isEmpty) {
      1.0
    } else if (edgesModifiedSinceCollectOperation) {
      1.0
    } else {
      0.0
    }
}

When you extend either the DataGraphVertex or the DataFlowVertex, the scoreCollect function can be overridden, if necessary.
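As an illustration of an algorithm-specific scoreCollect, here is a plain-Scala toy with a hypothetical batching policy: the vertex only asks to collect once a given number of new signals has arrived. BatchingVertex, deliver and executeCollect are hypothetical names; a real implementation would override scoreCollect in a subclass of DataGraphVertex or DataFlowVertex instead.

```scala
// Toy batching policy for scoreCollect; NOT a framework class.
object ScoreCollectSketch {
  final class BatchingVertex(batchSize: Int) {
    private var pending = List.empty[Double]
    var state: Double = 0.0

    def deliver(signal: Double): Unit = {
      pending = signal :: pending
    }

    // 1.0 means "collect now", 0.0 means "wait for more signals"
    def scoreCollect: Double = if (pending.size >= batchSize) 1.0 else 0.0

    def executeCollect(): Unit = {
      state += pending.sum
      pending = Nil
    }
  }
}
```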

Global Termination Conditions

It is possible to define termination conditions that depend on some global criterion by using AggregationOperations. There is a detailed description of how to use this including a usage example in the AggregationOperations wiki article.

Time

It is possible to set a time limit in milliseconds for a computation. The framework terminates the computation when the limit is exceeded.

Usage:

val execConfig = ExecutionConfiguration.withTimeLimit(10000) // 10 seconds
graph.execute(execConfig)

Computation Steps

For synchronous computations it is possible to limit the number of computation steps that get executed. A computation step is a signal step followed by a collect step. A signal step is the parallel execution of the signal operation on all edges of vertices that have a signal score > signal threshold. A collect step is the parallel execution of collect operations on vertices that have a collect score > collect threshold.

Usage:

val execConfig = ExecutionConfiguration
      .withExecutionMode(ExecutionMode.Synchronous)
      .withStepsLimit(1)
graph.execute(execConfig)
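What a step limit does can be seen in a plain-Scala toy of synchronous execution. The run function and its stepsLimit parameter are hypothetical and only mirror the idea behind withStepsLimit; the toy signals on every edge in every step instead of consulting signal/collect scores, and it counts the final step in which nothing changes.

```scala
// Toy synchronous (BSP-style) max propagation; NOT the framework's executor.
object SynchronousStepsSketch {
  // Returns the final states and the number of computation steps executed.
  def run(initial: Vector[Int], edges: Seq[(Int, Int)], stepsLimit: Int): (Vector[Int], Int) = {
    var states = initial
    var steps = 0
    var converged = false
    while (!converged && steps < stepsLimit) {
      // signal step: all signals are computed from the states of the previous step
      val signals: Map[Int, Seq[Int]] = edges
        .map { case (src, dst) => dst -> states(src) }
        .groupBy(_._1)
        .map { case (dst, pairs) => dst -> pairs.map(_._2) }
      // collect step: new state = max(old state, received signals)
      val next = states.indices.map { id =>
        (states(id) +: signals.getOrElse(id, Seq.empty)).max
      }.toVector
      converged = next == states
      states = next
      steps += 1
    }
    (states, steps)
  }
}
```

On the chain 0→1→2→3 with initial states (5, 0, 0, 0), a limit of 1 step yields (5, 5, 0, 0), because a value travels only one edge per step; without a tight limit the computation converges to (5, 5, 5, 5).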

Continuous Execution

Continuous execution is an asynchronous execution mode that disables automated termination upon convergence detection. The main use case is dataflow computations, where the system should keep running even if it occasionally converges. In this execution mode, the Graph.execute method does not block; it returns immediately.

Usage:

val execConfig = ExecutionConfiguration.withExecutionMode(ExecutionMode.ContinuousAsynchronous)
graph.execute(execConfig)

Execution modes

Signal/Collect supports different execution modes. The execution mode determines what kind of guarantees about the execution order of the signal/collect operations a user gets.

Synchronous execution

A synchronous computation consists of alternating parallel signaling/collecting phases for vertices. Phase transitions are globally synchronized, which guarantees that all vertices are in the same phase at the same time. One signaling phase followed by one collecting phase is referred to as a computation step. This execution mode is closely related to the Bulk Synchronous Parallel (BSP) model and Pregel.

To start a computation in the synchronous execution mode, call the execute method on the graph with an execution configuration that specifies the ExecutionMode:

val graph = GraphBuilder.build
val execConfig = ExecutionConfiguration.withExecutionMode(ExecutionMode.Synchronous)
graph.execute(execConfig)

Asynchronous execution

In an asynchronous computation, the framework does not give any guarantees about the order in which signal/collect operations get executed. This allows workers to operate without central synchronisation, which can improve performance. Some algorithms also converge better when run asynchronously, because asynchronous execution can prevent oscillations. This execution mode is closely related to the Actor Model. The default execution mode of Signal/Collect starts with one synchronous signaling step followed by asynchronous execution; we refer to this mode as OptimizedAsynchronousExecutionMode.
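For algorithms like shortest paths, the result does not depend on the order of operations. The plain-Scala toy below processes vertices one at a time in a random order, without global phases; shortestPaths, the pending work list and the seed parameter are hypothetical and only imitate asynchronous scheduling on a single thread.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Toy order-independent execution; NOT the framework's asynchronous executor.
object AsyncSketch {
  def shortestPaths(n: Int, source: Int, edges: Seq[(Int, Int, Double)], seed: Long): Vector[Double] = {
    val rnd = new Random(seed)
    val dist = Array.fill(n)(Double.PositiveInfinity)
    dist(source) = 0.0
    val outEdges = edges.groupBy(_._1)
    val pending = ArrayBuffer(source) // vertices that still need to signal
    while (pending.nonEmpty) {
      // pick an arbitrary pending vertex: no ordering guarantees
      val v = pending.remove(rnd.nextInt(pending.size))
      for ((_, target, weight) <- outEdges.getOrElse(v, Seq.empty)) {
        // the target collects immediately: keep the smaller distance
        val candidate = dist(v) + weight
        if (candidate < dist(target)) {
          dist(target) = candidate
          if (!pending.contains(target)) pending += target
        }
      }
    }
    dist.toVector
  }
}
```

Whatever the seed, the edges 0→1 (1.0), 1→2 (2.0) and 0→2 (5.0) with source 0 produce the distances 0.0, 1.0 and 3.0.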

How to start a computation using the default execution mode:

val graph = GraphBuilder.build
graph.execute

How to start a computation using the pure asynchronous execution mode:

val graph = GraphBuilder.build
val execConfig = ExecutionConfiguration.withExecutionMode(ExecutionMode.PureAsynchronous)
graph.execute(execConfig)

There is also a continuous asynchronous execution mode. It is equivalent to the pure asynchronous execution mode, except that execute returns immediately instead of blocking and that termination upon convergence detection is disabled (see Algorithm Termination):

val graph = GraphBuilder.build
val execConfig = ExecutionConfiguration.withExecutionMode(ExecutionMode.ContinuousAsynchronous)
graph.execute(execConfig)

Distributed Execution

The current release of the Signal/Collect framework enables distributing the execution of an algorithm among multiple machines.

Currently, deployment code is only included for Torque clusters. Official support for deployment on Amazon EC2 instances will follow in the near future.