package kpt import ( "encoding/binary" "errors" "fmt" "github.com/Shopify/sarama" "github.com/edsrzf/mmap-go" "github.com/satori/go.uuid" "os" "strconv" "time" ) const HEADER_SIZE = 13 // 1 for version 4 for viewSize 4 for nextWriteAt 4 for nextReadAt const MAX_READ_PACKET_COUNT = 1024 const MAX_PACKET_SIZE = 1024 * 63//最大64k var ( LogStatu bool ) type DQueue struct { // underlying disk fileObj *os.File mappedFile mmap.MMap // the uncommitted read pointer // will be committed at next pop() nextReadAt uint32 lastWriteAt uint32 /* view covers the readable range of body to avoid split packet in two parts to wrap around the memory will NOT be copied during read, the reader must copy out the bytes manually before goroutine switch [--- header ---][--- body ---] [--- view --] */ header header body []byte view []byte // used to avoid allocation readBufferItself [][]byte dqueuePop *DQueue //dqueuePopTemp *DQueue dqueue1Index *DQueue dqueuPush *DQueue nowPath string srcpath string size int deleteChan string } type header []byte func (header header) setVersion() { header[0] = 1 } func (header header) setViewSize(viewSize uint32) { binary.BigEndian.PutUint32(header[1:], viewSize) } func (header header) getViewSize() uint32 { return binary.BigEndian.Uint32(header[1:]) } func (header header) setNextWriteAt(nextWriteAt uint32) { binary.BigEndian.PutUint32(header[5:], nextWriteAt) } func (header header) getNextWriteAt() uint32 { return binary.BigEndian.Uint32(header[5:]) } func (mappedBytes header) setNextReadAt(nextReadAt uint32) { binary.BigEndian.PutUint32(mappedBytes[9:], nextReadAt) } func (mappedBytes header) getNextReadAt() uint32 { return binary.BigEndian.Uint32(mappedBytes[9:]) } func Open(filePath string, nkiloBytes int) (*DQueue, error) { fileObj, err := os.OpenFile(filePath, os.O_RDWR, 0644) if err != nil { if os.IsNotExist(err) { fileObj, err = os.OpenFile(filePath, os.O_RDWR | os.O_CREATE | os.O_TRUNC, 0644) if err != nil { return nil, err } emptyBytes := make([]byte, 1024) for i := 0; i < nkiloBytes; i++ { _, err = fileObj.Write(emptyBytes[:]) if err != nil { return nil, err } } err = fileObj.Close() if err != nil { return nil, err } fileObj, err = os.OpenFile(filePath, os.O_RDWR, 0644) if err != nil { return nil, err } mappedFile, err := mmap.Map(fileObj, mmap.RDWR, 0) if err != nil { return nil, err } header := header(mappedFile[:HEADER_SIZE]) header.setVersion() header.setNextWriteAt(0) header.setNextReadAt(0) body := mappedFile[HEADER_SIZE:] view := body[:0] header.setViewSize(uint32(len(view))) readBufferItself := make([][]byte, MAX_READ_PACKET_COUNT) return &DQueue{ fileObj: fileObj, mappedFile: mappedFile, header: header, body: body, view: view, readBufferItself: readBufferItself, lastWriteAt : 0 , }, nil } else { return nil, err } }else { mappedFile, err := mmap.Map(fileObj, mmap.RDWR, 0) if err != nil { return nil, err } header := header(mappedFile[:HEADER_SIZE]) header.setVersion() //header.setNextWriteAt(0) //header.setNextReadAt(0) body := mappedFile[HEADER_SIZE:] view := body[:] header.setViewSize(uint32(len(view))) readBufferItself := make([][]byte, MAX_READ_PACKET_COUNT) return &DQueue{ fileObj: fileObj, mappedFile: mappedFile, header: header, body: body, view: view, readBufferItself: readBufferItself, }, nil } } func (q *DQueue) Close() error { err := q.mappedFile.Unmap() if err != nil { return err } err = q.fileObj.Close() if err != nil { return err } return nil } func (q *DQueue) Pop() ([][]byte, error) { nextWriteAt := q.header.getNextWriteAt() nextReadAt := q.nextReadAt q.header.setNextReadAt(nextReadAt) packetsCount := 0 fallBehind := false if nextReadAt == nextReadAt && nextWriteAt == uint32(len(q.view)) { // at tail } else if nextReadAt >= nextWriteAt { fallBehind = true } pos := nextReadAt for ; packetsCount < len(q.readBufferItself); packetsCount++ { if !fallBehind && pos >= nextWriteAt { break } viewSize := uint32(len(q.view)) if pos >= viewSize { pos = 0 // wrap around fallBehind = false // the region between [nextWriteAt, tail) is now invalid q.view = q.body[:nextWriteAt] } packetSize := binary.BigEndian.Uint16(q.view[pos:pos+2]) if packetSize > MAX_PACKET_SIZE { return nil, errors.New("packet is too large") } pos += 2 nextPos := pos + uint32(packetSize) q.readBufferItself[packetsCount] = q.view[pos:nextPos] pos = nextPos } q.nextReadAt = pos return q.readBufferItself[:packetsCount], nil } func (q *DQueue) PopPeeksV2(count int) (readBuff [][]byte, err error,full bool,null bool){ nextWriteAt := q.dqueuePop.header.getNextWriteAt() nextReadAt :=q.dqueuePop.nextReadAt //fmt.Println("nextWriteAt,nextReadAt",nextWriteAt,nextReadAt) packetsCount := 0 pos := nextReadAt var readBuffer [][]byte if q.dqueuePop.fileObj.Name() == q.dqueuPush.fileObj.Name(){ full = true } for ; packetsCount < count; packetsCount++ { if pos >= nextWriteAt { null = true break } //viewSize := uint32(len(q.view)) //if pos >= viewSize { // pos = 0 // wrap around // // the region between [nextWriteAt, tail) is now invalid // q.view = q.body[:nextWriteAt] //} packetSize := binary.BigEndian.Uint16(q.dqueuePop.view[pos:pos+2]) if packetSize > MAX_PACKET_SIZE { return nil, errors.New("packet is too large"),full,null } pos += 2 nextPos := pos + uint32(packetSize) if len(q.dqueuePop.view[pos:nextPos])==0{ continue } readBuffer = append(readBuffer,q.dqueuePop.view[pos:nextPos]) pos = nextPos } q.nextReadAt = pos return readBuffer, nil,full,null } func (q *DQueue) PopPeeksV3(count int) (readBuff []*sarama.ProducerMessage, err error,full bool,null bool){ nextWriteAt := q.dqueuePop.header.getNextWriteAt() nextReadAt :=q.dqueuePop.header.getNextReadAt() packetsCount := 0 pos := nextReadAt if q.dqueuePop.fileObj.Name() == q.dqueuPush.fileObj.Name(){ full = true } for ; packetsCount < count; packetsCount++ { if pos >= nextWriteAt { null = true break } //viewSize := uint32(len(q.view)) //if pos >= viewSize { // pos = 0 // wrap around // // the region between [nextWriteAt, tail) is now invalid // q.view = q.body[:nextWriteAt] //} packetSize := binary.BigEndian.Uint16(q.dqueuePop.view[pos:pos+2]) if packetSize > MAX_PACKET_SIZE { return nil, errors.New("packet is too large"),full,null } pos += 2 nextPos := pos + uint32(packetSize) if len(q.dqueuePop.view[pos:nextPos])==0{ continue } msg := &sarama.ProducerMessage{Topic: kafka_topic, Key: sarama.StringEncoder(""), Value: sarama.StringEncoder(string(q.dqueuePop.view[pos:nextPos]))} readBuff = append(readBuff,msg) pos = nextPos } q.nextReadAt = pos return readBuff, nil,full,null } //func (q *DQueue) PopPeeks(count int) ( [][]byte, error) { // if q.dqueuePop.fileObj.Name() != q.dqueuePopTemp.fileObj.Name(){ // err :=q.dqueuePopTemp.Close() // if err != nil{ // fmt.Println("err :=q.dqueuePopTemp.Close() err",err) // } // q.dqueuePopTemp,err = Open(q.dqueuePop.fileObj.Name(),q.size) // if err != nil { // fmt.Println("= Open err",q.dqueuePop.fileObj.Name(),err) // } // fmt.Println("q.dqueuePopTemp,err = Open ",q.dqueuePop.fileObj.Name(),err) // } // nextWriteAt := q.dqueuePopTemp.header.getNextWriteAt() // nextReadAt := q.dqueuePopTemp.header.getNextReadAt() // var readBuff [][]byte // pos := nextReadAt // for packetsCount := 0; packetsCount < count; packetsCount++ { // if pos >= nextWriteAt { // if q.dqueuePop.fileObj.Name() == q.dqueuPush.fileObj.Name(){ // break // } // // popOne,err :=q.dqueue1Index.PeekTwo() // if err != nil { // if err.Error() =="file is no two peek" || err.Error() =="queue is null" { // break // }else { // fmt.Println("q.dqueue1Index.Peek() err",err) // } // } // fmt.Println("popOne,err :=q.dqueue1Index.PeekTwo()",string(popOne),err) // err =q.dqueuePopTemp.Close() // if err != nil{ // fmt.Println("err :=q.dqueuePopTemp.Close() err",err) // break // } // q.dqueuePopTemp,err = Open(q.srcpath+"/"+string(popOne),q.size) // if err != nil { // fmt.Println("= Open err",q.srcpath+"/"+string(popOne),err) // break // } // nextWriteAt = q.dqueuePopTemp.header.getNextWriteAt() // if nextWriteAt == 0 { // break // } // pos = q.dqueuePopTemp.header.getNextReadAt() // } // //viewSize := uint32(len(q.dqueuePopTemp.view)) // //if pos >= viewSize { // // pos = 0 // wrap around // // // the region between [nextWriteAt, tail) is now invalid // // q.dqueuePopTemp.view = q.dqueuePopTemp.body[:nextWriteAt] // //} // packetSize := binary.BigEndian.Uint16(q.dqueuePopTemp.body[pos:pos+2]) // if packetSize > MAX_PACKET_SIZE { // return nil, errors.New("packet is too large : "+string(q.dqueuePopTemp.body[pos:pos+2])) // } // pos += 2 // nextPos := pos + uint32(packetSize) // if int(nextPos) >=len(q.dqueuePopTemp.body){ // continue // } // if len(q.dqueuePopTemp.body[pos:nextPos]) == 0{ // continue // } // readBuff = append(readBuff,q.dqueuePopTemp.body[pos:nextPos]) // //fmt.Println("q.dqueuePop.view[pos:nextPos]",string(q.dqueuePopTemp.view[pos:nextPos]), // // q.dqueuePopTemp.fileObj.Name()) // pos = nextPos // } // q.dqueuePop.nextReadAt = pos // // fmt.Println("readBuff",readBuff) // return readBuff, nil // //} //} func (q *DQueue) PopOne() ([]byte, error) { nextWriteAt := q.header.getNextWriteAt() nextReadAt := q.header.getNextReadAt() if LogStatu {fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)} if nextReadAt >= nextWriteAt { return nil, errors.New("queue is null") } pos := nextReadAt q.view = q.body[:nextWriteAt] viewSize := uint32(len(q.view)) if pos >= viewSize { pos = 0 // wrap around // the region between [nextWriteAt, tail) is now invalid q.view = q.body[:nextWriteAt] } packetSize := binary.BigEndian.Uint16(q.view[pos:pos+2]) if packetSize > MAX_PACKET_SIZE { return nil, errors.New("packet is too large"+strconv.Itoa(int(packetSize))) } pos += 2 nextPos := pos + uint32(packetSize) posstr:= q.view[pos:nextPos] pos = nextPos q.nextReadAt = pos q.header.setNextReadAt(pos) return posstr, nil } func (q *DQueue) Peek() ([]byte, error) { nextWriteAt := q.header.getNextWriteAt() nextReadAt := q.header.getNextReadAt() if LogStatu {fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)} if nextReadAt >= nextWriteAt { return nil, errors.New("queue is null") } pos := nextReadAt viewSize := uint32(len(q.view)) if pos >= viewSize { pos = 0 // wrap around // the region between [nextWriteAt, tail) is now invalid q.view = q.body[:nextWriteAt] } packetSize := binary.BigEndian.Uint16(q.view[pos:pos+2]) if packetSize > MAX_PACKET_SIZE { return nil, errors.New("packet is too large"+strconv.Itoa(int(packetSize))) } pos += 2 nextPos := pos + uint32(packetSize) posstr:= q.view[pos:nextPos] pos = nextPos return posstr, nil } func (q *DQueue) PeekTwo() ([]byte, error) { nextWriteAt := q.header.getNextWriteAt() nextReadAt := q.header.getNextReadAt() if LogStatu {fmt.Println("nextWriteAt",nextWriteAt,nextReadAt)} if nextReadAt >= nextWriteAt { //fmt.Println("queue is null") return nil, errors.New("queue is null") } pos := nextReadAt viewSize := uint32(len(q.view)) if pos >= viewSize { pos = 0 // wrap around // the region between [nextWriteAt, tail) is now invalid q.view = q.body[:nextWriteAt] } packetSize := binary.BigEndian.Uint16(q.view[pos:pos+2]) if packetSize > MAX_PACKET_SIZE { return nil, errors.New("packet is too large"+strconv.Itoa(int(packetSize))) } pos += 2 nextPos := pos + uint32(packetSize) posstr:= q.view[pos:nextPos] pos = nextPos if pos >= q.header.getNextWriteAt() { //fmt.Println("file is no two peek") return nil, errors.New("file is no two peek") } packetSize = binary.BigEndian.Uint16(q.view[pos:pos+2]) if packetSize > MAX_PACKET_SIZE { return nil, errors.New("packet is too large"+strconv.Itoa(int(packetSize))) } pos += 2 nextPos = pos + uint32(packetSize) posstr = q.view[pos:nextPos] //fmt.Println("PeekTwo__posstr",string(posstr)) return posstr, nil } func (q *DQueue) PeekHead() ([]byte, error) { //lastWriteAt := q.lastWriteAt //nextReadAt := q.header.getNextReadAt() //fmt.Println("nextWriteAt",nextWriteAt,nextReadAt) pos := q.header.getNextWriteAt() //packetSize := binary.BigEndian.Uint16(q.view[pos-19:pos]) //if packetSize > MAX_PACKET_SIZE { // return nil, errors.New("packet is too large"+strconv.Itoa(int(packetSize))) //} //pos += 2 //nextPos := pos + uint32(packetSize) posstr:= q.view[pos-36:pos] //pos = nextPos return posstr, nil } func (q *DQueue) Push(packets [][]byte) error { pos := q.header.getNextWriteAt() if pos > uint32(len(q.body)) { return errors.New("internal error: nextWriteAt is invalid") } for _, packet := range packets { packetSize := uint16(len(packet)) if packetSize > MAX_PACKET_SIZE { return errors.New("packet is too large") } viewSize := uint32(len(q.view)) willWriteTo := pos + 2 + uint32(packetSize) // write range is [pos, willWriteTo) if q.nextReadAt > pos && willWriteTo > q.nextReadAt { // overflow the read q.nextReadAt = willWriteTo q.header.setNextReadAt(q.nextReadAt) } if willWriteTo > viewSize { // overflow the view if willWriteTo > uint32(len(q.body)) { // overflow the body, shrink the view q.view = q.body[:pos] pos = 0 } else { // grow the view to cover q.view = q.body[:willWriteTo] } } binary.BigEndian.PutUint16(q.view[pos:pos+2], packetSize) pos += 2 nextPos := pos + uint32(packetSize) copy(q.view[pos:nextPos], packet) pos = nextPos } q.header.setNextWriteAt(pos) q.header.setViewSize(uint32(len(q.view))) if LogStatu { nextWriteAt := q.header.getNextWriteAt() nextReadAt := q.header.getNextReadAt() fmt.Println("nextWriteAt",nextWriteAt,nextReadAt) } return nil } func (q *DQueue) PushOne(packet []byte) error { pos := q.header.getNextWriteAt() if pos > uint32(len(q.body)) { return errors.New("internal error: nextWriteAt is invalid") } packetSize := uint16(len(packet)) if packetSize > MAX_PACKET_SIZE { return errors.New("packet is too large") } viewSize := uint32(len(q.body)) willWriteTo := pos + 2 + uint32(packetSize) // write range is [pos, willWriteTo) //if q.nextReadAt > pos && willWriteTo > q.nextReadAt { // // overflow the read // q.nextReadAt = willWriteTo // q.header.setNextReadAt(q.nextReadAt) //} if willWriteTo > viewSize { return errors.New("file is full") } binary.BigEndian.PutUint16(q.view[pos:pos+2], packetSize) pos += 2 nextPos := pos + uint32(packetSize) copy(q.view[pos:nextPos], packet) pos = nextPos q.lastWriteAt = q.header.getNextWriteAt() q.header.setNextWriteAt(pos) q.view=q.body[:pos] //if LogStatu{ // //nextWriteAt := q.header.getNextWriteAt() // //nextReadAt := q.header.getNextReadAt() // fmt.Println("nextWriteAt",q.lastWriteAt,string(packet)) //} //nextWriteAt := q.header.getNextWriteAt() // nextReadAt := q.header.getNextReadAt() //fmt.Println("nextWriteAt",nextWriteAt,nextReadAt) return nil } func OpenIndexFile(filepath string,size int,indexSize int,log bool) (*DQueue,error){ LogStatu = log fmt.Println("=====队列路径===",filepath) dqueue1Index1,err := Open(filepath+"/index",indexSize) dqueue1Index1.srcpath = filepath dqueue1Index1.size = size dqueue1Index1.dqueue1Index = dqueue1Index1 popOne ,err :=dqueue1Index1.dqueue1Index.Peek() if LogStatu { fmt.Println("=====peek",string(popOne)) } if popOne==nil || err != nil{ // dqueue1Index1.nowPath =strconv.Itoa(int(time.Now().UnixNano())) dqueue1Index1.nowPath = fmt.Sprintf("%s", uuid.NewV1()) dqueue1Index1.dqueuePop,err = Open(dqueue1Index1.srcpath+"/"+dqueue1Index1.nowPath,size) dqueue1Index1.dqueuePop.nextReadAt = dqueue1Index1.dqueuePop.header.getNextReadAt() if err !=nil { return nil,err } err =dqueue1Index1.dqueue1Index.PushOne( []byte(dqueue1Index1.nowPath), ) }else{ dqueue1Index1.nowPath = string(popOne) dqueue1Index1.dqueuePop,err = Open(dqueue1Index1.srcpath+"/"+string(popOne),size) dqueue1Index1.dqueuePop.nextReadAt = dqueue1Index1.dqueuePop.header.getNextReadAt() if err !=nil { return nil,err } } peekHead ,err :=dqueue1Index1.dqueue1Index.PeekHead() if LogStatu { fmt.Println("=====peekHead",string(peekHead)) } //dqueue1Index.PopOne() if peekHead==nil || err != nil{ dqueue1Index1.dqueuPush =dqueue1Index1.dqueuPush }else{ //dqueue1Index1.nowPath = string(popOne) dqueue1Index1.dqueuPush,err = Open(dqueue1Index1.srcpath+"/"+string(peekHead),size) if err !=nil { return nil,err } } return dqueue1Index1,err } func (q *DQueue) PushOneIndex(packet []byte) error { err := q.dqueuPush.PushOne( packet, ) if err !=nil { if err.Error()=="file is full"{ //if q.dqueuPush.header.getNextReadAt()==0{ err :=q.dqueuPush.Close() if err != nil{ if LogStatu {fmt.Println("err :=q.dqueuPush.Close() err",err)} } //} //datestr :=strconv.Itoa(int(time.Now().UnixNano())) datestr :=fmt.Sprintf("%s", uuid.NewV1()) q.dqueuPush,err = Open(q.srcpath+"/"+datestr,q.size) if err !=nil { if LogStatu {fmt.Println("kpt.Open dqueue2 err",err)} return err } err = q.dqueue1Index.PushOne( []byte(datestr), ) if err != nil { if err.Error()=="file is full"{ //q.dqueue1Index.header.setNextWriteAt(0) //err =q.dqueue1Index.PushOne( // []byte(datestr), //) //fmt.Println("index file is full",err,[]byte(datestr)) } } err =q.dqueuPush.PushOne( packet, ) //dqueueChan <- dqueue2 }else{ return err } } if showlog >0 { fmt.Println(string(packet)) } return nil } func (q *DQueue) PopOneIndex() ([]byte, error) { pop ,err := q.dqueuePop.PopOne() //fmt.Println("q.dqueuePop.header.getNextReadAt() != q.dqueuPush.header.getNextWriteAt()",q.dqueuePop.header.getNextReadAt(),q.dqueuPush.header.getNextWriteAt()) if err != nil{ if err.Error()=="queue is null" { if q.dqueuePop.fileObj.Name() != q.dqueuPush.fileObj.Name(){ popIndex ,err1 :=q.dqueue1Index.PopOne() //fmt.Println("pop ,err =q.dqueuePop.PopOne()====------------",pop,err,string(popIndex), q.dqueuePop.fileObj.Name()) if err1 !=nil{ if LogStatu {fmt.Println("pop ,err =q.dqueuePop.PopOne()====------------",pop,err,string(popIndex))} } popOne ,err1 :=q.dqueue1Index.Peek() if popOne == nil && err1 != nil{ //索引为空 if LogStatu {fmt.Println("索引为空==============")} return nil,err1 } //fmt.Println("开始关闭==============") errC := q.dqueuePop.Close() if errC != nil { if LogStatu {fmt.Println("q.dqueuePop.Close() err",errC)} return nil,errC } //fmt.Println("开始删除==============") errR := os.Remove(q.srcpath+"/"+q.nowPath) if errR != nil { fmt.Println("os.Remove() err",errR) time.Sleep(100*time.Millisecond) errR := os.Rename(q.srcpath+"/"+q.nowPath,q.srcpath+"/"+"del"+q.nowPath) if errR != nil { if LogStatu {fmt.Println("os.reRename() err",errR)}} return nil,errR }else{ if LogStatu {fmt.Println("file delete ---------",q.nowPath)} } fmt.Println("file delete ---------",q.nowPath) q.dqueuePop,err = Open(q.srcpath+"/"+string(popOne),q.size) if err !=nil { if LogStatu {fmt.Println("kpt.Open dqueue1 5 err",err)} return nil,err }else { q.nowPath = string(popOne) //pop ,err = q.dqueuePop.PopOne() pop ,err = q.dqueuePop.Peek() if LogStatu { fmt.Println("q.dqueuePop.fileObj.Name()",q.dqueuePop.fileObj.Name(),q.dqueuPush.fileObj.Name(),q.dqueuePop.header.getNextReadAt()) fmt.Println("pop ,err =q.dqueuePop.PopOne()====------------",pop,err,string(popOne)) } } }else { return pop,err } }else { return pop,err } }else { return pop,err } return pop,nil } func (q *DQueue) CloseIndex() error { err := q.mappedFile.Unmap() if err != nil { return err } err = q.dqueue1Index.mappedFile.Unmap() if err != nil { //return err } err = q.dqueuePop.mappedFile.Unmap() if err != nil { return err } err = q.dqueuPush.mappedFile.Unmap() if err != nil { return err } err = q.fileObj.Close() if err != nil { return err } err = q.dqueue1Index.Close() if err != nil { //return err } err = q.dqueuPush.fileObj.Close() if err != nil { return err } err = q.dqueuePop.fileObj.Close() if err != nil { return err } return nil } func (q *DQueue)DeleteIndexFile( dqueuePop *DQueue){ //if err1 !=nil { // //if err1.Error()=="queue is null" && (q.dqueuePop.header.getNextReadAt() != q.dqueuPush.header.getNextWriteAt() ){ // // q.dqueue1Index.header.setNextReadAt(0) // // popOne ,err1 = q.dqueue1Index.Peek() // //} //} //errC := dqueuePop.Close() //if errC != nil { // fmt.Println("q.dqueuePop.Close() err",errC) // return nil,errC //} //errR := os.Remove(q.srcpath+"/"+q.nowPath) //if errR != nil { // fmt.Println("os.Remove() err",errR) // return nil,errR //}else{ // fmt.Println("file delete ---------",q.nowPath) //} }