Skip to content

Commit

Permalink
Adapt main and handler code for config
Browse files Browse the repository at this point in the history
Both read from / push out for a given (optional) config file.

Added some more useless coverage-increasing unipitt tests.
  • Loading branch information
mhemeryck committed Nov 29, 2018
1 parent 09f3e3e commit e49b53d
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 10 deletions.
4 changes: 3 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func main() {
flag.StringVar(&sysFsRoot, "sysfs_root", unipitt.SysFsRoot, "Root folder to search for digital inputs")
var payload string
flag.StringVar(&payload, "payload", Payload, "Default MQTT message payload")
var configFile string
flag.StringVar(&configFile, "config", "", "Config file name")
flag.Parse()

// Show version and exit
Expand All @@ -53,7 +55,7 @@ func main() {
}

// Setup handler
handler, err := unipitt.NewHandler(broker, clientID, caFile, sysFsRoot)
handler, err := unipitt.NewHandler(broker, clientID, caFile, sysFsRoot, configFile)
if err != nil {
log.Fatal(err)
}
Expand Down
37 changes: 30 additions & 7 deletions unipitt.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,25 @@ type Handler struct {
readers []DigitalInputReader
writerMap map[string]DigitalOutputWriter
client mqtt.Client
config Configuration
}

// NewHandler prepares and sets up an entire unipitt handler
func NewHandler(broker string, clientID string, caFile string, sysFsRoot string) (h *Handler, err error) {
func NewHandler(broker string, clientID string, caFile string, sysFsRoot string, configFile string) (h *Handler, err error) {
h = &Handler{}

// Check if there's a mapping to be read
if configFile != "" {
log.Printf("Reading configuration file %s\n", configFile)
c, err := configFromFile(configFile)
if err != nil {
log.Printf("Error reading config file %s: %s\n", configFile, err)
} else {
h.config = c
}
}

// Digital writer setup
// Set message handler as callback
h.writerMap, err = FindDigitalOutputWriters(sysFsRoot)
if err != nil {
log.Printf("Error creating a map of digital output writers: %s\n", err)
Expand All @@ -45,14 +57,16 @@ func NewHandler(broker string, clientID string, caFile string, sysFsRoot string)
tlsConfig, err := NewTLSConfig(caFile)
if err != nil {
log.Printf("Error reading MQTT CA file %s: %s\n", caFile, err)
} else {
opts.SetTLSConfig(tlsConfig)
return h, err
}
opts.SetTLSConfig(tlsConfig)
}

// Callbacks for subscribe
var cb mqtt.MessageHandler = func(c mqtt.Client, msg mqtt.Message) {
if writer, ok := h.writerMap[msg.Topic()]; ok {
log.Printf("Handling message on topic %s\n", msg.Topic())
// Find corresponding writer
if writer, ok := h.writerMap[h.config.Name(msg.Topic())]; ok {
err := writer.Update(string(msg.Payload()) == MsgTrueValue)
if err != nil {
log.Printf("Error updating digital output with name %s: %s\n", writer.Name, err)
Expand All @@ -66,6 +80,13 @@ func NewHandler(broker string, clientID string, caFile string, sysFsRoot string)
if token := c.Subscribe(name, 0, cb); token.Wait() && token.Error() != nil {
log.Print(err)
}
// Also subscribe any given mapped topic for the names
topic := h.config.Topic(name)
if topic != name {
if token := c.Subscribe(topic, 0, cb); token.Wait() && token.Error() != nil {
log.Print(err)
}
}
}
}

Expand All @@ -81,6 +102,7 @@ func NewHandler(broker string, clientID string, caFile string, sysFsRoot string)
return
}
log.Printf("Created %d digital input reader instances from path %s\n", len(h.readers), sysFsRoot)

return
}

Expand All @@ -101,8 +123,9 @@ func (h *Handler) Poll(done chan bool, interval int, payload string) (err error)
if d.Err != nil {
log.Printf("Found error %s for name %s\n", d.Err, d.Name)
} else {
log.Printf("Trigger for name %s\n", d.Name)
if token := h.client.Publish(d.Name, 0, false, payload); token.Wait() && token.Error() != nil {
// Determine topic from config
log.Printf("Trigger for name %s, using topic %s\n", d.Name, h.config.Topic(d.Name))
if token := h.client.Publish(h.config.Topic(d.Name), 0, false, payload); token.Wait() && token.Error() != nil {
go backoff.Retry(h.connect, backoff.NewExponentialBackOff())
}
}
Expand Down
226 changes: 224 additions & 2 deletions unipitt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func TestHandler(t *testing.T) {
clientID := "unipitt"
caFile := ""
payload := "bar"
configFile := ""
pollingInterval := 50

// Setup a folder structure
Expand Down Expand Up @@ -49,7 +50,7 @@ func TestHandler(t *testing.T) {
defer f.Close()
defer os.RemoveAll(root) // clean up

handler, err := NewHandler(broker, clientID, caFile, sysFsRoot)
handler, err := NewHandler(broker, clientID, caFile, sysFsRoot, configFile)
if err != nil {
t.Fatal(err)
}
Expand All @@ -64,7 +65,6 @@ func TestHandler(t *testing.T) {

// Start polling (blocking)
done := make(chan bool)
defer close(done)
// Trigger a send
go func() {
// Trigger a send
Expand All @@ -85,3 +85,225 @@ func TestHandler(t *testing.T) {
t.Fatal("Expected a reconnect for MQTT broker, did not find one")
}
}

func TestHandlerConfig(t *testing.T) {
// Config file setup
configFile, err := ioutil.TempFile("", "config")
if err != nil {
t.Fatal(err)
}
defer os.Remove(configFile.Name())

content := []byte(`
topics:
di_1_01: kitchen switch
do_2_02: living light
`)
if _, err := configFile.Write(content); err != nil {
t.Fatal(err)
}
if err := configFile.Close(); err != nil {
t.Fatal(err)
}

expected := &Configuration{
Topics: map[string]string{
"di_1_01": "kitchen switch",
"do_2_02": "living light",
},
}

// Handler setup
broker := "mqtts://foo"
clientID := "unipitt"
caFile := ""

// Setup a folder structure
folder := "di_1_01"
// Create temporary folder, only if it does not exist already
root, err := ioutil.TempDir("", "unipitt")
if err != nil {
t.Fatal(err)
}
sysFsRoot := filepath.Join(root, folder)
if _, pathErr := os.Stat(sysFsRoot); pathErr != nil {
err := os.Mkdir(sysFsRoot, os.ModePerm)
if err != nil {
t.Fatal(err)
}
}
// Create temporary path
tmpfn := filepath.Join(sysFsRoot, "di_value")
// Create temporary file handle
f, err := os.Create(tmpfn)
if err != nil {
t.Fatal(err)
}
// Put in zero-value
_, err = f.WriteString("0\n")
if err != nil {
t.Fatal(err)
}
defer f.Close()
defer os.RemoveAll(root) // clean up

handler, err := NewHandler(broker, clientID, caFile, sysFsRoot, configFile.Name())
if err != nil {
t.Fatal(err)
}
defer handler.Close()

// Check the topics mapped
for k, v := range expected.Topics {
if topic, ok := handler.config.Topics[k]; !ok {
t.Errorf("Could not find topic for name %s\n", k)
} else if topic != v {
t.Errorf("Expected topic to be %s, but got %s\n", v, topic)
}
}
}

func TestHandlerConfigUnmarshalIssue(t *testing.T) {
// Config file setup
configFile, err := ioutil.TempFile("", "config")
if err != nil {
t.Fatal(err)
}
defer os.Remove(configFile.Name())

content := []byte("foo")
if _, err := configFile.Write(content); err != nil {
t.Fatal(err)
}
if err := configFile.Close(); err != nil {
t.Fatal(err)
}

// Handler setup
broker := "mqtts://foo"
clientID := "unipitt"
caFile := ""

// Setup a folder structure
folder := "di_1_01"
// Create temporary folder, only if it does not exist already
root, err := ioutil.TempDir("", "unipitt")
if err != nil {
t.Fatal(err)
}
sysFsRoot := filepath.Join(root, folder)
if _, pathErr := os.Stat(sysFsRoot); pathErr != nil {
err := os.Mkdir(sysFsRoot, os.ModePerm)
if err != nil {
t.Fatal(err)
}
}
// Create temporary path
tmpfn := filepath.Join(sysFsRoot, "di_value")
// Create temporary file handle
f, err := os.Create(tmpfn)
if err != nil {
t.Fatal(err)
}
// Put in zero-value
_, err = f.WriteString("0\n")
if err != nil {
t.Fatal(err)
}
defer f.Close()
defer os.RemoveAll(root) // clean up

// Setup log monitoring
var buf bytes.Buffer
log.SetOutput(&buf)
defer func() {
log.SetOutput(os.Stderr)
}()

handler, err := NewHandler(broker, clientID, caFile, sysFsRoot, configFile.Name())
if err != nil {
t.Fatal(err)
}
defer handler.Close()

if !bytes.Contains(buf.Bytes(), []byte("Error reading config file")) {
t.Fatal("Expected an error reading the config file, nothing happened")
}
}

func TestHandlerNoSysFsRoot(t *testing.T) {
broker := "mqtts://foo"
clientID := "unipitt"
caFile := ""
configFile := ""

sysFsRoot := "foo"

handler, err := NewHandler(broker, clientID, caFile, sysFsRoot, configFile)
if err == nil {
t.Fatalf("Expected an error without a valid sys fs folder, got none")
}
defer handler.Close()
}

func TestHandlerCaFileIssue(t *testing.T) {
broker := "mqtts://foo"
clientID := "unipitt"
caFile := "foo"
configFile := ""

// Setup a folder structure
folder := "di_1_01"
// Create temporary folder, only if it does not exist already
root, err := ioutil.TempDir("", "unipitt")
if err != nil {
t.Fatal(err)
}
sysFsRoot := filepath.Join(root, folder)
if _, pathErr := os.Stat(sysFsRoot); pathErr != nil {
err := os.Mkdir(sysFsRoot, os.ModePerm)
if err != nil {
t.Fatal(err)
}
}

handler, err := NewHandler(broker, clientID, caFile, sysFsRoot, configFile)
if err == nil {
t.Fatal("Expected an error with invalid caFile, got none")
}
defer handler.Close()
}

func TestHandlerCaFile(t *testing.T) {
broker := "mqtts://foo"
clientID := "unipitt"
configFile := ""

f, err := ioutil.TempFile("", "ca")
if err != nil {
t.Fatal(err)
}
caFile := f.Name()
defer os.Remove(caFile)

// Setup a folder structure
folder := "di_1_01"
// Create temporary folder, only if it does not exist already
root, err := ioutil.TempDir("", "unipitt")
if err != nil {
t.Fatal(err)
}
sysFsRoot := filepath.Join(root, folder)
if _, pathErr := os.Stat(sysFsRoot); pathErr != nil {
err := os.Mkdir(sysFsRoot, os.ModePerm)
if err != nil {
t.Fatal(err)
}
}

handler, err := NewHandler(broker, clientID, caFile, sysFsRoot, configFile)
if err != nil {
t.Fatal(err)
}
defer handler.Close()
}

0 comments on commit e49b53d

Please sign in to comment.