package mqtt

import (
	"fmt"
	"kpt-pasture/config"
	"kpt-pasture/model"
	"kpt-pasture/util"
	"strconv"
	"strings"
	"sync"
	"time"

	pasturePb "gitee.com/xuyiping_admin/go_proto/proto/go/backend/cow"

	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
	golangMqtt "github.com/eclipse/paho.mqtt.golang"
	"go.uber.org/zap"
)

var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
}

var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
	zaplog.Info("connectedClient", zap.Any("client", client))
}

var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
	zaplog.Info("connectLost", zap.Any("err", err.Error()))
}

func (d *DataEventEntry) NewMqtt(conf config.MqttSetting) golangMqtt.Client {
	opts := golangMqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", conf.Broker, conf.Port))
	opts.SetClientID(conf.ClientId)
	opts.SetCleanSession(false)
	opts.SetUsername(conf.UserName)
	opts.SetPassword(conf.Password)
	opts.SetAutoReconnect(conf.AutoReconnect)
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	client := golangMqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	return client
}

var bufferPool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 1024) // 根据实际情况调整缓冲区大小
	},
}

func (d *DataEventEntry) SubMsg(conf config.MqttSetting, client golangMqtt.Client) {
	var subMsgChan = make(chan []byte, 2*conf.WorkNumber)
	if token := client.Subscribe(conf.Topic, byte(conf.Qos), func(client golangMqtt.Client, msg golangMqtt.Message) {
		buffer := bufferPool.Get().([]byte)
		copy(buffer, msg.Payload())
		subMsgChan <- buffer[:len(msg.Payload())]
	}); token.Wait() && token.Error() != nil {
		close(subMsgChan)
		zaplog.Error("SubMsg", zap.Any("configOption", conf), zap.Any("err", token.Error()))
		return
	}

	defer close(subMsgChan)
	select {
	case msg := <-subMsgChan:
		bufferPool.Put(msg)
		d.ProcessMessages(msg)
	}
}

func (d *DataEventEntry) ProcessMessages(msg []byte) {
	neckRingOriginalData, err := d.MsgDataFormat(msg)
	if err != nil {
		zaplog.Error("MsgDataFormat", zap.Any("err", err), zap.Any("msg", string(msg)))
		return
	}
	if neckRingOriginalData == nil {
		return
	}

	if neckRingOriginalData.Imei == "" {
		zaplog.Info("neckRingOriginalData", zap.Any("msg", string(msg)), zap.Any("neckRingOriginalData", neckRingOriginalData))
		return
	}

	defer func() {
		if time.Now().Day()%15 == 0 {
			d.DB.Model(new(model.NeckRingOriginalData)).
				Where("created_at < ?", time.Now().AddDate(-2, 0, 0).Unix()).
				Delete(new(model.NeckRingOriginalData))
			return
		}
	}()

	// 计算牛只实际活动时间
	nowDayTime := time.Now()
	currHour := nowDayTime.Hour() + 2
	frameIdHour := neckRingOriginalData.FrameId * 2
	frameDayTime := fmt.Sprintf("%s %s:00:00", nowDayTime.Format(model.LayoutDate2), fmt.Sprintf("%02d", frameIdHour))
	if frameIdHour > int64(currHour) {
		frameDayTime = fmt.Sprintf("%s %s:00:00", nowDayTime.AddDate(0, 0, -1).Format(model.LayoutDate2), fmt.Sprintf("%02d", frameIdHour))
	}

	neckRingOriginalData.ActiveTime = frameDayTime
	if err = d.DB.Create(neckRingOriginalData).Error; err != nil {
		zaplog.Error("ProcessMessages", zap.Any("err", err), zap.Any("neckRingOriginalData", neckRingOriginalData))
	}

	// 更新脖环数据状态
	neckRingStatus := pasturePb.NeckRingStatus_Normal
	errorReason := ""
	if neckRingOriginalData.FrameId >= 11 || neckRingOriginalData.FrameId < 0 {
		neckRingStatus = pasturePb.NeckRingStatus_Error
		errorReason = "数据异常"
	}

	d.DB.Model(new(model.NeckRingLog)).
		Where("number = ?", neckRingOriginalData.Imei).
		Updates(map[string]interface{}{
			"status":       neckRingStatus,
			"error_reason": errorReason,
		})
}

