diff --git a/Cargo.lock b/Cargo.lock index e632847..9003c45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3,47 +3,29 @@ version = 3 [[package]] -name = "cfg-if" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" - -[[package]] -name = "crossbeam-deque" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" -dependencies = [ - "crossbeam-epoch", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-epoch" -version = "0.9.18" +name = "ahash" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ - "crossbeam-utils", + "cfg-if", + "getrandom", + "once_cell", + "version_check", + "zerocopy", ] [[package]] -name = "crossbeam-utils" -version = "0.8.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" - -[[package]] -name = "either" -version = "1.11.0" +name = "cfg-if" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "getrandom" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", "libc", @@ -52,76 +34,86 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.153" +version = "0.2.154" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346" + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "onebrc" version = "0.1.0" dependencies = [ - "rand", - "rayon", + "ahash", ] [[package]] -name = "ppv-lite86" -version = "0.2.17" +name = "proc-macro2" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" - -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +checksum = "8ad3d49ab951a01fbaafe34f2ec74122942fe18a3f9814c3268f1bb72042131b" dependencies = [ - "libc", - "rand_chacha", - "rand_core", + "unicode-ident", ] [[package]] -name = "rand_chacha" -version = "0.3.1" +name = "quote" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ - "ppv-lite86", - "rand_core", + "proc-macro2", ] [[package]] -name = "rand_core" -version = "0.6.4" +name = "syn" +version = "2.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +checksum = "c993ed8ccba56ae856363b1845da7266a7cb78e1d146c8a32d54b45a8b831fc9" dependencies = [ - "getrandom", + "proc-macro2", + "quote", + "unicode-ident", ] [[package]] -name = "rayon" -version = "1.10.0" +name = "unicode-ident" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" -dependencies = [ - "either", - "rayon-core", -] +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] -name = "rayon-core" -version = "1.12.1" +name = "version_check" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" -dependencies = [ - "crossbeam-deque", - "crossbeam-utils", -] +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "zerocopy" +version = "0.7.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae87e3fcd617500e5d106f0380cf7b77f3c6092aae37191433159dda23cfb087" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index 6b2dc03..a075786 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,13 @@ version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[profile.release] +codegen-units = 1 +lto = "fat" +panic = "abort" +strip = "symbols" [dependencies] -rayon = "1.10.0" -rand = "0.8.5" +ahash = "0.8.11" +#debug = true + diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..ddff440 --- /dev/null +++ b/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["-C", "target-cpu=native"] diff --git a/src/main.rs b/src/main.rs index ae59014..f792e36 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,8 +14,9 @@ IEEE 754 rounding-direction "roundTowardPositive" */ -use std::collections::HashMap; +use ahash::AHashMap; use std::str; +use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::sync::Mutex; use std::thread; @@ -24,35 +25,38 @@ use std::{ fs::File, io::{Read, Seek, SeekFrom}, }; +// use std::collections::HashMap; +// type AHashMap = HashMap; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] struct WeatherData { - total_temperature: f64, - min_temperature: f64, - max_temperature: f64, + total_temperature: f32, + min_temperature: f32, + max_temperature: f32, count: u32, - mean_temperature: f64, + mean_temperature: f32, } impl WeatherData { + #[inline(always)] fn merge(&mut self, other: &WeatherData) { self.total_temperature += other.total_temperature; self.count += other.count; self.min_temperature = self.min_temperature.min(other.min_temperature); self.max_temperature = self.max_temperature.max(other.max_temperature); } - - fn add_temperature(&mut self, temperature: f64) { + #[inline(always)] + fn add_temperature(&mut self, temperature: f32) { self.min_temperature = self.min_temperature.min(temperature); self.max_temperature = self.max_temperature.max(temperature); self.total_temperature += temperature; self.count += 1; } - + #[inline(always)] fn update_mean(&mut self) { - self.mean_temperature = self.total_temperature / self.count as f64; + self.mean_temperature = self.total_temperature / self.count as f32; } - + #[inline(always)] fn round(&mut self) { self.mean_temperature = (self.mean_temperature * 10.0).round() / 10.0; self.min_temperature = (self.min_temperature * 10.0).round() / 10.0; @@ -60,14 +64,20 @@ impl WeatherData { } } -fn process_weather_line(line: &str) -> Result<(&str, WeatherData), &'static str> { +const KEY_SIZE: usize = 16; +type Key = [u8; KEY_SIZE]; +type StationTemperatures = AHashMap; + +fn process_weather_line(line: &str) -> (Key, WeatherData) { let parts: Vec<&str> = line.split(';').collect(); if parts.len() != 2 || line.is_empty() { - return Err("Invalid line"); + panic!("Invalid line"); } - - let station_name = parts[0]; - let temperature = parts[1].parse::().unwrap(); + let mut key = [0u8; KEY_SIZE]; + let name = parts[0].as_bytes(); + let n = name.len().min(KEY_SIZE); + key[..n].copy_from_slice(&name[..n]); + let temperature = parts[1].parse::().unwrap(); let weather_data = WeatherData { total_temperature: temperature, @@ -77,37 +87,38 @@ fn process_weather_line(line: &str) -> Result<(&str, WeatherData), &'static str> mean_temperature: 0.0, }; - Ok((station_name, weather_data)) + (key, weather_data) } -fn process_buffer(buf: &[u8]) -> (HashMap, u32) { - let mut station_temperatures: HashMap = HashMap::new(); - let mut station_name = String::new(); +#[inline(always)] +fn process_buffer(buf: &[u8]) -> (StationTemperatures, u32) { + let mut station_temperatures: StationTemperatures = AHashMap::with_capacity(1000); + let mut station_name = [0u8; KEY_SIZE]; let mut temperature = 0.0; let mut lines_count = 0; let mut negative_multiplier = 1; let mut state = 0; + let mut c = 0; - for (index, &byte) in buf.iter().enumerate() { + buf.iter().enumerate().for_each(|(index, &byte)| { if byte == b';' { state = 1; } else if state == 0 { - station_name.push(byte as char); + station_name[c] = byte; } else if byte == b'.' { - temperature = temperature + (u8::from(buf[index + 1]) - 48) as f64 * 0.1; - temperature = temperature * negative_multiplier as f64; + temperature = temperature + (u8::from(buf[index + 1]) - 48) as f32 * 0.1; + temperature = temperature * negative_multiplier as f32; state = 2; } else if byte == b'-' { negative_multiplier = -1; - continue; } else if state == 1 { - temperature = temperature * 10.0 + (u8::from(byte) - 48) as f64; + temperature = temperature * 10.0 + (u8::from(byte) - 48) as f32; } else if byte == b'\n' { if let Some(data) = station_temperatures.get_mut(&station_name) { data.add_temperature(temperature); } else { station_temperatures.insert( - station_name.to_string(), + station_name, WeatherData { total_temperature: temperature, count: 1, @@ -119,132 +130,139 @@ fn process_buffer(buf: &[u8]) -> (HashMap, u32) { } lines_count += 1; - station_name.clear(); + station_name.fill(0); temperature = 0.0; negative_multiplier = 1; state = 0; + c = KEY_SIZE; } - } - - return (station_temperatures, lines_count); -} -fn process_thread( - buf: &[u8], - extra_buffer_size: usize, - total_lines: Arc>, -) -> HashMap { - let mut start_index = 0; - let mut end_index = buf.len() - extra_buffer_size; - - for (index, &byte) in buf.iter().enumerate() { - if byte == b'\n' { - start_index = index + 1; - break; + if c + 1 < KEY_SIZE { + c += 1; + } else { + c = 0; } - } - - for (index, &byte) in buf[buf.len() - extra_buffer_size..].iter().enumerate() { - if byte == b'\n' { - end_index = index + buf.len() - extra_buffer_size + 1; - break; - } - } + }); - let (station_temperatures, lines_count) = process_buffer(&buf[start_index..end_index]); - - let mut total_lines = total_lines.lock().unwrap(); - *total_lines += lines_count; - drop(total_lines); + return (station_temperatures, lines_count); +} - return station_temperatures; +fn process_thread(buf: &[u8], extra_buffer_size: usize) -> (StationTemperatures, u32) { + let start_index = buf + .iter() + .position(|&b| b == b'\n') + .map(|i| i + 1) + .unwrap_or(0); + + let buf_default_pos = buf.len() - extra_buffer_size; + let end_index = buf[buf_default_pos..] + .iter() + .position(|&b| b == b'\n') + .map(|i| i + buf_default_pos + 1) + .unwrap_or(buf_default_pos); + + process_buffer(&buf[start_index..end_index]) } +const TOTAL_LINES: usize = 1_000_000_000; +const AVG_ROW_SIZE: usize = 14; +const THREAD_COUNT: usize = 250; +const BUFFER_SIZE: usize = 2_000_000; +const STAGE_COUNT: usize = (TOTAL_LINES * AVG_ROW_SIZE).div_ceil(THREAD_COUNT * BUFFER_SIZE); +const SINGLE_ROW_SIZE: usize = 64; + fn main() { let start_time = time::Instant::now(); + println!("buffer size: {:?}", BUFFER_SIZE); + // let cores: usize = std::thread::available_parallelism().unwrap().into(); // println!("{}", cores); - // let file_path = "weather_stations.csv"; + //let file_path = "weather_stations.csv"; let file_path = "measurements.txt"; - let stage_count = 30; - let max_threads = 250; - let buffer_size = 2000000; - let single_row_size = 100; - - let mut station_temperatures: HashMap = HashMap::new(); + let mut station_temperatures: StationTemperatures = AHashMap::with_capacity(500); // Process first line let mut file = File::open(file_path).expect("Unable to open file"); - let mut buf = vec![0; single_row_size]; + let mut buf = [0; KEY_SIZE + 5]; file.seek(SeekFrom::Start(0)).unwrap(); file.read(&mut buf).unwrap(); let first_line = str::from_utf8(&buf) .unwrap() .split('\n') .collect::>()[0]; - let first_line_data = process_weather_line(first_line).unwrap(); - station_temperatures.insert(first_line_data.0.to_string(), first_line_data.1); + let (key, value) = process_weather_line(first_line); + station_temperatures.insert(key, value); - let total_lines = Arc::new(Mutex::new(1)); - let station_temperatures_list: Arc>>> = - Arc::new(Mutex::new(Vec::new())); + let total_lines = Arc::new(AtomicU32::new(1)); + let station_temperatures_list: Arc>> = + Arc::new(Mutex::new(Vec::with_capacity(THREAD_COUNT))); - for stage_index in 0..stage_count { - let mut file_reader_threads = Vec::new(); + (0..STAGE_COUNT).for_each(|stage_index| { + let mut file_reader_threads = Vec::with_capacity(THREAD_COUNT); + + (0..THREAD_COUNT).for_each(|thread_index| { + let mut buf = [0; BUFFER_SIZE + SINGLE_ROW_SIZE]; + let start = stage_index * BUFFER_SIZE * THREAD_COUNT + thread_index * BUFFER_SIZE; - for thread_index in 0..max_threads { - let mut file = File::open(file_path).expect("Unable to open file"); - let mut buf = vec![0; buffer_size + single_row_size]; - let start = stage_index * buffer_size * max_threads + thread_index * buffer_size; - let total_lines = Arc::clone(&total_lines); let station_temperatures_list = Arc::clone(&station_temperatures_list); + let total_lines = Arc::clone(&total_lines); + + let mut file = File::open(file_path).expect("Unable to open file"); let file_reader_thread = thread::spawn(move || { file.seek(SeekFrom::Start(start as u64)).unwrap(); file.read(&mut buf).unwrap(); - let station_temperatures = process_thread(&buf, single_row_size, total_lines); + let (station_temperatures, lines_count) = process_thread(&buf, SINGLE_ROW_SIZE); + + total_lines.fetch_add(lines_count, std::sync::atomic::Ordering::SeqCst); let mut station_temperatures_list = station_temperatures_list.lock().unwrap(); station_temperatures_list.push(station_temperatures); }); file_reader_threads.push(file_reader_thread); - } + }); - for file_reader_thread in file_reader_threads { - file_reader_thread.join().unwrap(); - } + file_reader_threads + .into_iter() + .for_each(|thread| thread.join().unwrap()); println!("Stage: {:?} completed", stage_index); - } + }); let station_temperatures_list = station_temperatures_list.lock().unwrap(); - - for station_temperatures_data in station_temperatures_list.iter() { - for (station_name, data) in station_temperatures_data { + station_temperatures_list.iter().for_each(|st| { + st.iter().for_each(|(station_name, data)| { if let Some(parent_data) = station_temperatures.get_mut(station_name) { parent_data.merge(&data); } else { - station_temperatures.insert(station_name.to_string(), data.clone()); + station_temperatures.insert(*station_name, *data); } - } - } + }); + }); - for (_, data) in station_temperatures.iter_mut() { + station_temperatures.values_mut().for_each(|data| { data.update_mean(); data.round(); - } + }); + + let end_time = start_time.elapsed(); let mut station_temperatures: Vec<_> = station_temperatures.iter().collect(); station_temperatures.sort_by(|a, b| a.0.cmp(b.0)); for (station_name, data) in station_temperatures.iter() { println!( - "Station: {}, Min: {}, Mean: {}, Max: {}", - station_name, data.min_temperature, data.mean_temperature, data.max_temperature + "Station: {:?}, Min: {}, Mean: {}, Max: {}", + str::from_utf8(station_name.as_slice()) + .unwrap() + .replace("\0", ""), + data.min_temperature, + data.mean_temperature, + data.max_temperature ); // println!( // "{}={}/{}/{}", @@ -252,7 +270,10 @@ fn main() { // ); } - println!("Total lines: {:?}", *total_lines.lock().unwrap()); + println!( + "Total lines: {:?}", + total_lines.load(std::sync::atomic::Ordering::SeqCst) + ); println!("Total stations: {:?}", station_temperatures.len()); - println!("Elapsed time: {:?}", start_time.elapsed()); + println!("Elapsed time: {:?}", end_time); }