Skip to content

Commit

Permalink
add timeout option, a bit of refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
sysx23 committed Jan 15, 2021
1 parent 1d510c4 commit 16e1e4e
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 111 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@ edition = "2018"
redis = "0.19.0"
log = "0.4.11"
stderrlog = "0.5"
lazy_static = "1"
clap = "3.0.0-beta.2"
5 changes: 4 additions & 1 deletion src/argument_parsing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub struct Config {
pub interval: u64,
pub verbosity: usize,
pub quiet: bool,
pub timeout: u64,
}

macro_rules! is_parsable {
Expand Down Expand Up @@ -54,7 +55,8 @@ pub fn get_config() -> Result<Config, clap::Error> {
.arg(
Arg::from("--timeout 'Timout for redis connection'")
.takes_value(true)
.validator(is_parsable!(u64, "hello there")),
.validator(is_parsable!(u64, "Timeout must be a positive integer"))
.default_value("30"),
)
.get_matches();

Expand All @@ -65,6 +67,7 @@ pub fn get_config() -> Result<Config, clap::Error> {
follow: args.is_present("follow") || args.occurrences_of("interval") > 0,
verbosity: args.occurrences_of("verbosity") as usize,
quiet: args.is_present("quiet"),
timeout: args.value_of("timeout").unwrap().parse().unwrap(),
};
Ok(config)
}
117 changes: 117 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
extern crate clap;
extern crate log;
extern crate redis;
extern crate stderrlog;
use std::thread::sleep;
use std::time::Duration;
mod slowlog;
use slowlog::SlowlogRecord;
mod argument_parsing;
mod slowlog_reader;
use slowlog_reader::SlowlogReader;
use std::convert::TryFrom;

#[derive(Clone)]
pub struct ConnectionProvider {
client: redis::Client,
timeout: u64,
}

impl From<(redis::Client, u64)> for ConnectionProvider {
fn from(arg: (redis::Client, u64)) -> ConnectionProvider {
ConnectionProvider {
client: arg.0,
timeout: arg.1,
}
}
}

impl ConnectionProvider {
pub fn get_connection(&self) -> redis::RedisResult<redis::Connection> {
self.client
.get_connection_with_timeout(Duration::from_secs(self.timeout))
}
}

fn print_rec(r: &SlowlogRecord) {
println!(
"[{}] id: {},\tduration: {},\tclient: {},\tclient_name: {},\tcommand: {:?}",
r.time, r.id, r.duration, r.client_socket, r.client_name, r.command
)
}

fn error_handler(e: redis::RedisError) {
match e.kind() {
redis::ErrorKind::IoError => {
log::error!("Can't establish connection to redis cluster: {}", e)
}
_ => unimplemented!("Error not handled: {}({:?})", e, e.kind()),
}
}

fn create_slowlog_reader(con_provider: ConnectionProvider, interval: u64) -> SlowlogReader {
log::debug!("Creating slowlog reader");
loop {
match SlowlogReader::try_from(con_provider.clone()) {
Err(e) => error_handler(e),
Ok(slr) => return slr,
}
sleep(Duration::new(interval, 0))
}
}

fn read_once(con_provider: ConnectionProvider) {
match {
move || -> Result<(), redis::RedisError> {
for r in slowlog_reader::get_slowlog(&mut con_provider.get_connection()?, 128)?.iter() {
print_rec(r)
}
Ok(())
}
}() {
Err(e) => error_handler(e),
Ok(_) => std::process::exit(0),
}
}

fn read_continiously(con_provider: ConnectionProvider, interval: u64) {
let mut sl_reader = create_slowlog_reader(con_provider, interval);

loop {
match sl_reader
.get()
.map_err(|e| sl_reader.redis_error_handler(e))
{
Ok(records) => {
for r in records.iter().rev() {
print_rec(r)
}
}
Err(e) => {
if let Err(e) = e {
error_handler(e)
}
}
}
sleep(Duration::new(interval, 0));
}
}

pub fn main() {
let config = argument_parsing::get_config()
.map_err(|e| e.exit())
.unwrap();
stderrlog::new()
.timestamp(stderrlog::Timestamp::Second)
.verbosity(config.verbosity)
.quiet(config.quiet)
.init()
.unwrap();
let redis_client = redis::Client::open((&config.hostname, config.port)).unwrap();
let connection_provider = ConnectionProvider::from((redis_client, config.interval));
if config.follow {
read_continiously(connection_provider, config.interval)
} else {
read_once(connection_provider)
}
}
101 changes: 3 additions & 98 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,99 +1,4 @@
extern crate clap;
extern crate log;
extern crate redis;
extern crate stderrlog;
use std::thread::sleep;
use std::time::Duration;
mod slowlog;
use slowlog::SlowlogRecord;
mod argument_parsing;
mod slowlog_reader;
use slowlog_reader::SlowlogReader;
use std::convert::TryFrom;
#[macro_use]
extern crate lazy_static;

lazy_static! {
static ref CONFIG: argument_parsing::Config = argument_parsing::get_config()
.map_err(|e| e.exit())
.unwrap();
}

fn print_rec(r: &SlowlogRecord) {
println!(
"[{}] id: {},\tduration: {},\tclient: {},\tclient_name: {},\tcommand: {:?}",
r.time, r.id, r.duration, r.client_socket, r.client_name, r.command
)
}

fn error_handler(e: redis::RedisError) {
match e.kind() {
redis::ErrorKind::IoError => {
log::error!("Can't establish connection to redis cluster: {}", e)
}
_ => unimplemented!("Error not handled: {}({:?})", e, e.kind()),
}
}

fn create_slowlog_reader(client: redis::Client, interval: u64) -> SlowlogReader {
log::debug!("Creating slowlog reader");
loop {
match SlowlogReader::try_from(client.clone()) {
Err(e) => error_handler(e),
Ok(slr) => return slr,
}
sleep(Duration::new(interval, 0))
}
}

fn read_once(client: redis::Client) {
match {
move || -> Result<(), redis::RedisError> {
for r in slowlog_reader::get_slowlog(&mut client.get_connection()?, 128)?.iter() {
print_rec(r)
}
Ok(())
}
}() {
Err(e) => error_handler(e),
Ok(_) => std::process::exit(0),
}
}

fn read_continiously(client: redis::Client) {
let mut sl_reader = create_slowlog_reader(client, CONFIG.interval);

loop {
match sl_reader
.get()
.map_err(|e| sl_reader.redis_error_handler(e))
{
Ok(records) => {
for r in records.iter().rev() {
print_rec(r)
}
}
Err(e) => {
if let Err(e) = e {
error_handler(e)
}
}
}
sleep(Duration::new(CONFIG.interval, 0));
}
}

fn main() {
stderrlog::new()
.timestamp(stderrlog::Timestamp::Second)
.verbosity(CONFIG.verbosity)
.quiet(CONFIG.quiet)
.init()
.unwrap();
let redis_client = redis::Client::open((&CONFIG.hostname, CONFIG.port)).unwrap();
if CONFIG.follow {
read_continiously(redis_client)
} else {
read_once(redis_client)
}
}
fn main(){
rslog::main();
}
22 changes: 11 additions & 11 deletions src/slowlog_reader.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use crate::slowlog::SlowlogRecord;
use crate::ConnectionProvider;

pub struct SlowlogReader {
client: redis::Client,
con: redis::Connection,
connection_provider: ConnectionProvider,
connection: redis::Connection,
last_id: i64,
length: u32,
uptime: u64,
}

impl std::convert::TryFrom<redis::Client> for SlowlogReader {
impl std::convert::TryFrom<ConnectionProvider> for SlowlogReader {
type Error = redis::RedisError;
fn try_from(client: redis::Client) -> Result<Self, Self::Error> {
fn try_from(connection_provider: ConnectionProvider) -> Result<Self, Self::Error> {
let sl_reader = SlowlogReader {
con: client.get_connection()?,
client: client,
connection: connection_provider.get_connection()?,
connection_provider,
last_id: -1,
length: 128,
uptime: 0,
Expand All @@ -34,8 +35,7 @@ fn get_uptime(con: &mut redis::Connection) -> redis::RedisResult<u64> {
let server_info = redis::cmd("INFO").arg("SERVER").query::<String>(con)?;
server_info
.lines()
.filter(|l| l.contains("uptime_in_seconds"))
.nth(0)
.find(|l| l.contains("uptime_in_seconds"))
.ok_or((
redis::ErrorKind::TypeError,
"No uptime line in response from server",
Expand All @@ -59,20 +59,20 @@ fn get_uptime(con: &mut redis::Connection) -> redis::RedisResult<u64> {
impl SlowlogReader {
pub fn get(&mut self) -> redis::RedisResult<Vec<SlowlogRecord>> {
self.check_for_restart()?;
let new_records: Vec<_> = get_slowlog(&mut self.con, self.length)?
let new_records: Vec<_> = get_slowlog(&mut self.connection, self.length)?
.into_iter()
.filter(|r| r.id as i64 > self.last_id)
.collect();
self.last_id = new_records.get(0).map_or(self.last_id, |r| r.id as i64);
Ok(new_records)
}
pub fn update_connection(&mut self) -> Result<(), redis::RedisError> {
self.con = self.client.get_connection()?;
self.connection = self.connection_provider.get_connection()?;
Ok(())
}

fn check_for_restart(&mut self) -> redis::RedisResult<()> {
let uptime = get_uptime(&mut self.con)?;
let uptime = get_uptime(&mut self.connection)?;
if uptime < self.uptime {
self.last_id = -1
}
Expand Down

0 comments on commit 16e1e4e

Please sign in to comment.