func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.NeckRingOriginalData, error) {
	msgData := make(map[string]interface{})
	pairs := strings.Split(util.MsgFormat(string(msg)), " ")
	for _, pair := range pairs {
		parts := strings.SplitN(pair, ":", 2)
		if len(parts) != 2 {
			continue
		}
		key, value := parts[0], parts[1]
		if len(key) == 0 {
			continue
		}
		msgData[key] = value
	}

	softVer := int64(0)
	if softVerInter, ok := msgData["SOFT_VER"]; ok {
		if softVerstr, ok := softVerInter.(string); ok {
			softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
		}
	}

	uuid := ""
	if uuidInter, ok := msgData["uuid"]; ok {
		if uuidStr, ok := uuidInter.(string); ok {
			uuid = uuidStr
		}
	}

	frameId := int64(0)
	if frameIdInter, ok := msgData["frameid"]; ok {
		if frameId64, ok := frameIdInter.(string); ok {
			frameId, _ = strconv.ParseInt(frameId64, 10, 64)
		}
	}
	cowId := ""
	if cowIdInter, ok := msgData["cowid"]; ok {
		if cowIdStr, ok := cowIdInter.(string); ok {
			cowId = cowIdStr
		}
	}

	csq := int64(0)
	if csqInter, ok := msgData["csq"]; ok {
		if csq32, ok := csqInter.(string); ok {
			csq, _ = strconv.ParseInt(csq32, 10, 64)
		}
	}

	temp := float64(0)
	if tempInter, ok := msgData["Temp"]; ok {
		if tempFloat, ok := tempInter.(string); ok {
			temp, _ = strconv.ParseFloat(tempFloat, 64)
		}
	}

	imei := ""
	if imeiInter, ok := msgData["imei"]; ok {
		if imeiStr, ok := imeiInter.(string); ok {
			imei = imeiStr
		}
	}

	active := int64(0)
	if activeInter, ok := msgData["active"]; ok {
		if active32, ok := activeInter.(string); ok {
			active, _ = strconv.ParseInt(active32, 10, 64)
		}
	}

	inAction := int64(0)
	if inActionInter, ok := msgData["inactive"]; ok {
		if inAction32, ok := inActionInter.(string); ok {
			inAction, _ = strconv.ParseInt(inAction32, 10, 64)
		}
	}

	ruMina := int64(0)
	if ruMinaInter, ok := msgData["Rumina"]; ok {
		if ruMina32, ok := ruMinaInter.(string); ok {
			ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
		}
	}

	intake := int64(0)
	if intakeInter, ok := msgData["Intake"]; ok {
		if intake32, ok := intakeInter.(string); ok {
			intake, _ = strconv.ParseInt(intake32, 10, 64)
		}
	}

	gasp := int64(0)
	if gaspInter, ok := msgData["gasp"]; ok {
		if gasp32, ok := gaspInter.(string); ok {
			gasp, _ = strconv.ParseInt(gasp32, 10, 64)
		}
	}

	other := int64(0)
	if otherInter, ok := msgData["other"]; ok {
		if other32, ok := otherInter.(string); ok {
			other, _ = strconv.ParseInt(other32, 10, 64)
		}
	}

	reMain := int64(0)
	if reMainInter, ok := msgData["Remain"]; ok {
		if reMain32, ok := reMainInter.(string); ok {
			reMain, _ = strconv.ParseInt(reMain32, 10, 64)
		}
	}

	return &model.NeckRingOriginalData{
		SoftVer:  softVer,
		Uuid:     uuid,
		FrameId:  frameId,
		CowId:    cowId,
		Csq:      csq,
		Temp:     int64(temp * 100),
		Imei:     imei,
		Active:   int32(active),
		InActive: int32(inAction),
		RuMina:   int32(ruMina),
		Intake:   int32(intake),
		Gasp:     int32(gasp),
		Other:    int32(other),
		ReMain:   int32(reMain),
		IsShow:   pasturePb.IsShow_No,
	}, nil

}