row_event.go 39 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597
  1. package replication
  2. import (
  3. "encoding/binary"
  4. "encoding/hex"
  5. "fmt"
  6. "io"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "github.com/pingcap/errors"
  11. "github.com/shopspring/decimal"
  12. "github.com/siddontang/go-log/log"
  13. "github.com/siddontang/go/hack"
  14. . "github.com/go-mysql-org/go-mysql/mysql"
  15. )
  16. var errMissingTableMapEvent = errors.New("invalid table id, no corresponding table map event")
  17. type TableMapEvent struct {
  18. flavor string
  19. tableIDSize int
  20. TableID uint64
  21. Flags uint16
  22. Schema []byte
  23. Table []byte
  24. ColumnCount uint64
  25. ColumnType []byte
  26. ColumnMeta []uint16
  27. //len = (ColumnCount + 7) / 8
  28. NullBitmap []byte
  29. /*
  30. The following are available only after MySQL-8.0.1 or MariaDB-10.5.0
  31. By default MySQL and MariaDB do not log the full row metadata.
  32. see:
  33. - https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_binlog_row_metadata
  34. - https://mariadb.com/kb/en/replication-and-binary-log-system-variables/#binlog_row_metadata
  35. */
  36. // SignednessBitmap stores signedness info for numeric columns.
  37. SignednessBitmap []byte
  38. // DefaultCharset/ColumnCharset stores collation info for character columns.
  39. // DefaultCharset[0] is the default collation of character columns.
  40. // For character columns that have different charset,
  41. // (character column index, column collation) pairs follows
  42. DefaultCharset []uint64
  43. // ColumnCharset contains collation sequence for all character columns
  44. ColumnCharset []uint64
  45. // SetStrValue stores values for set columns.
  46. SetStrValue [][][]byte
  47. setStrValueString [][]string
  48. // EnumStrValue stores values for enum columns.
  49. EnumStrValue [][][]byte
  50. enumStrValueString [][]string
  51. // ColumnName list all column names.
  52. ColumnName [][]byte
  53. columnNameString []string // the same as ColumnName in string type, just for reuse
  54. // GeometryType stores real type for geometry columns.
  55. GeometryType []uint64
  56. // PrimaryKey is a sequence of column indexes of primary key.
  57. PrimaryKey []uint64
  58. // PrimaryKeyPrefix is the prefix length used for each column of primary key.
  59. // 0 means that the whole column length is used.
  60. PrimaryKeyPrefix []uint64
  61. // EnumSetDefaultCharset/EnumSetColumnCharset is similar to DefaultCharset/ColumnCharset but for enum/set columns.
  62. EnumSetDefaultCharset []uint64
  63. EnumSetColumnCharset []uint64
  64. }
  65. func (e *TableMapEvent) Decode(data []byte) error {
  66. pos := 0
  67. e.TableID = FixedLengthInt(data[0:e.tableIDSize])
  68. pos += e.tableIDSize
  69. e.Flags = binary.LittleEndian.Uint16(data[pos:])
  70. pos += 2
  71. schemaLength := data[pos]
  72. pos++
  73. e.Schema = data[pos : pos+int(schemaLength)]
  74. pos += int(schemaLength)
  75. //skip 0x00
  76. pos++
  77. tableLength := data[pos]
  78. pos++
  79. e.Table = data[pos : pos+int(tableLength)]
  80. pos += int(tableLength)
  81. //skip 0x00
  82. pos++
  83. var n int
  84. e.ColumnCount, _, n = LengthEncodedInt(data[pos:])
  85. pos += n
  86. e.ColumnType = data[pos : pos+int(e.ColumnCount)]
  87. pos += int(e.ColumnCount)
  88. var err error
  89. var metaData []byte
  90. if metaData, _, n, err = LengthEncodedString(data[pos:]); err != nil {
  91. return errors.Trace(err)
  92. }
  93. if err = e.decodeMeta(metaData); err != nil {
  94. return errors.Trace(err)
  95. }
  96. pos += n
  97. nullBitmapSize := bitmapByteSize(int(e.ColumnCount))
  98. if len(data[pos:]) < nullBitmapSize {
  99. return io.EOF
  100. }
  101. e.NullBitmap = data[pos : pos+nullBitmapSize]
  102. pos += nullBitmapSize
  103. if err = e.decodeOptionalMeta(data[pos:]); err != nil {
  104. return err
  105. }
  106. return nil
  107. }
  108. func bitmapByteSize(columnCount int) int {
  109. return (columnCount + 7) / 8
  110. }
  111. // see mysql sql/log_event.h
  112. /*
  113. 0 byte
  114. MYSQL_TYPE_DECIMAL
  115. MYSQL_TYPE_TINY
  116. MYSQL_TYPE_SHORT
  117. MYSQL_TYPE_LONG
  118. MYSQL_TYPE_NULL
  119. MYSQL_TYPE_TIMESTAMP
  120. MYSQL_TYPE_LONGLONG
  121. MYSQL_TYPE_INT24
  122. MYSQL_TYPE_DATE
  123. MYSQL_TYPE_TIME
  124. MYSQL_TYPE_DATETIME
  125. MYSQL_TYPE_YEAR
  126. 1 byte
  127. MYSQL_TYPE_FLOAT
  128. MYSQL_TYPE_DOUBLE
  129. MYSQL_TYPE_BLOB
  130. MYSQL_TYPE_GEOMETRY
  131. //maybe
  132. MYSQL_TYPE_TIME2
  133. MYSQL_TYPE_DATETIME2
  134. MYSQL_TYPE_TIMESTAMP2
  135. 2 byte
  136. MYSQL_TYPE_VARCHAR
  137. MYSQL_TYPE_BIT
  138. MYSQL_TYPE_NEWDECIMAL
  139. MYSQL_TYPE_VAR_STRING
  140. MYSQL_TYPE_STRING
  141. This enumeration value is only used internally and cannot exist in a binlog.
  142. MYSQL_TYPE_NEWDATE
  143. MYSQL_TYPE_ENUM
  144. MYSQL_TYPE_SET
  145. MYSQL_TYPE_TINY_BLOB
  146. MYSQL_TYPE_MEDIUM_BLOB
  147. MYSQL_TYPE_LONG_BLOB
  148. */
  149. func (e *TableMapEvent) decodeMeta(data []byte) error {
  150. pos := 0
  151. e.ColumnMeta = make([]uint16, e.ColumnCount)
  152. for i, t := range e.ColumnType {
  153. switch t {
  154. case MYSQL_TYPE_STRING:
  155. var x uint16 = uint16(data[pos]) << 8 //real type
  156. x += uint16(data[pos+1]) //pack or field length
  157. e.ColumnMeta[i] = x
  158. pos += 2
  159. case MYSQL_TYPE_NEWDECIMAL:
  160. var x uint16 = uint16(data[pos]) << 8 //precision
  161. x += uint16(data[pos+1]) //decimals
  162. e.ColumnMeta[i] = x
  163. pos += 2
  164. case MYSQL_TYPE_VAR_STRING,
  165. MYSQL_TYPE_VARCHAR,
  166. MYSQL_TYPE_BIT:
  167. e.ColumnMeta[i] = binary.LittleEndian.Uint16(data[pos:])
  168. pos += 2
  169. case MYSQL_TYPE_BLOB,
  170. MYSQL_TYPE_DOUBLE,
  171. MYSQL_TYPE_FLOAT,
  172. MYSQL_TYPE_GEOMETRY,
  173. MYSQL_TYPE_JSON:
  174. e.ColumnMeta[i] = uint16(data[pos])
  175. pos++
  176. case MYSQL_TYPE_TIME2,
  177. MYSQL_TYPE_DATETIME2,
  178. MYSQL_TYPE_TIMESTAMP2:
  179. e.ColumnMeta[i] = uint16(data[pos])
  180. pos++
  181. case MYSQL_TYPE_NEWDATE,
  182. MYSQL_TYPE_ENUM,
  183. MYSQL_TYPE_SET,
  184. MYSQL_TYPE_TINY_BLOB,
  185. MYSQL_TYPE_MEDIUM_BLOB,
  186. MYSQL_TYPE_LONG_BLOB:
  187. return errors.Errorf("unsupport type in binlog %d", t)
  188. default:
  189. e.ColumnMeta[i] = 0
  190. }
  191. }
  192. return nil
  193. }
  194. func (e *TableMapEvent) decodeOptionalMeta(data []byte) (err error) {
  195. pos := 0
  196. for pos < len(data) {
  197. // optional metadata fields are stored in Type, Length, Value(TLV) format
  198. // Type takes 1 byte. Length is a packed integer value. Values takes Length bytes
  199. t := data[pos]
  200. pos++
  201. l, _, n := LengthEncodedInt(data[pos:])
  202. pos += n
  203. v := data[pos : pos+int(l)]
  204. pos += int(l)
  205. switch t {
  206. case TABLE_MAP_OPT_META_SIGNEDNESS:
  207. e.SignednessBitmap = v
  208. case TABLE_MAP_OPT_META_DEFAULT_CHARSET:
  209. e.DefaultCharset, err = e.decodeDefaultCharset(v)
  210. if err != nil {
  211. return err
  212. }
  213. case TABLE_MAP_OPT_META_COLUMN_CHARSET:
  214. e.ColumnCharset, err = e.decodeIntSeq(v)
  215. if err != nil {
  216. return err
  217. }
  218. case TABLE_MAP_OPT_META_COLUMN_NAME:
  219. if err = e.decodeColumnNames(v); err != nil {
  220. return err
  221. }
  222. case TABLE_MAP_OPT_META_SET_STR_VALUE:
  223. e.SetStrValue, err = e.decodeStrValue(v)
  224. if err != nil {
  225. return err
  226. }
  227. case TABLE_MAP_OPT_META_ENUM_STR_VALUE:
  228. e.EnumStrValue, err = e.decodeStrValue(v)
  229. if err != nil {
  230. return err
  231. }
  232. case TABLE_MAP_OPT_META_GEOMETRY_TYPE:
  233. e.GeometryType, err = e.decodeIntSeq(v)
  234. if err != nil {
  235. return err
  236. }
  237. case TABLE_MAP_OPT_META_SIMPLE_PRIMARY_KEY:
  238. if err = e.decodeSimplePrimaryKey(v); err != nil {
  239. return err
  240. }
  241. case TABLE_MAP_OPT_META_PRIMARY_KEY_WITH_PREFIX:
  242. if err = e.decodePrimaryKeyWithPrefix(v); err != nil {
  243. return err
  244. }
  245. case TABLE_MAP_OPT_META_ENUM_AND_SET_DEFAULT_CHARSET:
  246. e.EnumSetDefaultCharset, err = e.decodeDefaultCharset(v)
  247. if err != nil {
  248. return err
  249. }
  250. case TABLE_MAP_OPT_META_ENUM_AND_SET_COLUMN_CHARSET:
  251. e.EnumSetColumnCharset, err = e.decodeIntSeq(v)
  252. if err != nil {
  253. return err
  254. }
  255. default:
  256. // Ignore for future extension
  257. }
  258. }
  259. return nil
  260. }
  261. func (e *TableMapEvent) decodeIntSeq(v []byte) (ret []uint64, err error) {
  262. p := 0
  263. for p < len(v) {
  264. i, _, n := LengthEncodedInt(v[p:])
  265. p += n
  266. ret = append(ret, i)
  267. }
  268. return
  269. }
  270. func (e *TableMapEvent) decodeDefaultCharset(v []byte) (ret []uint64, err error) {
  271. ret, err = e.decodeIntSeq(v)
  272. if err != nil {
  273. return
  274. }
  275. if len(ret)%2 != 1 {
  276. return nil, errors.Errorf("Expect odd item in DefaultCharset but got %d", len(ret))
  277. }
  278. return
  279. }
  280. func (e *TableMapEvent) decodeColumnNames(v []byte) error {
  281. p := 0
  282. e.ColumnName = make([][]byte, 0, e.ColumnCount)
  283. for p < len(v) {
  284. n := int(v[p])
  285. p++
  286. e.ColumnName = append(e.ColumnName, v[p:p+n])
  287. p += n
  288. }
  289. if len(e.ColumnName) != int(e.ColumnCount) {
  290. return errors.Errorf("Expect %d column names but got %d", e.ColumnCount, len(e.ColumnName))
  291. }
  292. return nil
  293. }
  294. func (e *TableMapEvent) decodeStrValue(v []byte) (ret [][][]byte, err error) {
  295. p := 0
  296. for p < len(v) {
  297. nVal, _, n := LengthEncodedInt(v[p:])
  298. p += n
  299. vals := make([][]byte, 0, int(nVal))
  300. for i := 0; i < int(nVal); i++ {
  301. val, _, n, err := LengthEncodedString(v[p:])
  302. if err != nil {
  303. return nil, err
  304. }
  305. p += n
  306. vals = append(vals, val)
  307. }
  308. ret = append(ret, vals)
  309. }
  310. return
  311. }
  312. func (e *TableMapEvent) decodeSimplePrimaryKey(v []byte) error {
  313. p := 0
  314. for p < len(v) {
  315. i, _, n := LengthEncodedInt(v[p:])
  316. e.PrimaryKey = append(e.PrimaryKey, i)
  317. e.PrimaryKeyPrefix = append(e.PrimaryKeyPrefix, 0)
  318. p += n
  319. }
  320. return nil
  321. }
  322. func (e *TableMapEvent) decodePrimaryKeyWithPrefix(v []byte) error {
  323. p := 0
  324. for p < len(v) {
  325. i, _, n := LengthEncodedInt(v[p:])
  326. e.PrimaryKey = append(e.PrimaryKey, i)
  327. p += n
  328. i, _, n = LengthEncodedInt(v[p:])
  329. e.PrimaryKeyPrefix = append(e.PrimaryKeyPrefix, i)
  330. p += n
  331. }
  332. return nil
  333. }
  334. func (e *TableMapEvent) Dump(w io.Writer) {
  335. fmt.Fprintf(w, "TableID: %d\n", e.TableID)
  336. fmt.Fprintf(w, "TableID size: %d\n", e.tableIDSize)
  337. fmt.Fprintf(w, "Flags: %d\n", e.Flags)
  338. fmt.Fprintf(w, "Schema: %s\n", e.Schema)
  339. fmt.Fprintf(w, "Table: %s\n", e.Table)
  340. fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount)
  341. fmt.Fprintf(w, "Column type: \n%s", hex.Dump(e.ColumnType))
  342. fmt.Fprintf(w, "NULL bitmap: \n%s", hex.Dump(e.NullBitmap))
  343. fmt.Fprintf(w, "Signedness bitmap: \n%s", hex.Dump(e.SignednessBitmap))
  344. fmt.Fprintf(w, "Default charset: %v\n", e.DefaultCharset)
  345. fmt.Fprintf(w, "Column charset: %v\n", e.ColumnCharset)
  346. fmt.Fprintf(w, "Set str value: %v\n", e.SetStrValueString())
  347. fmt.Fprintf(w, "Enum str value: %v\n", e.EnumStrValueString())
  348. fmt.Fprintf(w, "Column name: %v\n", e.ColumnNameString())
  349. fmt.Fprintf(w, "Geometry type: %v\n", e.GeometryType)
  350. fmt.Fprintf(w, "Primary key: %v\n", e.PrimaryKey)
  351. fmt.Fprintf(w, "Primary key prefix: %v\n", e.PrimaryKeyPrefix)
  352. fmt.Fprintf(w, "Enum/set default charset: %v\n", e.EnumSetDefaultCharset)
  353. fmt.Fprintf(w, "Enum/set column charset: %v\n", e.EnumSetColumnCharset)
  354. unsignedMap := e.UnsignedMap()
  355. fmt.Fprintf(w, "UnsignedMap: %#v\n", unsignedMap)
  356. collationMap := e.CollationMap()
  357. fmt.Fprintf(w, "CollationMap: %#v\n", collationMap)
  358. enumSetCollationMap := e.EnumSetCollationMap()
  359. fmt.Fprintf(w, "EnumSetCollationMap: %#v\n", enumSetCollationMap)
  360. enumStrValueMap := e.EnumStrValueMap()
  361. fmt.Fprintf(w, "EnumStrValueMap: %#v\n", enumStrValueMap)
  362. setStrValueMap := e.SetStrValueMap()
  363. fmt.Fprintf(w, "SetStrValueMap: %#v\n", setStrValueMap)
  364. geometryTypeMap := e.GeometryTypeMap()
  365. fmt.Fprintf(w, "GeometryTypeMap: %#v\n", geometryTypeMap)
  366. nameMaxLen := 0
  367. for _, name := range e.ColumnName {
  368. if len(name) > nameMaxLen {
  369. nameMaxLen = len(name)
  370. }
  371. }
  372. nameFmt := " %s"
  373. if nameMaxLen > 0 {
  374. nameFmt = fmt.Sprintf(" %%-%ds", nameMaxLen)
  375. }
  376. primaryKey := map[int]struct{}{}
  377. for _, pk := range e.PrimaryKey {
  378. primaryKey[int(pk)] = struct{}{}
  379. }
  380. fmt.Fprintf(w, "Columns: \n")
  381. for i := 0; i < int(e.ColumnCount); i++ {
  382. if len(e.ColumnName) == 0 {
  383. fmt.Fprintf(w, nameFmt, "<n/a>")
  384. } else {
  385. fmt.Fprintf(w, nameFmt, e.ColumnName[i])
  386. }
  387. fmt.Fprintf(w, " type=%-3d", e.realType(i))
  388. if e.IsNumericColumn(i) {
  389. if len(unsignedMap) == 0 {
  390. fmt.Fprintf(w, " unsigned=<n/a>")
  391. } else if unsignedMap[i] {
  392. fmt.Fprintf(w, " unsigned=yes")
  393. } else {
  394. fmt.Fprintf(w, " unsigned=no ")
  395. }
  396. }
  397. if e.IsCharacterColumn(i) {
  398. if len(collationMap) == 0 {
  399. fmt.Fprintf(w, " collation=<n/a>")
  400. } else {
  401. fmt.Fprintf(w, " collation=%d ", collationMap[i])
  402. }
  403. }
  404. if e.IsEnumColumn(i) {
  405. if len(enumSetCollationMap) == 0 {
  406. fmt.Fprintf(w, " enum_collation=<n/a>")
  407. } else {
  408. fmt.Fprintf(w, " enum_collation=%d", enumSetCollationMap[i])
  409. }
  410. if len(enumStrValueMap) == 0 {
  411. fmt.Fprintf(w, " enum=<n/a>")
  412. } else {
  413. fmt.Fprintf(w, " enum=%v", enumStrValueMap[i])
  414. }
  415. }
  416. if e.IsSetColumn(i) {
  417. if len(enumSetCollationMap) == 0 {
  418. fmt.Fprintf(w, " set_collation=<n/a>")
  419. } else {
  420. fmt.Fprintf(w, " set_collation=%d", enumSetCollationMap[i])
  421. }
  422. if len(setStrValueMap) == 0 {
  423. fmt.Fprintf(w, " set=<n/a>")
  424. } else {
  425. fmt.Fprintf(w, " set=%v", setStrValueMap[i])
  426. }
  427. }
  428. if e.IsGeometryColumn(i) {
  429. if len(geometryTypeMap) == 0 {
  430. fmt.Fprintf(w, " geometry_type=<n/a>")
  431. } else {
  432. fmt.Fprintf(w, " geometry_type=%v", geometryTypeMap[i])
  433. }
  434. }
  435. available, nullable := e.Nullable(i)
  436. if !available {
  437. fmt.Fprintf(w, " null=<n/a>")
  438. } else if nullable {
  439. fmt.Fprintf(w, " null=yes")
  440. } else {
  441. fmt.Fprintf(w, " null=no ")
  442. }
  443. if _, ok := primaryKey[i]; ok {
  444. fmt.Fprintf(w, " pri")
  445. }
  446. fmt.Fprintf(w, "\n")
  447. }
  448. fmt.Fprintln(w)
  449. }
  450. // Nullable returns the nullablity of the i-th column.
  451. // If null bits are not available, available is false.
  452. // i must be in range [0, ColumnCount).
  453. func (e *TableMapEvent) Nullable(i int) (available, nullable bool) {
  454. if len(e.NullBitmap) == 0 {
  455. return
  456. }
  457. return true, e.NullBitmap[i/8]&(1<<uint(i%8)) != 0
  458. }
  459. // SetStrValueString returns values for set columns as string slices.
  460. // nil is returned if not available or no set columns at all.
  461. func (e *TableMapEvent) SetStrValueString() [][]string {
  462. if e.setStrValueString == nil {
  463. if len(e.SetStrValue) == 0 {
  464. return nil
  465. }
  466. e.setStrValueString = make([][]string, 0, len(e.SetStrValue))
  467. for _, vals := range e.SetStrValue {
  468. e.setStrValueString = append(
  469. e.setStrValueString,
  470. e.bytesSlice2StrSlice(vals),
  471. )
  472. }
  473. }
  474. return e.setStrValueString
  475. }
  476. // EnumStrValueString returns values for enum columns as string slices.
  477. // nil is returned if not available or no enum columns at all.
  478. func (e *TableMapEvent) EnumStrValueString() [][]string {
  479. if e.enumStrValueString == nil {
  480. if len(e.EnumStrValue) == 0 {
  481. return nil
  482. }
  483. e.enumStrValueString = make([][]string, 0, len(e.EnumStrValue))
  484. for _, vals := range e.EnumStrValue {
  485. e.enumStrValueString = append(
  486. e.enumStrValueString,
  487. e.bytesSlice2StrSlice(vals),
  488. )
  489. }
  490. }
  491. return e.enumStrValueString
  492. }
  493. // ColumnNameString returns column names as string slice.
  494. // nil is returned if not available.
  495. func (e *TableMapEvent) ColumnNameString() []string {
  496. if e.columnNameString == nil {
  497. e.columnNameString = e.bytesSlice2StrSlice(e.ColumnName)
  498. }
  499. return e.columnNameString
  500. }
  501. func (e *TableMapEvent) bytesSlice2StrSlice(src [][]byte) []string {
  502. if src == nil {
  503. return nil
  504. }
  505. ret := make([]string, 0, len(src))
  506. for _, item := range src {
  507. ret = append(ret, string(item))
  508. }
  509. return ret
  510. }
  511. // UnsignedMap returns a map: column index -> unsigned.
  512. // Note that only numeric columns will be returned.
  513. // nil is returned if not available or no numeric columns at all.
  514. func (e *TableMapEvent) UnsignedMap() map[int]bool {
  515. if len(e.SignednessBitmap) == 0 {
  516. return nil
  517. }
  518. p := 0
  519. ret := make(map[int]bool)
  520. for i := 0; i < int(e.ColumnCount); i++ {
  521. if !e.IsNumericColumn(i) {
  522. continue
  523. }
  524. ret[i] = e.SignednessBitmap[p/8]&(1<<uint(7-p%8)) != 0
  525. p++
  526. }
  527. return ret
  528. }
  529. // CollationMap returns a map: column index -> collation id.
  530. // Note that only character columns will be returned.
  531. // nil is returned if not available or no character columns at all.
  532. func (e *TableMapEvent) CollationMap() map[int]uint64 {
  533. return e.collationMap(e.IsCharacterColumn, e.DefaultCharset, e.ColumnCharset)
  534. }
  535. // EnumSetCollationMap returns a map: column index -> collation id.
  536. // Note that only enum or set columns will be returned.
  537. // nil is returned if not available or no enum/set columns at all.
  538. func (e *TableMapEvent) EnumSetCollationMap() map[int]uint64 {
  539. return e.collationMap(e.IsEnumOrSetColumn, e.EnumSetDefaultCharset, e.EnumSetColumnCharset)
  540. }
  541. func (e *TableMapEvent) collationMap(includeType func(int) bool, defaultCharset, columnCharset []uint64) map[int]uint64 {
  542. if len(defaultCharset) != 0 {
  543. defaultCollation := defaultCharset[0]
  544. // character column index -> collation
  545. collations := make(map[int]uint64)
  546. for i := 1; i < len(defaultCharset); i += 2 {
  547. collations[int(defaultCharset[i])] = defaultCharset[i+1]
  548. }
  549. p := 0
  550. ret := make(map[int]uint64)
  551. for i := 0; i < int(e.ColumnCount); i++ {
  552. if !includeType(i) {
  553. continue
  554. }
  555. if collation, ok := collations[p]; ok {
  556. ret[i] = collation
  557. } else {
  558. ret[i] = defaultCollation
  559. }
  560. p++
  561. }
  562. return ret
  563. }
  564. if len(columnCharset) != 0 {
  565. p := 0
  566. ret := make(map[int]uint64)
  567. for i := 0; i < int(e.ColumnCount); i++ {
  568. if !includeType(i) {
  569. continue
  570. }
  571. ret[i] = columnCharset[p]
  572. p++
  573. }
  574. return ret
  575. }
  576. return nil
  577. }
  578. // EnumStrValueMap returns a map: column index -> enum string value.
  579. // Note that only enum columns will be returned.
  580. // nil is returned if not available or no enum columns at all.
  581. func (e *TableMapEvent) EnumStrValueMap() map[int][]string {
  582. return e.strValueMap(e.IsEnumColumn, e.EnumStrValueString())
  583. }
  584. // SetStrValueMap returns a map: column index -> set string value.
  585. // Note that only set columns will be returned.
  586. // nil is returned if not available or no set columns at all.
  587. func (e *TableMapEvent) SetStrValueMap() map[int][]string {
  588. return e.strValueMap(e.IsSetColumn, e.SetStrValueString())
  589. }
  590. func (e *TableMapEvent) strValueMap(includeType func(int) bool, strValue [][]string) map[int][]string {
  591. if len(strValue) == 0 {
  592. return nil
  593. }
  594. p := 0
  595. ret := make(map[int][]string)
  596. for i := 0; i < int(e.ColumnCount); i++ {
  597. if !includeType(i) {
  598. continue
  599. }
  600. ret[i] = strValue[p]
  601. p++
  602. }
  603. return ret
  604. }
  605. // GeometryTypeMap returns a map: column index -> geometry type.
  606. // Note that only geometry columns will be returned.
  607. // nil is returned if not available or no geometry columns at all.
  608. func (e *TableMapEvent) GeometryTypeMap() map[int]uint64 {
  609. if len(e.GeometryType) == 0 {
  610. return nil
  611. }
  612. p := 0
  613. ret := make(map[int]uint64)
  614. for i := 0; i < int(e.ColumnCount); i++ {
  615. if !e.IsGeometryColumn(i) {
  616. continue
  617. }
  618. ret[i] = e.GeometryType[p]
  619. p++
  620. }
  621. return ret
  622. }
  623. // Below realType and IsXXXColumn are base from:
  624. // table_def::type in sql/rpl_utility.h
  625. // Table_map_log_event::print_columns in mysql-8.0/sql/log_event.cc and mariadb-10.5/sql/log_event_client.cc
  626. func (e *TableMapEvent) realType(i int) byte {
  627. typ := e.ColumnType[i]
  628. switch typ {
  629. case MYSQL_TYPE_STRING:
  630. rtyp := byte(e.ColumnMeta[i] >> 8)
  631. if rtyp == MYSQL_TYPE_ENUM || rtyp == MYSQL_TYPE_SET {
  632. return rtyp
  633. }
  634. case MYSQL_TYPE_DATE:
  635. return MYSQL_TYPE_NEWDATE
  636. }
  637. return typ
  638. }
  639. func (e *TableMapEvent) IsNumericColumn(i int) bool {
  640. switch e.realType(i) {
  641. case MYSQL_TYPE_TINY,
  642. MYSQL_TYPE_SHORT,
  643. MYSQL_TYPE_INT24,
  644. MYSQL_TYPE_LONG,
  645. MYSQL_TYPE_LONGLONG,
  646. MYSQL_TYPE_NEWDECIMAL,
  647. MYSQL_TYPE_FLOAT,
  648. MYSQL_TYPE_DOUBLE:
  649. return true
  650. default:
  651. return false
  652. }
  653. }
  654. // IsCharacterColumn returns true if the column type is considered as character type.
  655. // Note that JSON/GEOMETRY types are treated as character type in mariadb.
  656. // (JSON is an alias for LONGTEXT in mariadb: https://mariadb.com/kb/en/json-data-type/)
  657. func (e *TableMapEvent) IsCharacterColumn(i int) bool {
  658. switch e.realType(i) {
  659. case MYSQL_TYPE_STRING,
  660. MYSQL_TYPE_VAR_STRING,
  661. MYSQL_TYPE_VARCHAR,
  662. MYSQL_TYPE_BLOB:
  663. return true
  664. case MYSQL_TYPE_GEOMETRY:
  665. if e.flavor == "mariadb" {
  666. return true
  667. }
  668. return false
  669. default:
  670. return false
  671. }
  672. }
  673. func (e *TableMapEvent) IsEnumColumn(i int) bool {
  674. return e.realType(i) == MYSQL_TYPE_ENUM
  675. }
  676. func (e *TableMapEvent) IsSetColumn(i int) bool {
  677. return e.realType(i) == MYSQL_TYPE_SET
  678. }
  679. func (e *TableMapEvent) IsGeometryColumn(i int) bool {
  680. return e.realType(i) == MYSQL_TYPE_GEOMETRY
  681. }
  682. func (e *TableMapEvent) IsEnumOrSetColumn(i int) bool {
  683. rtyp := e.realType(i)
  684. return rtyp == MYSQL_TYPE_ENUM || rtyp == MYSQL_TYPE_SET
  685. }
  686. // RowsEventStmtEndFlag is set in the end of the statement.
  687. const RowsEventStmtEndFlag = 0x01
  688. type RowsEvent struct {
  689. //0, 1, 2
  690. Version int
  691. tableIDSize int
  692. tables map[uint64]*TableMapEvent
  693. needBitmap2 bool
  694. Table *TableMapEvent
  695. TableID uint64
  696. Flags uint16
  697. //if version == 2
  698. ExtraData []byte
  699. //lenenc_int
  700. ColumnCount uint64
  701. /*
  702. By default MySQL and MariaDB log the full row image.
  703. see
  704. - https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_binlog_row_image
  705. - https://mariadb.com/kb/en/replication-and-binary-log-system-variables/#binlog_row_image
  706. ColumnBitmap1, ColumnBitmap2 and SkippedColumns are not set on the full row image.
  707. */
  708. //len = (ColumnCount + 7) / 8
  709. ColumnBitmap1 []byte
  710. //if UPDATE_ROWS_EVENTv1 or v2
  711. //len = (ColumnCount + 7) / 8
  712. ColumnBitmap2 []byte
  713. //rows: invalid: int64, float64, bool, []byte, string
  714. Rows [][]interface{}
  715. SkippedColumns [][]int
  716. parseTime bool
  717. timestampStringLocation *time.Location
  718. useDecimal bool
  719. ignoreJSONDecodeErr bool
  720. }
  721. func (e *RowsEvent) Decode(data []byte) (err2 error) {
  722. pos := 0
  723. e.TableID = FixedLengthInt(data[0:e.tableIDSize])
  724. pos += e.tableIDSize
  725. e.Flags = binary.LittleEndian.Uint16(data[pos:])
  726. pos += 2
  727. if e.Version == 2 {
  728. dataLen := binary.LittleEndian.Uint16(data[pos:])
  729. pos += 2
  730. e.ExtraData = data[pos : pos+int(dataLen-2)]
  731. pos += int(dataLen - 2)
  732. }
  733. var n int
  734. e.ColumnCount, _, n = LengthEncodedInt(data[pos:])
  735. pos += n
  736. bitCount := bitmapByteSize(int(e.ColumnCount))
  737. e.ColumnBitmap1 = data[pos : pos+bitCount]
  738. pos += bitCount
  739. if e.needBitmap2 {
  740. e.ColumnBitmap2 = data[pos : pos+bitCount]
  741. pos += bitCount
  742. }
  743. var ok bool
  744. e.Table, ok = e.tables[e.TableID]
  745. if !ok {
  746. if len(e.tables) > 0 {
  747. return errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID)
  748. } else {
  749. return errors.Annotatef(errMissingTableMapEvent, "table id %d", e.TableID)
  750. }
  751. }
  752. var err error
  753. // ... repeat rows until event-end
  754. defer func() {
  755. if r := recover(); r != nil {
  756. errStr := fmt.Sprintf("parse rows event panic %v, data %q, parsed rows %#v, table map %#v", r, data, e, e.Table)
  757. log.Errorf("%s\n%s", errStr, Pstack())
  758. err2 = errors.Trace(errors.New(errStr))
  759. }
  760. }()
  761. // Pre-allocate memory for rows.
  762. rowsLen := e.ColumnCount
  763. if e.needBitmap2 {
  764. rowsLen += e.ColumnCount
  765. }
  766. e.SkippedColumns = make([][]int, 0, rowsLen)
  767. e.Rows = make([][]interface{}, 0, rowsLen)
  768. for pos < len(data) {
  769. if n, err = e.decodeRows(data[pos:], e.Table, e.ColumnBitmap1); err != nil {
  770. return errors.Trace(err)
  771. }
  772. pos += n
  773. if e.needBitmap2 {
  774. if n, err = e.decodeRows(data[pos:], e.Table, e.ColumnBitmap2); err != nil {
  775. return errors.Trace(err)
  776. }
  777. pos += n
  778. }
  779. }
  780. return nil
  781. }
  782. func isBitSet(bitmap []byte, i int) bool {
  783. return bitmap[i>>3]&(1<<(uint(i)&7)) > 0
  784. }
  785. func (e *RowsEvent) decodeRows(data []byte, table *TableMapEvent, bitmap []byte) (int, error) {
  786. row := make([]interface{}, e.ColumnCount)
  787. skips := make([]int, 0)
  788. pos := 0
  789. // refer: https://github.com/alibaba/canal/blob/c3e38e50e269adafdd38a48c63a1740cde304c67/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java#L63
  790. count := 0
  791. for i := 0; i < int(e.ColumnCount); i++ {
  792. if isBitSet(bitmap, i) {
  793. count++
  794. }
  795. }
  796. count = (count + 7) / 8
  797. nullBitmap := data[pos : pos+count]
  798. pos += count
  799. nullbitIndex := 0
  800. var n int
  801. var err error
  802. for i := 0; i < int(e.ColumnCount); i++ {
  803. if !isBitSet(bitmap, i) {
  804. skips = append(skips, i)
  805. continue
  806. }
  807. isNull := (uint32(nullBitmap[nullbitIndex/8]) >> uint32(nullbitIndex%8)) & 0x01
  808. nullbitIndex++
  809. if isNull > 0 {
  810. row[i] = nil
  811. continue
  812. }
  813. row[i], n, err = e.decodeValue(data[pos:], table.ColumnType[i], table.ColumnMeta[i])
  814. if err != nil {
  815. return 0, err
  816. }
  817. pos += n
  818. }
  819. e.Rows = append(e.Rows, row)
  820. e.SkippedColumns = append(e.SkippedColumns, skips)
  821. return pos, nil
  822. }
  823. func (e *RowsEvent) parseFracTime(t interface{}) interface{} {
  824. v, ok := t.(fracTime)
  825. if !ok {
  826. return t
  827. }
  828. if !e.parseTime {
  829. // Don't parse time, return string directly
  830. return v.String()
  831. }
  832. // return Golang time directly
  833. return v.Time
  834. }
  835. // see mysql sql/log_event.cc log_event_print_value
  836. func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{}, n int, err error) {
  837. var length int = 0
  838. if tp == MYSQL_TYPE_STRING {
  839. if meta >= 256 {
  840. b0 := uint8(meta >> 8)
  841. b1 := uint8(meta & 0xFF)
  842. if b0&0x30 != 0x30 {
  843. length = int(uint16(b1) | (uint16((b0&0x30)^0x30) << 4))
  844. tp = b0 | 0x30
  845. } else {
  846. length = int(meta & 0xFF)
  847. tp = b0
  848. }
  849. } else {
  850. length = int(meta)
  851. }
  852. }
  853. switch tp {
  854. case MYSQL_TYPE_NULL:
  855. return nil, 0, nil
  856. case MYSQL_TYPE_LONG:
  857. n = 4
  858. v = ParseBinaryInt32(data)
  859. case MYSQL_TYPE_TINY:
  860. n = 1
  861. v = ParseBinaryInt8(data)
  862. case MYSQL_TYPE_SHORT:
  863. n = 2
  864. v = ParseBinaryInt16(data)
  865. case MYSQL_TYPE_INT24:
  866. n = 3
  867. v = ParseBinaryInt24(data)
  868. case MYSQL_TYPE_LONGLONG:
  869. n = 8
  870. v = ParseBinaryInt64(data)
  871. case MYSQL_TYPE_NEWDECIMAL:
  872. prec := uint8(meta >> 8)
  873. scale := uint8(meta & 0xFF)
  874. v, n, err = decodeDecimal(data, int(prec), int(scale), e.useDecimal)
  875. case MYSQL_TYPE_FLOAT:
  876. n = 4
  877. v = ParseBinaryFloat32(data)
  878. case MYSQL_TYPE_DOUBLE:
  879. n = 8
  880. v = ParseBinaryFloat64(data)
  881. case MYSQL_TYPE_BIT:
  882. nbits := ((meta >> 8) * 8) + (meta & 0xFF)
  883. n = int(nbits+7) / 8
  884. //use int64 for bit
  885. v, err = decodeBit(data, int(nbits), n)
  886. case MYSQL_TYPE_TIMESTAMP:
  887. n = 4
  888. t := binary.LittleEndian.Uint32(data)
  889. if t == 0 {
  890. v = formatZeroTime(0, 0)
  891. } else {
  892. v = e.parseFracTime(fracTime{
  893. Time: time.Unix(int64(t), 0),
  894. Dec: 0,
  895. timestampStringLocation: e.timestampStringLocation,
  896. })
  897. }
  898. case MYSQL_TYPE_TIMESTAMP2:
  899. v, n, err = decodeTimestamp2(data, meta, e.timestampStringLocation)
  900. v = e.parseFracTime(v)
  901. case MYSQL_TYPE_DATETIME:
  902. n = 8
  903. i64 := binary.LittleEndian.Uint64(data)
  904. if i64 == 0 {
  905. v = formatZeroTime(0, 0)
  906. } else {
  907. d := i64 / 1000000
  908. t := i64 % 1000000
  909. v = e.parseFracTime(fracTime{
  910. Time: time.Date(
  911. int(d/10000),
  912. time.Month((d%10000)/100),
  913. int(d%100),
  914. int(t/10000),
  915. int((t%10000)/100),
  916. int(t%100),
  917. 0,
  918. time.UTC,
  919. ),
  920. Dec: 0,
  921. })
  922. }
  923. case MYSQL_TYPE_DATETIME2:
  924. v, n, err = decodeDatetime2(data, meta)
  925. v = e.parseFracTime(v)
  926. case MYSQL_TYPE_TIME:
  927. n = 3
  928. i32 := uint32(FixedLengthInt(data[0:3]))
  929. if i32 == 0 {
  930. v = "00:00:00"
  931. } else {
  932. v = fmt.Sprintf("%02d:%02d:%02d", i32/10000, (i32%10000)/100, i32%100)
  933. }
  934. case MYSQL_TYPE_TIME2:
  935. v, n, err = decodeTime2(data, meta)
  936. case MYSQL_TYPE_DATE:
  937. n = 3
  938. i32 := uint32(FixedLengthInt(data[0:3]))
  939. if i32 == 0 {
  940. v = "0000-00-00"
  941. } else {
  942. v = fmt.Sprintf("%04d-%02d-%02d", i32/(16*32), i32/32%16, i32%32)
  943. }
  944. case MYSQL_TYPE_YEAR:
  945. n = 1
  946. year := int(data[0])
  947. if year == 0 {
  948. v = year
  949. } else {
  950. v = year + 1900
  951. }
  952. case MYSQL_TYPE_ENUM:
  953. l := meta & 0xFF
  954. switch l {
  955. case 1:
  956. v = int64(data[0])
  957. n = 1
  958. case 2:
  959. v = int64(binary.LittleEndian.Uint16(data))
  960. n = 2
  961. default:
  962. err = fmt.Errorf("Unknown ENUM packlen=%d", l)
  963. }
  964. case MYSQL_TYPE_SET:
  965. n = int(meta & 0xFF)
  966. nbits := n * 8
  967. v, err = littleDecodeBit(data, nbits, n)
  968. case MYSQL_TYPE_BLOB:
  969. v, n, err = decodeBlob(data, meta)
  970. case MYSQL_TYPE_VARCHAR,
  971. MYSQL_TYPE_VAR_STRING:
  972. length = int(meta)
  973. v, n = decodeString(data, length)
  974. case MYSQL_TYPE_STRING:
  975. v, n = decodeString(data, length)
  976. case MYSQL_TYPE_JSON:
  977. // Refer: https://github.com/shyiko/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L404
  978. length = int(FixedLengthInt(data[0:meta]))
  979. n = length + int(meta)
  980. var d []byte
  981. d, err = e.decodeJsonBinary(data[meta:n])
  982. if err == nil {
  983. v = hack.String(d)
  984. }
  985. case MYSQL_TYPE_GEOMETRY:
  986. // MySQL saves Geometry as Blob in binlog
  987. // Seem that the binary format is SRID (4 bytes) + WKB, outer can use
  988. // MySQL GeoFromWKB or others to create the geometry data.
  989. // Refer https://dev.mysql.com/doc/refman/5.7/en/gis-wkb-functions.html
  990. // I also find some go libs to handle WKB if possible
  991. // see https://github.com/twpayne/go-geom or https://github.com/paulmach/go.geo
  992. v, n, err = decodeBlob(data, meta)
  993. default:
  994. err = fmt.Errorf("unsupport type %d in binlog and don't know how to handle", tp)
  995. }
  996. return v, n, err
  997. }
  998. func decodeString(data []byte, length int) (v string, n int) {
  999. if length < 256 {
  1000. length = int(data[0])
  1001. n = length + 1
  1002. v = hack.String(data[1:n])
  1003. } else {
  1004. length = int(binary.LittleEndian.Uint16(data[0:]))
  1005. n = length + 2
  1006. v = hack.String(data[2:n])
  1007. }
  1008. return
  1009. }
  1010. // ref: https://github.com/mysql/mysql-server/blob/a9b0c712de3509d8d08d3ba385d41a4df6348775/strings/decimal.c#L137
  1011. const digitsPerInteger int = 9
  1012. var compressedBytes = []int{0, 1, 1, 2, 2, 3, 3, 4, 4, 4}
  1013. func decodeDecimalDecompressValue(compIndx int, data []byte, mask uint8) (size int, value uint32) {
  1014. size = compressedBytes[compIndx]
  1015. switch size {
  1016. case 0:
  1017. case 1:
  1018. value = uint32(data[0] ^ mask)
  1019. case 2:
  1020. value = uint32(data[1]^mask) | uint32(data[0]^mask)<<8
  1021. case 3:
  1022. value = uint32(data[2]^mask) | uint32(data[1]^mask)<<8 | uint32(data[0]^mask)<<16
  1023. case 4:
  1024. value = uint32(data[3]^mask) | uint32(data[2]^mask)<<8 | uint32(data[1]^mask)<<16 | uint32(data[0]^mask)<<24
  1025. }
  1026. return
  1027. }
  1028. var zeros = [digitsPerInteger]byte{48, 48, 48, 48, 48, 48, 48, 48, 48}
  1029. func decodeDecimal(data []byte, precision int, decimals int, useDecimal bool) (interface{}, int, error) {
  1030. //see python mysql replication and https://github.com/jeremycole/mysql_binlog
  1031. integral := precision - decimals
  1032. uncompIntegral := integral / digitsPerInteger
  1033. uncompFractional := decimals / digitsPerInteger
  1034. compIntegral := integral - (uncompIntegral * digitsPerInteger)
  1035. compFractional := decimals - (uncompFractional * digitsPerInteger)
  1036. binSize := uncompIntegral*4 + compressedBytes[compIntegral] +
  1037. uncompFractional*4 + compressedBytes[compFractional]
  1038. buf := make([]byte, binSize)
  1039. copy(buf, data[:binSize])
  1040. //must copy the data for later change
  1041. data = buf
  1042. // Support negative
  1043. // The sign is encoded in the high bit of the the byte
  1044. // But this bit can also be used in the value
  1045. value := uint32(data[0])
  1046. var res strings.Builder
  1047. res.Grow(precision + 2)
  1048. var mask uint32 = 0
  1049. if value&0x80 == 0 {
  1050. mask = uint32((1 << 32) - 1)
  1051. res.WriteString("-")
  1052. }
  1053. //clear sign
  1054. data[0] ^= 0x80
  1055. zeroLeading := true
  1056. pos, value := decodeDecimalDecompressValue(compIntegral, data, uint8(mask))
  1057. if value != 0 {
  1058. zeroLeading = false
  1059. res.WriteString(strconv.FormatUint(uint64(value), 10))
  1060. }
  1061. for i := 0; i < uncompIntegral; i++ {
  1062. value = binary.BigEndian.Uint32(data[pos:]) ^ mask
  1063. pos += 4
  1064. if zeroLeading {
  1065. if value != 0 {
  1066. zeroLeading = false
  1067. res.WriteString(strconv.FormatUint(uint64(value), 10))
  1068. }
  1069. } else {
  1070. toWrite := strconv.FormatUint(uint64(value), 10)
  1071. res.Write(zeros[:digitsPerInteger-len(toWrite)])
  1072. res.WriteString(toWrite)
  1073. }
  1074. }
  1075. if zeroLeading {
  1076. res.WriteString("0")
  1077. }
  1078. if pos < len(data) {
  1079. res.WriteString(".")
  1080. for i := 0; i < uncompFractional; i++ {
  1081. value = binary.BigEndian.Uint32(data[pos:]) ^ mask
  1082. pos += 4
  1083. toWrite := strconv.FormatUint(uint64(value), 10)
  1084. res.Write(zeros[:digitsPerInteger-len(toWrite)])
  1085. res.WriteString(toWrite)
  1086. }
  1087. if size, value := decodeDecimalDecompressValue(compFractional, data[pos:], uint8(mask)); size > 0 {
  1088. toWrite := strconv.FormatUint(uint64(value), 10)
  1089. padding := compFractional - len(toWrite)
  1090. if padding > 0 {
  1091. res.Write(zeros[:padding])
  1092. }
  1093. res.WriteString(toWrite)
  1094. pos += size
  1095. }
  1096. }
  1097. if useDecimal {
  1098. f, err := decimal.NewFromString(res.String())
  1099. return f, pos, err
  1100. }
  1101. return res.String(), pos, nil
  1102. }
  1103. func decodeBit(data []byte, nbits int, length int) (value int64, err error) {
  1104. if nbits > 1 {
  1105. switch length {
  1106. case 1:
  1107. value = int64(data[0])
  1108. case 2:
  1109. value = int64(binary.BigEndian.Uint16(data))
  1110. case 3:
  1111. value = int64(BFixedLengthInt(data[0:3]))
  1112. case 4:
  1113. value = int64(binary.BigEndian.Uint32(data))
  1114. case 5:
  1115. value = int64(BFixedLengthInt(data[0:5]))
  1116. case 6:
  1117. value = int64(BFixedLengthInt(data[0:6]))
  1118. case 7:
  1119. value = int64(BFixedLengthInt(data[0:7]))
  1120. case 8:
  1121. value = int64(binary.BigEndian.Uint64(data))
  1122. default:
  1123. err = fmt.Errorf("invalid bit length %d", length)
  1124. }
  1125. } else {
  1126. if length != 1 {
  1127. err = fmt.Errorf("invalid bit length %d", length)
  1128. } else {
  1129. value = int64(data[0])
  1130. }
  1131. }
  1132. return
  1133. }
  1134. func littleDecodeBit(data []byte, nbits int, length int) (value int64, err error) {
  1135. if nbits > 1 {
  1136. switch length {
  1137. case 1:
  1138. value = int64(data[0])
  1139. case 2:
  1140. value = int64(binary.LittleEndian.Uint16(data))
  1141. case 3:
  1142. value = int64(FixedLengthInt(data[0:3]))
  1143. case 4:
  1144. value = int64(binary.LittleEndian.Uint32(data))
  1145. case 5:
  1146. value = int64(FixedLengthInt(data[0:5]))
  1147. case 6:
  1148. value = int64(FixedLengthInt(data[0:6]))
  1149. case 7:
  1150. value = int64(FixedLengthInt(data[0:7]))
  1151. case 8:
  1152. value = int64(binary.LittleEndian.Uint64(data))
  1153. default:
  1154. err = fmt.Errorf("invalid bit length %d", length)
  1155. }
  1156. } else {
  1157. if length != 1 {
  1158. err = fmt.Errorf("invalid bit length %d", length)
  1159. } else {
  1160. value = int64(data[0])
  1161. }
  1162. }
  1163. return
  1164. }
  1165. func decodeTimestamp2(data []byte, dec uint16, timestampStringLocation *time.Location) (interface{}, int, error) {
  1166. //get timestamp binary length
  1167. n := int(4 + (dec+1)/2)
  1168. sec := int64(binary.BigEndian.Uint32(data[0:4]))
  1169. usec := int64(0)
  1170. switch dec {
  1171. case 1, 2:
  1172. usec = int64(data[4]) * 10000
  1173. case 3, 4:
  1174. usec = int64(binary.BigEndian.Uint16(data[4:])) * 100
  1175. case 5, 6:
  1176. usec = int64(BFixedLengthInt(data[4:7]))
  1177. }
  1178. if sec == 0 {
  1179. return formatZeroTime(int(usec), int(dec)), n, nil
  1180. }
  1181. return fracTime{
  1182. Time: time.Unix(sec, usec*1000),
  1183. Dec: int(dec),
  1184. timestampStringLocation: timestampStringLocation,
  1185. }, n, nil
  1186. }
  1187. const DATETIMEF_INT_OFS int64 = 0x8000000000
  1188. func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
  1189. //get datetime binary length
  1190. n := int(5 + (dec+1)/2)
  1191. intPart := int64(BFixedLengthInt(data[0:5])) - DATETIMEF_INT_OFS
  1192. var frac int64 = 0
  1193. switch dec {
  1194. case 1, 2:
  1195. frac = int64(data[5]) * 10000
  1196. case 3, 4:
  1197. frac = int64(binary.BigEndian.Uint16(data[5:7])) * 100
  1198. case 5, 6:
  1199. frac = int64(BFixedLengthInt(data[5:8]))
  1200. }
  1201. if intPart == 0 {
  1202. return formatZeroTime(int(frac), int(dec)), n, nil
  1203. }
  1204. tmp := intPart<<24 + frac
  1205. //handle sign???
  1206. if tmp < 0 {
  1207. tmp = -tmp
  1208. }
  1209. // var secPart int64 = tmp % (1 << 24)
  1210. ymdhms := tmp >> 24
  1211. ymd := ymdhms >> 17
  1212. ym := ymd >> 5
  1213. hms := ymdhms % (1 << 17)
  1214. day := int(ymd % (1 << 5))
  1215. month := int(ym % 13)
  1216. year := int(ym / 13)
  1217. second := int(hms % (1 << 6))
  1218. minute := int((hms >> 6) % (1 << 6))
  1219. hour := int((hms >> 12))
  1220. // DATETIME encoding for nonfractional part after MySQL 5.6.4
  1221. // https://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html
  1222. // integer value for 1970-01-01 00:00:00 is
  1223. // year*13+month = 25611 = 0b110010000001011
  1224. // day = 1 = 0b00001
  1225. // hour = 0 = 0b00000
  1226. // minute = 0 = 0b000000
  1227. // second = 0 = 0b000000
  1228. // integer value = 0b1100100000010110000100000000000000000 = 107420450816
  1229. if intPart < 107420450816 {
  1230. return formatBeforeUnixZeroTime(year, month, day, hour, minute, second, int(frac), int(dec)), n, nil
  1231. }
  1232. return fracTime{
  1233. Time: time.Date(year, time.Month(month), day, hour, minute, second, int(frac*1000), time.UTC),
  1234. Dec: int(dec),
  1235. }, n, nil
  1236. }
  1237. const TIMEF_OFS int64 = 0x800000000000
  1238. const TIMEF_INT_OFS int64 = 0x800000
  1239. func decodeTime2(data []byte, dec uint16) (string, int, error) {
  1240. //time binary length
  1241. n := int(3 + (dec+1)/2)
  1242. tmp := int64(0)
  1243. intPart := int64(0)
  1244. frac := int64(0)
  1245. switch dec {
  1246. case 1, 2:
  1247. intPart = int64(BFixedLengthInt(data[0:3])) - TIMEF_INT_OFS
  1248. frac = int64(data[3])
  1249. if intPart < 0 && frac != 0 {
  1250. /*
  1251. Negative values are stored with reverse fractional part order,
  1252. for binary sort compatibility.
  1253. Disk value intpart frac Time value Memory value
  1254. 800000.00 0 0 00:00:00.00 0000000000.000000
  1255. 7FFFFF.FF -1 255 -00:00:00.01 FFFFFFFFFF.FFD8F0
  1256. 7FFFFF.9D -1 99 -00:00:00.99 FFFFFFFFFF.F0E4D0
  1257. 7FFFFF.00 -1 0 -00:00:01.00 FFFFFFFFFF.000000
  1258. 7FFFFE.FF -1 255 -00:00:01.01 FFFFFFFFFE.FFD8F0
  1259. 7FFFFE.F6 -2 246 -00:00:01.10 FFFFFFFFFE.FE7960
  1260. Formula to convert fractional part from disk format
  1261. (now stored in "frac" variable) to absolute value: "0x100 - frac".
  1262. To reconstruct in-memory value, we shift
  1263. to the next integer value and then substruct fractional part.
  1264. */
  1265. intPart++ /* Shift to the next integer value */
  1266. frac -= 0x100 /* -(0x100 - frac) */
  1267. }
  1268. tmp = intPart<<24 + frac*10000
  1269. case 3, 4:
  1270. intPart = int64(BFixedLengthInt(data[0:3])) - TIMEF_INT_OFS
  1271. frac = int64(binary.BigEndian.Uint16(data[3:5]))
  1272. if intPart < 0 && frac != 0 {
  1273. /*
  1274. Fix reverse fractional part order: "0x10000 - frac".
  1275. See comments for FSP=1 and FSP=2 above.
  1276. */
  1277. intPart++ /* Shift to the next integer value */
  1278. frac -= 0x10000 /* -(0x10000-frac) */
  1279. }
  1280. tmp = intPart<<24 + frac*100
  1281. case 5, 6:
  1282. tmp = int64(BFixedLengthInt(data[0:6])) - TIMEF_OFS
  1283. return timeFormat(tmp, dec, n)
  1284. default:
  1285. intPart = int64(BFixedLengthInt(data[0:3])) - TIMEF_INT_OFS
  1286. tmp = intPart << 24
  1287. }
  1288. if intPart == 0 && frac == 0 {
  1289. return "00:00:00", n, nil
  1290. }
  1291. return timeFormat(tmp, dec, n)
  1292. }
  1293. func timeFormat(tmp int64, dec uint16, n int) (string, int, error) {
  1294. hms := int64(0)
  1295. sign := ""
  1296. if tmp < 0 {
  1297. tmp = -tmp
  1298. sign = "-"
  1299. }
  1300. hms = tmp >> 24
  1301. hour := (hms >> 12) % (1 << 10) /* 10 bits starting at 12th */
  1302. minute := (hms >> 6) % (1 << 6) /* 6 bits starting at 6th */
  1303. second := hms % (1 << 6) /* 6 bits starting at 0th */
  1304. secPart := tmp % (1 << 24)
  1305. if secPart != 0 {
  1306. s := fmt.Sprintf("%s%02d:%02d:%02d.%06d", sign, hour, minute, second, secPart)
  1307. return s[0 : len(s)-(6-int(dec))], n, nil
  1308. }
  1309. return fmt.Sprintf("%s%02d:%02d:%02d", sign, hour, minute, second), n, nil
  1310. }
  1311. func decodeBlob(data []byte, meta uint16) (v []byte, n int, err error) {
  1312. var length int
  1313. switch meta {
  1314. case 1:
  1315. length = int(data[0])
  1316. v = data[1 : 1+length]
  1317. n = length + 1
  1318. case 2:
  1319. length = int(binary.LittleEndian.Uint16(data))
  1320. v = data[2 : 2+length]
  1321. n = length + 2
  1322. case 3:
  1323. length = int(FixedLengthInt(data[0:3]))
  1324. v = data[3 : 3+length]
  1325. n = length + 3
  1326. case 4:
  1327. length = int(binary.LittleEndian.Uint32(data))
  1328. v = data[4 : 4+length]
  1329. n = length + 4
  1330. default:
  1331. err = fmt.Errorf("invalid blob packlen = %d", meta)
  1332. }
  1333. return
  1334. }
  1335. func (e *RowsEvent) Dump(w io.Writer) {
  1336. fmt.Fprintf(w, "TableID: %d\n", e.TableID)
  1337. fmt.Fprintf(w, "Flags: %d\n", e.Flags)
  1338. fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount)
  1339. fmt.Fprintf(w, "Values:\n")
  1340. for _, rows := range e.Rows {
  1341. fmt.Fprintf(w, "--\n")
  1342. for j, d := range rows {
  1343. if _, ok := d.([]byte); ok {
  1344. fmt.Fprintf(w, "%d:%q\n", j, d)
  1345. } else {
  1346. fmt.Fprintf(w, "%d:%#v\n", j, d)
  1347. }
  1348. }
  1349. }
  1350. fmt.Fprintln(w)
  1351. }
  1352. type RowsQueryEvent struct {
  1353. Query []byte
  1354. }
  1355. func (e *RowsQueryEvent) Decode(data []byte) error {
  1356. //ignore length byte 1
  1357. e.Query = data[1:]
  1358. return nil
  1359. }
  1360. func (e *RowsQueryEvent) Dump(w io.Writer) {
  1361. fmt.Fprintf(w, "Query: %s\n", e.Query)
  1362. fmt.Fprintln(w)
  1363. }