Merge pull request #1346 from subspace/improve-archiver-performance-part-2

Improve archiver performance (part 2)
nazar-pc authored Apr 3, 2023
2 parents 0043ed4 + c2bbc11 commit 06e5c4d
Showing 4 changed files with 184 additions and 158 deletions.
@@ -4,6 +4,8 @@ use crate::archiver::Segment;
 use alloc::vec::Vec;
 use core::ops::{Deref, DerefMut};
 use parity_scale_codec::{Encode, Output};
+#[cfg(feature = "rayon")]
+use rayon::prelude::*;
 use subspace_core_primitives::crypto::kzg::{Commitment, Kzg};
 use subspace_core_primitives::crypto::Scalar;
 use subspace_core_primitives::RawRecord;
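
This feature-gated import sets up the pattern used throughout the PR: the same iterator pipeline is bound to either a plain sequential iterator or a rayon parallel one at compile time, so the processing logic is written only once. Below is a minimal, self-contained sketch of that pattern under the same assumed "rayon" Cargo feature; the function name process and the constant CHUNK_SIZE are illustrative, not part of the crate.

// Sketch of the compile-time iterator selection used in this PR.
// Build with `--features rayon` to get the parallel version.
#[cfg(feature = "rayon")]
use rayon::prelude::*;

const CHUNK_SIZE: usize = 32; // illustrative stand-in for RawRecord::SIZE

fn process(buffer: &[u8]) -> Vec<u64> {
    // Both iterator types expose the same adapter methods, so everything
    // after this point compiles unchanged for either build.
    #[cfg(not(feature = "rayon"))]
    let chunks = buffer.chunks_exact(CHUNK_SIZE);
    #[cfg(feature = "rayon")]
    let chunks = buffer.par_chunks_exact(CHUNK_SIZE);

    // Per-chunk work (here a toy checksum) runs sequentially or on the
    // rayon thread pool depending on the feature flag.
    chunks
        .map(|chunk| chunk.iter().map(|&byte| byte as u64).sum())
        .collect()
}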
@@ -62,49 +64,66 @@ pub(super) fn update_record_commitments(
 /// Processor is hidden to not expose unnecessary implementation details (like `Output` trait
 /// implementation)
 struct IncrementalRecordCommitmentsProcessor<'a> {
-    /// Processed bytes in the segment so far
-    processed_bytes: usize,
-    /// Buffer where current (partial) record is written
-    raw_record_buffer: Vec<u8>,
+    /// Number of bytes of recorded history segment for which commitments were already created
+    skip_bytes: usize,
+    /// Buffer where new bytes for which commitments need to be created are pushed
+    buffer: Vec<u8>,
     /// Record commitments already created
     incremental_record_commitments: &'a mut IncrementalRecordCommitmentsState,
     /// Kzg instance used for commitments creation
     kzg: &'a Kzg,
 }

+impl<'a> Drop for IncrementalRecordCommitmentsProcessor<'a> {
+    fn drop(&mut self) {
+        #[cfg(not(feature = "rayon"))]
+        let raw_records_bytes = self.buffer.chunks_exact(RawRecord::SIZE);
+        #[cfg(feature = "rayon")]
+        let raw_records_bytes = self.buffer.par_chunks_exact(RawRecord::SIZE);
+
+        let iter = raw_records_bytes
+            .map(|raw_record_bytes| {
+                raw_record_bytes
+                    .array_chunks::<{ Scalar::SAFE_BYTES }>()
+                    .map(Scalar::from)
+            })
+            .map(|record_chunks| {
+                let number_of_chunks = record_chunks.len();
+                let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());
+
+                record_chunks.collect_into(&mut scalars);
+
+                // Number of scalars for KZG must be a power of two elements
+                scalars.resize(scalars.capacity(), Scalar::default());
+
+                let polynomial = self
+                    .kzg
+                    .poly(&scalars)
+                    .expect("KZG instance must be configured to support this many scalars; qed");
+                self.kzg
+                    .commit(&polynomial)
+                    .expect("KZG instance must be configured to support this many scalars; qed")
+            });
+
+        #[cfg(not(feature = "rayon"))]
+        iter.collect_into(&mut self.incremental_record_commitments.state);
+        // TODO: `collect_into_vec()`, unfortunately, truncates input, which is not what we want
+        //  can be unified when https://github.com/rayon-rs/rayon/issues/1039 is resolved
+        #[cfg(feature = "rayon")]
+        self.incremental_record_commitments
+            .extend(&iter.collect::<Vec<_>>());
+    }
+}

 impl<'a> Output for IncrementalRecordCommitmentsProcessor<'a> {
     fn write(&mut self, mut bytes: &[u8]) {
-        // Try to finish last partial record if possible
-
-        let record_offset = self.processed_bytes % RawRecord::SIZE;
-        let bytes_left_in_record = RawRecord::SIZE - record_offset;
-        if bytes_left_in_record > 0 {
-            let remaining_record_bytes;
-            (remaining_record_bytes, bytes) =
-                bytes.split_at(if bytes.len() >= bytes_left_in_record {
-                    bytes_left_in_record
-                } else {
-                    bytes.len()
-                });
-
-            self.update_commitment_state(remaining_record_bytes);
-
-            if remaining_record_bytes.len() == bytes_left_in_record {
-                self.create_commitment();
-            }
+        if self.skip_bytes >= bytes.len() {
+            self.skip_bytes -= bytes.len();
+        } else {
+            bytes = &bytes[self.skip_bytes..];
+            self.skip_bytes = 0;
+            self.buffer.extend_from_slice(bytes);
         }
-
-        // Continue processing records (full and partial) from remaining data, at this point we have
-        // processed some number of full records, so can simply chunk the remaining bytes into
-        // record sizes
-        bytes.chunks(RawRecord::SIZE).for_each(|record| {
-            self.update_commitment_state(record);
-
-            // Store hashes of full records
-            if record.len() == RawRecord::SIZE {
-                self.create_commitment();
-            }
-        });
     }
 }

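The new write/drop pair above boils down to a simple contract: skip the prefix of the stream that earlier passes already committed to, buffer whatever follows, and create commitments for all complete records in one batch at the end. A toy model of that control flow is sketched below; the names (Committer, finish) and the u64 pseudo-commitment are invented for illustration, and the real processor does this work in Drop with KZG commitments instead.

// Toy model of the skip-and-buffer scheme (all names invented for
// illustration; this is not the crate's API).
const RECORD_SIZE: usize = 8; // stand-in for RawRecord::SIZE

struct Committer {
    // Bytes covered by commitments created in previous passes.
    skip_bytes: usize,
    // Only bytes that still need commitments land here.
    buffer: Vec<u8>,
    commitments: Vec<u64>, // stand-in for KZG commitments
}

impl Committer {
    fn new(already_committed_records: usize) -> Self {
        Self {
            skip_bytes: already_committed_records * RECORD_SIZE,
            buffer: Vec::new(),
            commitments: Vec::new(),
        }
    }

    // Mirrors `Output::write`: drop bytes that are already covered,
    // keep everything after that point.
    fn write(&mut self, mut bytes: &[u8]) {
        if self.skip_bytes >= bytes.len() {
            self.skip_bytes -= bytes.len();
        } else {
            bytes = &bytes[self.skip_bytes..];
            self.skip_bytes = 0;
            self.buffer.extend_from_slice(bytes);
        }
    }

    // Mirrors the `Drop` impl: commit to every *complete* record in the
    // buffer; a trailing partial record is intentionally ignored.
    fn finish(mut self) -> Vec<u64> {
        for record in self.buffer.chunks_exact(RECORD_SIZE) {
            self.commitments
                .push(record.iter().map(|&b| b as u64).sum());
        }
        self.commitments
    }
}

fn main() {
    let mut c = Committer::new(1); // one record already committed
    c.write(&[0u8; 10]); // first 8 bytes skipped, 2 buffered
    c.write(&[1u8; 14]); // 14 more buffered -> 2 full records
    assert_eq!(c.finish().len(), 2);
}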
@@ -114,64 +133,11 @@ impl<'a> IncrementalRecordCommitmentsProcessor<'a> {
         kzg: &'a Kzg,
     ) -> Self {
         Self {
-            // TODO: Remove `processed_bytes`, `raw_record_buffer` should be sufficient
-            processed_bytes: 0,
-            raw_record_buffer: Vec::with_capacity(RawRecord::SIZE),
+            skip_bytes: incremental_record_commitments.len() * RawRecord::SIZE,
+            // Default to record size, may grow if necessary
+            buffer: Vec::with_capacity(RawRecord::SIZE),
             incremental_record_commitments,
             kzg,
         }
     }
-
-    /// Whether commitment for current record needs to be created
-    fn should_commit_to_record(&self, record_position: usize) -> bool {
-        self.incremental_record_commitments
-            .state
-            .get(record_position)
-            .is_none()
-    }
-
-    /// In case commitment is necessary for currently processed record, internal commitment state
-    /// will be updated with provided bytes.
-    ///
-    /// NOTE: This method is called with bytes that either cover part of the record or stop at the
-    /// edge of the record.
-    fn update_commitment_state(&mut self, bytes: &[u8]) {
-        if self.should_commit_to_record(self.processed_bytes / RawRecord::SIZE) {
-            self.raw_record_buffer.extend_from_slice(bytes);
-        }
-        self.processed_bytes += bytes.len();
-    }
-
-    /// In case commitment is necessary for currently processed record, internal hashing state will
-    /// be finalized and commitment will be stored in shared state.
-    fn create_commitment(&mut self) {
-        if self.should_commit_to_record(self.processed_bytes / RawRecord::SIZE - 1) {
-            let scalars = {
-                let record_chunks = self
-                    .raw_record_buffer
-                    .array_chunks::<{ Scalar::SAFE_BYTES }>();
-                let number_of_chunks = record_chunks.len();
-                let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());
-
-                record_chunks.map(Scalar::from).collect_into(&mut scalars);
-
-                // Number of scalars for KZG must be a power of two elements
-                scalars.resize(scalars.capacity(), Scalar::default());
-
-                scalars
-            };
-            self.raw_record_buffer.clear();
-
-            let polynomial = self
-                .kzg
-                .poly(&scalars)
-                .expect("KZG instance must be configured to support this many scalars; qed");
-            let commitment = self
-                .kzg
-                .commit(&polynomial)
-                .expect("KZG instance must be configured to support this many scalars; qed");
-
-            self.incremental_record_commitments.state.push(commitment);
-        }
-    }
 }
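
Every commitment path in this file shares one preparatory step: the scalar vector is padded with default (zero) scalars up to the next power of two before Kzg::poly is called, since the polynomial is built over a power-of-two evaluation domain. A standalone sketch of just that step follows; pad_pow2 is an illustrative helper rather than a crate API, and it computes the target length directly instead of relying on Vec::capacity returning exactly what was requested.

// Pad a vector to the next power-of-two length with a default element,
// mirroring the `with_capacity(n.next_power_of_two())` + `resize(...)`
// trick used in the diff. `Elem` stands in for `Scalar`.
fn pad_pow2<Elem: Default + Clone>(mut elems: Vec<Elem>) -> Vec<Elem> {
    let target = elems.len().next_power_of_two();
    elems.resize(target, Elem::default());
    elems
}

fn main() {
    let padded = pad_pow2(vec![1u32, 2, 3, 4, 5]);
    assert_eq!(padded.len(), 8); // 5 rounds up to the next power of two
    assert_eq!(&padded[5..], &[0, 0, 0]);
}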
93 changes: 52 additions & 41 deletions crates/subspace-archiving/src/piece_reconstructor.rs
@@ -3,9 +3,11 @@ extern crate alloc;
 use alloc::string::String;
 use alloc::vec::Vec;
 use core::num::NonZeroUsize;
+#[cfg(feature = "rayon")]
+use rayon::prelude::*;
 use subspace_core_primitives::crypto::kzg::{Commitment, Kzg, Polynomial};
 use subspace_core_primitives::crypto::{blake2b_256_254_hash_to_scalar, Scalar};
-use subspace_core_primitives::{ArchivedHistorySegment, Piece, RawRecord, RecordedHistorySegment};
+use subspace_core_primitives::{ArchivedHistorySegment, Piece, RawRecord};
 use subspace_erasure_coding::ErasureCoding;

/// Reconstructor-related instantiation error.
@@ -143,46 +145,50 @@ impl PiecesReconstructor {
}
}

-        let mut source_record_commitments =
-            Vec::with_capacity(RecordedHistorySegment::NUM_RAW_RECORDS);
-        for (piece, maybe_input_piece) in
-            reconstructed_pieces.iter_mut().zip(input_pieces).step_by(2)
-        {
-            if let Some(input_piece) = maybe_input_piece {
-                source_record_commitments.push(
-                    Commitment::try_from_bytes(input_piece.commitment())
-                        .map_err(|_error| ReconstructorError::InvalidInputPieceCommitment)?,
-                );
-            } else {
-                let scalars = {
-                    let record_chunks = piece.record().full_scalar_arrays();
-                    let number_of_chunks = record_chunks.len();
-                    let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());
-
-                    for record_chunk in record_chunks {
-                        scalars.push(
-                            Scalar::try_from(record_chunk)
-                                .map_err(ReconstructorError::DataShardsReconstruction)?,
-                        );
-                    }
-
-                    // Number of scalars for KZG must be a power of two elements
-                    scalars.resize(scalars.capacity(), Scalar::default());
-
-                    scalars
-                };
-
-                let polynomial = self
-                    .kzg
-                    .poly(&scalars)
-                    .expect("KZG instance must be configured to support this many scalars; qed");
-                let commitment = self
-                    .kzg
-                    .commit(&polynomial)
-                    .expect("KZG instance must be configured to support this many scalars; qed");
-                source_record_commitments.push(commitment);
-            }
-        }
+        let source_record_commitments = {
+            #[cfg(not(feature = "rayon"))]
+            let iter = reconstructed_pieces.iter_mut().zip(input_pieces).step_by(2);
+            #[cfg(feature = "rayon")]
+            let iter = reconstructed_pieces
+                .par_iter_mut()
+                .zip_eq(input_pieces)
+                .step_by(2);
+
+            iter.map(|(piece, maybe_input_piece)| {
+                if let Some(input_piece) = maybe_input_piece {
+                    Commitment::try_from_bytes(input_piece.commitment())
+                        .map_err(|_error| ReconstructorError::InvalidInputPieceCommitment)
+                } else {
+                    let scalars = {
+                        let record_chunks = piece.record().full_scalar_arrays();
+                        let number_of_chunks = record_chunks.len();
+                        let mut scalars = Vec::with_capacity(number_of_chunks.next_power_of_two());
+
+                        for record_chunk in record_chunks {
+                            scalars.push(
+                                Scalar::try_from(record_chunk)
+                                    .map_err(ReconstructorError::DataShardsReconstruction)?,
+                            );
+                        }
+
+                        // Number of scalars for KZG must be a power of two elements
+                        scalars.resize(scalars.capacity(), Scalar::default());
+
+                        scalars
+                    };
+
+                    let polynomial = self.kzg.poly(&scalars).expect(
+                        "KZG instance must be configured to support this many scalars; qed",
+                    );
+                    let commitment = self.kzg.commit(&polynomial).expect(
+                        "KZG instance must be configured to support this many scalars; qed",
+                    );
+
+                    Ok(commitment)
+                }
+            })
+            .collect::<Result<Vec<_>, _>>()?
+        };
         let record_commitments = self
             .erasure_coding
             .extend_commitments(&source_record_commitments)
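
Structurally, the old accumulate-in-a-loop code became iter.map(...).collect::<Result<Vec<_>, _>>()?. Collecting an iterator of Results into Result<Vec<_>, _> stops at the first Err, so the early return that ? provided inside the loop is preserved, and rayon implements the same collection for its parallel iterators. A standalone illustration with toy inputs:

// Collecting an iterator of Results short-circuits on the first error,
// sequentially and (with rayon) in parallel alike.
fn parse_all(inputs: &[&str]) -> Result<Vec<i64>, std::num::ParseIntError> {
    inputs
        .iter()
        .map(|s| s.parse::<i64>())
        .collect::<Result<Vec<_>, _>>()
}

fn main() {
    assert_eq!(parse_all(&["1", "2", "3"]).unwrap(), vec![1, 2, 3]);
    assert!(parse_all(&["1", "oops", "3"]).is_err());
}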
@@ -220,7 +226,12 @@ impl PiecesReconstructor {
     ) -> Result<ArchivedHistorySegment, ReconstructorError> {
         let (mut pieces, polynomial) = self.reconstruct_shards(segment_pieces)?;
 
-        pieces.iter_mut().enumerate().for_each(|(position, piece)| {
+        #[cfg(not(feature = "rayon"))]
+        let iter = pieces.iter_mut().enumerate();
+        #[cfg(feature = "rayon")]
+        let iter = pieces.par_iter_mut().enumerate();
+
+        iter.for_each(|(position, piece)| {
             piece.witness_mut().copy_from_slice(
                 &self
                     .kzg
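
The last hunk applies the same feature gate to the witness-filling loop. par_iter_mut().enumerate() hands each closure a disjoint &mut element along with its stable index in the slice, which is why the in-place writes can run in parallel without synchronization. A compilable sketch under the same assumed "rayon" feature, with trivial arithmetic standing in for the real witness computation:

#[cfg(feature = "rayon")]
use rayon::prelude::*;

// Each closure invocation receives a disjoint `&mut` plus its stable
// position in the slice, so parallel in-place writes are data-race free.
fn square_in_place(values: &mut [u64]) {
    #[cfg(not(feature = "rayon"))]
    let iter = values.iter_mut().enumerate();
    #[cfg(feature = "rayon")]
    let iter = values.par_iter_mut().enumerate();

    iter.for_each(|(position, value)| {
        *value = (position as u64).pow(2);
    });
}

fn main() {
    let mut values = vec![0u64; 5];
    square_in_place(&mut values);
    assert_eq!(values, [0, 1, 4, 9, 16]);
}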