diff --git a/Cargo.toml b/Cargo.toml index e3c492d..ce87336 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,5 +9,4 @@ edition = "2018" redis = "0.19.0" log = "0.4.11" stderrlog = "0.5" -lazy_static = "1" clap = "3.0.0-beta.2" diff --git a/src/argument_parsing.rs b/src/argument_parsing.rs index c3a9710..d6f0680 100644 --- a/src/argument_parsing.rs +++ b/src/argument_parsing.rs @@ -7,6 +7,7 @@ pub struct Config { pub interval: u64, pub verbosity: usize, pub quiet: bool, + pub timeout: u64, } macro_rules! is_parsable { @@ -54,7 +55,8 @@ pub fn get_config() -> Result { .arg( Arg::from("--timeout 'Timout for redis connection'") .takes_value(true) - .validator(is_parsable!(u64, "hello there")), + .validator(is_parsable!(u64, "Timeout must be a positive integer")) + .default_value("30"), ) .get_matches(); @@ -65,6 +67,7 @@ pub fn get_config() -> Result { follow: args.is_present("follow") || args.occurrences_of("interval") > 0, verbosity: args.occurrences_of("verbosity") as usize, quiet: args.is_present("quiet"), + timeout: args.value_of("timeout").unwrap().parse().unwrap(), }; Ok(config) } diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..df468d7 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,117 @@ +extern crate clap; +extern crate log; +extern crate redis; +extern crate stderrlog; +use std::thread::sleep; +use std::time::Duration; +mod slowlog; +use slowlog::SlowlogRecord; +mod argument_parsing; +mod slowlog_reader; +use slowlog_reader::SlowlogReader; +use std::convert::TryFrom; + +#[derive(Clone)] +pub struct ConnectionProvider { + client: redis::Client, + timeout: u64, +} + +impl From<(redis::Client, u64)> for ConnectionProvider { + fn from(arg: (redis::Client, u64)) -> ConnectionProvider { + ConnectionProvider { + client: arg.0, + timeout: arg.1, + } + } +} + +impl ConnectionProvider { + pub fn get_connection(&self) -> redis::RedisResult { + self.client + .get_connection_with_timeout(Duration::from_secs(self.timeout)) + } +} + +fn print_rec(r: &SlowlogRecord) { + println!( + "[{}] id: {},\tduration: {},\tclient: {},\tclient_name: {},\tcommand: {:?}", + r.time, r.id, r.duration, r.client_socket, r.client_name, r.command + ) +} + +fn error_handler(e: redis::RedisError) { + match e.kind() { + redis::ErrorKind::IoError => { + log::error!("Can't establish connection to redis cluster: {}", e) + } + _ => unimplemented!("Error not handled: {}({:?})", e, e.kind()), + } +} + +fn create_slowlog_reader(con_provider: ConnectionProvider, interval: u64) -> SlowlogReader { + log::debug!("Creating slowlog reader"); + loop { + match SlowlogReader::try_from(con_provider.clone()) { + Err(e) => error_handler(e), + Ok(slr) => return slr, + } + sleep(Duration::new(interval, 0)) + } +} + +fn read_once(con_provider: ConnectionProvider) { + match { + move || -> Result<(), redis::RedisError> { + for r in slowlog_reader::get_slowlog(&mut con_provider.get_connection()?, 128)?.iter() { + print_rec(r) + } + Ok(()) + } + }() { + Err(e) => error_handler(e), + Ok(_) => std::process::exit(0), + } +} + +fn read_continiously(con_provider: ConnectionProvider, interval: u64) { + let mut sl_reader = create_slowlog_reader(con_provider, interval); + + loop { + match sl_reader + .get() + .map_err(|e| sl_reader.redis_error_handler(e)) + { + Ok(records) => { + for r in records.iter().rev() { + print_rec(r) + } + } + Err(e) => { + if let Err(e) = e { + error_handler(e) + } + } + } + sleep(Duration::new(interval, 0)); + } +} + +pub fn main() { + let config = argument_parsing::get_config() + .map_err(|e| e.exit()) + .unwrap(); + stderrlog::new() + .timestamp(stderrlog::Timestamp::Second) + .verbosity(config.verbosity) + .quiet(config.quiet) + .init() + .unwrap(); + let redis_client = redis::Client::open((&config.hostname, config.port)).unwrap(); + let connection_provider = ConnectionProvider::from((redis_client, config.interval)); + if config.follow { + read_continiously(connection_provider, config.interval) + } else { + read_once(connection_provider) + } +} diff --git a/src/main.rs b/src/main.rs index ae188d9..c4fe307 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,99 +1,4 @@ -extern crate clap; -extern crate log; -extern crate redis; -extern crate stderrlog; -use std::thread::sleep; -use std::time::Duration; -mod slowlog; -use slowlog::SlowlogRecord; -mod argument_parsing; -mod slowlog_reader; -use slowlog_reader::SlowlogReader; -use std::convert::TryFrom; -#[macro_use] -extern crate lazy_static; -lazy_static! { - static ref CONFIG: argument_parsing::Config = argument_parsing::get_config() - .map_err(|e| e.exit()) - .unwrap(); -} - -fn print_rec(r: &SlowlogRecord) { - println!( - "[{}] id: {},\tduration: {},\tclient: {},\tclient_name: {},\tcommand: {:?}", - r.time, r.id, r.duration, r.client_socket, r.client_name, r.command - ) -} - -fn error_handler(e: redis::RedisError) { - match e.kind() { - redis::ErrorKind::IoError => { - log::error!("Can't establish connection to redis cluster: {}", e) - } - _ => unimplemented!("Error not handled: {}({:?})", e, e.kind()), - } -} - -fn create_slowlog_reader(client: redis::Client, interval: u64) -> SlowlogReader { - log::debug!("Creating slowlog reader"); - loop { - match SlowlogReader::try_from(client.clone()) { - Err(e) => error_handler(e), - Ok(slr) => return slr, - } - sleep(Duration::new(interval, 0)) - } -} - -fn read_once(client: redis::Client) { - match { - move || -> Result<(), redis::RedisError> { - for r in slowlog_reader::get_slowlog(&mut client.get_connection()?, 128)?.iter() { - print_rec(r) - } - Ok(()) - } - }() { - Err(e) => error_handler(e), - Ok(_) => std::process::exit(0), - } -} - -fn read_continiously(client: redis::Client) { - let mut sl_reader = create_slowlog_reader(client, CONFIG.interval); - - loop { - match sl_reader - .get() - .map_err(|e| sl_reader.redis_error_handler(e)) - { - Ok(records) => { - for r in records.iter().rev() { - print_rec(r) - } - } - Err(e) => { - if let Err(e) = e { - error_handler(e) - } - } - } - sleep(Duration::new(CONFIG.interval, 0)); - } -} - -fn main() { - stderrlog::new() - .timestamp(stderrlog::Timestamp::Second) - .verbosity(CONFIG.verbosity) - .quiet(CONFIG.quiet) - .init() - .unwrap(); - let redis_client = redis::Client::open((&CONFIG.hostname, CONFIG.port)).unwrap(); - if CONFIG.follow { - read_continiously(redis_client) - } else { - read_once(redis_client) - } -} +fn main(){ + rslog::main(); +} \ No newline at end of file diff --git a/src/slowlog_reader.rs b/src/slowlog_reader.rs index b3a0406..3a90754 100644 --- a/src/slowlog_reader.rs +++ b/src/slowlog_reader.rs @@ -1,19 +1,20 @@ use crate::slowlog::SlowlogRecord; +use crate::ConnectionProvider; pub struct SlowlogReader { - client: redis::Client, - con: redis::Connection, + connection_provider: ConnectionProvider, + connection: redis::Connection, last_id: i64, length: u32, uptime: u64, } -impl std::convert::TryFrom for SlowlogReader { +impl std::convert::TryFrom for SlowlogReader { type Error = redis::RedisError; - fn try_from(client: redis::Client) -> Result { + fn try_from(connection_provider: ConnectionProvider) -> Result { let sl_reader = SlowlogReader { - con: client.get_connection()?, - client: client, + connection: connection_provider.get_connection()?, + connection_provider, last_id: -1, length: 128, uptime: 0, @@ -34,8 +35,7 @@ fn get_uptime(con: &mut redis::Connection) -> redis::RedisResult { let server_info = redis::cmd("INFO").arg("SERVER").query::(con)?; server_info .lines() - .filter(|l| l.contains("uptime_in_seconds")) - .nth(0) + .find(|l| l.contains("uptime_in_seconds")) .ok_or(( redis::ErrorKind::TypeError, "No uptime line in response from server", @@ -59,7 +59,7 @@ fn get_uptime(con: &mut redis::Connection) -> redis::RedisResult { impl SlowlogReader { pub fn get(&mut self) -> redis::RedisResult> { self.check_for_restart()?; - let new_records: Vec<_> = get_slowlog(&mut self.con, self.length)? + let new_records: Vec<_> = get_slowlog(&mut self.connection, self.length)? .into_iter() .filter(|r| r.id as i64 > self.last_id) .collect(); @@ -67,12 +67,12 @@ impl SlowlogReader { Ok(new_records) } pub fn update_connection(&mut self) -> Result<(), redis::RedisError> { - self.con = self.client.get_connection()?; + self.connection = self.connection_provider.get_connection()?; Ok(()) } fn check_for_restart(&mut self) -> redis::RedisResult<()> { - let uptime = get_uptime(&mut self.con)?; + let uptime = get_uptime(&mut self.connection)?; if uptime < self.uptime { self.last_id = -1 }