Browse Source

mqtt: update

Yi 5 months ago
parent
commit
2a7bffd5e1
9 changed files with 119 additions and 24 deletions
  1. 51 0
      .gitignore
  2. 5 5
      cmd/mqtt.go
  3. 2 2
      config/app.develop.yaml
  4. 2 2
      config/app.production.yaml
  5. 2 0
      dep/dep.go
  6. 4 4
      dep/di_mqtt.go
  7. 17 3
      migrator/v0001_pasture_mqtt.sql
  8. 28 0
      module/mqtt/interface.go
  9. 8 8
      module/mqtt/sub.go

+ 51 - 0
.gitignore

@@ -0,0 +1,51 @@
+# ---> Go
+# Compiled Object files, Static and Dynamic libs (Shared Objects)
+*.o
+*.a
+*.so
+
+# Folders
+_obj
+_test
+
+# Architecture specific extensions/prefixes
+*.[568vq]
+[568vq].out
+
+*.cgo1.go
+*.cgo2.c
+_cgo_defun.c
+_cgo_gotypes.go
+_cgo_export.*
+
+_testmain.go
+
+*.exe
+*.test
+*.prof
+
+# User-specific stuff
+.idea/**/workspace.xml
+.idea/**/tasks.xml
+.idea/**/dictionaries
+.idea/**/shelf
+
+# Sensitive or high-churn files
+.idea/**/dataSources/
+.idea/**/dataSources.ids
+.idea/**/dataSources.local.xml
+.idea/**/sqlDataSources.xml
+.idea/**/dynamic.xml
+.idea/**/uiDesigner.xml
+.idea/**/dbnavigator.xml
+
+# Gradle
+.idea/**/gradle.xml
+.idea/**/libraries
+
+.idea/
+bin/
+.vscode/
+logger/
+config/private.key
+files/*

+ 5 - 5
cmd/mqtt.go

@@ -1,10 +1,9 @@
 package cmd
 
 import (
-	"kpt-temporary-mqtt/config"
-	"kpt-temporary-mqtt/mqtt"
-
 	"github.com/spf13/cobra"
+	"kpt-temporary-mqtt/config"
+	"kpt-temporary-mqtt/dep"
 )
 
 // mqttCmd represents the mqtt command
@@ -13,7 +12,8 @@ var mqttCmd = &cobra.Command{
 	Short: "mqtt",
 	Run: func(cmd *cobra.Command, args []string) {
 		configOption := config.Options()
-		client := mqtt.NewMqtt(configOption)
-		mqtt.SubMsg(configOption, client)
+		dataEvent := dep.DIMqtt()
+		mqttClient := dataEvent.DataEventEntry.NewMqtt(configOption)
+		dataEvent.DataEventEntry.SubMsgLog(configOption, mqttClient)
 	},
 }

+ 2 - 2
config/app.develop.yaml

@@ -8,5 +8,5 @@ work_number: 4
 store:
   show_sql: true
   driver_name: mysql
-  kpt_rw: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"
-  kpt_migr: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"
+  kpt_rw: "root:kpt!123456@tcp(210.16.188.161:3326)/mqtt_log?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"
+  kpt_migr: "root:kpt!123456@tcp(210.16.188.161:3326)/mqtt_log?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"

+ 2 - 2
config/app.production.yaml

@@ -8,5 +8,5 @@ work_number: 4
 store:
   show_sql: true
   driver_name: mysql
-  kpt_rw: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"
-  kpt_migr: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"
+  kpt_rw: "root:kpt!123456@tcp(210.16.188.161:3326)/mqtt_log?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"
+  kpt_migr: "root:kpt!123456@tcp(210.16.188.161:3326)/mqtt_log?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"

+ 2 - 0
dep/dep.go

@@ -2,6 +2,7 @@ package dep
 
 import (
 	"kpt-temporary-mqtt/config"
+	"kpt-temporary-mqtt/module/mqtt"
 	"kpt-temporary-mqtt/store/kptstore"
 
 	"gitee.com/xuyiping_admin/pkg/di"
@@ -32,5 +33,6 @@ func Options() []di.HubOption {
 		// 基础依赖
 		config.Module,
 		kptstore.Module,
+		mqtt.Module,
 	}
 }

+ 4 - 4
dep/di_mqtt.go

@@ -1,13 +1,13 @@
 package dep
 
 import (
-	golangMqtt "github.com/eclipse/paho.mqtt.golang"
 	"go.uber.org/dig"
+	"kpt-temporary-mqtt/module/mqtt"
 )
 
-func DIMqtt() (out *MqttDependency) {
+func DIMqtt() (out MqttDependency) {
 	container := DI()
-	if err := container.Invoke(func(c *MqttDependency) { out = c }); err != nil {
+	if err := container.Invoke(func(c MqttDependency) { out = c }); err != nil {
 		panic(err)
 	}
 	return
@@ -16,5 +16,5 @@ func DIMqtt() (out *MqttDependency) {
 type MqttDependency struct {
 	dig.In
 
-	MqttServer golangMqtt.Client
+	DataEventEntry mqtt.DataEvent
 }

+ 17 - 3
migrator/v0001_pasture_mqtt.sql

@@ -1,6 +1,20 @@
-CREATE TABLE IF NOT EXISTS `sub_msg_log` (
-   `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键自增id',
+CREATE TABLE `sub_msg_log` (
+   `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键Id自增',
+   `soft_ver` int(11) unsigned NOT NULL COMMENT '版本号',
+   `uuid` varchar(255) NOT NULL COMMENT 'uuid',
+   `frame_id` bigint(20) unsigned NOT NULL COMMENT '牧场Id',
+   `cow_id` varchar(255) NOT NULL COMMENT '牛号',
+   `csq` int(11) unsigned NOT NULL,
+   `temp` int(11) unsigned NOT NULL COMMENT '体温',
+   `imei` varchar(255) NOT NULL COMMENT '设备号',
+   `active` int(11) unsigned NOT NULL,
+   `in_active` int(11) unsigned NOT NULL COMMENT '静止时间',
+   `re_main` int(11) unsigned NOT NULL COMMENT '脖环存储数据剩余量',
+   `intake` int(11) unsigned NOT NULL COMMENT '采食时间',
+   `gasp` int(11) unsigned NOT NULL,
+   `other` int(11) unsigned NOT NULL COMMENT '其他时间',
+   `ru_mina` int(11) unsigned NOT NULL COMMENT '反刍时间',
    `created_at` bigint(20) unsigned NOT NULL COMMENT '创建时间',
    `updated_at` bigint(20) unsigned NOT NULL COMMENT '更新时间',
    PRIMARY KEY (`id`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消费订阅日志表';
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='脖环数据日志'

+ 28 - 0
module/mqtt/interface.go

@@ -0,0 +1,28 @@
+package mqtt
+
+import (
+	"gitee.com/xuyiping_admin/pkg/di"
+	golangMqtt "github.com/eclipse/paho.mqtt.golang"
+	"go.uber.org/dig"
+	"kpt-temporary-mqtt/config"
+	"kpt-temporary-mqtt/store/kptstore"
+)
+
+var Module = di.Options(
+	di.Provide(NewDataEvent),
+)
+
+type DataEvent interface {
+	NewMqtt(configOption *config.AppConfig) golangMqtt.Client
+	SubMsgLog(configOption *config.AppConfig, client golangMqtt.Client)
+}
+
+type DataEventEntry struct {
+	dig.In
+
+	DB *kptstore.DB // DB
+}
+
+func NewDataEvent(entry DataEventEntry) DataEvent {
+	return &entry
+}

+ 8 - 8
mqtt/sub.go → module/mqtt/sub.go

@@ -7,7 +7,6 @@ import (
 	"go.uber.org/zap"
 	"kpt-temporary-mqtt/config"
 	"kpt-temporary-mqtt/model"
-	"kpt-temporary-mqtt/store/kptstore"
 	"kpt-temporary-mqtt/util"
 	"strconv"
 	"strings"
@@ -25,7 +24,7 @@ var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt
 	zaplog.Info("connectLost", zap.Any("err", err.Error()))
 }
 
-func NewMqtt(configOption *config.AppConfig) golangMqtt.Client {
+func (d *DataEventEntry) NewMqtt(configOption *config.AppConfig) golangMqtt.Client {
 	opts := golangMqtt.NewClientOptions()
 	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", configOption.Broker, configOption.Port))
 	opts.SetClientID(util.RandString(6))
@@ -41,7 +40,7 @@ func NewMqtt(configOption *config.AppConfig) golangMqtt.Client {
 	return client
 }
 
-func SubMsg(configOption *config.AppConfig, client golangMqtt.Client) {
+func (d *DataEventEntry) SubMsgLog(configOption *config.AppConfig, client golangMqtt.Client) {
 	var subMsgChan = make(chan []byte, configOption.WorkNumber)
 	client.Subscribe(configOption.SubTopName, 1, func(client golangMqtt.Client, msg golangMqtt.Message) {
 		subMsgChan <- msg.Payload()
@@ -52,26 +51,27 @@ func SubMsg(configOption *config.AppConfig, client golangMqtt.Client) {
 	for {
 		select {
 		case msg := <-subMsgChan:
-			CreatMsgLog(msg)
+			d.CreatMsgLog(msg)
 		}
 	}
+
 	//}()
 	//}
 }
 
-func CreatMsgLog(msg []byte) {
-	subMsgLog, err := MsgDataFormat(msg)
+func (d *DataEventEntry) CreatMsgLog(msg []byte) {
+	subMsgLog, err := d.MsgDataFormat(msg)
 	if err != nil {
 		zaplog.Error("CreatMsgLog", zap.Any("err", err), zap.Any("msg", string(msg)))
 	}
 
-	if err := &kptstore.DB.Table(new(model.SubMsgLog).TableName()).Create(subMsgLog).Error; err != nil {
+	if err := d.DB.Table(new(model.SubMsgLog).TableName()).Create(subMsgLog).Error; err != nil {
 		fmt.Println("msg", string(msg))
 		fmt.Println("subMsgLog", subMsgLog)
 	}
 }
 
-func MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
+func (d *DataEventEntry) MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
 	msgData := make(map[string]interface{})
 	pairs := strings.Split(util.MsgFormat(string(msg)), " ")
 	for _, pair := range pairs {