rows.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package canal
  2. import (
  3. "fmt"
  4. "github.com/go-mysql-org/go-mysql/replication"
  5. "github.com/go-mysql-org/go-mysql/schema"
  6. )
  7. // The action name for sync.
  8. const (
  9. UpdateAction = "update"
  10. InsertAction = "insert"
  11. DeleteAction = "delete"
  12. )
  13. // RowsEvent is the event for row replication.
  14. type RowsEvent struct {
  15. Table *schema.Table
  16. Action string
  17. // changed row list
  18. // binlog has three update event version, v0, v1 and v2.
  19. // for v1 and v2, the rows number must be even.
  20. // Two rows for one event, format is [before update row, after update row]
  21. // for update v0, only one row for a event, and we don't support this version.
  22. Rows [][]interface{}
  23. // Header can be used to inspect the event
  24. Header *replication.EventHeader
  25. }
  26. func newRowsEvent(table *schema.Table, action string, rows [][]interface{}, header *replication.EventHeader) *RowsEvent {
  27. e := new(RowsEvent)
  28. e.Table = table
  29. e.Action = action
  30. e.Rows = rows
  31. e.Header = header
  32. e.handleUnsigned()
  33. return e
  34. }
  35. const maxMediumintUnsigned int32 = 16777215
  36. func (r *RowsEvent) handleUnsigned() {
  37. // Handle Unsigned Columns here, for binlog replication, we can't know the integer is unsigned or not,
  38. // so we use int type but this may cause overflow outside sometimes, so we must convert to the really .
  39. // unsigned type
  40. if len(r.Table.UnsignedColumns) == 0 {
  41. return
  42. }
  43. for i := 0; i < len(r.Rows); i++ {
  44. for _, columnIdx := range r.Table.UnsignedColumns {
  45. switch value := r.Rows[i][columnIdx].(type) {
  46. case int8:
  47. r.Rows[i][columnIdx] = uint8(value)
  48. case int16:
  49. r.Rows[i][columnIdx] = uint16(value)
  50. case int32:
  51. // problem with mediumint is that it's a 3-byte type. There is no compatible golang type to match that.
  52. // So to convert from negative to positive we'd need to convert the value manually
  53. if value < 0 && r.Table.Columns[columnIdx].Type == schema.TYPE_MEDIUM_INT {
  54. r.Rows[i][columnIdx] = uint32(maxMediumintUnsigned + value + 1)
  55. } else {
  56. r.Rows[i][columnIdx] = uint32(value)
  57. }
  58. case int64:
  59. r.Rows[i][columnIdx] = uint64(value)
  60. case int:
  61. r.Rows[i][columnIdx] = uint(value)
  62. default:
  63. // nothing to do
  64. }
  65. }
  66. }
  67. }
  68. // String implements fmt.Stringer interface.
  69. func (r *RowsEvent) String() string {
  70. return fmt.Sprintf("%s %s %v", r.Action, r.Table, r.Rows)
  71. }