// Package mqtt connects to an MQTT broker, subscribes to a topic and stores
// every received payload as a model.SubMsgLog row.
package mqtt

import (
	"fmt"
	"math"
	"strconv"
	"strings"

	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
	golangMqtt "github.com/eclipse/paho.mqtt.golang"
	"go.uber.org/zap"

	"kpt-temporary-mqtt/config"
	"kpt-temporary-mqtt/model"
	"kpt-temporary-mqtt/util"
)

// messagePubHandler is installed as the client's default publish handler in
// NewMqtt; it only logs the payload and topic of the message it is given.
var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
}

// connectHandler logs the client once a connection has been established.
var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
	zaplog.Info("connectedClient", zap.Any("client", client))
}

// connectLostHandler logs the error reported when the connection is lost.
// The error is handed to the logger as a value instead of calling
// err.Error(), so a nil error cannot cause a panic inside the callback.
var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
	zaplog.Info("connectLost", zap.Any("err", err))
}

// NewMqtt builds a client from configOption (broker, port, user name and
// password), using a random 6-character client ID, and connects it.
//
// It panics if the connection attempt fails; the signature has no error
// return, so callers rely on receiving a connected client.
func (d *DataEventEntry) NewMqtt(configOption *config.AppConfig) golangMqtt.Client {
	opts := golangMqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", configOption.Broker, configOption.Port))
	opts.SetClientID(util.RandString(6))
	opts.SetUsername(configOption.UserName)
	opts.SetPassword(configOption.Password)
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := golangMqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	return client
}

// SubMsgLog subscribes to configOption.SubTopName with QoS 1 and persists
// every received payload through CreatMsgLog.
//
// The call never returns: payloads are consumed one at a time on the calling
// goroutine. The channel buffer (configOption.WorkNumber) only decouples the
// subscription callback from the consumer.
func (d *DataEventEntry) SubMsgLog(configOption *config.AppConfig, client golangMqtt.Client) {
	subMsgChan := make(chan []byte, configOption.WorkNumber)

	token := client.Subscribe(configOption.SubTopName, 1, func(client golangMqtt.Client, msg golangMqtt.Message) {
		subMsgChan <- msg.Payload()
	})

	// Report a failed subscription instead of silently waiting for messages
	// that will never arrive. The wait runs on its own goroutine so the
	// consumer loop below starts immediately and the callback can never be
	// stalled behind it; the goroutine ends as soon as the token completes.
	go func() {
		if token.Wait() && token.Error() != nil {
			zaplog.Error("SubMsgLog", zap.Any("err", token.Error()), zap.Any("topic", configOption.SubTopName))
		}
	}()

	for msg := range subMsgChan {
		d.CreatMsgLog(msg)
	}
}

// CreatMsgLog parses msg with MsgDataFormat and inserts the resulting record
// into the table named by model.SubMsgLog.TableName. Failures are logged and
// not returned to the caller.
func (d *DataEventEntry) CreatMsgLog(msg []byte) {
	subMsgLog, err := d.MsgDataFormat(msg)
	if err != nil {
		zaplog.Error("CreatMsgLog", zap.Any("err", err), zap.Any("msg", string(msg)))
		// Nothing valid to store; do not attempt to insert a missing record.
		return
	}

	if err := d.DB.Table(new(model.SubMsgLog).TableName()).Create(subMsgLog).Error; err != nil {
		zaplog.Error("CreatMsgLog",
			zap.Any("err", err),
			zap.Any("msg", string(msg)),
			zap.Any("subMsgLog", subMsgLog),
		)
	}
}

// MsgDataFormat converts a raw payload into a model.SubMsgLog.
//
// The payload is first passed through util.MsgFormat (presumably a
// normalization step; its exact behavior is not visible here), then split on
// single spaces into "key:value" pairs. Pairs without a colon or with an
// empty key are skipped, and a later duplicate key overrides an earlier one.
//
// Missing or unparsable numeric fields become 0. Temp is stored as the parsed
// value multiplied by 100 and rounded to the nearest integer. The returned
// error is currently always nil.
func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
	fields := make(map[string]string)
	for _, pair := range strings.Split(util.MsgFormat(string(msg)), " ") {
		parts := strings.SplitN(pair, ":", 2)
		if len(parts) != 2 || len(parts[0]) == 0 {
			continue
		}
		fields[parts[0]] = parts[1]
	}

	// intField returns the base-10 integer stored under key.
	intField := func(key string) int64 {
		// The parse error is ignored on purpose: a missing or malformed
		// value is stored as 0 (an out-of-range value as the clamped
		// int64), matching the established lenient behavior.
		v, _ := strconv.ParseInt(fields[key], 10, 64)
		return v
	}

	// centiField returns the float stored under key scaled by 100.
	centiField := func(key string) int64 {
		// Parse error ignored on purpose, as in intField.
		v, _ := strconv.ParseFloat(fields[key], 64)
		scaled := v * 100
		if math.IsNaN(scaled) || math.IsInf(scaled, 0) {
			// Converting NaN/Inf to an integer is not well defined.
			return 0
		}
		// Round instead of truncating: 4.35*100 is 434.99999999999994 in
		// float64 and must be stored as 435, not 434.
		return int64(math.Round(scaled))
	}

	return &model.SubMsgLog{
		SoftVer:  intField("SOFT_VER"),
		Uuid:     fields["uuid"],
		FrameId:  intField("frameid"),
		CowId:    fields["cowid"],
		Csq:      intField("csq"),
		Temp:     centiField("Temp"),
		Imei:     fields["imei"],
		Active:   int32(intField("active")),
		InActive: int32(intField("inactive")),
		RuMina:   int32(intField("Rumina")),
		Intake:   int32(intField("Intake")),
		Gasp:     int32(intField("gasp")),
		Other:    int32(intField("other")),
		ReMain:   int32(intField("Remain")),
	}, nil
}