dqueue.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779
  1. package kpt
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "github.com/Shopify/sarama"
  7. "github.com/edsrzf/mmap-go"
  8. "github.com/satori/go.uuid"
  9. "os"
  10. "strconv"
  11. "time"
  12. )
  // Layout and size limits shared by every queue file.
  const (
  	// HEADER_SIZE is the header length in bytes: 1 for the version,
  	// 4 for viewSize, 4 for nextWriteAt and 4 for nextReadAt.
  	HEADER_SIZE = 13

  	// MAX_READ_PACKET_COUNT is the length of the packet buffer that Open
  	// allocates once per queue.
  	MAX_READ_PACKET_COUNT = 1024

  	// MAX_PACKET_SIZE is the largest accepted payload in bytes (63 KiB,
  	// which still fits the 2-byte length prefix written before each packet).
  	MAX_PACKET_SIZE = 1024 * 63
  )

  // LogStatu switches the optional diagnostic prints in this file on or off.
  var LogStatu bool
  19. type DQueue struct {
  20. // underlying disk
  21. fileObj *os.File
  22. mappedFile mmap.MMap
  23. // the uncommitted read pointer
  24. // will be committed at next pop()
  25. nextReadAt uint32
  26. lastWriteAt uint32
  27. /*
  28. view covers the readable range of body
  29. to avoid split packet in two parts to wrap around
  30. the memory will NOT be copied during read,
  31. the reader must copy out the bytes manually before goroutine switch
  32. [--- header ---][--- body ---]
  33. [--- view --]
  34. */
  35. header header
  36. body []byte
  37. view []byte
  38. // used to avoid allocation
  39. readBufferItself [][]byte
  40. dqueuePop *DQueue
  41. //dqueuePopTemp *DQueue
  42. dqueue1Index *DQueue
  43. dqueuPush *DQueue
  44. nowPath string
  45. srcpath string
  46. size int
  47. deleteChan string
  48. }
  // header is the fixed-size prefix of a queue file. Multi-byte fields are
  // stored big-endian:
  //
  //	byte  0     format version
  //	bytes 1-4   view size
  //	bytes 5-8   next write offset
  //	bytes 9-12  next read offset
  //
  // The receiver is named h in every method; the original mixed "header"
  // (which shadows the type name) and "mappedBytes".
  type header []byte

  // setVersion stamps the format version (currently 1) into byte 0.
  func (h header) setVersion() {
  	h[0] = 1
  }

  // setViewSize stores viewSize in bytes 1-4.
  func (h header) setViewSize(viewSize uint32) {
  	binary.BigEndian.PutUint32(h[1:], viewSize)
  }

  // getViewSize returns the value stored in bytes 1-4.
  func (h header) getViewSize() uint32 {
  	return binary.BigEndian.Uint32(h[1:])
  }

  // setNextWriteAt stores the next write offset in bytes 5-8.
  func (h header) setNextWriteAt(nextWriteAt uint32) {
  	binary.BigEndian.PutUint32(h[5:], nextWriteAt)
  }

  // getNextWriteAt returns the next write offset stored in bytes 5-8.
  func (h header) getNextWriteAt() uint32 {
  	return binary.BigEndian.Uint32(h[5:])
  }

  // setNextReadAt stores the next read offset in bytes 9-12.
  func (h header) setNextReadAt(nextReadAt uint32) {
  	binary.BigEndian.PutUint32(h[9:], nextReadAt)
  }

  // getNextReadAt returns the next read offset stored in bytes 9-12.
  func (h header) getNextReadAt() uint32 {
  	return binary.BigEndian.Uint32(h[9:])
  }
  71. func Open(filePath string, nkiloBytes int) (*DQueue, error) {
  72. fileObj, err := os.OpenFile(filePath, os.O_RDWR, 0644)
  73. if err != nil {
  74. if os.IsNotExist(err) {
  75. fileObj, err = os.OpenFile(filePath, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644)
  76. if err != nil {
  77. return nil, err
  78. }
  79. emptyBytes := make([]byte, 1024)
  80. for i := 0; i < nkiloBytes; i++ {
  81. _, err = fileObj.Write(emptyBytes[:])
  82. if err != nil {
  83. return nil, err
  84. }
  85. }
  86. err = fileObj.Close()
  87. if err != nil {
  88. return nil, err
  89. }
  90. fileObj, err = os.OpenFile(filePath, os.O_RDWR, 0644)
  91. if err != nil {
  92. return nil, err
  93. }
  94. mappedFile, err := mmap.Map(fileObj, mmap.RDWR, 0)
  95. if err != nil {
  96. return nil, err
  97. }
  98. header := header(mappedFile[:HEADER_SIZE])
  99. header.setVersion()
  100. header.setNextWriteAt(0)
  101. header.setNextReadAt(0)
  102. body := mappedFile[HEADER_SIZE:]
  103. view := body[:0]
  104. header.setViewSize(uint32(len(view)))
  105. readBufferItself := make([][]byte, MAX_READ_PACKET_COUNT)
  106. return &DQueue{
  107. fileObj: fileObj,
  108. mappedFile: mappedFile,
  109. header: header,
  110. body: body,
  111. view: view,
  112. readBufferItself: readBufferItself,
  113. lastWriteAt : 0 ,
  114. }, nil
  115. } else {
  116. return nil, err
  117. }
  118. }else {
  119. mappedFile, err := mmap.Map(fileObj, mmap.RDWR, 0)
  120. if err != nil {
  121. return nil, err
  122. }
  123. header := header(mappedFile[:HEADER_SIZE])
  124. header.setVersion()
  125. //header.setNextWriteAt(0)
  126. //header.setNextReadAt(0)
  127. body := mappedFile[HEADER_SIZE:]
  128. view := body[:]
  129. header.setViewSize(uint32(len(view)))
  130. readBufferItself := make([][]byte, MAX_READ_PACKET_COUNT)
  131. return &DQueue{
  132. fileObj: fileObj,
  133. mappedFile: mappedFile,
  134. header: header,
  135. body: body,
  136. view: view,
  137. readBufferItself: readBufferItself,
  138. }, nil
  139. }
  140. }
  141. func (q *DQueue) Close() error {
  142. err := q.mappedFile.Unmap()
  143. if err != nil {
  144. return err
  145. }
  146. err = q.fileObj.Close()
  147. if err != nil {
  148. return err
  149. }
  150. return nil
  151. }
  152. func (q *DQueue) Pop() ([][]byte, error) {
  153. nextWriteAt := q.header.getNextWriteAt()
  154. nextReadAt := q.nextReadAt
  155. q.header.setNextReadAt(nextReadAt)
  156. packetsCount := 0
  157. fallBehind := false
  158. if nextReadAt == nextReadAt && nextWriteAt == uint32(len(q.view)) {
  159. // at tail
  160. } else if nextReadAt >= nextWriteAt {
  161. fallBehind = true
  162. }
  163. pos := nextReadAt
  164. for ; packetsCount < len(q.readBufferItself); packetsCount++ {
  165. if !fallBehind && pos >= nextWriteAt {
  166. break
  167. }
  168. viewSize := uint32(len(q.view))
  169. if pos >= viewSize {
  170. pos = 0 // wrap around
  171. fallBehind = false
  172. // the region between [nextWriteAt, tail) is now invalid
  173. q.view = q.body[:nextWriteAt]
  174. }
  175. packetSize := binary.BigEndian.Uint16(q.view[pos:pos+2])
  176. if packetSize > MAX_PACKET_SIZE {
  177. return nil, errors.New("packet is too large")
  178. }
  179. pos += 2
  180. nextPos := pos + uint32(packetSize)
  181. q.readBufferItself[packetsCount] = q.view[pos:nextPos]
  182. pos = nextPos
  183. }
  184. q.nextReadAt = pos
  185. return q.readBufferItself[:packetsCount], nil
  186. }
  187. func (q *DQueue) PopPeeksV2(count int) (readBuff [][]byte, err error,full bool,null bool){
  188. nextWriteAt := q.dqueuePop.header.getNextWriteAt()
  189. nextReadAt :=q.dqueuePop.nextReadAt
  190. //fmt.Println("nextWriteAt,nextReadAt",nextWriteAt,nextReadAt)
  191. packetsCount := 0
  192. pos := nextReadAt
  193. var readBuffer [][]byte
  194. if q.dqueuePop.fileObj.Name() == q.dqueuPush.fileObj.Name(){
  195. full = true
  196. }
  197. for ; packetsCount < count; packetsCount++ {
  198. if pos >= nextWriteAt {
  199. null = true
  200. break
  201. }
  202. //viewSize := uint32(len(q.view))
  203. //if pos >= viewSize {
  204. // pos = 0 // wrap around
  205. // // the region between [nextWriteAt, tail) is now invalid
  206. // q.view = q.body[:nextWriteAt]
  207. //}
  208. packetSize := binary.BigEndian.Uint16(q.dqueuePop.view[pos:pos+2])
  209. if packetSize > MAX_PACKET_SIZE {
  210. return nil, errors.New("packet is too large"),full,null
  211. }
  212. pos += 2
  213. nextPos := pos + uint32(packetSize)
  214. if len(q.dqueuePop.view[pos:nextPos])==0{
  215. continue
  216. }
  217. readBuffer = append(readBuffer,q.dqueuePop.view[pos:nextPos])
  218. pos = nextPos
  219. }
  220. q.nextReadAt = pos
  221. return readBuffer, nil,full,null
  222. }
  223. func (q *DQueue) PopPeeksV3(count int) (readBuff []*sarama.ProducerMessage, err error,full bool,null bool){
  224. nextWriteAt := q.dqueuePop.header.getNextWriteAt()
  225. nextReadAt :=q.dqueuePop.header.getNextReadAt()
  226. packetsCount := 0
  227. pos := nextReadAt
  228. if q.dqueuePop.fileObj.Name() == q.dqueuPush.fileObj.Name(){
  229. full = true
  230. }
  231. for ; packetsCount < count; packetsCount++ {
  232. if pos >= nextWriteAt {
  233. null = true
  234. break
  235. }
  236. //viewSize := uint32(len(q.view))
  237. //if pos >= viewSize {
  238. // pos = 0 // wrap around
  239. // // the region between [nextWriteAt, tail) is now invalid
  240. // q.view = q.body[:nextWriteAt]
  241. //}
  242. packetSize := binary.BigEndian.Uint16(q.dqueuePop.view[pos:pos+2])
  243. if packetSize > MAX_PACKET_SIZE {
  244. return nil, errors.New("packet is too large"),full,null
  245. }
  246. pos += 2
  247. nextPos := pos + uint32(packetSize)
  248. if len(q.dqueuePop.view[pos:nextPos])==0{
  249. continue
  250. }
  251. msg := &sarama.ProducerMessage{Topic: kafka_topic, Key: sarama.StringEncoder(""),
  252. Value: sarama.StringEncoder(string(q.dqueuePop.view[pos:nextPos]))}
  253. readBuff = append(readBuff,msg)
  254. pos = nextPos
  255. }
  256. q.nextReadAt = pos
  257. return readBuff, nil,full,null
  258. }
  259. //func (q *DQueue) PopPeeks(count int) ( [][]byte, error) {
  260. // if q.dqueuePop.fileObj.Name() != q.dqueuePopTemp.fileObj.Name(){
  261. // err :=q.dqueuePopTemp.Close()
  262. // if err != nil{
  263. // fmt.Println("err :=q.dqueuePopTemp.Close() err",err)
  264. // }
  265. // q.dqueuePopTemp,err = Open(q.dqueuePop.fileObj.Name(),q.size)
  266. // if err != nil {
  267. // fmt.Println("= Open err",q.dqueuePop.fileObj.Name(),err)
  268. // }
  269. // fmt.Println("q.dqueuePopTemp,err = Open ",q.dqueuePop.fileObj.Name(),err)
  270. // }
  271. // nextWriteAt := q.dqueuePopTemp.header.getNextWriteAt()
  272. // nextReadAt := q.dqueuePopTemp.header.getNextReadAt()
  273. // var readBuff [][]byte
  274. // pos := nextReadAt
  275. // for packetsCount := 0; packetsCount < count; packetsCount++ {
  276. // if pos >= nextWriteAt {
  277. // if q.dqueuePop.fileObj.Name() == q.dqueuPush.fileObj.Name(){
  278. // break
  279. // }
  280. //
  281. // popOne,err :=q.dqueue1Index.PeekTwo()
  282. // if err != nil {
  283. // if err.Error() =="file is no two peek" || err.Error() =="queue is null" {
  284. // break
  285. // }else {
  286. // fmt.Println("q.dqueue1Index.Peek() err",err)
  287. // }
  288. // }
  289. // fmt.Println("popOne,err :=q.dqueue1Index.PeekTwo()",string(popOne),err)
  290. // err =q.dqueuePopTemp.Close()
  291. // if err != nil{
  292. // fmt.Println("err :=q.dqueuePopTemp.Close() err",err)
  293. // break
  294. // }
  295. // q.dqueuePopTemp,err = Open(q.srcpath+"/"+string(popOne),q.size)
  296. // if err != nil {
  297. // fmt.Println("= Open err",q.srcpath+"/"+string(popOne),err)
  298. // break
  299. // }
  300. // nextWriteAt = q.dqueuePopTemp.header.getNextWriteAt()
  301. // if nextWriteAt == 0 {
  302. // break
  303. // }
  304. // pos = q.dqueuePopTemp.header.getNextReadAt()
  305. // }
  306. // //viewSize := uint32(len(q.dqueuePopTemp.view))
  307. // //if pos >= viewSize {
  308. // // pos = 0 // wrap around
  309. // // // the region between [nextWriteAt, tail) is now invalid
  310. // // q.dqueuePopTemp.view = q.dqueuePopTemp.body[:nextWriteAt]
  311. // //}
  312. // packetSize := binary.BigEndian.Uint16(q.dqueuePopTemp.body[pos:pos+2])
  313. // if packetSize > MAX_PACKET_SIZE {
  314. // return nil, errors.New("packet is too large : "+string(q.dqueuePopTemp.body[pos:pos+2]))
  315. // }
  316. // pos += 2
  317. // nextPos := pos + uint32(packetSize)
  318. // if int(nextPos) >=len(q.dqueuePopTemp.body){
  319. // continue
  320. // }
  321. // if len(q.dqueuePopTemp.body[pos:nextPos]) == 0{
  322. // continue
  323. // }
  324. // readBuff = append(readBuff,q.dqueuePopTemp.body[pos:nextPos])
  325. // //fmt.Println("q.dqueuePop.view[pos:nextPos]",string(q.dqueuePopTemp.view[pos:nextPos]),
  326. // // q.dqueuePopTemp.fileObj.Name())
  327. // pos = nextPos
  328. // }
  329. // q.dqueuePop.nextReadAt = pos
  330. // // fmt.Println("readBuff",readBuff)
  331. // return readBuff, nil
  332. // //}
  333. //}
  334. func (q *DQueue) PopOne() ([]byte, error) {
  335. nextWriteAt := q.header.getNextWriteAt()
  336. nextReadAt := q.header.getNextReadAt()
  337. if LogStatu {fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)}
  338. if nextReadAt >= nextWriteAt {
  339. return nil, errors.New("queue is null")
  340. }
  341. pos := nextReadAt
  342. q.view = q.body[:nextWriteAt]
  343. viewSize := uint32(len(q.view))
  344. if pos >= viewSize {
  345. pos = 0 // wrap around
  346. // the region between [nextWriteAt, tail) is now invalid
  347. q.view = q.body[:nextWriteAt]
  348. }
  349. packetSize := binary.BigEndian.Uint16(q.view[pos:pos+2])
  350. if packetSize > MAX_PACKET_SIZE {
  351. return nil, errors.New("packet is too large"+strconv.Itoa(int(packetSize)))
  352. }
  353. pos += 2
  354. nextPos := pos + uint32(packetSize)
  355. posstr:= q.view[pos:nextPos]
  356. pos = nextPos
  357. q.nextReadAt = pos
  358. q.header.setNextReadAt(pos)
  359. return posstr, nil
  360. }
  361. func (q *DQueue) Peek() ([]byte, error) {
  362. nextWriteAt := q.header.getNextWriteAt()
  363. nextReadAt := q.header.getNextReadAt()
  364. if LogStatu {fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)}
  365. if nextReadAt >= nextWriteAt {
  366. return nil, errors.New("queue is null")
  367. }
  368. pos := nextReadAt
  369. viewSize := uint32(len(q.view))
  370. if pos >= viewSize {
  371. pos = 0 // wrap around
  372. // the region between [nextWriteAt, tail) is now invalid
  373. q.view = q.body[:nextWriteAt]
  374. }
  375. packetSize := binary.BigEndian.Uint16(q.view[pos:pos+2])
  376. if packetSize > MAX_PACKET_SIZE {
  377. return nil, errors.New("packet is too large"+strconv.Itoa(int(packetSize)))
  378. }
  379. pos += 2
  380. nextPos := pos + uint32(packetSize)
  381. posstr:= q.view[pos:nextPos]
  382. pos = nextPos
  383. return posstr, nil
  384. }
  385. func (q *DQueue) PeekTwo() ([]byte, error) {
  386. nextWriteAt := q.header.getNextWriteAt()
  387. nextReadAt := q.header.getNextReadAt()
  388. if LogStatu {fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)}
  389. if nextReadAt >= nextWriteAt {
  390. //fmt.Println("queue is null")
  391. return nil, errors.New("queue is null")
  392. }
  393. pos := nextReadAt
  394. viewSize := uint32(len(q.view))
  395. if pos >= viewSize {
  396. pos = 0 // wrap around
  397. // the region between [nextWriteAt, tail) is now invalid
  398. q.view = q.body[:nextWriteAt]
  399. }
  400. packetSize := binary.BigEndian.Uint16(q.view[pos:pos+2])
  401. if packetSize > MAX_PACKET_SIZE {
  402. return nil, errors.New("packet is too large"+strconv.Itoa(int(packetSize)))
  403. }
  404. pos += 2
  405. nextPos := pos + uint32(packetSize)
  406. posstr:= q.view[pos:nextPos]
  407. pos = nextPos
  408. if pos >= q.header.getNextWriteAt() {
  409. //fmt.Println("file is no two peek")
  410. return nil, errors.New("file is no two peek")
  411. }
  412. packetSize = binary.BigEndian.Uint16(q.view[pos:pos+2])
  413. if packetSize > MAX_PACKET_SIZE {
  414. return nil, errors.New("packet is too large"+strconv.Itoa(int(packetSize)))
  415. }
  416. pos += 2
  417. nextPos = pos + uint32(packetSize)
  418. posstr = q.view[pos:nextPos]
  419. //fmt.Println("PeekTwo__posstr",string(posstr))
  420. return posstr, nil
  421. }
  422. func (q *DQueue) PeekHead() ([]byte, error) {
  423. //lastWriteAt := q.lastWriteAt
  424. //nextReadAt := q.header.getNextReadAt()
  425. //fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)
  426. pos := q.header.getNextWriteAt()
  427. //packetSize := binary.BigEndian.Uint16(q.view[pos-19:pos])
  428. //if packetSize > MAX_PACKET_SIZE {
  429. // return nil, errors.New("packet is too large"+strconv.Itoa(int(packetSize)))
  430. //}
  431. //pos += 2
  432. //nextPos := pos + uint32(packetSize)
  433. posstr:= q.view[pos-36:pos]
  434. //pos = nextPos
  435. return posstr, nil
  436. }
  437. func (q *DQueue) Push(packets [][]byte) error {
  438. pos := q.header.getNextWriteAt()
  439. if pos > uint32(len(q.body)) {
  440. return errors.New("internal error: nextWriteAt is invalid")
  441. }
  442. for _, packet := range packets {
  443. packetSize := uint16(len(packet))
  444. if packetSize > MAX_PACKET_SIZE {
  445. return errors.New("packet is too large")
  446. }
  447. viewSize := uint32(len(q.view))
  448. willWriteTo := pos + 2 + uint32(packetSize)
  449. // write range is [pos, willWriteTo)
  450. if q.nextReadAt > pos && willWriteTo > q.nextReadAt {
  451. // overflow the read
  452. q.nextReadAt = willWriteTo
  453. q.header.setNextReadAt(q.nextReadAt)
  454. }
  455. if willWriteTo > viewSize {
  456. // overflow the view
  457. if willWriteTo > uint32(len(q.body)) {
  458. // overflow the body, shrink the view
  459. q.view = q.body[:pos]
  460. pos = 0
  461. } else {
  462. // grow the view to cover
  463. q.view = q.body[:willWriteTo]
  464. }
  465. }
  466. binary.BigEndian.PutUint16(q.view[pos:pos+2], packetSize)
  467. pos += 2
  468. nextPos := pos + uint32(packetSize)
  469. copy(q.view[pos:nextPos], packet)
  470. pos = nextPos
  471. }
  472. q.header.setNextWriteAt(pos)
  473. q.header.setViewSize(uint32(len(q.view)))
  474. if LogStatu {
  475. nextWriteAt := q.header.getNextWriteAt()
  476. nextReadAt := q.header.getNextReadAt()
  477. fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)
  478. }
  479. return nil
  480. }
  481. func (q *DQueue) PushOne(packet []byte) error {
  482. pos := q.header.getNextWriteAt()
  483. if pos > uint32(len(q.body)) {
  484. return errors.New("internal error: nextWriteAt is invalid")
  485. }
  486. packetSize := uint16(len(packet))
  487. if packetSize > MAX_PACKET_SIZE {
  488. return errors.New("packet is too large")
  489. }
  490. viewSize := uint32(len(q.body))
  491. willWriteTo := pos + 2 + uint32(packetSize)
  492. // write range is [pos, willWriteTo)
  493. //if q.nextReadAt > pos && willWriteTo > q.nextReadAt {
  494. // // overflow the read
  495. // q.nextReadAt = willWriteTo
  496. // q.header.setNextReadAt(q.nextReadAt)
  497. //}
  498. if willWriteTo > viewSize {
  499. return errors.New("file is full")
  500. }
  501. binary.BigEndian.PutUint16(q.view[pos:pos+2], packetSize)
  502. pos += 2
  503. nextPos := pos + uint32(packetSize)
  504. copy(q.view[pos:nextPos], packet)
  505. pos = nextPos
  506. q.lastWriteAt = q.header.getNextWriteAt()
  507. q.header.setNextWriteAt(pos)
  508. q.view=q.body[:pos]
  509. //if LogStatu{
  510. // //nextWriteAt := q.header.getNextWriteAt()
  511. // //nextReadAt := q.header.getNextReadAt()
  512. // fmt.Println("nextWriteAt",q.lastWriteAt,string(packet))
  513. //}
  514. //nextWriteAt := q.header.getNextWriteAt()
  515. // nextReadAt := q.header.getNextReadAt()
  516. //fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)
  517. return nil
  518. }
  519. func OpenIndexFile(filepath string,size int,indexSize int,log bool) (*DQueue,error){
  520. LogStatu = log
  521. fmt.Println("=====队列路径===",filepath)
  522. dqueue1Index1,err := Open(filepath+"/index",indexSize)
  523. dqueue1Index1.srcpath = filepath
  524. dqueue1Index1.size = size
  525. dqueue1Index1.dqueue1Index = dqueue1Index1
  526. popOne ,err :=dqueue1Index1.dqueue1Index.Peek()
  527. if LogStatu {
  528. fmt.Println("=====peek",string(popOne))
  529. }
  530. if popOne==nil || err != nil{
  531. // dqueue1Index1.nowPath =strconv.Itoa(int(time.Now().UnixNano()))
  532. dqueue1Index1.nowPath = fmt.Sprintf("%s", uuid.NewV1())
  533. dqueue1Index1.dqueuePop,err = Open(dqueue1Index1.srcpath+"/"+dqueue1Index1.nowPath,size)
  534. dqueue1Index1.dqueuePop.nextReadAt = dqueue1Index1.dqueuePop.header.getNextReadAt()
  535. if err !=nil {
  536. return nil,err
  537. }
  538. err =dqueue1Index1.dqueue1Index.PushOne(
  539. []byte(dqueue1Index1.nowPath),
  540. )
  541. }else{
  542. dqueue1Index1.nowPath = string(popOne)
  543. dqueue1Index1.dqueuePop,err = Open(dqueue1Index1.srcpath+"/"+string(popOne),size)
  544. dqueue1Index1.dqueuePop.nextReadAt = dqueue1Index1.dqueuePop.header.getNextReadAt()
  545. if err !=nil {
  546. return nil,err
  547. }
  548. }
  549. peekHead ,err :=dqueue1Index1.dqueue1Index.PeekHead()
  550. if LogStatu {
  551. fmt.Println("=====peekHead",string(peekHead))
  552. }
  553. //dqueue1Index.PopOne()
  554. if peekHead==nil || err != nil{
  555. dqueue1Index1.dqueuPush =dqueue1Index1.dqueuPush
  556. }else{
  557. //dqueue1Index1.nowPath = string(popOne)
  558. dqueue1Index1.dqueuPush,err = Open(dqueue1Index1.srcpath+"/"+string(peekHead),size)
  559. if err !=nil {
  560. return nil,err
  561. }
  562. }
  563. return dqueue1Index1,err
  564. }
  565. func (q *DQueue) PushOneIndex(packet []byte) error {
  566. err := q.dqueuPush.PushOne(
  567. packet,
  568. )
  569. if err !=nil {
  570. if err.Error()=="file is full"{
  571. //if q.dqueuPush.header.getNextReadAt()==0{
  572. err :=q.dqueuPush.Close()
  573. if err != nil{
  574. if LogStatu {fmt.Println("err :=q.dqueuPush.Close() err",err)}
  575. }
  576. //}
  577. //datestr :=strconv.Itoa(int(time.Now().UnixNano()))
  578. datestr :=fmt.Sprintf("%s", uuid.NewV1())
  579. q.dqueuPush,err = Open(q.srcpath+"/"+datestr,q.size)
  580. if err !=nil {
  581. if LogStatu {fmt.Println("kpt.Open dqueue2 err",err)}
  582. return err
  583. }
  584. err = q.dqueue1Index.PushOne(
  585. []byte(datestr),
  586. )
  587. if err != nil {
  588. if err.Error()=="file is full"{
  589. //q.dqueue1Index.header.setNextWriteAt(0)
  590. //err =q.dqueue1Index.PushOne(
  591. // []byte(datestr),
  592. //)
  593. //fmt.Println("index file is full",err,[]byte(datestr))
  594. }
  595. }
  596. err =q.dqueuPush.PushOne(
  597. packet,
  598. )
  599. //dqueueChan <- dqueue2
  600. }else{
  601. return err
  602. }
  603. }
  604. if showlog >0 {
  605. fmt.Println(string(packet))
  606. }
  607. return nil
  608. }
  609. func (q *DQueue) PopOneIndex() ([]byte, error) {
  610. pop ,err := q.dqueuePop.PopOne()
  611. //fmt.Println("q.dqueuePop.header.getNextReadAt() != q.dqueuPush.header.getNextWriteAt()",q.dqueuePop.header.getNextReadAt(),q.dqueuPush.header.getNextWriteAt())
  612. if err != nil{
  613. if err.Error()=="queue is null" {
  614. if q.dqueuePop.fileObj.Name() != q.dqueuPush.fileObj.Name(){
  615. popIndex ,err1 :=q.dqueue1Index.PopOne()
  616. //fmt.Println("pop ,err =q.dqueuePop.PopOne()====------------",pop,err,string(popIndex), q.dqueuePop.fileObj.Name())
  617. if err1 !=nil{
  618. if LogStatu {fmt.Println("pop ,err =q.dqueuePop.PopOne()====------------",pop,err,string(popIndex))}
  619. }
  620. popOne ,err1 :=q.dqueue1Index.Peek()
  621. if popOne == nil && err1 != nil{ //索引为空
  622. if LogStatu {fmt.Println("索引为空==============")}
  623. return nil,err1
  624. }
  625. //fmt.Println("开始关闭==============")
  626. errC := q.dqueuePop.Close()
  627. if errC != nil {
  628. if LogStatu {fmt.Println("q.dqueuePop.Close() err",errC)}
  629. return nil,errC
  630. }
  631. //fmt.Println("开始删除==============")
  632. errR := os.Remove(q.srcpath+"/"+q.nowPath)
  633. if errR != nil {
  634. fmt.Println("os.Remove() err",errR)
  635. time.Sleep(100*time.Millisecond)
  636. errR := os.Rename(q.srcpath+"/"+q.nowPath,q.srcpath+"/"+"del"+q.nowPath)
  637. if errR != nil {
  638. if LogStatu {fmt.Println("os.reRename() err",errR)}}
  639. return nil,errR
  640. }else{
  641. if LogStatu {fmt.Println("file delete ---------",q.nowPath)}
  642. }
  643. fmt.Println("file delete ---------",q.nowPath)
  644. q.dqueuePop,err = Open(q.srcpath+"/"+string(popOne),q.size)
  645. if err !=nil {
  646. if LogStatu {fmt.Println("kpt.Open dqueue1 5 err",err)}
  647. return nil,err
  648. }else {
  649. q.nowPath = string(popOne)
  650. //pop ,err = q.dqueuePop.PopOne()
  651. pop ,err = q.dqueuePop.Peek()
  652. if LogStatu {
  653. fmt.Println("q.dqueuePop.fileObj.Name()",q.dqueuePop.fileObj.Name(),q.dqueuPush.fileObj.Name(),q.dqueuePop.header.getNextReadAt())
  654. fmt.Println("pop ,err =q.dqueuePop.PopOne()====------------",pop,err,string(popOne))
  655. }
  656. }
  657. }else {
  658. return pop,err
  659. }
  660. }else {
  661. return pop,err
  662. }
  663. }else {
  664. return pop,err
  665. }
  666. return pop,nil
  667. }
  668. func (q *DQueue) CloseIndex() error {
  669. err := q.mappedFile.Unmap()
  670. if err != nil {
  671. return err
  672. }
  673. err = q.dqueue1Index.mappedFile.Unmap()
  674. if err != nil {
  675. //return err
  676. }
  677. err = q.dqueuePop.mappedFile.Unmap()
  678. if err != nil {
  679. return err
  680. }
  681. err = q.dqueuPush.mappedFile.Unmap()
  682. if err != nil {
  683. return err
  684. }
  685. err = q.fileObj.Close()
  686. if err != nil {
  687. return err
  688. }
  689. err = q.dqueue1Index.Close()
  690. if err != nil {
  691. //return err
  692. }
  693. err = q.dqueuPush.fileObj.Close()
  694. if err != nil {
  695. return err
  696. }
  697. err = q.dqueuePop.fileObj.Close()
  698. if err != nil {
  699. return err
  700. }
  701. return nil
  702. }
  703. func (q *DQueue)DeleteIndexFile( dqueuePop *DQueue){
  704. //if err1 !=nil {
  705. // //if err1.Error()=="queue is null" && (q.dqueuePop.header.getNextReadAt() != q.dqueuPush.header.getNextWriteAt() ){
  706. // // q.dqueue1Index.header.setNextReadAt(0)
  707. // // popOne ,err1 = q.dqueue1Index.Peek()
  708. // //}
  709. //}
  710. //errC := dqueuePop.Close()
  711. //if errC != nil {
  712. // fmt.Println("q.dqueuePop.Close() err",errC)
  713. // return nil,errC
  714. //}
  715. //errR := os.Remove(q.srcpath+"/"+q.nowPath)
  716. //if errR != nil {
  717. // fmt.Println("os.Remove() err",errR)
  718. // return nil,errR
  719. //}else{
  720. // fmt.Println("file delete ---------",q.nowPath)
  721. //}
  722. }