Browse Source

mqtt: init

Yi 5 months ago
commit
a78042a9eb
25 changed files with 873 additions and 0 deletions
  1. 49 0
      .drone.yml
  2. 8 0
      .idea/.gitignore
  3. 10 0
      .idea/kpt-temporary-mqtt.iml
  4. 6 0
      .idea/misc.xml
  5. 8 0
      .idea/modules.xml
  6. 6 0
      .idea/vcs.xml
  7. 28 0
      Dockerfile
  8. 0 0
      LICENSE
  9. 19 0
      cmd/mqtt.go
  10. 29 0
      cmd/root.go
  11. 12 0
      config/app.develop.yaml
  12. 61 0
      config/app.go
  13. 12 0
      config/app.production.yaml
  14. 29 0
      config/load_config.go
  15. 36 0
      dep/dep.go
  16. 20 0
      dep/di_mqtt.go
  17. 51 0
      go.mod
  18. 115 0
      go.sum
  19. 11 0
      main.go
  20. 6 0
      migrator/v0001_pasture_mqtt.sql
  21. 25 0
      model/sub_msg_log.go
  22. 201 0
      mqtt/sub.go
  23. 75 0
      store/kptstore/rw_store.go
  24. 41 0
      util/util.go
  25. 15 0
      util/util_test.go

+ 49 - 0
.drone.yml

@@ -0,0 +1,49 @@
+kind: pipeline
+type: docker
+name: kptTemporaryMqtt
+
+#clone:
+#  depth: 1
+#  disable: true
+
+steps:
+  #- name: clone
+  #  image: alpine/git
+  #  commands:
+  #    - git clone -b develop http://kpt.kptyun.cn:3000/xuyiping/kpt-pasture.git
+  #    - ls -l
+  #    - pwd
+  - name: build
+    image: plugins/docker:20.14.2
+    volumes:
+      - name: hosts
+        path: /etc/hosts
+      - name: docker-ca
+        path: /etc/docker
+      - name: docker-sock
+        path: /var/run/docker.sock
+    settings:
+      dockerfile: /drone/src/Dockerfile
+      username:
+        from_secret: aliyuncs_username
+      password:
+        from_secret: aliyuncs_password
+      repo: registry.cn-hangzhou.aliyuncs.com/kpt-event/kpt-temporary-mqtt
+      registry: registry.cn-hangzhou.aliyuncs.com
+      tags: [ test ]
+
+trigger:
+  branch:
+    include:
+    - feature/*
+  event:
+    include:
+    - push
+
+volumes:
+  - name: docker-ca
+    host:
+      path: /etc/docker
+  - name: docker-sock
+    host:
+      path: /var/run/docker.sock

+ 8 - 0
.idea/.gitignore

@@ -0,0 +1,8 @@
+# 默认忽略的文件
+/shelf/
+/workspace.xml
+# 基于编辑器的 HTTP 客户端请求
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml

+ 10 - 0
.idea/kpt-temporary-mqtt.iml

@@ -0,0 +1,10 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="JAVA_MODULE" version="4">
+  <component name="Go" enabled="true" />
+  <component name="NewModuleRootManager" inherit-compiler-output="true">
+    <exclude-output />
+    <content url="file://$MODULE_DIR$" />
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>

+ 6 - 0
.idea/misc.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectRootManager">
+    <output url="file://$PROJECT_DIR$/out" />
+  </component>
+</project>

+ 8 - 0
.idea/modules.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/.idea/kpt-temporary-mqtt.iml" filepath="$PROJECT_DIR$/.idea/kpt-temporary-mqtt.iml" />
+    </modules>
+  </component>
+</project>

+ 6 - 0
.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="" vcs="Git" />
+  </component>
+</project>

+ 28 - 0
Dockerfile

@@ -0,0 +1,28 @@
+FROM golang:1.19-alpine as build
+WORKDIR /app/kpt-temporary-mqtt
+
+COPY . .
+
+RUN mkdir -p ./bin
+
+RUN go env -w GO111MODULE=on && \
+    go env -w GOPROXY=https://goproxy.cn,direct && \
+    go env -w CGO_ENABLED=0 && \
+    go env -w GOARCH=amd64 && \
+    go env -w GOOS=linux && \
+    go build -o ./kptTemporaryMqtt -ldflags "-X kpt.kptyun.cn:3000/kpt-event/kpt-temporary-mqtt/pod.appVersion=beef" main.go
+
+
+FROM alpine:latest
+LABEL name="kpt-temporary-mqtt" \
+description="pt service" \
+owner="yiping.xu"
+
+WORKDIR /app/kpt-temporary-mqtt
+
+COPY --from=0 /app/kpt-temporary-mqtt/config/ /app/kpt-temporary-mqtt/config/
+COPY --from=0  /app/kpt-temporary-mqtt/kptPasture /app/kpt-temporary-mqtt/kptPasture
+
+VOLUME ["/app/kpt-temporary-mqtt/logger","/app/kpt-pasture/config"]
+
+CMD ["/app/kpt-temporary-mqtt/kptTemporaryMqtt","mqtt"]

+ 0 - 0
LICENSE


+ 19 - 0
cmd/mqtt.go

@@ -0,0 +1,19 @@
+package cmd
+
+import (
+	"kpt-temporary-mqtt/config"
+	"kpt-temporary-mqtt/mqtt"
+
+	"github.com/spf13/cobra"
+)
+
+// mqttCmd represents the mqtt command
+var mqttCmd = &cobra.Command{
+	Use:   "mqtt",
+	Short: "mqtt",
+	Run: func(cmd *cobra.Command, args []string) {
+		configOption := config.Options()
+		client := mqtt.NewMqtt(configOption)
+		mqtt.SubMsg(configOption, client)
+	},
+}

+ 29 - 0
cmd/root.go

@@ -0,0 +1,29 @@
+/*
+Copyright © 2024 NAME HERE <EMAIL ADDRESS>
+*/
+package cmd
+
+import (
+	"os"
+
+	"github.com/spf13/cobra"
+)
+
+// rootCmd represents the base command when called without any subcommands
+var rootCmd = &cobra.Command{
+	Use:   "kpt-temporary-mqtt",
+	Short: "科派腾临时订阅mqtt消息",
+}
+
+// Execute adds all child commands to the root command and sets flags appropriately.
+// This is called by main.main(). It only needs to happen once to the rootCmd.
+func Execute() {
+	err := rootCmd.Execute()
+	if err != nil {
+		os.Exit(1)
+	}
+}
+
+func init() {
+	rootCmd.AddCommand(mqttCmd)
+}

