diff --git a/internal/analysis/processor/actions.go b/internal/analysis/processor/actions.go index c3f301ac..38ba45f2 100644 --- a/internal/analysis/processor/actions.go +++ b/internal/analysis/processor/actions.go @@ -4,6 +4,7 @@ package processor import ( "encoding/json" + "errors" "fmt" "log" "strings" @@ -150,32 +151,20 @@ func (a BirdWeatherAction) Execute(data interface{}) error { } // Execute sends the note to the MQTT broker -import ( -"errors" - "encoding/json" - "fmt" - "log" - "strings" - "time" - - "github.com/tphakala/birdnet-go/internal/birdnet" - "github.com/tphakala/birdnet-go/internal/birdweather" - "github.com/tphakala/birdnet-go/internal/conf" - "github.com/tphakala/birdnet-go/internal/datastore" - "github.com/tphakala/birdnet-go/internal/mqtt" - "github.com/tphakala/birdnet-go/internal/myaudio" - "github.com/tphakala/birdnet-go/internal/observation" -) func (a MqttAction) Execute(data interface{}) error { - if a.Settings.Realtime.MQTT.Topic == "" { - return errors.New("MQTT topic is not specified") - } + // Validate MQTT settings + if a.Settings.Realtime.MQTT.Topic == "" { + return errors.New("MQTT topic is not specified") + } + + // Create a JSON representation of the note noteJson, err := json.Marshal(a.Note) if err != nil { log.Printf("error marshalling note to JSON: %s\n", err) return err } + // Publish the note to the MQTT broker err = a.MqttClient.Publish(a.Settings.Realtime.MQTT.Topic, string(noteJson)) if err != nil { return err diff --git a/internal/analysis/processor/processor.go b/internal/analysis/processor/processor.go index c97ead0a..fd5b1e06 100644 --- a/internal/analysis/processor/processor.go +++ b/internal/analysis/processor/processor.go @@ -89,6 +89,7 @@ func New(settings *conf.Settings, ds datastore.Interface, bn *birdnet.BirdNET, a p.BwClient = birdweather.New(settings) } + // Initialize MQTT client if enabled in settings. if settings.Realtime.MQTT.Enabled { p.MqttClient = mqtt.New(settings) diff --git a/internal/analysis/processor/workers.go b/internal/analysis/processor/workers.go index 41ba5cbc..b7163d98 100644 --- a/internal/analysis/processor/workers.go +++ b/internal/analysis/processor/workers.go @@ -85,6 +85,7 @@ func (p *Processor) getDefaultActions(detection Detections) []Action { if p.Settings.Realtime.Log.Enabled { actions = append(actions, LogAction{Settings: p.Settings, EventTracker: p.EventTracker, Note: detection.Note}) } + if p.Settings.Output.SQLite.Enabled || p.Settings.Output.MySQL.Enabled { actions = append(actions, DatabaseAction{ Settings: p.Settings, @@ -94,19 +95,25 @@ func (p *Processor) getDefaultActions(detection Detections) []Action { AudioBuffer: p.AudioBuffer, Ds: p.Ds}) } + /* if p.Settings.Realtime.AudioExport.Enabled { actions = append(actions, SaveAudioAction{Settings: p.Settings, EventTracker: p.EventTracker, pcmData: detection.pcmDataExt, ClipName: detection.Note.ClipName}) }*/ + + // Add BirdWeatherAction if enabled if p.Settings.Realtime.Birdweather.Enabled { actions = append(actions, BirdWeatherAction{Settings: p.Settings, EventTracker: p.EventTracker, BwClient: p.BwClient, Note: detection.Note, pcmData: detection.pcmData3s}) } + + // Add MQTT action if enabled if p.Settings.Realtime.MQTT.Enabled { - if p.MqttClient == nil { - log.Println("MQTT client is not initialized, skipping MQTT action") - return actions - } + if p.MqttClient == nil { + log.Println("MQTT client is not initialized, skipping MQTT action") + return actions + } actions = append(actions, MqttAction{Settings: p.Settings, MqttClient: p.MqttClient, EventTracker: p.EventTracker, Note: detection.Note}) } + // Check if UpdateRangeFilterAction needs to be executed for the day today := time.Now().Truncate(24 * time.Hour) // Current date with time set to midnight if p.SpeciesListUpdated.Before(today) { diff --git a/internal/conf/config.go b/internal/conf/config.go index 1c042873..29ce2a6d 100644 --- a/internal/conf/config.go +++ b/internal/conf/config.go @@ -2,6 +2,7 @@ package conf import ( + "errors" "fmt" "os" "path/filepath" @@ -77,21 +78,6 @@ type Settings struct { Username string // MQTT username Password string // MQTT password } -func Load() (*Settings, error) { - settings := &Settings{} - if err := initViper(); err != nil { - return nil, fmt.Errorf("error initializing viper: %w", err) - } - if err := viper.Unmarshal(settings); err != nil { - return nil, fmt.Errorf("error unmarshaling config into struct: %w", err) - } - if settings.Realtime.MQTT.Enabled { - if settings.Realtime.MQTT.Broker == "" { - return nil, errors.New("MQTT broker URL is required when MQTT is enabled") - } - } - return settings, nil -} } WebServer struct { @@ -157,6 +143,12 @@ func Load() (*Settings, error) { return nil, fmt.Errorf("error unmarshaling config into struct: %w", err) } + // Validate MQTT settings + if settings.Realtime.MQTT.Enabled { + if settings.Realtime.MQTT.Broker == "" { + return nil, errors.New("MQTT broker URL is required when MQTT is enabled") + } + } return settings, nil } diff --git a/internal/mqtt/mqtt.go b/internal/mqtt/mqtt.go index a2f17f6a..19042089 100644 --- a/internal/mqtt/mqtt.go +++ b/internal/mqtt/mqtt.go @@ -14,13 +14,10 @@ type Client struct { internalClient mqtt.Client } -func New(settings *conf.Settings) (*Client, error) { - if settings == nil || settings.Realtime.MQTT.Broker == "" { - return nil, errors.New("invalid MQTT settings provided") - } +func New(settings *conf.Settings) *Client { return &Client{ Settings: settings, - }, nil + } } // Connect to MQTT broker @@ -41,8 +38,8 @@ func (c *Client) Connect() error { // It will wait infinitely until the connection is established if token := client.Connect(); token.Wait() && token.Error() != nil { - log.Printf("Failed to connect to MQTT broker: %s", token.Error()) - return errors.New("failed to connect to MQTT broker") + log.Printf("Failed to connect to MQTT broker: %s", token.Error()) + return errors.New("failed to connect to MQTT broker") } return nil