miflorad: add support for influx MQTT message format in addition to graphite

This commit is contained in:
cn 2019-01-02 12:57:26 +01:00
parent 0cd41a924d
commit 72dfaa8ed2
4 changed files with 199 additions and 20 deletions

View File

@ -24,6 +24,7 @@ build: cmd/miflorad/miflorad cmd/munin-miflora/munin-miflora cmd/munin-miflora-g
.PHONY: test
test: build
cd cmd/miflorad && go test -v -race && cd ../..
cd common && go test -v -race && cd ..
.PHONY: remote-run

View File

@ -0,0 +1,48 @@
package main
import (
"fmt"
"strings"
"time"
)
func publishGraphite(metric mifloraMetric, publish chan string, metricsBase string) {
timestamp := time.Now().Unix()
prefix := fmt.Sprintf("%s.miflora.%s", metricsBase, metric.getPeripheralId())
switch metric := metric.(type) {
case mifloraDataMetric:
publish <- fmt.Sprintf("%s.battery_level %d %d", prefix, metric.metaData.BatteryLevel, timestamp)
publish <- fmt.Sprintf("%s.firmware_version %d %d", prefix, metric.metaData.NumericFirmwareVersion(), timestamp)
publish <- fmt.Sprintf("%s.temperature %.1f %d", prefix, metric.sensorData.Temperature, timestamp)
publish <- fmt.Sprintf("%s.brightness %d %d", prefix, metric.sensorData.Brightness, timestamp)
publish <- fmt.Sprintf("%s.moisture %d %d", prefix, metric.sensorData.Moisture, timestamp)
publish <- fmt.Sprintf("%s.conductivity %d %d", prefix, metric.sensorData.Conductivity, timestamp)
publish <- fmt.Sprintf("%s.connect_time %.2f %d", prefix, metric.connectTime, timestamp)
publish <- fmt.Sprintf("%s.readout_time %.2f %d", prefix, metric.readoutTime, timestamp)
publish <- fmt.Sprintf("%s.rssi %d %d", prefix, metric.rssi, timestamp)
case mifloraErrorMetric:
publish <- fmt.Sprintf("%s.failed %d %d", prefix, metric.failed, timestamp)
}
}
func publishInflux(metric mifloraMetric, publish chan string) {
var b strings.Builder
b.WriteString(fmt.Sprintf("miflora,id=%s ", metric.getPeripheralId()))
switch metric := metric.(type) {
case mifloraDataMetric:
b.WriteString(fmt.Sprintf("battery_level=%d,", metric.metaData.BatteryLevel))
b.WriteString(fmt.Sprintf("firmware_version=%d,", metric.metaData.NumericFirmwareVersion()))
b.WriteString(fmt.Sprintf("temperature=%.1f,", metric.sensorData.Temperature))
b.WriteString(fmt.Sprintf("brightness=%d,", metric.sensorData.Brightness))
b.WriteString(fmt.Sprintf("moisture=%d,", metric.sensorData.Moisture))
b.WriteString(fmt.Sprintf("conductivity=%d,", metric.sensorData.Conductivity))
b.WriteString(fmt.Sprintf("connect_time=%.2f,", metric.connectTime))
b.WriteString(fmt.Sprintf("readout_time=%.2f,", metric.readoutTime))
b.WriteString(fmt.Sprintf("rssi=%d", metric.rssi))
case mifloraErrorMetric:
b.WriteString(fmt.Sprintf("failed=%d", metric.failed))
}
b.WriteString(fmt.Sprintf(" %d", time.Now().UnixNano()))
publish <- b.String()
}

View File