+ 12 - 0
config/app.develop.yaml

@@ -0,0 +1,12 @@
+user_name: "kptmqtt"
+password: "kepaiteng"
+sub_top_name: "kptmqtt"
+broker: "kptyun.com"
+port: 1983
+work_number: 4
+
+store:
+  show_sql: true
+  driver_name: mysql
+  kpt_rw: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"
+  kpt_migr: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"

+ 61 - 0
config/app.go

@@ -0,0 +1,61 @@
+package config
+
+import (
+	"gitee.com/xuyiping_admin/pkg/di"
+	"os"
+	"strings"
+	"sync"
+)
+
+var (
+	Module   = di.Provide(Options)
+	options  *AppConfig
+	appEnv   string
+	initOnce sync.Once
+)
+
+// AppConfig store all configuration options
+type AppConfig struct {
+	Broker     string `yaml:"broker"`
+	Port       int64  `yaml:"port"`
+	UserName   string `yaml:"user_name"`
+	Password   string `yaml:"password"`
+	SubTopName string `yaml:"sub_top_name"`
+	WorkNumber int64  `yaml:"work_number"`
+
+	// 数据库配置 额外加载文件部分 database.yaml
+	StoreSetting StoreSetting `json:"storeSetting" yaml:"store"`
+}
+
+// StoreSetting 数据库配置
+type StoreSetting struct {
+	// 开启 SyDb SQL 记录
+	DriverName string `yaml:"driver_name" json:"driver_name"`
+	ShowSQL    bool   `yaml:"show_sql" json:"show_sql"`
+	KptRW      string `yaml:"kpt_rw" json:"kpt_rw"`
+	KptMigr    string `yaml:"kpt_migr" json:"kpt_migr"`
+}
+
+func Options() *AppConfig {
+	return options
+}
+
+func init() {
+	appEnv = strings.ToLower(os.Getenv("APP_ENVIRONMENT"))
+	cfg := &AppConfig{}
+	var err error
+	initOnce.Do(func() {
+		switch appEnv {
+		case "test":
+			err = Initialize("app.test.yaml", cfg)
+		case "production":
+			err = Initialize("app.production.yaml", cfg)
+		default:
+			err = Initialize("app.develop.yaml", cfg)
+		}
+		if err != nil {
+			panic(err)
+		}
+		options = cfg
+	})
+}

