62 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			62 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package mqtt
 | 
						|
 | 
						|
import (
 | 
						|
	"MQTTLogger/config"
 | 
						|
	"MQTTLogger/internal/csv"
 | 
						|
	"time"
 | 
						|
 | 
						|
	mqtt "github.com/eclipse/paho.mqtt.golang"
 | 
						|
	"go.uber.org/zap"
 | 
						|
)
 | 
						|
 | 
						|
func NewClient(logger *zap.Logger, config *config.Config, csvHelper *csv.Helper) mqtt.Client {
 | 
						|
	var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 | 
						|
		csvHelper.AddEntry(time.Now().UTC().Format("2006-01-02 15:04:05"), msg.Topic(), string(msg.Payload()))
 | 
						|
	}
 | 
						|
 | 
						|
	opts := mqtt.NewClientOptions().AddBroker(config.URI).SetClientID("mqtt-logger")
 | 
						|
	opts.SetKeepAlive(2 * time.Second)
 | 
						|
	opts.SetDefaultPublishHandler(f)
 | 
						|
	opts.SetPingTimeout(1 * time.Second)
 | 
						|
	// TODO: Check this option and see if it solves the re-subscribe on reconnect issue
 | 
						|
	// opts.SetCleanSession(false)
 | 
						|
	opts.SetConnectionNotificationHandler(func(client mqtt.Client, notification mqtt.ConnectionNotification) {
 | 
						|
		switch n := notification.(type) {
 | 
						|
		case mqtt.ConnectionNotificationConnected:
 | 
						|
			logger.Info("Connected to MQTT broker")
 | 
						|
			StartListening(logger, config, client)
 | 
						|
		case mqtt.ConnectionNotificationConnecting:
 | 
						|
			logger.Info("connecting", zap.Bool("isReconnect", n.IsReconnect), zap.Int("attempt", n.Attempt))
 | 
						|
		case mqtt.ConnectionNotificationFailed:
 | 
						|
			logger.Error("connection failed", zap.String("error", n.Reason.Error()))
 | 
						|
		case mqtt.ConnectionNotificationLost:
 | 
						|
			logger.Error("connection lost", zap.String("error", n.Reason.Error()))
 | 
						|
		case mqtt.ConnectionNotificationBroker:
 | 
						|
			logger.Info("broker", zap.String("name", n.Broker.String()))
 | 
						|
		case mqtt.ConnectionNotificationBrokerFailed:
 | 
						|
			logger.Error("broker failed", zap.String("broker", n.Broker.String()), zap.String("error", n.Reason.Error()))
 | 
						|
		}
 | 
						|
	})
 | 
						|
 | 
						|
	if config.Username != "" && config.Password != "" {
 | 
						|
		opts.SetUsername(config.Username)
 | 
						|
		opts.SetPassword(config.Password)
 | 
						|
	}
 | 
						|
 | 
						|
	c := mqtt.NewClient(opts)
 | 
						|
	if token := c.Connect(); token.Wait() && token.Error() != nil {
 | 
						|
		logger.Fatal("error connecting to MQTT broker", zap.Error(token.Error()))
 | 
						|
	}
 | 
						|
 | 
						|
	return c
 | 
						|
}
 | 
						|
 | 
						|
func StartListening(logger *zap.Logger, config *config.Config, client mqtt.Client) {
 | 
						|
	logger.Info("Subscribing to", zap.Int("topics", len(config.Topics)))
 | 
						|
	for _, topic := range config.Topics {
 | 
						|
		if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
 | 
						|
			logger.Error("error subscribing to topic", zap.String("topic", topic), zap.Error(token.Error()))
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 |