| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 | package mqttimport (	"fmt"	"kpt-pasture/config"	"kpt-pasture/model"	"kpt-pasture/util"	"strconv"	"strings"	"sync"	"time"	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"	"gitee.com/xuyiping_admin/pkg/logger/zaplog"	golangMqtt "github.com/eclipse/paho.mqtt.golang"	"go.uber.org/zap")var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))}var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {	zaplog.Info("connectedClient", zap.Any("client", client))}var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {	zaplog.Info("connectLost", zap.Any("err", err.Error()))}func (d *DataEventEntry) NewMqtt(conf config.MqttSetting) golangMqtt.Client {	opts := golangMqtt.NewClientOptions()	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))	opts.SetClientID(conf.ClientId)	opts.SetCleanSession(false)	opts.SetUsername(conf.UserName)	opts.SetPassword(conf.Password)	opts.SetAutoReconnect(conf.AutoReconnect)	opts.SetDefaultPublishHandler(messagePubHandler)	opts.OnConnect = connectHandler	opts.OnConnectionLost = connectLostHandler	client := golangMqtt.NewClient(opts)	if token := client.Connect(); token.Wait() && token.Error() != nil {		panic(token.Error())	}	return client}var bufferPool = sync.Pool{	New: func() interface{} {		return make([]byte, 1024) // 根据实际情况调整缓冲区大小	},}func (d *DataEventEntry) SubMsg(conf config.MqttSetting, client golangMqtt.Client) {	var subMsgChan = make(chan []byte, 2*conf.WorkNumber)	if token := client.Subscribe(conf.Topic, byte(conf.Qos), func(client golangMqtt.Client, msg golangMqtt.Message) {		buffer := bufferPool.Get().([]byte)		copy(buffer, msg.Payload())		subMsgChan <- buffer[:len(msg.Payload())]	}); token.Wait() && token.Error() != nil {		close(subMsgChan)		zaplog.Error("SubMsg", zap.Any("configOption", conf), zap.Any("err", token.Error()))		return	}	defer close(subMsgChan)	select {	case msg := <-subMsgChan:		bufferPool.Put(msg)		d.ProcessMessages(msg)	}}func (d *DataEventEntry) ProcessMessages(msg []byte) {	neckRingOriginalData, err := d.MsgDataFormat(msg)	if err != nil {		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))		return	}	if neckRingOriginalData == nil {		return	}	if neckRingOriginalData.Imei == "" {		zaplog.Info("neckRingOriginalData", zap.Any("msg", string(msg)), zap.Any("neckRingOriginalData", neckRingOriginalData))		return	}	defer func() {		if time.Now().Day()%15 == 0 {			d.DB.Model(new(model.NeckRingOriginalData)).				Where("created_at < ?", time.Now().AddDate(-2, 0, 0).Unix()).				Delete(new(model.NeckRingOriginalData))			return		}	}()	// 计算牛只实际活动时间	nowDayTime := time.Now()	currHour := nowDayTime.Hour() + 2	frameIdHour := neckRingOriginalData.FrameId * 2	frameDayTime := fmt.Sprintf("%s %s:00:00", nowDayTime.Format(model.LayoutDate2), fmt.Sprintf("%02d", frameIdHour))	if frameIdHour > int64(currHour) {		frameDayTime = fmt.Sprintf("%s %s:00:00", nowDayTime.AddDate(0, 0, -1).Format(model.LayoutDate2), fmt.Sprintf("%02d", frameIdHour))	}	neckRingOriginalData.ActiveTime = frameDayTime	if err = d.DB.Create(neckRingOriginalData).Error; err != nil {		zaplog.Error("ProcessMessages", zap.Any("err", err), zap.Any("neckRingOriginalData", neckRingOriginalData))	}	// 更新脖环数据状态	neckRingStatus := pasturePb.NeckRingStatus_Normal	errorReason := ""	if neckRingOriginalData.FrameId >= 11 || neckRingOriginalData.FrameId < 0 {		neckRingStatus = pasturePb.NeckRingStatus_Error		errorReason = "数据异常"	}	d.DB.Model(new(model.NeckRingLog)).		Where("number = ?", neckRingOriginalData.Imei).		Updates(map[string]interface{}{			"status":       neckRingStatus,			"error_reason": errorReason,		})}func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.NeckRingOriginalData, error) {	msgData := make(map[string]interface{})	pairs := strings.Split(util.MsgFormat(string(msg)), " ")	for _, pair := range pairs {		parts := strings.SplitN(pair, ":", 2)		if len(parts) != 2 {			continue		}		key, value := parts[0], parts[1]		if len(key) == 0 {			continue		}		msgData[key] = value	}	softVer := int64(0)	if softVerInter, ok := msgData["SOFT_VER"]; ok {		if softVerstr, ok := softVerInter.(string); ok {			softVer, _ = strconv.ParseInt(softVerstr, 10, 64)		}	}	uuid := ""	if uuidInter, ok := msgData["uuid"]; ok {		if uuidStr, ok := uuidInter.(string); ok {			uuid = uuidStr		}	}	frameId := int64(0)	if frameIdInter, ok := msgData["frameid"]; ok {		if frameId64, ok := frameIdInter.(string); ok {			frameId, _ = strconv.ParseInt(frameId64, 10, 64)		}	}	cowId := ""	if cowIdInter, ok := msgData["cowid"]; ok {		if cowIdStr, ok := cowIdInter.(string); ok {			cowId = cowIdStr		}	}	csq := int64(0)	if csqInter, ok := msgData["csq"]; ok {		if csq32, ok := csqInter.(string); ok {			csq, _ = strconv.ParseInt(csq32, 10, 64)		}	}	temp := float64(0)	if tempInter, ok := msgData["Temp"]; ok {		if tempFloat, ok := tempInter.(string); ok {			temp, _ = strconv.ParseFloat(tempFloat, 64)		}	}	imei := ""	if imeiInter, ok := msgData["imei"]; ok {		if imeiStr, ok := imeiInter.(string); ok {			imei = imeiStr		}	}	active := int64(0)	if activeInter, ok := msgData["active"]; ok {		if active32, ok := activeInter.(string); ok {			active, _ = strconv.ParseInt(active32, 10, 64)		}	}	inAction := int64(0)	if inActionInter, ok := msgData["inactive"]; ok {		if inAction32, ok := inActionInter.(string); ok {			inAction, _ = strconv.ParseInt(inAction32, 10, 64)		}	}	ruMina := int64(0)	if ruMinaInter, ok := msgData["Rumina"]; ok {		if ruMina32, ok := ruMinaInter.(string); ok {			ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)		}	}	intake := int64(0)	if intakeInter, ok := msgData["Intake"]; ok {		if intake32, ok := intakeInter.(string); ok {			intake, _ = strconv.ParseInt(intake32, 10, 64)		}	}	gasp := int64(0)	if gaspInter, ok := msgData["gasp"]; ok {		if gasp32, ok := gaspInter.(string); ok {			gasp, _ = strconv.ParseInt(gasp32, 10, 64)		}	}	other := int64(0)	if otherInter, ok := msgData["other"]; ok {		if other32, ok := otherInter.(string); ok {			other, _ = strconv.ParseInt(other32, 10, 64)		}	}	reMain := int64(0)	if reMainInter, ok := msgData["Remain"]; ok {		if reMain32, ok := reMainInter.(string); ok {			reMain, _ = strconv.ParseInt(reMain32, 10, 64)		}	}	return &model.NeckRingOriginalData{		SoftVer:  softVer,		Uuid:     uuid,		FrameId:  frameId,		CowId:    cowId,		Csq:      csq,		Temp:     int64(temp * 100),		Imei:     imei,		Active:   int32(active),		InActive: int32(inAction),		RuMina:   int32(ruMina),		Intake:   int32(intake),		Gasp:     int32(gasp),		Other:    int32(other),		ReMain:   int32(reMain),		IsShow:   pasturePb.IsShow_No,	}, nil}
 |