+ 12 - 0
config/app.production.yaml

@@ -0,0 +1,12 @@
+user_name: "kptmqtt"
+password: "kepaiteng"
+sub_top_name: "kptmqtt"
+broker: "kptyun.com"
+port: 1983
+work_number: 4
+
+store:
+  show_sql: true
+  driver_name: mysql
+  kpt_rw: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"
+  kpt_migr: "kpt_pasture:4~H@InK6jK@tcp(47.92.95.119:3326)/kpt_pasture?charset=utf8mb4&parseTime=true&loc=Local&allowNativePasswords=true&timeout=300s&readTimeout=300s&writeTimeout=300s"

+ 29 - 0
config/load_config.go

@@ -0,0 +1,29 @@
+package config
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/mitchellh/mapstructure"
+	"github.com/spf13/viper"
+)
+
+var WorkDir = os.Getenv("MQTT_WORK_DIR")
+
+func Initialize(path string, cfgStruct interface{}) error {
+	if WorkDir == "" {
+		WorkDir = "."
+	}
+	dir := fmt.Sprintf("%s/config/%s", WorkDir, path)
+	viper.SetConfigType("yaml")
+	viper.SetConfigFile(dir)
+	if err := viper.ReadInConfig(); err != nil {
+		return err
+	}
+	if err := viper.Unmarshal(&cfgStruct, func(c *mapstructure.DecoderConfig) {
+		c.TagName = "yaml"
+	}); err != nil {
+		return err
+	}
+	return nil
+}

+ 36 - 0
dep/dep.go

@@ -0,0 +1,36 @@
+package dep
+
+import (
+	"kpt-temporary-mqtt/config"
+	"kpt-temporary-mqtt/store/kptstore"
+
+	"gitee.com/xuyiping_admin/pkg/di"
+)
+
+func DI(opts ...di.HubOption) *di.Hub {
+	var hubOpts []di.HubOption
+	if len(opts) != 0 {
+		hubOpts = append(opts, Global())
+	} else {
+		hubOpts = append(hubOpts, Global())
+	}
+
+	hub, err := di.New(hubOpts...)
+	if err != nil {
+		panic(err)
+	}
+
+	return hub
+}
+
+func Global() di.HubOption {
+	return di.Options(Options()...)
+}
+
+func Options() []di.HubOption {
+	return []di.HubOption{
+		// 基础依赖
+		config.Module,
+		kptstore.Module,
+	}
+}

+ 20 - 0
dep/di_mqtt.go

@@ -0,0 +1,20 @@
+package dep
+
+import (
+	golangMqtt "github.com/eclipse/paho.mqtt.golang"
+	"go.uber.org/dig"
+)
+
+func DIMqtt() (out *MqttDependency) {
+	container := DI()
+	if err := container.Invoke(func(c *MqttDependency) { out = c }); err != nil {
+		panic(err)
+	}
+	return
+}
+
+type MqttDependency struct {
+	dig.In
+
+	MqttServer golangMqtt.Client
+}

+ 51 - 0
go.mod

