Skip to content

Commit

Permalink
It works!
Browse files Browse the repository at this point in the history
  • Loading branch information
sysx23 committed Jan 4, 2021
0 parents commit 54e39b7
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/target
12 changes: 12 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "rslog"
version = "0.1.0"
authors = ["Rostyslav Ivanika <Rostyslav.Ivanika@gmail.com>"]
edition = "2018"


[dependencies]
redis = "0.19.0"
log = "0.4.11"
stderrlog = "0.5"
clap = {features = ["yaml"], version ="3.0.0-beta.2"}
23 changes: 23 additions & 0 deletions src/cli.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: Redis slowlog reader
about: Prints redis slowlog to stdout
args:
- hostname:
about: Server hostname
short: h
takes_value: true
default_value: "127.0.0.1"
- port:
about: Server port
short: p
takes_value: true
default_value: "6379"
- verbose:
about: Sets the level of verbosity
short: v
multiple: true
takes_value: false
- quiet:
about: Silence all error messages
short: q
takes_value: false

78 changes: 78 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
extern crate clap;
extern crate log;
extern crate redis;
extern crate stderrlog;
use std::thread::sleep;
use std::time::Duration;
mod slowlog;
use slowlog::SlowlogRecord;
mod slowlog_reader;
use slowlog_reader::SlowlogReader;
use std::convert::TryFrom;

fn print_rec(r: &SlowlogRecord) {
println!(
"[{}] id: {},\tduration: {},\tclient: {},\tclient_name: {},\tcommand: {:?}",
r.time, r.id, r.duration, r.client_socket, r.client_name, r.command
)
}

fn error_handler(e: redis::RedisError) {
match e.kind() {
redis::ErrorKind::IoError => {
log::error!("Can't establish connection to redis cluster: {}", e)
}
_ => unimplemented!("Error not handled: {}({:?})", e, e.kind()),
}
}

fn create_slowlog_reader(client: redis::Client) -> SlowlogReader {
log::debug!("Creating slowlog reader");
loop {
match SlowlogReader::try_from(client.clone()) {
Err(e) => error_handler(e),
Ok(slr) => return slr,
}
sleep(Duration::new(2, 0))
}
}

fn main() {
// Parse args
let args = clap::App::from(clap::load_yaml!("cli.yaml"))
.version(clap::crate_version!())
.author(clap::crate_authors!())
.get_matches();
let hostname = args.value_of("hostname").unwrap();
let port: u16 = args.value_of("port").unwrap().parse().unwrap();
let verbosity = args.occurrences_of("verbose") as usize;
let quiet = args.is_present("quiet");
// Init logger
stderrlog::new()
.timestamp(stderrlog::Timestamp::Second)
.verbosity(verbosity)
.quiet(quiet)
.init()
.unwrap();
let redis_client = redis::Client::open((hostname, port)).unwrap();
let mut sl_reader = create_slowlog_reader(redis_client);

loop {
match sl_reader
.get()
.map_err(|e| sl_reader.redis_error_handler(e))
{
Ok(records) => {
for r in records.iter().rev() {
print_rec(r)
}
}
Err(e) => {
if let Err(e) = e {
error_handler(e)
}
}
}
sleep(Duration::new(2, 0));
}
}
35 changes: 35 additions & 0 deletions src/slowlog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#[derive(Debug, Default)]
pub struct SlowlogRecord {
pub id: u64,
pub time: u64,
pub duration: u32,
pub command: Vec<String>,
pub client_socket: String,
pub client_name: String,
}

fn next_value<T: redis::FromRedisValue>(
i: &mut std::slice::Iter<redis::Value>,
) -> redis::RedisResult<T> {
match i.next() {
Some(v) => redis::FromRedisValue::from_redis_value(v),
None => Err(redis::RedisError::from((
redis::ErrorKind::TypeError,
"The field is not found in the response",
))),
}
}

impl redis::FromRedisValue for SlowlogRecord {
fn from_redis_value(v: &redis::Value) -> redis::RedisResult<SlowlogRecord> {
let rows = &mut v.as_sequence().unwrap().iter();
Ok(SlowlogRecord {
id: next_value(rows)?,
time: next_value(rows)?,
duration: next_value(rows)?,
command: next_value(rows)?,
client_socket: next_value(rows)?,
client_name: next_value(rows)?,
})
}
}
92 changes: 92 additions & 0 deletions src/slowlog_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use crate::slowlog::SlowlogRecord;

pub struct SlowlogReader {
client: redis::Client,
con: redis::Connection,
last_id: i64,
length: u32,
uptime: u64,
}

impl std::convert::TryFrom<redis::Client> for SlowlogReader {
type Error = redis::RedisError;
fn try_from(client: redis::Client) -> Result<Self, Self::Error> {
let sl_reader = SlowlogReader {
con: client.get_connection()?,
client: client,
last_id: -1,
length: 128,
uptime: 0,
};
Ok(sl_reader)
}
}

fn get_slowlog(con: &mut redis::Connection, length: u32) -> redis::RedisResult<Vec<SlowlogRecord>> {
log::debug!("Executing slowlog query");
redis::cmd("SLOWLOG").arg("GET").arg(length).query(con)
}

fn get_uptime(con: &mut redis::Connection) -> redis::RedisResult<u64> {
let server_info = redis::cmd("INFO").arg("SERVER").query::<String>(con)?;
server_info
.lines()
.filter(|l| l.contains("uptime_in_seconds"))
.nth(0)
.ok_or((
redis::ErrorKind::TypeError,
"No uptime line in response from server",
))?
.split(':')
.nth(1)
.ok_or((
redis::ErrorKind::TypeError,
"No value for uptime in response from server",
))?
.parse::<u64>()
.map_err(|e: std::num::ParseIntError| {
redis::RedisError::from((
redis::ErrorKind::TypeError,
"Error while trying to parse uptime from response",
e.to_string(),
))
})
}

impl SlowlogReader {
pub fn get(&mut self) -> redis::RedisResult<Vec<SlowlogRecord>> {
self.check_for_restart()?;
let new_records: Vec<_> = get_slowlog(&mut self.con, self.length)?
.into_iter()
.filter(|r| r.id as i64 > self.last_id)
.collect();
self.last_id = new_records.get(0).map_or(self.last_id, |r| r.id as i64);
Ok(new_records)
}
pub fn update_connection(&mut self) -> Result<(), redis::RedisError> {
self.con = self.client.get_connection()?;
Ok(())
}

fn check_for_restart(&mut self) -> redis::RedisResult<()> {
let uptime = get_uptime(&mut self.con)?;
if uptime < self.uptime {
self.last_id = -1
}
self.uptime = uptime;
Ok(())
}

pub fn redis_error_handler(&mut self, e: redis::RedisError) -> Result<(), redis::RedisError> {
if matches!(e.kind(), redis::ErrorKind::IoError) {
log::warn!(
"Lost connection to redis cluster, trying to establish a new one. Error: {}",
e
);
if let Err(e) = self.update_connection() {
return Err(e);
}
}
Ok(())
}
}

0 comments on commit 54e39b7

Please sign in to comment.