Disclaimer: These are planning documents. The functionalities described here may be unimplemented, partially implemented, or implemented differently than the original design.

Consensus Runner

Summary

The consensus runner component performs two important operations for a given service: fetching the outstanding ConsensusActions to be executed, followed by fetching the next available event to be processed via the appropriate consensus algorithm.

Motivation

As the consensus algorithms do not require threads, processing actions and events should also not require any threads. The consensus runner provides a general solution for this problem.

Guide-level Explanation

A ConsensusRunner struct will operate on a given fully-qualified service ID (FQSI). As part of a timer execution, it will fetch the outstanding actions and execute them. Next, it will fetch the next unprocessed event and run that through the appropriate algorithm. This will repeat on the next timer execution for the FQSI.

Actions

Actions are the direct result of processing an event via an Augrim consensus algorithm. However, all actions from an event must be processed before proceeding to the next event. The ConsensusRunner will load the next set of unprocessed actions and execute them via an ActionRunner. See the ActionRunner design for more details.

The unprocessed actions will be provided by the ScabbardStore::list_consensus_actions function.

Actions should be run both before and after the events are processed. This ensures that any actions are processed before the next event is executed, in the case of a restart. Actions should be processed after the event to complete the processing of the event and its resulting action(s).

Events

Augrim consensus algorithms rely on events to produce actions. The consensus runner provides the glue to load those events from the database and process them and store their results in the database.

The ScabbardStore::list_consensus_events should return any unprocessed events available in its underlying storage. The consensus runner should execute the first of these unprocessed events, if one is available.

The consensus runner will support systems that leave the choice of consensus up to a run-time configuration parameter. This is accomplished via the augrim Algorithm trait, a system for converting events (similar to the Trait Adapter Pattern in the splinter service API), and a new trait to provide an algorithm route that can be added to the event.

The consensus runner will then load the context for the given FQSI and execute the event using the identified algorithm.

Reference-level Explanation

Main Execution

The consensus runner will have the following items:

struct ConsensusRunner<E: StoreCommandExecutor> {
    pooled_scabbard_store_factory: Arc<dyn PooledScabbardStoreFactory>,
    action_runner: ConsensusActionRunner<<E as StoreCommandExecutor>::Context>,
    algorithms: HashMap<
        String,
        Box<
            dyn Algorithm<
                Event = ConsensusEvent,
                Action = ConsensusAction,
                Context = ConsensusContext,
            >,
        >,
    >,
    consensus_store_command_factory:
        ConsensusStoreCommandFactory<<E as StoreCommandExecutor>::Context>,
    store_command_executor: Arc<E>,
}

The ConsensusRunner effective runs the following loop:

fn run(&self, service_id: &FullyQualifiedServiceId)
    -> Result<(), InternalError>
{
    let store = self.pooled_scabbard_store_factory.new_store();

    let unprocessed_actions = store.list_consensus_actions(service_id)?;

    if !unprocessed_actions.is_empty() {
        // run each action and execute the commands before running the next action
        for action in unprocessed_actions {
            let commands = self.action_runner.run_actions(vec![action], service_id)?;
            self.store_command_executor.execute(commands)?;
        }
    }

    let unprocessed_event = store
        .list_consensus_events(service_id)?
        .get(0)
        .cloned();

    let event = match unprocessed_event {
        Some(event) => event,
        None => {
            // No events
            return Ok(());
        }
    };

    let (event_id, event) = event.deconstruct();

    let context = store
        .get_current_consensus_context(service_id)?
        .ok_or_else(|| {
            InternalError::with_message(format!(
                "No scabbard context for service {}",
                service_id
            ))
        })?;

    let epoch = context.epoch();

    let algorithm = self.algorithms.get(event.algorithm_name()).ok_or_else(|| {
        InternalError::with_message(format!("{} is not configured", event.algorithm_name()))
    })?;
    let actions = algorithm
        .event(event, context)
        .map_err(|e| InternalError::from_source(Box::new(e)))?;

    let commands = vec![
        self.consensus_store_command_factory
            .new_save_actions_command(service_id, actions, event_id),
        self.consensus_store_command_factory
            .new_mark_event_complete_command(service_id, event_id, epoch),
    ];
    self.store_command_executor.execute(commands)?;

    // run the resulting actions
    let unprocessed_actions = store.list_consensus_actions(service_id)?;

    if !unprocessed_actions.is_empty() {
        // run each action and execute the commands before running the next action
        for action in unprocessed_actions {
            let commands = self.action_runner.run_actions(vec![action], service_id)?;
            self.store_command_executor.execute(commands)?;
        }
    }

    Ok(())
}