@ -0,0 +1,98 @@
package main
import (
_ "fmt"
"strconv"
"strings"
"testing"
common "miflorad/common"
"github.com/stretchr/testify/assert"
)
func TestPublishGraphite(t *testing.T) {
tables := []struct {
metric mifloraMetric
}{
{mifloraErrorMetric{peripheralId: "peri", failed: 1}},
{mifloraDataMetric{
peripheralId: "peri",
metaData: common.VersionBatteryResponse{BatteryLevel: 100, FirmwareVersion: "2.7.0"},
sensorData: common.SensorDataResponse{Temperature: 24.2, Brightness: 121, Moisture: 16, Conductivity: 101},
connectTime: 3.42,
readoutTime: 0.23,
rssi: -77,
}},
}
for _, table := range tables {
publish := make(chan string, 100)
publishGraphite(table.metric, publish, "foo.base")
close(publish)
switch table.metric.(type) {
case mifloraErrorMetric:
for line := range publish {
parts := strings.Split(line, " ")
assert.Equal(t, len(parts), 3)
assert.Equal(t, parts[0], "foo.base.miflora.peri.failed")
assert.Equal(t, parts[1], "1")
timestamp, err := strconv.ParseInt(parts[2], 10, 64)
assert.NoError(t, err)
assert.True(t, timestamp >= 0)
}
case mifloraDataMetric:
for line := range publish {
parts := strings.Split(line, " ")
assert.Equal(t, len(parts), 3)
assert.Equal(t, strings.Index(parts[0], "foo.base.miflora.peri"), 0)
assert.True(t, len(parts[1]) > 0)
timestamp, err := strconv.ParseInt(parts[2], 10, 64)
assert.NoError(t, err)
assert.True(t, timestamp >= 0)
}
}
}
}
func TestPublishInflux(t *testing.T) {
tables := []struct {
metric mifloraMetric
}{
{mifloraErrorMetric{peripheralId: "peri", failed: 1}},
{mifloraDataMetric{
peripheralId: "peri",
metaData: common.VersionBatteryResponse{BatteryLevel: 100, FirmwareVersion: "2.7.0"},
sensorData: common.SensorDataResponse{Temperature: 24.2, Brightness: 121, Moisture: 16, Conductivity: 101},
connectTime: 3.42,
readoutTime: 0.23,
rssi: -77,
}},
}
for _, table := range tables {
publish := make(chan string, 100)
publishInflux(table.metric, publish)
close(publish)
switch table.metric.(type) {
case mifloraErrorMetric:
line := <-publish
parts := strings.Split(line, " ")
assert.Equal(t, len(parts), 3)
assert.Equal(t, parts[0], "miflora,id=peri")
assert.Equal(t, parts[1], "failed=1")
timestamp, err := strconv.ParseInt(parts[2], 10, 64)
assert.NoError(t, err)
assert.True(t, timestamp >= 0)
case mifloraDataMetric:
line := <-publish
parts := strings.Split(line, " ")
assert.Equal(t, len(parts), 3)
assert.Equal(t, parts[0], "miflora,id=peri")
assert.True(t, len(parts[1]) > 0)
timestamp, err := strconv.ParseInt(parts[2], 10, 64)
assert.NoError(t, err)
assert.True(t, timestamp >= 0)
}
}
}

View File

