package mqtt

import (
	"fmt"
	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
	golangMqtt "github.com/eclipse/paho.mqtt.golang"
	"go.uber.org/zap"
	"kpt-temporary-mqtt/config"
	"kpt-temporary-mqtt/model"
	"kpt-temporary-mqtt/util"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"
)

var golangMqttClient golangMqtt.Client

var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
	zaplog.Info("messagePubHandlerReceived", zap.Any("message", string(msg.Payload())), zap.Any("topic", msg.Topic()))
}

var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
	zaplog.Info("connectedClient", zap.Any("client", client))
}

// 连接丢失处理
var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
	zaplog.Error("connectLost", zap.Any("err", err.Error()))
	ConnectionRetry()
}

// ConnectionRetry 尝试重新连接
func ConnectionRetry() {
	options := config.Options()
	opts := golangMqtt.NewClientOptions().
		AddBroker(fmt.Sprintf("tcp://%s:%d", options.Broker, options.Port)).
		SetClientID(util.RandString(12)).
		SetUsername(options.UserName).
		SetPassword(options.Password).
		SetDefaultPublishHandler(messagePubHandler)
	opts.SetMaxReconnectInterval(5 * time.Minute)
	opts.SetKeepAlive(2 * time.Minute)
	opts.SetAutoReconnect(true)
	opts.SetConnectRetry(true)

	newClient := golangMqtt.NewClient(opts)
	if token := newClient.Connect(); token.Wait() && token.Error() == nil {
		// 成功重连,更新全局客户端实例
		golangMqttClient = newClient
		return // 退出重连循环
	} else {
		zaplog.Error("ConnectionRetry", zap.Any("err", token.Error()))
	}
}

func (d *DataEventEntry) NewMqtt(configOption *config.AppConfig) {
	opts := golangMqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", configOption.Broker, configOption.Port))
	opts.SetClientID(util.RandString(12))
	opts.SetUsername(configOption.UserName)
	opts.SetPassword(configOption.Password)
	opts.SetKeepAlive(2 * time.Minute)
	opts.SetAutoReconnect(true)
	opts.SetConnectRetry(true)
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	client := golangMqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	golangMqttClient = client
}

type DataInsertSubMsgLog struct {
	SubMsgLogList []*model.SubMsgLog
	Mx            *sync.RWMutex
}

func (d *DataEventEntry) SubMsgLog(configOption *config.AppConfig) {
	var subMsgChan = make(chan []byte, configOption.WorkNumber)
	golangMqttClient.Subscribe(configOption.SubTopName, 1, func(client golangMqtt.Client, msg golangMqtt.Message) {
		subMsgChan <- msg.Payload()
	})

	DSMLog := DataInsertSubMsgLog{
		SubMsgLogList: make([]*model.SubMsgLog, 0),
		Mx:            &sync.RWMutex{},
	}

	batchSize := 5
	batchList := make([]*model.SubMsgLog, 0, batchSize)

	sc := make(chan os.Signal, 1)
	signal.Notify(sc, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	// 设置2分钟超时
	tc := time.After(2 * time.Minute)

	go func() {
		for {
			select {
			case <-tc:
				if !golangMqttClient.IsConnectionOpen() || !golangMqttClient.IsConnected() {
					zaplog.Info("IsConnectionOpen")
					ConnectionRetry()
				}
			}
		}
	}()

	//for i := 0; i < int(configOption.WorkNumber); i++ {
	//go func() {
	for {
		select {
		case msg := <-subMsgChan:
			subMsLog := d.CreatMsgLog(msg)
			if subMsLog != nil {
				batchList = append(batchList, subMsLog)
			}
			if len(batchList) >= batchSize {
				DSMLog.Mx.Lock()
				DSMLog.SubMsgLogList = append(DSMLog.SubMsgLogList, batchList...)
				if len(DSMLog.SubMsgLogList) >= 10 {
					if err := d.DB.Create(DSMLog.SubMsgLogList).Error; err != nil {
						zaplog.Error("subMsgChan-info", zap.Any("err", err), zap.Any("subMsgLog", DSMLog.SubMsgLogList))
					}
					DSMLog.SubMsgLogList = make([]*model.SubMsgLog, 0)
				}
				DSMLog.Mx.Unlock()
				batchList = batchList[:0]
			}
		// 优雅退出
		case <-sc:
			if len(DSMLog.SubMsgLogList) > 0 {
				DSMLog.Mx.Lock()
				if err := d.DB.Create(DSMLog.SubMsgLogList).Error; err != nil {
					zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("subMsgLog", DSMLog.SubMsgLogList))
				}
				zaplog.Info("subMsgChan-os", zap.Any("success", DSMLog.SubMsgLogList))
				DSMLog.SubMsgLogList = make([]*model.SubMsgLog, 0)
				DSMLog.Mx.Unlock()
			}
			close(subMsgChan)
		case <-tc:
			if len(DSMLog.SubMsgLogList) > 0 {
				DSMLog.Mx.Lock()
				if err := d.DB.Create(DSMLog.SubMsgLogList).Error; err != nil {
					zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("subMsgLog", DSMLog.SubMsgLogList))
				}
				zaplog.Info("subMsgChan-os", zap.Any("success", DSMLog.SubMsgLogList))
				DSMLog.SubMsgLogList = make([]*model.SubMsgLog, 0)
				DSMLog.Mx.Unlock()
			}
		}
	}

	//}()
	//}
}

