ddl.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. // Copyright 2015 PingCAP, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package model
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "math"
  18. "sync"
  19. "time"
  20. "github.com/pingcap/errors"
  21. "github.com/pingcap/parser/mysql"
  22. "github.com/pingcap/parser/terror"
  23. )
  24. // ActionType is the type for DDL action.
  25. type ActionType byte
  26. // List DDL actions.
  27. const (
  28. ActionNone ActionType = 0
  29. ActionCreateSchema ActionType = 1
  30. ActionDropSchema ActionType = 2
  31. ActionCreateTable ActionType = 3
  32. ActionDropTable ActionType = 4
  33. ActionAddColumn ActionType = 5
  34. ActionDropColumn ActionType = 6
  35. ActionAddIndex ActionType = 7
  36. ActionDropIndex ActionType = 8
  37. ActionAddForeignKey ActionType = 9
  38. ActionDropForeignKey ActionType = 10
  39. ActionTruncateTable ActionType = 11
  40. ActionModifyColumn ActionType = 12
  41. ActionRebaseAutoID ActionType = 13
  42. ActionRenameTable ActionType = 14
  43. ActionSetDefaultValue ActionType = 15
  44. ActionShardRowID ActionType = 16
  45. ActionModifyTableComment ActionType = 17
  46. ActionRenameIndex ActionType = 18
  47. ActionAddTablePartition ActionType = 19
  48. ActionDropTablePartition ActionType = 20
  49. ActionCreateView ActionType = 21
  50. ActionModifyTableCharsetAndCollate ActionType = 22
  51. ActionTruncateTablePartition ActionType = 23
  52. ActionDropView ActionType = 24
  53. ActionRecoverTable ActionType = 25
  54. ActionModifySchemaCharsetAndCollate ActionType = 26
  55. ActionLockTable ActionType = 27
  56. ActionUnlockTable ActionType = 28
  57. ActionRepairTable ActionType = 29
  58. ActionSetTiFlashReplica ActionType = 30
  59. ActionUpdateTiFlashReplicaStatus ActionType = 31
  60. ActionAddPrimaryKey ActionType = 32
  61. ActionDropPrimaryKey ActionType = 33
  62. ActionCreateSequence ActionType = 34
  63. ActionAlterSequence ActionType = 35
  64. ActionDropSequence ActionType = 36
  65. ActionAddColumns ActionType = 37
  66. ActionDropColumns ActionType = 38
  67. ActionModifyTableAutoIdCache ActionType = 39
  68. ActionRebaseAutoRandomBase ActionType = 40
  69. ActionAlterIndexVisibility ActionType = 41
  70. ActionExchangeTablePartition ActionType = 42
  71. ActionAddCheckConstraint ActionType = 43
  72. ActionDropCheckConstraint ActionType = 44
  73. ActionAlterCheckConstraint ActionType = 45
  74. ActionAlterTableAlterPartition ActionType = 46
  75. ActionRenameTables ActionType = 47
  76. ActionDropIndexes ActionType = 48
  77. )
  78. const (
  79. // AddIndexStr is a string related to the operation of "add index".
  80. AddIndexStr = "add index"
  81. AddPrimaryKeyStr = "add primary key"
  82. )
  83. var actionMap = map[ActionType]string{
  84. ActionCreateSchema: "create schema",
  85. ActionDropSchema: "drop schema",
  86. ActionCreateTable: "create table",
  87. ActionDropTable: "drop table",
  88. ActionAddColumn: "add column",
  89. ActionDropColumn: "drop column",
  90. ActionAddIndex: AddIndexStr,
  91. ActionDropIndex: "drop index",
  92. ActionAddForeignKey: "add foreign key",
  93. ActionDropForeignKey: "drop foreign key",
  94. ActionTruncateTable: "truncate table",
  95. ActionModifyColumn: "modify column",
  96. ActionRebaseAutoID: "rebase auto_increment ID",
  97. ActionRenameTable: "rename table",
  98. ActionSetDefaultValue: "set default value",
  99. ActionShardRowID: "shard row ID",
  100. ActionModifyTableComment: "modify table comment",
  101. ActionRenameIndex: "rename index",
  102. ActionAddTablePartition: "add partition",
  103. ActionDropTablePartition: "drop partition",
  104. ActionCreateView: "create view",
  105. ActionModifyTableCharsetAndCollate: "modify table charset and collate",
  106. ActionTruncateTablePartition: "truncate partition",
  107. ActionDropView: "drop view",
  108. ActionRecoverTable: "recover table",
  109. ActionModifySchemaCharsetAndCollate: "modify schema charset and collate",
  110. ActionLockTable: "lock table",
  111. ActionUnlockTable: "unlock table",
  112. ActionRepairTable: "repair table",
  113. ActionSetTiFlashReplica: "set tiflash replica",
  114. ActionUpdateTiFlashReplicaStatus: "update tiflash replica status",
  115. ActionAddPrimaryKey: AddPrimaryKeyStr,
  116. ActionDropPrimaryKey: "drop primary key",
  117. ActionCreateSequence: "create sequence",
  118. ActionAlterSequence: "alter sequence",
  119. ActionDropSequence: "drop sequence",
  120. ActionAddColumns: "add multi-columns",
  121. ActionDropColumns: "drop multi-columns",
  122. ActionModifyTableAutoIdCache: "modify auto id cache",
  123. ActionRebaseAutoRandomBase: "rebase auto_random ID",
  124. ActionAlterIndexVisibility: "alter index visibility",
  125. ActionExchangeTablePartition: "exchange partition",
  126. ActionAddCheckConstraint: "add check constraint",
  127. ActionDropCheckConstraint: "drop check constraint",
  128. ActionAlterCheckConstraint: "alter check constraint",
  129. ActionAlterTableAlterPartition: "alter partition",
  130. ActionDropIndexes: "drop multi-indexes",
  131. }
  132. // String return current ddl action in string
  133. func (action ActionType) String() string {
  134. if v, ok := actionMap[action]; ok {
  135. return v
  136. }
  137. return "none"
  138. }
