diff --git a/Makefile b/Makefile index 679c4ac..bbfc93a 100644 --- a/Makefile +++ b/Makefile @@ -24,6 +24,7 @@ build: cmd/miflorad/miflorad cmd/munin-miflora/munin-miflora cmd/munin-miflora-g .PHONY: test test: build + cd cmd/miflorad && go test -v -race && cd ../.. cd common && go test -v -race && cd .. .PHONY: remote-run diff --git a/cmd/miflorad/formatters.go b/cmd/miflorad/formatters.go new file mode 100644 index 0000000..5951af0 --- /dev/null +++ b/cmd/miflorad/formatters.go @@ -0,0 +1,48 @@ +package main + +import ( + "fmt" + "strings" + "time" +) + +func publishGraphite(metric mifloraMetric, publish chan string, metricsBase string) { + timestamp := time.Now().Unix() + prefix := fmt.Sprintf("%s.miflora.%s", metricsBase, metric.getPeripheralId()) + + switch metric := metric.(type) { + case mifloraDataMetric: + publish <- fmt.Sprintf("%s.battery_level %d %d", prefix, metric.metaData.BatteryLevel, timestamp) + publish <- fmt.Sprintf("%s.firmware_version %d %d", prefix, metric.metaData.NumericFirmwareVersion(), timestamp) + publish <- fmt.Sprintf("%s.temperature %.1f %d", prefix, metric.sensorData.Temperature, timestamp) + publish <- fmt.Sprintf("%s.brightness %d %d", prefix, metric.sensorData.Brightness, timestamp) + publish <- fmt.Sprintf("%s.moisture %d %d", prefix, metric.sensorData.Moisture, timestamp) + publish <- fmt.Sprintf("%s.conductivity %d %d", prefix, metric.sensorData.Conductivity, timestamp) + publish <- fmt.Sprintf("%s.connect_time %.2f %d", prefix, metric.connectTime, timestamp) + publish <- fmt.Sprintf("%s.readout_time %.2f %d", prefix, metric.readoutTime, timestamp) + publish <- fmt.Sprintf("%s.rssi %d %d", prefix, metric.rssi, timestamp) + case mifloraErrorMetric: + publish <- fmt.Sprintf("%s.failed %d %d", prefix, metric.failed, timestamp) + } +} + +func publishInflux(metric mifloraMetric, publish chan string) { + var b strings.Builder + b.WriteString(fmt.Sprintf("miflora,id=%s ", metric.getPeripheralId())) + switch metric := metric.(type) { + case mifloraDataMetric: + b.WriteString(fmt.Sprintf("battery_level=%d,", metric.metaData.BatteryLevel)) + b.WriteString(fmt.Sprintf("firmware_version=%d,", metric.metaData.NumericFirmwareVersion())) + b.WriteString(fmt.Sprintf("temperature=%.1f,", metric.sensorData.Temperature)) + b.WriteString(fmt.Sprintf("brightness=%d,", metric.sensorData.Brightness)) + b.WriteString(fmt.Sprintf("moisture=%d,", metric.sensorData.Moisture)) + b.WriteString(fmt.Sprintf("conductivity=%d,", metric.sensorData.Conductivity)) + b.WriteString(fmt.Sprintf("connect_time=%.2f,", metric.connectTime)) + b.WriteString(fmt.Sprintf("readout_time=%.2f,", metric.readoutTime)) + b.WriteString(fmt.Sprintf("rssi=%d", metric.rssi)) + case mifloraErrorMetric: + b.WriteString(fmt.Sprintf("failed=%d", metric.failed)) + } + b.WriteString(fmt.Sprintf(" %d", time.Now().UnixNano())) + publish <- b.String() +} diff --git a/cmd/miflorad/formatters_test.go b/cmd/miflorad/formatters_test.go new file mode 100644 index 0000000..61f772a --- /dev/null +++ b/cmd/miflorad/formatters_test.go @@ -0,0 +1,98 @@ +package main + +import ( + _ "fmt" + "strconv" + "strings" + "testing" + + common "miflorad/common" + + "github.com/stretchr/testify/assert" +) + +func TestPublishGraphite(t *testing.T) { + tables := []struct { + metric mifloraMetric + }{ + {mifloraErrorMetric{peripheralId: "peri", failed: 1}}, + {mifloraDataMetric{ + peripheralId: "peri", + metaData: common.VersionBatteryResponse{BatteryLevel: 100, FirmwareVersion: "2.7.0"}, + sensorData: common.SensorDataResponse{Temperature: 24.2, Brightness: 121, Moisture: 16, Conductivity: 101}, + connectTime: 3.42, + readoutTime: 0.23, + rssi: -77, + }}, + } + + for _, table := range tables { + publish := make(chan string, 100) + publishGraphite(table.metric, publish, "foo.base") + close(publish) + switch table.metric.(type) { + case mifloraErrorMetric: + for line := range publish { + parts := strings.Split(line, " ") + assert.Equal(t, len(parts), 3) + assert.Equal(t, parts[0], "foo.base.miflora.peri.failed") + assert.Equal(t, parts[1], "1") + timestamp, err := strconv.ParseInt(parts[2], 10, 64) + assert.NoError(t, err) + assert.True(t, timestamp >= 0) + } + case mifloraDataMetric: + for line := range publish { + parts := strings.Split(line, " ") + assert.Equal(t, len(parts), 3) + assert.Equal(t, strings.Index(parts[0], "foo.base.miflora.peri"), 0) + assert.True(t, len(parts[1]) > 0) + timestamp, err := strconv.ParseInt(parts[2], 10, 64) + assert.NoError(t, err) + assert.True(t, timestamp >= 0) + } + } + } +} + +func TestPublishInflux(t *testing.T) { + tables := []struct { + metric mifloraMetric + }{ + {mifloraErrorMetric{peripheralId: "peri", failed: 1}}, + {mifloraDataMetric{ + peripheralId: "peri", + metaData: common.VersionBatteryResponse{BatteryLevel: 100, FirmwareVersion: "2.7.0"}, + sensorData: common.SensorDataResponse{Temperature: 24.2, Brightness: 121, Moisture: 16, Conductivity: 101}, + connectTime: 3.42, + readoutTime: 0.23, + rssi: -77, + }}, + } + + for _, table := range tables { + publish := make(chan string, 100) + publishInflux(table.metric, publish) + close(publish) + switch table.metric.(type) { + case mifloraErrorMetric: + line := <-publish + parts := strings.Split(line, " ") + assert.Equal(t, len(parts), 3) + assert.Equal(t, parts[0], "miflora,id=peri") + assert.Equal(t, parts[1], "failed=1") + timestamp, err := strconv.ParseInt(parts[2], 10, 64) + assert.NoError(t, err) + assert.True(t, timestamp >= 0) + case mifloraDataMetric: + line := <-publish + parts := strings.Split(line, " ") + assert.Equal(t, len(parts), 3) + assert.Equal(t, parts[0], "miflora,id=peri") + assert.True(t, len(parts[1]) > 0) + timestamp, err := strconv.ParseInt(parts[2], 10, 64) + assert.NoError(t, err) + assert.True(t, timestamp >= 0) + } + } +} diff --git a/cmd/miflorad/main.go b/cmd/miflorad/main.go index 7408649..4d0f1c8 100644 --- a/cmd/miflorad/main.go +++ b/cmd/miflorad/main.go @@ -28,12 +28,20 @@ var ( scanTimeout = flag.Duration("scantimeout", 10*time.Second, "timeout after that a scan per peripheral will be aborted") readRetries = flag.Int("readretries", 2, "number of times reading will be attempted per peripheral") interval = flag.Duration("interval", 25*time.Second, "metrics collection interval") - prefix = flag.String("prefix", "", "metrics name prefix") brokerHost = flag.String("brokerhost", "localhost", "MQTT broker host to send metrics to") brokerUser = flag.String("brokeruser", "", "MQTT broker user used for authentication") brokerPassword = flag.String("brokerpassword", "", "MQTT broker password used for authentication") brokerUseTLS = flag.Bool("brokerusetls", true, "whether TLS should be used for MQTT broker") brokerTopicPrefix = flag.String("brokertopicprefix", "", "MQTT topic prefix for messages") + publishFormatFlag = flag.String("publishformat", "graphite", "MQTT message content format") + graphitePrefix = flag.String("graphiteprefix", "", "Graphite metrics name prefix") +) + +type publishFormat int + +const ( + graphiteFormat publishFormat = iota + influxFormat publishFormat = iota ) type peripheral struct { @@ -44,7 +52,11 @@ type peripheral struct { var allPeripherals []*peripheral -type metric struct { +type mifloraMetric interface { + getPeripheralId() string +} + +type mifloraDataMetric struct { peripheralId string metaData common.VersionBatteryResponse sensorData common.SensorDataResponse @@ -53,6 +65,19 @@ type metric struct { rssi int } +func (m mifloraDataMetric) getPeripheralId() string { + return m.peripheralId +} + +type mifloraErrorMetric struct { + peripheralId string + failed int +} + +func (m mifloraErrorMetric) getPeripheralId() string { + return m.peripheralId +} + type mqttLogger struct { level string } @@ -125,7 +150,7 @@ func readData(peripheral *peripheral, client ble.Client) (common.SensorDataRespo return sensorData, nil } -func connectPeripheral(peripheral *peripheral, send chan metric) error { +func connectPeripheral(peripheral *peripheral, send chan mifloraMetric) error { fmt.Fprintf(os.Stderr, "Scanning for %s...\n", peripheral.id) // only way to get back the found advertisement, must be buffered! @@ -178,7 +203,7 @@ func connectPeripheral(peripheral *peripheral, send chan metric) error { return errors.Wrap(err2, "can't read data") } - send <- metric{ + send <- mifloraDataMetric{ peripheralId: common.MifloraGetAlphaNumericID(peripheral.id), sensorData: sensorData, metaData: peripheral.metaData, @@ -190,7 +215,7 @@ func connectPeripheral(peripheral *peripheral, send chan metric) error { return nil } -func readPeripheral(peripheral *peripheral, send chan metric) error { +func readPeripheral(peripheral *peripheral, send chan mifloraMetric) error { var err error for retry := 0; retry < *readRetries; retry++ { err = connectPeripheral(peripheral, send) @@ -202,7 +227,7 @@ func readPeripheral(peripheral *peripheral, send chan metric) error { return err } -func readAllPeripherals(quit chan struct{}, send chan metric) { +func readAllPeripherals(quit chan struct{}, send chan mifloraMetric) { for _, peripheral := range allPeripherals { // check for quit signal (non-blocking) and terminate select { @@ -234,6 +259,17 @@ func main() { os.Exit(1) } + var format publishFormat + switch *publishFormatFlag { + case "graphite": + format = graphiteFormat + case "influx": + format = influxFormat + default: + fmt.Fprintf(os.Stderr, "Unrecognized publish format %s! Exiting...\n", *publishFormatFlag) + os.Exit(1) + } + fmt.Fprintf(os.Stderr, "miflorad version %s\n", getVersion()) mqtt.WARN = mqttLogger{level: "warning"} @@ -258,7 +294,7 @@ func main() { intervalTicker := time.NewTicker(*interval) quit := make(chan struct{}) - send := make(chan metric, 1) + send := make(chan mifloraMetric, 1) publish := make(chan string, 10) go func() { @@ -281,19 +317,15 @@ func main() { }() go func() { - for metric := range send { - timestamp := time.Now().Unix() - id := metric.peripheralId - - publish <- fmt.Sprintf("%s.miflora.%s.battery_level %d %d", *prefix, id, metric.metaData.BatteryLevel, timestamp) - publish <- fmt.Sprintf("%s.miflora.%s.firmware_version %d %d", *prefix, id, metric.metaData.NumericFirmwareVersion(), timestamp) - publish <- fmt.Sprintf("%s.miflora.%s.temperature %.1f %d", *prefix, id, metric.sensorData.Temperature, timestamp) - publish <- fmt.Sprintf("%s.miflora.%s.brightness %d %d", *prefix, id, metric.sensorData.Brightness, timestamp) - publish <- fmt.Sprintf("%s.miflora.%s.moisture %d %d", *prefix, id, metric.sensorData.Moisture, timestamp) - publish <- fmt.Sprintf("%s.miflora.%s.conductivity %d %d", *prefix, id, metric.sensorData.Conductivity, timestamp) - publish <- fmt.Sprintf("%s.miflora.%s.connect_time %.2f %d", *prefix, id, metric.connectTime, timestamp) - publish <- fmt.Sprintf("%s.miflora.%s.readout_time %.2f %d", *prefix, id, metric.readoutTime, timestamp) - publish <- fmt.Sprintf("%s.miflora.%s.rssi %d %d", *prefix, id, metric.rssi, timestamp) + switch format { + case graphiteFormat: + for metric := range send { + publishGraphite(metric, publish, *graphitePrefix) + } + case influxFormat: + for metric := range send { + publishInflux(metric, publish) + } } }()