add relay

This commit is contained in:
yuanpai 2024-08-22 13:09:44 +08:00
parent d48f4407fd
commit ee6fb20d3e
8 changed files with 146 additions and 53 deletions

6
go.mod
View File

@ -3,14 +3,14 @@ module github.com/ssp97/mqtt_relay
go 1.21.5 go 1.21.5
require ( require (
github.com/256dpi/gomqtt v0.14.4 github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/mochi-mqtt/server/v2 v2.6.5 github.com/mochi-mqtt/server/v2 v2.6.5
gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0
) )
require ( require (
github.com/256dpi/mercury v0.2.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect github.com/gorilla/websocket v1.5.3 // indirect
github.com/rs/xid v1.4.0 // indirect github.com/rs/xid v1.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect
) )

38
go.sum
View File

@ -1,51 +1,25 @@
github.com/256dpi/gomqtt v0.14.4 h1:eO9FjRExQ9G9I3RTzDAEYhuS5KFy5RYudrnCvKSr8Z0=
github.com/256dpi/gomqtt v0.14.4/go.mod h1:s8uMqxWMl93jUyPGNutI1Duy2lxWkcQZ6qky+OPQmfM=
github.com/256dpi/mercury v0.2.0 h1:ImB0JYuZ28kwp2MpqnMdQFSD3z9mgaNYHrSjYuyP0LI=
github.com/256dpi/mercury v0.2.0/go.mod h1:xxgxZSQO7VUwxGLpk8yRVe/WF0MKH7nCIwSh4kUVMy4=
github.com/Pallinder/go-randomdata v1.2.0/go.mod h1:yHmJgulpD2Nfrm0cR9tI/+oAgRqCQQixsA8HyRZfV9Y=
github.com/abiosoft/ishell v2.0.0+incompatible/go.mod h1:HQR9AqF2R3P4XXpMpI0NAzgHf/aS6+zVXRj14cVk9qg=
github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db/go.mod h1:rB3B4rKii8V21ydCbIzH5hZiCQE7f5E9SzUb/ZZx530=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:rZfgFAXFS/z/lEd6LJmf9HVZ1LkgYiHx5pHhV5DR16M= github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jpillora/backoff v0.0.0-20170918002102-8eab2debe79d/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mochi-mqtt/server/v2 v2.6.5 h1:9PiQ6EJt/Dx0ut0Fuuir4F6WinO/5Bpz9szujNwm+q8= github.com/mochi-mqtt/server/v2 v2.6.5 h1:9PiQ6EJt/Dx0ut0Fuuir4F6WinO/5Bpz9szujNwm+q8=
github.com/mochi-mqtt/server/v2 v2.6.5/go.mod h1:TqztjKGO0/ArOjJt9x9idk0kqPT3CVN8Pb+l+PS5Gdo= github.com/mochi-mqtt/server/v2 v2.6.5/go.mod h1:TqztjKGO0/ArOjJt9x9idk0kqPT3CVN8Pb+l+PS5Gdo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

View File

@ -54,9 +54,10 @@ func main() {
log.Fatalf("Failed to open log file: %v", err) log.Fatalf("Failed to open log file: %v", err)
} }
defer logFile.Close() defer logFile.Close()
log.SetOutput(logFile) // log.SetOutput(logFile)
// Create a new MQTT server // Create a new MQTT server
mqtt.MqttClientInit(cfg)
server := mqtt.MqttServerStart(cfg) server := mqtt.MqttServerStart(cfg)
<-done <-done

73
mqtt/client.go Normal file
View File

