commit 922695c23ed201f7d95660016268ba023cc04ce2 Author: Mohammad Mahdi Date: Mon Sep 29 13:34:34 2025 +0330 Base project diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2eea525 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.env \ No newline at end of file diff --git a/cmd/MQTTLogger/main.go b/cmd/MQTTLogger/main.go new file mode 100644 index 0000000..56b0d5c --- /dev/null +++ b/cmd/MQTTLogger/main.go @@ -0,0 +1,45 @@ +package main + +import ( + "MQTTLogger/config" + "MQTTLogger/internal/csv" + "MQTTLogger/internal/mqtt" + + "go.uber.org/fx" + "go.uber.org/zap" +) + +func main() { + fx.New( + fx.Provide( + NewLogger, + config.NewConfig, + mqtt.NewClient, + csv.NewHelper, + ), + fx.Invoke( + csv.InitializeHeader, + mqtt.SetupMQTTLogging, + mqtt.StartListening, + ), + fx.Logger(NewFXLogger()), + ).Run() +} + +func NewLogger() (*zap.Logger, error) { + return zap.NewDevelopment() +} + +// Simple logger for fx +type FXLogger struct { + logger *zap.Logger +} + +func NewFXLogger() *FXLogger { + logger, _ := zap.NewDevelopment() + return &FXLogger{logger: logger} +} + +func (l *FXLogger) Printf(str string, args ...any) { + l.logger.Sugar().Infof(str, args...) +} diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..9732d5f --- /dev/null +++ b/config/config.go @@ -0,0 +1,50 @@ +package config + +import ( + "os" + + "github.com/joho/godotenv" + "go.uber.org/zap" +) + +type Config struct { + URI string + Username string + Password string + Topics []string +} + +func NewConfig(logger *zap.Logger) *Config { + err := godotenv.Load() + if err != nil { + logger.Warn("Error loading .env file, using environment variables") + } + + uri := os.Getenv("MQTT_URI") + if uri == "" { + logger.Fatal("MQTT_URI is not set") + } + + username := os.Getenv("MQTT_USERNAME") + if username == "" { + logger.Warn("MQTT_USERNAME is not set") + } + + password := os.Getenv("MQTT_PASSWORD") + if password == "" { + logger.Warn("MQTT_PASSWORD is not set") + } + + topics := os.Getenv("MQTT_TOPICS") + if topics == "" { + logger.Warn("MQTT_TOPICS is not set, defaulting to #") + topics = "#" + } + + return &Config{ + URI: uri, + Username: username, + Password: password, + Topics: []string{topics}, + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..01a7714 --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module MQTTLogger + +go 1.25.1 + +require ( + github.com/eclipse/paho.mqtt.golang v1.5.1 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/joho/godotenv v1.5.1 // indirect + go.uber.org/dig v1.19.0 // indirect + go.uber.org/fx v1.24.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/net v0.44.0 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/sys v0.36.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c7cbd50 --- /dev/null +++ b/go.sum @@ -0,0 +1,24 @@ +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4= +go.uber.org/dig v1.19.0/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= +go.uber.org/fx v1.24.0 h1:wE8mruvpg2kiiL1Vqd0CC+tr0/24XIB10Iwp2lLWzkg= +go.uber.org/fx v1.24.0/go.mod h1:AmDeGyS+ZARGKM4tlH4FY2Jr63VjbEDJHtqXTGP5hbo= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= diff --git a/internal/csv/helper.go b/internal/csv/helper.go new file mode 100644 index 0000000..2036b47 --- /dev/null +++ b/internal/csv/helper.go @@ -0,0 +1,54 @@ +package csv + +import ( + "encoding/csv" + "os" + + "go.uber.org/fx" + "go.uber.org/zap" +) + +type Helper struct { + fx.In + csvFile *os.File + csvWriter *csv.Writer + logger *zap.Logger +} + +func NewHelper(lc fx.Lifecycle, logger *zap.Logger) *Helper { + csvFile, err := os.OpenFile("mqtt-logs.csv", os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) + if err != nil { + logger.Fatal("error opening csv file", zap.Error(err)) + } + + csvWriter := csv.NewWriter(csvFile) + + return &Helper{ + csvFile: csvFile, + csvWriter: csvWriter, + logger: logger, + } +} + +func (h *Helper) AddEntry(date string, topic string, message string) error { + record := []string{date, topic, message} + if err := h.csvWriter.Write(record); err != nil { + h.logger.Error("error writing to csv file", zap.Error(err)) + return err + } + + h.csvWriter.Flush() + + return nil +} + +func (h *Helper) Close() error { + h.csvWriter.Flush() + + if err := h.csvFile.Close(); err != nil { + h.logger.Error("error closing csv file", zap.Error(err)) + return err + } + + return nil +} diff --git a/internal/csv/init.go b/internal/csv/init.go new file mode 100644 index 0000000..4f4df96 --- /dev/null +++ b/internal/csv/init.go @@ -0,0 +1,33 @@ +package csv + +import ( + "encoding/csv" + "io" + "os" + + "go.uber.org/zap" +) + +func InitializeHeader(logger *zap.Logger) { + csvFile, err := os.OpenFile("mqtt-logs.csv", os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) + if err != nil { + logger.Fatal("error opening csv file", zap.Error(err)) + } + defer csvFile.Close() + + r := csv.NewReader(csvFile) + w := csv.NewWriter(csvFile) + record, err := r.Read() + + if err == io.EOF { + w.Write([]string{"Date (UTC)", "Topic", "Message"}) + w.Flush() + } else if err != nil { + logger.Fatal("error initializing csv file", zap.Error(err)) + } else if record[0] == "Date (UTC)" && record[1] == "Topic" && record[2] == "Message" { + return + } else { + logger.Fatal("First row of csv file is not as expected. Please make sure the file is in the correct format.") + } + +} diff --git a/internal/mqtt/logger.go b/internal/mqtt/logger.go new file mode 100644 index 0000000..93277b6 --- /dev/null +++ b/internal/mqtt/logger.go @@ -0,0 +1,24 @@ +package mqtt + +import ( + "log" + "strings" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "go.uber.org/zap" +) + +type ZapWriter struct { + sugar *zap.SugaredLogger +} + +func (z *ZapWriter) Write(p []byte) (n int, err error) { + z.sugar.Debug(strings.TrimSpace(string(p))) + return len(p), nil +} + +func SetupMQTTLogging(logger *zap.Logger) { + sugar := logger.Sugar() + mqtt.DEBUG = log.New(&ZapWriter{sugar: sugar}, "[DEBUG] ", 0) + mqtt.ERROR = log.New(&ZapWriter{sugar: sugar}, "[ERROR] ", 0) +} \ No newline at end of file diff --git a/internal/mqtt/mqtt.go b/internal/mqtt/mqtt.go new file mode 100644 index 0000000..eb3994e --- /dev/null +++ b/internal/mqtt/mqtt.go @@ -0,0 +1,52 @@ +package mqtt + +import ( + "MQTTLogger/config" + "MQTTLogger/internal/csv" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "go.uber.org/zap" +) + +func NewClient(logger *zap.Logger, config *config.Config, csvHelper *csv.Helper) mqtt.Client { + var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { + csvHelper.AddEntry(time.Now().UTC().Format("2006-01-02 15:04:05"), msg.Topic(), string(msg.Payload())) + } + + opts := mqtt.NewClientOptions().AddBroker(config.URI).SetClientID("mqtt-logger") + opts.SetKeepAlive(2 * time.Second) + opts.SetDefaultPublishHandler(f) + opts.SetPingTimeout(1 * time.Second) + opts.SetConnectionNotificationHandler(func(client mqtt.Client, notification mqtt.ConnectionNotification) { + switch n := notification.(type) { + case mqtt.ConnectionNotificationConnected: + logger.Info("Connected to MQTT broker") + case mqtt.ConnectionNotificationConnecting: + logger.Info("connecting", zap.Bool("isReconnect", n.IsReconnect), zap.Int("attempt", n.Attempt)) + case mqtt.ConnectionNotificationFailed: + logger.Error("connection failed", zap.String("error", n.Reason.Error())) + case mqtt.ConnectionNotificationLost: + logger.Error("connection lost", zap.String("error", n.Reason.Error())) + case mqtt.ConnectionNotificationBroker: + logger.Info("broker", zap.String("name", n.Broker.String())) + case mqtt.ConnectionNotificationBrokerFailed: + logger.Error("broker failed", zap.String("broker", n.Broker.String()), zap.String("error", n.Reason.Error())) + } + }) + + c := mqtt.NewClient(opts) + if token := c.Connect(); token.Wait() && token.Error() != nil { + logger.Fatal("error connecting to MQTT broker", zap.Error(token.Error())) + } + + return c +} + +func StartListening(logger *zap.Logger, config *config.Config, client mqtt.Client) { + for _, topic := range config.Topics { + if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil { + logger.Error("error subscribing to topic", zap.String("topic", topic), zap.Error(token.Error())) + } + } +}