mqtt_relay/mqtt/hook.go
2024-08-22 02:58:23 +08:00

113 lines
3.3 KiB
Go

package mqtt
import (
"bytes"
"fmt"
mqttServerV2 "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/packets"
)
type MqttServerHookOptions struct {
Server *mqttServerV2.Server
}
type MqttServerHook struct {
mqttServerV2.HookBase
config *MqttServerHookOptions
}
func (h *MqttServerHook) ID() string {
return "events-example"
}
func (h *MqttServerHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqttServerV2.OnConnect,
mqttServerV2.OnDisconnect,
mqttServerV2.OnSubscribed,
mqttServerV2.OnUnsubscribed,
mqttServerV2.OnPublished,
mqttServerV2.OnPublish,
mqttServerV2.OnConnectAuthenticate,
mqttServerV2.OnACLCheck,
}, []byte{b})
}
func (h *MqttServerHook) Init(config any) error {
h.Log.Info("initialised")
if _, ok := config.(*MqttServerHookOptions); !ok && config != nil {
return mqttServerV2.ErrInvalidConfigType
}
h.config = config.(*MqttServerHookOptions)
if h.config.Server == nil {
return mqttServerV2.ErrInvalidConfigType
}
return nil
}
// subscribeCallback handles messages for subscribed topics
func (h *MqttServerHook) subscribeCallback(cl *mqttServerV2.Client, sub packets.Subscription, pk packets.Packet) {
h.Log.Info("hook subscribed message", "client", cl.ID, "topic", pk.TopicName)
}
func (h *MqttServerHook) OnConnect(cl *mqttServerV2.Client, pk packets.Packet) error {
h.Log.Info("client connected", "client", cl.ID)
// Example demonstrating how to subscribe to a topic within the hook.
h.config.Server.Subscribe("hook/direct/publish", 1, h.subscribeCallback)
// Example demonstrating how to publish a message within the hook
err := h.config.Server.Publish("hook/direct/publish", []byte("packet hook message"), false, 0)
if err != nil {
h.Log.Error("hook.publish", "error", err)
}
return nil
}
func (h *MqttServerHook) OnDisconnect(cl *mqttServerV2.Client, err error, expire bool) {
if err != nil {
h.Log.Info("client disconnected", "client", cl.ID, "expire", expire, "error", err)
} else {
h.Log.Info("client disconnected", "client", cl.ID, "expire", expire)
}
}
func (h *MqttServerHook) OnSubscribed(cl *mqttServerV2.Client, pk packets.Packet, reasonCodes []byte) {
h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters)
}
func (h *MqttServerHook) OnUnsubscribed(cl *mqttServerV2.Client, pk packets.Packet) {
h.Log.Info("unsubscribed", "client", cl.ID, "filters", pk.Filters)
}
func (h *MqttServerHook) OnPublish(cl *mqttServerV2.Client, pk packets.Packet) (packets.Packet, error) {
h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload))
pkx := pk
if string(pk.Payload) == "hello" {
pkx.Payload = []byte("hello world")
h.Log.Info("received modified packet from client", "client", cl.ID, "payload", string(pkx.Payload))
}
return pkx, nil
}
func (h *MqttServerHook) OnPublished(cl *mqttServerV2.Client, pk packets.Packet) {
h.Log.Info("published to client", "client", cl.ID, "payload", string(pk.Payload))
}
// OnConnectAuthenticate returns true/allowed for all requests.
func (h *MqttServerHook) OnConnectAuthenticate(cl *mqttServerV2.Client, pk packets.Packet) bool {
h.Log.Info("Auth", pk.Connect.Username, pk.Connect.Password)
return true
}
// OnACLCheck returns true/allowed for all checks.
func (h *MqttServerHook) OnACLCheck(cl *mqttServerV2.Client, topic string, write bool) bool {
return true
}