123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805 |
- package kpt
- import (
- "encoding/binary"
- "errors"
- "fmt"
- "io/ioutil"
- "os"
- "strconv"
- "time"
- "github.com/Shopify/sarama"
- "github.com/edsrzf/mmap-go"
- uuid "github.com/satori/go.uuid"
- )
- const HEADER_SIZE = 13 // 1 for version 4 for viewSize 4 for nextWriteAt 4 for nextReadAt
- const MAX_READ_PACKET_COUNT = 1024
- const MAX_PACKET_SIZE = 1024 * 63 //最大64k
- var (
- LogStatu bool
- )
- type DQueue struct {
- // underlying disk
- fileObj *os.File
- mappedFile mmap.MMap
- // the uncommitted read pointer
- // will be committed at next pop()
- nextReadAt uint32
- lastWriteAt uint32
- /*
- view covers the readable range of body
- to avoid split packet in two parts to wrap around
- the memory will NOT be copied during read,
- the reader must copy out the bytes manually before goroutine switch
- [--- header ---][--- body ---]
- [--- view --]
- */
- header header
- body []byte
- view []byte
- // used to avoid allocation
- readBufferItself [][]byte
- dqueuePop *DQueue
- //dqueuePopTemp *DQueue
- dqueue1Index *DQueue
- dqueuPush *DQueue
- nowPath string
- srcpath string
- size int
- deleteChan string
- }
- type header []byte
- func (header header) setVersion() {
- header[0] = 1
- }
- func (header header) setViewSize(viewSize uint32) {
- binary.BigEndian.PutUint32(header[1:], viewSize)
- }
- func (header header) getViewSize() uint32 {
- return binary.BigEndian.Uint32(header[1:])
- }
- func (header header) setNextWriteAt(nextWriteAt uint32) {
- binary.BigEndian.PutUint32(header[5:], nextWriteAt)
- }
- func (header header) getNextWriteAt() uint32 {
- return binary.BigEndian.Uint32(header[5:])
- }
- func (mappedBytes header) setNextReadAt(nextReadAt uint32) {
- binary.BigEndian.PutUint32(mappedBytes[9:], nextReadAt)
- }
- func (mappedBytes header) getNextReadAt() uint32 {
- return binary.BigEndian.Uint32(mappedBytes[9:])
- }
- func Open(filePath string, nkiloBytes int) (*DQueue, error) {
- fileObj, err := os.OpenFile(filePath, os.O_RDWR, 0644)
- if err != nil {
- if os.IsNotExist(err) {
- fileObj, err = os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
- if err != nil {
- return nil, err
- }
- emptyBytes := make([]byte, 1024)
- for i := 0; i < nkiloBytes; i++ {
- _, err = fileObj.Write(emptyBytes[:])
- if err != nil {
- return nil, err
- }
- }
- err = fileObj.Close()
- if err != nil {
- return nil, err
- }
- fileObj, err = os.OpenFile(filePath, os.O_RDWR, 0644)
- if err != nil {
- return nil, err
- }
- mappedFile, err := mmap.Map(fileObj, mmap.RDWR, 0)
- if err != nil {
- return nil, err
- }
- header := header(mappedFile[:HEADER_SIZE])
- header.setVersion()
- header.setNextWriteAt(0)
- header.setNextReadAt(0)
- body := mappedFile[HEADER_SIZE:]
- view := body[:0]
- header.setViewSize(uint32(len(view)))
- readBufferItself := make([][]byte, MAX_READ_PACKET_COUNT)
- return &DQueue{
- fileObj: fileObj,
- mappedFile: mappedFile,
- header: header,
- body: body,
- view: view,
- readBufferItself: readBufferItself,
- lastWriteAt: 0,
- }, nil
- } else {
- return nil, err
- }
- } else {
- mappedFile, err := mmap.Map(fileObj, mmap.RDWR, 0)
- if err != nil {
- return nil, err
- }
- header := header(mappedFile[:HEADER_SIZE])
- header.setVersion()
- //header.setNextWriteAt(0)
- //header.setNextReadAt(0)
- body := mappedFile[HEADER_SIZE:]
- view := body[:]
- header.setViewSize(uint32(len(view)))
- readBufferItself := make([][]byte, MAX_READ_PACKET_COUNT)
- return &DQueue{
- fileObj: fileObj,
- mappedFile: mappedFile,
- header: header,
- body: body,
- view: view,
- readBufferItself: readBufferItself,
- }, nil
- }
- }
- func (q *DQueue) Close() error {
- err := q.mappedFile.Unmap()
- if err != nil {
- return err
- }
- err = q.fileObj.Close()
- if err != nil {
- return err
- }
- return nil
- }
- func (q *DQueue) Pop() ([][]byte, error) {
- nextWriteAt := q.header.getNextWriteAt()
- nextReadAt := q.nextReadAt
- q.header.setNextReadAt(nextReadAt)
- packetsCount := 0
- fallBehind := false
- if nextReadAt == nextReadAt && nextWriteAt == uint32(len(q.view)) {
- // at tail
- } else if nextReadAt >= nextWriteAt {
- fallBehind = true
- }
- pos := nextReadAt
- for ; packetsCount < len(q.readBufferItself); packetsCount++ {
- if !fallBehind && pos >= nextWriteAt {
- break
- }
- viewSize := uint32(len(q.view))
- if pos >= viewSize {
- pos = 0 // wrap around
- fallBehind = false
- // the region between [nextWriteAt, tail) is now invalid
- q.view = q.body[:nextWriteAt]
- }
- packetSize := binary.BigEndian.Uint16(q.view[pos : pos+2])
- if packetSize > MAX_PACKET_SIZE {
- return nil, errors.New("packet is too large")
- }
- pos += 2
- nextPos := pos + uint32(packetSize)
- q.readBufferItself[packetsCount] = q.view[pos:nextPos]
- pos = nextPos
- }
- q.nextReadAt = pos
- return q.readBufferItself[:packetsCount], nil
- }
- func (q *DQueue) PopPeeksV2(count int) (readBuff [][]byte, err error, full bool, null bool) {
- nextWriteAt := q.dqueuePop.header.getNextWriteAt()
- nextReadAt := q.dqueuePop.nextReadAt
- //fmt.Println("nextWriteAt,nextReadAt",nextWriteAt,nextReadAt)
- packetsCount := 0
- pos := nextReadAt
- var readBuffer [][]byte
- if q.dqueuePop.fileObj.Name() == q.dqueuPush.fileObj.Name() {
- full = true
- }
- for ; packetsCount < count; packetsCount++ {
- if pos >= nextWriteAt {
- null = true
- break
- }
- //viewSize := uint32(len(q.view))
- //if pos >= viewSize {
- // pos = 0 // wrap around
- // // the region between [nextWriteAt, tail) is now invalid
- // q.view = q.body[:nextWriteAt]
- //}
- packetSize := binary.BigEndian.Uint16(q.dqueuePop.view[pos : pos+2])
- if packetSize > MAX_PACKET_SIZE {
- return nil, errors.New("packet is too large"), full, null
- }
- pos += 2
- nextPos := pos + uint32(packetSize)
- if len(q.dqueuePop.view[pos:nextPos]) == 0 {
- continue
- }
- readBuffer = append(readBuffer, q.dqueuePop.view[pos:nextPos])
- pos = nextPos
- }
- q.nextReadAt = pos
- return readBuffer, nil, full, null
- }
- func (q *DQueue) PopPeeksV3(count int) (readBuff []*sarama.ProducerMessage, err error, full bool, null bool) {
- nextWriteAt := q.dqueuePop.header.getNextWriteAt()
- nextReadAt := q.dqueuePop.header.getNextReadAt()
- packetsCount := 0
- pos := nextReadAt
- if q.dqueuePop.fileObj.Name() == q.dqueuPush.fileObj.Name() {
- full = true
- }
- for ; packetsCount < count; packetsCount++ {
- if pos >= nextWriteAt {
- null = true
- break
- }
- //viewSize := uint32(len(q.view))
- //if pos >= viewSize {
- // pos = 0 // wrap around
- // // the region between [nextWriteAt, tail) is now invalid
- // q.view = q.body[:nextWriteAt]
- //}
- packetSize := binary.BigEndian.Uint16(q.dqueuePop.view[pos : pos+2])
- if packetSize > MAX_PACKET_SIZE {
- return nil, errors.New("packet is too large"), full, null
- }
- pos += 2
- nextPos := pos + uint32(packetSize)
- if len(q.dqueuePop.view[pos:nextPos]) == 0 {
- continue
- }
- msg := &sarama.ProducerMessage{Topic: kafka_topic, Key: sarama.StringEncoder(""),
- Value: sarama.StringEncoder(string(q.dqueuePop.view[pos:nextPos]))}
- readBuff = append(readBuff, msg)
- pos = nextPos
- }
- q.nextReadAt = pos
- return readBuff, nil, full, null
- }
- //func (q *DQueue) PopPeeks(count int) ( [][]byte, error) {
- // if q.dqueuePop.fileObj.Name() != q.dqueuePopTemp.fileObj.Name(){
- // err :=q.dqueuePopTemp.Close()
- // if err != nil{
- // fmt.Println("err :=q.dqueuePopTemp.Close() err",err)
- // }
- // q.dqueuePopTemp,err = Open(q.dqueuePop.fileObj.Name(),q.size)
- // if err != nil {
- // fmt.Println("= Open err",q.dqueuePop.fileObj.Name(),err)
- // }
- // fmt.Println("q.dqueuePopTemp,err = Open ",q.dqueuePop.fileObj.Name(),err)
- // }
- // nextWriteAt := q.dqueuePopTemp.header.getNextWriteAt()
- // nextReadAt := q.dqueuePopTemp.header.getNextReadAt()
- // var readBuff [][]byte
- // pos := nextReadAt
- // for packetsCount := 0; packetsCount < count; packetsCount++ {
- // if pos >= nextWriteAt {
- // if q.dqueuePop.fileObj.Name() == q.dqueuPush.fileObj.Name(){
- // break
- // }
- //
- // popOne,err :=q.dqueue1Index.PeekTwo()
- // if err != nil {
- // if err.Error() =="file is no two peek" || err.Error() =="queue is null" {
- // break
- // }else {
- // fmt.Println("q.dqueue1Index.Peek() err",err)
- // }
- // }
- // fmt.Println("popOne,err :=q.dqueue1Index.PeekTwo()",string(popOne),err)
- // err =q.dqueuePopTemp.Close()
- // if err != nil{
- // fmt.Println("err :=q.dqueuePopTemp.Close() err",err)
- // break
- // }
- // q.dqueuePopTemp,err = Open(q.srcpath+"/"+string(popOne),q.size)
- // if err != nil {
- // fmt.Println("= Open err",q.srcpath+"/"+string(popOne),err)
- // break
- // }
- // nextWriteAt = q.dqueuePopTemp.header.getNextWriteAt()
- // if nextWriteAt == 0 {
- // break
- // }
- // pos = q.dqueuePopTemp.header.getNextReadAt()
- // }
- // //viewSize := uint32(len(q.dqueuePopTemp.view))
- // //if pos >= viewSize {
- // // pos = 0 // wrap around
- // // // the region between [nextWriteAt, tail) is now invalid
- // // q.dqueuePopTemp.view = q.dqueuePopTemp.body[:nextWriteAt]
- // //}
- // packetSize := binary.BigEndian.Uint16(q.dqueuePopTemp.body[pos:pos+2])
- // if packetSize > MAX_PACKET_SIZE {
- // return nil, errors.New("packet is too large : "+string(q.dqueuePopTemp.body[pos:pos+2]))
- // }
- // pos += 2
- // nextPos := pos + uint32(packetSize)
- // if int(nextPos) >=len(q.dqueuePopTemp.body){
- // continue
- // }
- // if len(q.dqueuePopTemp.body[pos:nextPos]) == 0{
- // continue
- // }
- // readBuff = append(readBuff,q.dqueuePopTemp.body[pos:nextPos])
- // //fmt.Println("q.dqueuePop.view[pos:nextPos]",string(q.dqueuePopTemp.view[pos:nextPos]),
- // // q.dqueuePopTemp.fileObj.Name())
- // pos = nextPos
- // }
- // q.dqueuePop.nextReadAt = pos
- // // fmt.Println("readBuff",readBuff)
- // return readBuff, nil
- // //}
- //}
- func (q *DQueue) PopOne() ([]byte, error) {
- nextWriteAt := q.header.getNextWriteAt()
- nextReadAt := q.header.getNextReadAt()
- if LogStatu {
- fmt.Println("nextWriteAt", nextWriteAt, nextReadAt)
- }
- if nextReadAt >= nextWriteAt {
- return nil, errors.New("queue is null")
- }
- pos := nextReadAt
- q.view = q.body[:nextWriteAt]
- viewSize := uint32(len(q.view))
- if pos >= viewSize {
- pos = 0 // wrap around
- // the region between [nextWriteAt, tail) is now invalid
- q.view = q.body[:nextWriteAt]
- }
- packetSize := binary.BigEndian.Uint16(q.view[pos : pos+2])
- if packetSize > MAX_PACKET_SIZE {
- return nil, errors.New("packet is too large" + strconv.Itoa(int(packetSize)))
- }
- pos += 2
- nextPos := pos + uint32(packetSize)
- posstr := q.view[pos:nextPos]
- pos = nextPos
- q.nextReadAt = pos
- q.header.setNextReadAt(pos)
- return posstr, nil
- }
- func (q *DQueue) Peek() ([]byte, error) {
- nextWriteAt := q.header.getNextWriteAt()
- nextReadAt := q.header.getNextReadAt()
- if LogStatu {
- fmt.Println("nextWriteAt", nextWriteAt, nextReadAt)
- }
- if nextReadAt >= nextWriteAt {
- return nil, errors.New("queue is null")
- }
- pos := nextReadAt
- viewSize := uint32(len(q.view))
- if pos >= viewSize {
- pos = 0 // wrap around
- // the region between [nextWriteAt, tail) is now invalid
- q.view = q.body[:nextWriteAt]
- }
- packetSize := binary.BigEndian.Uint16(q.view[pos : pos+2])
- if packetSize > MAX_PACKET_SIZE {
- return nil, errors.New("packet is too large" + strconv.Itoa(int(packetSize)))
- }
- pos += 2
- nextPos := pos + uint32(packetSize)
- posstr := q.view[pos:nextPos]
- pos = nextPos
- return posstr, nil
- }
- func (q *DQueue) PeekTwo() ([]byte, error) {
- nextWriteAt := q.header.getNextWriteAt()
- nextReadAt := q.header.getNextReadAt()
- if LogStatu {
- fmt.Println("nextWriteAt", nextWriteAt, nextReadAt)
- }
- if nextReadAt >= nextWriteAt {
- //fmt.Println("queue is null")
- return nil, errors.New("queue is null")
- }
- pos := nextReadAt
- viewSize := uint32(len(q.view))
- if pos >= viewSize {
- pos = 0 // wrap around
- // the region between [nextWriteAt, tail) is now invalid
- q.view = q.body[:nextWriteAt]
- }
- packetSize := binary.BigEndian.Uint16(q.view[pos : pos+2])
- if packetSize > MAX_PACKET_SIZE {
- return nil, errors.New("packet is too large" + strconv.Itoa(int(packetSize)))
- }
- pos += 2
- nextPos := pos + uint32(packetSize)
- posstr := q.view[pos:nextPos]
- pos = nextPos
- if pos >= q.header.getNextWriteAt() {
- //fmt.Println("file is no two peek")
- return nil, errors.New("file is no two peek")
- }
- packetSize = binary.BigEndian.Uint16(q.view[pos : pos+2])
- if packetSize > MAX_PACKET_SIZE {
- return nil, errors.New("packet is too large" + strconv.Itoa(int(packetSize)))
- }
- pos += 2
- nextPos = pos + uint32(packetSize)
- posstr = q.view[pos:nextPos]
- //fmt.Println("PeekTwo__posstr",string(posstr))
- return posstr, nil
- }
- func (q *DQueue) PeekHead() ([]byte, error) {
- //lastWriteAt := q.lastWriteAt
- //nextReadAt := q.header.getNextReadAt()
- //fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)
- pos := q.header.getNextWriteAt()
- //packetSize := binary.BigEndian.Uint16(q.view[pos-19:pos])
- //if packetSize > MAX_PACKET_SIZE {
- // return nil, errors.New("packet is too large"+strconv.Itoa(int(packetSize)))
- //}
- //pos += 2
- //nextPos := pos + uint32(packetSize)
- posstr := q.view[pos-36 : pos]
- //pos = nextPos
- return posstr, nil
- }
- func (q *DQueue) Push(packets [][]byte) error {
- pos := q.header.getNextWriteAt()
- if pos > uint32(len(q.body)) {
- return errors.New("internal error: nextWriteAt is invalid")
- }
- for _, packet := range packets {
- packetSize := uint16(len(packet))
- if packetSize > MAX_PACKET_SIZE {
- return errors.New("packet is too large")
- }
- viewSize := uint32(len(q.view))
- willWriteTo := pos + 2 + uint32(packetSize)
- // write range is [pos, willWriteTo)
- if q.nextReadAt > pos && willWriteTo > q.nextReadAt {
- // overflow the read
- q.nextReadAt = willWriteTo
- q.header.setNextReadAt(q.nextReadAt)
- }
- if willWriteTo > viewSize {
- // overflow the view
- if willWriteTo > uint32(len(q.body)) {
- // overflow the body, shrink the view
- q.view = q.body[:pos]
- pos = 0
- } else {
- // grow the view to cover
- q.view = q.body[:willWriteTo]
- }
- }
- binary.BigEndian.PutUint16(q.view[pos:pos+2], packetSize)
- pos += 2
- nextPos := pos + uint32(packetSize)
- copy(q.view[pos:nextPos], packet)
- pos = nextPos
- }
- q.header.setNextWriteAt(pos)
- q.header.setViewSize(uint32(len(q.view)))
- if LogStatu {
- nextWriteAt := q.header.getNextWriteAt()
- nextReadAt := q.header.getNextReadAt()
- fmt.Println("nextWriteAt", nextWriteAt, nextReadAt)
- }
- return nil
- }
- func (q *DQueue) PushOne(packet []byte) error {
- pos := q.header.getNextWriteAt()
- if pos > uint32(len(q.body)) {
- return errors.New("internal error: nextWriteAt is invalid")
- }
- packetSize := uint16(len(packet))
- if packetSize > MAX_PACKET_SIZE {
- return errors.New("packet is too large")
- }
- viewSize := uint32(len(q.body))
- willWriteTo := pos + 2 + uint32(packetSize)
- // write range is [pos, willWriteTo)
- //if q.nextReadAt > pos && willWriteTo > q.nextReadAt {
- // // overflow the read
- // q.nextReadAt = willWriteTo
- // q.header.setNextReadAt(q.nextReadAt)
- //}
- if willWriteTo > viewSize {
- return errors.New("file is full")
- }
- binary.BigEndian.PutUint16(q.view[pos:pos+2], packetSize)
- pos += 2
- nextPos := pos + uint32(packetSize)
- copy(q.view[pos:nextPos], packet)
- pos = nextPos
- q.lastWriteAt = q.header.getNextWriteAt()
- q.header.setNextWriteAt(pos)
- q.view = q.body[:pos]
- //if LogStatu{
- // //nextWriteAt := q.header.getNextWriteAt()
- // //nextReadAt := q.header.getNextReadAt()
- // fmt.Println("nextWriteAt",q.lastWriteAt,string(packet))
- //}
- //nextWriteAt := q.header.getNextWriteAt()
- // nextReadAt := q.header.getNextReadAt()
- //fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)
- return nil
- }
- func OpenIndexFile(filepath string, size int, indexSize int, log bool) (*DQueue, error) {
- LogStatu = log
- fmt.Println("=====队列路径===", filepath)
- dqueue1Index1, err := Open(filepath+"/index", indexSize)
- fmt.Println(filepath)
- dqueue1Index1.srcpath = filepath
- dqueue1Index1.size = size
- dqueue1Index1.dqueue1Index = dqueue1Index1
- popOne, err := dqueue1Index1.dqueue1Index.Peek()
- if LogStatu {
- fmt.Println("=====peek", string(popOne))
- }
- if popOne == nil || err != nil {
- // dqueue1Index1.nowPath =strconv.Itoa(int(time.Now().UnixNano()))
- dqueue1Index1.nowPath = fmt.Sprintf("%s", uuid.NewV1())
- dqueue1Index1.dqueuePop, err = Open(dqueue1Index1.srcpath+"/"+dqueue1Index1.nowPath, size)
- dqueue1Index1.dqueuePop.nextReadAt = dqueue1Index1.dqueuePop.header.getNextReadAt()
- if err != nil {
- return nil, err
- }
- err = dqueue1Index1.dqueue1Index.PushOne(
- []byte(dqueue1Index1.nowPath),
- )
- } else {
- dqueue1Index1.nowPath = string(popOne)
- dqueue1Index1.dqueuePop, err = Open(dqueue1Index1.srcpath+"/"+string(popOne), size)
- dqueue1Index1.dqueuePop.nextReadAt = dqueue1Index1.dqueuePop.header.getNextReadAt()
- if err != nil {
- return nil, err
- }
- }
- peekHead, err := dqueue1Index1.dqueue1Index.PeekHead()
- if LogStatu {
- fmt.Println("=====peekHead", string(peekHead))
- }
- //dqueue1Index.PopOne()
- if peekHead == nil || err != nil {
- dqueue1Index1.dqueuPush = dqueue1Index1.dqueuPush
- } else {
- //dqueue1Index1.nowPath = string(popOne)
- dqueue1Index1.dqueuPush, err = Open(dqueue1Index1.srcpath+"/"+string(peekHead), size)
- if err != nil {
- return nil, err
- }
- }
- return dqueue1Index1, err
- }
- func (q *DQueue) PushOneIndex(packet []byte) error {
- err := q.dqueuPush.PushOne(
- packet,
- )
- if err != nil {
- if err.Error() == "file is full" {
- //if q.dqueuPush.header.getNextReadAt()==0{
- err := q.dqueuPush.Close()
- if err != nil {
- if LogStatu {
- fmt.Println("err :=q.dqueuPush.Close() err", err)
- }
- }
- //}
- //datestr :=strconv.Itoa(int(time.Now().UnixNano()))
- files, _ := ioutil.ReadDir(q.srcpath + "/")
- if len(files) > 15 {
- panic("日志文件过多!")
- }
- fmt.Println(len(files))
- datestr := fmt.Sprintf("%s", uuid.NewV1())
- q.dqueuPush, err = Open(q.srcpath+"/"+datestr, q.size)
- if err != nil {
- if LogStatu {
- fmt.Println("kpt.Open dqueue2 err", err)
- }
- return err
- }
- err = q.dqueue1Index.PushOne(
- []byte(datestr),
- )
- if err != nil {
- if err.Error() == "file is full" {
- //q.dqueue1Index.header.setNextWriteAt(0)
- //err =q.dqueue1Index.PushOne(
- // []byte(datestr),
- //)
- //fmt.Println("index file is full",err,[]byte(datestr))
- }
- }
- err = q.dqueuPush.PushOne(
- packet,
- )
- //dqueueChan <- dqueue2
- } else {
- return err
- }
- }
- if showlog > 0 {
- // fmt.Println(string(packet))
- }
- return nil
- }
- func (q *DQueue) PopOneIndex() ([]byte, error) {
- pop, err := q.dqueuePop.PopOne()
- //fmt.Println("q.dqueuePop.header.getNextReadAt() != q.dqueuPush.header.getNextWriteAt()",q.dqueuePop.header.getNextReadAt(),q.dqueuPush.header.getNextWriteAt())
- if err != nil {
- if err.Error() == "queue is null" {
- if q.dqueuePop.fileObj.Name() != q.dqueuPush.fileObj.Name() {
- popIndex, err1 := q.dqueue1Index.PopOne()
- //fmt.Println("pop ,err =q.dqueuePop.PopOne()====------------",pop,err,string(popIndex), q.dqueuePop.fileObj.Name())
- if err1 != nil {
- if LogStatu {
- fmt.Println("pop ,err =q.dqueuePop.PopOne()====------------", pop, err, string(popIndex))
- }
- }
- popOne, err1 := q.dqueue1Index.Peek()
- if popOne == nil && err1 != nil { //索引为空
- if LogStatu {
- fmt.Println("索引为空==============")
- }
- return nil, err1
- }
- //fmt.Println("开始关闭==============")
- errC := q.dqueuePop.Close()
- if errC != nil {
- if LogStatu {
- fmt.Println("q.dqueuePop.Close() err", errC)
- }
- return nil, errC
- }
- //fmt.Println("开始删除==============")
- errR := os.Remove(q.srcpath + "/" + q.nowPath)
- if errR != nil {
- fmt.Println("os.Remove() err", errR)
- time.Sleep(100 * time.Millisecond)
- errR := os.Rename(q.srcpath+"/"+q.nowPath, q.srcpath+"/"+"del"+q.nowPath)
- if errR != nil {
- if LogStatu {
- fmt.Println("os.reRename() err", errR)
- }
- }
- return nil, errR
- } else {
- if LogStatu {
- fmt.Println("file delete ---------", q.nowPath)
- }
- }
- fmt.Println("file delete ---------", q.nowPath)
- q.dqueuePop, err = Open(q.srcpath+"/"+string(popOne), q.size)
- if err != nil {
- if LogStatu {
- fmt.Println("kpt.Open dqueue1 5 err", err)
- }
- return nil, err
- } else {
- q.nowPath = string(popOne)
- //pop ,err = q.dqueuePop.PopOne()
- pop, err = q.dqueuePop.Peek()
- if LogStatu {
- fmt.Println("q.dqueuePop.fileObj.Name()", q.dqueuePop.fileObj.Name(), q.dqueuPush.fileObj.Name(), q.dqueuePop.header.getNextReadAt())
- fmt.Println("pop ,err =q.dqueuePop.PopOne()====------------", pop, err, string(popOne))
- }
- }
- } else {
- return pop, err
- }
- } else {
- return pop, err
- }
- } else {
- return pop, err
- }
- return pop, nil
- }
- func (q *DQueue) CloseIndex() error {
- err := q.mappedFile.Unmap()
- if err != nil {
- return err
- }
- err = q.dqueue1Index.mappedFile.Unmap()
- if err != nil {
- //return err
- }
- err = q.dqueuePop.mappedFile.Unmap()
- if err != nil {
- return err
- }
- err = q.dqueuPush.mappedFile.Unmap()
- if err != nil {
- return err
- }
- err = q.fileObj.Close()
- if err != nil {
- return err
- }
- err = q.dqueue1Index.Close()
- if err != nil {
- //return err
- }
- err = q.dqueuPush.fileObj.Close()
- if err != nil {
- return err
- }
- err = q.dqueuePop.fileObj.Close()
- if err != nil {
- return err
- }
- return nil
- }
- func (q *DQueue) DeleteIndexFile(dqueuePop *DQueue) {
- //if err1 !=nil {
- // //if err1.Error()=="queue is null" && (q.dqueuePop.header.getNextReadAt() != q.dqueuPush.header.getNextWriteAt() ){
- // // q.dqueue1Index.header.setNextReadAt(0)
- // // popOne ,err1 = q.dqueue1Index.Peek()
- // //}
- //}
- //errC := dqueuePop.Close()
- //if errC != nil {
- // fmt.Println("q.dqueuePop.Close() err",errC)
- // return nil,errC
- //}
- //errR := os.Remove(q.srcpath+"/"+q.nowPath)
- //if errR != nil {
- // fmt.Println("os.Remove() err",errR)
- // return nil,errR
- //}else{
- // fmt.Println("file delete ---------",q.nowPath)
- //}
- }
|