Auto merge of #51060 - michaelwoerister:thread-safe-consts, r=<try>
WIP: Make const decoding thread-safe.

This is an alternative to #50957. It's a proof of concept (e.g. it doesn't adapt metadata decoding, just the incr. comp. cache), but I think it turned out nicely. It's rather simple and does not require passing around a bunch of weird closures like we currently do.
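To illustrate the shape (a minimal, self-contained sketch with stand-in types, not the actual rustc definitions): the cache owns one long-lived `AllocDecodingState` built from the allocation offsets, each query decode grabs a cheap `Copy` `AllocDecodingSession`, and no caching closure has to be threaded through the decoder anymore. The `u64` alloc ids and the `freshly_decoded` parameter are placeholders for what the real decoder would read from the byte stream.

```rust
// Minimal sketch of the new API shape (stand-in types, not rustc's own).
use std::num::NonZeroU32;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Mutex;

type AllocId = u64;

pub struct AllocDecodingState {
    // One slot per serialized allocation: None until it has been decoded.
    decoded: Vec<Mutex<Option<AllocId>>>,
    // Byte offset of each allocation in the serialized data.
    data_offsets: Vec<u32>,
}

#[derive(Clone, Copy)]
pub struct AllocDecodingSession<'s> {
    state: &'s AllocDecodingState,
    session_id: NonZeroU32,
}

impl AllocDecodingState {
    pub fn new(data_offsets: Vec<u32>) -> Self {
        let decoded: Vec<Mutex<Option<AllocId>>> =
            (0..data_offsets.len()).map(|_| Mutex::new(None)).collect();
        AllocDecodingState { decoded, data_offsets }
    }

    pub fn new_decoding_session(&self) -> AllocDecodingSession<'_> {
        // Same trick as in the patch: a global counter, masked and offset by
        // one so the session id can never be zero.
        static DECODER_SESSION_ID: AtomicU32 = AtomicU32::new(0);
        let counter = DECODER_SESSION_ID.fetch_add(1, Ordering::SeqCst);
        let session_id = NonZeroU32::new((counter & 0x7FFF_FFFF) + 1).unwrap();
        AllocDecodingSession { state: self, session_id }
    }
}

impl<'s> AllocDecodingSession<'s> {
    // The real method drives a TyDecoder positioned at data_offsets[idx];
    // here the decoded value is passed in so the sketch stays runnable.
    pub fn decode_alloc_id(&self, idx: usize, freshly_decoded: AllocId) -> AllocId {
        let _pos = self.state.data_offsets[idx]; // where decoding would start
        let mut slot = self.state.decoded[idx].lock().unwrap();
        *slot.get_or_insert(freshly_decoded)
    }
}

fn main() {
    let state = AllocDecodingState::new(vec![0, 16]);
    let session = state.new_decoding_session();
    println!("decoding session {}", session.session_id);
    assert_eq!(session.decode_alloc_id(0, 7), 7);
    // A later decode of the same allocation reuses the cached id.
    assert_eq!(session.decode_alloc_id(0, 99), 7);
}
```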

If you (@Zoxc & @oli-obk) think this approach is good, then I'm happy to finish and clean this up.

Note: The current version just spins when it encounters an in-progress decoding. I don't have a strong preference for this approach. Decoding concurrently is equally fine by me (or maybe even better because it doesn't require poisoning).
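For reference, here is a simplified, runnable model of the per-allocation state machine in the diff below, in its concurrent-decoding variant (no spinning, no poisoning). A `Done` entry is reused as-is; an `Empty` entry reserves an `AllocId` before decoding so cyclic references can be resolved; a re-entry by the same session signals a cycle and hands back the reserved id; a different session just decodes concurrently. The `Vec<SessionId>` stands in for rustc's `TinyList`, and the `InProgressNonAlloc` case is omitted for brevity.

```rust
use std::sync::Mutex;

type AllocId = u32;
type SessionId = u32;

// Per-allocation decoding state, mirroring the `State` enum in the diff.
enum State {
    Empty,
    InProgress(Vec<SessionId>, AllocId),
    Done(AllocId),
}

// What the caller should do after consulting the entry.
enum Outcome {
    Reuse(AllocId),     // already decoded, or a cycle back into our own decode
    DecodeNow(AllocId), // go decode the bytes, then call `finish`
}

fn enter(entry: &Mutex<State>, session: SessionId, reserve: impl FnOnce() -> AllocId) -> Outcome {
    let mut state = entry.lock().unwrap();
    let id = match &mut *state {
        // Someone already finished this allocation: just reuse its id.
        State::Done(id) => return Outcome::Reuse(*id),
        // We are the first: reserve an id up front so cyclic references back
        // to this allocation can be resolved while it is being decoded.
        State::Empty => reserve(),
        State::InProgress(sessions, id) => {
            if sessions.contains(&session) {
                // Our own session re-entered this entry: that's a cycle, so
                // hand back the reserved id instead of recursing forever.
                return Outcome::Reuse(*id);
            }
            // A different session is already decoding: decode concurrently
            // rather than spinning or blocking on it.
            sessions.push(session);
            return Outcome::DecodeNow(*id);
        }
    };
    *state = State::InProgress(vec![session], id);
    Outcome::DecodeNow(id)
}

// Publish the finished allocation so later lookups can reuse it.
fn finish(entry: &Mutex<State>, id: AllocId) {
    *entry.lock().unwrap() = State::Done(id);
}

fn main() {
    let entry = Mutex::new(State::Empty);
    let id = match enter(&entry, 1, || 42) {
        Outcome::DecodeNow(id) => {
            // ... decode the allocation bytes here, then publish the result.
            finish(&entry, id);
            id
        }
        Outcome::Reuse(id) => id,
    };
    assert_eq!(id, 42);
    // A second session now finds the finished entry and simply reuses it.
    assert!(matches!(enter(&entry, 2, || unreachable!()), Outcome::Reuse(42)));
}
```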

r? @Zoxc
bors committed May 29, 2018
2 parents 4f6d9bf + 233c00c commit cd7eecf
Showing 8 changed files with 449 additions and 115 deletions.
1 change: 1 addition & 0 deletions src/librustc/lib.rs
@@ -68,6 +68,7 @@
#![feature(trace_macros)]
#![feature(trusted_len)]
#![feature(catch_expr)]
#![feature(integer_atomics)]
#![feature(test)]
#![feature(in_band_lifetimes)]
#![feature(macro_at_most_once_rep)]
206 changes: 167 additions & 39 deletions src/librustc/mir/interpret/mod.rs
@@ -23,10 +23,15 @@ use std::io;
use std::ops::{Deref, DerefMut};
use std::hash::Hash;
use syntax::ast::Mutability;
use rustc_serialize::{Encoder, Decoder, Decodable, Encodable};
use rustc_serialize::{Encoder, Decodable, Encodable};
use rustc_data_structures::sorted_map::SortedMap;
use rustc_data_structures::fx::FxHashMap;
use rustc_data_structures::sync::{Lock as Mutex, HashMapExt};
use rustc_data_structures::tiny_list::TinyList;
use byteorder::{WriteBytesExt, ReadBytesExt, LittleEndian, BigEndian};
use ty::codec::TyDecoder;
use std::sync::atomic::{AtomicU32, Ordering};
use std::num::NonZeroU32;

#[derive(Clone, Debug, PartialEq, RustcEncodable, RustcDecodable)]
pub enum Lock {
@@ -204,44 +209,163 @@ pub fn specialized_encode_alloc_id<
Ok(())
}

pub fn specialized_decode_alloc_id<
'a, 'tcx,
D: Decoder,
CACHE: FnOnce(&mut D, AllocId),
>(
decoder: &mut D,
tcx: TyCtxt<'a, 'tcx, 'tcx>,
cache: CACHE,
) -> Result<AllocId, D::Error> {
match AllocKind::decode(decoder)? {
AllocKind::Alloc => {
let alloc_id = tcx.alloc_map.lock().reserve();
trace!("creating alloc id {:?}", alloc_id);
// insert early to allow recursive allocs
cache(decoder, alloc_id);

let allocation = <&'tcx Allocation as Decodable>::decode(decoder)?;
trace!("decoded alloc {:?} {:#?}", alloc_id, allocation);
tcx.alloc_map.lock().set_id_memory(alloc_id, allocation);

Ok(alloc_id)
},
AllocKind::Fn => {
trace!("creating fn alloc id");
let instance = ty::Instance::decode(decoder)?;
trace!("decoded fn alloc instance: {:?}", instance);
let id = tcx.alloc_map.lock().create_fn_alloc(instance);
trace!("created fn alloc id: {:?}", id);
cache(decoder, id);
Ok(id)
},
AllocKind::Static => {
trace!("creating extern static alloc id at");
let did = DefId::decode(decoder)?;
let alloc_id = tcx.alloc_map.lock().intern_static(did);
cache(decoder, alloc_id);
Ok(alloc_id)
},
// Used to avoid infinite recursion when decoding cyclic allocations.
type DecodingSessionId = NonZeroU32;

#[derive(Clone)]
enum State {
Empty,
InProgressNonAlloc(TinyList<DecodingSessionId>),
InProgress(TinyList<DecodingSessionId>, AllocId),
Done(AllocId),
}

pub struct AllocDecodingState {
// For each AllocId we keep track of which decoding state it's currently in.
decoding_state: Vec<Mutex<State>>,
// The offsets of each allocation in the data stream.
data_offsets: Vec<u32>,
}

impl AllocDecodingState {

pub fn new_decoding_session(&self) -> AllocDecodingSession {
static DECODER_SESSION_ID: AtomicU32 = AtomicU32::new(0);
let counter = DECODER_SESSION_ID.fetch_add(1, Ordering::SeqCst);

// Make sure this is never zero
let session_id = DecodingSessionId::new((counter & 0x7FFFFFFF) + 1).unwrap();

AllocDecodingSession {
state: self,
session_id,
}
}

pub fn new(data_offsets: Vec<u32>) -> AllocDecodingState {
let decoding_state: Vec<_> = ::std::iter::repeat(Mutex::new(State::Empty))
.take(data_offsets.len())
.collect();

AllocDecodingState {
decoding_state: decoding_state,
data_offsets,
}
}
}

#[derive(Copy, Clone)]
pub struct AllocDecodingSession<'s> {
state: &'s AllocDecodingState,
session_id: DecodingSessionId,
}

impl<'s> AllocDecodingSession<'s> {

// Decodes an AllocId in a thread-safe way.
pub fn decode_alloc_id<'a, 'tcx, D>(&self,
decoder: &mut D)
-> Result<AllocId, D::Error>
where D: TyDecoder<'a, 'tcx>,
'tcx: 'a,
{
// Read the index of the allocation
let idx = decoder.read_u32()? as usize;
let pos = self.state.data_offsets[idx] as usize;

// Decode the AllocKind now so that we know if we have to reserve an
// AllocId.
let (alloc_kind, pos) = decoder.with_position(pos, |decoder| {
let alloc_kind = AllocKind::decode(decoder)?;
Ok((alloc_kind, decoder.position()))
})?;

// Check the decoding state, see if it's already decoded or if we should
// decode it here.
let alloc_id = {
let mut entry = self.state.decoding_state[idx].lock();

match *entry {
State::Done(alloc_id) => {
return Ok(alloc_id);
}
ref mut entry @ State::Empty => {
// We are allowed to decode
match alloc_kind {
AllocKind::Alloc => {
// If this is an allocation, we need to reserve an
// AllocId so we can decode cyclic graphs.
let alloc_id = decoder.tcx().alloc_map.lock().reserve();
*entry = State::InProgress(
TinyList::new_single(self.session_id),
alloc_id);
Some(alloc_id)
},
AllocKind::Fn | AllocKind::Static => {
// Fns and statics cannot be cyclic and their AllocId
// is determined later by interning
*entry = State::InProgressNonAlloc(
TinyList::new_single(self.session_id));
None
}
}
}
State::InProgressNonAlloc(ref mut sessions) => {
if sessions.contains(&self.session_id) {
bug!("This should be unreachable")
} else {
// Start decoding concurrently
sessions.insert(self.session_id);
None
}
}
State::InProgress(ref mut sessions, alloc_id) => {
if sessions.contains(&self.session_id) {
// Don't recurse.
return Ok(alloc_id)
} else {
// Start decoding concurrently
sessions.insert(self.session_id);
Some(alloc_id)
}
}
}
};

// Now decode the actual data
let alloc_id = decoder.with_position(pos, |decoder| {
match alloc_kind {
AllocKind::Alloc => {
let allocation = <&'tcx Allocation as Decodable>::decode(decoder)?;
// We already have a reserved AllocId.
let alloc_id = alloc_id.unwrap();
trace!("decoded alloc {:?} {:#?}", alloc_id, allocation);
decoder.tcx().alloc_map.lock().set_id_same_memory(alloc_id, allocation);
Ok(alloc_id)
},
AllocKind::Fn => {
assert!(alloc_id.is_none());
trace!("creating fn alloc id");
let instance = ty::Instance::decode(decoder)?;
trace!("decoded fn alloc instance: {:?}", instance);
let alloc_id = decoder.tcx().alloc_map.lock().create_fn_alloc(instance);
Ok(alloc_id)
},
AllocKind::Static => {
assert!(alloc_id.is_none());
trace!("creating extern static alloc id at");
let did = DefId::decode(decoder)?;
let alloc_id = decoder.tcx().alloc_map.lock().intern_static(did);
Ok(alloc_id)
}
}
})?;

self.state.decoding_state[idx].with_lock(|entry| {
*entry = State::Done(alloc_id);
});

Ok(alloc_id)
}
}

@@ -340,6 +464,10 @@ impl<'tcx, M: fmt::Debug + Eq + Hash + Clone> AllocMap<'tcx, M> {
bug!("tried to set allocation id {}, but it was already existing as {:#?}", id, old);
}
}

pub fn set_id_same_memory(&mut self, id: AllocId, mem: M) {
self.id_to_type.insert_same(id, AllocType::Memory(mem));
}
}

#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash, RustcEncodable, RustcDecodable)]
51 changes: 10 additions & 41 deletions src/librustc/ty/maps/on_disk_cache.rs
@@ -16,14 +16,14 @@ use hir::def_id::{CrateNum, DefIndex, DefId, LocalDefId,
use hir::map::definitions::DefPathHash;
use ich::{CachingCodemapView, Fingerprint};
use mir::{self, interpret};
use mir::interpret::{AllocDecodingSession, AllocDecodingState};
use rustc_data_structures::fx::FxHashMap;
use rustc_data_structures::sync::{Lrc, Lock, HashMapExt, Once};
use rustc_data_structures::indexed_vec::{IndexVec, Idx};
use rustc_serialize::{Decodable, Decoder, Encodable, Encoder, opaque,
SpecializedDecoder, SpecializedEncoder,
UseSpecializedDecodable, UseSpecializedEncodable};
use session::{CrateDisambiguator, Session};
use std::cell::RefCell;
use std::mem;
use syntax::ast::NodeId;
use syntax::codemap::{CodeMap, StableFilemapId};
@@ -77,11 +77,7 @@ pub struct OnDiskCache<'sess> {
// `serialized_data`.
prev_diagnostics_index: FxHashMap<SerializedDepNodeIndex, AbsoluteBytePos>,

// Alloc indices to memory location map
prev_interpret_alloc_index: Vec<AbsoluteBytePos>,

/// Deserialization: A cache to ensure we don't read allocations twice
interpret_alloc_cache: RefCell<FxHashMap<usize, interpret::AllocId>>,
alloc_decoding_state: AllocDecodingState,
}

// This type is used only for (de-)serialization.
@@ -92,7 +88,7 @@ struct Footer {
query_result_index: EncodedQueryResultIndex,
diagnostics_index: EncodedQueryResultIndex,
// the location of all allocations
interpret_alloc_index: Vec<AbsoluteBytePos>,
interpret_alloc_index: Vec<u32>,
}

type EncodedQueryResultIndex = Vec<(SerializedDepNodeIndex, AbsoluteBytePos)>;
@@ -149,8 +145,7 @@ impl<'sess> OnDiskCache<'sess> {
query_result_index: footer.query_result_index.into_iter().collect(),
prev_diagnostics_index: footer.diagnostics_index.into_iter().collect(),
synthetic_expansion_infos: Lock::new(FxHashMap()),
prev_interpret_alloc_index: footer.interpret_alloc_index,
interpret_alloc_cache: RefCell::new(FxHashMap::default()),
alloc_decoding_state: AllocDecodingState::new(footer.interpret_alloc_index),
}
}

@@ -166,8 +161,7 @@ impl<'sess> OnDiskCache<'sess> {
query_result_index: FxHashMap(),
prev_diagnostics_index: FxHashMap(),
synthetic_expansion_infos: Lock::new(FxHashMap()),
prev_interpret_alloc_index: Vec::new(),
interpret_alloc_cache: RefCell::new(FxHashMap::default()),
alloc_decoding_state: AllocDecodingState::new(Vec::new()),
}
}

@@ -291,7 +285,7 @@ impl<'sess> OnDiskCache<'sess> {
}
for idx in n..new_n {
let id = encoder.interpret_allocs_inverse[idx];
let pos = AbsoluteBytePos::new(encoder.position());
let pos = encoder.position() as u32;
interpret_alloc_index.push(pos);
interpret::specialized_encode_alloc_id(
&mut encoder,
@@ -424,8 +418,7 @@ impl<'sess> OnDiskCache<'sess> {
file_index_to_file: &self.file_index_to_file,
file_index_to_stable_id: &self.file_index_to_stable_id,
synthetic_expansion_infos: &self.synthetic_expansion_infos,
prev_interpret_alloc_index: &self.prev_interpret_alloc_index,
interpret_alloc_cache: &self.interpret_alloc_cache,
alloc_decoding_session: self.alloc_decoding_state.new_decoding_session(),
};

match decode_tagged(&mut decoder, dep_node_index) {
@@ -487,9 +480,7 @@ struct CacheDecoder<'a, 'tcx: 'a, 'x> {
synthetic_expansion_infos: &'x Lock<FxHashMap<AbsoluteBytePos, SyntaxContext>>,
file_index_to_file: &'x Lock<FxHashMap<FileMapIndex, Lrc<FileMap>>>,
file_index_to_stable_id: &'x FxHashMap<FileMapIndex, StableFilemapId>,
interpret_alloc_cache: &'x RefCell<FxHashMap<usize, interpret::AllocId>>,
/// maps from index in the cache file to location in the cache file
prev_interpret_alloc_index: &'x [AbsoluteBytePos],
alloc_decoding_session: AllocDecodingSession<'x>,
}

impl<'a, 'tcx, 'x> CacheDecoder<'a, 'tcx, 'x> {
@@ -612,30 +603,8 @@ implement_ty_decoder!( CacheDecoder<'a, 'tcx, 'x> );

impl<'a, 'tcx, 'x> SpecializedDecoder<interpret::AllocId> for CacheDecoder<'a, 'tcx, 'x> {
fn specialized_decode(&mut self) -> Result<interpret::AllocId, Self::Error> {
let tcx = self.tcx;
let idx = usize::decode(self)?;
trace!("loading index {}", idx);

if let Some(cached) = self.interpret_alloc_cache.borrow().get(&idx).cloned() {
trace!("loading alloc id {:?} from alloc_cache", cached);
return Ok(cached);
}
let pos = self.prev_interpret_alloc_index[idx].to_usize();
trace!("loading position {}", pos);
self.with_position(pos, |this| {
interpret::specialized_decode_alloc_id(
this,
tcx,
|this, alloc_id| {
trace!("caching idx {} for alloc id {} at position {}", idx, alloc_id, pos);
assert!(this
.interpret_alloc_cache
.borrow_mut()
.insert(idx, alloc_id)
.is_none());
},
)
})
let alloc_decoding_session = self.alloc_decoding_session;
alloc_decoding_session.decode_alloc_id(self)
}
}
impl<'a, 'tcx, 'x> SpecializedDecoder<Span> for CacheDecoder<'a, 'tcx, 'x> {
1 change: 1 addition & 0 deletions src/librustc_data_structures/lib.rs
@@ -74,6 +74,7 @@ pub mod control_flow_graph;
pub mod flock;
pub mod sync;
pub mod owning_ref;
pub mod tiny_list;
pub mod sorted_map;

pub struct OnDrop<F: Fn()>(pub F);