// HistoryInfo is used for binlog.
// The fields carry no JSON tags, so their Go names are the keys used when a
// value is marshalled with encoding/json.
type HistoryInfo struct {
	// SchemaVersion is the schema version recorded by AddDBInfo or AddTableInfo.
	SchemaVersion int64
	// DBInfo is the schema information recorded by AddDBInfo.
	DBInfo *DBInfo
	// TableInfo is the table information recorded by AddTableInfo.
	TableInfo *TableInfo
	// FinishedTS is not modified by any method in this file, including Clean.
	// NOTE(review): presumably the timestamp at which the job finished — confirm against the writer.
	FinishedTS uint64
}
  146. // AddDBInfo adds schema version and schema information that are used for binlog.
  147. // dbInfo is added in the following operations: create database, drop database.
  148. func (h *HistoryInfo) AddDBInfo(schemaVer int64, dbInfo *DBInfo) {
  149. h.SchemaVersion = schemaVer
  150. h.DBInfo = dbInfo
  151. }
  152. // AddTableInfo adds schema version and table information that are used for binlog.
  153. // tblInfo is added except for the following operations: create database, drop database.
  154. func (h *HistoryInfo) AddTableInfo(schemaVer int64, tblInfo *TableInfo) {
  155. h.SchemaVersion = schemaVer
  156. h.TableInfo = tblInfo
  157. }
  158. // Clean cleans history information.
  159. func (h *HistoryInfo) Clean() {
  160. h.SchemaVersion = 0
  161. h.DBInfo = nil
  162. h.TableInfo = nil
  163. }
// DDLReorgMeta is meta info of DDL reorganization.
type DDLReorgMeta struct {
	// EndHandle is the last handle of the adding indices table.
	// We should only backfill indices in the range [startHandle, EndHandle].
	// NewDDLReorgMeta initializes it to math.MaxInt64.
	EndHandle int64 `json:"end_handle"`

	// SQLMode is serialized under the "sql_mode" key.
	// NOTE(review): presumably the SQL mode in effect for the reorganization — confirm against the caller.
	SQLMode mysql.SQLMode `json:"sql_mode"`
	// Warnings and WarningsCount are keyed by error ID; they are written by
	// Job.SetWarnings and read by Job.GetWarnings.
	Warnings      map[errors.ErrorID]*terror.Error `json:"warnings"`
	WarningsCount map[errors.ErrorID]int64         `json:"warnings_count"`
}
  173. // NewDDLReorgMeta new a DDLReorgMeta.
  174. func NewDDLReorgMeta() *DDLReorgMeta {
  175. return &DDLReorgMeta{
  176. EndHandle: math.MaxInt64,
  177. }
  178. }