@ -0,0 +1,73 @@
package mqtt
import (
"crypto/tls"
"fmt"
"log"
mqtt "github.com/eclipse/paho.mqtt.golang"
mqttClient "github.com/eclipse/paho.mqtt.golang"
"github.com/ssp97/mqtt_relay/config"
)
var globalCfg *config.Config
func MqttClientInit(cfg *config.Config) {
globalCfg = cfg
}
func MqttClientConnect(session *Session) (err error) {
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
ClientAuth: tls.NoClientCert,
}
opts := mqttClient.NewClientOptions()
opts.AddBroker(globalCfg.Relay.Broker)
opts.SetClientID(session.ClientId)
opts.SetTLSConfig(tlsConfig)
opts.SetUsername(string(session.Username))
opts.SetPassword(string(session.Password))
opts.OnConnect = func(c mqttClient.Client) {
fmt.Println("Connected to MQTT broker")
}
client := mqttClient.NewClient(opts)
session.Client = &client
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Printf("Failed to connect to MQTT broker: %v\n", token.Error())
err = token.Error()
}
return
}
func MqttClientDisconnect(session *Session) {
client := *session.Client
client.Disconnect(0)
}
func MqttClientSubscribe(session *Session, topic string, qos byte) {
client := *session.Client
client.Subscribe(topic, qos, func(client mqtt.Client, msg mqtt.Message) {
log.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload())
server := GetMqttServer()
err := server.Publish(msg.Topic(), msg.Payload(), msg.Retained(), msg.Qos())
if err != nil {
log.Println("server.Publish err=", err)
}
})
}
func MqttClientUnsubscribe(session *Session, topic string) {
client := *session.Client
client.Unsubscribe(topic)
}
func MqttClientPublish(session *Session, topic string, qos byte, retained bool, payload interface{}) {
client := *session.Client
client.Publish(topic, qos, retained, payload)
}

View File

