diff --git a/.gitignore b/.gitignore index e601138..2357c3b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ *.csv # binary file -MQTTLogger \ No newline at end of file +MQTTLogger +.vscode/launch.json diff --git a/cmd/MQTTLogger/main.go b/cmd/MQTTLogger/main.go index 0851b3a..040c81d 100644 --- a/cmd/MQTTLogger/main.go +++ b/cmd/MQTTLogger/main.go @@ -18,6 +18,7 @@ func main() { NewLogger, config.NewConfig, mqtt.NewClient, + mqtt.NewTLSConfig, csv.NewHelper, ), fx.Invoke( diff --git a/config/config.go b/config/config.go index e02a4a5..6b3cd98 100644 --- a/config/config.go +++ b/config/config.go @@ -13,6 +13,7 @@ type Config struct { Username string Password string Topics []string + CACert string } func NewConfig(logger *zap.Logger) *Config { @@ -51,10 +52,16 @@ func NewConfig(logger *zap.Logger) *Config { } } + caCert := os.Getenv("MQTT_CACERT") + if caCert == "" { + logger.Warn("MQTT_CACERT is not set, not using TLS") + } + return &Config{ URI: uri, Username: username, Password: password, Topics: topics, + CACert: caCert, } } diff --git a/go.mod b/go.mod index 01a7714..267f444 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,16 @@ module MQTTLogger go 1.25.1 require ( - github.com/eclipse/paho.mqtt.golang v1.5.1 // indirect + github.com/eclipse/paho.mqtt.golang v1.5.1 + github.com/joho/godotenv v1.5.1 + go.uber.org/fx v1.24.0 + go.uber.org/zap v1.27.0 +) + +require ( github.com/gorilla/websocket v1.5.3 // indirect - github.com/joho/godotenv v1.5.1 // indirect go.uber.org/dig v1.19.0 // indirect - go.uber.org/fx v1.24.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/net v0.44.0 // indirect golang.org/x/sync v0.17.0 // indirect golang.org/x/sys v0.36.0 // indirect diff --git a/go.sum b/go.sum index c7cbd50..057fab0 100644 --- a/go.sum +++ b/go.sum @@ -1,19 +1,23 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= go.uber.org/dig v1.19.0 h1:BACLhebsYdpQ7IROQ1AGPjrXcP5dF80U3gKoFzbaq/4= go.uber.org/dig v1.19.0/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= go.uber.org/fx v1.24.0 h1:wE8mruvpg2kiiL1Vqd0CC+tr0/24XIB10Iwp2lLWzkg= go.uber.org/fx v1.24.0/go.mod h1:AmDeGyS+ZARGKM4tlH4FY2Jr63VjbEDJHtqXTGP5hbo= -go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= -go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= -go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= @@ -22,3 +26,5 @@ golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/mqtt/mqtt.go b/internal/mqtt/mqtt.go index 35b6fda..5fe4934 100644 --- a/internal/mqtt/mqtt.go +++ b/internal/mqtt/mqtt.go @@ -1,29 +1,37 @@ package mqtt import ( + "fmt" "MQTTLogger/config" "MQTTLogger/internal/csv" + "crypto/tls" + "math/rand/v2" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "go.uber.org/zap" ) -func NewClient(logger *zap.Logger, config *config.Config, csvHelper *csv.Helper) mqtt.Client { +func NewClient(logger *zap.Logger, config *config.Config, csvHelper *csv.Helper, tlsConfig *tls.Config) mqtt.Client { + var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { csvHelper.AddEntry(time.Now().UTC().Format("2006-01-02 15:04:05"), msg.Topic(), string(msg.Payload())) } - opts := mqtt.NewClientOptions().AddBroker(config.URI).SetClientID("mqtt-logger") + randomClientID := "mqtt-logger-" + fmt.Sprint(rand.Int()) + logger.Info("Creating MQTT client", zap.String("clientID", randomClientID)) + opts := mqtt.NewClientOptions().AddBroker(config.URI).SetClientID(randomClientID) + if tlsConfig != nil { + opts.SetTLSConfig(tlsConfig) + } opts.SetKeepAlive(2 * time.Second) opts.SetDefaultPublishHandler(f) - opts.SetPingTimeout(1 * time.Second) - // TODO: Check this option and see if it solves the re-subscribe on reconnect issue - // opts.SetCleanSession(false) + opts.SetPingTimeout(1 * time.Second) opts.SetConnectionNotificationHandler(func(client mqtt.Client, notification mqtt.ConnectionNotification) { switch n := notification.(type) { case mqtt.ConnectionNotificationConnected: logger.Info("Connected to MQTT broker") + // Re-susbcribe to topics on reconnect StartListening(logger, config, client) case mqtt.ConnectionNotificationConnecting: logger.Info("connecting", zap.Bool("isReconnect", n.IsReconnect), zap.Int("attempt", n.Attempt)) diff --git a/internal/mqtt/tls.go b/internal/mqtt/tls.go new file mode 100644 index 0000000..6db5d99 --- /dev/null +++ b/internal/mqtt/tls.go @@ -0,0 +1,34 @@ +package mqtt + +import ( + "MQTTLogger/config" + "crypto/tls" + "crypto/x509" + "os" + + "go.uber.org/zap" +) + +func NewTLSConfig(logger *zap.Logger, config *config.Config) *tls.Config { + + if config.CACert == "" { + return nil + } + + certpool := x509.NewCertPool() + pemCerts, err := os.ReadFile(config.CACert) + if err == nil { + certpool.AppendCertsFromPEM(pemCerts) + } else { + logger.Fatal("error loading CA cert", zap.Error(err)) + } + + return &tls.Config{ + RootCAs: certpool, + // We use the provided cert not the one server sends. + ClientAuth: tls.NoClientCert, + ClientCAs: nil, + InsecureSkipVerify: true, // I know + Certificates: nil, + } +}