58 lines
2.1 KiB
Go
58 lines
2.1 KiB
Go
package mqtt
|
|
|
|
import (
|
|
"MQTTLogger/config"
|
|
"MQTTLogger/internal/csv"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func NewClient(logger *zap.Logger, config *config.Config, csvHelper *csv.Helper) mqtt.Client {
|
|
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
|
|
csvHelper.AddEntry(time.Now().UTC().Format("2006-01-02 15:04:05"), msg.Topic(), string(msg.Payload()))
|
|
}
|
|
|
|
opts := mqtt.NewClientOptions().AddBroker(config.URI).SetClientID("mqtt-logger")
|
|
opts.SetKeepAlive(2 * time.Second)
|
|
opts.SetDefaultPublishHandler(f)
|
|
opts.SetPingTimeout(1 * time.Second)
|
|
opts.SetConnectionNotificationHandler(func(client mqtt.Client, notification mqtt.ConnectionNotification) {
|
|
switch n := notification.(type) {
|
|
case mqtt.ConnectionNotificationConnected:
|
|
logger.Info("Connected to MQTT broker")
|
|
case mqtt.ConnectionNotificationConnecting:
|
|
logger.Info("connecting", zap.Bool("isReconnect", n.IsReconnect), zap.Int("attempt", n.Attempt))
|
|
case mqtt.ConnectionNotificationFailed:
|
|
logger.Error("connection failed", zap.String("error", n.Reason.Error()))
|
|
case mqtt.ConnectionNotificationLost:
|
|
logger.Error("connection lost", zap.String("error", n.Reason.Error()))
|
|
case mqtt.ConnectionNotificationBroker:
|
|
logger.Info("broker", zap.String("name", n.Broker.String()))
|
|
case mqtt.ConnectionNotificationBrokerFailed:
|
|
logger.Error("broker failed", zap.String("broker", n.Broker.String()), zap.String("error", n.Reason.Error()))
|
|
}
|
|
})
|
|
|
|
if config.Username != "" && config.Password != "" {
|
|
opts.SetUsername(config.Username)
|
|
opts.SetPassword(config.Password)
|
|
}
|
|
|
|
c := mqtt.NewClient(opts)
|
|
if token := c.Connect(); token.Wait() && token.Error() != nil {
|
|
logger.Fatal("error connecting to MQTT broker", zap.Error(token.Error()))
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
func StartListening(logger *zap.Logger, config *config.Config, client mqtt.Client) {
|
|
for _, topic := range config.Topics {
|
|
if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
|
|
logger.Error("error subscribing to topic", zap.String("topic", topic), zap.Error(token.Error()))
|
|
}
|
|
}
|
|
}
|