Skip to content

Commit

Permalink
Upgred to go 1.21
Browse files Browse the repository at this point in the history
  • Loading branch information
Benjamin committed Jan 21, 2024
1 parent 9c9a3f7 commit 8ea4f0e
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 55 deletions.
13 changes: 9 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
module github.com/bezineb5/go-h264-streamer

go 1.14
go 1.21

require (
github.com/gorilla/handlers v1.4.2
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.2
github.com/gorilla/handlers v1.5.2
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
)

require (
github.com/felixge/httpsnoop v1.0.3 // indirect
golang.org/x/net v0.17.0 // indirect
)
16 changes: 10 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
github.com/gorilla/handlers v1.4.2 h1:0QniY0USkHQ1RGCLfKxeNHK9bkDHGRYGNDFBCS+YARg=
github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE=
github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
19 changes: 11 additions & 8 deletions stream/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"bytes"
"context"
"io"
"log"
"log/slog"
"os/exec"
"strconv"
"sync"
Expand Down Expand Up @@ -41,9 +41,11 @@ func Video(options CameraOptions, writer io.Writer, connectionsChange chan int)

for n := range connectionsChange {
if n == 0 {
// No more connections, stop the camera
firstConnection = true
stopChan <- struct{}{}
} else if firstConnection {
// First connection, start the camera
firstConnection = false
go startCamera(options, writer, stopChan, &cameraStarted)
}
Expand All @@ -53,7 +55,7 @@ func Video(options CameraOptions, writer io.Writer, connectionsChange chan int)
func startCamera(options CameraOptions, writer io.Writer, stop <-chan struct{}, mutex *sync.Mutex) {
mutex.Lock()
defer mutex.Unlock()
defer log.Println("Stopped raspivid")
defer slog.Info("startCamera: Stopped camera")

args := []string{
"--inline", // H264: Force PPS/SPS header with every I frame
Expand Down Expand Up @@ -87,14 +89,14 @@ func startCamera(options CameraOptions, writer io.Writer, stop <-chan struct{},

stdout, err := cmd.StdoutPipe()
if err != nil {
log.Println(err)
slog.Error("startCamera: Error getting stdout pipe", slog.Any("error", err))
return
}
if err := cmd.Start(); err != nil {
log.Print(err)
slog.Error("startCamera: Error starting camera", slog.Any("error", err))
return
}
log.Println("Started "+command, cmd.Args)
slog.Debug("startCamera: Started camera", slog.String("command", command), slog.Any("args", args))

p := make([]byte, readBufferSize)
buffer := make([]byte, bufferSizeKB*1024)
Expand All @@ -104,16 +106,17 @@ func startCamera(options CameraOptions, writer io.Writer, stop <-chan struct{},
for {
select {
case <-stop:
log.Println("Stop requested")
slog.Debug("startCamera: Stop requested")
return
default:
n, err := stdout.Read(p)
if err != nil {
if err == io.EOF {
log.Println("[" + command + "] EOF")
slog.Debug("startCamera: EOF", slog.String("command", command))
return
}
log.Println(err)
slog.Error("startCamera: Error reading from camera; ignoring", slog.Any("error", err))
continue
}

copied := copy(buffer[currentPos:], p[:n])
Expand Down
65 changes: 28 additions & 37 deletions websocket_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"io"
"log"
"log/slog"
"net/http"
"time"

Expand All @@ -22,11 +22,11 @@ type WebSocketHandler interface {

// webSocketHandler main structure
type webSocketHandler struct {
connections map[*connection]bool // Registered connections.
broadcast chan []byte // Inbound messages from the connections.
register chan *connection // Register requests from the connections.
unregister chan *connection // Unregister requests from connections.
connectionNumber chan int
connections map[*connection]bool // Registered connections.
broadcast chan []byte // Inbound messages from the connections.
register chan *connection // Register requests from the connections.
unregister chan *connection // Unregister requests from connections.
connectionCount chan int
}

var upgrader = websocket.Upgrader{
Expand All @@ -40,15 +40,12 @@ func (c *connection) reader(errCh chan bool) {
for {
messageType, message, err := c.ws.ReadMessage()
if err != nil {
log.Println("[Reader] Error", err)
slog.Error("connection: Error reading message from websocket: ", err)
defer func() { errCh <- true }()
return
}

log.Println("Received message ", messageType, message, err)
log.Println("Received message " + string(message))

//parseMessageSent(string(message))
slog.Info("connection: Received message; ignoring", slog.Int("messageType", messageType), slog.String("message", string(message)))
}
}

Expand All @@ -57,8 +54,8 @@ func (c *connection) writer(errCh chan bool) {
for msg := range c.send {
err := c.ws.WriteMessage(websocket.BinaryMessage, msg)
if err != nil {
log.Println("[Writer] Error", err)
defer func() { errCh <- true }()
slog.Error("connection: Error writing message to websocket: ", err)
errCh <- true
break
}
}
Expand All @@ -71,15 +68,15 @@ func (wsh *webSocketHandler) Handler(w http.ResponseWriter, r *http.Request) {

ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("Got error while upgrading connection", err)
slog.Error("connection: Error upgrading connection to websocket: ", err)
return
}
defer ws.Close()

// we have a initialized websocket connection.
c := &connection{ws, make(chan []byte, 10)}

log.Println("Got connection")
slog.Debug("connection: Got connection")
// put it in the registration channel for the hub to take it.
wsh.register <- c
// create error channel. It will be used in case of errors to
Expand Down Expand Up @@ -107,62 +104,56 @@ func (wsh *webSocketHandler) run() {
for {
select {
case c := <-wsh.register:
log.Println("[hub] register")
wsh.connections[c] = true
log.Println("Register call end -> Number of current connections: ", len(wsh.connections))
if wsh.connectionNumber != nil {
wsh.connectionNumber <- len(wsh.connections)
slog.Debug("webSocketHandler: Register call", slog.Int("number of connections", len(wsh.connections)))
if wsh.connectionCount != nil {
wsh.connectionCount <- len(wsh.connections)
}
break

case c := <-wsh.unregister:
log.Println("[hub] unregister")
if _, ok := wsh.connections[c]; ok {
delete(wsh.connections, c)
close(c.send)
}
log.Println("Unregister call end -> Number of current connections: ", len(wsh.connections))
if wsh.connectionNumber != nil {
wsh.connectionNumber <- len(wsh.connections)
slog.Debug("webSocketHandler: Unregister call", slog.Int("number of connections", len(wsh.connections)))
if wsh.connectionCount != nil {
wsh.connectionCount <- len(wsh.connections)
}
break

case msg := <-wsh.broadcast:
for c := range wsh.connections {
select {
case c.send <- msg:
continue
case <-time.After(100 * time.Millisecond):
log.Println("[WSHandler] Skipping message to connection")
slog.Warn("webSocketHandler: Timeout sending message to connection")
// skip message if timeout
}
}
break
}
}
}

// Send puts message body into the queue of messages that have to be
// broadcasted to clients.
func (wsh *webSocketHandler) Write(data []byte) (n int, err error) {
func (wsh *webSocketHandler) Write(data []byte) (int, error) {
// Optimization: don't send if there is no connection
if len(wsh.connections) <= 0 {
return
return 0, nil
}

wsh.broadcast <- data
n = len(data)
return
return len(data), nil
}

// NewWebSocketHandler builds new websocket handler to communicate upstream
func NewWebSocketHandler(connectionNumber chan int) WebSocketHandler {
func NewWebSocketHandler(connectionCount chan int) WebSocketHandler {
wsh := webSocketHandler{
broadcast: make(chan []byte),
register: make(chan *connection),
unregister: make(chan *connection),
connections: make(map[*connection]bool),
connectionNumber: connectionNumber,
broadcast: make(chan []byte),
register: make(chan *connection),
unregister: make(chan *connection),
connections: make(map[*connection]bool),
connectionCount: connectionCount,
}

go wsh.run()
Expand Down

0 comments on commit 8ea4f0e

Please sign in to comment.