diff --git a/cmd/miflorad/main.go b/cmd/miflorad/main.go index f2ac931..7408649 100644 --- a/cmd/miflorad/main.go +++ b/cmd/miflorad/main.go @@ -44,6 +44,15 @@ type peripheral struct { var allPeripherals []*peripheral +type metric struct { + peripheralId string + metaData common.VersionBatteryResponse + sensorData common.SensorDataResponse + connectTime float64 + readoutTime float64 + rssi int +} + type mqttLogger struct { level string } @@ -89,50 +98,36 @@ func getMQTTOptions() *mqtt.ClientOptions { } } -func readData(peripheral *peripheral, client ble.Client, send chan string) error { +func readData(peripheral *peripheral, client ble.Client) (common.SensorDataResponse, error) { // re-request meta data (for battery level) if last check more than 24 hours ago // Source: https://github.com/open-homeautomation/miflora/blob/ffd95c3e616df8843cc8bff99c9b60765b124092/miflora/miflora_poller.py#L92 if time.Since(peripheral.lastMetaDataFetch) >= 24*time.Hour { metaData, err := impl.RequestVersionBattery(client) if err != nil { - return errors.Wrap(err, "can't request version battery") + return common.SensorDataResponse{}, errors.Wrap(err, "can't request version battery") } peripheral.metaData = metaData peripheral.lastMetaDataFetch = time.Now() } - id := common.MifloraGetAlphaNumericID(peripheral.id) - timestamp := time.Now().Unix() - - send <- fmt.Sprintf("%s.miflora.%s.battery_level %d %d", *prefix, id, peripheral.metaData.BatteryLevel, timestamp) - send <- fmt.Sprintf("%s.miflora.%s.firmware_version %d %d", *prefix, id, peripheral.metaData.NumericFirmwareVersion(), timestamp) - if peripheral.metaData.RequiresModeChangeBeforeRead() { err2 := impl.RequestModeChange(client) if err2 != nil { - return errors.Wrap(err2, "can't request mode change") + return common.SensorDataResponse{}, errors.Wrap(err2, "can't request mode change") } } sensorData, err3 := impl.RequestSensorData(client) if err3 != nil { - return errors.Wrap(err3, "can't request sensor data") + return common.SensorDataResponse{}, errors.Wrap(err3, "can't request sensor data") } - send <- fmt.Sprintf("%s.miflora.%s.temperature %.1f %d", *prefix, id, sensorData.Temperature, timestamp) - send <- fmt.Sprintf("%s.miflora.%s.brightness %d %d", *prefix, id, sensorData.Brightness, timestamp) - send <- fmt.Sprintf("%s.miflora.%s.moisture %d %d", *prefix, id, sensorData.Moisture, timestamp) - send <- fmt.Sprintf("%s.miflora.%s.conductivity %d %d", *prefix, id, sensorData.Conductivity, timestamp) - - return nil + return sensorData, nil } -func connectPeripheral(peripheral *peripheral, send chan string) error { +func connectPeripheral(peripheral *peripheral, send chan metric) error { fmt.Fprintf(os.Stderr, "Scanning for %s...\n", peripheral.id) - id := common.MifloraGetAlphaNumericID(peripheral.id) - timestamp := time.Now().Unix() - // only way to get back the found advertisement, must be buffered! foundAdvertisementChannel := make(chan ble.Advertisement, 1) @@ -153,10 +148,6 @@ func connectPeripheral(peripheral *peripheral, send chan string) error { } timeConnectTook := time.Since(timeConnectStart).Seconds() - send <- fmt.Sprintf("%s.miflora.%s.connect_time %.2f %d", *prefix, id, timeConnectTook, timestamp) - - foundAdvertisement := <-foundAdvertisementChannel - send <- fmt.Sprintf("%s.miflora.%s.rssi %d %d", *prefix, id, foundAdvertisement.RSSI(), timestamp) // Source: https://github.com/go-ble/ble/blob/master/examples/basic/explorer/main.go#L53 // Make sure we had the chance to print out the message. @@ -175,19 +166,31 @@ func connectPeripheral(peripheral *peripheral, send chan string) error { return errors.Wrap(err, "can't descover profile") } - err2 := readData(peripheral, client, send) + sensorData, err2 := readData(peripheral, client) timeReadoutTook := time.Since(timeReadoutStart).Seconds() - send <- fmt.Sprintf("%s.miflora.%s.readout_time %.2f %d", *prefix, id, timeReadoutTook, timestamp) client.CancelConnection() <-done - return err2 + if err2 != nil { + return errors.Wrap(err2, "can't read data") + } + + send <- metric{ + peripheralId: common.MifloraGetAlphaNumericID(peripheral.id), + sensorData: sensorData, + metaData: peripheral.metaData, + connectTime: timeConnectTook, + readoutTime: timeReadoutTook, + rssi: (<-foundAdvertisementChannel).RSSI(), + } + + return nil } -func readPeripheral(peripheral *peripheral, send chan string) error { +func readPeripheral(peripheral *peripheral, send chan metric) error { var err error for retry := 0; retry < *readRetries; retry++ { err = connectPeripheral(peripheral, send) @@ -199,7 +202,7 @@ func readPeripheral(peripheral *peripheral, send chan string) error { return err } -func readAllPeripherals(quit chan struct{}, send chan string) { +func readAllPeripherals(quit chan struct{}, send chan metric) { for _, peripheral := range allPeripherals { // check for quit signal (non-blocking) and terminate select { @@ -211,8 +214,8 @@ func readAllPeripherals(quit chan struct{}, send chan string) { err := readPeripheral(peripheral, send) if err != nil { fmt.Fprintf(os.Stderr, "Failed to read peripheral %s, err: %s\n", peripheral.id, err) - id := common.MifloraGetAlphaNumericID(peripheral.id) - send <- fmt.Sprintf("%s.miflora.%s.failed 1 %d", *prefix, id, time.Now().Unix()) + // id := common.MifloraGetAlphaNumericID(peripheral.id) + // send <- fmt.Sprintf("%s.miflora.%s.failed 1 %d", *prefix, id, time.Now().Unix()) continue } } @@ -255,7 +258,8 @@ func main() { intervalTicker := time.NewTicker(*interval) quit := make(chan struct{}) - send := make(chan string, 10) + send := make(chan metric, 1) + publish := make(chan string, 10) go func() { fmt.Fprintf(os.Stderr, "Starting loop with %s interval...\n", *interval) @@ -278,8 +282,25 @@ func main() { go func() { for metric := range send { - // fmt.Fprintln(os.Stdout, metric) - token := mqttClient.Publish(*brokerTopicPrefix+*prefix, 1, false, metric) + timestamp := time.Now().Unix() + id := metric.peripheralId + + publish <- fmt.Sprintf("%s.miflora.%s.battery_level %d %d", *prefix, id, metric.metaData.BatteryLevel, timestamp) + publish <- fmt.Sprintf("%s.miflora.%s.firmware_version %d %d", *prefix, id, metric.metaData.NumericFirmwareVersion(), timestamp) + publish <- fmt.Sprintf("%s.miflora.%s.temperature %.1f %d", *prefix, id, metric.sensorData.Temperature, timestamp) + publish <- fmt.Sprintf("%s.miflora.%s.brightness %d %d", *prefix, id, metric.sensorData.Brightness, timestamp) + publish <- fmt.Sprintf("%s.miflora.%s.moisture %d %d", *prefix, id, metric.sensorData.Moisture, timestamp) + publish <- fmt.Sprintf("%s.miflora.%s.conductivity %d %d", *prefix, id, metric.sensorData.Conductivity, timestamp) + publish <- fmt.Sprintf("%s.miflora.%s.connect_time %.2f %d", *prefix, id, metric.connectTime, timestamp) + publish <- fmt.Sprintf("%s.miflora.%s.readout_time %.2f %d", *prefix, id, metric.readoutTime, timestamp) + publish <- fmt.Sprintf("%s.miflora.%s.rssi %d %d", *prefix, id, metric.rssi, timestamp) + } + }() + + go func() { + for line := range publish { + // fmt.Fprintln(os.Stdout, line) + token := mqttClient.Publish(*brokerTopicPrefix, 1, false, line) if token.WaitTimeout(1*time.Second) && token.Error() != nil { fmt.Fprintf(os.Stderr, "Failed to publish MQTT, err: %s\n", token.Error()) continue