json_binary.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493
  1. package replication
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "math"
  6. . "github.com/go-mysql-org/go-mysql/mysql"
  7. "github.com/pingcap/errors"
  8. "github.com/siddontang/go/hack"
  9. )
  // Type tags used by MySQL's binary JSON encoding. Each encoded value is
  // identified by one of these bytes (see decodeValue). The numeric values are
  // written out explicitly so a tag can be matched against a hex dump directly.
  const (
  	JSONB_SMALL_OBJECT byte = 0x00 // small JSON object
  	JSONB_LARGE_OBJECT byte = 0x01 // large JSON object
  	JSONB_SMALL_ARRAY  byte = 0x02 // small JSON array
  	JSONB_LARGE_ARRAY  byte = 0x03 // large JSON array
  	JSONB_LITERAL      byte = 0x04 // literal (true/false/null)
  	JSONB_INT16        byte = 0x05 // int16
  	JSONB_UINT16       byte = 0x06 // uint16
  	JSONB_INT32        byte = 0x07 // int32
  	JSONB_UINT32       byte = 0x08 // uint32
  	JSONB_INT64        byte = 0x09 // int64
  	JSONB_UINT64       byte = 0x0a // uint64
  	JSONB_DOUBLE       byte = 0x0b // double
  	JSONB_STRING       byte = 0x0c // string
  	JSONB_OPAQUE       byte = 0x0f // custom data (any MySQL data type)
  )
  // Payload bytes of a JSONB_LITERAL value (see decodeLiteral).
  const (
  	JSONB_NULL_LITERAL  byte = iota // 0x00: JSON null
  	JSONB_TRUE_LITERAL              // 0x01: JSON true
  	JSONB_FALSE_LITERAL             // 0x02: JSON false
  )
  // Field widths, in bytes, of the object/array header for the small and
  // large storage formats.
  const (
  	// Width of a count, size or offset field.
  	jsonbSmallOffsetSize = 2
  	jsonbLargeOffsetSize = 4

  	// A key entry is an offset followed by a 2-byte key length.
  	jsonbKeyEntrySizeSmall = jsonbSmallOffsetSize + 2
  	jsonbKeyEntrySizeLarge = jsonbLargeOffsetSize + 2

  	// A value entry is a 1-byte type tag followed by an offset-sized field.
  	jsonbValueEntrySizeSmall = jsonbSmallOffsetSize + 1
  	jsonbValueEntrySizeLarge = jsonbLargeOffsetSize + 1
  )
  39. func jsonbGetOffsetSize(isSmall bool) int {
  40. if isSmall {
  41. return jsonbSmallOffsetSize
  42. }
  43. return jsonbLargeOffsetSize
  44. }
  45. func jsonbGetKeyEntrySize(isSmall bool) int {
  46. if isSmall {
  47. return jsonbKeyEntrySizeSmall
  48. }
  49. return jsonbKeyEntrySizeLarge
  50. }
  51. func jsonbGetValueEntrySize(isSmall bool) int {
  52. if isSmall {
  53. return jsonbValueEntrySizeSmall
  54. }
  55. return jsonbValueEntrySizeLarge
  56. }
  57. // decodeJsonBinary decodes the JSON binary encoding data and returns
  58. // the common JSON encoding data.
  59. func (e *RowsEvent) decodeJsonBinary(data []byte) ([]byte, error) {
  60. // Sometimes, we can insert a NULL JSON even we set the JSON field as NOT NULL.
  61. // If we meet this case, we can return an empty slice.
  62. if len(data) == 0 {
  63. return []byte{}, nil
  64. }
  65. d := jsonBinaryDecoder{
  66. useDecimal: e.useDecimal,
  67. ignoreDecodeErr: e.ignoreJSONDecodeErr,
  68. }
  69. if d.isDataShort(data, 1) {
  70. return nil, d.err
  71. }
  72. v := d.decodeValue(data[0], data[1:])
  73. if d.err != nil {
  74. return nil, d.err
  75. }
  76. return json.Marshal(v)
  77. }
  // jsonBinaryDecoder carries the options and the sticky error state used
  // while decoding a single binary JSON document.
  type jsonBinaryDecoder struct {
  	// useDecimal is forwarded to decodeDecimal when an opaque
  	// MYSQL_TYPE_NEWDECIMAL value is decoded.
  	useDecimal bool
  	// ignoreDecodeErr makes an object/array whose declared size exceeds the
  	// available data decode as nil instead of failing the document.
  	ignoreDecodeErr bool
  	// err is the first error recorded during decoding. Once it is set,
  	// isDataShort reports true and decodeValue returns nil immediately.
  	err error
  }
  83. func (d *jsonBinaryDecoder) decodeValue(tp byte, data []byte) interface{} {
  84. if d.err != nil {
  85. return nil
  86. }
  87. switch tp {
  88. case JSONB_SMALL_OBJECT:
  89. return d.decodeObjectOrArray(data, true, true)
  90. case JSONB_LARGE_OBJECT:
  91. return d.decodeObjectOrArray(data, false, true)
  92. case JSONB_SMALL_ARRAY:
  93. return d.decodeObjectOrArray(data, true, false)
  94. case JSONB_LARGE_ARRAY:
  95. return d.decodeObjectOrArray(data, false, false)
  96. case JSONB_LITERAL:
  97. return d.decodeLiteral(data)
  98. case JSONB_INT16:
  99. return d.decodeInt16(data)
  100. case JSONB_UINT16:
  101. return d.decodeUint16(data)
  102. case JSONB_INT32:
  103. return d.decodeInt32(data)
  104. case JSONB_UINT32:
  105. return d.decodeUint32(data)
  106. case JSONB_INT64:
  107. return d.decodeInt64(data)
  108. case JSONB_UINT64:
  109. return d.decodeUint64(data)
  110. case JSONB_DOUBLE:
  111. return d.decodeDouble(data)
  112. case JSONB_STRING:
  113. return d.decodeString(data)
  114. case JSONB_OPAQUE:
  115. return d.decodeOpaque(data)
  116. default:
  117. d.err = errors.Errorf("invalid json type %d", tp)
  118. }
  119. return nil
  120. }
  121. func (d *jsonBinaryDecoder) decodeObjectOrArray(data []byte, isSmall bool, isObject bool) interface{} {
  122. offsetSize := jsonbGetOffsetSize(isSmall)
  123. if d.isDataShort(data, 2*offsetSize) {
  124. return nil
  125. }
  126. count := d.decodeCount(data, isSmall)
  127. size := d.decodeCount(data[offsetSize:], isSmall)
  128. if d.isDataShort(data, size) {
  129. // Before MySQL 5.7.22, json type generated column may have invalid value,
  130. // bug ref: https://bugs.mysql.com/bug.php?id=88791
  131. // As generated column value is not used in replication, we can just ignore
  132. // this error and return a dummy value for this column.
  133. if d.ignoreDecodeErr {
  134. d.err = nil
  135. }
  136. return nil
  137. }
  138. keyEntrySize := jsonbGetKeyEntrySize(isSmall)
  139. valueEntrySize := jsonbGetValueEntrySize(isSmall)
  140. headerSize := 2*offsetSize + count*valueEntrySize
  141. if isObject {
  142. headerSize += count * keyEntrySize
  143. }
  144. if headerSize > size {
  145. d.err = errors.Errorf("header size %d > size %d", headerSize, size)
  146. return nil
  147. }
  148. var keys []string
  149. if isObject {
  150. keys = make([]string, count)
  151. for i := 0; i < count; i++ {
  152. // decode key
  153. entryOffset := 2*offsetSize + keyEntrySize*i
  154. keyOffset := d.decodeCount(data[entryOffset:], isSmall)
  155. keyLength := int(d.decodeUint16(data[entryOffset+offsetSize:]))
  156. // Key must start after value entry
  157. if keyOffset < headerSize {
  158. d.err = errors.Errorf("invalid key offset %d, must > %d", keyOffset, headerSize)
  159. return nil
  160. }
  161. if d.isDataShort(data, keyOffset+keyLength) {
  162. return nil
  163. }
  164. keys[i] = hack.String(data[keyOffset : keyOffset+keyLength])
  165. }
  166. }
  167. if d.err != nil {
  168. return nil
  169. }
  170. values := make([]interface{}, count)
  171. for i := 0; i < count; i++ {
  172. // decode value
  173. entryOffset := 2*offsetSize + valueEntrySize*i
  174. if isObject {
  175. entryOffset += keyEntrySize * count
  176. }
  177. tp := data[entryOffset]
  178. if isInlineValue(tp, isSmall) {
  179. values[i] = d.decodeValue(tp, data[entryOffset+1:entryOffset+valueEntrySize])
  180. continue
  181. }
  182. valueOffset := d.decodeCount(data[entryOffset+1:], isSmall)
  183. if d.isDataShort(data, valueOffset) {
  184. return nil
  185. }
  186. values[i] = d.decodeValue(tp, data[valueOffset:])
  187. }
  188. if d.err != nil {
  189. return nil
  190. }
  191. if !isObject {
  192. return values
  193. }
  194. m := make(map[string]interface{}, count)
  195. for i := 0; i < count; i++ {
  196. m[keys[i]] = values[i]
  197. }
  198. return m
  199. }
  200. func isInlineValue(tp byte, isSmall bool) bool {
  201. switch tp {
  202. case JSONB_INT16, JSONB_UINT16, JSONB_LITERAL:
  203. return true
  204. case JSONB_INT32, JSONB_UINT32:
  205. return !isSmall
  206. }
  207. return false
  208. }
  209. func (d *jsonBinaryDecoder) decodeLiteral(data []byte) interface{} {
  210. if d.isDataShort(data, 1) {
  211. return nil
  212. }
  213. tp := data[0]
  214. switch tp {
  215. case JSONB_NULL_LITERAL:
  216. return nil
  217. case JSONB_TRUE_LITERAL:
  218. return true
  219. case JSONB_FALSE_LITERAL:
  220. return false
  221. }
  222. d.err = errors.Errorf("invalid literal %c", tp)
  223. return nil
  224. }
  225. func (d *jsonBinaryDecoder) isDataShort(data []byte, expected int) bool {
  226. if d.err != nil {
  227. return true
  228. }
  229. if len(data) < expected {
  230. d.err = errors.Errorf("data len %d < expected %d", len(data), expected)
  231. }
  232. return d.err != nil
  233. }
  234. func (d *jsonBinaryDecoder) decodeInt16(data []byte) int16 {
  235. if d.isDataShort(data, 2) {
  236. return 0
  237. }
  238. v := ParseBinaryInt16(data[0:2])
  239. return v
  240. }
  241. func (d *jsonBinaryDecoder) decodeUint16(data []byte) uint16 {
  242. if d.isDataShort(data, 2) {
  243. return 0
  244. }
  245. v := ParseBinaryUint16(data[0:2])
  246. return v
  247. }
  248. func (d *jsonBinaryDecoder) decodeInt32(data []byte) int32 {
  249. if d.isDataShort(data, 4) {
  250. return 0
  251. }
  252. v := ParseBinaryInt32(data[0:4])
  253. return v
  254. }
  255. func (d *jsonBinaryDecoder) decodeUint32(data []byte) uint32 {
  256. if d.isDataShort(data, 4) {
  257. return 0
  258. }
  259. v := ParseBinaryUint32(data[0:4])
  260. return v
  261. }
  262. func (d *jsonBinaryDecoder) decodeInt64(data []byte) int64 {
  263. if d.isDataShort(data, 8) {
  264. return 0
  265. }
  266. v := ParseBinaryInt64(data[0:8])
  267. return v
  268. }
  269. func (d *jsonBinaryDecoder) decodeUint64(data []byte) uint64 {
  270. if d.isDataShort(data, 8) {
  271. return 0
  272. }
  273. v := ParseBinaryUint64(data[0:8])
  274. return v
  275. }
  276. func (d *jsonBinaryDecoder) decodeDouble(data []byte) float64 {
  277. if d.isDataShort(data, 8) {
  278. return 0
  279. }
  280. v := ParseBinaryFloat64(data[0:8])
  281. return v
  282. }
  283. func (d *jsonBinaryDecoder) decodeString(data []byte) string {
  284. if d.err != nil {
  285. return ""
  286. }
  287. l, n := d.decodeVariableLength(data)
  288. if d.isDataShort(data, l+n) {
  289. return ""
  290. }
  291. data = data[n:]
  292. v := hack.String(data[0:l])
  293. return v
  294. }
  295. func (d *jsonBinaryDecoder) decodeOpaque(data []byte) interface{} {
  296. if d.isDataShort(data, 1) {
  297. return nil
  298. }
  299. tp := data[0]
  300. data = data[1:]
  301. l, n := d.decodeVariableLength(data)
  302. if d.isDataShort(data, l+n) {
  303. return nil
  304. }
  305. data = data[n : l+n]
  306. switch tp {
  307. case MYSQL_TYPE_NEWDECIMAL:
  308. return d.decodeDecimal(data)
  309. case MYSQL_TYPE_TIME:
  310. return d.decodeTime(data)
  311. case MYSQL_TYPE_DATE, MYSQL_TYPE_DATETIME, MYSQL_TYPE_TIMESTAMP:
  312. return d.decodeDateTime(data)
  313. default:
  314. return hack.String(data)
  315. }
  316. }
  317. func (d *jsonBinaryDecoder) decodeDecimal(data []byte) interface{} {
  318. precision := int(data[0])
  319. scale := int(data[1])
  320. v, _, err := decodeDecimal(data[2:], precision, scale, d.useDecimal)
  321. d.err = err
  322. return v
  323. }
  324. func (d *jsonBinaryDecoder) decodeTime(data []byte) interface{} {
  325. v := d.decodeInt64(data)
  326. if v == 0 {
  327. return "00:00:00"
  328. }
  329. sign := ""
  330. if v < 0 {
  331. sign = "-"
  332. v = -v
  333. }
  334. intPart := v >> 24
  335. hour := (intPart >> 12) % (1 << 10)
  336. min := (intPart >> 6) % (1 << 6)
  337. sec := intPart % (1 << 6)
  338. frac := v % (1 << 24)
  339. return fmt.Sprintf("%s%02d:%02d:%02d.%06d", sign, hour, min, sec, frac)
  340. }
  341. func (d *jsonBinaryDecoder) decodeDateTime(data []byte) interface{} {
  342. v := d.decodeInt64(data)
  343. if v == 0 {
  344. return "0000-00-00 00:00:00"
  345. }
  346. // handle negative?
  347. if v < 0 {
  348. v = -v
  349. }
  350. intPart := v >> 24
  351. ymd := intPart >> 17
  352. ym := ymd >> 5
  353. hms := intPart % (1 << 17)
  354. year := ym / 13
  355. month := ym % 13
  356. day := ymd % (1 << 5)
  357. hour := (hms >> 12)
  358. minute := (hms >> 6) % (1 << 6)
  359. second := hms % (1 << 6)
  360. frac := v % (1 << 24)
  361. return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d.%06d", year, month, day, hour, minute, second, frac)
  362. }
  363. func (d *jsonBinaryDecoder) decodeCount(data []byte, isSmall bool) int {
  364. if isSmall {
  365. v := d.decodeUint16(data)
  366. return int(v)
  367. }
  368. return int(d.decodeUint32(data))
  369. }
  370. func (d *jsonBinaryDecoder) decodeVariableLength(data []byte) (int, int) {
  371. // The max size for variable length is math.MaxUint32, so
  372. // here we can use 5 bytes to save it.
  373. maxCount := 5
  374. if len(data) < maxCount {
  375. maxCount = len(data)
  376. }
  377. pos := 0
  378. length := uint64(0)
  379. for ; pos < maxCount; pos++ {
  380. v := data[pos]
  381. length |= uint64(v&0x7F) << uint(7*pos)
  382. if v&0x80 == 0 {
  383. if length > math.MaxUint32 {
  384. d.err = errors.Errorf("variable length %d must <= %d", length, int64(math.MaxUint32))
  385. return 0, 0
  386. }
  387. pos += 1
  388. // TODO: should consider length overflow int here.
  389. return int(length), pos
  390. }
  391. }
  392. d.err = errors.New("decode variable length failed")
  393. return 0, 0
  394. }