// Job is for a DDL operation.
// It is serialized with encoding/json (see Encode and Decode); fields tagged
// `json:"-"` are runtime-only and are not part of the encoded form.
type Job struct {
	ID         int64         `json:"id"`
	Type       ActionType    `json:"type"`
	SchemaID   int64         `json:"schema_id"`
	TableID    int64         `json:"table_id"`
	SchemaName string        `json:"schema_name"`
	State      JobState      `json:"state"`
	Error      *terror.Error `json:"err"`
	// ErrorCount will be increased, every time we meet an error when running job.
	ErrorCount int64 `json:"err_count"`
	// RowCount means the number of rows that are processed.
	// Use SetRowCount and GetRowCount to access it under Mu.
	RowCount int64 `json:"row_count"`
	// Mu is held by SetRowCount, GetRowCount and while Encode marshals the job.
	Mu sync.Mutex `json:"-"`
	// CtxVars are variables attached to the job. It is for internal usage.
	// E.g. passing arguments between functions by one single *Job pointer.
	CtxVars []interface{} `json:"-"`
	// Args holds the decoded job arguments. It is not serialized directly:
	// Encode marshals it into RawArgs and DecodeArgs fills it from RawArgs.
	Args []interface{} `json:"-"`
	// RawArgs : We must use json raw message to delay parsing special args.
	RawArgs     json.RawMessage `json:"raw_args"`
	SchemaState SchemaState     `json:"schema_state"`
	// SnapshotVer means snapshot version for this job.
	SnapshotVer uint64 `json:"snapshot_ver"`
	// StartTS uses timestamp allocated by TSO.
	// Now it's the TS when we put the job to TiKV queue.
	StartTS uint64 `json:"start_ts"`
	// DependencyID is the job's ID that the current job depends on.
	DependencyID int64 `json:"dependency_id"`
	// Query string of the ddl job.
	Query string `json:"query"`
	// BinlogInfo is updated by FinishTableJob and FinishDBJob, which
	// dereference it without a nil check.
	BinlogInfo *HistoryInfo `json:"binlog"`
	// Version indicates the DDL job version. For old jobs, it will be 0.
	Version int64 `json:"version"`
	// ReorgMeta is meta info of ddl reorganization.
	// This field is deprecated.
	// SetWarnings and GetWarnings dereference it without a nil check.
	ReorgMeta *DDLReorgMeta `json:"reorg_meta"`
	// Priority is only used to set the operation priority of adding indices.
	Priority int `json:"priority"`
}
  218. // FinishTableJob is called when a job is finished.
  219. // It updates the job's state information and adds tblInfo to the binlog.
  220. func (job *Job) FinishTableJob(jobState JobState, schemaState SchemaState, ver int64, tblInfo *TableInfo) {
  221. job.State = jobState
  222. job.SchemaState = schemaState
  223. job.BinlogInfo.AddTableInfo(ver, tblInfo)
  224. }
  225. // FinishDBJob is called when a job is finished.
  226. // It updates the job's state information and adds dbInfo the binlog.
  227. func (job *Job) FinishDBJob(jobState JobState, schemaState SchemaState, ver int64, dbInfo *DBInfo) {
  228. job.State = jobState
  229. job.SchemaState = schemaState
  230. job.BinlogInfo.AddDBInfo(ver, dbInfo)
  231. }
  232. // TSConvert2Time converts timestamp to time.
  233. func TSConvert2Time(ts uint64) time.Time {
  234. t := int64(ts >> 18) // 18 is for the logical time.
  235. return time.Unix(t/1e3, (t%1e3)*1e6)
  236. }
  237. // SetRowCount sets the number of rows. Make sure it can pass `make race`.
  238. func (job *Job) SetRowCount(count int64) {
  239. job.Mu.Lock()
  240. defer job.Mu.Unlock()
  241. job.RowCount = count
  242. }
  243. // GetRowCount gets the number of rows. Make sure it can pass `make race`.
  244. func (job *Job) GetRowCount() int64 {
  245. job.Mu.Lock()
  246. defer job.Mu.Unlock()
  247. return job.RowCount
  248. }
  249. // SetWarnings sets the warnings of rows handled.
  250. func (job *Job) SetWarnings(warnings map[errors.ErrorID]*terror.Error, warningsCount map[errors.ErrorID]int64) {
  251. job.ReorgMeta.Warnings = warnings
  252. job.ReorgMeta.WarningsCount = warningsCount
  253. }
  254. // GetWarnings gets the warnings of the rows handled.
  255. func (job *Job) GetWarnings() (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
  256. return job.ReorgMeta.Warnings, job.ReorgMeta.WarningsCount
  257. }
  258. // Encode encodes job with json format.
  259. // updateRawArgs is used to determine whether to update the raw args.
  260. func (job *Job) Encode(updateRawArgs bool) ([]byte, error) {
  261. var err error
  262. if updateRawArgs {
  263. job.RawArgs, err = json.Marshal(job.Args)
  264. if err != nil {
  265. return nil, errors.Trace(err)
  266. }
  267. }
  268. var b []byte
  269. job.Mu.Lock()
  270. defer job.Mu.Unlock()
  271. b, err = json.Marshal(job)
  272. return b, errors.Trace(err)
  273. }
  274. // Decode decodes job from the json buffer, we must use DecodeArgs later to
  275. // decode special args for this job.
  276. func (job *Job) Decode(b []byte) error {
  277. err := json.Unmarshal(b, job)
  278. return errors.Trace(err)
  279. }
  280. // DecodeArgs decodes job args.
  281. func (job *Job) DecodeArgs(args ...interface{}) error {
  282. var rawArgs []json.RawMessage
  283. if err := json.Unmarshal(job.RawArgs, &rawArgs); err != nil {
  284. return errors.Trace(err)
  285. }
  286. sz := len(rawArgs)
  287. if sz > len(args) {
  288. sz = len(args)
  289. }
  290. for i := 0; i < sz; i++ {
  291. if err := json.Unmarshal(rawArgs[i], args[i]); err != nil {
  292. return errors.Trace(err)
  293. }
  294. }
  295. job.Args = args[:sz]
  296. return nil
  297. }
  298. // String implements fmt.Stringer interface.
  299. func (job *Job) String() string {
  300. rowCount := job.GetRowCount()
  301. return fmt.Sprintf("ID:%d, Type:%s, State:%s, SchemaState:%s, SchemaID:%d, TableID:%d, RowCount:%d, ArgLen:%d, start time: %v, Err:%v, ErrCount:%d, SnapshotVersion:%v",
  302. job.ID, job.Type, job.State, job.SchemaState, job.SchemaID, job.TableID, rowCount, len(job.Args), TSConvert2Time(job.StartTS), job.Error, job.ErrorCount, job.SnapshotVer)
  303. }
  304. func (job *Job) hasDependentSchema(other *Job) (bool, error) {
  305. if other.Type == ActionDropSchema || other.Type == ActionCreateSchema {
  306. if other.SchemaID == job.SchemaID {
  307. return true, nil
  308. }
  309. if job.Type == ActionRenameTable {
  310. var oldSchemaID int64
  311. if err := job.DecodeArgs(&oldSchemaID); err != nil {
  312. return false, errors.Trace(err)
  313. }
  314. if other.SchemaID == oldSchemaID {
  315. return true, nil
  316. }
  317. }
  318. }
  319. return false, nil
  320. }
  321. // IsDependentOn returns whether the job depends on "other".
  322. // How to check the job depends on "other"?
  323. // 1. The two jobs handle the same database when one of the two jobs is an ActionDropSchema or ActionCreateSchema type.
  324. // 2. Or the two jobs handle the same table.
  325. func (job *Job) IsDependentOn(other *Job) (bool, error) {
  326. isDependent, err := job.hasDependentSchema(other)
  327. if err != nil || isDependent {
  328. return isDependent, errors.Trace(err)
  329. }
  330. isDependent, err = other.hasDependentSchema(job)
  331. if err != nil || isDependent {
  332. return isDependent, errors.Trace(err)
  333. }
  334. // TODO: If a job is ActionRenameTable, we need to check table name.
  335. if other.TableID == job.TableID {
  336. return true, nil
  337. }
  338. return false, nil
  339. }
  340. // IsFinished returns whether job is finished or not.
  341. // If the job state is Done or Cancelled, it is finished.
  342. func (job *Job) IsFinished() bool {
  343. return job.State == JobStateDone || job.State == JobStateRollbackDone || job.State == JobStateCancelled
  344. }
