add MqttClientSubscribeMultiple

This commit is contained in:
ssp97 2024-08-22 20:24:55 +08:00
parent ee6fb20d3e
commit f2b81d358b
2 changed files with 16 additions and 2 deletions

View File

@ -48,6 +48,18 @@ func MqttClientDisconnect(session *Session) {
client.Disconnect(0) client.Disconnect(0)
} }
func MqttClientSubscribeMultiple(session *Session, filters map[string]byte) {
client := *session.Client
client.SubscribeMultiple(filters, func(client mqttClient.Client, msg mqttClient.Message) {
log.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload())
server := GetMqttServer()
err := server.Publish(msg.Topic(), msg.Payload(), msg.Retained(), msg.Qos())
if err != nil {
log.Println("server.Publish err=", err)
}
})
}
func MqttClientSubscribe(session *Session, topic string, qos byte) { func MqttClientSubscribe(session *Session, topic string, qos byte) {
client := *session.Client client := *session.Client
client.Subscribe(topic, qos, func(client mqtt.Client, msg mqtt.Message) { client.Subscribe(topic, qos, func(client mqtt.Client, msg mqtt.Message) {

View File

@ -83,9 +83,11 @@ func (h *MqttServerHook) OnDisconnect(cl *mqttServerV2.Client, err error, expire
func (h *MqttServerHook) OnSubscribed(cl *mqttServerV2.Client, pk packets.Packet, reasonCodes []byte) { func (h *MqttServerHook) OnSubscribed(cl *mqttServerV2.Client, pk packets.Packet, reasonCodes []byte) {
h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters) h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters)
session := GetSession(cl.ID) session := GetSession(cl.ID)
for _, v := range pk.Filters { filter := make(map[string]byte)
MqttClientSubscribe(session, v.Filter, reasonCodes[0]) for i, v := range pk.Filters {
filter[v.Filter] = reasonCodes[i]
} }
MqttClientSubscribeMultiple(session, filter)
} }