Files
MQTTLogger/internal/mqtt/mqtt.go
2025-09-29 13:34:34 +03:30

53 lines
2.0 KiB
Go

package mqtt
import (
"MQTTLogger/config"
"MQTTLogger/internal/csv"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"go.uber.org/zap"
)
func NewClient(logger *zap.Logger, config *config.Config, csvHelper *csv.Helper) mqtt.Client {
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
csvHelper.AddEntry(time.Now().UTC().Format("2006-01-02 15:04:05"), msg.Topic(), string(msg.Payload()))
}
opts := mqtt.NewClientOptions().AddBroker(config.URI).SetClientID("mqtt-logger")
opts.SetKeepAlive(2 * time.Second)
opts.SetDefaultPublishHandler(f)
opts.SetPingTimeout(1 * time.Second)
opts.SetConnectionNotificationHandler(func(client mqtt.Client, notification mqtt.ConnectionNotification) {
switch n := notification.(type) {
case mqtt.ConnectionNotificationConnected:
logger.Info("Connected to MQTT broker")
case mqtt.ConnectionNotificationConnecting:
logger.Info("connecting", zap.Bool("isReconnect", n.IsReconnect), zap.Int("attempt", n.Attempt))
case mqtt.ConnectionNotificationFailed:
logger.Error("connection failed", zap.String("error", n.Reason.Error()))
case mqtt.ConnectionNotificationLost:
logger.Error("connection lost", zap.String("error", n.Reason.Error()))
case mqtt.ConnectionNotificationBroker:
logger.Info("broker", zap.String("name", n.Broker.String()))
case mqtt.ConnectionNotificationBrokerFailed:
logger.Error("broker failed", zap.String("broker", n.Broker.String()), zap.String("error", n.Reason.Error()))
}
})
c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
logger.Fatal("error connecting to MQTT broker", zap.Error(token.Error()))
}
return c
}
func StartListening(logger *zap.Logger, config *config.Config, client mqtt.Client) {
for _, topic := range config.Topics {
if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
logger.Error("error subscribing to topic", zap.String("topic", topic), zap.Error(token.Error()))
}
}
}