Skip to content

Commit

Permalink
Merge pull request #60 from bhagatvansh/main
Browse files Browse the repository at this point in the history
Fixes #57 , channels and go routine added to bark client @bhagatvansh good work on this one.
  • Loading branch information
vaibhav-kaushal committed Oct 3, 2023
2 parents 0d22c33 + df80f3d commit ef600e1
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 28 deletions.
3 changes: 0 additions & 3 deletions .env

This file was deleted.

11 changes: 11 additions & 0 deletions client/channels/client_request_channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package channels

import "github.com/techrail/bark/models"

const ClientChannelCapacity = 10000

var ClientChannel chan models.BarkLog

func init() {
ClientChannel = make(chan models.BarkLog, ClientChannelCapacity)
}
17 changes: 7 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package client

import (
"fmt"
`strings`
"github.com/techrail/bark/client/controllers"
"github.com/techrail/bark/client/services/clientLogSender"
"strings"

`github.com/techrail/bark/appRuntime`
`github.com/techrail/bark/constants`
"github.com/techrail/bark/appRuntime"
"github.com/techrail/bark/constants"
"github.com/techrail/bark/models"
)

Expand Down Expand Up @@ -145,13 +147,7 @@ func (c *Config) sendLogToServer(message, logLevel string) {

log.Code = getCode(&log)

go func() {
_, err := PostLog(c.BaseUrl+"/insertSingle", log)
if err.Severity == 1 {
fmt.Println(err.Error())
return
}
}()
controllers.SendSingleToClientChannel(log)

fmt.Printf("%s:\t %s -- %s\n", logLevel, c.SessionName, message)
// Todo: Add uber zap to avoid printing with PrintF (We don't want to handle sendLogToServer printing)
Expand All @@ -167,6 +163,7 @@ func NewClient(url, errLevel, svcName, sessName string) *Config {
sessName = appRuntime.SessionName
fmt.Printf("L#1L3WBF - Using %v as Session Name", sessName)
}
go clientLogSender.StartSendingLogs(url)

return &Config{
BaseUrl: url,
Expand Down
11 changes: 11 additions & 0 deletions client/controllers/request_ingestion_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package controllers

import (
"github.com/techrail/bark/client/services/ingestion"
"github.com/techrail/bark/models"
)

func SendSingleToClientChannel(l models.BarkLog) {

go ingestion.InsertSingleRequest(l)
}
4 changes: 2 additions & 2 deletions client/network.go → client/network/network.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package client
package network

import (
"encoding/json"
`fmt`
"fmt"

"github.com/valyala/fasthttp"

Expand Down
47 changes: 47 additions & 0 deletions client/services/clientLogSender/log_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package clientLogSender

import (
"fmt"
"github.com/techrail/bark/client/channels"
"github.com/techrail/bark/client/network"
"github.com/techrail/bark/constants"
"github.com/techrail/bark/models"
"time"
)

const logBatchSizeStandard = 100

func StartSendingLogs(serverUrl string) {
clientChannelLength := 0
for {
clientChannelLength = len(channels.ClientChannel)
var logBatch = []models.BarkLog{}
if clientChannelLength > logBatchSizeStandard {
// Bulk insert
for i := 0; i < logBatchSizeStandard; i++ {
elem, ok := <-channels.ClientChannel
if !ok {
fmt.Println("E# - Error occurred while getting batch from channel")
break // Something went wrong
}
logBatch = append(logBatch, elem)
}
_, err := network.PostLogs(serverUrl+constants.BatchInsertUrl, logBatch)

if err.Severity == 1 {
fmt.Println(err.Msg)
}
fmt.Println("L# - Batch sent at ", time.Now().Format("2006-01-02 15:04:05"))
} else if clientChannelLength > 0 && clientChannelLength < logBatchSizeStandard {
// Commit one at a time
singleLog := <-channels.ClientChannel
_, err := network.PostLog(serverUrl+constants.SingleInsertUrl, singleLog)
if err.Severity == 1 {
fmt.Println(err.Msg)
}
} else {
time.Sleep(1 * time.Second)

}
}
}
16 changes: 16 additions & 0 deletions client/services/ingestion/log_request_ingestion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package ingestion

import (
"fmt"
"github.com/techrail/bark/client/channels"
"github.com/techrail/bark/models"
)

func InsertSingleRequest(logEntry models.BarkLog) {
if len(channels.ClientChannel) > channels.ClientChannelCapacity-1 {
fmt.Printf("E#1LB9MN - Channel is full. Cannot push. Log received: | %v\n", logEntry)
return
}
channels.ClientChannel <- logEntry

}
5 changes: 5 additions & 0 deletions constants/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,8 @@ const (
Info = "INFO"
Debug = "DEBUG"
)

const (
SingleInsertUrl = "insertSingle"
BatchInsertUrl = "insertMultiple"
)
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Init() string {
if len(port) == 0 {
port = "8080"
}
address := ":"+port
address := ":" + port
return address
}

Expand All @@ -41,5 +41,5 @@ func main() {
}
go dbLogWriter.StartWritingLogs()
log.Fatal(fasthttp.ListenAndServe(address, r.Handler))

}
14 changes: 14 additions & 0 deletions services/dbLogWriter/db_log_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ func StartWritingLogs() {
if appRuntime.ShutdownRequested.Load() == true {
if len(channels.LogChannel) == 0 {
return
} else {
for i := 0; i < len(channels.LogChannel); i++ {
elem, ok := <-channels.LogChannel
if !ok {
fmt.Println("E#1KSPGX - Error occured while getting batch from channel")
break // Something went wrong
}
logBatch = append(logBatch, elem)
}
err := BarkLogDao.InsertBatch(logBatch)
if err != nil {
fmt.Println(err)
}
fmt.Println("L#1KSPHD - Batch inserted at ", time.Now().Format("2006-01-02 15:04:05"))
}
} else {
// fmt.Println("in sleep")
Expand Down
24 changes: 13 additions & 11 deletions tests/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,25 @@ import (
func Test_requester(t *testing.T) {
logClient := client.NewClient("http://localhost:8080/", "INFO", "ServicName", "localRun")

// Print with formatter

// Print with formatter
logClient.Error("Anime: Naruto")
logClient.Info("Anime: One Piece")
logClient.Debug("Anime: Bleach")
logClient.Warn("Anime: AOT")

// Print without formatter
logClient.Errorf("Anime: %s", "Full Metal Alchemist")
logClient.Infof("Anime: %s", "Tokyo Ghoul")
logClient.Warnf("Anime: %s", "")
logClient.Debugf("I want to print something! %s", "weirdString")

// Multiple Logs
//logClient.Debug("Anime: Bleach")
//logClient.Warn("Anime: AOT")
//
//// Print without formatter
//logClient.Errorf("Anime: %s", "Full Metal Alchemist")
//logClient.Infof("Anime: %s", "Tokyo Ghoul")
//logClient.Warnf("Anime: %s", "")
//logClient.Debugf("I want to print something! %s", "weirdString")
//
//// Multiple Logs
var logs []models.BarkLog
logs = make([]models.BarkLog, 3)
logs[0] = models.BarkLog{Message: "someMessage"}
logs[1] = models.BarkLog{Message: "someMessage"}
logs[2] = models.BarkLog{Message: "someMessage"}
client.PostLogs("http://localhost:8080/insertMultiple", logs)
logClient.Debug("Random error")
}

0 comments on commit ef600e1

Please sign in to comment.