mirror of
				https://github.com/cmur2/miflorad.git
				synced 2025-10-31 20:25:10 +01:00 
			
		
		
		
	miflorad: introduce metric struct to separate metrics collection and formatting
This commit is contained in:
		| @@ -44,6 +44,15 @@ type peripheral struct { | |||||||
|  |  | ||||||
| var allPeripherals []*peripheral | var allPeripherals []*peripheral | ||||||
|  |  | ||||||
|  | type metric struct { | ||||||
|  | 	peripheralId string | ||||||
|  | 	metaData     common.VersionBatteryResponse | ||||||
|  | 	sensorData   common.SensorDataResponse | ||||||
|  | 	connectTime  float64 | ||||||
|  | 	readoutTime  float64 | ||||||
|  | 	rssi         int | ||||||
|  | } | ||||||
|  |  | ||||||
| type mqttLogger struct { | type mqttLogger struct { | ||||||
| 	level string | 	level string | ||||||
| } | } | ||||||
| @@ -89,50 +98,36 @@ func getMQTTOptions() *mqtt.ClientOptions { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func readData(peripheral *peripheral, client ble.Client, send chan string) error { | func readData(peripheral *peripheral, client ble.Client) (common.SensorDataResponse, error) { | ||||||
| 	// re-request meta data (for battery level) if last check more than 24 hours ago | 	// re-request meta data (for battery level) if last check more than 24 hours ago | ||||||
| 	// Source: https://github.com/open-homeautomation/miflora/blob/ffd95c3e616df8843cc8bff99c9b60765b124092/miflora/miflora_poller.py#L92 | 	// Source: https://github.com/open-homeautomation/miflora/blob/ffd95c3e616df8843cc8bff99c9b60765b124092/miflora/miflora_poller.py#L92 | ||||||
| 	if time.Since(peripheral.lastMetaDataFetch) >= 24*time.Hour { | 	if time.Since(peripheral.lastMetaDataFetch) >= 24*time.Hour { | ||||||
| 		metaData, err := impl.RequestVersionBattery(client) | 		metaData, err := impl.RequestVersionBattery(client) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return errors.Wrap(err, "can't request version battery") | 			return common.SensorDataResponse{}, errors.Wrap(err, "can't request version battery") | ||||||
| 		} | 		} | ||||||
| 		peripheral.metaData = metaData | 		peripheral.metaData = metaData | ||||||
| 		peripheral.lastMetaDataFetch = time.Now() | 		peripheral.lastMetaDataFetch = time.Now() | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	id := common.MifloraGetAlphaNumericID(peripheral.id) |  | ||||||
| 	timestamp := time.Now().Unix() |  | ||||||
|  |  | ||||||
| 	send <- fmt.Sprintf("%s.miflora.%s.battery_level %d %d", *prefix, id, peripheral.metaData.BatteryLevel, timestamp) |  | ||||||
| 	send <- fmt.Sprintf("%s.miflora.%s.firmware_version %d %d", *prefix, id, peripheral.metaData.NumericFirmwareVersion(), timestamp) |  | ||||||
|  |  | ||||||
| 	if peripheral.metaData.RequiresModeChangeBeforeRead() { | 	if peripheral.metaData.RequiresModeChangeBeforeRead() { | ||||||
| 		err2 := impl.RequestModeChange(client) | 		err2 := impl.RequestModeChange(client) | ||||||
| 		if err2 != nil { | 		if err2 != nil { | ||||||
| 			return errors.Wrap(err2, "can't request mode change") | 			return common.SensorDataResponse{}, errors.Wrap(err2, "can't request mode change") | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	sensorData, err3 := impl.RequestSensorData(client) | 	sensorData, err3 := impl.RequestSensorData(client) | ||||||
| 	if err3 != nil { | 	if err3 != nil { | ||||||
| 		return errors.Wrap(err3, "can't request sensor data") | 		return common.SensorDataResponse{}, errors.Wrap(err3, "can't request sensor data") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	send <- fmt.Sprintf("%s.miflora.%s.temperature %.1f %d", *prefix, id, sensorData.Temperature, timestamp) | 	return sensorData, nil | ||||||
| 	send <- fmt.Sprintf("%s.miflora.%s.brightness %d %d", *prefix, id, sensorData.Brightness, timestamp) |  | ||||||
| 	send <- fmt.Sprintf("%s.miflora.%s.moisture %d %d", *prefix, id, sensorData.Moisture, timestamp) |  | ||||||
| 	send <- fmt.Sprintf("%s.miflora.%s.conductivity %d %d", *prefix, id, sensorData.Conductivity, timestamp) |  | ||||||
|  |  | ||||||
| 	return nil |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func connectPeripheral(peripheral *peripheral, send chan string) error { | func connectPeripheral(peripheral *peripheral, send chan metric) error { | ||||||
| 	fmt.Fprintf(os.Stderr, "Scanning for %s...\n", peripheral.id) | 	fmt.Fprintf(os.Stderr, "Scanning for %s...\n", peripheral.id) | ||||||
|  |  | ||||||
| 	id := common.MifloraGetAlphaNumericID(peripheral.id) |  | ||||||
| 	timestamp := time.Now().Unix() |  | ||||||
|  |  | ||||||
| 	// only way to get back the found advertisement, must be buffered! | 	// only way to get back the found advertisement, must be buffered! | ||||||
| 	foundAdvertisementChannel := make(chan ble.Advertisement, 1) | 	foundAdvertisementChannel := make(chan ble.Advertisement, 1) | ||||||
|  |  | ||||||
| @@ -153,10 +148,6 @@ func connectPeripheral(peripheral *peripheral, send chan string) error { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	timeConnectTook := time.Since(timeConnectStart).Seconds() | 	timeConnectTook := time.Since(timeConnectStart).Seconds() | ||||||
| 	send <- fmt.Sprintf("%s.miflora.%s.connect_time %.2f %d", *prefix, id, timeConnectTook, timestamp) |  | ||||||
|  |  | ||||||
| 	foundAdvertisement := <-foundAdvertisementChannel |  | ||||||
| 	send <- fmt.Sprintf("%s.miflora.%s.rssi %d %d", *prefix, id, foundAdvertisement.RSSI(), timestamp) |  | ||||||
|  |  | ||||||
| 	// Source: https://github.com/go-ble/ble/blob/master/examples/basic/explorer/main.go#L53 | 	// Source: https://github.com/go-ble/ble/blob/master/examples/basic/explorer/main.go#L53 | ||||||
| 	// Make sure we had the chance to print out the message. | 	// Make sure we had the chance to print out the message. | ||||||
| @@ -175,19 +166,31 @@ func connectPeripheral(peripheral *peripheral, send chan string) error { | |||||||
| 		return errors.Wrap(err, "can't descover profile") | 		return errors.Wrap(err, "can't descover profile") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	err2 := readData(peripheral, client, send) | 	sensorData, err2 := readData(peripheral, client) | ||||||
|  |  | ||||||
| 	timeReadoutTook := time.Since(timeReadoutStart).Seconds() | 	timeReadoutTook := time.Since(timeReadoutStart).Seconds() | ||||||
| 	send <- fmt.Sprintf("%s.miflora.%s.readout_time %.2f %d", *prefix, id, timeReadoutTook, timestamp) |  | ||||||
|  |  | ||||||
| 	client.CancelConnection() | 	client.CancelConnection() | ||||||
|  |  | ||||||
| 	<-done | 	<-done | ||||||
|  |  | ||||||
| 	return err2 | 	if err2 != nil { | ||||||
|  | 		return errors.Wrap(err2, "can't read data") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| func readPeripheral(peripheral *peripheral, send chan string) error { | 	send <- metric{ | ||||||
|  | 		peripheralId: common.MifloraGetAlphaNumericID(peripheral.id), | ||||||
|  | 		sensorData:   sensorData, | ||||||
|  | 		metaData:     peripheral.metaData, | ||||||
|  | 		connectTime:  timeConnectTook, | ||||||
|  | 		readoutTime:  timeReadoutTook, | ||||||
|  | 		rssi:         (<-foundAdvertisementChannel).RSSI(), | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func readPeripheral(peripheral *peripheral, send chan metric) error { | ||||||
| 	var err error | 	var err error | ||||||
| 	for retry := 0; retry < *readRetries; retry++ { | 	for retry := 0; retry < *readRetries; retry++ { | ||||||
| 		err = connectPeripheral(peripheral, send) | 		err = connectPeripheral(peripheral, send) | ||||||
| @@ -199,7 +202,7 @@ func readPeripheral(peripheral *peripheral, send chan string) error { | |||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
|  |  | ||||||
| func readAllPeripherals(quit chan struct{}, send chan string) { | func readAllPeripherals(quit chan struct{}, send chan metric) { | ||||||
| 	for _, peripheral := range allPeripherals { | 	for _, peripheral := range allPeripherals { | ||||||
| 		// check for quit signal (non-blocking) and terminate | 		// check for quit signal (non-blocking) and terminate | ||||||
| 		select { | 		select { | ||||||
| @@ -211,8 +214,8 @@ func readAllPeripherals(quit chan struct{}, send chan string) { | |||||||
| 		err := readPeripheral(peripheral, send) | 		err := readPeripheral(peripheral, send) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			fmt.Fprintf(os.Stderr, "Failed to read peripheral %s, err: %s\n", peripheral.id, err) | 			fmt.Fprintf(os.Stderr, "Failed to read peripheral %s, err: %s\n", peripheral.id, err) | ||||||
| 			id := common.MifloraGetAlphaNumericID(peripheral.id) | 			// id := common.MifloraGetAlphaNumericID(peripheral.id) | ||||||
| 			send <- fmt.Sprintf("%s.miflora.%s.failed 1 %d", *prefix, id, time.Now().Unix()) | 			// send <- fmt.Sprintf("%s.miflora.%s.failed 1 %d", *prefix, id, time.Now().Unix()) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| @@ -255,7 +258,8 @@ func main() { | |||||||
|  |  | ||||||
| 	intervalTicker := time.NewTicker(*interval) | 	intervalTicker := time.NewTicker(*interval) | ||||||
| 	quit := make(chan struct{}) | 	quit := make(chan struct{}) | ||||||
| 	send := make(chan string, 10) | 	send := make(chan metric, 1) | ||||||
|  | 	publish := make(chan string, 10) | ||||||
|  |  | ||||||
| 	go func() { | 	go func() { | ||||||
| 		fmt.Fprintf(os.Stderr, "Starting loop with %s interval...\n", *interval) | 		fmt.Fprintf(os.Stderr, "Starting loop with %s interval...\n", *interval) | ||||||
| @@ -278,8 +282,25 @@ func main() { | |||||||
|  |  | ||||||
| 	go func() { | 	go func() { | ||||||
| 		for metric := range send { | 		for metric := range send { | ||||||
| 			// fmt.Fprintln(os.Stdout, metric) | 			timestamp := time.Now().Unix() | ||||||
| 			token := mqttClient.Publish(*brokerTopicPrefix+*prefix, 1, false, metric) | 			id := metric.peripheralId | ||||||
|  |  | ||||||
|  | 			publish <- fmt.Sprintf("%s.miflora.%s.battery_level %d %d", *prefix, id, metric.metaData.BatteryLevel, timestamp) | ||||||
|  | 			publish <- fmt.Sprintf("%s.miflora.%s.firmware_version %d %d", *prefix, id, metric.metaData.NumericFirmwareVersion(), timestamp) | ||||||
|  | 			publish <- fmt.Sprintf("%s.miflora.%s.temperature %.1f %d", *prefix, id, metric.sensorData.Temperature, timestamp) | ||||||
|  | 			publish <- fmt.Sprintf("%s.miflora.%s.brightness %d %d", *prefix, id, metric.sensorData.Brightness, timestamp) | ||||||
|  | 			publish <- fmt.Sprintf("%s.miflora.%s.moisture %d %d", *prefix, id, metric.sensorData.Moisture, timestamp) | ||||||
|  | 			publish <- fmt.Sprintf("%s.miflora.%s.conductivity %d %d", *prefix, id, metric.sensorData.Conductivity, timestamp) | ||||||
|  | 			publish <- fmt.Sprintf("%s.miflora.%s.connect_time %.2f %d", *prefix, id, metric.connectTime, timestamp) | ||||||
|  | 			publish <- fmt.Sprintf("%s.miflora.%s.readout_time %.2f %d", *prefix, id, metric.readoutTime, timestamp) | ||||||
|  | 			publish <- fmt.Sprintf("%s.miflora.%s.rssi %d %d", *prefix, id, metric.rssi, timestamp) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	go func() { | ||||||
|  | 		for line := range publish { | ||||||
|  | 			// fmt.Fprintln(os.Stdout, line) | ||||||
|  | 			token := mqttClient.Publish(*brokerTopicPrefix, 1, false, line) | ||||||
| 			if token.WaitTimeout(1*time.Second) && token.Error() != nil { | 			if token.WaitTimeout(1*time.Second) && token.Error() != nil { | ||||||
| 				fmt.Fprintf(os.Stderr, "Failed to publish MQTT, err: %s\n", token.Error()) | 				fmt.Fprintf(os.Stderr, "Failed to publish MQTT, err: %s\n", token.Error()) | ||||||
| 				continue | 				continue | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user