Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,6 @@ | |
use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread}; | ||
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] | ||
use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop}; | ||
#[cfg(feature = "wrapper-allocator")] | ||
use wrapper_allocator::ALLOCATOR_DATA; | ||
use crate::{ | ||
common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop}, | ||
prepare, prevalidate, LOG_TARGET, | ||
|
@@ -33,6 +31,8 @@ use polkadot_node_core_pvf::{ | |
}; | ||
use std::{any::Any, panic, path::PathBuf, sync::mpsc::channel}; | ||
use tokio::{io, net::UnixStream}; | ||
#[cfg(feature = "wrapper-allocator")] | ||
use wrapper_allocator::ALLOC; | ||
|
||
async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> { | ||
let pvf = framed_recv(stream).await?; | ||
|
@@ -112,18 +112,19 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { | |
let prepare_fut = rt_handle | ||
.spawn_blocking(move || { | ||
#[cfg(feature = "wrapper-allocator")] | ||
ALLOCATOR_DATA.checkpoint(); | ||
ALLOC.start_tracking(100_000_000); | ||
|
||
|
||
let result = prepare_artifact(pvf); | ||
|
||
#[cfg(feature = "wrapper-allocator")] | ||
{ | ||
let peak = ALLOCATOR_DATA.checkpoint(); | ||
let (events, peak) = ALLOC.end_tracking(); | ||
gum::debug!( | ||
target: LOG_TARGET, | ||
%worker_pid, | ||
"prepare job peak allocation is {} bytes", | ||
"prepare job peak allocation is {} bytes in {} events", | ||
peak, | ||
events, | ||
); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,65 +14,138 @@ | |
// You should have received a copy of the GNU General Public License | ||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
use core::sync::atomic::{ Ordering::SeqCst, AtomicUsize }; | ||
//! Tracking global allocator. Initially it just forwards allocation and deallocation requests | ||
//! to the underlying allocator. When tracking is enabled, it stores every allocation event into | ||
//! a pre-allocated backlog. When tracking is disabled, it replays the backlog and computes the | ||
//! number of allocation events and the peak allocation value. | ||
|
||
use core::alloc::{GlobalAlloc, Layout}; | ||
use std::sync::{ | ||
atomic::{AtomicUsize, Ordering::Relaxed}, | ||
RwLock, | ||
}; | ||
use tikv_jemallocator::Jemalloc; | ||
|
||
pub struct WrapperAllocatorData { | ||
allocated: AtomicUsize, | ||
checkpoint: AtomicUsize, | ||
peak: AtomicUsize, | ||
// limit: AtomicUsize, // Should we introduce a checkpoint limit and fail allocation if the limit is hit? | ||
struct WrapperAllocatorData { | ||
tracking: RwLock<bool>, | ||
backlog: Vec<isize>, | ||
backlog_index: AtomicUsize, | ||
} | ||
|
||
impl WrapperAllocatorData { | ||
/// Marks a new checkpoint. Returns peak allocation, in bytes, since the last checkpoint. | ||
pub fn checkpoint(&self) -> usize { | ||
let allocated = ALLOCATOR_DATA.allocated.load(SeqCst); | ||
let old_cp = ALLOCATOR_DATA.checkpoint.swap(allocated, SeqCst); | ||
ALLOCATOR_DATA.peak.swap(allocated, SeqCst).saturating_sub(old_cp) | ||
} | ||
// SAFETY: | ||
// * Tracking must only be performed by a single thread at a time | ||
// * `start_tracking` and `end_tracking` must be called from the same thread | ||
// * Tracking periods must not overlap | ||
// * Caller must provide sufficient backlog size | ||
|
||
unsafe fn start_tracking(&mut self, backlog_size: usize) { | ||
// Allocate the backlog before locking anything; allocating after the write lock is taken would deadlock. | ||
let backlog = Vec::with_capacity(backlog_size); | ||
|
||
// Lock allocations, move the allocated vector to our place and start tracking. | ||
let mut tracking = self.tracking.write().unwrap(); | ||
assert!(!*tracking); // Shouldn't start tracking if already tracking | ||
self.backlog = backlog; | ||
self.backlog.resize(backlog_size, 0); | ||
self.backlog_index.store(0, Relaxed); | ||
*tracking = true; | ||
} | ||
|
||
unsafe fn end_tracking(&mut self) -> (usize, isize) { | ||
let mut tracking = self.tracking.write().unwrap(); | ||
assert!(*tracking); // Start/end calls must be consistent | ||
|
||
// At this point, all allocations are blocked, as every other thread is waiting for a | ||
// read lock on `tracking`. The following code replays the backlog and calculates the | ||
// peak value. It must not perform any allocation, otherwise a deadlock will occur. | ||
let mut peak = 0; | ||
let mut alloc = 0; | ||
let mut events = 0usize; | ||
for i in 0..self.backlog.len() { | ||
if self.backlog[i] == 0 { | ||
break | ||
} | ||
events += 1; | ||
alloc += self.backlog[i]; | ||
if alloc > peak { | ||
peak = alloc | ||
} | ||
} | ||
*tracking = false; | ||
(events, peak) | ||
} | ||
|
||
#[inline] | ||
unsafe fn track(&mut self, alloc: isize) { | ||
let tracking = self.tracking.read().unwrap(); | ||
if !*tracking { | ||
return | ||
} | ||
let i = self.backlog_index.fetch_add(1, Relaxed); | ||
if i >= self.backlog.len() { | ||
// We cannot use formatted text here as it would result in allocations and a deadlock | ||
panic!("Backlog size provided was not enough for allocation tracking"); | ||
|
||
} | ||
// It is safe as the vector is pre-allocated and the index is acquired atomically | ||
self.backlog[i] = alloc; | ||
} | ||
} | ||
|
||
pub static ALLOCATOR_DATA: WrapperAllocatorData = WrapperAllocatorData { allocated: AtomicUsize::new(0), checkpoint: AtomicUsize::new(0), peak: AtomicUsize::new(0) }; | ||
static mut ALLOCATOR_DATA: WrapperAllocatorData = WrapperAllocatorData { | ||
tracking: RwLock::new(false), | ||
backlog: vec![], | ||
backlog_index: AtomicUsize::new(0), | ||
}; | ||
|
||
pub struct WrapperAllocator<A: GlobalAlloc>(A); | ||
|
||
struct WrapperAllocator<A: GlobalAlloc>(A); | ||
impl<A: GlobalAlloc> WrapperAllocator<A> { | ||
/// Start tracking with the given backlog size (in allocation events). Providing an | ||
/// insufficient backlog size will result in a panic. | ||
pub fn start_tracking(&self, backlog_size: usize) { | ||
unsafe { | ||
ALLOCATOR_DATA.start_tracking(backlog_size); | ||
} | ||
} | ||
|
||
/// End tracking and return the number of allocation events (as `usize`) and the peak | ||
/// allocation value in bytes (as `isize`). The peak value is not guaranteed to be | ||
/// either non-zero or positive. | ||
pub fn end_tracking(&self) -> (usize, isize) { | ||
unsafe { ALLOCATOR_DATA.end_tracking() } | ||
} | ||
} | ||
|
||
unsafe impl<A: GlobalAlloc> GlobalAlloc for WrapperAllocator<A> { | ||
// SAFETY: | ||
// * The wrapped methods are as safe as the underlying allocator implementation is | ||
// * In tracking mode, it is safe as long as a sufficient backlog size is provided when | ||
// entering the tracking mode | ||
|
||
#[inline] | ||
unsafe fn alloc(&self, layout: Layout) -> *mut u8 { | ||
ALLOCATOR_DATA.track(layout.size() as isize); | ||
self.0.alloc(layout) | ||
} | ||
|
||
#[inline] | ||
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { | ||
ALLOCATOR_DATA.track(layout.size() as isize); | ||
self.0.alloc_zeroed(layout) | ||
} | ||
|
||
#[inline] | ||
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) -> () { | ||
ALLOCATOR_DATA.track(-(layout.size() as isize)); | ||
self.0.dealloc(ptr, layout) | ||
} | ||
|
||
// SAFETY: The wrapped methods are as safe as the underlying allocator implementation is. | ||
|
||
#[inline] | ||
unsafe fn alloc(&self, layout: Layout) -> *mut u8 { | ||
let old_alloc = ALLOCATOR_DATA.allocated.fetch_add(layout.size(), SeqCst); | ||
ALLOCATOR_DATA.peak.fetch_max(old_alloc + layout.size(), SeqCst); | ||
self.0.alloc(layout) | ||
} | ||
|
||
#[inline] | ||
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { | ||
let old_alloc = ALLOCATOR_DATA.allocated.fetch_add(layout.size(), SeqCst); | ||
ALLOCATOR_DATA.peak.fetch_max(old_alloc + layout.size(), SeqCst); | ||
self.0.alloc_zeroed(layout) | ||
} | ||
|
||
#[inline] | ||
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) -> () { | ||
ALLOCATOR_DATA.allocated.fetch_sub(layout.size(), SeqCst); | ||
self.0.dealloc(ptr, layout) | ||
} | ||
|
||
#[inline] | ||
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { | ||
if new_size > layout.size() { | ||
let old_alloc = ALLOCATOR_DATA.allocated.fetch_add(new_size - layout.size(), SeqCst); | ||
ALLOCATOR_DATA.peak.fetch_max(old_alloc + new_size - layout.size(), SeqCst); | ||
} else { | ||
ALLOCATOR_DATA.allocated.fetch_sub(layout.size() - new_size, SeqCst); | ||
} | ||
self.0.realloc(ptr, layout, new_size) | ||
} | ||
#[inline] | ||
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { | ||
ALLOCATOR_DATA.track((new_size as isize) - (layout.size() as isize)); | ||
self.0.realloc(ptr, layout, new_size) | ||
} | ||
} | ||
|
||
#[global_allocator] | ||
static ALLOC: WrapperAllocator<Jemalloc> = WrapperAllocator(Jemalloc); | ||
pub static ALLOC: WrapperAllocator<Jemalloc> = WrapperAllocator(Jemalloc); |
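
For orientation, here is a minimal usage sketch of the new tracking API. It mirrors the calls the prepare worker makes above; the workload and the backlog size are made up for illustration and are not part of this change.

```rust
use wrapper_allocator::ALLOC;

fn measure_job() {
	// Reserve room for up to 1_000_000 allocation events (illustrative value);
	// an undersized backlog makes the allocator panic while tracking.
	ALLOC.start_tracking(1_000_000);

	// The workload whose allocations we want to measure.
	let buf: Vec<u8> = vec![0u8; 4096];
	drop(buf);

	// Replays the backlog: returns the number of allocation events and the
	// peak net allocation (in bytes) observed since tracking started.
	let (events, peak) = ALLOC.end_tracking();
	println!("peak allocation: {} bytes across {} events", peak, events);
}
```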
This is a big allocation. Should it be included in the stats to get an accurate number for the thread's real memory usage? And is 800 MB per prepare job not too much? Can you remind me what the advantages of saving every allocation are?
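
For context on that figure, a quick back-of-the-envelope check (not part of the change): the backlog is a `Vec<isize>`, and the prepare worker passes `100_000_000` as the backlog size, so at 8 bytes per `isize` on a 64-bit target the backlog alone costs about 800 MB.

```rust
fn main() {
	// Backlog size passed to start_tracking() in the prepare worker above.
	const BACKLOG_SIZE: usize = 100_000_000;
	// Each event is stored as an isize (8 bytes on 64-bit targets).
	let bytes = BACKLOG_SIZE * std::mem::size_of::<isize>();
	println!("backlog cost: {} bytes (~{} MB)", bytes, bytes / 1_000_000); // ~800 MB
}
```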