package mqtt import ( "bytes" "fmt" mqttServerV2 "github.com/mochi-mqtt/server/v2" "github.com/mochi-mqtt/server/v2/packets" ) type MqttServerHookOptions struct { Server *mqttServerV2.Server } type MqttServerHook struct { mqttServerV2.HookBase config *MqttServerHookOptions } func (h *MqttServerHook) ID() string { return "events-example" } func (h *MqttServerHook) Provides(b byte) bool { return bytes.Contains([]byte{ mqttServerV2.OnConnect, mqttServerV2.OnDisconnect, mqttServerV2.OnSubscribed, mqttServerV2.OnUnsubscribed, mqttServerV2.OnPublished, mqttServerV2.OnPublish, mqttServerV2.OnConnectAuthenticate, mqttServerV2.OnACLCheck, }, []byte{b}) } func (h *MqttServerHook) Init(config any) error { h.Log.Info("hook was initialised") if _, ok := config.(*MqttServerHookOptions); !ok && config != nil { return mqttServerV2.ErrInvalidConfigType } h.config = config.(*MqttServerHookOptions) if h.config.Server == nil { return mqttServerV2.ErrInvalidConfigType } return nil } // subscribeCallback handles messages for subscribed topics func (h *MqttServerHook) subscribeCallback(cl *mqttServerV2.Client, sub packets.Subscription, pk packets.Packet) { h.Log.Info("hook subscribed message", "client", cl.ID, "topic", pk.TopicName) } func (h *MqttServerHook) OnConnect(cl *mqttServerV2.Client, pk packets.Packet) error { h.Log.Info("client connected", "client", cl.ID) return nil } func (h *MqttServerHook) OnDisconnect(cl *mqttServerV2.Client, err error, expire bool) { if err != nil { h.Log.Info("client disconnected", "client", cl.ID, "expire", expire, "error", err) } else { h.Log.Info("client disconnected", "client", cl.ID, "expire", expire) } session := GetSession(cl.ID) MqttClientDisconnect(session) DelSession(session.ClientId) } func (h *MqttServerHook) OnSubscribed(cl *mqttServerV2.Client, pk packets.Packet, reasonCodes []byte) { h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters) session := GetSession(cl.ID) filter := make(map[string]byte) for i, v := range pk.Filters { filter[v.Filter] = reasonCodes[i] } MqttClientSubscribeMultiple(session, filter) } func (h *MqttServerHook) OnUnsubscribed(cl *mqttServerV2.Client, pk packets.Packet) { h.Log.Info("unsubscribed", "client", cl.ID, "filters", pk.Filters) session := GetSession(cl.ID) for _, v := range pk.Filters { MqttClientUnsubscribe(session, v.Filter) } } func (h *MqttServerHook) OnPublish(cl *mqttServerV2.Client, pk packets.Packet) (packets.Packet, error) { session := GetSession(cl.ID) if session != nil { h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload)) MqttClientPublish(session, pk.TopicName, pk.FixedHeader.Qos, pk.FixedHeader.Retain, pk.Payload) } return pk, nil } func (h *MqttServerHook) OnPublished(cl *mqttServerV2.Client, pk packets.Packet) { h.Log.Info("published to client", "client", cl.ID, "topic", pk.TopicName, "payload", string(pk.Payload)) } // OnConnectAuthenticate returns true/allowed for all requests. func (h *MqttServerHook) OnConnectAuthenticate(cl *mqttServerV2.Client, pk packets.Packet) bool { h.Log.Info("Auth", pk.Connect.Username, pk.Connect.Password) session := GetSession(cl.ID) if session != nil { MqttClientDisconnect(session) DelSession(session.ClientId) } session = CreateSession(cl.ID, pk.Connect.Username, pk.Connect.Password) err := MqttClientConnect(session) if err != nil { return false } return true } // OnACLCheck returns true/allowed for all checks. func (h *MqttServerHook) OnACLCheck(cl *mqttServerV2.Client, topic string, write bool) bool { return true }