From 327712a9c339480745d028428f6b3a199e2e0424 Mon Sep 17 00:00:00 2001 From: cn Date: Sun, 23 Dec 2018 23:16:16 +0100 Subject: [PATCH] miflorad: add MQTT sending via paho --- cmd/miflorad/main.go | 121 ++++++++++++++++++++++++++++++++----------- go.mod | 2 + go.sum | 4 ++ 3 files changed, 98 insertions(+), 29 deletions(-) diff --git a/cmd/miflorad/main.go b/cmd/miflorad/main.go index 826229a..f2ac931 100644 --- a/cmd/miflorad/main.go +++ b/cmd/miflorad/main.go @@ -13,19 +13,27 @@ import ( common "miflorad/common" impl "miflorad/common/ble" + mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/go-ble/ble" "github.com/go-ble/ble/examples/lib/dev" - "github.com/pkg/errors" ) +const mqttConnectTimeout = 10 * time.Second + // program version, will be populated on build var version string var ( - scanTimeout = flag.Duration("scantimeout", 10*time.Second, "timeout after that a scan per peripheral will be aborted") - interval = flag.Duration("interval", 25*time.Second, "metrics collection interval") - readRetries = flag.Int("readretries", 2, "number of times reading will be attempted per peripheral") + scanTimeout = flag.Duration("scantimeout", 10*time.Second, "timeout after that a scan per peripheral will be aborted") + readRetries = flag.Int("readretries", 2, "number of times reading will be attempted per peripheral") + interval = flag.Duration("interval", 25*time.Second, "metrics collection interval") + prefix = flag.String("prefix", "", "metrics name prefix") + brokerHost = flag.String("brokerhost", "localhost", "MQTT broker host to send metrics to") + brokerUser = flag.String("brokeruser", "", "MQTT broker user used for authentication") + brokerPassword = flag.String("brokerpassword", "", "MQTT broker password used for authentication") + brokerUseTLS = flag.Bool("brokerusetls", true, "whether TLS should be used for MQTT broker") + brokerTopicPrefix = flag.String("brokertopicprefix", "", "MQTT topic prefix for messages") ) type peripheral struct { @@ -36,10 +44,17 @@ type peripheral struct { var allPeripherals []*peripheral -var ( - countSuccess = 0 - countFailure = 0 -) +type mqttLogger struct { + level string +} + +func (logger mqttLogger) Println(a ...interface{}) { + fmt.Fprintln(os.Stderr, fmt.Sprintf("mqtt %s:", logger.level), a) +} + +func (logger mqttLogger) Printf(format string, a ...interface{}) { + fmt.Fprintf(os.Stderr, "mqtt %s: "+format, logger.level, a) +} func checkTooShortInterval() error { numPeripherals := int64(len(flag.Args())) @@ -60,7 +75,21 @@ func getVersion() string { } } -func readData(peripheral *peripheral, client ble.Client) error { +func getMQTTOptions() *mqtt.ClientOptions { + if *brokerUseTLS { + return mqtt.NewClientOptions(). + AddBroker(fmt.Sprintf("ssl://%s:8883", *brokerHost)). + SetUsername(*brokerUser). + SetPassword(*brokerPassword) + } else { + return mqtt.NewClientOptions(). + AddBroker(fmt.Sprintf("tcp://%s:1883", *brokerHost)). + SetUsername(*brokerUser). + SetPassword(*brokerPassword) + } +} + +func readData(peripheral *peripheral, client ble.Client, send chan string) error { // re-request meta data (for battery level) if last check more than 24 hours ago // Source: https://github.com/open-homeautomation/miflora/blob/ffd95c3e616df8843cc8bff99c9b60765b124092/miflora/miflora_poller.py#L92 if time.Since(peripheral.lastMetaDataFetch) >= 24*time.Hour { @@ -72,6 +101,12 @@ func readData(peripheral *peripheral, client ble.Client) error { peripheral.lastMetaDataFetch = time.Now() } + id := common.MifloraGetAlphaNumericID(peripheral.id) + timestamp := time.Now().Unix() + + send <- fmt.Sprintf("%s.miflora.%s.battery_level %d %d", *prefix, id, peripheral.metaData.BatteryLevel, timestamp) + send <- fmt.Sprintf("%s.miflora.%s.firmware_version %d %d", *prefix, id, peripheral.metaData.NumericFirmwareVersion(), timestamp) + if peripheral.metaData.RequiresModeChangeBeforeRead() { err2 := impl.RequestModeChange(client) if err2 != nil { @@ -84,14 +119,20 @@ func readData(peripheral *peripheral, client ble.Client) error { return errors.Wrap(err3, "can't request sensor data") } - fmt.Println(sensorData.Temperature, sensorData.Brightness) + send <- fmt.Sprintf("%s.miflora.%s.temperature %.1f %d", *prefix, id, sensorData.Temperature, timestamp) + send <- fmt.Sprintf("%s.miflora.%s.brightness %d %d", *prefix, id, sensorData.Brightness, timestamp) + send <- fmt.Sprintf("%s.miflora.%s.moisture %d %d", *prefix, id, sensorData.Moisture, timestamp) + send <- fmt.Sprintf("%s.miflora.%s.conductivity %d %d", *prefix, id, sensorData.Conductivity, timestamp) return nil } -func connectPeripheral(peripheral *peripheral) error { +func connectPeripheral(peripheral *peripheral, send chan string) error { fmt.Fprintf(os.Stderr, "Scanning for %s...\n", peripheral.id) + id := common.MifloraGetAlphaNumericID(peripheral.id) + timestamp := time.Now().Unix() + // only way to get back the found advertisement, must be buffered! foundAdvertisementChannel := make(chan ble.Advertisement, 1) @@ -112,11 +153,10 @@ func connectPeripheral(peripheral *peripheral) error { } timeConnectTook := time.Since(timeConnectStart).Seconds() - fmt.Println(timeConnectTook) - // fmt.Fprintf(os.Stdout, "%s.miflora.%s.connect_time %.2f %d\n", prefix, id, timeConnectTook, time.Now().Unix()) + send <- fmt.Sprintf("%s.miflora.%s.connect_time %.2f %d", *prefix, id, timeConnectTook, timestamp) - // foundAdvertisement := <-foundAdvertisementChannel - // fmt.Fprintf(os.Stdout, "%s.miflora.%s.rssi %d %d\n", prefix, id, foundAdvertisement.RSSI(), time.Now().Unix()) + foundAdvertisement := <-foundAdvertisementChannel + send <- fmt.Sprintf("%s.miflora.%s.rssi %d %d", *prefix, id, foundAdvertisement.RSSI(), timestamp) // Source: https://github.com/go-ble/ble/blob/master/examples/basic/explorer/main.go#L53 // Make sure we had the chance to print out the message. @@ -135,11 +175,10 @@ func connectPeripheral(peripheral *peripheral) error { return errors.Wrap(err, "can't descover profile") } - err2 := readData(peripheral, client) + err2 := readData(peripheral, client, send) timeReadoutTook := time.Since(timeReadoutStart).Seconds() - fmt.Println(timeReadoutTook) - // fmt.Fprintf(os.Stdout, "%s.miflora.%s.readout_time %.2f %d\n", prefix, id, timeReadoutTook, time.Now().Unix()) + send <- fmt.Sprintf("%s.miflora.%s.readout_time %.2f %d", *prefix, id, timeReadoutTook, timestamp) client.CancelConnection() @@ -148,10 +187,10 @@ func connectPeripheral(peripheral *peripheral) error { return err2 } -func readPeripheral(peripheral *peripheral) error { +func readPeripheral(peripheral *peripheral, send chan string) error { var err error for retry := 0; retry < *readRetries; retry++ { - err = connectPeripheral(peripheral) + err = connectPeripheral(peripheral, send) // stop retrying once we have a success, last err will be returned (or nil) if err == nil { break @@ -160,7 +199,7 @@ func readPeripheral(peripheral *peripheral) error { return err } -func readAllPeripherals(quit chan struct{}) { +func readAllPeripherals(quit chan struct{}, send chan string) { for _, peripheral := range allPeripherals { // check for quit signal (non-blocking) and terminate select { @@ -169,13 +208,13 @@ func readAllPeripherals(quit chan struct{}) { default: } - err := readPeripheral(peripheral) + err := readPeripheral(peripheral, send) if err != nil { - countFailure++ fmt.Fprintf(os.Stderr, "Failed to read peripheral %s, err: %s\n", peripheral.id, err) + id := common.MifloraGetAlphaNumericID(peripheral.id) + send <- fmt.Sprintf("%s.miflora.%s.failed 1 %d", *prefix, id, time.Now().Unix()) continue } - countSuccess++ } } @@ -194,6 +233,19 @@ func main() { fmt.Fprintf(os.Stderr, "miflorad version %s\n", getVersion()) + mqtt.WARN = mqttLogger{level: "warning"} + mqtt.ERROR = mqttLogger{level: "error"} + mqtt.CRITICAL = mqttLogger{level: "critical"} + + mqttClient := mqtt.NewClient(getMQTTOptions()) + + if token := mqttClient.Connect(); token.WaitTimeout(mqttConnectTimeout) && token.Error() != nil { + fmt.Fprintf(os.Stderr, "Failed to connect MQTT, err: %s\n", token.Error()) + os.Exit(1) + } + + fmt.Fprintf(os.Stderr, "Connected to MQTT broker %s\n", *brokerHost) + device, err := dev.NewDevice("default") if err != nil { fmt.Fprintf(os.Stderr, "Failed to open device, err: %s\n", err) @@ -203,9 +255,10 @@ func main() { intervalTicker := time.NewTicker(*interval) quit := make(chan struct{}) + send := make(chan string, 10) go func() { - fmt.Fprintf(os.Stderr, "Starting miflorad loop with %s interval...\n", *interval) + fmt.Fprintf(os.Stderr, "Starting loop with %s interval...\n", *interval) // populate all peripherals data structure allPeripherals = make([]*peripheral, len(flag.Args())) @@ -217,9 +270,20 @@ func main() { } // main loop - readAllPeripherals(quit) + readAllPeripherals(quit, send) for range intervalTicker.C { - readAllPeripherals(quit) + readAllPeripherals(quit, send) + } + }() + + go func() { + for metric := range send { + // fmt.Fprintln(os.Stdout, metric) + token := mqttClient.Publish(*brokerTopicPrefix+*prefix, 1, false, metric) + if token.WaitTimeout(1*time.Second) && token.Error() != nil { + fmt.Fprintf(os.Stderr, "Failed to publish MQTT, err: %s\n", token.Error()) + continue + } } }() @@ -233,8 +297,7 @@ func main() { // wait for last readPeripheral to finish (worst case) time.Sleep(*scanTimeout * time.Duration(*readRetries)) - fmt.Fprintf(os.Stderr, "Failures: %d\n", countFailure) - fmt.Fprintf(os.Stderr, "Successes: %d\n", countSuccess) + mqttClient.Disconnect(1000) if err := device.Stop(); err != nil { fmt.Fprintf(os.Stderr, "Failed to close device, err: %s\n", err) diff --git a/go.mod b/go.mod index 610aede..db62261 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module miflorad require ( github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142 // indirect github.com/currantlabs/gatt v0.0.0-20161006170101-f949eac78f4e + github.com/eclipse/paho.mqtt.golang v1.1.1 github.com/fatih/structs v1.1.0 // indirect github.com/go-ble/ble v0.0.0-20181002102605-e78417b510a3 github.com/godbus/dbus v0.0.0-20181031085051-66d97ae // indirect @@ -15,4 +16,5 @@ require ( github.com/pkg/errors v0.8.0 github.com/sirupsen/logrus v1.2.0 // indirect github.com/stretchr/testify v1.2.2 + golang.org/x/net v0.0.0-20181220203305-927f97764cc3 // indirect ) diff --git a/go.sum b/go.sum index 5d8a681..6c27284 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/currantlabs/gatt v0.0.0-20161006170101-f949eac78f4e h1:qu1wqkuctiqRtg github.com/currantlabs/gatt v0.0.0-20161006170101-f949eac78f4e/go.mod h1:GCdlaU9vOYeye8wQtSZNyZ4j5PhmnJ2HUqhRZO0KoZI= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.1.1 h1:iPJYXJLaViCshRTW/PSqImSS6HJ2Rf671WR0bXZ2GIU= +github.com/eclipse/paho.mqtt.golang v1.1.1/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/go-ble/ble v0.0.0-20181002102605-e78417b510a3 h1:rsLGztXl2QJvj4x/PAWzC1Zx6tnTDKlosaXAZfaXM8M= @@ -36,5 +38,7 @@ github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3 h1:eH6Eip3UpmR+yM/qI9Ijluzb1bNv/cAU/n+6l8tRSis= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=