This run function references several methods on the ScabbardStore trait: list_consensus_actions, list_consensus_events, and get_current_consensus_context.

The ConsensusEvent will include the following method

impl ConsensusEvent {
    /// Returns the associated algorithm name for this item.
    fn algorithm_name(&self) -> &str {
        // ...
    }
}

This provides a means to route the event to the appropriate algorithm for processing.

The ConsensusStoreCommandFactory struct provides the following methods:

pub struct ConsensusStoreCommandFactory<C> {
    fn new_save_actions_command(
        &self,
        service_id: &FullyQualifiedServiceId,
        actions: Vec<ScabbardAction>,
    ) -> Box<dyn StoreCommand<Context = C>> {
        // ...
    }

    fn new_mark_event_complete_command(
        &self,
        service_id: &FullyQualifiedServiceId,
        event_id: i64,
    ) -> Box<dyn StoreCommand<Context = C>> {
        // ...
    }
}

Algorithm Trait Adapter Pattern

In order to easily facilitate the run-time selection of Algorithm we will need a function that may transform an algorithm such that it may accept inputs and produce outputs for the general system. This could be used to handle things like run-time switching, serialization, or the like.

On the Algorithm trait (or an AlgorithmExt trait in scabbard), a method can be added:

trait Algorithm {
    ...
    fn into_algorithm<E, A, C>(self) -> IntoAlgorithm<Self, E, A, C>
    where
        Self: Sized,
        Self::Event: TryFrom<E, Error = InternalError>,
        A: TryFrom<Self::Action, Error = InternalError>,
        Self::Context: TryFrom<C, Error = InternalError>,
    {
        ...
    }
}

Event and Context use TryFrom implementations from their input versions. The output Action expects a TryFrom implementation from the internal one.

where IntoAlgorithm<T, E, A, C> is defined as

struct IntoAlgorithm<T, E, A, C>
where
    P: Process,
    T: Algorithm<P>,
    <T as Algorithm<P>>::Event: TryFrom<E, Error = InternalError>,
    A: TryFrom<<T as Algorithm<P>>::Action, Error = InternalError>,
    <T as Algorithm<P>>::Context: TryFrom<C, Error = InternalError>,
{
    ...
}

In scabbard, each of the event, action, and context for the supported algorithms will be wrapped in an enum. For example, the event would be:

use augrim::algorithm::two_phase_commit::TwoPhaseCommitEvent;
use augrim::algorithm::three_phase_commit::ThreePhaseCommitEvent;

enum ConsensusEvent {
    TwoPhaseCommit(TwoPhaseCommitEvent),
    ThreePhaseCommit(ThreePhaseCommitEvent),
    ...
}

A similar enum would exist for ConsensusAction and ConsensusContext.

And each algorithm specified by the enum would have a TryFrom implementation for the scabbard algorithm. For example, two-phase commit event would have:

use augrim::algorithm::two_phase_commit::TwoPhaseCommitEvent;

impl TryFrom<ConsensusEvent> for TwoPhaseCommitEvent {
    type Error = InternalError;

    fn try_from(evt: ConsensusEvent) -> Result<Self, Self::Error> {
        ...
    }
}