Skip to content

Commit

Permalink
Merge pull request AplaProject#117 from AplaProject/56-desync-monitoring
Browse files Browse the repository at this point in the history
56 desync monitoring
  • Loading branch information
potehinre authored Jan 23, 2018
2 parents a2e9fa8 + 258b612 commit 60780b4
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 0 deletions.
55 changes: 55 additions & 0 deletions packages/api/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package api

import (
"net/http"

"github.com/AplaProject/go-apla/packages/consts"
"github.com/AplaProject/go-apla/packages/converter"
"github.com/AplaProject/go-apla/packages/model"

log "github.com/sirupsen/logrus"
)

type GetMaxBlockIDResult struct {
MaxBlockID int64 `json:"max_block_id"`
}

func getMaxBlockID(w http.ResponseWriter, r *http.Request, data *apiData, logger *log.Entry) (err error) {
block := &model.Block{}
found, err := block.GetMaxBlock()
if err != nil {
log.WithFields(log.Fields{"type": consts.DBError, "error": err}).Error("getting max block")
return errorAPI(w, err, http.StatusInternalServerError)
}
if !found {
log.WithFields(log.Fields{"type": consts.NotFound}).Error("last block not found")
return errorAPI(w, `E_NOTFOUND`, http.StatusNotFound)
}
data.result = &GetMaxBlockIDResult{block.ID}
return nil
}

type GetBlockInfoResult struct {
Hash []byte `json:"hash"`
EcosystemID int64 `json:"ecosystem_id"`
KeyID int64 `json:"key_id"`
Time int64 `json:"time"`
Tx int32 `json:"tx_count"`
RollbacksHash []byte `json:"rollbacks_hash"`
}

func getBlockInfo(w http.ResponseWriter, r *http.Request, data *apiData, logger *log.Entry) (err error) {
blockID := converter.StrToInt64(data.params["id"].(string))
block := model.Block{}
found, err := block.Get(blockID)
if err != nil {
log.WithFields(log.Fields{"type": consts.DBError, "error": err}).Error("getting block")
return errorAPI(w, err, http.StatusInternalServerError)
}
if !found {
log.WithFields(log.Fields{"type": consts.NotFound, "id": blockID}).Error("block with id not found")
return errorAPI(w, `E_NOTFOUND`, http.StatusNotFound)
}
data.result = &GetBlockInfoResult{Hash: block.Hash, EcosystemID: block.EcosystemID, KeyID: block.KeyID, Time: block.Time, Tx: block.Tx, RollbacksHash: block.RollbacksHash}
return nil
}
18 changes: 18 additions & 0 deletions packages/api/block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package api

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestGetMaxBlockID(t *testing.T) {
var ret GetMaxBlockIDResult
err := sendGet(`maxblockid`, nil, &ret)
assert.NoError(t, err)
}

