Publisher for Scabbard v3
Summary
The publisher component is in charge of executing pending batches and building the finished result that will be used by some supervisor in consensus agreement. The published result may be a block, like in Sawtooth or the execution result of one or more batches like in Scabbard.
The following design proposes an implementation for publishing that is flexible for many use cases while keeping in mind reducing the required number of threads that are actively running at one time. Thread reduction is important for high availability requirements for v3 Scabbard.
Guide-level explanation
The publisher is in charge of returning the next item to publish to the
supervisor and consensus. It will return the Artifact
to agree upon, for
example a block or a batch execution result. The publisher needs to be able to
continue to publish until it is told to stop. The work may be canceled if the
building Artifact
will no longer be valid, for example if the current chain
head has changed.
The following design is done with the goal of defining Rust structs that would take a set of traits. These traits would allow the same publisher structs to be used in any situation (e.i. Scabbard or Sawtooth).
The supervisor will use a PublishFactory
that will be used to start the
execution. Once started, a PublishHandle
is returned that is used to finish or
cancel the publishing.
PublishFactory.start(..)
The supervisor will tell the publisher to start building a new
Artifact
. This is done prior to consensus actually being ready to
publish to avoid having to wait for the completion of transaction processing
before receiving the result.
The publisher will get batches from an iterator passed on start. The publisher
will pass the batch/batches to the BatchVerifier
, which will return batch
execution results. Results will not be returned until finish()
is called. If
the implementation allows for multiple batches, this allows for batches to
continue to be added and scheduled if they are available.
PublishHandle.finish()
When the supervisor is ready to publish a new Artifact
, it will call
finish()
on the PublishHandle
. At this time, implementations should abort
any scheduled and current batch validation. The completed batch execution
results will be converted to the expected Artifact
and returned to the
supervisor.
PublishHandle.cancel()
It is possible that work that is in progress will never be published, for
example if the state root hash has changed. Therefore, the supervisor tells the
PublishHandle
to cancel()
the current work, aborting all execution and
throwing away any completed batch execution results.
Reference-level explanation
Traits used by Publisher
The following traits will be used by either being Boxed and passed to the structs, as generics or as associated types. The traits will live in the Sawtooth library.
Artifact
The return type after publishing is complete. Defines one of the generics for the publisher. It will live in sawtooth-lib.
/// An artifact.
///
/// An artifact is an identifiable product of the publishing process and the
/// core value to be validated by a system based on Sawtooth.
pub trait Artifact: Clone + Send {
/// The type of the identifier.
type Identifier: ?Sized;
/// Returns a reference to this artifact's identifier.
fn artifact_id(&self) -> &Self::Identifier;
}
PublishingContext
The context that contains specific information for the building Artifact
.
This trait will live in sawtooth-lib and is used to define one of the generics
for the publisher.
/// The context used by the publisher
pub trait PublishingContext: Send {}
Batch
The trait for a batch that will be used by the publisher. This trait will live in sawtooth-lib and is used to define one of the generics for the publisher.
pub trait Batch: Send {}
ArtifactCreator
This trait will be Boxed and used to generate the resulting Artifact
from the resulting execution results.
/// An artifact creator.
///
/// Used to create new instances of `Artifact`.
pub trait ArtifactCreator: Send {
/// The context that contains extraneous information required to create the
/// `Artifact`.
type Context;
/// The type of input for the specific `Artifact`
type Input;
/// The `Artifact` type
type Artifact;
/// Creates a new `Artifact`
///
/// Returns a an `Artifact` created from the context and input
fn create(
&self,
context: &mut Self::Context,
input: Self::Input,
) -> Result<Self::Artifact, InternalError>;
}
The publisher will receive a factory for creating this creator, as one is
required per call to start()
.
/// An artifact creator factory.
///
/// Used to create new instances of `ArtifactCreator`.
pub trait ArtifactCreatorFactory {
/// The `ArtifactCreator` type
type ArtifactCreator;
/// Returns a new `ArtifactCreator`
fn new_creator(&self) -> Result<Self::ArtifactCreator, InternalError>;
}
PendingBatches
This trait will be used to get the pending batches, from the PendingQueue
for
example. This is similar to an Iterator
but needs to be Send
. A Boxed
version will be passed on start.
/// Return the next `Batch` that should be executed
pub trait PendingBatches<B: Batch>: Send {
fn next(&mut self) -> Result<Option<B>, InternalError>;
}
BatchVerifier
This trait will be used to execute the transactions inside of the batches and
return a list of BatchExecutionResults
. This trait is meant to mimic the
Scheduler
trait in Transact, however it is currently simplified to take out
any assumptions about the threading model in the Executor
and Scheduler
.
/// Result of executing a batch.
pub trait BatchExecutionResult: Send {}
/// Verify the contents of a `Batch` and its transactions
pub trait BatchVerifier: Send {
type Batch: Batch;
type Context: PublishingContext;
type ExecutionResult: BatchExecutionResult;
/// Add a new batch to the verifier
fn add_batch(&mut self, batch: Self::Batch) -> Result<(), InternalError>;
/// Finalize the verification of the batches, returning the execution results of batches
/// that have completed execution.
fn finalize(&mut self) -> Result<Vec<Self::ExecutionResult>, InternalError>;
/// Cancel exeuction of batches, discarding all excution results
fn cancel(&mut self) -> Result<(), InternalError>;
}
When looking at the current uses of Scheduler.result_callback()
, all examples
either directly send the new batch result over a channel or convert it into a
SchedulerEvent(Result, Complete, Error)
which is required to know when all
batches have been completed. This finalize method would not be useful for the
use case where all batches submitted must be completed, but does work for the
publisher where execution must be able to be halted at any point.
A new batch verifier will be created when new publishing begins. As such, the
publisher will have a BatchVerifierFactory
.
/// A Factory to create `BatchVerifier` instances
pub trait BatchVerifierFactory {
type Batch: Batch;
type Context: PublishingContext;
type ExecutionResult: BatchExecutionResult;
// allow type complexity here because of the use of associated types
#[allow(clippy::type_complexity)]
/// Start execution of batches in relation to a specific context by
/// returning a `BatchVerifier`
fn start(
&mut self,
context: Self::Context,
) -> Result<
Box<
dyn BatchVerifier<
Batch = Self::Batch,
Context = Self::Context,
ExecutionResult = Self::ExecutionResult,
>,
>,
InternalError,
>;
}
Publisher Structs
The following structs are meant to only be implemented once and should be
flexible enough to be used for all future use cases. This is possible by using
both generics and Boxed trait implementations. The publisher will be made up
of two main structs called the PublishFactory
and PublishHandle
.
PublishFactory
The PublishFactory
will be owned by a supervisor and will be used to start
the publishing for a Artifact
. The supervisor will pass a
PublishingContext
and Box<PendingBatches>
to the start()
function. The
PublishFactory
will spin up a thread for the context.
pub enum PublishMessage {
Cancel,
Finish,
Next,
Dropped,
}
pub struct PublishFactory<B, C, R, I>
where
B: 'static + Batch + Clone,
C: 'static + PublishingContext + Clone,
R: 'static + Artifact,
I: 'static + BatchExecutionResult,
{
artifact_creator_factory: Box<
dyn ArtifactCreatorFactory<
ArtifactCreator = Box<dyn ArtifactCreator<Context = C, Input = Vec<I>, Artifact = R>>,
>,
>,
batch_verifier_factory:
Box<dyn BatchVerifierFactory<Batch = B, Context = C, ExecutionResult = I>>,
}
impl<B, C, R, I> PublishFactory<B, C, R, I>
where
B: 'static + Batch + Clone,
C: 'static + PublishingContext + Clone,
R: 'static + Artifact,
I: 'static + BatchExecutionResult,
{
pub fn new(
artifact_creator_factory: Box<
dyn ArtifactCreatorFactory<
ArtifactCreator = Box<
dyn ArtifactCreator<Context = C, Input = Vec<I>, Artifact = R>,
>,
>,
>,
batch_verifier_factory: Box<
dyn BatchVerifierFactory<Batch = B, Context = C, ExecutionResult = I>,
>,
) -> Self {
Self {
artifact_creator_factory,
batch_verifier_factory,
}
}
}
impl<B, C, R, I> PublishFactory<B, C, R, I>
where
B: 'static + Batch + Clone,
C: 'static + PublishingContext + Clone,
R: 'static + Artifact,
I: 'static + BatchExecutionResult,
{
pub fn new(
artifact_creator_factory: Box<
dyn ArtifactCreatorFactory<
ArtifactCreator = Box<
dyn ArtifactCreator<Context = C, Input = Vec<I>, Artifact = R>,
>,
>,
>,
batch_verifier_factory: Box<
dyn BatchVerifierFactory<Batch = B, Context = C, ExecutionResult = I>,
>,
) -> Self {
Self {
artifact_creator_factory,
batch_verifier_factory,
}
}
}
impl<B, C, R, I> PublishFactory<B, C, R, I>
where
B: 'static + Batch + Clone,
C: 'static + PublishingContext + Clone,
R: 'static + Artifact,
I: 'static + BatchExecutionResult,
{
/// Start building the next `Artifact`
///
/// # Arguments
///
/// * `context` - Implementation specific context for the publisher
/// * `batches` - An interator the returns the next batch to execute
///
/// Returns a PublishHandle that can be used to finish or cancel the executing batch
pub fn start(
&mut self,
mut context: C,
mut batches: Box<dyn PendingBatches<B>>,
) -> Result<PublishHandle<R>, InternalError> {
let (sender, rc) = channel();
let mut verifier = self.batch_verifier_factory.start(context.clone())?;
let artifact_creator = self.artifact_creator_factory.new_creator()?;
let join_handle = thread::spawn(move || loop {
// drain the queue
while let Some(batch) = batches.next().map_err(|err| err.reduce_to_string())? {
verifier
.add_batch(batch)
.map_err(|err| err.reduce_to_string())?;
}
// Check to see if the batch result should be finished/canceled
match rc.recv() {
Ok(PublishMessage::Cancel) => {
verifier.cancel().map_err(|err| err.reduce_to_string())?;
return Ok(None);
}
Ok(PublishMessage::Finish) => {
let results = verifier.finalize().map_err(|err| err.reduce_to_string())?;
return Ok(Some(
artifact_creator
.create(&mut context, results)
.map_err(|err| err.reduce_to_string())?,
));
}
Ok(PublishMessage::Dropped) => {
return Ok(None);
}
Ok(PublishMessage::Next) => {
while let Some(batch) = batches.next().map_err(|err| err.reduce_to_string())? {
verifier
.add_batch(batch)
.map_err(|err| err.reduce_to_string())?;
}
}
Err(err) => {
return Err(InternalError::from_source(Box::new(err)).reduce_to_string());
}
};
});
Ok(PublishHandle::new(sender, join_handle))
}
}
PublishHandler
Once the thread is created a PublishHandler
will be returned that can be used
by the supervisor to cancel or finish the execution, and receive the finished
Artifacts
.
If the PublishHandle
is dropped without calling either function, the thread
will also be shutdown without issue.
/// Handler for interating with a publishing thread
pub struct PublishHandle<R>
where
R: Artifact,
{
sender: Option<Sender<PublishMessage>>,
join_handle: Option<thread::JoinHandle<Result<Option<R>, String>>>,
}
impl<R> PublishHandle<R>
where
R: Artifact,
{
/// Create a new `PublishHandle`
///
/// Arguments
///
/// * `sender` - The `Sender` for seting updates to the publishing thread
/// * `join_handle` - The `JoinHandle` to the publishing thread, used to get the published
/// `Artifact`
pub fn new(
sender: Sender<PublishMessage>,
join_handle: thread::JoinHandle<Result<Option<R>, String>>,
) -> Self {
Self {
sender: Some(sender),
join_handle: Some(join_handle),
}
}
/// Finish constructing the block, returning a result that contains the bytes that consensus
/// must agree upon and a list of TransactionReceipts. Any batches that are not finished
/// processing, will be returned to the pending state.
pub fn finish(mut self) -> Result<R, InternalError> {
if let Some(sender) = self.sender.take() {
sender
.send(PublishMessage::Finish)
.map_err(|err| InternalError::from_source(Box::new(err)))?;
match self
.join_handle
.take()
.ok_or_else(|| InternalError::with_message("Missing join handle".into()))?
.join()
.map_err(|err| {
InternalError::with_message(format!(
"Unable to call join on join handle: {:?}",
err
))
})? {
Ok(Some(result)) => Ok(result),
// no result returned
Ok(None) => Err(InternalError::with_message(
"Publishing was finalized, should have received results".into(),
)),
Err(err) => Err(InternalError::with_message(err)),
}
} else {
// already called finish or cancel
Err(InternalError::with_message(
"Publishing has already been finished or canceled".into(),
))
}
}
/// Cancel the currently building block, putting all batches back into a pending state
pub fn cancel(mut self) -> Result<(), InternalError> {
if let Some(sender) = self.sender.take() {
sender
.send(PublishMessage::Cancel)
.map_err(|err| InternalError::from_source(Box::new(err)))?;
match self
.join_handle
.take()
.ok_or_else(|| InternalError::with_message("Missing join handle".into()))?
.join()
.map_err(|err| {
InternalError::with_message(format!(
"Unable to call join on join handle: {:?}",
err
))
})? {
// Did not expect any results
Ok(Some(_)) => Err(InternalError::with_message(
"Publishing was cancel, should not have got results".into(),
)),
Ok(None) => Ok(()),
Err(err) => Err(InternalError::with_message(err)),
}
} else {
// already called finish or cancel
Err(InternalError::with_message(
"Publishing has already been finished or canceled".into(),
))
}
}
/// Notify that there is a batch added to the pending queue
pub fn next_batch(&self) -> Result<(), InternalError> {
if let Some(sender) = &self.sender {
sender
.send(PublishMessage::Next)
.map_err(|err| InternalError::from_source(Box::new(err)))
} else {
Ok(())
}
}
}
impl<R> Drop for PublishHandle<R>
where
R: Artifact,
{
fn drop(&mut self) {
if let Some(sender) = self.sender.take() {
match sender.send(PublishMessage::Dropped) {
Ok(_) => (),
Err(_) => {
error!("Unable to shutdown Publisher thread")
}
}
}
}
}
Example
An example of using the above API is as follows:
let artifact_creator_factory = Box::new(TestArtifactCreatorFactory {});
let batch_verifier_factory = Box::new(TestBatchVerifierFactory {});
let mut publish_factory: PublishFactory<
TestBatch,
Arc<Mutex<TestContext>>,
TestArtifact,
TestBatchExecutionResult,
> = PublishFactory::new(artifact_creator_factory, batch_verifier_factory);
let pending_batches = Box::new(BatchIter {
batches: vec![TestBatch {
value: "value_1".to_string(),
}],
});
let context = Arc::new(Mutex::new(TestContext {
current_block_height: 0,
current_state_value: "genesis".to_string(),
}));
// Start publishing for first batch
let publish_handle = publish_factory
.start(context.clone(), pending_batches)
.expect("Unable to start publishing thread");
publish_handle
.next_batch()
.expect("Unable to notify publisher thread of new batch");
// Finish the publishing of the artifact
let artifact = publish_handle
.finish()
.expect("Unable to finalize publishing thread");
assert_eq!(artifact.block_height, 1);
assert_eq!(artifact.current_value, "value_1".to_string());
Rationale and alternatives
Another option investigated was to define the Publisher as a set of traits that would require every use case to create their own implementation.
While this would allow for more flexibility, it would require more work by developers when working on a new use case. We do not currently have any use case that differs from the above process.
Prior art
This design is inspired by the existing Publisher in Sawtooth Core and the Sawtooth library. As well as the publishing process that is used in Scabbard and the transaction execution process in Transact.
Unresolved questions
Should the PublisherFactory
enforce the number of threads allowed with a
thread pool? Or should the caller control how many publishing threads should
exist, as publisher threads correlated to the number of PublishHandles
?
The current Transact implementation requires several threads running for the transaction execution. How this API may be improved to reduce the number of threads required is yet to be determined.