@ -35,7 +35,7 @@ func (h *MqttServerHook) Provides(b byte) bool {
} }
func (h *MqttServerHook) Init(config any) error { func (h *MqttServerHook) Init(config any) error {
h.Log.Info("initialised") h.Log.Info("hook was initialised")
if _, ok := config.(*MqttServerHookOptions); !ok && config != nil { if _, ok := config.(*MqttServerHookOptions); !ok && config != nil {
return mqttServerV2.ErrInvalidConfigType return mqttServerV2.ErrInvalidConfigType
} }
@ -55,14 +55,18 @@ func (h *MqttServerHook) subscribeCallback(cl *mqttServerV2.Client, sub packets.
func (h *MqttServerHook) OnConnect(cl *mqttServerV2.Client, pk packets.Packet) error { func (h *MqttServerHook) OnConnect(cl *mqttServerV2.Client, pk packets.Packet) error {
h.Log.Info("client connected", "client", cl.ID) h.Log.Info("client connected", "client", cl.ID)
// session := GetSession(cl.ID)
// if session != nil {
// }
// Example demonstrating how to subscribe to a topic within the hook. // Example demonstrating how to subscribe to a topic within the hook.
h.config.Server.Subscribe("hook/direct/publish", 1, h.subscribeCallback) //h.config.Server.Subscribe("hook/direct/publish", 1, h.subscribeCallback)
// Example demonstrating how to publish a message within the hook // Example demonstrating how to publish a message within the hook
err := h.config.Server.Publish("hook/direct/publish", []byte("packet hook message"), false, 0) //err := h.config.Server.Publish("hook/direct/publish", []byte("packet hook message"), false, 0)
if err != nil { //if err != nil {
h.Log.Error("hook.publish", "error", err) // h.Log.Error("hook.publish", "error", err)
} //}
return nil return nil
} }
@ -78,22 +82,29 @@ func (h *MqttServerHook) OnDisconnect(cl *mqttServerV2.Client, err error, expire
func (h *MqttServerHook) OnSubscribed(cl *mqttServerV2.Client, pk packets.Packet, reasonCodes []byte) { func (h *MqttServerHook) OnSubscribed(cl *mqttServerV2.Client, pk packets.Packet, reasonCodes []byte) {
h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters) h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters)
session := GetSession(cl.ID)
for _, v := range pk.Filters {
MqttClientSubscribe(session, v.Filter, reasonCodes[0])
}
} }
func (h *MqttServerHook) OnUnsubscribed(cl *mqttServerV2.Client, pk packets.Packet) { func (h *MqttServerHook) OnUnsubscribed(cl *mqttServerV2.Client, pk packets.Packet) {
h.Log.Info("unsubscribed", "client", cl.ID, "filters", pk.Filters) h.Log.Info("unsubscribed", "client", cl.ID, "filters", pk.Filters)
session := GetSession(cl.ID)
for _, v := range pk.Filters {
MqttClientUnsubscribe(session, v.Filter)
}
} }
func (h *MqttServerHook) OnPublish(cl *mqttServerV2.Client, pk packets.Packet) (packets.Packet, error) { func (h *MqttServerHook) OnPublish(cl *mqttServerV2.Client, pk packets.Packet) (packets.Packet, error) {
session := GetSession(cl.ID)
if session != nil {
h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload)) h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload))
MqttClientPublish(session, pk.TopicName, pk.FixedHeader.Qos, pk.FixedHeader.Retain, pk.Payload)
pkx := pk
if string(pk.Payload) == "hello" {
pkx.Payload = []byte("hello world")
h.Log.Info("received modified packet from client", "client", cl.ID, "payload", string(pkx.Payload))
} }
return pkx, nil return pk, nil
} }
func (h *MqttServerHook) OnPublished(cl *mqttServerV2.Client, pk packets.Packet) { func (h *MqttServerHook) OnPublished(cl *mqttServerV2.Client, pk packets.Packet) {
@ -103,6 +114,11 @@ func (h *MqttServerHook) OnPublished(cl *mqttServerV2.Client, pk packets.Packet)
// OnConnectAuthenticate returns true/allowed for all requests. // OnConnectAuthenticate returns true/allowed for all requests.
func (h *MqttServerHook) OnConnectAuthenticate(cl *mqttServerV2.Client, pk packets.Packet) bool { func (h *MqttServerHook) OnConnectAuthenticate(cl *mqttServerV2.Client, pk packets.Packet) bool {
h.Log.Info("Auth", pk.Connect.Username, pk.Connect.Password) h.Log.Info("Auth", pk.Connect.Username, pk.Connect.Password)
session := CreateSession(cl.ID, pk.Connect.Username, pk.Connect.Password)
err := MqttClientConnect(session)
if err != nil {
return false
}
return true return true
} }

View File

@ -11,8 +11,16 @@ import (
"github.com/ssp97/mqtt_relay/config" "github.com/ssp97/mqtt_relay/config"
) )
var globalServer *mqttServerV2.Server
func GetMqttServer() *mqttServerV2.Server {
return globalServer
}
func MqttServerStart(cfg *config.Config) *mqttServerV2.Server { func MqttServerStart(cfg *config.Config) *mqttServerV2.Server {
server := mqttServerV2.New(nil) server := mqttServerV2.New(&mqttServerV2.Options{
InlineClient: true,
})
// _ = server.AddHook(new(auth.AllowHook), nil) // _ = server.AddHook(new(auth.AllowHook), nil)
var tlsConfig *tls.Config var tlsConfig *tls.Config
@ -52,5 +60,6 @@ func MqttServerStart(cfg *config.Config) *mqttServerV2.Server {
} }
}() }()
globalServer = server
return server return server
} }

View File

@ -1,6 +1,25 @@
package mqtt package mqtt
import mqttClient "github.com/eclipse/paho.mqtt.golang"
type Session struct { type Session struct {
Username string ClientId string
Password string Username []byte
Password []byte
Client *mqttClient.Client
}
var Sessions map[string]*Session = make(map[string]*Session)
func CreateSession(mqttId string, user, passwd []byte) *Session {
Sessions[mqttId] = &Session{
ClientId: mqttId,
Username: user,
Password: passwd,
}
return Sessions[mqttId]
}
func GetSession(mqttId string) *Session {
return Sessions[mqttId]
} }

View File

@ -6,7 +6,8 @@ server:
key_file: "userdata/_.tcljd.com.key" key_file: "userdata/_.tcljd.com.key"
relay: relay:
broker: "tcp://39.108.92.71:1883" #broker: "tls://39.108.92.71:1883"
broker: "tcp://mqtt.ts.yukishizuku.top:7071"
enable_tls: true enable_tls: true