mqtt.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682
  1. package kpt
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "flag"
  6. "fmt"
  7. "log"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "time"
  12. MQTT "github.com/eclipse/paho.mqtt.golang"
  13. "github.com/go-mysql-org/go-mysql/canal"
  14. "github.com/go-mysql-org/go-mysql/mysql"
  15. "github.com/golang/snappy"
  16. "github.com/vmihailenco/msgpack/v5"
  17. )
  18. var (
  19. mqtt_host string
  20. mqtt_port int
  21. mqtt_path string
  22. mqtt_topic string
  23. mqtt_qos int
  24. mqtt_saslEnable bool
  25. mqtt_username string
  26. mqtt_saslpassword string
  27. mqtt_tlsEnable bool
  28. mqtt_clientcert string
  29. mqtt_clientkey string
  30. mqtt_cacert string
  31. mqtt_nodevalue string
  32. mqtt_CleanSession bool
  33. mqtt_Breaki int
  34. mqtt_onetimerows string
  35. temp_i int = 0
  36. )
  37. type SqlItem struct {
  38. Sql string
  39. }
  40. type TransferItem struct {
  41. Ftype string
  42. Content []byte
  43. }
  44. func SqlItemBuilder() interface{} {
  45. return &SqlItem{}
  46. }
  47. func reconnect(connOpts MQTT.Client) {
  48. for {
  49. fmt.Println("尝试重新连接...")
  50. if token := connOpts.Connect(); token.Wait() && token.Error() == nil {
  51. fmt.Println("重新连接成功")
  52. return
  53. }
  54. time.Sleep(5 * time.Second) // 等待5秒后再次尝试
  55. }
  56. }
  57. func clearPos() {
  58. c, err := canal.NewCanal(cfg)
  59. if err != nil {
  60. fmt.Printf("create canal err %v", err)
  61. os.Exit(1)
  62. }
  63. if len(ignoreTables) > 0 {
  64. subs := strings.Split(ignoreTables, ",")
  65. for _, sub := range subs {
  66. if seps := strings.Split(sub, "."); len(seps) == 2 {
  67. c.AddDumpIgnoreTables(seps[0], seps[1])
  68. }
  69. }
  70. }
  71. if len(tables) > 0 && len(tableDB) > 0 {
  72. subs := strings.Split(tables, ",")
  73. c.AddDumpTables(tableDB, subs...)
  74. } else if len(tableDB) > 0 {
  75. subs := strings.Split(tableDB, ",")
  76. c.AddDumpDatabases(subs...)
  77. }
  78. c.SetEventHandler(&handler{})
  79. // ctx := context.Background()
  80. // ctx, cancel = context.WithCancel(ctx)
  81. startPos_ := mysql.Position{
  82. Name: "",
  83. Pos: 0,
  84. }
  85. // initDB()
  86. insertEmpty()
  87. err = c.RunFrom(startPos_)
  88. logger.Error("start canal err second %v", err)
  89. }
  90. func onMessageReceived(client MQTT.Client, message MQTT.Message) {
  91. err := DqueueIndexMqtt.PushOneIndex(message.Payload()[:])
  92. if err != nil {
  93. logger.Errorln("DqueueIndexMqtt.PushOneIndex,err", err)
  94. }
  95. }
  96. func Mqtt_producerDB(ctx context.Context) {
  97. fmt.Println("Mqtt Clint 启动")
  98. retained := flag.Bool("retained", false, "Are the messages sent with the retained flag")
  99. connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves.
  100. connOpts.SetClientID(mqtt_nodevalue)
  101. connOpts.SetMaxReconnectInterval(1 * time.Second)
  102. connOpts.SetCleanSession(mqtt_CleanSession)
  103. connOpts.AutoReconnect = true
  104. brokerURL := fmt.Sprintf("tcp://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
  105. int_, err := strconv.Atoi(onetimerows)
  106. if err == nil {
  107. } else {
  108. logger.Errorln("转换onetimerowsMqtt错误 \r\n", err.Error())
  109. }
  110. if mqtt_saslEnable {
  111. if mqtt_username != "" {
  112. connOpts.SetUsername(mqtt_username)
  113. if mqtt_saslpassword != "" {
  114. connOpts.SetPassword(mqtt_saslpassword)
  115. }
  116. }
  117. }
  118. if mqtt_tlsEnable {
  119. cer, err := tls.LoadX509KeyPair(mqtt_clientcert, mqtt_clientkey)
  120. if err != nil {
  121. println("LoadX509KeyPair err :", err.Error())
  122. }
  123. connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  124. brokerURL = fmt.Sprintf("tcps://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
  125. }
  126. connOpts.OnConnect = func(c MQTT.Client) {
  127. //if token := c.Subscribe(mqtt_topic, byte(mqtt_qos), onMessageReceived); token.Wait() && token.Error() != nil {
  128. // panic(token.Error())
  129. //}
  130. }
  131. connOpts.OnConnectionLost = func(c MQTT.Client, reason error) {
  132. println("onlost reason", reason.Error())
  133. clearPos()
  134. reconnect(c)
  135. }
  136. connOpts.AddBroker(brokerURL)
  137. //connOpts.OnConnect = func(c MQTT.Client) {
  138. // if token := c.Subscribe(mqtt_topic, byte(mqtt_qos), onMessageReceived); token.Wait() && token.Error() != nil {
  139. // //panic(token.Error())
  140. // fmt.Println("panic(token.Error())",token.Error())
  141. // }else{
  142. // //fmt.Println("",token,token.Error(),mqtt_topic)
  143. // }
  144. //
  145. //}
  146. // var reconnectHandler mqtt.ReconnectHandler = func(client mqtt.Client, opts *mqtt.ClientOptions) {
  147. // fmt.Println("Reconnecting to MQTT broker...")
  148. // if token := client.Connect(); token.Wait() && token.Error() != nil {
  149. // log.Fatalf("Failed to reconnect: %v", token.Error())
  150. // }
  151. // }
  152. // connOpts.OnReconnecting = reconnectHandler
  153. mqttClient := MQTT.NewClient(connOpts)
  154. if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
  155. panic(token.Error())
  156. }
  157. logger.Errorln("[MQTT] Connected")
  158. for {
  159. select {
  160. case <-ctx.Done():
  161. fmt.Println("mqtt 退出")
  162. return
  163. default:
  164. PopPeeks, err, full, null := DqueueIndex.PopPeeksV2(int_)
  165. if err != nil {
  166. fmt.Println("PopPeeks,err ", err)
  167. } else {
  168. if len(PopPeeks) == 0 {
  169. if !full && null {
  170. DqueueIndex.PopOneIndex()
  171. }
  172. time.Sleep(time.Second)
  173. continue
  174. }
  175. //fmt.Println("PopPeeks==",string(PopPeeks[0]))
  176. datasSlice := make([]TransferItem, len(PopPeeks))
  177. for key, value := range PopPeeks {
  178. //println("resql =========",string(value))
  179. datasSlice[key].Content = value
  180. }
  181. b1, err := msgpack.Marshal(&datasSlice)
  182. if err != nil {
  183. panic(err)
  184. }
  185. //fmt.Println("encode", len(b1))
  186. encode := snappy.Encode(nil, b1)
  187. logger.Infoln("encode", len(encode))
  188. //println("encode",len(encode))
  189. token := mqttClient.Publish(mqtt_topic, byte(mqtt_qos), *retained, encode)
  190. //if token.Wait() && token.Error() != nil {
  191. // fmt.Println("=============",token.Error())
  192. //}
  193. if token.Wait() && token.Error() != nil {
  194. if showlog > 0 {
  195. fmt.Println(" mqttClient.Publish err:", mqtt_topic, string(PopPeeks[0]), err)
  196. }
  197. logger.Errorln("Client.Publish err:--- ", token.Error())
  198. } else {
  199. temp_i++
  200. //fmt.Println(temp_i)
  201. logger.Infoln("Client.Publish len:--- ", len(encode))
  202. if !full && null {
  203. for i := 0; i < len(PopPeeks)+1; i++ {
  204. _, _ = DqueueIndex.PopOneIndex()
  205. if showlog > 0 {
  206. //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1", string(pop), err)
  207. }
  208. }
  209. } else {
  210. for i := 0; i < len(PopPeeks); i++ {
  211. _, _ = DqueueIndex.PopOneIndex()
  212. if showlog > 0 {
  213. //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1", string(pop), err)
  214. }
  215. }
  216. }
  217. }
  218. }
  219. }
  220. }
  221. }
  222. func Mq_Consumer(ctx context.Context) {
  223. var err error
  224. DqueueIndexMqtt, err = OpenIndexFile(CurrentPath+"logs", 100000, 10000, false)
  225. if err != nil {
  226. fmt.Printf("create DqueueIndex err %v", err)
  227. os.Exit(1)
  228. }
  229. fmt.Println("Mqtt Clint 启动")
  230. connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves.
  231. connOpts.SetClientID(mqtt_nodevalue)
  232. connOpts.SetMaxReconnectInterval(1 * time.Second)
  233. connOpts.SetCleanSession(mqtt_CleanSession)
  234. connOpts.AutoReconnect = true
  235. brokerURL := fmt.Sprintf("tcp://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
  236. if mqtt_saslEnable {
  237. if mqtt_username != "" {
  238. connOpts.SetUsername(mqtt_username)
  239. if mqtt_saslpassword != "" {
  240. connOpts.SetPassword(mqtt_saslpassword)
  241. }
  242. }
  243. }
  244. if mqtt_tlsEnable {
  245. cer, err := tls.LoadX509KeyPair(mqtt_clientcert, mqtt_clientkey)
  246. check(err)
  247. connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  248. brokerURL = fmt.Sprintf("tcps://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
  249. }
  250. connOpts.AddBroker(brokerURL)
  251. connOpts.OnConnect = func(c MQTT.Client) {
  252. if token := c.Subscribe(mqtt_topic, byte(mqtt_qos), onMessageReceived); token.Wait() && token.Error() != nil {
  253. fmt.Println("panic(token.Error())", token.Error())
  254. }
  255. }
  256. label:
  257. mqttClient := MQTT.NewClient(connOpts)
  258. if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
  259. if showlog > 0 {
  260. fmt.Println("token.Error(),err", token.Error())
  261. }
  262. logger.Errorln("token.Error(),err", token.Error())
  263. time.Sleep(time.Second)
  264. goto label
  265. //panic(token.Error())
  266. }
  267. log.Println("[MQTT] Connected")
  268. go func() {
  269. for {
  270. select {
  271. case <-ctx.Done():
  272. fmt.Println("mqtt 退出")
  273. logger.Errorln("mqtt 退出")
  274. return
  275. default:
  276. PopPeeks, err, full, null := DqueueIndexMqtt.PopPeeksV2(1)
  277. fmt.Println("DqueueIndexMqtt.PopPeeksV2(1)", len(PopPeeks))
  278. if err != nil {
  279. logger.Errorln("PopPeeks,err ", err)
  280. } else {
  281. if len(PopPeeks) == 0 {
  282. if !full && null {
  283. DqueueIndexMqtt.PopOneIndex()
  284. }
  285. if showlog > 0 {
  286. fmt.Println("PopPeeks,null,", PopPeeks)
  287. }
  288. time.Sleep(time.Second)
  289. continue
  290. }
  291. // var pop []byte
  292. labelDb:
  293. if mqttDb == nil {
  294. mqttDb, err = GetmyDbsConnect(user, password, host, port, tableDB)
  295. if err != nil {
  296. logger.Errorln("GetmyDbsConnect err:", err)
  297. time.Sleep(time.Second)
  298. goto labelDb
  299. }
  300. }
  301. //fmt.Println("mqttDb.Begin()")
  302. tx, err := mqttDb.Begin()
  303. if err == nil {
  304. //fmt.Println("tx, err := mqttDb.Begin() err", err)
  305. decode, _ := snappy.Decode(nil, PopPeeks[0])
  306. fmt.Println("encode", string(PopPeeks[0]))
  307. var item1 []TransferItem
  308. err = msgpack.Unmarshal(decode, &item1)
  309. if err != nil {
  310. log.Println("Unmarshal panic(err)", err)
  311. }
  312. // fmt.Println(item1)
  313. count := 0
  314. str := ""
  315. errstr := ""
  316. for _, value := range item1 {
  317. sqls := strings.Split(string(value.Content), ";")
  318. for _, sql := range sqls {
  319. //err = ExecT(tx, string(value.Content))
  320. if sql == "" {
  321. continue
  322. }
  323. sql = strings.Replace(sql, "\\", "\\\\", -1)
  324. if strings.Index(sql, "downloadedplan") > 0 {
  325. fmt.Println(sql)
  326. }
  327. logger.Infoln(sql, "test")
  328. err = ExecT(tx, sql)
  329. // db, err := tx.Exec(sql)
  330. //fmt.Println("string(value.Content)",string(value.Content))
  331. if err == nil {
  332. //logger.Infoln(fmt.Sprintf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), sqlline))
  333. //fmt.Println( "ExecT---" + string(value.Content))
  334. } else {
  335. logger.Errorln(err.Error() + "---" + sql)
  336. count++
  337. str = string(value.Content)
  338. errstr = err.Error()
  339. //fmt.Println(err.Error() + "---" + string(value.Content))
  340. }
  341. }
  342. }
  343. if count > 0 {
  344. count = 0
  345. logger.Errorln(errstr + "---" + str)
  346. }
  347. if showlog > 0 {
  348. log.Println("PopPeeks, %d \n\t", len(item1))
  349. }
  350. } else {
  351. if showlog > 0 {
  352. log.Println("mqttDb.Begin(),err", err)
  353. }
  354. }
  355. err = tx.Commit()
  356. if err == nil {
  357. if !full && null {
  358. for i := 0; i < len(PopPeeks)+1; i++ {
  359. _, _ = DqueueIndexMqtt.PopOneIndex()
  360. if showlog > 0 {
  361. //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1", string(pop), err)
  362. }
  363. }
  364. } else {
  365. for i := 0; i < len(PopPeeks); i++ {
  366. _, _ = DqueueIndexMqtt.PopOneIndex()
  367. if showlog > 0 {
  368. //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()2", string(pop), err)
  369. }
  370. }
  371. }
  372. } else {
  373. logger.Errorln("tx.Commit(),err", err)
  374. if showlog > 0 {
  375. fmt.Println("tx.Commit(),err", err)
  376. }
  377. }
  378. }
  379. }
  380. }
  381. }()
  382. }
  383. func check(err error) {
  384. if err != nil {
  385. logger.Errorln("err :", err.Error())
  386. }
  387. }
  388. //func (m *Client) Pub() {
  389. // fmt.Println("pub")
  390. // s := time.Now().String()
  391. // token := m.Client.Publish("date", 0, false, s)
  392. // if token.Wait() && token.Error() != nil {
  393. // fmt.Println(token.Error())
  394. // }
  395. //}
  396. //新希望
  397. type Xxw struct {
  398. id int64
  399. xxw_sql string
  400. }
  401. func Mq_ConsumerXxw(ctx context.Context) {
  402. var err error
  403. DqueueIndexMqtt, err = OpenIndexFile(CurrentPath+"logs", 100000, 10000, false)
  404. if err != nil {
  405. fmt.Printf("create DqueueIndex err %v", err)
  406. os.Exit(1)
  407. }
  408. fmt.Println("Mqtt Clint 启动")
  409. connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves.
  410. connOpts.SetClientID(mqtt_nodevalue)
  411. connOpts.SetMaxReconnectInterval(1 * time.Second)
  412. connOpts.SetCleanSession(mqtt_CleanSession)
  413. connOpts.AutoReconnect = true
  414. brokerURL := fmt.Sprintf("tcp://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
  415. println("brokerURL", brokerURL)
  416. if mqtt_saslEnable {
  417. if mqtt_username != "" {
  418. connOpts.SetUsername(mqtt_username)
  419. if mqtt_saslpassword != "" {
  420. connOpts.SetPassword(mqtt_saslpassword)
  421. }
  422. }
  423. }
  424. if mqtt_tlsEnable {
  425. cer, err := tls.LoadX509KeyPair(mqtt_clientcert, mqtt_clientkey)
  426. check(err)
  427. connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  428. brokerURL = fmt.Sprintf("tcps://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
  429. }
  430. connOpts.AddBroker(brokerURL)
  431. connOpts.OnConnect = func(c MQTT.Client) {
  432. if token := c.Subscribe(mqtt_topic, byte(mqtt_qos), onMessageReceived); token.Wait() && token.Error() != nil {
  433. fmt.Println("panic(token.Error())", token.Error())
  434. }
  435. }
  436. label:
  437. mqttClient := MQTT.NewClient(connOpts)
  438. if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
  439. if showlog > 0 {
  440. fmt.Println("token.Error(),err", token.Error())
  441. }
  442. logger.Errorln("token.Error(),err", token.Error())
  443. time.Sleep(time.Second)
  444. goto label
  445. }
  446. log.Println("[MQTT] Connected")
  447. go func() {
  448. for {
  449. select {
  450. case <-ctx.Done():
  451. fmt.Println("mqtt 退出")
  452. logger.Errorln("mqtt 退出")
  453. return
  454. default:
  455. PopPeeks, err, full, null := DqueueIndexMqtt.PopPeeksV2(1)
  456. if err != nil {
  457. logger.Errorln("PopPeeks,err ", err)
  458. } else {
  459. // fmt.Println(PopPeeks)
  460. if len(PopPeeks) == 0 {
  461. if !full && null {
  462. DqueueIndexMqtt.PopOneIndex()
  463. }
  464. if showlog > 0 {
  465. fmt.Println("PopPeeks,null,", PopPeeks)
  466. }
  467. time.Sleep(time.Second)
  468. continue
  469. }
  470. labelDb:
  471. if mqttDb == nil {
  472. mqttDb, err = GetmyDbsConnect(user, password, host, port, tableDB)
  473. if err != nil {
  474. logger.Errorln("GetmyDbsConnect err:", err)
  475. time.Sleep(time.Second)
  476. goto labelDb
  477. }
  478. }
  479. tx, err := mqttDb.Begin()
  480. if err == nil {
  481. count := 0
  482. str := ""
  483. errstr := ""
  484. sqls := strings.Split(string(PopPeeks[0]), ";")
  485. fmt.Println(sqls, time.Now())
  486. for _, sql := range sqls {
  487. // fmt.Println(sql)
  488. //err = ExecT(tx, string(value.Content))
  489. fmt.Println(sqls, time.Now(), strings.Index(sql, "kptCattleId"+KptCattleId))
  490. logger.Errorln(sqls, time.Now(), strings.Index(sql, "kptCattleId"+KptCattleId))
  491. if sql == "" || strings.Index(sql, "kptCattleId"+KptCattleId) == -1 {
  492. continue
  493. }
  494. sql = strings.Replace(sql, "kptCattleId"+KptCattleId, "", -1)
  495. sql = strings.Replace(sql, "\\", "\\\\", -1)
  496. err = ExecT(tx, sql)
  497. if err == nil {
  498. logger.Infoln(" -- ", sql)
  499. //fmt.Println( "ExecT---" + string(value.Content))
  500. } else {
  501. logger.Errorln(err.Error() + "---" + sql)
  502. count++
  503. errstr = err.Error()
  504. //fmt.Println(err.Error() + "---" + string(value.Content))
  505. }
  506. }
  507. if count > 0 {
  508. count = 0
  509. logger.Errorln(errstr + "---" + str)
  510. }
  511. } else {
  512. if showlog > 0 {
  513. log.Println("mqttDb.Begin(),err", err)
  514. }
  515. }
  516. err = tx.Commit()
  517. if err == nil {
  518. if !full && null {
  519. for i := 0; i < len(PopPeeks)+1; i++ {
  520. _, _ = DqueueIndexMqtt.PopOneIndex()
  521. if showlog > 0 {
  522. //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()1", string(pop), err)
  523. }
  524. }
  525. } else {
  526. for i := 0; i < len(PopPeeks); i++ {
  527. _, _ = DqueueIndexMqtt.PopOneIndex()
  528. if showlog > 0 {
  529. //fmt.Println(" pop,err:= dqueuePop.PopOneIndex()2", string(pop), err)
  530. }
  531. }
  532. }
  533. } else {
  534. logger.Errorln("tx.Commit(),err", err)
  535. if showlog > 0 {
  536. fmt.Println("tx.Commit(),err", err)
  537. }
  538. }
  539. }
  540. }
  541. }
  542. }()
  543. }
  544. func Mqtt_producerXxw(ctx context.Context) { //0824数据同步
  545. fmt.Println("Mqtt Clint 启动1")
  546. // var reconnectHandler mqtt.ReconnectHandler = func(client mqtt.Client, opts *mqtt.ClientOptions) {
  547. // fmt.Println("Reconnecting to MQTT broker...")
  548. // if token := client.Connect(); token.Wait() && token.Error() != nil {
  549. // log.Fatalf("Failed to reconnect: %v", token.Error())
  550. // }
  551. // }
  552. println(user, password, host, port, tableDB)
  553. retained := flag.Bool("retained", false, "Are the messages sent with the retained flag")
  554. connOpts := MQTT.NewClientOptions() // This line is different, we use the constructor function instead of creating the instance ourselves.
  555. connOpts.SetClientID(mqtt_nodevalue)
  556. connOpts.SetMaxReconnectInterval(1 * time.Second)
  557. connOpts.SetCleanSession(mqtt_CleanSession)
  558. connOpts.AutoReconnect = true
  559. // connOpts.OnReconnecting = reconnectHandler
  560. brokerURL := fmt.Sprintf("tcp://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
  561. if mqtt_saslEnable {
  562. if mqtt_username != "" {
  563. connOpts.SetUsername(mqtt_username)
  564. if mqtt_saslpassword != "" {
  565. connOpts.SetPassword(mqtt_saslpassword)
  566. }
  567. }
  568. }
  569. logger.Info("开始")
  570. if mqtt_tlsEnable {
  571. cer, err := tls.LoadX509KeyPair(mqtt_clientcert, mqtt_clientkey)
  572. check(err)
  573. connOpts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cer}})
  574. brokerURL = fmt.Sprintf("tcps://%s:%d%s", mqtt_host, mqtt_port, mqtt_path)
  575. }
  576. connOpts.OnConnect = func(c MQTT.Client) {
  577. }
  578. connOpts.AddBroker(brokerURL)
  579. mqttClient := MQTT.NewClient(connOpts)
  580. if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
  581. panic(token.Error())
  582. }
  583. logger.Errorln("[MQTT] Connected")
  584. for {
  585. select {
  586. case <-ctx.Done():
  587. fmt.Println("mqtt 退出")
  588. return
  589. default:
  590. var err error
  591. labelDb:
  592. if mqttDb == nil {
  593. mqttDb, err = GetmyDbsConnect(user, password, host, port, tableDB)
  594. if err != nil {
  595. logger.Errorln("GetmyDbsConnect err:", err)
  596. time.Sleep(time.Second)
  597. goto labelDb
  598. }
  599. }
  600. //fmt.Println("mqttDb.Begin()")
  601. tx, err := mqttDb.Begin()
  602. xxw := Xxw{}
  603. row := tx.QueryRow("select id,xxw_sql from xxw where isused=0 order by id limit 1")
  604. if err != nil {
  605. logger.Info("Query err", err)
  606. }
  607. err = row.Scan(&xxw.id, &xxw.xxw_sql)
  608. if xxw.id != 0 && xxw.xxw_sql != "" {
  609. token := mqttClient.Publish(mqtt_topic, byte(mqtt_qos), *retained, xxw.xxw_sql)
  610. if token.Wait() && token.Error() != nil {
  611. logger.Info("Client.Publish err:--- ", err)
  612. } else {
  613. logger.Infoln("Client.Publish success")
  614. _, err = tx.Exec("update xxw set isused=1 where id=?", xxw.id)
  615. if err != nil {
  616. logger.Info("Exec err", err)
  617. }
  618. }
  619. } else {
  620. time.Sleep(time.Second)
  621. }
  622. err = tx.Commit()
  623. if err != nil {
  624. tx.Rollback()
  625. logger.Info("Commit err", err)
  626. }
  627. }
  628. }
  629. }