package mqtt import ( "fmt" "kpt-pasture/config" "sync" "gitee.com/xuyiping_admin/pkg/logger/zaplog" golangMqtt "github.com/eclipse/paho.mqtt.golang" "go.uber.org/zap" ) var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) { zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic())) } var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) { zaplog.Info("connectedClient", zap.Any("client", client)) } var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) { zaplog.Info("connectLost", zap.Any("err", err.Error())) } func (d *DataEventEntry) NewMqtt(conf config.MqttSetting) golangMqtt.Client { opts := golangMqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port)) opts.SetClientID(conf.ClientId) opts.SetCleanSession(false) opts.SetUsername(conf.UserName) opts.SetPassword(conf.Password) opts.SetAutoReconnect(conf.AutoReconnect) opts.SetDefaultPublishHandler(messagePubHandler) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler client := golangMqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } return client } var bufferPool = sync.Pool{ New: func() interface{} { return make([]byte, 1024) // 根据实际情况调整缓冲区大小 }, } func (d *DataEventEntry) SubMsg(conf config.MqttSetting, client golangMqtt.Client) { var subMsgChan = make(chan []byte, 2*conf.WorkNumber) if token := client.Subscribe(conf.Topic, byte(conf.Qos), func(client golangMqtt.Client, msg golangMqtt.Message) { buffer := bufferPool.Get().([]byte) copy(buffer, msg.Payload()) subMsgChan <- buffer[:len(msg.Payload())] }); token.Wait() && token.Error() != nil { close(subMsgChan) zaplog.Error("SubMsg", zap.Any("configOption", conf), zap.Any("err", token.Error())) return } defer close(subMsgChan) select { case msg := <-subMsgChan: bufferPool.Put(msg) d.ProcessMessages(msg) } } func (d *DataEventEntry) ProcessMessages(msg []byte) { /*neckRingOriginalData, err := d.MsgDataFormat(msg) if err != nil { zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg))) return } if neckRingOriginalData == nil { return } if neckRingOriginalData.Imei == "" { zaplog.Info("neckRingOriginalData", zap.Any("msg", string(msg)), zap.Any("neckRingOriginalData", neckRingOriginalData)) return } defer func() { if time.Now().Day()%15 == 0 { d.DB.Model(new(model.NeckRingOriginalData)). Where("created_at < ?", time.Now().AddDate(-2, 0, 0).Unix()). Delete(new(model.NeckRingOriginalData)) return } }() // 计算牛只实际活动时间 nowDayTime := time.Now() currHour := nowDayTime.Hour() + 2 frameIdHour := neckRingOriginalData.FrameId * 2 frameDayTime := fmt.Sprintf("%s %s:00:00", nowDayTime.Format(model.LayoutDate2), fmt.Sprintf("%02d", frameIdHour)) if frameIdHour > int32(currHour) { frameDayTime = fmt.Sprintf("%s %s:00:00", nowDayTime.AddDate(0, 0, -1).Format(model.LayoutDate2), fmt.Sprintf("%02d", frameIdHour)) } neckRingOriginalData.ActiveTime = frameDayTime if err = d.DB.Create(neckRingOriginalData).Error; err != nil { zaplog.Error("ProcessMessages", zap.Any("err", err), zap.Any("neckRingOriginalData", neckRingOriginalData)) } // 更新脖环数据状态 neckRingStatus := pasturePb.NeckRingStatus_Normal errorReason := "" if neckRingOriginalData.FrameId >= 11 || neckRingOriginalData.FrameId < 0 { neckRingStatus = pasturePb.NeckRingStatus_Error errorReason = "数据异常" } d.DB.Model(new(model.NeckRingLog)). Where("number = ?", neckRingOriginalData.Imei). Updates(map[string]interface{}{ "status": neckRingStatus, "error_reason": errorReason, })*/ } /*func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.NeckRingOriginalData, error) { msgData := make(map[string]interface{}) pairs := strings.Split(util.MsgFormat(string(msg)), " ") for _, pair := range pairs { parts := strings.SplitN(pair, ":", 2) if len(parts) != 2 { continue } key, value := parts[0], parts[1] if len(key) == 0 { continue } msgData[key] = value } softVer := int64(0) if softVerInter, ok := msgData["SOFT_VER"]; ok { if softVerstr, ok := softVerInter.(string); ok { softVer, _ = strconv.ParseInt(softVerstr, 10, 64) } } uuid := "" if uuidInter, ok := msgData["uuid"]; ok { if uuidStr, ok := uuidInter.(string); ok { uuid = uuidStr } } frameId := int64(0) if frameIdInter, ok := msgData["frameid"]; ok { if frameId64, ok := frameIdInter.(string); ok { frameId, _ = strconv.ParseInt(frameId64, 10, 64) } } cowId := "" if cowIdInter, ok := msgData["cowid"]; ok { if cowIdStr, ok := cowIdInter.(string); ok { cowId = cowIdStr } } csq := int64(0) if csqInter, ok := msgData["csq"]; ok { if csq32, ok := csqInter.(string); ok { csq, _ = strconv.ParseInt(csq32, 10, 64) } } temp := float64(0) if tempInter, ok := msgData["Temp"]; ok { if tempFloat, ok := tempInter.(string); ok { temp, _ = strconv.ParseFloat(tempFloat, 64) } } imei := "" if imeiInter, ok := msgData["imei"]; ok { if imeiStr, ok := imeiInter.(string); ok { imei = imeiStr } } active := int64(0) if activeInter, ok := msgData["active"]; ok { if active32, ok := activeInter.(string); ok { active, _ = strconv.ParseInt(active32, 10, 64) } } inAction := int64(0) if inActionInter, ok := msgData["inactive"]; ok { if inAction32, ok := inActionInter.(string); ok { inAction, _ = strconv.ParseInt(inAction32, 10, 64) } } ruMina := int64(0) if ruMinaInter, ok := msgData["Rumina"]; ok { if ruMina32, ok := ruMinaInter.(string); ok { ruMina, _ = strconv.ParseInt(ruMina32, 10, 64) } } intake := int64(0) if intakeInter, ok := msgData["Intake"]; ok { if intake32, ok := intakeInter.(string); ok { intake, _ = strconv.ParseInt(intake32, 10, 64) } } gasp := int64(0) if gaspInter, ok := msgData["gasp"]; ok { if gasp32, ok := gaspInter.(string); ok { gasp, _ = strconv.ParseInt(gasp32, 10, 64) } } other := int64(0) if otherInter, ok := msgData["other"]; ok { if other32, ok := otherInter.(string); ok { other, _ = strconv.ParseInt(other32, 10, 64) } } reMain := int64(0) if reMainInter, ok := msgData["Remain"]; ok { if reMain32, ok := reMainInter.(string); ok { reMain, _ = strconv.ParseInt(reMain32, 10, 64) } } return &model.NeckRingOriginalData{ SoftVer: softVer, Uuid: uuid, OriginFrameId: int32(frameId), FrameId: int32(frameId), CowId: cowId, Csq: csq, Temp: int64(temp * 100), Imei: imei, Active: int32(active), InActive: int32(inAction), RuMina: int32(ruMina), Intake: int32(intake), Gasp: int32(gasp), Other: int32(other), ReMain: int32(reMain), IsShow: pasturePb.IsShow_No, }, nil } */