Skip to content

Commit

Permalink
Change websockets library
Browse files Browse the repository at this point in the history
Change for nhooyr.io/websockets and adjust handlers
  • Loading branch information
referefref authored Mar 31, 2024
1 parent fe3ce10 commit f218af4
Showing 1 changed file with 43 additions and 16 deletions.
59 changes: 43 additions & 16 deletions server/honeymap.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package main

import (
"context"
"encoding/json"
"github.com/fw42/go-hpfeeds"
"github.com/fzzy/sockjs-go/sockjs"
"io/ioutil"
"log"
"net/http"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
"os"
"time"
"runtime"
"path"
"runtime"
"sync"
"time"
)

const staticDir = "../client"
Expand Down Expand Up @@ -46,22 +49,46 @@ func checkFatalError(err error) {
}
}

var sockjsClients *sockjs.SessionPool = sockjs.NewSessionPool()
var clients sync.Map

func wsHandler(w http.ResponseWriter, r *http.Request) {
conn, err := websocket.Accept(w, r, nil)
if err != nil {
log.Printf("WebSocket accept error: %v", err)
return
}

clients.Store(conn, struct{}{})

defer func() {
clients.Delete(conn)
conn.Close(websocket.StatusNormalClosure, "")
}()

func dataHandler(s sockjs.Session) {
sockjsClients.Add(s)
defer sockjsClients.Remove(s)
for {
m := s.Receive()
if m == nil {
_, _, err := conn.Read(context.Background())
if err != nil {
break
}
}
}

func broadcast(input chan hpfeeds.Message) {
for msg := range input {
sockjsClients.Broadcast(msg.Payload)
clients.Range(func(key, value interface{}) bool {
conn, ok := key.(*websocket.Conn)
if !ok {
return true
}

go func(conn *websocket.Conn) {
err := wsjson.Write(context.Background(), conn, msg.Payload)
if err != nil {
log.Printf("Error broadcasting message: %v", err)
}
}(conn)
return true
})
}
}

Expand All @@ -77,10 +104,12 @@ func hpfeedsConnect(config Config, geolocEvents chan hpfeeds.Message) {
hp.Subscribe("geoloc.events", geolocEvents)
<-hp.Disconnected
log.Printf("Lost connection to %s:%d :-(\n", config.Host, config.Port)
} else {
log.Printf("Connection failed: %v", err)
}
log.Printf("Reconnecting to %s:%d\n", config.Host, config.Port)
log.Printf("Reconnecting in %d seconds...", backoff)
time.Sleep(time.Duration(backoff) * time.Second)
if backoff <= 10 {
if backoff < 10 {
backoff++
}
}
Expand All @@ -90,15 +119,13 @@ func main() {
config := readConfig()

http.Handle("/", http.FileServer(http.Dir(dirname() + "/" + staticDir + "/")))
sockjsMux := sockjs.NewServeMux(http.DefaultServeMux)
sockjsConf := sockjs.NewConfig()
sockjsMux.Handle("/data", dataHandler, sockjsConf)
http.HandleFunc("/data", wsHandler)

geolocEvents := make(chan hpfeeds.Message)
go hpfeedsConnect(config, geolocEvents)
go broadcast(geolocEvents)

log.Printf("Binding Honeymap webserver to %s...", bind)
err := http.ListenAndServe(bind, sockjsMux)
err := http.ListenAndServe(bind, nil)
checkFatalError(err)
}

0 comments on commit f218af4

Please sign in to comment.