Compare commits
	
		
			2 Commits
		
	
	
		
			6374562137
			...
			4de23d2cc9
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 4de23d2cc9 | |||
| 349287cf8c | 
							
								
								
									
										3
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										3
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							@@ -2,4 +2,5 @@
 | 
				
			|||||||
*.csv
 | 
					*.csv
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# binary file
 | 
					# binary file
 | 
				
			||||||
MQTTLogger 
 | 
					MQTTLogger 
 | 
				
			||||||
 | 
					.vscode/launch.json
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -18,6 +18,7 @@ func main() {
 | 
				
			|||||||
			NewLogger,
 | 
								NewLogger,
 | 
				
			||||||
			config.NewConfig,
 | 
								config.NewConfig,
 | 
				
			||||||
			mqtt.NewClient,
 | 
								mqtt.NewClient,
 | 
				
			||||||
 | 
								mqtt.NewTLSConfig,
 | 
				
			||||||
			csv.NewHelper,
 | 
								csv.NewHelper,
 | 
				
			||||||
		),
 | 
							),
 | 
				
			||||||
		fx.Invoke(
 | 
							fx.Invoke(
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -13,6 +13,7 @@ type Config struct {
 | 
				
			|||||||
	Username string
 | 
						Username string
 | 
				
			||||||
	Password string
 | 
						Password string
 | 
				
			||||||
	Topics   []string
 | 
						Topics   []string
 | 
				
			||||||
 | 
						CACert   string
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewConfig(logger *zap.Logger) *Config {
 | 
					func NewConfig(logger *zap.Logger) *Config {
 | 
				
			||||||
@@ -51,10 +52,16 @@ func NewConfig(logger *zap.Logger) *Config {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						caCert := os.Getenv("MQTT_CACERT")
 | 
				
			||||||
 | 
						if caCert == "" {
 | 
				
			||||||
 | 
							logger.Warn("MQTT_CACERT is not set, not using TLS")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return &Config{
 | 
						return &Config{
 | 
				
			||||||
		URI:      uri,
 | 
							URI:      uri,
 | 
				
			||||||
		Username: username,
 | 
							Username: username,
 | 
				
			||||||
		Password: password,
 | 
							Password: password,
 | 
				
			||||||
		Topics:   topics,
 | 
							Topics:   topics,
 | 
				
			||||||
 | 
							CACert:   caCert,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										11
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								go.mod
									
									
									
									
									
								
							@@ -3,13 +3,16 @@ module MQTTLogger
 | 
				
			|||||||
go 1.25.1
 | 
					go 1.25.1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
require (
 | 
					require (
 | 
				
			||||||
	github.com/eclipse/paho.mqtt.golang v1.5.1 // indirect
 | 
						github.com/eclipse/paho.mqtt.golang v1.5.1
 | 
				
			||||||
 | 
						github.com/joho/godotenv v1.5.1
 | 
				
			||||||
 | 
						go.uber.org/fx v1.24.0
 | 
				
			||||||
 | 
						go.uber.org/zap v1.27.0
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					require (
 | 
				
			||||||
	github.com/gorilla/websocket v1.5.3 // indirect
 | 
						github.com/gorilla/websocket v1.5.3 // indirect
 | 
				
			||||||
	github.com/joho/godotenv v1.5.1 // indirect
 | 
					 | 
				
			||||||
	go.uber.org/dig v1.19.0 // indirect
 | 
						go.uber.org/dig v1.19.0 // indirect
 | 
				
			||||||
	go.uber.org/fx v1.24.0 // indirect
 | 
					 | 
				
			||||||
	go.uber.org/multierr v1.11.0 // indirect
 | 
						go.uber.org/multierr v1.11.0 // indirect
 | 
				
			||||||
	go.uber.org/zap v1.27.0 // indirect
 | 
					 | 
				
			||||||
	golang.org/x/net v0.44.0 // indirect
 | 
						golang.org/x/net v0.44.0 // indirect
 | 
				
			||||||
	golang.org/x/sync v0.17.0 // indirect
 | 
						golang.org/x/sync v0.17.0 // indirect
 | 
				
			||||||
	golang.org/x/sys v0.36.0 // indirect
 | 
						golang.org/x/sys v0.36.0 // indirect
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										14
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										14
									
								
								go.sum
									
									
									
									
									
								
							@@ -1,19 +1,23 @@
 | 
				
			|||||||
 | 
					github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 | 
				
			||||||
 | 
					github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 | 
				
			||||||
github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE=
 | 
					github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE=
 | 
				
			||||||
github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU=
 | 
					github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU=
 | 
				
			||||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
 | 
					github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
 | 
				
			||||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 | 
					github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 | 
				
			||||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
 | 
					github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
 | 
				
			||||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
 | 
					github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
 | 
				
			||||||
 | 
					github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 | 
				
			||||||
 | 
					github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 | 
				
			||||||
 | 
					github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
 | 
				
			||||||
 | 
					github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 | 
				
			||||||
go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4=
 | 
					go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4=
 | 
				
			||||||
go.uber.org/dig v1.19.0/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE=
 | 
					go.uber.org/dig v1.19.0/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE=
 | 
				
			||||||
go.uber.org/fx v1.24.0 h1:wE8mruvpg2kiiL1Vqd0CC+tr0/24XIB10Iwp2lLWzkg=
 | 
					go.uber.org/fx v1.24.0 h1:wE8mruvpg2kiiL1Vqd0CC+tr0/24XIB10Iwp2lLWzkg=
 | 
				
			||||||
go.uber.org/fx v1.24.0/go.mod h1:AmDeGyS+ZARGKM4tlH4FY2Jr63VjbEDJHtqXTGP5hbo=
 | 
					go.uber.org/fx v1.24.0/go.mod h1:AmDeGyS+ZARGKM4tlH4FY2Jr63VjbEDJHtqXTGP5hbo=
 | 
				
			||||||
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
 | 
					go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 | 
				
			||||||
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 | 
					go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 | 
				
			||||||
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 | 
					go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 | 
				
			||||||
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 | 
					go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 | 
				
			||||||
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
 | 
					 | 
				
			||||||
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
 | 
					 | 
				
			||||||
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
 | 
					go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
 | 
				
			||||||
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
 | 
					go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
 | 
				
			||||||
golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
 | 
					golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I=
 | 
				
			||||||
@@ -22,3 +26,5 @@ golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
 | 
				
			|||||||
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
 | 
					golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
 | 
				
			||||||
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
 | 
					golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
 | 
				
			||||||
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
 | 
					golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
 | 
				
			||||||
 | 
					gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 | 
				
			||||||
 | 
					gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,27 +1,37 @@
 | 
				
			|||||||
package mqtt
 | 
					package mqtt
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
	"MQTTLogger/config"
 | 
						"MQTTLogger/config"
 | 
				
			||||||
	"MQTTLogger/internal/csv"
 | 
						"MQTTLogger/internal/csv"
 | 
				
			||||||
 | 
						"crypto/tls"
 | 
				
			||||||
 | 
						"math/rand/v2"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	mqtt "github.com/eclipse/paho.mqtt.golang"
 | 
						mqtt "github.com/eclipse/paho.mqtt.golang"
 | 
				
			||||||
	"go.uber.org/zap"
 | 
						"go.uber.org/zap"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewClient(logger *zap.Logger, config *config.Config, csvHelper *csv.Helper) mqtt.Client {
 | 
					func NewClient(logger *zap.Logger, config *config.Config, csvHelper *csv.Helper, tlsConfig *tls.Config) mqtt.Client {
 | 
				
			||||||
 | 
						
 | 
				
			||||||
	var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 | 
						var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 | 
				
			||||||
		csvHelper.AddEntry(time.Now().UTC().Format("2006-01-02 15:04:05"), msg.Topic(), string(msg.Payload()))
 | 
							csvHelper.AddEntry(time.Now().UTC().Format("2006-01-02 15:04:05"), msg.Topic(), string(msg.Payload()))
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	opts := mqtt.NewClientOptions().AddBroker(config.URI).SetClientID("mqtt-logger")
 | 
						randomClientID := "mqtt-logger-" + fmt.Sprint(rand.Int())
 | 
				
			||||||
 | 
						logger.Info("Creating MQTT client", zap.String("clientID", randomClientID))
 | 
				
			||||||
 | 
						opts := mqtt.NewClientOptions().AddBroker(config.URI).SetClientID(randomClientID)
 | 
				
			||||||
 | 
						if tlsConfig != nil {
 | 
				
			||||||
 | 
							opts.SetTLSConfig(tlsConfig)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	opts.SetKeepAlive(2 * time.Second)
 | 
						opts.SetKeepAlive(2 * time.Second)
 | 
				
			||||||
	opts.SetDefaultPublishHandler(f)
 | 
						opts.SetDefaultPublishHandler(f)
 | 
				
			||||||
	opts.SetPingTimeout(1 * time.Second)
 | 
						opts.SetPingTimeout(1 * time.Second)	
 | 
				
			||||||
	opts.SetConnectionNotificationHandler(func(client mqtt.Client, notification mqtt.ConnectionNotification) {
 | 
						opts.SetConnectionNotificationHandler(func(client mqtt.Client, notification mqtt.ConnectionNotification) {
 | 
				
			||||||
		switch n := notification.(type) {
 | 
							switch n := notification.(type) {
 | 
				
			||||||
		case mqtt.ConnectionNotificationConnected:
 | 
							case mqtt.ConnectionNotificationConnected:
 | 
				
			||||||
			logger.Info("Connected to MQTT broker")
 | 
								logger.Info("Connected to MQTT broker")
 | 
				
			||||||
 | 
								// Re-susbcribe to topics on reconnect
 | 
				
			||||||
			StartListening(logger, config, client)
 | 
								StartListening(logger, config, client)
 | 
				
			||||||
		case mqtt.ConnectionNotificationConnecting:
 | 
							case mqtt.ConnectionNotificationConnecting:
 | 
				
			||||||
			logger.Info("connecting", zap.Bool("isReconnect", n.IsReconnect), zap.Int("attempt", n.Attempt))
 | 
								logger.Info("connecting", zap.Bool("isReconnect", n.IsReconnect), zap.Int("attempt", n.Attempt))
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										34
									
								
								internal/mqtt/tls.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										34
									
								
								internal/mqtt/tls.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,34 @@
 | 
				
			|||||||
 | 
					package mqtt
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"MQTTLogger/config"
 | 
				
			||||||
 | 
						"crypto/tls"
 | 
				
			||||||
 | 
						"crypto/x509"
 | 
				
			||||||
 | 
						"os"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"go.uber.org/zap"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewTLSConfig(logger *zap.Logger, config *config.Config) *tls.Config {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if config.CACert == "" {
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						certpool := x509.NewCertPool()
 | 
				
			||||||
 | 
						pemCerts, err := os.ReadFile(config.CACert)
 | 
				
			||||||
 | 
						if err == nil {
 | 
				
			||||||
 | 
							certpool.AppendCertsFromPEM(pemCerts)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							logger.Fatal("error loading CA cert", zap.Error(err))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return &tls.Config{
 | 
				
			||||||
 | 
							RootCAs: certpool,
 | 
				
			||||||
 | 
							// We use the provided cert not the one server sends.
 | 
				
			||||||
 | 
							ClientAuth:         tls.NoClientCert,
 | 
				
			||||||
 | 
							ClientCAs:          nil,
 | 
				
			||||||
 | 
							InsecureSkipVerify: true, // I know
 | 
				
			||||||
 | 
							Certificates: nil,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user