diff --git a/go.mod b/go.mod index a6bf434..3e096d6 100644 --- a/go.mod +++ b/go.mod @@ -3,14 +3,14 @@ module github.com/ssp97/mqtt_relay go 1.21.5 require ( - github.com/256dpi/gomqtt v0.14.4 + github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/mochi-mqtt/server/v2 v2.6.5 gopkg.in/yaml.v2 v2.4.0 ) require ( - github.com/256dpi/mercury v0.2.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/rs/xid v1.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.7.0 // indirect ) diff --git a/go.sum b/go.sum index f9f86d1..b9b5013 100644 --- a/go.sum +++ b/go.sum @@ -1,51 +1,25 @@ -github.com/256dpi/gomqtt v0.14.4 h1:eO9FjRExQ9G9I3RTzDAEYhuS5KFy5RYudrnCvKSr8Z0= -github.com/256dpi/gomqtt v0.14.4/go.mod h1:s8uMqxWMl93jUyPGNutI1Duy2lxWkcQZ6qky+OPQmfM= -github.com/256dpi/mercury v0.2.0 h1:ImB0JYuZ28kwp2MpqnMdQFSD3z9mgaNYHrSjYuyP0LI= -github.com/256dpi/mercury v0.2.0/go.mod h1:xxgxZSQO7VUwxGLpk8yRVe/WF0MKH7nCIwSh4kUVMy4= -github.com/Pallinder/go-randomdata v1.2.0/go.mod h1:yHmJgulpD2Nfrm0cR9tI/+oAgRqCQQixsA8HyRZfV9Y= -github.com/abiosoft/ishell v2.0.0+incompatible/go.mod h1:HQR9AqF2R3P4XXpMpI0NAzgHf/aS6+zVXRj14cVk9qg= -github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db/go.mod h1:rB3B4rKii8V21ydCbIzH5hZiCQE7f5E9SzUb/ZZx530= -github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= -github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= -github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= -github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:rZfgFAXFS/z/lEd6LJmf9HVZ1LkgYiHx5pHhV5DR16M= -github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= +github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= -github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0= -github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= -github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mochi-mqtt/server/v2 v2.6.5 h1:9PiQ6EJt/Dx0ut0Fuuir4F6WinO/5Bpz9szujNwm+q8= github.com/mochi-mqtt/server/v2 v2.6.5/go.mod h1:TqztjKGO0/ArOjJt9x9idk0kqPT3CVN8Pb+l+PS5Gdo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/main.go b/main.go index 9248c27..f37d9c5 100644 --- a/main.go +++ b/main.go @@ -54,9 +54,10 @@ func main() { log.Fatalf("Failed to open log file: %v", err) } defer logFile.Close() - log.SetOutput(logFile) + // log.SetOutput(logFile) // Create a new MQTT server + mqtt.MqttClientInit(cfg) server := mqtt.MqttServerStart(cfg) <-done diff --git a/mqtt/client.go b/mqtt/client.go new file mode 100644 index 0000000..7adc5d7 --- /dev/null +++ b/mqtt/client.go @@ -0,0 +1,73 @@ +package mqtt + +import ( + "crypto/tls" + "fmt" + "log" + + mqtt "github.com/eclipse/paho.mqtt.golang" + mqttClient "github.com/eclipse/paho.mqtt.golang" + "github.com/ssp97/mqtt_relay/config" +) + +var globalCfg *config.Config + +func MqttClientInit(cfg *config.Config) { + globalCfg = cfg +} + +func MqttClientConnect(session *Session) (err error) { + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, + ClientAuth: tls.NoClientCert, + } + opts := mqttClient.NewClientOptions() + opts.AddBroker(globalCfg.Relay.Broker) + opts.SetClientID(session.ClientId) + opts.SetTLSConfig(tlsConfig) + + opts.SetUsername(string(session.Username)) + opts.SetPassword(string(session.Password)) + + opts.OnConnect = func(c mqttClient.Client) { + fmt.Println("Connected to MQTT broker") + } + + client := mqttClient.NewClient(opts) + session.Client = &client + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Printf("Failed to connect to MQTT broker: %v\n", token.Error()) + err = token.Error() + } + + return +} + +func MqttClientDisconnect(session *Session) { + client := *session.Client + client.Disconnect(0) +} + +func MqttClientSubscribe(session *Session, topic string, qos byte) { + client := *session.Client + client.Subscribe(topic, qos, func(client mqtt.Client, msg mqtt.Message) { + log.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload()) + server := GetMqttServer() + err := server.Publish(msg.Topic(), msg.Payload(), msg.Retained(), msg.Qos()) + if err != nil { + log.Println("server.Publish err=", err) + } + + }) + +} + +func MqttClientUnsubscribe(session *Session, topic string) { + client := *session.Client + client.Unsubscribe(topic) +} + +func MqttClientPublish(session *Session, topic string, qos byte, retained bool, payload interface{}) { + client := *session.Client + client.Publish(topic, qos, retained, payload) +} diff --git a/mqtt/hook.go b/mqtt/hook.go index 7625ab9..f04ec05 100644 --- a/mqtt/hook.go +++ b/mqtt/hook.go @@ -35,7 +35,7 @@ func (h *MqttServerHook) Provides(b byte) bool { } func (h *MqttServerHook) Init(config any) error { - h.Log.Info("initialised") + h.Log.Info("hook was initialised") if _, ok := config.(*MqttServerHookOptions); !ok && config != nil { return mqttServerV2.ErrInvalidConfigType } @@ -55,14 +55,18 @@ func (h *MqttServerHook) subscribeCallback(cl *mqttServerV2.Client, sub packets. func (h *MqttServerHook) OnConnect(cl *mqttServerV2.Client, pk packets.Packet) error { h.Log.Info("client connected", "client", cl.ID) + // session := GetSession(cl.ID) + // if session != nil { + + // } // Example demonstrating how to subscribe to a topic within the hook. - h.config.Server.Subscribe("hook/direct/publish", 1, h.subscribeCallback) + //h.config.Server.Subscribe("hook/direct/publish", 1, h.subscribeCallback) // Example demonstrating how to publish a message within the hook - err := h.config.Server.Publish("hook/direct/publish", []byte("packet hook message"), false, 0) - if err != nil { - h.Log.Error("hook.publish", "error", err) - } + //err := h.config.Server.Publish("hook/direct/publish", []byte("packet hook message"), false, 0) + //if err != nil { + // h.Log.Error("hook.publish", "error", err) + //} return nil } @@ -78,22 +82,29 @@ func (h *MqttServerHook) OnDisconnect(cl *mqttServerV2.Client, err error, expire func (h *MqttServerHook) OnSubscribed(cl *mqttServerV2.Client, pk packets.Packet, reasonCodes []byte) { h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters) + session := GetSession(cl.ID) + for _, v := range pk.Filters { + MqttClientSubscribe(session, v.Filter, reasonCodes[0]) + } + } func (h *MqttServerHook) OnUnsubscribed(cl *mqttServerV2.Client, pk packets.Packet) { h.Log.Info("unsubscribed", "client", cl.ID, "filters", pk.Filters) + session := GetSession(cl.ID) + for _, v := range pk.Filters { + MqttClientUnsubscribe(session, v.Filter) + } } func (h *MqttServerHook) OnPublish(cl *mqttServerV2.Client, pk packets.Packet) (packets.Packet, error) { - h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload)) - - pkx := pk - if string(pk.Payload) == "hello" { - pkx.Payload = []byte("hello world") - h.Log.Info("received modified packet from client", "client", cl.ID, "payload", string(pkx.Payload)) + session := GetSession(cl.ID) + if session != nil { + h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload)) + MqttClientPublish(session, pk.TopicName, pk.FixedHeader.Qos, pk.FixedHeader.Retain, pk.Payload) } - return pkx, nil + return pk, nil } func (h *MqttServerHook) OnPublished(cl *mqttServerV2.Client, pk packets.Packet) { @@ -103,6 +114,11 @@ func (h *MqttServerHook) OnPublished(cl *mqttServerV2.Client, pk packets.Packet) // OnConnectAuthenticate returns true/allowed for all requests. func (h *MqttServerHook) OnConnectAuthenticate(cl *mqttServerV2.Client, pk packets.Packet) bool { h.Log.Info("Auth", pk.Connect.Username, pk.Connect.Password) + session := CreateSession(cl.ID, pk.Connect.Username, pk.Connect.Password) + err := MqttClientConnect(session) + if err != nil { + return false + } return true } diff --git a/mqtt/server.go b/mqtt/server.go index 05dea77..8258e96 100644 --- a/mqtt/server.go +++ b/mqtt/server.go @@ -11,8 +11,16 @@ import ( "github.com/ssp97/mqtt_relay/config" ) +var globalServer *mqttServerV2.Server + +func GetMqttServer() *mqttServerV2.Server { + return globalServer +} + func MqttServerStart(cfg *config.Config) *mqttServerV2.Server { - server := mqttServerV2.New(nil) + server := mqttServerV2.New(&mqttServerV2.Options{ + InlineClient: true, + }) // _ = server.AddHook(new(auth.AllowHook), nil) var tlsConfig *tls.Config @@ -52,5 +60,6 @@ func MqttServerStart(cfg *config.Config) *mqttServerV2.Server { } }() + globalServer = server return server } diff --git a/mqtt/session.go b/mqtt/session.go index 3e228e3..b97828e 100644 --- a/mqtt/session.go +++ b/mqtt/session.go @@ -1,6 +1,25 @@ package mqtt +import mqttClient "github.com/eclipse/paho.mqtt.golang" + type Session struct { - Username string - Password string + ClientId string + Username []byte + Password []byte + Client *mqttClient.Client +} + +var Sessions map[string]*Session = make(map[string]*Session) + +func CreateSession(mqttId string, user, passwd []byte) *Session { + Sessions[mqttId] = &Session{ + ClientId: mqttId, + Username: user, + Password: passwd, + } + return Sessions[mqttId] +} + +func GetSession(mqttId string) *Session { + return Sessions[mqttId] } diff --git a/userdata/config.yml b/userdata/config.yml index 42ac800..7f90c54 100644 --- a/userdata/config.yml +++ b/userdata/config.yml @@ -6,7 +6,8 @@ server: key_file: "userdata/_.tcljd.com.key" relay: - broker: "tcp://39.108.92.71:1883" + #broker: "tls://39.108.92.71:1883" + broker: "tcp://mqtt.ts.yukishizuku.top:7071" enable_tls: true