func TestGetBlockInfo(t *testing.T) {
var ret GetBlockInfoResult
err := sendGet(`block/1`, nil, &ret)
assert.NoError(t, err)
}
2 changes: 2 additions & 0 deletions packages/api/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func Route(route *hr.Router) {
get(`txstatus/:hash`, ``, authWallet, txstatus)
get(`test/:name`, ``, getTest)
get(`history/:table/:id`, ``, authWallet, getHistory)
get(`block/:id`, ``, getBlockInfo)
get(`maxblockid`, ``, getMaxBlockID)

post(`content/page/:name`, ``, authWallet, getPage)
post(`content/menu/:name`, ``, authWallet, getMenu)
Expand Down
35 changes: 35 additions & 0 deletions tools/desync_monitor/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package config

import (
"github.com/BurntSushi/toml"
)

type Daemon struct {
DaemonMode bool `toml:"daemon"`
QueryingPeriod int `toml:"querying_period"`
}

type AlertMessage struct {
To string `toml:"to"`
From string `toml:"from"`
Subject string `toml:"subject"`
}

type Smtp struct {
Host string `toml:"host"`
Port int `toml:"port"`
Username string `toml:"username"`
Password string `toml:"password"`
}

type Config struct {
Daemon Daemon `toml:"daemon"`
AlertMessage AlertMessage `toml:"alert_message"`
Smtp Smtp `toml:"smtp"`
NodesList []string `toml:"nodes_list"`
}

func (c *Config) Read(fileName string) error {
_, err := toml.DecodeFile(fileName, c)
return err
}
139 changes: 139 additions & 0 deletions tools/desync_monitor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package main

import (
"flag"
"fmt"
"math"
"net/smtp"
"strings"
"time"

"github.com/AplaProject/go-apla/tools/desync_monitor/config"
"github.com/AplaProject/go-apla/tools/desync_monitor/query"

log "github.com/sirupsen/logrus"
)

const confPathFlagName = "confPath"
const nodesListFlagName = "nodesList"
const daemonModeFlagName = "daemonMode"
const queryingPeriodFlagName = "queryingPeriod"
const alertMessageToFlagName = "alertMessageTo"
const alertMessageSubjFlagName = "alertMessageSubj"
const alertMessageFromFlagName = "alertMessageFrom"
const smtpHostFlagName = "smtpHost"
const smtpPortFlagName = "smtpPort"
const smtpUsernameFlagName = "smtpUsername"
const smtpPasswordFlagName = "smtpPassword"

var configPath *string = flag.String(confPathFlagName, "config.toml", "path to desync monitor config")
var nodesList *string = flag.String(nodesListFlagName, "127.0.0.1:7079", "which nodes to query, in format url1,url2,url3")
var daemonMode *bool = flag.Bool(daemonModeFlagName, false, "start as daemon")
var queryingPeriod *int = flag.Int(queryingPeriodFlagName, 1, "period of querying nodes in seconds, if started as daemon")

var alertMessageTo *string = flag.String(alertMessageToFlagName, "alert@apla.io", "email adress to send alert")
var alertMessageSubj *string = flag.String(alertMessageSubjFlagName, "problem with nodes synchronization", "alert message subject")
var alertMessageFrom *string = flag.String(alertMessageFromFlagName, "monitor@apla.io", "email adress from witch to send alert")

var smtpHost *string = flag.String(smtpHostFlagName, "", "host of smtp server, to send alert email")
var smtpPort *int = flag.Int(smtpPortFlagName, 25, "port of smtp server")
var smtpUsername *string = flag.String(smtpUsernameFlagName, "", "login to smtp server")
var smtpPassword *string = flag.String(smtpPasswordFlagName, "", "password to smtp server")

func minElement(slice []int64) int64 {
var min int64 = math.MaxInt64
for _, blockID := range slice {
if blockID < min {
min = blockID
}
}
return min
}

func flagsOverrideConfig(conf *config.Config) {
flag.Visit(func(flag *flag.Flag) {
switch flag.Name {
case nodesListFlagName:
nodesList := strings.Split(*nodesList, ",")
conf.NodesList = nodesList
case daemonModeFlagName:
conf.Daemon.DaemonMode = *daemonMode
case queryingPeriodFlagName:
conf.Daemon.QueryingPeriod = *queryingPeriod
case alertMessageToFlagName:
conf.AlertMessage.To = *alertMessageTo
case alertMessageSubjFlagName:
conf.AlertMessage.Subject = *alertMessageSubj
case alertMessageFromFlagName:
conf.AlertMessage.From = *alertMessageFrom
case smtpHostFlagName:
conf.Smtp.Host = *smtpHost
case smtpPortFlagName:
conf.Smtp.Port = *smtpPort
case smtpUsernameFlagName:
conf.Smtp.Username = *smtpUsername
case smtpPasswordFlagName:
conf.Smtp.Password = *smtpPassword
}
})
}

func sendEmail(smtpConf *config.Smtp, alertConf *config.AlertMessage, message string) error {
auth := smtp.PlainAuth("", smtpConf.Username, smtpConf.Password, smtpConf.Host)
to := []string{alertConf.To}
msg := []byte(fmt.Sprintf("From: %s\r\n", alertConf.From) +
fmt.Sprintf("To: %s\r\n", alertConf.To) +
fmt.Sprintf("Subject: %s\r\n", alertConf.Subject) +
"\r\n" +
fmt.Sprintf("%s\r\n", message))
err := smtp.SendMail(fmt.Sprintf("%s:%d", smtpConf.Host, smtpConf.Port), auth, alertConf.From, to, msg)
if err != nil {
log.WithFields(log.Fields{"error": err}).Error("sending email")
}
return err
}

func monitor(conf *config.Config) {
maxBlockIDs, err := query.MaxBlockIDs(conf.NodesList)
if err != nil {
sendEmail(&conf.Smtp, &conf.AlertMessage, "problem getting node max block id :"+err.Error())
return
}
blockInfos, err := query.BlockInfo(conf.NodesList, minElement(maxBlockIDs))
if err != nil {
sendEmail(&conf.Smtp, &conf.AlertMessage, "problem getting node block info :"+err.Error())
return
}
hash2Node := map[string][]string{}
for node, blockInfo := range blockInfos {
rollbacksHash := string(blockInfo.RollbacksHash)
if _, ok := hash2Node[rollbacksHash]; !ok {
hash2Node[rollbacksHash] = []string{}
}
hash2Node[rollbacksHash] = append(hash2Node[rollbacksHash], node)
}
if len(hash2Node) > 1 {
hash2NodeStrResults := []string{}
for k, v := range hash2Node {
hash2NodeStrResults = append(hash2NodeStrResults, fmt.Sprintf("%x: %s", k, v))
}
sendEmail(&conf.Smtp, &conf.AlertMessage, fmt.Sprintf("nodes unsynced. Rollback hashes are: %s", strings.Join(hash2NodeStrResults, ",")))
}
}

func main() {
flag.Parse()
conf := &config.Config{}
if err := conf.Read(*configPath); err != nil {
log.WithFields(log.Fields{"error": err}).Fatal("reading config")
}
flagsOverrideConfig(conf)
if conf.Daemon.DaemonMode {
ticker := time.NewTicker(time.Second * time.Duration(conf.Daemon.QueryingPeriod))
for _ = range ticker.C {
monitor(conf)
}
} else {
monitor(conf)
}
}
71 changes: 71 additions & 0 deletions tools/desync_monitor/query/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package query

import (
"fmt"
"sync"

"github.com/AplaProject/go-apla/packages/api"
)

const maxBlockIDEndpoint = "/api/v2/maxblockid"
const blockInfoEndpoint = "/api/v2/block/%d"

type MaxBlockID struct {
MaxBlockID int64 `json:"max_block_id"`
}

func MaxBlockIDs(nodesList []string) ([]int64, error) {
wg := sync.WaitGroup{}
workResults := ConcurrentMap{m: map[string]interface{}{}}
for _, nodeUrl := range nodesList {
wg.Add(1)
go func(url string) {
defer wg.Done()
maxBlockID := &MaxBlockID{}
if err := sendGetRequest(url+maxBlockIDEndpoint, maxBlockID); err != nil {
workResults.Set(url, err)
return
}
workResults.Set(url, maxBlockID.MaxBlockID)
}(nodeUrl)
}
wg.Wait()
maxBlockIds := []int64{}
for _, result := range workResults.m {
switch res := result.(type) {
case int64:
maxBlockIds = append(maxBlockIds, res)
case error:
return nil, res
}
}
return maxBlockIds, nil
}

func BlockInfo(nodesList []string, blockID int64) (map[string]*api.GetBlockInfoResult, error) {
wg := sync.WaitGroup{}
workResults := ConcurrentMap{m: map[string]interface{}{}}
for _, nodeUrl := range nodesList {
wg.Add(1)
go func(url string) {
defer wg.Done()
blockInfo := &api.GetBlockInfoResult{}
if err := sendGetRequest(url+fmt.Sprintf(blockInfoEndpoint, blockID), blockInfo); err != nil {
workResults.Set(url, err)
return
}
workResults.Set(url, blockInfo)
}(nodeUrl)
}
wg.Wait()
result := map[string]*api.GetBlockInfoResult{}
for nodeUrl, blockInfoOrError := range workResults.m {
switch res := blockInfoOrError.(type) {
case error:
return nil, res
case *api.GetBlockInfoResult:
result[nodeUrl] = res
}
}
return result, nil
}
54 changes: 54 additions & 0 deletions tools/desync_monitor/query/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package query

import (
"encoding/json"
"fmt"
"net/http"
"sync"

"io/ioutil"

log "github.com/sirupsen/logrus"
)

type ConcurrentMap struct {
m map[string]interface{}
mu sync.RWMutex
}

func (c *ConcurrentMap) Set(key string, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.m[key] = value
}

func (c ConcurrentMap) Get(key string) (bool, interface{}) {
c.mu.RLock()
defer c.mu.RUnlock()
res, ok := c.m[key]
return ok, res
}

func sendGetRequest(url string, v interface{}) error {
resp, err := http.Get(url)
if err != nil {
log.WithFields(log.Fields{"url": url, "error": err}).Error("get requesting url")
return err
}
if resp.StatusCode != http.StatusOK {
err := fmt.Errorf("status code is not OK %d", resp.StatusCode)
log.WithFields(log.Fields{"url": url, "error": err}).Error("incorrect status code")
return err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.WithFields(log.Fields{"url": url, "error": err}).Error("reading response body")
return err
}
if err := json.Unmarshal(data, v); err != nil {
log.WithFields(log.Fields{"data": string(data), "error": err}).Error("unmarshalling json to struct")
return err
}
return nil
}

0 comments on commit 60780b4

Please sign in to comment.