// IsCancelled returns whether the job is cancelled or not,
// i.e. whether its state is JobStateCancelled.
func (job *Job) IsCancelled() bool {
	return job.State == JobStateCancelled
}
// IsRollbackDone returns whether the job is rolled back or not,
// i.e. whether its state is JobStateRollbackDone.
func (job *Job) IsRollbackDone() bool {
	return job.State == JobStateRollbackDone
}
// IsRollingback returns whether the job is rolling back or not,
// i.e. whether its state is JobStateRollingback.
func (job *Job) IsRollingback() bool {
	return job.State == JobStateRollingback
}
// IsCancelling returns whether the job is cancelling or not,
// i.e. whether its state is JobStateCancelling.
func (job *Job) IsCancelling() bool {
	return job.State == JobStateCancelling
}
// IsSynced returns whether the DDL modification is synced among all TiDB servers,
// i.e. whether the job state is JobStateSynced.
func (job *Job) IsSynced() bool {
	return job.State == JobStateSynced
}
// IsDone returns whether job is done, i.e. whether its state is JobStateDone.
// Unlike IsFinished, it is false for rolled-back and cancelled jobs.
func (job *Job) IsDone() bool {
	return job.State == JobStateDone
}
// IsRunning returns whether job is still running or not,
// i.e. whether its state is JobStateRunning.
func (job *Job) IsRunning() bool {
	return job.State == JobStateRunning
}
  373. // JobState is for job state.
  374. type JobState byte
  375. // List job states.
  376. const (
  377. JobStateNone JobState = 0
  378. JobStateRunning JobState = 1
  379. // When DDL encountered an unrecoverable error at reorganization state,
  380. // some keys has been added already, we need to remove them.
  381. // JobStateRollingback is the state to do the rolling back job.
  382. JobStateRollingback JobState = 2
  383. JobStateRollbackDone JobState = 3
  384. JobStateDone JobState = 4
  385. JobStateCancelled JobState = 5
  386. // JobStateSynced is used to mark the information about the completion of this job
  387. // has been synchronized to all servers.
  388. JobStateSynced JobState = 6
  389. // JobStateCancelling is used to mark the DDL job is cancelled by the client, but the DDL work hasn't handle it.
  390. JobStateCancelling JobState = 7
  391. )
  392. // String implements fmt.Stringer interface.
  393. func (s JobState) String() string {
  394. switch s {
  395. case JobStateRunning:
  396. return "running"
  397. case JobStateRollingback:
  398. return "rollingback"
  399. case JobStateRollbackDone:
  400. return "rollback done"
  401. case JobStateDone:
  402. return "done"
  403. case JobStateCancelled:
  404. return "cancelled"
  405. case JobStateCancelling:
  406. return "cancelling"
  407. case JobStateSynced:
  408. return "synced"
  409. default:
  410. return "none"
  411. }
  412. }
// SchemaDiff contains the schema modification at a particular schema version.
// It is used to reduce schema reload cost.
type SchemaDiff struct {
	// Version is serialized under the "version" key.
	// NOTE(review): presumably the schema version this diff belongs to — confirm against the writer.
	Version int64 `json:"version"`
	// Type is the DDL action recorded in this diff.
	Type     ActionType `json:"type"`
	SchemaID int64      `json:"schema_id"`
	TableID  int64      `json:"table_id"`

	// OldTableID is the table ID before truncate, only used by truncate table DDL.
	OldTableID int64 `json:"old_table_id"`
	// OldSchemaID is the schema ID before rename table, only used by rename table DDL.
	OldSchemaID int64 `json:"old_schema_id"`

	// AffectedOpts lists further affected tables; see AffectedOption.
	AffectedOpts []*AffectedOption `json:"affected_options"`
}
// AffectedOption is used when a ddl affects multi tables.
// Its fields use the same names and JSON keys as the corresponding fields of
// SchemaDiff.
type AffectedOption struct {
	SchemaID    int64 `json:"schema_id"`
	TableID     int64 `json:"table_id"`
	OldTableID  int64 `json:"old_table_id"`
	OldSchemaID int64 `json:"old_schema_id"`
}