-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinflux_writer.go
145 lines (121 loc) · 3.42 KB
/
influx_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
"bytes"
"context"
"fmt"
"net/http"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
)
// InfluxWriter holds the connection settings and static tags used to push
// sensor readings to InfluxDB, either through the v2 client library (Send)
// or through a plain HTTP POST of line protocol (SendWithInflux1).
type InfluxWriter struct {
// Host is handed to influxdb2.NewClient as the server address by Send.
// SendWithInflux1 POSTs to Host+"?db="+Database, so on that path it
// presumably has to contain the full write endpoint URL — confirm against
// the deployed configuration.
Host string
// Database is the second argument of WriteAPIBlocking in Send and the "db"
// query parameter in SendWithInflux1.
Database string
// Measurment (sic) is the measurement name given to every point and line.
// NOTE(review): the misspelling is part of the exported field name; renaming
// it would affect callers and possibly serialized configuration.
Measurment string
// Organization is the first argument of WriteAPIBlocking in Send;
// SendWithInflux1 does not read it.
Organization string
// Token is the second argument of influxdb2.NewClient in Send;
// SendWithInflux1 does not read it.
Token string
// ForceUseId is not read by any code visible in this file.
// NOTE(review): presumably meant to force the numeric id over the sensor
// name in the "id" tag — confirm where (or whether) it is consumed.
ForceUseId bool `json:",omitempty"`
// UseInflux1 makes Send delegate to SendWithInflux1.
UseInflux1 bool
// Tags are added to every written point or line in addition to the
// per-sensor "id" tag.
Tags []Tag `json:",omitempty"`
}
// Tag is a single key/value pair attached to written points and lines.
type Tag struct {
// Name is the tag key.
Name string
// Value is the tag value.
Value string
}
// getTagMap flattens a tag list into a name-to-value map. When the same name
// occurs more than once the last occurrence wins. The result is never nil,
// even for a nil or empty input.
func getTagMap(tags []Tag) (tagMap map[string]string) {
	tagMap = make(map[string]string, len(tags))
	for i := range tags {
		entry := &tags[i]
		tagMap[entry.Name] = entry.Value
	}
	return tagMap
}
// Send writes the current reading of every slave to InfluxDB.
//
// When UseInflux1 is set the call is delegated to SendWithInflux1. Otherwise
// the v2 client library is used: each slave produces a "temperature" point
// and, if it has a thermostat, an additional point carrying "setpoint",
// "real-sp", "state" and "heatup". Every point is tagged with the writer's
// static Tags plus the slave's "id" tag.
//
// All points are submitted in a single blocking write rather than one HTTP
// round trip per point. Nothing is sent (and no client is created) when there
// are no slaves. The first error reported by the write is returned.
func (ifw *InfluxWriter) Send(slaves []*OwSlave) error {
	if ifw.UseInflux1 {
		return ifw.SendWithInflux1(slaves)
	}

	// At most two points per slave: the reading and the thermostat state.
	points := make([]*write.Point, 0, 2*len(slaves))
	for _, slave := range slaves {
		// Build a fresh map per slave and add the id tag to it directly;
		// appending to ifw.Tags could write into the spare capacity of the
		// slice shared by every call.
		tags := getTagMap(ifw.Tags)
		idTag := ifw.getIdTag(slave)
		tags[idTag.Name] = idTag.Value

		slavePoint := influxdb2.NewPoint(ifw.Measurment,
			tags,
			map[string]interface{}{"temperature": slave.Value},
			time.Now())

		if slave.Thermostat != nil {
			thermoPoint := influxdb2.NewPoint(ifw.Measurment,
				tags,
				map[string]interface{}{
					"setpoint": slave.Thermostat.Setpoint,
					"real-sp":  slave.Thermostat.GetSetpoint(),
					"state":    slave.Thermostat.CheckIfOn(),
					"heatup":   slave.Thermostat.CheckIfHeatUp(),
				},
				time.Now())
			points = append(points, thermoPoint)
		}
		points = append(points, slavePoint)
	}
	if len(points) == 0 {
		return nil
	}

	client := influxdb2.NewClient(ifw.Host, ifw.Token)
	defer client.Close()

	writeAPI := client.WriteAPIBlocking(ifw.Organization, ifw.Database)
	return writeAPI.WritePoint(context.Background(), points...)
}
// SendWithInflux1 writes the current reading of every slave by POSTing line
// protocol to Host+"?db="+Database.
//
// The request body is the concatenation of GetLine for each slave and, for
// slaves with a thermostat, GetThermoLines. No authentication is added to the
// request. An error is returned when the request cannot be built, when the
// HTTP exchange fails (including when it exceeds the timeout below), or when
// the response status is not 2xx.
func (ifw *InfluxWriter) SendWithInflux1(slaves []*OwSlave) error {
	// Upper bound for the whole exchange; without it an unresponsive server
	// would block the caller indefinitely.
	const requestTimeout = 30 * time.Second

	// Accumulate in a buffer instead of repeatedly re-allocating a string.
	var body bytes.Buffer
	for _, slave := range slaves {
		body.WriteString(ifw.GetLine(slave))
		if slave.Thermostat != nil {
			body.WriteString(ifw.GetThermoLines(slave.Thermostat))
		}
	}

	req, err := http.NewRequest("POST", ifw.Host+"?db="+ifw.Database, &body)
	if err != nil {
		return fmt.Errorf("InfluxWriter Send: preparing request error:\n%w", err)
	}

	client := &http.Client{Timeout: requestTimeout}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("InfluxWriter Send: client.Do error:\n%w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("InfluxWriter Send: received non-success response code: %d", resp.StatusCode)
	}
	return nil
}
// getIdTag builds the "id" tag identifying a slave: its Name when one is set,
// otherwise its Id rendered as zero-padded, 12-digit lowercase hex.
func (ifw *InfluxWriter) getIdTag(slave *OwSlave) (idTag Tag) {
	value := slave.Name
	if value == "" {
		// Unnamed sensor: fall back to the numeric id.
		value = fmt.Sprintf("%012x", slave.Id)
	}
	return Tag{Name: "id", Value: value}
}
// writeLineProtocolEscaped appends s to buf, putting a backslash in front of
// every comma and space, and in front of every equals sign when escapeEquals
// is set. Those characters delimit elements in InfluxDB line protocol, so an
// unescaped occurrence inside a name or value makes the line unparsable.
// Working byte-wise is safe for UTF-8 input because all three are ASCII.
func writeLineProtocolEscaped(buf *bytes.Buffer, s string, escapeEquals bool) {
	for i := 0; i < len(s); i++ {
		c := s[i]
		if c == ',' || c == ' ' || (c == '=' && escapeEquals) {
			buf.WriteByte('\\')
		}
		buf.WriteByte(c)
	}
}

