diff --git a/mqtt/client.go b/mqtt/client.go index 7adc5d7..cc27807 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -48,6 +48,18 @@ func MqttClientDisconnect(session *Session) { client.Disconnect(0) } +func MqttClientSubscribeMultiple(session *Session, filters map[string]byte) { + client := *session.Client + client.SubscribeMultiple(filters, func(client mqttClient.Client, msg mqttClient.Message) { + log.Printf("Received message on topic %s: %s\n", msg.Topic(), msg.Payload()) + server := GetMqttServer() + err := server.Publish(msg.Topic(), msg.Payload(), msg.Retained(), msg.Qos()) + if err != nil { + log.Println("server.Publish err=", err) + } + }) +} + func MqttClientSubscribe(session *Session, topic string, qos byte) { client := *session.Client client.Subscribe(topic, qos, func(client mqtt.Client, msg mqtt.Message) { diff --git a/mqtt/hook.go b/mqtt/hook.go index f04ec05..e07f67b 100644 --- a/mqtt/hook.go +++ b/mqtt/hook.go @@ -83,9 +83,11 @@ func (h *MqttServerHook) OnDisconnect(cl *mqttServerV2.Client, err error, expire func (h *MqttServerHook) OnSubscribed(cl *mqttServerV2.Client, pk packets.Packet, reasonCodes []byte) { h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters) session := GetSession(cl.ID) - for _, v := range pk.Filters { - MqttClientSubscribe(session, v.Filter, reasonCodes[0]) + filter := make(map[string]byte) + for i, v := range pk.Filters { + filter[v.Filter] = reasonCodes[i] } + MqttClientSubscribeMultiple(session, filter) }