Skip to content

Commit

Permalink
+ fix #1 - Offline dependencies installation
Browse files Browse the repository at this point in the history
+ fix #1.1 -  Update Requirements and README
+ fix #2 - irrelevant data for jboss cpu
+ fix #3 - init SQL / DB schema re-struct + code re-factoring
+ fix #3.1 - add f_rebuild function
+ fix #3.2 - adjusted mechanism of data displaying from cluster
+ fix #5 - add more network stats (errors, dropped, etc)
+ fix #8 -  customizable disk path
+ fix #9 - add background process to truncate inner DB after X days (number of days to be defined in settings.json)
  • Loading branch information
swifty94 committed Aug 11, 2021
1 parent 1b6ef48 commit 5165fea
Show file tree
Hide file tree
Showing 13 changed files with 498 additions and 317 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ log/app.log*
reports/*.csv
venv/
__pycache__
db/*.db
db/*.db
mainold.js
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Stack:
Requirements:
---
- Python 3.X
- Internet connection in order to install dependencies (offline installation to be included in next build)
- gcc version > 7.4.X

Installation:
---
Expand Down
36 changes: 27 additions & 9 deletions app.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,39 @@ def __init__(self):
self.IpAddr = JsonSettings.parseJson(self.json, 'IpAddr')
self.TcpPort = int(JsonSettings.parseJson(self.json, 'TcpPort'))
self.dbUpdateInterval = int(JsonSettings.parseJson(self.json, 'DbUpdateIntervalSec'))
self.dbTruncateInterval = int(JsonSettings.parseJson(self.json, 'DbTruncateIntervalSec'))

def run(self):
try:
scheduler = BackgroundScheduler()
DbUpdate = DbWorker()
logging.info(f'{self.cn} Database validation')
DbUpdate.periodicUpdate()
logging.info(f'{self.cn} Setting up DbUpdate BackgroundScheduler')
scheduler.add_job(func=DbUpdate.periodicUpdate, trigger="interval", seconds=self.dbUpdateInterval)
scheduler.start()
logging.info(f'{self.cn} Setting up Flask WSGI server')
logging.info(f'{self.cn} -----> WSGI server started <-----')
logging.info(f'{self.cn} Running on: {self.IpAddr}:{self.TcpPort}')
logging.info(f'{self.cn} DB update periodic interval: {self.dbUpdateInterval} sec')
serve(app, host=self.IpAddr, port=self.TcpPort)
DbUpdate.periodicUpdate()
if self.dbUpdateInterval == 0:
logging.info(f"{self.cn} DbUpdate: !!!DISABLED.")
logging.info(f'{self.cn} Setting up Flask WSGI server')
logging.info(f'{self.cn} -----> WSGI server started <-----')
logging.info(f'{self.cn} Running on: {self.IpAddr}:{self.TcpPort}')
serve(app, host=self.IpAddr, port=self.TcpPort)
elif self.dbUpdateInterval > 0 and self.dbTruncateInterval > 0:
logging.info(f'{self.cn} Setting up DbUpdate BackgroundScheduler')
scheduler.add_job(func=DbUpdate.periodicUpdate, trigger="interval", seconds=self.dbUpdateInterval)
logging.info(f'{self.cn} Setting up DbTruncate BackgroundScheduler')
scheduler.add_job(func=DbUpdate.periodicTruncate, trigger="interval", seconds=self.dbTruncateInterval)
scheduler.start()
logging.info(f'{self.cn} Setting up Flask WSGI server')
logging.info(f'{self.cn} -----> WSGI server started <-----')
logging.info(f'{self.cn} Running on: {self.IpAddr}:{self.TcpPort}')
logging.info(f'{self.cn} DB update periodic interval: {self.dbUpdateInterval} sec')
logging.info(f'{self.cn} DB truncate periodic interval: {self.dbTruncateInterval} sec')
serve(app, host=self.IpAddr, port=self.TcpPort)
elif self.dbUpdateInterval < 0 or self.dbUpdateInterval is None:
exception_line = f"""
\ndbUpdateInterval cannot be None or negative int
\n:param dbUpdateInterval == 0 -> disabled or dbUpdateInterval > 0 -> enabled
\nsettings.json -> dbUpdateInterval = {self.dbUpdateInterval}
"""
raise Exception(exception_line)
except Exception as e:
logging.critical(f'{self.cn} Critical exception: {e}', exc_info=1)
logging.critical(f'{self.cn} Application cannot be started due to the error above since it has occured in the entry point\nEXIT')
Expand Down
13 changes: 9 additions & 4 deletions bin/app.bat
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ rem Windows Bat script for easy application startup
rem :Setting up venv + install dependencies if this is first run
rem :Starting app from venv if already set
rem :Adjust APP_HOME to actual path of project

SET APP_HOME=C:\FTStats_v2.0.0\
cd %APP_HOME%
set arg=%1
SET APP_HOME=C:\Users\kiril\Documents\LenovoData\Dev\FTStats_v3.0.0
cd %APP_HOME%
if exist venv\ (
echo "Found venv"
.\venv\Scripts\activate && pythonw app.py
.\venv\Scripts\activate
echo "Done"
echo "Starting server"
pythonw app.py
) else (
echo "Venv not found. Creating + activating"
py -m venv venv
Expand All @@ -16,5 +19,7 @@ if exist venv\ (
FOR %%i in (dep\*.whl) DO venv\Scripts\pip3.9.exe install %%i
FOR %%i in (dep\*.tar.gz) DO venv\Scripts\pip3.9.exe install %%i
cd %APP_HOME%
echo "Done"
echo "Starting server"
pythonw app.py
)
2 changes: 1 addition & 1 deletion bin/app.sh
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ case "$1" in
f_rebuild
;;
*)
echo "Usage: app { start | stop | restart | status | rebuild}"
echo "Usage: app { start | stop | restart | status | rebuild }"
exit 1
esac

Expand Down
102 changes: 64 additions & 38 deletions controllers.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import json
import psutil
import socket
import os.path
import os
from os import path
import sqlite3
import platform
import requests
from os import path
from bs4 import BeautifulSoup
from clickhouse_driver import connect
from clickhouse_driver.errors import NetworkError
Expand Down Expand Up @@ -82,6 +82,7 @@ def __init__(self):
self.cn = __class__.__name__
self.acsUrl = JsonSettings.parseJson('settings.json', 'AcsStatsUrl')
self.qoeDbStr = JsonSettings.parseJson('settings.json', 'QoeDbConnectionString')
self.mountpoint = JsonSettings.parseJson('settings.json', 'mountpoint')

def _getJbossPid(self) -> int:
try:
Expand Down Expand Up @@ -206,15 +207,13 @@ def getCpu(self):

def getNetwork(self) -> Dict:
try:
net_io = psutil.net_io_counters()
sent_b = round((net_io.bytes_sent/1024/1024/1024),2)
recv_b = round((net_io.bytes_recv/1024/1024/1024),2)
net_io = psutil.net_io_counters()
errin = net_io.errin
errout = net_io.errout
dropin = net_io.dropin
dropout = net_io.dropout
values = [sent_b, recv_b, errin, errout, dropin, dropout]
keys = ['sent_b', 'recv_b', 'errin', 'errout', 'dropin', 'dropout']
values = [errin, errout, dropin, dropout]
keys = ['errin', 'errout', 'dropin', 'dropout']
data = JsonSettings.fillDict(keys, values)
return data
except Exception as e:
Expand All @@ -223,14 +222,11 @@ def getNetwork(self) -> Dict:

def getDisk(self) -> Dict:
try:
d = psutil.disk_usage('/')
d = psutil.disk_usage(self.mountpoint)
u_disk = round((d.used/1024/1024/1024),2)
f_disk = round((d.free/1024/1024/1024),2)
disk_io = psutil.disk_io_counters()
read_io = disk_io.read_bytes
write_io = disk_io.write_bytes
values = [u_disk,f_disk,read_io,write_io]
keys = ['u_disk','f_disk','read_io','write_io']
f_disk = round((d.free/1024/1024/1024),2)
values = [u_disk,f_disk]
keys = ['u_disk','f_disk']
data = JsonSettings.fillDict(keys,values)
return data
except Exception as e:
Expand Down Expand Up @@ -287,12 +283,10 @@ def getQoeData(self):
response = requests.get(self.acsUrl)
page = BeautifulSoup(response.text, 'html.parser')
qoe_sessions_min = page.find("td", text="QoESession per min (cur hour):").find_next_sibling("td").text
cpe_data_serial = self.clickhouseSelect("select count(*) from ftacs_qoe_ui_data.cpe_data")
kpi_data_serial = self.clickhouseSelect("select count(*) from ftacs_qoe_ui_data.kpi_data")
qoedb_size = self.clickhouseSelect("SELECT round((sum(x)/1024/1024/1024),2) FROM (SELECT sum(bytes) as x FROM system.parts WHERE active AND database = 'ftacs_qoe_ui_data' GROUP BY bytes ORDER BY bytes DESC) y")
sysdb_size = self.clickhouseSelect("SELECT round((sum(x)/1024/1024/1024),2) FROM (SELECT sum(bytes) as x FROM system.parts WHERE active AND database = 'system' GROUP BY bytes ORDER BY bytes DESC) y")
keys = ['qoe_sessions_min','cpe_data_serial','kpi_data_serial','qoedb_size','sysdb_size']
values = [qoe_sessions_min,cpe_data_serial,kpi_data_serial,qoedb_size,sysdb_size]
cpe_data_serial = self.clickhouseSelect("select count(*) from ftacs_qoe_ui_data.cpe_data")
qoedb_size = self.clickhouseSelect("SELECT round((sum(x)/1024/1024/1024),2) FROM (SELECT sum(bytes) as x FROM system.parts WHERE active AND database = 'ftacs_qoe_ui_data' GROUP BY bytes ORDER BY bytes DESC) y")
keys = ['qoe_sessions_min','cpe_data_serial','qoedb_size',]
values = [qoe_sessions_min,cpe_data_serial,qoedb_size]
qoe_data = JsonSettings.fillDict(keys,values)
return qoe_data
except Exception as e:
Expand Down Expand Up @@ -338,7 +332,7 @@ def connect(self) -> object:

def initDb(self) -> None:
"""
:Initiating database if not exist \n
Initiating database if not exist \n
:Accept - None\n
:Return - None
"""
Expand All @@ -365,7 +359,20 @@ def initDb(self) -> None:
logging.info(f'{self.cn} Database created with tables:\n{tables.fetchall()}')
if connection:
connection.close()


def delDb(self) -> bool:
    """
    Delete the SQLite database file so it can be rebuilt from scratch.

    :Accept - None
    :Return - True if the file is gone afterwards, False otherwise
    """
    try:
        os.remove(self.dbname)
    except FileNotFoundError:
        # File already absent -- the goal state is reached, not an error.
        logging.info(f'{self.cn} Database file already absent: {self.dbname}')
    except Exception as e:
        logging.error(f'{self.cn} Error \n{e}', exc_info=1)
        return False
    # Explicit False (instead of the old implicit None) keeps the -> bool
    # contract honest; truthiness-checking callers are unaffected.
    return not os.path.isfile(self.dbname)

def selectData(self, sql: str) -> List:
"""
Select data from DB\n
Expand Down Expand Up @@ -412,7 +419,14 @@ class DbWorker(object):
def __init__(self):
self.cn = __class__.__name__
self.data = DataCollector()
self.db = SqlProcessor()
self.db = SqlProcessor()

def _get_bindings(self, sql_values):
bindings = '('
for char in range(len(sql_values)):
bindings += '?,'
bindings += ')'
return bindings

def getJsonValues(self, json: Dict) -> tuple:
values = []
Expand Down Expand Up @@ -443,21 +457,22 @@ def insertStats(self):
ram = self.getJsonValues(self.data.getRam())
disk = self.getJsonValues(self.data.getDisk())
network = self.getJsonValues(self.data.getNetwork())
qoe = self.getJsonValues(self.data.getQoeData())
values = cpu + ram + disk + network + qoe
bindings = '('
for char in range(len(values)):
bindings += '?,'
bindings += ')'
isQoe = JsonSettings.parseJson('settings.json','collectQoe')
if isQoe:
qoe = self.getJsonValues(self.data.getQoeData())
values = cpu + ram + disk + network + qoe
else:
values = cpu + ram + disk + network + (0,0,0,0,0)

bindings = self._get_bindings(values)
sql = f"""
INSERT INTO stats ('javacpu','cpu_percent', 'loadavg',
'javamem', 'freeram', 'usedram',
'u_disk','f_disk','read_io','write_io',
'sent_b','recv_b', 'errin', 'errout', 'dropin', 'dropout',
'qoe_sessions_min','cpe_data_serial','kpi_data_serial','qoedb_size','sysdb_size')
VALUES {bindings.replace('?,)','?)')}
"""
self.db.insertData(sql, values)
INSERT INTO stats ('javacpu','cpu_percent', 'loadavg',
'javamem', 'freeram', 'usedram',
'u_disk','f_disk','errin', 'errout', 'dropin', 'dropout',
'qoe_sessions_min','cpe_data_serial','kpi_data_serial','qoedb_size','sysdb_size')
VALUES {bindings.replace('?,)','?)')}
"""
self.db.insertData(sql, values)
except Exception as e:
logging.error(f'{self.cn} Exception: {e}', exc_info=1)
logging.error(f'{self.cn} SQL: {sql}')
Expand Down Expand Up @@ -507,6 +522,17 @@ def periodicUpdate(self):
logging.critical(f'{self.cn} Exception: {e}')
logging.critical(f'{self.cn} StackTrace: \n', exc_info=1)

def periodicTruncate(self):
    """
    Drop and rebuild the inner DB; scheduled per DbTruncateIntervalSec
    from settings.json.
    """
    try:
        # Only rebuild when the old file was actually removed.
        if not self.db.delDb():
            return
        self.periodicUpdate()
    except Exception as e:
        logging.critical(f'{self.cn} Exception: {e}')
        logging.critical(f'{self.cn} StackTrace: \n', exc_info=1)

class ReportMetaData(object):
def __init__(self):
self.cn = __class__.__name__
Expand All @@ -526,4 +552,4 @@ def userKeys() -> List:
user_keys.append(j)
return user_keys
except Exception as e:
logging.error(f'{cn} Exception: {e}', exc_info=1)
logging.error(f'{cn} Exception: {e}', exc_info=1)
6 changes: 1 addition & 5 deletions db/db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ CREATE TABLE IF NOT EXISTS `stats` (
`freeram` INTEGER,
`usedram` INTEGER,
`u_disk` INTEGER,
`f_disk` INTEGER,
`read_io` INTEGER,
`write_io` INTEGER,
`sent_b` INTEGER,
`recv_b` INTEGER,
`f_disk` INTEGER,
`errin` INTEGER,
`errout` INTEGER,
`dropin` INTEGER,
Expand Down
23 changes: 16 additions & 7 deletions model.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,16 @@ def _getMeta(self):
logging.error(f'{self.cn} Exception: {e}', exc_info=1)

def _set(self, *args: AnyStr) -> Dict:
"""
:param args* -> key to paste into SELECT statetment\n
:return -> data dictionary\n
:e.g: _set("freeram,usedram,javamem,updated")
"""
try:
sql = f"select {args} from stats order by updated".replace('\'','').replace('(','').replace(')','').replace('updated,','updated')
sql = f"select {args} from stats order by updated".replace('\'','').replace('(','').replace(')','').replace('updated,','updated')
data = self.db.selectData(sql)
logging.info(f'SQL query used:\n{sql}')
logging.info(f'Data from backend:\n{data}')
main_array = []
data_object = {}
for item in data:
Expand All @@ -73,15 +80,15 @@ def _set(self, *args: AnyStr) -> Dict:
def createView(self, kpi: str) -> Dict:
try:
if kpi == 'ram':
data = self._set("freeram,usedram,javamem,updated")
data = self._set("freeram, usedram, javamem, updated")
elif kpi == 'cpu':
data = self._set("javacpu,cpu_percent,loadavg,updated")
data = self._set("javacpu, cpu_percent, loadavg, updated")
elif kpi == 'disk':
data = self._set("u_disk,f_disk,read_io,write_io,updated")
data = self._set("u_disk, f_disk, updated")
elif kpi == 'net':
data = self._set("sent_b, recv_b, errin, errout, dropin, dropout, updated")
data = self._set("errin, errout, dropin, dropout, updated")
elif kpi == 'qoe':
data = self._set("qoe_sessions_min,cpe_data_serial,kpi_data_serial,qoedb_size,sysdb_size,updated")
data = self._set("qoe_sessions_min,cpe_data_serial,qoedb_size,updated")
elif kpi == 'sessions':
columns = self.meta.userKeys()
columns = str(columns).replace('[','').replace(']','').replace('\'','')
Expand All @@ -92,6 +99,8 @@ def createView(self, kpi: str) -> Dict:
main_array.append(list(item))
data_object["data"] = main_array
return data_object
logging.info(f'Data from backend:\n{data_object}')

return data
except Exception as e:
logging.error(f'{self.cn} Exception: {e}', exc_info=1)
Expand All @@ -112,7 +121,7 @@ def createServerReport(self):
report_name = f'reports/FTStats_server_report_{date}.csv'
connection = self.db.connect()
cursor = connection.cursor()
cursor.execute("select javacpu,cpu_percent,loadavg,javamem,freeram,usedram,u_disk,f_disk,read_io,write_io, sent_b, recv_b, errin, errout, dropin, dropout,qoe_sessions_min,cpe_data_serial,kpi_data_serial,qoedb_size,sysdb_size, strftime('%Y_%d_%m_%H_%M_%S', updated) as timestamp from stats")
cursor.execute("select javacpu,cpu_percent,loadavg,javamem,freeram,usedram,u_disk,f_disk, errin, errout, dropin, dropout,qoe_sessions_min,cpe_data_serial,qoedb_size, strftime('%Y_%d_%m_%H_%M_%S', updated) as timestamp from stats")
with open(report_name, "w", newline='') as csv_file:
csv_writer = csv.writer(csv_file, delimiter=',', lineterminator='\n')
csv_writer.writerow([i[0] for i in cursor.description])
Expand Down
Loading

0 comments on commit 5165fea

Please sign in to comment.