// linePrefix returns the part of a line-protocol line that precedes the field
// set: the measurement, followed by the writer's static Tags and finally the
// slave's "id" tag, each as ",key=value". Measurement, tag keys and tag
// values are escaped here, so they must be configured unescaped.
func (ifw *InfluxWriter) linePrefix(slave *OwSlave) string {
	var buf bytes.Buffer
	// Equals signs are not special inside a measurement name.
	writeLineProtocolEscaped(&buf, ifw.Measurment, false)

	writeTag := func(tag Tag) {
		buf.WriteByte(',')
		writeLineProtocolEscaped(&buf, tag.Name, true)
		buf.WriteByte('=')
		writeLineProtocolEscaped(&buf, tag.Value, true)
	}
	// Iterate ifw.Tags directly and emit the id tag separately; appending to
	// ifw.Tags could write into the spare capacity of the shared slice.
	for _, tag := range ifw.Tags {
		writeTag(tag)
	}
	writeTag(ifw.getIdTag(slave))

	return buf.String()
}

// GetLine returns one newline-terminated line-protocol line carrying the
// slave's reading as the "temperature" field. The line has no timestamp.
func (ifw *InfluxWriter) GetLine(slave *OwSlave) (line string) {
	return ifw.linePrefix(slave) + fmt.Sprintf(" temperature=%f\n", slave.Value)
}

// GetThermoLines returns four newline-terminated line-protocol lines for the
// thermostat, one field per line and in this order: "setpoint", "real-sp",
// "state" and "heatup". All lines share the measurement and tags of the
// thermostat's sensor and carry no timestamp.
func (ifw *InfluxWriter) GetThermoLines(thermo *Thermo) (line string) {
	prefix := ifw.linePrefix(thermo.Sensor)

	var buf bytes.Buffer
	fmt.Fprintf(&buf, "%s setpoint=%f\n", prefix, thermo.Setpoint)
	fmt.Fprintf(&buf, "%s real-sp=%f\n", prefix, thermo.GetSetpoint())
	fmt.Fprintf(&buf, "%s state=%v\n", prefix, thermo.CheckIfOn())
	fmt.Fprintf(&buf, "%s heatup=%v\n", prefix, thermo.CheckIfHeatUp())
	return buf.String()
}