Skip to content
This repository has been archived by the owner on Jun 25, 2019. It is now read-only.

Commit

Permalink
Merge pull request #72 from andig/readings
Browse files Browse the repository at this point in the history
* Move files
* Split meters and readings
* Embed Operation in Snip
* Fix race conditions
* Various simplifations
* Embed MeasurementCache
  • Loading branch information
andig committed Sep 21, 2018
2 parents 0718ed2 + 2e10ba8 commit 48be9a0
Show file tree
Hide file tree
Showing 21 changed files with 966 additions and 927 deletions.
46 changes: 24 additions & 22 deletions cmd/sdm630/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
"strings"
"time"

"github.com/gonium/gosdm630"
. "github.com/gonium/gosdm630"
. "github.com/gonium/gosdm630/internal/meters"
"gopkg.in/urfave/cli.v1"
)

Expand All @@ -19,7 +20,7 @@ func main() {
app := cli.NewApp()
app.Name = "sdm"
app.Usage = "SDM modbus daemon"
app.Version = sdm630.RELEASEVERSION
app.Version = RELEASEVERSION
app.HideVersion = true
app.Flags = []cli.Flag{
// general
Expand All @@ -30,14 +31,14 @@ func main() {
},
cli.IntFlag{
Name: "comset, c",
Value: sdm630.ModbusComset9600_8N1,
Value: ModbusComset9600_8N1,
Usage: `which communication parameter set to use. Valid sets are
` + strconv.Itoa(sdm630.ModbusComset2400_8N1) + `: 2400 baud, 8N1
` + strconv.Itoa(sdm630.ModbusComset9600_8N1) + `: 9600 baud, 8N1
` + strconv.Itoa(sdm630.ModbusComset19200_8N1) + `: 19200 baud, 8N1
` + strconv.Itoa(sdm630.ModbusComset2400_8E1) + `: 2400 baud, 8E1
` + strconv.Itoa(sdm630.ModbusComset9600_8E1) + `: 9600 baud, 8E1
` + strconv.Itoa(sdm630.ModbusComset19200_8E1) + `: 19200 baud, 8E1
` + strconv.Itoa(ModbusComset2400_8N1) + `: 2400 baud, 8N1
` + strconv.Itoa(ModbusComset9600_8N1) + `: 9600 baud, 8N1
` + strconv.Itoa(ModbusComset19200_8N1) + `: 19200 baud, 8N1
` + strconv.Itoa(ModbusComset2400_8E1) + `: 2400 baud, 8E1
` + strconv.Itoa(ModbusComset9600_8E1) + `: 9600 baud, 8E1
` + strconv.Itoa(ModbusComset19200_8E1) + `: 19200 baud, 8E1
`,
},
cli.StringFlag{
Expand Down Expand Up @@ -122,11 +123,11 @@ func main() {

app.Action = func(c *cli.Context) {
// Set unique ID format
sdm630.UniqueIdFormat = c.String("unique_id_format")
UniqueIdFormat = c.String("unique_id_format")

// Parse the device_list parameter
deviceslice := strings.Split(c.String("device_list"), ",")
meters := make(map[uint8]*sdm630.Meter)
meters := make(map[uint8]*Meter)
for _, meterdef := range deviceslice {
splitdef := strings.Split(meterdef, ":")
if len(splitdef) != 2 {
Expand All @@ -137,47 +138,47 @@ func main() {
if err != nil {
log.Fatalf("Error parsing device id %s: %s. See -h for help.", meterdef, err.Error())
}
meter, err := sdm630.NewMeterByType(metertype, uint8(id), DEFAULT_METER_STORE_SECONDS)
meter, err := NewMeterByType(metertype, uint8(id), DEFAULT_METER_STORE_SECONDS)
if err != nil {
log.Fatalf("Unknown meter type %s for device %d. See -h for help.", metertype, id)
}
meters[uint8(id)] = meter
}

// create ModbusEngine with status
status := sdm630.NewStatus(meters)
qe := sdm630.NewModbusEngine(
status := NewStatus(meters)
qe := NewModbusEngine(
c.String("serialadapter"),
c.Int("comset"),
c.Bool("verbose"),
status,
)

// scheduler and meter data channel
scheduler, snips := sdm630.SetupScheduler(meters, qe)
scheduler, snips := SetupScheduler(meters, qe)
go scheduler.Run()

// tee that broadcasts meter messages to multiple recipients
tee := sdm630.NewQuerySnipBroadcaster(snips)
tee := NewQuerySnipBroadcaster(snips)
go tee.Run()

// longpoll firehose
var firehose *sdm630.Firehose
var firehose *Firehose
if false {
firehose = sdm630.NewFirehose(
firehose = NewFirehose(
tee.Attach(),
status,
c.Bool("verbose"))
go firehose.Run()
}

// websocket hub
hub := sdm630.NewSocketHub(tee.Attach(), status)
hub := NewSocketHub(tee.Attach(), status)
go hub.Run()

// MQTT client
if c.String("broker") != "" {
mqtt := sdm630.NewMqttClient(
mqtt := NewMqttClient(
tee.Attach(),
c.String("broker"),
c.String("topic"),
Expand All @@ -192,16 +193,17 @@ func main() {
}

// MeasurementCache for REST API
mc := sdm630.NewMeasurementCache(
mc := NewMeasurementCache(
meters,
tee.Attach(),
scheduler,
DEFAULT_METER_STORE_SECONDS,
c.Bool("verbose"),
)
go mc.Consume()

log.Printf("Starting API httpd at %s", c.String("url"))
sdm630.Run_httpd(
Run_httpd(
mc,
firehose,
hub,
Expand Down
42 changes: 22 additions & 20 deletions cmd/sdm630_httpd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
"strings"
"time"

"github.com/gonium/gosdm630"
. "github.com/gonium/gosdm630"
. "github.com/gonium/gosdm630/internal/meters"
"gopkg.in/urfave/cli.v1"
)

Expand All @@ -19,7 +20,7 @@ func main() {
app := cli.NewApp()
app.Name = "sdm630_httpd"
app.Usage = "SDM630 power measurements via HTTP."
app.Version = sdm630.RELEASEVERSION
app.Version = RELEASEVERSION
app.HideVersion = true
app.Flags = []cli.Flag{
cli.StringFlag{
Expand All @@ -29,14 +30,14 @@ func main() {
},
cli.IntFlag{
Name: "comset, c",
Value: sdm630.ModbusComset9600_8N1,
Value: ModbusComset9600_8N1,
Usage: `which communication parameter set to use. Valid sets are
` + strconv.Itoa(sdm630.ModbusComset2400_8N1) + `: 2400 baud, 8N1
` + strconv.Itoa(sdm630.ModbusComset9600_8N1) + `: 9600 baud, 8N1
` + strconv.Itoa(sdm630.ModbusComset19200_8N1) + `: 19200 baud, 8N1
` + strconv.Itoa(sdm630.ModbusComset2400_8E1) + `: 2400 baud, 8E1
` + strconv.Itoa(sdm630.ModbusComset9600_8E1) + `: 9600 baud, 8E1
` + strconv.Itoa(sdm630.ModbusComset19200_8E1) + `: 19200 baud, 8E1
` + strconv.Itoa(ModbusComset2400_8N1) + `: 2400 baud, 8N1
` + strconv.Itoa(ModbusComset9600_8N1) + `: 9600 baud, 8N1
` + strconv.Itoa(ModbusComset19200_8N1) + `: 19200 baud, 8N1
` + strconv.Itoa(ModbusComset2400_8E1) + `: 2400 baud, 8E1
` + strconv.Itoa(ModbusComset9600_8E1) + `: 9600 baud, 8E1
` + strconv.Itoa(ModbusComset19200_8E1) + `: 19200 baud, 8E1
`,
},
cli.StringFlag{
Expand Down Expand Up @@ -70,11 +71,11 @@ func main() {

app.Action = func(c *cli.Context) {
// Set unique ID format
sdm630.UniqueIdFormat = c.String("unique_id_format")
UniqueIdFormat = c.String("unique_id_format")

// Parse the device_list parameter
deviceslice := strings.Split(c.String("device_list"), ",")
meters := make(map[uint8]*sdm630.Meter)
meters := make(map[uint8]*Meter)
for _, meterdef := range deviceslice {
splitdef := strings.Split(meterdef, ":")
if len(splitdef) != 2 {
Expand All @@ -85,52 +86,53 @@ func main() {
if err != nil {
log.Fatalf("Error parsing device id %s: %s. See -h for help.", meterdef, err.Error())
}
meter, err := sdm630.NewMeterByType(metertype, uint8(id), DEFAULT_METER_STORE_SECONDS)
meter, err := NewMeterByType(metertype, uint8(id), DEFAULT_METER_STORE_SECONDS)
if err != nil {
log.Fatalf("Unknown meter type %s for device %d. See -h for help.", metertype, id)
}
meters[uint8(id)] = meter
}

// create ModbusEngine with status
status := sdm630.NewStatus(meters)
qe := sdm630.NewModbusEngine(
status := NewStatus(meters)
qe := NewModbusEngine(
c.String("serialadapter"),
c.Int("comset"),
c.Bool("verbose"),
status,
)

// scheduler and meter data channel
scheduler, snips := sdm630.SetupScheduler(meters, qe)
scheduler, snips := SetupScheduler(meters, qe)
go scheduler.Run()

// tee that broadcasts meter messages to multiple recipients
tee := sdm630.NewQuerySnipBroadcaster(snips)
tee := NewQuerySnipBroadcaster(snips)
go tee.Run()

// Longpoll firehose
firehose := sdm630.NewFirehose(
firehose := NewFirehose(
tee.Attach(),
status,
c.Bool("verbose"))
go firehose.Run()

// websocket hub
hub := sdm630.NewSocketHub(tee.Attach(), status)
hub := NewSocketHub(tee.Attach(), status)
go hub.Run()

// MeasurementCache for REST API
mc := sdm630.NewMeasurementCache(
mc := NewMeasurementCache(
meters,
tee.Attach(),
scheduler,
DEFAULT_METER_STORE_SECONDS,
c.Bool("verbose"),
)
go mc.Consume()

log.Printf("Starting API httpd at %s", c.String("url"))
sdm630.Run_httpd(
Run_httpd(
mc,
firehose,
hub,
Expand Down
Loading

0 comments on commit 48be9a0

Please sign in to comment.