@ -28,12 +28,20 @@ var (
scanTimeout = flag.Duration("scantimeout", 10*time.Second, "timeout after that a scan per peripheral will be aborted")
readRetries = flag.Int("readretries", 2, "number of times reading will be attempted per peripheral")
interval = flag.Duration("interval", 25*time.Second, "metrics collection interval")
prefix = flag.String("prefix", "", "metrics name prefix")
brokerHost = flag.String("brokerhost", "localhost", "MQTT broker host to send metrics to")
brokerUser = flag.String("brokeruser", "", "MQTT broker user used for authentication")
brokerPassword = flag.String("brokerpassword", "", "MQTT broker password used for authentication")
brokerUseTLS = flag.Bool("brokerusetls", true, "whether TLS should be used for MQTT broker")
brokerTopicPrefix = flag.String("brokertopicprefix", "", "MQTT topic prefix for messages")
publishFormatFlag = flag.String("publishformat", "graphite", "MQTT message content format")
graphitePrefix = flag.String("graphiteprefix", "", "Graphite metrics name prefix")
)
type publishFormat int
const (
graphiteFormat publishFormat = iota
influxFormat publishFormat = iota
)
type peripheral struct {
@ -44,7 +52,11 @@ type peripheral struct {
var allPeripherals []*peripheral
type metric struct {
type mifloraMetric interface {
getPeripheralId() string
}
type mifloraDataMetric struct {
peripheralId string
metaData common.VersionBatteryResponse
sensorData common.SensorDataResponse
@ -53,6 +65,19 @@ type metric struct {
rssi int
}
func (m mifloraDataMetric) getPeripheralId() string {
return m.peripheralId
}
type mifloraErrorMetric struct {
peripheralId string
failed int
}
func (m mifloraErrorMetric) getPeripheralId() string {
return m.peripheralId
}
type mqttLogger struct {
level string
}
@ -125,7 +150,7 @@ func readData(peripheral *peripheral, client ble.Client) (common.SensorDataRespo
return sensorData, nil
}
func connectPeripheral(peripheral *peripheral, send chan metric) error {
func connectPeripheral(peripheral *peripheral, send chan mifloraMetric) error {
fmt.Fprintf(os.Stderr, "Scanning for %s...\n", peripheral.id)
// only way to get back the found advertisement, must be buffered!
@ -178,7 +203,7 @@ func connectPeripheral(peripheral *peripheral, send chan metric) error {
return errors.Wrap(err2, "can't read data")
}
send <- metric{
send <- mifloraDataMetric{
peripheralId: common.MifloraGetAlphaNumericID(peripheral.id),
sensorData: sensorData,
metaData: peripheral.metaData,
@ -190,7 +215,7 @@ func connectPeripheral(peripheral *peripheral, send chan metric) error {
return nil
}
func readPeripheral(peripheral *peripheral, send chan metric) error {
func readPeripheral(peripheral *peripheral, send chan mifloraMetric) error {
var err error
for retry := 0; retry < *readRetries; retry++ {
err = connectPeripheral(peripheral, send)
@ -202,7 +227,7 @@ func readPeripheral(peripheral *peripheral, send chan metric) error {
return err
}
func readAllPeripherals(quit chan struct{}, send chan metric) {
func readAllPeripherals(quit chan struct{}, send chan mifloraMetric) {
for _, peripheral := range allPeripherals {
// check for quit signal (non-blocking) and terminate
select {
@ -234,6 +259,17 @@ func main() {
os.Exit(1)
}
var format publishFormat
switch *publishFormatFlag {
case "graphite":
format = graphiteFormat
case "influx":
format = influxFormat
default:
fmt.Fprintf(os.Stderr, "Unrecognized publish format %s! Exiting...\n", *publishFormatFlag)
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "miflorad version %s\n", getVersion())
mqtt.WARN = mqttLogger{level: "warning"}
@ -258,7 +294,7 @@ func main() {
intervalTicker := time.NewTicker(*interval)
quit := make(chan struct{})
send := make(chan metric, 1)
send := make(chan mifloraMetric, 1)
publish := make(chan string, 10)
go func() {
@ -281,19 +317,15 @@ func main() {
}()
go func() {
for metric := range send {
timestamp := time.Now().Unix()
id := metric.peripheralId
publish <- fmt.Sprintf("%s.miflora.%s.battery_level %d %d", *prefix, id, metric.metaData.BatteryLevel, timestamp)
publish <- fmt.Sprintf("%s.miflora.%s.firmware_version %d %d", *prefix, id, metric.metaData.NumericFirmwareVersion(), timestamp)
publish <- fmt.Sprintf("%s.miflora.%s.temperature %.1f %d", *prefix, id, metric.sensorData.Temperature, timestamp)
publish <- fmt.Sprintf("%s.miflora.%s.brightness %d %d", *prefix, id, metric.sensorData.Brightness, timestamp)
publish <- fmt.Sprintf("%s.miflora.%s.moisture %d %d", *prefix, id, metric.sensorData.Moisture, timestamp)
publish <- fmt.Sprintf("%s.miflora.%s.conductivity %d %d", *prefix, id, metric.sensorData.Conductivity, timestamp)
publish <- fmt.Sprintf("%s.miflora.%s.connect_time %.2f %d", *prefix, id, metric.connectTime, timestamp)
publish <- fmt.Sprintf("%s.miflora.%s.readout_time %.2f %d", *prefix, id, metric.readoutTime, timestamp)
publish <- fmt.Sprintf("%s.miflora.%s.rssi %d %d", *prefix, id, metric.rssi, timestamp)
switch format {
case graphiteFormat:
for metric := range send {
publishGraphite(metric, publish, *graphitePrefix)
}
case influxFormat:
for metric := range send {
publishInflux(metric, publish)
}
}
}()