func (d *DataEventEntry) CreatMsgLog(msg []byte) *model.SubMsgLog {
	defer func() {
		if time.Now().Day()%15 == 0 {
			d.DB.Model(new(model.SubMsgLog)).Where("created_at < ?", time.Now().AddDate(0, 0, -15).Unix()).Delete(new(model.SubMsgLog))
			return
		}
	}()
	subMsgLog, err := d.MsgDataFormat(msg)
	if err != nil {
		zaplog.Error("CreatMsgLog", zap.Any("err", err), zap.Any("msg", string(msg)))
	}
	if subMsgLog == nil {
		return nil
	}
	if subMsgLog.Imei == "" {
		zaplog.Info("CreatMsgLog", zap.Any("msg", string(msg)), zap.Any("subMsgLog", subMsgLog))
		return nil
	}
	return subMsgLog
}

func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
	msgData := make(map[string]interface{})
	pairs := strings.Split(util.MsgFormat(string(msg)), " ")
	for _, pair := range pairs {
		parts := strings.SplitN(pair, ":", 2)
		if len(parts) != 2 {
			continue
		}
		key, value := parts[0], parts[1]
		if len(key) == 0 {
			continue
		}
		msgData[key] = value
	}

	softVer := int64(0)
	if softVerInter, ok := msgData["SOFT_VER"]; ok {
		if softVerstr, ok := softVerInter.(string); ok {
			softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
		}
	}

	uuid := ""
	if uuidInter, ok := msgData["uuid"]; ok {
		if uuidStr, ok := uuidInter.(string); ok {
			uuid = uuidStr
		}
	}

	frameId := int64(0)
	if frameIdInter, ok := msgData["frameid"]; ok {
		if frameId64, ok := frameIdInter.(string); ok {
			frameId, _ = strconv.ParseInt(frameId64, 10, 64)
		}
	}
	cowId := ""
	if cowIdInter, ok := msgData["cowid"]; ok {
		if cowIdStr, ok := cowIdInter.(string); ok {
			cowId = cowIdStr
		}
	}

	csq := int64(0)
	if csqInter, ok := msgData["csq"]; ok {
		if csq32, ok := csqInter.(string); ok {
			csq, _ = strconv.ParseInt(csq32, 10, 64)
		}
	}

	temp := float64(0)
	if tempInter, ok := msgData["Temp"]; ok {
		if tempFloat, ok := tempInter.(string); ok {
			temp, _ = strconv.ParseFloat(tempFloat, 64)
		}
	}

	imei := ""
	if imeiInter, ok := msgData["imei"]; ok {
		if imeiStr, ok := imeiInter.(string); ok {
			imei = imeiStr
		}
	}

	active := int64(0)
	if activeInter, ok := msgData["active"]; ok {
		if active32, ok := activeInter.(string); ok {
			active, _ = strconv.ParseInt(active32, 10, 64)
		}
	}

	inAction := int64(0)
	if inActionInter, ok := msgData["inactive"]; ok {
		if inAction32, ok := inActionInter.(string); ok {
			inAction, _ = strconv.ParseInt(inAction32, 10, 64)
		}
	}

	ruMina := int64(0)
	if ruMinaInter, ok := msgData["Rumina"]; ok {
		if ruMina32, ok := ruMinaInter.(string); ok {
			ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
		}
	}

	intake := int64(0)
	if intakeInter, ok := msgData["Intake"]; ok {
		if intake32, ok := intakeInter.(string); ok {
			intake, _ = strconv.ParseInt(intake32, 10, 64)
		}
	}

	gasp := int64(0)
	if gaspInter, ok := msgData["gasp"]; ok {
		if gasp32, ok := gaspInter.(string); ok {
			gasp, _ = strconv.ParseInt(gasp32, 10, 64)
		}
	}

	other := int64(0)
	if otherInter, ok := msgData["other"]; ok {
		if other32, ok := otherInter.(string); ok {
			other, _ = strconv.ParseInt(other32, 10, 64)
		}
	}

	reMain := int64(0)
	if reMainInter, ok := msgData["Remain"]; ok {
		if reMain32, ok := reMainInter.(string); ok {
			reMain, _ = strconv.ParseInt(reMain32, 10, 64)
		}
	}

	return &model.SubMsgLog{
		SoftVer:  softVer,
		Uuid:     uuid,
		FrameId:  frameId,
		CowId:    cowId,
		Csq:      csq,
		Temp:     int64(temp * 100),
		Imei:     imei,
		Active:   int32(active),
		InActive: int32(inAction),
		RuMina:   int32(ruMina),
		Intake:   int32(intake),
		Gasp:     int32(gasp),
		Other:    int32(other),
		ReMain:   int32(reMain),
	}, nil

}