dqueue.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805
  1. package kpt
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "os"
  8. "strconv"
  9. "time"
  10. "github.com/Shopify/sarama"
  11. "github.com/edsrzf/mmap-go"
  12. uuid "github.com/satori/go.uuid"
  13. )
  14. const HEADER_SIZE = 13 // 1 for version 4 for viewSize 4 for nextWriteAt 4 for nextReadAt
  15. const MAX_READ_PACKET_COUNT = 1024
  16. const MAX_PACKET_SIZE = 1024 * 63 //最大64k
  17. var (
  18. LogStatu bool
  19. )
  20. type DQueue struct {
  21. // underlying disk
  22. fileObj *os.File
  23. mappedFile mmap.MMap
  24. // the uncommitted read pointer
  25. // will be committed at next pop()
  26. nextReadAt uint32
  27. lastWriteAt uint32
  28. /*
  29. view covers the readable range of body
  30. to avoid split packet in two parts to wrap around
  31. the memory will NOT be copied during read,
  32. the reader must copy out the bytes manually before goroutine switch
  33. [--- header ---][--- body ---]
  34. [--- view --]
  35. */
  36. header header
  37. body []byte
  38. view []byte
  39. // used to avoid allocation
  40. readBufferItself [][]byte
  41. dqueuePop *DQueue
  42. //dqueuePopTemp *DQueue
  43. dqueue1Index *DQueue
  44. dqueuPush *DQueue
  45. nowPath string
  46. srcpath string
  47. size int
  48. deleteChan string
  49. }
  50. type header []byte
  51. func (header header) setVersion() {
  52. header[0] = 1
  53. }
  54. func (header header) setViewSize(viewSize uint32) {
  55. binary.BigEndian.PutUint32(header[1:], viewSize)
  56. }
  57. func (header header) getViewSize() uint32 {
  58. return binary.BigEndian.Uint32(header[1:])
  59. }
  60. func (header header) setNextWriteAt(nextWriteAt uint32) {
  61. binary.BigEndian.PutUint32(header[5:], nextWriteAt)
  62. }
  63. func (header header) getNextWriteAt() uint32 {
  64. return binary.BigEndian.Uint32(header[5:])
  65. }
  66. func (mappedBytes header) setNextReadAt(nextReadAt uint32) {
  67. binary.BigEndian.PutUint32(mappedBytes[9:], nextReadAt)
  68. }
  69. func (mappedBytes header) getNextReadAt() uint32 {
  70. return binary.BigEndian.Uint32(mappedBytes[9:])
  71. }
  72. func Open(filePath string, nkiloBytes int) (*DQueue, error) {
  73. fileObj, err := os.OpenFile(filePath, os.O_RDWR, 0644)
  74. if err != nil {
  75. if os.IsNotExist(err) {
  76. fileObj, err = os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
  77. if err != nil {
  78. return nil, err
  79. }
  80. emptyBytes := make([]byte, 1024)
  81. for i := 0; i < nkiloBytes; i++ {
  82. _, err = fileObj.Write(emptyBytes[:])
  83. if err != nil {
  84. return nil, err
  85. }
  86. }
  87. err = fileObj.Close()
  88. if err != nil {
  89. return nil, err
  90. }
  91. fileObj, err = os.OpenFile(filePath, os.O_RDWR, 0644)
  92. if err != nil {
  93. return nil, err
  94. }
  95. mappedFile, err := mmap.Map(fileObj, mmap.RDWR, 0)
  96. if err != nil {
  97. return nil, err
  98. }
  99. header := header(mappedFile[:HEADER_SIZE])
  100. header.setVersion()
  101. header.setNextWriteAt(0)
  102. header.setNextReadAt(0)
  103. body := mappedFile[HEADER_SIZE:]
  104. view := body[:0]
  105. header.setViewSize(uint32(len(view)))
  106. readBufferItself := make([][]byte, MAX_READ_PACKET_COUNT)
  107. return &DQueue{
  108. fileObj: fileObj,
  109. mappedFile: mappedFile,
  110. header: header,
  111. body: body,
  112. view: view,
  113. readBufferItself: readBufferItself,
  114. lastWriteAt: 0,
  115. }, nil
  116. } else {
  117. return nil, err
  118. }
  119. } else {
  120. mappedFile, err := mmap.Map(fileObj, mmap.RDWR, 0)
  121. if err != nil {
  122. return nil, err
  123. }
  124. header := header(mappedFile[:HEADER_SIZE])
  125. header.setVersion()
  126. //header.setNextWriteAt(0)
  127. //header.setNextReadAt(0)
  128. body := mappedFile[HEADER_SIZE:]
  129. view := body[:]
  130. header.setViewSize(uint32(len(view)))
  131. readBufferItself := make([][]byte, MAX_READ_PACKET_COUNT)
  132. return &DQueue{
  133. fileObj: fileObj,
  134. mappedFile: mappedFile,
  135. header: header,
  136. body: body,
  137. view: view,
  138. readBufferItself: readBufferItself,
  139. }, nil
  140. }
  141. }
  142. func (q *DQueue) Close() error {
  143. err := q.mappedFile.Unmap()
  144. if err != nil {
  145. return err
  146. }
  147. err = q.fileObj.Close()
  148. if err != nil {
  149. return err
  150. }
  151. return nil
  152. }
  153. func (q *DQueue) Pop() ([][]byte, error) {
  154. nextWriteAt := q.header.getNextWriteAt()
  155. nextReadAt := q.nextReadAt
  156. q.header.setNextReadAt(nextReadAt)
  157. packetsCount := 0
  158. fallBehind := false
  159. if nextReadAt == nextReadAt && nextWriteAt == uint32(len(q.view)) {
  160. // at tail
  161. } else if nextReadAt >= nextWriteAt {
  162. fallBehind = true
  163. }
  164. pos := nextReadAt
  165. for ; packetsCount < len(q.readBufferItself); packetsCount++ {
  166. if !fallBehind && pos >= nextWriteAt {
  167. break
  168. }
  169. viewSize := uint32(len(q.view))
  170. if pos >= viewSize {
  171. pos = 0 // wrap around
  172. fallBehind = false
  173. // the region between [nextWriteAt, tail) is now invalid
  174. q.view = q.body[:nextWriteAt]
  175. }
  176. packetSize := binary.BigEndian.Uint16(q.view[pos : pos+2])
  177. if packetSize > MAX_PACKET_SIZE {
  178. return nil, errors.New("packet is too large")
  179. }
  180. pos += 2
  181. nextPos := pos + uint32(packetSize)
  182. q.readBufferItself[packetsCount] = q.view[pos:nextPos]
  183. pos = nextPos
  184. }
  185. q.nextReadAt = pos
  186. return q.readBufferItself[:packetsCount], nil
  187. }
  188. func (q *DQueue) PopPeeksV2(count int) (readBuff [][]byte, err error, full bool, null bool) {
  189. nextWriteAt := q.dqueuePop.header.getNextWriteAt()
  190. nextReadAt := q.dqueuePop.nextReadAt
  191. //fmt.Println("nextWriteAt,nextReadAt",nextWriteAt,nextReadAt)
  192. packetsCount := 0
  193. pos := nextReadAt
  194. var readBuffer [][]byte
  195. if q.dqueuePop.fileObj.Name() == q.dqueuPush.fileObj.Name() {
  196. full = true
  197. }
  198. for ; packetsCount < count; packetsCount++ {
  199. if pos >= nextWriteAt {
  200. null = true
  201. break
  202. }
  203. //viewSize := uint32(len(q.view))
  204. //if pos >= viewSize {
  205. // pos = 0 // wrap around
  206. // // the region between [nextWriteAt, tail) is now invalid
  207. // q.view = q.body[:nextWriteAt]
  208. //}
  209. packetSize := binary.BigEndian.Uint16(q.dqueuePop.view[pos : pos+2])
  210. if packetSize > MAX_PACKET_SIZE {
  211. return nil, errors.New("packet is too large"), full, null
  212. }
  213. pos += 2
  214. nextPos := pos + uint32(packetSize)
  215. if len(q.dqueuePop.view[pos:nextPos]) == 0 {
  216. continue
  217. }
  218. readBuffer = append(readBuffer, q.dqueuePop.view[pos:nextPos])
  219. pos = nextPos
  220. }
  221. q.nextReadAt = pos
  222. return readBuffer, nil, full, null
  223. }
  224. func (q *DQueue) PopPeeksV3(count int) (readBuff []*sarama.ProducerMessage, err error, full bool, null bool) {
  225. nextWriteAt := q.dqueuePop.header.getNextWriteAt()
  226. nextReadAt := q.dqueuePop.header.getNextReadAt()
  227. packetsCount := 0
  228. pos := nextReadAt
  229. if q.dqueuePop.fileObj.Name() == q.dqueuPush.fileObj.Name() {
  230. full = true
  231. }
  232. for ; packetsCount < count; packetsCount++ {
  233. if pos >= nextWriteAt {
  234. null = true
  235. break
  236. }
  237. //viewSize := uint32(len(q.view))
  238. //if pos >= viewSize {
  239. // pos = 0 // wrap around
  240. // // the region between [nextWriteAt, tail) is now invalid
  241. // q.view = q.body[:nextWriteAt]
  242. //}
  243. packetSize := binary.BigEndian.Uint16(q.dqueuePop.view[pos : pos+2])
  244. if packetSize > MAX_PACKET_SIZE {
  245. return nil, errors.New("packet is too large"), full, null
  246. }
  247. pos += 2
  248. nextPos := pos + uint32(packetSize)
  249. if len(q.dqueuePop.view[pos:nextPos]) == 0 {
  250. continue
  251. }
  252. msg := &sarama.ProducerMessage{Topic: kafka_topic, Key: sarama.StringEncoder(""),
  253. Value: sarama.StringEncoder(string(q.dqueuePop.view[pos:nextPos]))}
  254. readBuff = append(readBuff, msg)
  255. pos = nextPos
  256. }
  257. q.nextReadAt = pos
  258. return readBuff, nil, full, null
  259. }
  260. //func (q *DQueue) PopPeeks(count int) ( [][]byte, error) {
  261. // if q.dqueuePop.fileObj.Name() != q.dqueuePopTemp.fileObj.Name(){
  262. // err :=q.dqueuePopTemp.Close()
  263. // if err != nil{
  264. // fmt.Println("err :=q.dqueuePopTemp.Close() err",err)
  265. // }
  266. // q.dqueuePopTemp,err = Open(q.dqueuePop.fileObj.Name(),q.size)
  267. // if err != nil {
  268. // fmt.Println("= Open err",q.dqueuePop.fileObj.Name(),err)
  269. // }
  270. // fmt.Println("q.dqueuePopTemp,err = Open ",q.dqueuePop.fileObj.Name(),err)
  271. // }
  272. // nextWriteAt := q.dqueuePopTemp.header.getNextWriteAt()
  273. // nextReadAt := q.dqueuePopTemp.header.getNextReadAt()
  274. // var readBuff [][]byte
  275. // pos := nextReadAt
  276. // for packetsCount := 0; packetsCount < count; packetsCount++ {
  277. // if pos >= nextWriteAt {
  278. // if q.dqueuePop.fileObj.Name() == q.dqueuPush.fileObj.Name(){
  279. // break
  280. // }
  281. //
  282. // popOne,err :=q.dqueue1Index.PeekTwo()
  283. // if err != nil {
  284. // if err.Error() =="file is no two peek" || err.Error() =="queue is null" {
  285. // break
  286. // }else {
  287. // fmt.Println("q.dqueue1Index.Peek() err",err)
  288. // }
  289. // }
  290. // fmt.Println("popOne,err :=q.dqueue1Index.PeekTwo()",string(popOne),err)
  291. // err =q.dqueuePopTemp.Close()
  292. // if err != nil{
  293. // fmt.Println("err :=q.dqueuePopTemp.Close() err",err)
  294. // break
  295. // }
  296. // q.dqueuePopTemp,err = Open(q.srcpath+"/"+string(popOne),q.size)
  297. // if err != nil {
  298. // fmt.Println("= Open err",q.srcpath+"/"+string(popOne),err)
  299. // break
  300. // }
  301. // nextWriteAt = q.dqueuePopTemp.header.getNextWriteAt()
  302. // if nextWriteAt == 0 {
  303. // break
  304. // }
  305. // pos = q.dqueuePopTemp.header.getNextReadAt()
  306. // }
  307. // //viewSize := uint32(len(q.dqueuePopTemp.view))
  308. // //if pos >= viewSize {
  309. // // pos = 0 // wrap around
  310. // // // the region between [nextWriteAt, tail) is now invalid
  311. // // q.dqueuePopTemp.view = q.dqueuePopTemp.body[:nextWriteAt]
  312. // //}
  313. // packetSize := binary.BigEndian.Uint16(q.dqueuePopTemp.body[pos:pos+2])
  314. // if packetSize > MAX_PACKET_SIZE {
  315. // return nil, errors.New("packet is too large : "+string(q.dqueuePopTemp.body[pos:pos+2]))
  316. // }
  317. // pos += 2
  318. // nextPos := pos + uint32(packetSize)
  319. // if int(nextPos) >=len(q.dqueuePopTemp.body){
  320. // continue
  321. // }
  322. // if len(q.dqueuePopTemp.body[pos:nextPos]) == 0{
  323. // continue
  324. // }
  325. // readBuff = append(readBuff,q.dqueuePopTemp.body[pos:nextPos])
  326. // //fmt.Println("q.dqueuePop.view[pos:nextPos]",string(q.dqueuePopTemp.view[pos:nextPos]),
  327. // // q.dqueuePopTemp.fileObj.Name())
  328. // pos = nextPos
  329. // }
  330. // q.dqueuePop.nextReadAt = pos
  331. // // fmt.Println("readBuff",readBuff)
  332. // return readBuff, nil
  333. // //}
  334. //}
  335. func (q *DQueue) PopOne() ([]byte, error) {
  336. nextWriteAt := q.header.getNextWriteAt()
  337. nextReadAt := q.header.getNextReadAt()
  338. if LogStatu {
  339. fmt.Println("nextWriteAt", nextWriteAt, nextReadAt)
  340. }
  341. if nextReadAt >= nextWriteAt {
  342. return nil, errors.New("queue is null")
  343. }
  344. pos := nextReadAt
  345. q.view = q.body[:nextWriteAt]
  346. viewSize := uint32(len(q.view))
  347. if pos >= viewSize {
  348. pos = 0 // wrap around
  349. // the region between [nextWriteAt, tail) is now invalid
  350. q.view = q.body[:nextWriteAt]
  351. }
  352. packetSize := binary.BigEndian.Uint16(q.view[pos : pos+2])
  353. if packetSize > MAX_PACKET_SIZE {
  354. return nil, errors.New("packet is too large" + strconv.Itoa(int(packetSize)))
  355. }
  356. pos += 2
  357. nextPos := pos + uint32(packetSize)
  358. posstr := q.view[pos:nextPos]
  359. pos = nextPos
  360. q.nextReadAt = pos
  361. q.header.setNextReadAt(pos)
  362. return posstr, nil
  363. }
  364. func (q *DQueue) Peek() ([]byte, error) {
  365. nextWriteAt := q.header.getNextWriteAt()
  366. nextReadAt := q.header.getNextReadAt()
  367. if LogStatu {
  368. fmt.Println("nextWriteAt", nextWriteAt, nextReadAt)
  369. }
  370. if nextReadAt >= nextWriteAt {
  371. return nil, errors.New("queue is null")
  372. }
  373. pos := nextReadAt
  374. viewSize := uint32(len(q.view))
  375. if pos >= viewSize {
  376. pos = 0 // wrap around
  377. // the region between [nextWriteAt, tail) is now invalid
  378. q.view = q.body[:nextWriteAt]
  379. }
  380. packetSize := binary.BigEndian.Uint16(q.view[pos : pos+2])
  381. if packetSize > MAX_PACKET_SIZE {
  382. return nil, errors.New("packet is too large" + strconv.Itoa(int(packetSize)))
  383. }
  384. pos += 2
  385. nextPos := pos + uint32(packetSize)
  386. posstr := q.view[pos:nextPos]
  387. pos = nextPos
  388. return posstr, nil
  389. }
  390. func (q *DQueue) PeekTwo() ([]byte, error) {
  391. nextWriteAt := q.header.getNextWriteAt()
  392. nextReadAt := q.header.getNextReadAt()
  393. if LogStatu {
  394. fmt.Println("nextWriteAt", nextWriteAt, nextReadAt)
  395. }
  396. if nextReadAt >= nextWriteAt {
  397. //fmt.Println("queue is null")
  398. return nil, errors.New("queue is null")
  399. }
  400. pos := nextReadAt
  401. viewSize := uint32(len(q.view))
  402. if pos >= viewSize {
  403. pos = 0 // wrap around
  404. // the region between [nextWriteAt, tail) is now invalid
  405. q.view = q.body[:nextWriteAt]
  406. }
  407. packetSize := binary.BigEndian.Uint16(q.view[pos : pos+2])
  408. if packetSize > MAX_PACKET_SIZE {
  409. return nil, errors.New("packet is too large" + strconv.Itoa(int(packetSize)))
  410. }
  411. pos += 2
  412. nextPos := pos + uint32(packetSize)
  413. posstr := q.view[pos:nextPos]
  414. pos = nextPos
  415. if pos >= q.header.getNextWriteAt() {
  416. //fmt.Println("file is no two peek")
  417. return nil, errors.New("file is no two peek")
  418. }
  419. packetSize = binary.BigEndian.Uint16(q.view[pos : pos+2])
  420. if packetSize > MAX_PACKET_SIZE {
  421. return nil, errors.New("packet is too large" + strconv.Itoa(int(packetSize)))
  422. }
  423. pos += 2
  424. nextPos = pos + uint32(packetSize)
  425. posstr = q.view[pos:nextPos]
  426. //fmt.Println("PeekTwo__posstr",string(posstr))
  427. return posstr, nil
  428. }
  429. func (q *DQueue) PeekHead() ([]byte, error) {
  430. //lastWriteAt := q.lastWriteAt
  431. //nextReadAt := q.header.getNextReadAt()
  432. //fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)
  433. pos := q.header.getNextWriteAt()
  434. //packetSize := binary.BigEndian.Uint16(q.view[pos-19:pos])
  435. //if packetSize > MAX_PACKET_SIZE {
  436. // return nil, errors.New("packet is too large"+strconv.Itoa(int(packetSize)))
  437. //}
  438. //pos += 2
  439. //nextPos := pos + uint32(packetSize)
  440. posstr := q.view[pos-36 : pos]
  441. //pos = nextPos
  442. return posstr, nil
  443. }
  444. func (q *DQueue) Push(packets [][]byte) error {
  445. pos := q.header.getNextWriteAt()
  446. if pos > uint32(len(q.body)) {
  447. return errors.New("internal error: nextWriteAt is invalid")
  448. }
  449. for _, packet := range packets {
  450. packetSize := uint16(len(packet))
  451. if packetSize > MAX_PACKET_SIZE {
  452. return errors.New("packet is too large")
  453. }
  454. viewSize := uint32(len(q.view))
  455. willWriteTo := pos + 2 + uint32(packetSize)
  456. // write range is [pos, willWriteTo)
  457. if q.nextReadAt > pos && willWriteTo > q.nextReadAt {
  458. // overflow the read
  459. q.nextReadAt = willWriteTo
  460. q.header.setNextReadAt(q.nextReadAt)
  461. }
  462. if willWriteTo > viewSize {
  463. // overflow the view
  464. if willWriteTo > uint32(len(q.body)) {
  465. // overflow the body, shrink the view
  466. q.view = q.body[:pos]
  467. pos = 0
  468. } else {
  469. // grow the view to cover
  470. q.view = q.body[:willWriteTo]
  471. }
  472. }
  473. binary.BigEndian.PutUint16(q.view[pos:pos+2], packetSize)
  474. pos += 2
  475. nextPos := pos + uint32(packetSize)
  476. copy(q.view[pos:nextPos], packet)
  477. pos = nextPos
  478. }
  479. q.header.setNextWriteAt(pos)
  480. q.header.setViewSize(uint32(len(q.view)))
  481. if LogStatu {
  482. nextWriteAt := q.header.getNextWriteAt()
  483. nextReadAt := q.header.getNextReadAt()
  484. fmt.Println("nextWriteAt", nextWriteAt, nextReadAt)
  485. }
  486. return nil
  487. }
  488. func (q *DQueue) PushOne(packet []byte) error {
  489. pos := q.header.getNextWriteAt()
  490. if pos > uint32(len(q.body)) {
  491. return errors.New("internal error: nextWriteAt is invalid")
  492. }
  493. packetSize := uint16(len(packet))
  494. if packetSize > MAX_PACKET_SIZE {
  495. return errors.New("packet is too large")
  496. }
  497. viewSize := uint32(len(q.body))
  498. willWriteTo := pos + 2 + uint32(packetSize)
  499. // write range is [pos, willWriteTo)
  500. //if q.nextReadAt > pos && willWriteTo > q.nextReadAt {
  501. // // overflow the read
  502. // q.nextReadAt = willWriteTo
  503. // q.header.setNextReadAt(q.nextReadAt)
  504. //}
  505. if willWriteTo > viewSize {
  506. return errors.New("file is full")
  507. }
  508. binary.BigEndian.PutUint16(q.view[pos:pos+2], packetSize)
  509. pos += 2
  510. nextPos := pos + uint32(packetSize)
  511. copy(q.view[pos:nextPos], packet)
  512. pos = nextPos
  513. q.lastWriteAt = q.header.getNextWriteAt()
  514. q.header.setNextWriteAt(pos)
  515. q.view = q.body[:pos]
  516. //if LogStatu{
  517. // //nextWriteAt := q.header.getNextWriteAt()
  518. // //nextReadAt := q.header.getNextReadAt()
  519. // fmt.Println("nextWriteAt",q.lastWriteAt,string(packet))
  520. //}
  521. //nextWriteAt := q.header.getNextWriteAt()
  522. // nextReadAt := q.header.getNextReadAt()
  523. //fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)
  524. return nil
  525. }
  526. func OpenIndexFile(filepath string, size int, indexSize int, log bool) (*DQueue, error) {
  527. LogStatu = log
  528. fmt.Println("=====队列路径===", filepath)
  529. dqueue1Index1, err := Open(filepath+"/index", indexSize)
  530. fmt.Println(filepath)
  531. dqueue1Index1.srcpath = filepath
  532. dqueue1Index1.size = size
  533. dqueue1Index1.dqueue1Index = dqueue1Index1
  534. popOne, err := dqueue1Index1.dqueue1Index.Peek()
  535. if LogStatu {
  536. fmt.Println("=====peek", string(popOne))
  537. }
  538. if popOne == nil || err != nil {
  539. // dqueue1Index1.nowPath =strconv.Itoa(int(time.Now().UnixNano()))
  540. dqueue1Index1.nowPath = fmt.Sprintf("%s", uuid.NewV1())
  541. dqueue1Index1.dqueuePop, err = Open(dqueue1Index1.srcpath+"/"+dqueue1Index1.nowPath, size)
  542. dqueue1Index1.dqueuePop.nextReadAt = dqueue1Index1.dqueuePop.header.getNextReadAt()
  543. if err != nil {
  544. return nil, err
  545. }
  546. err = dqueue1Index1.dqueue1Index.PushOne(
  547. []byte(dqueue1Index1.nowPath),
  548. )
  549. } else {
  550. dqueue1Index1.nowPath = string(popOne)
  551. dqueue1Index1.dqueuePop, err = Open(dqueue1Index1.srcpath+"/"+string(popOne), size)
  552. dqueue1Index1.dqueuePop.nextReadAt = dqueue1Index1.dqueuePop.header.getNextReadAt()
  553. if err != nil {
  554. return nil, err
  555. }
  556. }
  557. peekHead, err := dqueue1Index1.dqueue1Index.PeekHead()
  558. if LogStatu {
  559. fmt.Println("=====peekHead", string(peekHead))
  560. }
  561. //dqueue1Index.PopOne()
  562. if peekHead == nil || err != nil {
  563. dqueue1Index1.dqueuPush = dqueue1Index1.dqueuPush
  564. } else {
  565. //dqueue1Index1.nowPath = string(popOne)
  566. dqueue1Index1.dqueuPush, err = Open(dqueue1Index1.srcpath+"/"+string(peekHead), size)
  567. if err != nil {
  568. return nil, err
  569. }
  570. }
  571. return dqueue1Index1, err
  572. }
  573. func (q *DQueue) PushOneIndex(packet []byte) error {
  574. err := q.dqueuPush.PushOne(
  575. packet,
  576. )
  577. if err != nil {
  578. if err.Error() == "file is full" {
  579. //if q.dqueuPush.header.getNextReadAt()==0{
  580. err := q.dqueuPush.Close()
  581. if err != nil {
  582. if LogStatu {
  583. fmt.Println("err :=q.dqueuPush.Close() err", err)
  584. }
  585. }
  586. //}
  587. //datestr :=strconv.Itoa(int(time.Now().UnixNano()))
  588. files, _ := ioutil.ReadDir(q.srcpath + "/")
  589. if len(files) > 15 {
  590. panic("日志文件过多!")
  591. }
  592. fmt.Println(len(files))
  593. datestr := fmt.Sprintf("%s", uuid.NewV1())
  594. q.dqueuPush, err = Open(q.srcpath+"/"+datestr, q.size)
  595. if err != nil {
  596. if LogStatu {
  597. fmt.Println("kpt.Open dqueue2 err", err)
  598. }
  599. return err
  600. }
  601. err = q.dqueue1Index.PushOne(
  602. []byte(datestr),
  603. )
  604. if err != nil {
  605. if err.Error() == "file is full" {
  606. //q.dqueue1Index.header.setNextWriteAt(0)
  607. //err =q.dqueue1Index.PushOne(
  608. // []byte(datestr),
  609. //)
  610. //fmt.Println("index file is full",err,[]byte(datestr))
  611. }
  612. }
  613. err = q.dqueuPush.PushOne(
  614. packet,
  615. )
  616. //dqueueChan <- dqueue2
  617. } else {
  618. return err
  619. }
  620. }
  621. if showlog > 0 {
  622. // fmt.Println(string(packet))
  623. }
  624. return nil
  625. }
  626. func (q *DQueue) PopOneIndex() ([]byte, error) {
  627. pop, err := q.dqueuePop.PopOne()
  628. //fmt.Println("q.dqueuePop.header.getNextReadAt() != q.dqueuPush.header.getNextWriteAt()",q.dqueuePop.header.getNextReadAt(),q.dqueuPush.header.getNextWriteAt())
  629. if err != nil {
  630. if err.Error() == "queue is null" {
  631. if q.dqueuePop.fileObj.Name() != q.dqueuPush.fileObj.Name() {
  632. popIndex, err1 := q.dqueue1Index.PopOne()
  633. //fmt.Println("pop ,err =q.dqueuePop.PopOne()====------------",pop,err,string(popIndex), q.dqueuePop.fileObj.Name())
  634. if err1 != nil {
  635. if LogStatu {
  636. fmt.Println("pop ,err =q.dqueuePop.PopOne()====------------", pop, err, string(popIndex))
  637. }
  638. }
  639. popOne, err1 := q.dqueue1Index.Peek()
  640. if popOne == nil && err1 != nil { //索引为空
  641. if LogStatu {
  642. fmt.Println("索引为空==============")
  643. }
  644. return nil, err1
  645. }
  646. //fmt.Println("开始关闭==============")
  647. errC := q.dqueuePop.Close()
  648. if errC != nil {
  649. if LogStatu {
  650. fmt.Println("q.dqueuePop.Close() err", errC)
  651. }
  652. return nil, errC
  653. }
  654. //fmt.Println("开始删除==============")
  655. errR := os.Remove(q.srcpath + "/" + q.nowPath)
  656. if errR != nil {
  657. fmt.Println("os.Remove() err", errR)
  658. time.Sleep(100 * time.Millisecond)
  659. errR := os.Rename(q.srcpath+"/"+q.nowPath, q.srcpath+"/"+"del"+q.nowPath)
  660. if errR != nil {
  661. if LogStatu {
  662. fmt.Println("os.reRename() err", errR)
  663. }
  664. }
  665. return nil, errR
  666. } else {
  667. if LogStatu {
  668. fmt.Println("file delete ---------", q.nowPath)
  669. }
  670. }
  671. fmt.Println("file delete ---------", q.nowPath)
  672. q.dqueuePop, err = Open(q.srcpath+"/"+string(popOne), q.size)
  673. if err != nil {
  674. if LogStatu {
  675. fmt.Println("kpt.Open dqueue1 5 err", err)
  676. }
  677. return nil, err
  678. } else {
  679. q.nowPath = string(popOne)
  680. //pop ,err = q.dqueuePop.PopOne()
  681. pop, err = q.dqueuePop.Peek()
  682. if LogStatu {
  683. fmt.Println("q.dqueuePop.fileObj.Name()", q.dqueuePop.fileObj.Name(), q.dqueuPush.fileObj.Name(), q.dqueuePop.header.getNextReadAt())
  684. fmt.Println("pop ,err =q.dqueuePop.PopOne()====------------", pop, err, string(popOne))
  685. }
  686. }
  687. } else {
  688. return pop, err
  689. }
  690. } else {
  691. return pop, err
  692. }
  693. } else {
  694. return pop, err
  695. }
  696. return pop, nil
  697. }
  698. func (q *DQueue) CloseIndex() error {
  699. err := q.mappedFile.Unmap()
  700. if err != nil {
  701. return err
  702. }
  703. err = q.dqueue1Index.mappedFile.Unmap()
  704. if err != nil {
  705. //return err
  706. }
  707. err = q.dqueuePop.mappedFile.Unmap()
  708. if err != nil {
  709. return err
  710. }
  711. err = q.dqueuPush.mappedFile.Unmap()
  712. if err != nil {
  713. return err
  714. }
  715. err = q.fileObj.Close()
  716. if err != nil {
  717. return err
  718. }
  719. err = q.dqueue1Index.Close()
  720. if err != nil {
  721. //return err
  722. }
  723. err = q.dqueuPush.fileObj.Close()
  724. if err != nil {
  725. return err
  726. }
  727. err = q.dqueuePop.fileObj.Close()
  728. if err != nil {
  729. return err
  730. }
  731. return nil
  732. }
  733. func (q *DQueue) DeleteIndexFile(dqueuePop *DQueue) {
  734. //if err1 !=nil {
  735. // //if err1.Error()=="queue is null" && (q.dqueuePop.header.getNextReadAt() != q.dqueuPush.header.getNextWriteAt() ){
  736. // // q.dqueue1Index.header.setNextReadAt(0)
  737. // // popOne ,err1 = q.dqueue1Index.Peek()
  738. // //}
  739. //}
  740. //errC := dqueuePop.Close()
  741. //if errC != nil {
  742. // fmt.Println("q.dqueuePop.Close() err",errC)
  743. // return nil,errC
  744. //}
  745. //errR := os.Remove(q.srcpath+"/"+q.nowPath)
  746. //if errR != nil {
  747. // fmt.Println("os.Remove() err",errR)
  748. // return nil,errR
  749. //}else{
  750. // fmt.Println("file delete ---------",q.nowPath)
  751. //}
  752. }