@@ -0,0 +1,51 @@
+module kpt-temporary-mqtt
+
+go 1.19
+
+require (
+	gitee.com/xuyiping_admin/pkg v0.0.0-20241010101255-0c6bd229b939
+	github.com/eclipse/paho.mqtt.golang v1.5.0
+	github.com/mitchellh/mapstructure v1.5.0
+	github.com/spf13/cobra v1.8.1
+	github.com/spf13/viper v1.19.0
+	github.com/stretchr/testify v1.9.0
+	go.uber.org/dig v1.15.0
+	go.uber.org/zap v1.24.0
+	gorm.io/driver/mysql v1.5.1
+	gorm.io/gorm v1.25.2
+)
+
+require (
+	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
+	github.com/fsnotify/fsnotify v1.7.0 // indirect
+	github.com/getsentry/sentry-go v0.23.0 // indirect
+	github.com/go-sql-driver/mysql v1.7.0 // indirect
+	github.com/gorilla/websocket v1.5.3 // indirect
+	github.com/hashicorp/hcl v1.0.0 // indirect
+	github.com/inconshreveable/mousetrap v1.1.0 // indirect
+	github.com/jinzhu/inflection v1.0.0 // indirect
+	github.com/jinzhu/now v1.1.5 // indirect
+	github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
+	github.com/lestrrat-go/strftime v1.0.6 // indirect
+	github.com/magiconair/properties v1.8.7 // indirect
+	github.com/pelletier/go-toml/v2 v2.2.2 // indirect
+	github.com/pkg/errors v0.9.1 // indirect
+	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
+	github.com/sagikazarmark/locafero v0.4.0 // indirect
+	github.com/sagikazarmark/slog-shim v0.1.0 // indirect
+	github.com/sirupsen/logrus v1.9.3 // indirect
+	github.com/sourcegraph/conc v0.3.0 // indirect
+	github.com/spf13/afero v1.11.0 // indirect
+	github.com/spf13/cast v1.6.0 // indirect
+	github.com/spf13/pflag v1.0.5 // indirect
+	github.com/subosito/gotenv v1.6.0 // indirect
+	go.uber.org/atomic v1.9.0 // indirect
+	go.uber.org/multierr v1.9.0 // indirect
+	golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
+	golang.org/x/net v0.27.0 // indirect
+	golang.org/x/sync v0.7.0 // indirect
+	golang.org/x/sys v0.22.0 // indirect
+	golang.org/x/text v0.16.0 // indirect
+	gopkg.in/ini.v1 v1.67.0 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
+)

+ 115 - 0
go.sum

@@ -0,0 +1,115 @@
+gitee.com/xuyiping_admin/pkg v0.0.0-20241010101255-0c6bd229b939 h1:ubU0RZ/fiElF9NfDDaJh+czYryqKSoEiMd4ec1emtH8=
+gitee.com/xuyiping_admin/pkg v0.0.0-20241010101255-0c6bd229b939/go.mod h1:Fk4GYI/v0IK3XFrm1Gn+VkgCz5Y7mfswD5hsTJYOG6A=
+github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
+github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
+github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
+github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
+github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
+github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
+github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
+github.com/getsentry/sentry-go v0.23.0 h1:dn+QRCeJv4pPt9OjVXiMcGIBIefaTJPw/h0bZWO05nE=
+github.com/getsentry/sentry-go v0.23.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
+github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
+github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
+github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
+github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
+github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
+github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
+github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
+github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
+github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
+github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
+github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8=
+github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
+github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible h1:Y6sqxHMyB1D2YSzWkLibYKgg+SwmyFU9dF2hn6MdTj4=
+github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible/go.mod h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA=
+github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ=
+github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
+github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
+github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
+github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
+github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
+github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
+github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
+github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
+github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
+github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ=
+github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
+github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
+github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
+github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
+github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
+github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
+github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
+github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
+github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY=
+github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0=
+github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
+github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
+github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
+github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
+github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
+github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI=
+github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
+github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
+go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
+go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/dig v1.15.0 h1:vq3YWr8zRj1eFGC7Gvf907hE0eRjPTZ1d3xHadD6liE=
+go.uber.org/dig v1.15.0/go.mod h1:pKHs0wMynzL6brANhB2hLMro+zalv1osARTviTcqHLM=
+go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
+go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
+go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
+go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
+go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
+golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
+golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
+golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
+golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
+golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
+golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
+golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
+golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
+gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
+gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gorm.io/driver/mysql v1.5.1 h1:WUEH5VF9obL/lTtzjmML/5e6VfFR/788coz2uaVCAZw=
+gorm.io/driver/mysql v1.5.1/go.mod h1:Jo3Xu7mMhCyj8dlrb3WoCaRd1FhsVh+yMXb1jUInf5o=
+gorm.io/gorm v1.25.1/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
+gorm.io/gorm v1.25.2 h1:gs1o6Vsa+oVKG/a9ElL3XgyGfghFfkKA2SInQaCyMho=
+gorm.io/gorm v1.25.2/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=

+ 11 - 0
main.go

@@ -0,0 +1,11 @@
+/*
+Copyright © 2024 NAME HERE <EMAIL ADDRESS>
+
+*/
+package main
+
+import "kpt-temporary-mqtt/cmd"
+
+func main() {
+	cmd.Execute()
+}

+ 6 - 0
migrator/v0001_pasture_mqtt.sql

