86 lines
2.2 KiB
Go
86 lines
2.2 KiB
Go
package mqtt
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"fmt"
|
|
"log"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
mqttClient "github.com/eclipse/paho.mqtt.golang"
|
|
"github.com/ssp97/mqtt_relay/config"
|
|
)
|
|
|
|
// globalCfg is the package-wide configuration. It is assigned by
// MqttClientInit and read by MqttClientConnect to obtain the broker address
// (globalCfg.Relay.Broker). It stays nil until MqttClientInit is called.
var globalCfg *config.Config
|
|
|
|
func MqttClientInit(cfg *config.Config) {
|
|
globalCfg = cfg
|
|
}
|
|
|
|
func MqttClientConnect(session *Session) (err error) {
|
|
tlsConfig := &tls.Config{
|
|
InsecureSkipVerify: true,
|
|
ClientAuth: tls.NoClientCert,
|
|
}
|
|
opts := mqttClient.NewClientOptions()
|
|
opts.AddBroker(globalCfg.Relay.Broker)
|
|
opts.SetClientID(session.ClientId)
|
|
opts.SetTLSConfig(tlsConfig)
|
|
|
|
opts.SetUsername(string(session.Username))
|
|
opts.SetPassword(string(session.Password))
|
|
|
|
opts.OnConnect = func(c mqttClient.Client) {
|
|
fmt.Println("Connected to MQTT broker")
|
|
}
|
|
|
|
client := mqttClient.NewClient(opts)
|
|
session.Client = &client
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
log.Printf("Failed to connect to MQTT broker: %v\n", token.Error())
|
|
err = token.Error()
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func MqttClientDisconnect(session *Session) {
|
|
client := *session.Client
|
|
client.Disconnect(0)
|
|
}
|
|
|
|
func MqttClientSubscribeMultiple(session *Session, filters map[string]byte) {
|
|
client := *session.Client
|
|
client.SubscribeMultiple(filters, func(client mqttClient.Client, msg mqttClient.Message) {
|
|
log.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload())
|
|
server := GetMqttServer()
|
|
err := server.Publish(msg.Topic(), msg.Payload(), msg.Retained(), msg.Qos())
|
|
if err != nil {
|
|
log.Println("server.Publish err=", err)
|
|
}
|
|
})
|
|
}
|
|
|
|
func MqttClientSubscribe(session *Session, topic string, qos byte) {
|
|
client := *session.Client
|
|
client.Subscribe(topic, qos, func(client mqtt.Client, msg mqtt.Message) {
|
|
log.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload())
|
|
server := GetMqttServer()
|
|
err := server.Publish(msg.Topic(), msg.Payload(), msg.Retained(), msg.Qos())
|
|
if err != nil {
|
|
log.Println("server.Publish err=", err)
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
func MqttClientUnsubscribe(session *Session, topic string) {
|
|
client := *session.Client
|
|
client.Unsubscribe(topic)
|
|
}
|
|
|
|
func MqttClientPublish(session *Session, topic string, qos byte, retained bool, payload interface{}) {
|
|
client := *session.Client
|
|
client.Publish(topic, qos, retained, payload)
|
|
}
|