|
@@ -8,8 +8,12 @@ import (
|
|
|
"kpt-temporary-mqtt/config"
|
|
|
"kpt-temporary-mqtt/model"
|
|
|
"kpt-temporary-mqtt/util"
|
|
|
+ "os"
|
|
|
+ "os/signal"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
+ "sync"
|
|
|
+ "syscall"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
@@ -41,18 +45,59 @@ func (d *DataEventEntry) NewMqtt(configOption *config.AppConfig) golangMqtt.Clie
|
|
|
return client
|
|
|
}
|
|
|
|
|
|
+type DataInsertSubMsgLog struct {
|
|
|
+ SubMsgLogList []*model.SubMsgLog
|
|
|
+ Mx *sync.RWMutex
|
|
|
+}
|
|
|
+
|
|
|
func (d *DataEventEntry) SubMsgLog(configOption *config.AppConfig, client golangMqtt.Client) {
|
|
|
var subMsgChan = make(chan []byte, configOption.WorkNumber)
|
|
|
client.Subscribe(configOption.SubTopName, 1, func(client golangMqtt.Client, msg golangMqtt.Message) {
|
|
|
subMsgChan <- msg.Payload()
|
|
|
})
|
|
|
|
|
|
+ DSMLog := DataInsertSubMsgLog{
|
|
|
+ SubMsgLogList: make([]*model.SubMsgLog, 0),
|
|
|
+ Mx: &sync.RWMutex{},
|
|
|
+ }
|
|
|
+
|
|
|
+ batchSize := 5
|
|
|
+ batchList := make([]*model.SubMsgLog, 0, batchSize)
|
|
|
+
|
|
|
+ sc := make(chan os.Signal, 1)
|
|
|
+ signal.Notify(sc, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
|
|
+
|
|
|
//for i := 0; i < int(configOption.WorkNumber); i++ {
|
|
|
//go func() {
|
|
|
for {
|
|
|
select {
|
|
|
case msg := <-subMsgChan:
|
|
|
- d.CreatMsgLog(msg)
|
|
|
+ subMsLog := d.CreatMsgLog(msg)
|
|
|
+ batchList = append(batchList, subMsLog)
|
|
|
+ if len(batchList) >= batchSize {
|
|
|
+ DSMLog.Mx.Lock()
|
|
|
+ DSMLog.SubMsgLogList = append(DSMLog.SubMsgLogList, batchList...)
|
|
|
+ if len(DSMLog.SubMsgLogList) >= 10 {
|
|
|
+ if err := d.DB.Create(DSMLog.SubMsgLogList).Error; err != nil {
|
|
|
+ zaplog.Error("subMsgChan-info", zap.Any("err", err), zap.Any("subMsgLog", DSMLog.SubMsgLogList))
|
|
|
+ }
|
|
|
+ DSMLog.SubMsgLogList = make([]*model.SubMsgLog, 0)
|
|
|
+ }
|
|
|
+ DSMLog.Mx.Unlock()
|
|
|
+ batchList = batchList[:0]
|
|
|
+ }
|
|
|
+ // 优雅退出
|
|
|
+ case <-sc:
|
|
|
+ if len(DSMLog.SubMsgLogList) > 0 {
|
|
|
+ DSMLog.Mx.Lock()
|
|
|
+ if err := d.DB.Create(DSMLog.SubMsgLogList).Error; err != nil {
|
|
|
+ zaplog.Error("subMsgChan-os", zap.Any("err", err), zap.Any("subMsgLog", DSMLog.SubMsgLogList))
|
|
|
+ }
|
|
|
+ zaplog.Info("subMsgChan-os", zap.Any("success", DSMLog.SubMsgLogList))
|
|
|
+ DSMLog.SubMsgLogList = make([]*model.SubMsgLog, 0)
|
|
|
+ DSMLog.Mx.Unlock()
|
|
|
+ }
|
|
|
+ close(subMsgChan)
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -60,29 +105,25 @@ func (d *DataEventEntry) SubMsgLog(configOption *config.AppConfig, client golang
|
|
|
//}
|
|
|
}
|
|
|
|
|
|
-func (d *DataEventEntry) CreatMsgLog(msg []byte) {
|
|
|
+func (d *DataEventEntry) CreatMsgLog(msg []byte) *model.SubMsgLog {
|
|
|
+ defer func() {
|
|
|
+ if time.Now().Day()%15 == 0 {
|
|
|
+ d.DB.Model(new(model.SubMsgLog)).Where("created_at < ?", time.Now().AddDate(0, 0, -15).Unix()).Delete(new(model.SubMsgLog))
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }()
|
|
|
subMsgLog, err := d.MsgDataFormat(msg)
|
|
|
if err != nil {
|
|
|
zaplog.Error("CreatMsgLog", zap.Any("err", err), zap.Any("msg", string(msg)))
|
|
|
}
|
|
|
if subMsgLog == nil {
|
|
|
- return
|
|
|
+ return nil
|
|
|
}
|
|
|
if subMsgLog.Imei == "" {
|
|
|
zaplog.Info("CreatMsgLog", zap.Any("msg", string(msg)), zap.Any("subMsgLog", subMsgLog))
|
|
|
- return
|
|
|
- }
|
|
|
-
|
|
|
- defer func() {
|
|
|
- if time.Now().Day()%15 == 0 {
|
|
|
- d.DB.Model(new(model.SubMsgLog)).Where("created_at < ?", time.Now().AddDate(0, 0, -15).Unix()).Delete(new(model.SubMsgLog))
|
|
|
- return
|
|
|
- }
|
|
|
- }()
|
|
|
-
|
|
|
- if err = d.DB.Table(new(model.SubMsgLog).TableName()).Create(subMsgLog).Error; err != nil {
|
|
|
- zaplog.Error("CreatMsgLog", zap.Any("err", err), zap.Any("msg", string(msg)), zap.Any("subMsgLog", subMsgLog))
|
|
|
+ return nil
|
|
|
}
|
|
|
+ return subMsgLog
|
|
|
}
|
|
|
|
|
|
func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
|