@@ -0,0 +1,6 @@
+CREATE TABLE IF NOT EXISTS `sub_msg_log` (
+   `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键自增id',
+   `created_at` bigint(20) unsigned NOT NULL COMMENT '创建时间',
+   `updated_at` bigint(20) unsigned NOT NULL COMMENT '更新时间',
+   PRIMARY KEY (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消费订阅日志表';

+ 25 - 0
model/sub_msg_log.go

@@ -0,0 +1,25 @@
+package model
+
+type SubMsgLog struct {
+	Id        int64  `json:"id"`
+	SoftVer   int64  `json:"softVer"`
+	Uuid      string `json:"uuid"`
+	FrameId   int64  `json:"frameId"`
+	CowId     string `json:"cowId"`
+	Csq       int64  `json:"csq"`
+	Temp      int64  `json:"temp"`
+	Imei      string `json:"imei" `
+	Active    int32  `json:"active"`
+	InActive  int32  `json:"inactive"`
+	RuMina    int32  `json:"ruMina"`
+	Intake    int32  `json:"intake"`
+	Gasp      int32  `json:"gasp"`
+	Other     int32  `json:"other"`
+	ReMain    int32  `json:"remain"`
+	CreatedAt int64  `json:"createdAt"`
+	UpdatedAt int64  `json:"updatedAt"`
+}
+
+func (s *SubMsgLog) TableName() string {
+	return "sub_msg_log"
+}

+ 201 - 0
mqtt/sub.go

@@ -0,0 +1,201 @@
+package mqtt
+
+import (
+	"fmt"
+	"gitee.com/xuyiping_admin/pkg/logger/zaplog"
+	golangMqtt "github.com/eclipse/paho.mqtt.golang"
+	"go.uber.org/zap"
+	"kpt-temporary-mqtt/config"
+	"kpt-temporary-mqtt/model"
+	"kpt-temporary-mqtt/util"
+	"strconv"
+	"strings"
+)
+
+var messagePubHandler golangMqtt.MessageHandler = func(client golangMqtt.Client, msg golangMqtt.Message) {
+	fmt.Printf("messagePubHandler Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
+}
+
+var connectHandler golangMqtt.OnConnectHandler = func(client golangMqtt.Client) {
+	fmt.Println("Connected-client", client)
+}
+
+var connectLostHandler golangMqtt.ConnectionLostHandler = func(client golangMqtt.Client, err error) {
+	fmt.Printf("Connect lost: %v\n", err.Error())
+}
+
+func NewMqtt(configOption *config.AppConfig) golangMqtt.Client {
+	opts := golangMqtt.NewClientOptions()
+	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", configOption.Broker, configOption.Port))
+	opts.SetClientID(util.RandString(6))
+	opts.SetUsername(configOption.UserName)
+	opts.SetPassword(configOption.Password)
+	opts.SetDefaultPublishHandler(messagePubHandler)
+	opts.OnConnect = connectHandler
+	opts.OnConnectionLost = connectLostHandler
+	client := golangMqtt.NewClient(opts)
+	if token := client.Connect(); token.Wait() && token.Error() != nil {
+		panic(token.Error())
+	}
+	return client
+}
+
+func SubMsg(configOption *config.AppConfig, client golangMqtt.Client) {
+	var subMsgChan = make(chan []byte, configOption.WorkNumber)
+	client.Subscribe(configOption.SubTopName, 1, func(client golangMqtt.Client, msg golangMqtt.Message) {
+		subMsgChan <- msg.Payload()
+	})
+
+	//for i := 0; i < int(configOption.WorkNumber); i++ {
+	//go func() {
+	for {
+		select {
+		case msg := <-subMsgChan:
+			CreatMsgLog(msg)
+		}
+	}
+	//}()
+	//}
+}
+
+func CreatMsgLog(msg []byte) {
+	subMsgLog, err := MsgDataFormat(msg)
+	if err != nil {
+		zaplog.Error("CreatMsgLog", zap.Any("err", err), zap.Any("msg", string(msg)))
+	}
+
+	//if err := kptstore.DB.Table(new(model.SubMsgLog).TableName()).Create(subMsgLog).Error; err != nil {
+	fmt.Println("msg", string(msg))
+	fmt.Println("subMsgLog", subMsgLog)
+	//}
+}
+
+func MsgDataFormat(msg []byte) (*model.SubMsgLog, error) {
+	msgData := make(map[string]interface{})
+	pairs := strings.Split(util.MsgFormat(string(msg)), " ")
+	for _, pair := range pairs {
+		parts := strings.SplitN(pair, ":", 2)
+		if len(parts) == 2 {
+			key, value := parts[0], parts[1]
+			if len(key) == 0 {
+				continue
+			}
+			msgData[key] = value
+		}
+	}
+
+	softVer := int64(0)
+	if softVerInter, ok := msgData["SOFT_VER"]; ok {
+		if softVerstr, ok := softVerInter.(string); ok {
+			softVer, _ = strconv.ParseInt(softVerstr, 10, 64)
+		}
+	}
+
+	uuid := ""
+	if uuidInter, ok := msgData["uuid"]; ok {
+		if uuidStr, ok := uuidInter.(string); ok {
+			uuid = uuidStr
+		}
+	}
+
+	frameId := int64(0)
+	if frameIdInter, ok := msgData["frameid"]; ok {
+		if frameId64, ok := frameIdInter.(string); ok {
+			frameId, _ = strconv.ParseInt(frameId64, 10, 64)
+		}
+	}
+	cowId := ""
+	if cowIdInter, ok := msgData["cowid"]; ok {
+		if cowIdStr, ok := cowIdInter.(string); ok {
+			cowId = cowIdStr
+		}
+	}
+
+	csq := int64(0)
+	if csqInter, ok := msgData["csq"]; ok {
+		if csq32, ok := csqInter.(string); ok {
+			csq, _ = strconv.ParseInt(csq32, 10, 64)
+		}
+	}
+
+	temp := float64(0)
+	if tempInter, ok := msgData["Temp"]; ok {
+		if tempFloat, ok := tempInter.(string); ok {
+			temp, _ = strconv.ParseFloat(tempFloat, 64)
+		}
+	}
+
+	imei := ""
+	if imeiInter, ok := msgData["imei"]; ok {
+		if imeiStr, ok := imeiInter.(string); ok {
+			imei = imeiStr
+		}
+	}
+
+	active := int64(0)
+	if activeInter, ok := msgData["active"]; ok {
+		if active32, ok := activeInter.(string); ok {
+			active, _ = strconv.ParseInt(active32, 10, 64)
+		}
+	}
+
+	inAction := int64(0)
+	if inActionInter, ok := msgData["inactive"]; ok {
+		if inAction32, ok := inActionInter.(string); ok {
+			inAction, _ = strconv.ParseInt(inAction32, 10, 64)
+		}
+	}
+
+	ruMina := int64(0)
+	if ruMinaInter, ok := msgData["Rumina"]; ok {
+		if ruMina32, ok := ruMinaInter.(string); ok {
+			ruMina, _ = strconv.ParseInt(ruMina32, 10, 64)
+		}
+	}
+
+	intake := int64(0)
+	if intakeInter, ok := msgData["Intake"]; ok {
+		if intake32, ok := intakeInter.(string); ok {
+			intake, _ = strconv.ParseInt(intake32, 10, 64)
+		}
+	}
+
+	gasp := int64(0)
+	if gaspInter, ok := msgData["gasp"]; ok {
+		if gasp32, ok := gaspInter.(string); ok {
+			gasp, _ = strconv.ParseInt(gasp32, 10, 64)
+		}
+	}
+
+	other := int64(0)
+	if otherInter, ok := msgData["other"]; ok {
+		if other32, ok := otherInter.(string); ok {
+			other, _ = strconv.ParseInt(other32, 10, 64)
+		}
+	}
+
+	reMain := int64(0)
+	if reMainInter, ok := msgData["Remain"]; ok {
+		if reMain32, ok := reMainInter.(string); ok {
+			reMain, _ = strconv.ParseInt(reMain32, 10, 64)
+		}
+	}
+
+	return &model.SubMsgLog{
+		SoftVer:  softVer,
+		Uuid:     uuid,
+		FrameId:  frameId,
+		CowId:    cowId,
+		Csq:      csq,
+		Temp:     int64(temp * 100),
+		Imei:     imei,
+		Active:   int32(active),
+		InActive: int32(inAction),
+		RuMina:   int32(ruMina),
+		Intake:   int32(intake),
+		Gasp:     int32(gasp),
+		Other:    int32(other),
+		ReMain:   int32(reMain),
+	}, nil
+
+}

+ 75 - 0
store/kptstore/rw_store.go

@@ -0,0 +1,75 @@
+package kptstore
+
+import (
+	"kpt-temporary-mqtt/config"
+	"time"
+
+	"gitee.com/xuyiping_admin/pkg/logger/logrus"
+	"gitee.com/xuyiping_admin/pkg/xerr"
+
+	"gorm.io/driver/mysql"
+	"gorm.io/gorm"
+	"gorm.io/gorm/logger"
+
+	"gitee.com/xuyiping_admin/pkg/di"
+)
+
+var Module = di.Options(
+	di.Provide(MustNewStore),
+)
+
+type DB struct {
+	*gorm.DB
+}
+
+func NewStore(engine *gorm.DB) *DB {
+	return &DB{engine}
+}
+
+type goRmLog struct {
+}
+
+func (g goRmLog) Printf(s string, i ...interface{}) {
+	logrus.Infof(s, i...)
+}
+
+var newLogger = logger.New(
+	goRmLog{},
+	logger.Config{
+		SlowThreshold: 5 * time.Second,
+		LogLevel:      logger.Info,
+	},
+)
+
+func MustNewStore(cfg *config.AppConfig) *DB {
+	db, err := gorm.Open(mysql.New(mysql.Config{
+		DriverName: cfg.StoreSetting.DriverName,
+		DSN:        cfg.StoreSetting.KptRW}),
+		&gorm.Config{Logger: newLogger},
+	)
+	if err != nil {
+		panic(xerr.WithStack(err))
+	}
+
+	if cfg.StoreSetting.ShowSQL {
+		db.Logger.LogMode(logger.Info)
+	}
+
+	return NewStore(db)
+}
+
+func MustMigrateStore(cfg *config.AppConfig) *gorm.DB {
+	db, err := gorm.Open(mysql.New(mysql.Config{
+		DriverName: cfg.StoreSetting.DriverName,
+		DSN:        cfg.StoreSetting.KptMigr}),
+		&gorm.Config{Logger: newLogger},
+	)
+	if err != nil {
+		panic(xerr.WithStack(err))
+	}
+
+	if cfg.StoreSetting.ShowSQL {
+		db.Logger.LogMode(logger.Info)
+	}
+	return db
+}

+ 41 - 0
util/util.go

@@ -0,0 +1,41 @@
+package util
+
+import (
+	"math/rand"
+	"regexp"
+	"time"
+)
+
+const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+const (
+	letterIdxBits = 6                    // 6 bits to represent a letter index
+	letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
+	letterIdxMax  = 63 / letterIdxBits   // # of letter indices fitting in 63 bits
+)
+
+func RandString(n int) string {
+	result := make([]byte, n)
+	// A rand.Int63() generates 63 random bits, enough for letterIdxMax characters!
+	rand.Seed(time.Now().UnixNano())
+	for i, cache, remain := n-1, rand.Int63(), letterIdxMax; i >= 0; {
+		if remain == 0 {
+			cache, remain = rand.Int63(), letterIdxMax
+		}
+		if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
+			result[i] = letterBytes[idx]
+			i--
+		}
+		cache >>= letterIdxBits
+		remain--
+	}
+	return string(result)
+}
+
+// MsgFormat 格式化消息字符串 字符串里面有多个冒号,仅删除冒号前后的空格(如果存在)
+func MsgFormat(input string) string {
+	// 定义正则表达式,用于匹配冒号两边的空格
+	re := regexp.MustCompile(`\s*:\s*`)
+
+	// 使用正则表达式替换所有匹配的部分
+	return re.ReplaceAllString(input, ":")
+}

+ 15 - 0
util/util_test.go

@@ -0,0 +1,15 @@
+package util
+
+import (
+	"github.com/stretchr/testify/assert"
+
+	"testing"
+)
+
+func TestMsgFormat(t *testing.T) {
+	inptut := `SOFT_VER: 3 uuid: 8 frameid: 0 cowid: 99999999 csq: 10 Temp: 30.56 imei:869678048355757 active : 631 inactive: 0 Rumina: 76 Intake: 0 gasp: 0 other: 11 Remain: 0`
+	output := `SOFT_VER:3 uuid:8 frameid:0 cowid:99999999 csq:10 Temp:30.56 imei:869678048355757 active:631 inactive:0 Rumina:76 Intake:0 gasp:0 other:11 Remain:0`
+
+	res := MsgFormat(inptut)
+	assert.Equal(t, output, res)
+}