fdb_tsdb.c 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980
  1. /*
  2. * Copyright (c) 2020, Armink, <armink.ztl@gmail.com>
  3. *
  4. * SPDX-License-Identifier: Apache-2.0
  5. */
  6. /**
  7. * @file
  8. * @brief TSDB feature.
  9. *
  10. * Time series log (like TSDB) feature implement source file.
  11. *
  12. * TSL is time series log, the TSDB saved many TSLs.
  13. */
  14. #include <inttypes.h>
  15. #include <string.h>
  16. #include <flashdb.h>
  17. #include <fdb_low_lvl.h>
  18. #define FDB_LOG_TAG "[tsl]"
  19. /* rewrite log prefix */
  20. #undef FDB_LOG_PREFIX2
  21. #define FDB_LOG_PREFIX2() FDB_PRINT("[%s] ", db_name(db))
  22. #if defined(FDB_USING_TSDB)
  23. /* magic word(`T`, `S`, `L`, `0`) */
  24. #define SECTOR_MAGIC_WORD 0x304C5354
  25. #define TSL_STATUS_TABLE_SIZE FDB_STATUS_TABLE_SIZE(FDB_TSL_STATUS_NUM)
  26. #define SECTOR_HDR_DATA_SIZE (FDB_WG_ALIGN(sizeof(struct sector_hdr_data)))
  27. #define LOG_IDX_DATA_SIZE (FDB_WG_ALIGN(sizeof(struct log_idx_data)))
  28. #define LOG_IDX_TS_OFFSET ((unsigned long)(&((struct log_idx_data *)0)->time))
  29. #define SECTOR_MAGIC_OFFSET ((unsigned long)(&((struct sector_hdr_data *)0)->magic))
  30. #define SECTOR_START_TIME_OFFSET ((unsigned long)(&((struct sector_hdr_data *)0)->start_time))
  31. #define SECTOR_END0_TIME_OFFSET ((unsigned long)(&((struct sector_hdr_data *)0)->end_info[0].time))
  32. #define SECTOR_END0_IDX_OFFSET ((unsigned long)(&((struct sector_hdr_data *)0)->end_info[0].index))
  33. #define SECTOR_END0_STATUS_OFFSET ((unsigned long)(&((struct sector_hdr_data *)0)->end_info[0].status))
  34. #define SECTOR_END1_TIME_OFFSET ((unsigned long)(&((struct sector_hdr_data *)0)->end_info[1].time))
  35. #define SECTOR_END1_IDX_OFFSET ((unsigned long)(&((struct sector_hdr_data *)0)->end_info[1].index))
  36. #define SECTOR_END1_STATUS_OFFSET ((unsigned long)(&((struct sector_hdr_data *)0)->end_info[1].status))
  37. /* the next address is get failed */
  38. #define FAILED_ADDR 0xFFFFFFFF
  39. #define db_name(db) (((fdb_db_t)db)->name)
  40. #define db_init_ok(db) (((fdb_db_t)db)->init_ok)
  41. #define db_sec_size(db) (((fdb_db_t)db)->sec_size)
  42. #define db_max_size(db) (((fdb_db_t)db)->max_size)
  43. #define db_lock(db) \
  44. do { \
  45. if (((fdb_db_t)db)->lock) ((fdb_db_t)db)->lock((fdb_db_t)db); \
  46. } while(0);
  47. #define db_unlock(db) \
  48. do { \
  49. if (((fdb_db_t)db)->unlock) ((fdb_db_t)db)->unlock((fdb_db_t)db); \
  50. } while(0);
  51. #define _FDB_WRITE_STATUS(db, addr, status_table, status_num, status_index, sync) \
  52. do { \
  53. result = _fdb_write_status((fdb_db_t)db, addr, status_table, status_num, status_index, sync);\
  54. if (result != FDB_NO_ERR) return result; \
  55. } while(0);
  56. #define FLASH_WRITE(db, addr, buf, size, sync) \
  57. do { \
  58. result = _fdb_flash_write((fdb_db_t)db, addr, buf, size, sync); \
  59. if (result != FDB_NO_ERR) return result; \
  60. } while(0);
  61. struct sector_hdr_data {
  62. uint8_t status[FDB_STORE_STATUS_TABLE_SIZE]; /**< sector store status @see fdb_sector_store_status_t */
  63. uint32_t magic; /**< magic word(`T`, `S`, `L`, `0`) */
  64. fdb_time_t start_time; /**< the first start node's timestamp */
  65. struct {
  66. fdb_time_t time; /**< the last end node's timestamp */
  67. uint32_t index; /**< the last end node's index */
  68. uint8_t status[TSL_STATUS_TABLE_SIZE]; /**< end node status, @see fdb_tsl_status_t */
  69. } end_info[2];
  70. uint32_t reserved;
  71. };
  72. typedef struct sector_hdr_data *sector_hdr_data_t;
  73. /* time series log node index data */
  74. struct log_idx_data {
  75. uint8_t status_table[TSL_STATUS_TABLE_SIZE]; /**< node status, @see fdb_tsl_status_t */
  76. fdb_time_t time; /**< node timestamp */
  77. uint32_t log_len; /**< node total length (header + name + value), must align by FDB_WRITE_GRAN */
  78. uint32_t log_addr; /**< node address */
  79. };
  80. typedef struct log_idx_data *log_idx_data_t;
  81. struct query_count_args {
  82. fdb_tsl_status_t status;
  83. size_t count;
  84. };
  85. struct check_sec_hdr_cb_args {
  86. fdb_tsdb_t db;
  87. bool check_failed;
  88. size_t empty_num;
  89. uint32_t empty_addr;
  90. };
  91. static fdb_err_t read_tsl(fdb_tsdb_t db, fdb_tsl_t tsl)
  92. {
  93. struct log_idx_data idx;
  94. /* read TSL index raw data */
  95. _fdb_flash_read((fdb_db_t)db, tsl->addr.index, (uint32_t *) &idx, sizeof(struct log_idx_data));
  96. tsl->status = (fdb_tsl_status_t) _fdb_get_status(idx.status_table, FDB_TSL_STATUS_NUM);
  97. if ((tsl->status == FDB_TSL_PRE_WRITE) || (tsl->status == FDB_TSL_UNUSED)) {
  98. tsl->log_len = db->max_len;
  99. tsl->addr.log = FDB_DATA_UNUSED;
  100. tsl->time = 0;
  101. } else {
  102. tsl->log_len = idx.log_len;
  103. tsl->addr.log = idx.log_addr;
  104. tsl->time = idx.time;
  105. }
  106. return FDB_NO_ERR;
  107. }
  108. static uint32_t get_next_sector_addr(fdb_tsdb_t db, tsdb_sec_info_t pre_sec, uint32_t traversed_len)
  109. {
  110. if (traversed_len + db_sec_size(db) <= db_max_size(db)) {
  111. if (pre_sec->addr + db_sec_size(db) < db_max_size(db)) {
  112. return pre_sec->addr + db_sec_size(db);
  113. } else {
  114. /* the next sector is on the top of the database */
  115. return 0;
  116. }
  117. } else {
  118. /* finished */
  119. return FAILED_ADDR;
  120. }
  121. }
  122. static uint32_t get_next_tsl_addr(tsdb_sec_info_t sector, fdb_tsl_t pre_tsl)
  123. {
  124. uint32_t addr = FAILED_ADDR;
  125. if (sector->status == FDB_SECTOR_STORE_EMPTY) {
  126. return FAILED_ADDR;
  127. }
  128. if (pre_tsl->addr.index + LOG_IDX_DATA_SIZE <= sector->end_idx) {
  129. addr = pre_tsl->addr.index + LOG_IDX_DATA_SIZE;
  130. } else {
  131. /* no TSL */
  132. return FAILED_ADDR;
  133. }
  134. return addr;
  135. }
  136. static uint32_t get_last_tsl_addr(tsdb_sec_info_t sector, fdb_tsl_t pre_tsl)
  137. {
  138. uint32_t addr = FAILED_ADDR;
  139. if (sector->status == FDB_SECTOR_STORE_EMPTY) {
  140. return FAILED_ADDR;
  141. }
  142. if (pre_tsl->addr.index >= (sector->addr + SECTOR_HDR_DATA_SIZE + LOG_IDX_DATA_SIZE)) {
  143. addr = pre_tsl->addr.index - LOG_IDX_DATA_SIZE;
  144. } else {
  145. return FAILED_ADDR;
  146. }
  147. return addr;
  148. }
  149. static uint32_t get_last_sector_addr(fdb_tsdb_t db, tsdb_sec_info_t pre_sec, uint32_t traversed_len)
  150. {
  151. if (traversed_len + db_sec_size(db) <= db_max_size(db)) {
  152. if (pre_sec->addr >= db_sec_size(db)) {
  153. /* the next sector is previous sector */
  154. return pre_sec->addr - db_sec_size(db);
  155. } else {
  156. /* the next sector is the last sector */
  157. return db_max_size(db) - db_sec_size(db);
  158. }
  159. } else {
  160. return FAILED_ADDR;
  161. }
  162. }
  163. static fdb_err_t read_sector_info(fdb_tsdb_t db, uint32_t addr, tsdb_sec_info_t sector, bool traversal)
  164. {
  165. fdb_err_t result = FDB_NO_ERR;
  166. struct sector_hdr_data sec_hdr;
  167. FDB_ASSERT(sector);
  168. /* read sector header raw data */
  169. _fdb_flash_read((fdb_db_t)db, addr, (uint32_t *)&sec_hdr, sizeof(struct sector_hdr_data));
  170. sector->addr = addr;
  171. sector->magic = sec_hdr.magic;
  172. /* check magic word */
  173. if (sector->magic != SECTOR_MAGIC_WORD) {
  174. sector->check_ok = false;
  175. return FDB_INIT_FAILED;
  176. }
  177. sector->check_ok = true;
  178. sector->status = (fdb_sector_store_status_t) _fdb_get_status(sec_hdr.status, FDB_SECTOR_STORE_STATUS_NUM);
  179. sector->start_time = sec_hdr.start_time;
  180. sector->end_info_stat[0] = (fdb_tsl_status_t) _fdb_get_status(sec_hdr.end_info[0].status, FDB_TSL_STATUS_NUM);
  181. sector->end_info_stat[1] = (fdb_tsl_status_t) _fdb_get_status(sec_hdr.end_info[1].status, FDB_TSL_STATUS_NUM);
  182. if (sector->end_info_stat[0] == FDB_TSL_WRITE) {
  183. sector->end_time = sec_hdr.end_info[0].time;
  184. sector->end_idx = sec_hdr.end_info[0].index;
  185. } else if (sector->end_info_stat[1] == FDB_TSL_WRITE) {
  186. sector->end_time = sec_hdr.end_info[1].time;
  187. sector->end_idx = sec_hdr.end_info[1].index;
  188. } else if (sector->end_info_stat[0] == FDB_TSL_PRE_WRITE && sector->end_info_stat[1] == FDB_TSL_PRE_WRITE) {
  189. //TODO There is no valid end node info on this sector, need impl fast query this sector by fdb_tsl_iter_by_time
  190. FDB_ASSERT(0);
  191. }
  192. /* traversal all TSL and calculate the remain space size */
  193. sector->empty_idx = sector->addr + SECTOR_HDR_DATA_SIZE;
  194. sector->empty_data = sector->addr + db_sec_size(db);
  195. /* the TSL's data is saved from sector bottom, and the TSL's index saved from the sector top */
  196. sector->remain = sector->empty_data - sector->empty_idx;
  197. if (sector->status == FDB_SECTOR_STORE_USING && traversal) {
  198. struct fdb_tsl tsl;
  199. tsl.addr.index = sector->empty_idx;
  200. while (read_tsl(db, &tsl) == FDB_NO_ERR) {
  201. if (tsl.status == FDB_TSL_UNUSED) {
  202. break;
  203. }
  204. sector->end_time = tsl.time;
  205. sector->end_idx = tsl.addr.index;
  206. sector->empty_idx += LOG_IDX_DATA_SIZE;
  207. sector->empty_data -= FDB_WG_ALIGN(tsl.log_len);
  208. tsl.addr.index += LOG_IDX_DATA_SIZE;
  209. if (sector->remain > LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(tsl.log_len)) {
  210. sector->remain -= (LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(tsl.log_len));
  211. } else {
  212. FDB_INFO("Error: this TSL (0x%08" PRIX32 ") size (%" PRIu32 ") is out of bound.\n", tsl.addr.index, tsl.log_len);
  213. sector->remain = 0;
  214. result = FDB_READ_ERR;
  215. break;
  216. }
  217. }
  218. }
  219. return result;
  220. }
  221. static fdb_err_t format_sector(fdb_tsdb_t db, uint32_t addr)
  222. {
  223. fdb_err_t result = FDB_NO_ERR;
  224. struct sector_hdr_data sec_hdr;
  225. FDB_ASSERT(addr % db_sec_size(db) == 0);
  226. result = _fdb_flash_erase((fdb_db_t)db, addr, db_sec_size(db));
  227. if (result == FDB_NO_ERR) {
  228. _FDB_WRITE_STATUS(db, addr, sec_hdr.status, FDB_SECTOR_STORE_STATUS_NUM, FDB_SECTOR_STORE_EMPTY, true);
  229. /* set the magic */
  230. sec_hdr.magic = SECTOR_MAGIC_WORD;
  231. FLASH_WRITE(db, addr + SECTOR_MAGIC_OFFSET, &sec_hdr.magic, sizeof(sec_hdr.magic), true);
  232. }
  233. return result;
  234. }
  235. static void sector_iterator(fdb_tsdb_t db, tsdb_sec_info_t sector, fdb_sector_store_status_t status, void *arg1,
  236. void *arg2, bool (*callback)(tsdb_sec_info_t sector, void *arg1, void *arg2), bool traversal)
  237. {
  238. uint32_t sec_addr = sector->addr, traversed_len = 0;
  239. /* search all sectors */
  240. do {
  241. read_sector_info(db, sec_addr, sector, false);
  242. if (status == FDB_SECTOR_STORE_UNUSED || status == sector->status) {
  243. if (traversal) {
  244. read_sector_info(db, sec_addr, sector, true);
  245. }
  246. /* iterator is interrupted when callback return true */
  247. if (callback && callback(sector, arg1, arg2)) {
  248. return;
  249. }
  250. }
  251. traversed_len += db_sec_size(db);
  252. } while ((sec_addr = get_next_sector_addr(db, sector, traversed_len)) != FAILED_ADDR);
  253. }
  254. static fdb_err_t write_tsl(fdb_tsdb_t db, fdb_blob_t blob, fdb_time_t time)
  255. {
  256. fdb_err_t result = FDB_NO_ERR;
  257. struct log_idx_data idx;
  258. uint32_t idx_addr = db->cur_sec.empty_idx;
  259. idx.log_len = blob->size;
  260. idx.time = time;
  261. idx.log_addr = db->cur_sec.empty_data - FDB_WG_ALIGN(idx.log_len);
  262. /* write the status will by write granularity */
  263. _FDB_WRITE_STATUS(db, idx_addr, idx.status_table, FDB_TSL_STATUS_NUM, FDB_TSL_PRE_WRITE, false);
  264. /* write other index info */
  265. FLASH_WRITE(db, idx_addr + LOG_IDX_TS_OFFSET, &idx.time, sizeof(struct log_idx_data) - LOG_IDX_TS_OFFSET, false);
  266. /* write blob data */
  267. FLASH_WRITE(db, idx.log_addr, blob->buf, blob->size, false);
  268. /* write the status will by write granularity */
  269. _FDB_WRITE_STATUS(db, idx_addr, idx.status_table, FDB_TSL_STATUS_NUM, FDB_TSL_WRITE, true);
  270. return result;
  271. }
  272. static fdb_err_t update_sec_status(fdb_tsdb_t db, tsdb_sec_info_t sector, fdb_blob_t blob, fdb_time_t cur_time)
  273. {
  274. fdb_err_t result = FDB_NO_ERR;
  275. uint8_t status[FDB_STORE_STATUS_TABLE_SIZE];
  276. if (sector->status == FDB_SECTOR_STORE_USING && sector->remain < LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(blob->size)) {
  277. uint8_t end_status[TSL_STATUS_TABLE_SIZE];
  278. uint32_t end_index = sector->empty_idx - LOG_IDX_DATA_SIZE, new_sec_addr, cur_sec_addr = sector->addr;
  279. /* save the end node index and timestamp */
  280. if (sector->end_info_stat[0] == FDB_TSL_UNUSED) {
  281. _FDB_WRITE_STATUS(db, cur_sec_addr + SECTOR_END0_STATUS_OFFSET, end_status, FDB_TSL_STATUS_NUM, FDB_TSL_PRE_WRITE, false);
  282. FLASH_WRITE(db, cur_sec_addr + SECTOR_END0_TIME_OFFSET, (uint32_t * )&db->last_time, sizeof(fdb_time_t), false);
  283. FLASH_WRITE(db, cur_sec_addr + SECTOR_END0_IDX_OFFSET, &end_index, sizeof(end_index), false);
  284. _FDB_WRITE_STATUS(db, cur_sec_addr + SECTOR_END0_STATUS_OFFSET, end_status, FDB_TSL_STATUS_NUM, FDB_TSL_WRITE, true);
  285. } else if (sector->end_info_stat[1] == FDB_TSL_UNUSED) {
  286. _FDB_WRITE_STATUS(db, cur_sec_addr + SECTOR_END1_STATUS_OFFSET, end_status, FDB_TSL_STATUS_NUM, FDB_TSL_PRE_WRITE, false);
  287. FLASH_WRITE(db, cur_sec_addr + SECTOR_END1_TIME_OFFSET, (uint32_t * )&db->last_time, sizeof(fdb_time_t), false);
  288. FLASH_WRITE(db, cur_sec_addr + SECTOR_END1_IDX_OFFSET, &end_index, sizeof(end_index), false);
  289. _FDB_WRITE_STATUS(db, cur_sec_addr + SECTOR_END1_STATUS_OFFSET, end_status, FDB_TSL_STATUS_NUM, FDB_TSL_WRITE, true);
  290. }
  291. /* change current sector to full */
  292. _FDB_WRITE_STATUS(db, cur_sec_addr, status, FDB_SECTOR_STORE_STATUS_NUM, FDB_SECTOR_STORE_FULL, true);
  293. sector->status = FDB_SECTOR_STORE_FULL;
  294. /* calculate next sector address */
  295. if (sector->addr + db_sec_size(db) < db_max_size(db)) {
  296. new_sec_addr = sector->addr + db_sec_size(db);
  297. }
  298. else if (db->rollover) {
  299. new_sec_addr = 0;
  300. } else {
  301. /* not rollover */
  302. return FDB_SAVED_FULL;
  303. }
  304. read_sector_info(db, new_sec_addr, &db->cur_sec, false);
  305. if (sector->status != FDB_SECTOR_STORE_EMPTY) {
  306. /* calculate the oldest sector address */
  307. if (new_sec_addr + db_sec_size(db) < db_max_size(db)) {
  308. db->oldest_addr = new_sec_addr + db_sec_size(db);
  309. } else {
  310. db->oldest_addr = 0;
  311. }
  312. format_sector(db, new_sec_addr);
  313. read_sector_info(db, new_sec_addr, &db->cur_sec, false);
  314. }
  315. } else if (sector->status == FDB_SECTOR_STORE_FULL) {
  316. /* database full */
  317. return FDB_SAVED_FULL;
  318. }
  319. if (sector->status == FDB_SECTOR_STORE_EMPTY) {
  320. /* change the sector to using */
  321. sector->status = FDB_SECTOR_STORE_USING;
  322. sector->start_time = cur_time;
  323. _FDB_WRITE_STATUS(db, sector->addr, status, FDB_SECTOR_STORE_STATUS_NUM, FDB_SECTOR_STORE_USING, true);
  324. /* save the start timestamp */
  325. FLASH_WRITE(db, sector->addr + SECTOR_START_TIME_OFFSET, (uint32_t *)&cur_time, sizeof(fdb_time_t), true);
  326. }
  327. return result;
  328. }
  329. static fdb_err_t tsl_append(fdb_tsdb_t db, fdb_blob_t blob)
  330. {
  331. fdb_err_t result = FDB_NO_ERR;
  332. fdb_time_t cur_time = db->get_time();
  333. FDB_ASSERT(blob->size <= db->max_len);
  334. /* check the current timestamp, MUST more than the last save timestamp */
  335. if (cur_time < db->last_time) {
  336. FDB_INFO("Warning: current timestamp (%" PRIdMAX ") is less than the last save timestamp (%" PRIdMAX "). This tsl will be dropped.\n",
  337. (intmax_t )cur_time, (intmax_t )(db->last_time));
  338. return FDB_WRITE_ERR;
  339. }
  340. result = update_sec_status(db, &db->cur_sec, blob, cur_time);
  341. if (result != FDB_NO_ERR) {
  342. return result;
  343. }
  344. /* write the TSL node */
  345. result = write_tsl(db, blob, cur_time);
  346. if (result != FDB_NO_ERR) {
  347. return result;
  348. }
  349. /* recalculate the current using sector info */
  350. db->cur_sec.end_idx = db->cur_sec.empty_idx;
  351. db->cur_sec.end_time = cur_time;
  352. db->cur_sec.empty_idx += LOG_IDX_DATA_SIZE;
  353. db->cur_sec.empty_data -= FDB_WG_ALIGN(blob->size);
  354. db->cur_sec.remain -= LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(blob->size);
  355. db->last_time = cur_time;
  356. return result;
  357. }
  358. /**
  359. * Append a new log to TSDB.
  360. *
  361. * @param db database object
  362. * @param blob log blob data
  363. *
  364. * @return result
  365. */
  366. fdb_err_t fdb_tsl_append(fdb_tsdb_t db, fdb_blob_t blob)
  367. {
  368. fdb_err_t result = FDB_NO_ERR;
  369. if (!db_init_ok(db)) {
  370. FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
  371. return FDB_INIT_FAILED;
  372. }
  373. db_lock(db);
  374. result = tsl_append(db, blob);
  375. db_unlock(db);
  376. return result;
  377. }
  378. /**
  379. * The TSDB iterator for each TSL.
  380. *
  381. * @param db database object
  382. * @param cb callback
  383. * @param arg callback argument
  384. */
  385. void fdb_tsl_iter(fdb_tsdb_t db, fdb_tsl_cb cb, void *arg)
  386. {
  387. struct tsdb_sec_info sector;
  388. uint32_t sec_addr, traversed_len = 0;
  389. struct fdb_tsl tsl;
  390. if (!db_init_ok(db)) {
  391. FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
  392. }
  393. if (cb == NULL) {
  394. return;
  395. }
  396. sec_addr = db->oldest_addr;
  397. db_lock(db);
  398. /* search all sectors */
  399. do {
  400. traversed_len += db_sec_size(db);
  401. if (read_sector_info(db, sec_addr, &sector, false) != FDB_NO_ERR) {
  402. continue;
  403. }
  404. /* sector has TSL */
  405. if (sector.status == FDB_SECTOR_STORE_USING || sector.status == FDB_SECTOR_STORE_FULL) {
  406. if (sector.status == FDB_SECTOR_STORE_USING) {
  407. /* copy the current using sector status */
  408. sector = db->cur_sec;
  409. }
  410. tsl.addr.index = sector.addr + SECTOR_HDR_DATA_SIZE;
  411. /* search all TSL */
  412. do {
  413. read_tsl(db, &tsl);
  414. /* iterator is interrupted when callback return true */
  415. if (cb(&tsl, arg)) {
  416. db_unlock(db);
  417. return;
  418. }
  419. } while ((tsl.addr.index = get_next_tsl_addr(&sector, &tsl)) != FAILED_ADDR);
  420. }
  421. } while ((sec_addr = get_next_sector_addr(db, &sector, traversed_len)) != FAILED_ADDR);
  422. db_unlock(db);
  423. }
  424. /**
  425. * The TSDB iterator for each TSL.
  426. *
  427. * @param db database object
  428. * @param cb callback
  429. * @param arg callback argument
  430. */
  431. void fdb_tsl_iter_reverse(fdb_tsdb_t db, fdb_tsl_cb cb, void *cb_arg)
  432. {
  433. struct tsdb_sec_info sector;
  434. uint32_t sec_addr, traversed_len = 0;
  435. struct fdb_tsl tsl;
  436. if (!db_init_ok(db)) {
  437. FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
  438. }
  439. if (cb == NULL) {
  440. return;
  441. }
  442. sec_addr = db->cur_sec.addr;
  443. db_lock(db);
  444. /* search all sectors */
  445. do {
  446. traversed_len += db_sec_size(db);
  447. if (read_sector_info(db, sec_addr, &sector, false) != FDB_NO_ERR) {
  448. continue;
  449. }
  450. /* sector has TSL */
  451. if (sector.status == FDB_SECTOR_STORE_USING || sector.status == FDB_SECTOR_STORE_FULL) {
  452. if (sector.status == FDB_SECTOR_STORE_USING) {
  453. /* copy the current using sector status */
  454. sector = db->cur_sec;
  455. }
  456. tsl.addr.index = sector.end_idx;
  457. /* search all TSL */
  458. do {
  459. read_tsl(db, &tsl);
  460. /* iterator is interrupted when callback return true */
  461. if (cb(&tsl, cb_arg)) {
  462. goto __exit;
  463. }
  464. } while ((tsl.addr.index = get_last_tsl_addr(&sector, &tsl)) != FAILED_ADDR);
  465. } else if (sector.status == FDB_SECTOR_STORE_EMPTY || sector.status == FDB_SECTOR_STORE_UNUSED)
  466. goto __exit;
  467. } while ((sec_addr = get_last_sector_addr(db, &sector, traversed_len)) != FAILED_ADDR);
  468. __exit:
  469. db_unlock(db);
  470. }
  471. /*
  472. * Found the matched TSL address.
  473. */
  474. static int search_start_tsl_addr(fdb_tsdb_t db, int start, int end, fdb_time_t from, fdb_time_t to)
  475. {
  476. struct fdb_tsl tsl;
  477. while (true) {
  478. tsl.addr.index = start + FDB_ALIGN((end - start) / 2, LOG_IDX_DATA_SIZE);
  479. read_tsl(db, &tsl);
  480. if (tsl.time < from) {
  481. start = tsl.addr.index + LOG_IDX_DATA_SIZE;
  482. } else if (tsl.time > from) {
  483. end = tsl.addr.index - LOG_IDX_DATA_SIZE;
  484. } else {
  485. return tsl.addr.index;
  486. }
  487. if (start > end) {
  488. if (from > to) {
  489. tsl.addr.index = start;
  490. read_tsl(db, &tsl);
  491. if (tsl.time > from) {
  492. start -= LOG_IDX_DATA_SIZE;
  493. }
  494. }
  495. break;
  496. }
  497. }
  498. return start;
  499. }
  500. /**
  501. * The TSDB iterator for each TSL by timestamp.
  502. *
  503. * @param db database object
  504. * @param from starting timestamp. It will be a reverse iterator when ending timestamp less than starting timestamp
  505. * @param to ending timestamp
  506. * @param cb callback
  507. * @param arg callback argument
  508. */
  509. void fdb_tsl_iter_by_time(fdb_tsdb_t db, fdb_time_t from, fdb_time_t to, fdb_tsl_cb cb, void *cb_arg)
  510. {
  511. struct tsdb_sec_info sector;
  512. uint32_t sec_addr, start_addr, traversed_len = 0;
  513. struct fdb_tsl tsl;
  514. bool found_start_tsl = false;
  515. uint32_t (*get_sector_addr)(fdb_tsdb_t , tsdb_sec_info_t , uint32_t);
  516. uint32_t (*get_tsl_addr)(tsdb_sec_info_t , fdb_tsl_t);
  517. if (!db_init_ok(db)) {
  518. FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
  519. }
  520. if(from <= to) {
  521. start_addr = db->oldest_addr;
  522. get_sector_addr = get_next_sector_addr;
  523. get_tsl_addr = get_next_tsl_addr;
  524. } else {
  525. start_addr = db->cur_sec.addr;
  526. get_sector_addr = get_last_sector_addr;
  527. get_tsl_addr = get_last_tsl_addr;
  528. }
  529. // FDB_INFO("from %s", ctime((const time_t * )&from));
  530. // FDB_INFO("to %s", ctime((const time_t * )&to));
  531. if (cb == NULL) {
  532. return;
  533. }
  534. sec_addr = start_addr;
  535. db_lock(db);
  536. /* search all sectors */
  537. do {
  538. traversed_len += db_sec_size(db);
  539. if (read_sector_info(db, sec_addr, &sector, false) != FDB_NO_ERR) {
  540. continue;
  541. }
  542. /* sector has TSL */
  543. if ((sector.status == FDB_SECTOR_STORE_USING || sector.status == FDB_SECTOR_STORE_FULL)) {
  544. if (sector.status == FDB_SECTOR_STORE_USING) {
  545. /* copy the current using sector status */
  546. sector = db->cur_sec;
  547. }
  548. if ((found_start_tsl)
  549. || (!found_start_tsl &&
  550. ((from <= to && ((sec_addr == start_addr && from <= sector.start_time) || from <= sector.end_time)) ||
  551. (from > to && ((sec_addr == start_addr && from >= sector.end_time) || from >= sector.start_time)))
  552. )) {
  553. uint32_t start = sector.addr + SECTOR_HDR_DATA_SIZE, end = sector.end_idx;
  554. found_start_tsl = true;
  555. /* search the first start TSL address */
  556. tsl.addr.index = search_start_tsl_addr(db, start, end, from, to);
  557. /* search all TSL */
  558. do {
  559. read_tsl(db, &tsl);
  560. if (tsl.status != FDB_TSL_UNUSED) {
  561. if ((from <= to && tsl.time >= from && tsl.time <= to)
  562. || (from > to && tsl.time <= from && tsl.time >= to)) {
  563. /* iterator is interrupted when callback return true */
  564. if (cb(&tsl, cb_arg)) {
  565. goto __exit;
  566. }
  567. } else {
  568. goto __exit;
  569. }
  570. }
  571. } while ((tsl.addr.index = get_tsl_addr(&sector, &tsl)) != FAILED_ADDR);
  572. }
  573. } else if (sector.status == FDB_SECTOR_STORE_EMPTY) {
  574. goto __exit;
  575. }
  576. } while ((sec_addr = get_sector_addr(db, &sector, traversed_len)) != FAILED_ADDR);
  577. __exit:
  578. db_unlock(db);
  579. }
  580. static bool query_count_cb(fdb_tsl_t tsl, void *arg)
  581. {
  582. struct query_count_args *args = arg;
  583. if (tsl->status == args->status) {
  584. args->count++;
  585. }
  586. return false;
  587. }
  588. /**
  589. * Query some TSL's count by timestamp and status.
  590. *
  591. * @param db database object
  592. * @param from starting timestamp
  593. * @param to ending timestamp
  594. * @param status status
  595. */
  596. size_t fdb_tsl_query_count(fdb_tsdb_t db, fdb_time_t from, fdb_time_t to, fdb_tsl_status_t status)
  597. {
  598. struct query_count_args arg = { FDB_TSL_UNUSED, 0 };
  599. arg.status = status;
  600. if (!db_init_ok(db)) {
  601. FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
  602. return FDB_INIT_FAILED;
  603. }
  604. fdb_tsl_iter_by_time(db, from, to, query_count_cb, &arg);
  605. return arg.count;
  606. }
  607. /**
  608. * Set the TSL status.
  609. *
  610. * @param db database object
  611. * @param tsl TSL object
  612. * @param status status
  613. *
  614. * @return result
  615. */
  616. fdb_err_t fdb_tsl_set_status(fdb_tsdb_t db, fdb_tsl_t tsl, fdb_tsl_status_t status)
  617. {
  618. fdb_err_t result = FDB_NO_ERR;
  619. uint8_t status_table[TSL_STATUS_TABLE_SIZE];
  620. /* write the status will by write granularity */
  621. _FDB_WRITE_STATUS(db, tsl->addr.index, status_table, FDB_TSL_STATUS_NUM, status, true);
  622. return result;
  623. }
  624. /**
  625. * Convert the TSL object to blob object
  626. *
  627. * @param tsl TSL object
  628. * @param blob blob object
  629. *
  630. * @return new blob object
  631. */
  632. fdb_blob_t fdb_tsl_to_blob(fdb_tsl_t tsl, fdb_blob_t blob)
  633. {
  634. blob->saved.addr = tsl->addr.log;
  635. blob->saved.meta_addr = tsl->addr.index;
  636. blob->saved.len = tsl->log_len;
  637. return blob;
  638. }
  639. static bool check_sec_hdr_cb(tsdb_sec_info_t sector, void *arg1, void *arg2)
  640. {
  641. struct check_sec_hdr_cb_args *arg = arg1;
  642. fdb_tsdb_t db = arg->db;
  643. if (!sector->check_ok) {
  644. FDB_INFO("Sector (0x%08" PRIX32 ") header info is incorrect.\n", sector->addr);
  645. (arg->check_failed) = true;
  646. return true;
  647. } else if (sector->status == FDB_SECTOR_STORE_USING) {
  648. if (db->cur_sec.addr == FDB_DATA_UNUSED) {
  649. memcpy(&db->cur_sec, sector, sizeof(struct tsdb_sec_info));
  650. } else {
  651. FDB_INFO("Warning: Sector status is wrong, there are multiple sectors in use.\n");
  652. (arg->check_failed) = true;
  653. return true;
  654. }
  655. } else if (sector->status == FDB_SECTOR_STORE_EMPTY) {
  656. (arg->empty_num) += 1;
  657. arg->empty_addr = sector->addr;
  658. if ((arg->empty_num) == 1 && db->cur_sec.addr == FDB_DATA_UNUSED) {
  659. memcpy(&db->cur_sec, sector, sizeof(struct tsdb_sec_info));
  660. }
  661. }
  662. return false;
  663. }
  664. static bool format_all_cb(tsdb_sec_info_t sector, void *arg1, void *arg2)
  665. {
  666. fdb_tsdb_t db = arg1;
  667. format_sector(db, sector->addr);
  668. return false;
  669. }
  670. static void tsl_format_all(fdb_tsdb_t db)
  671. {
  672. struct tsdb_sec_info sector;
  673. sector.addr = 0;
  674. sector_iterator(db, &sector, FDB_SECTOR_STORE_UNUSED, db, NULL, format_all_cb, false);
  675. db->oldest_addr = 0;
  676. db->cur_sec.addr = 0;
  677. db->last_time = 0;
  678. /* read the current using sector info */
  679. read_sector_info(db, db->cur_sec.addr, &db->cur_sec, false);
  680. FDB_INFO("All sector format finished.\n");
  681. }
  682. /**
  683. * Clean all the data in the TSDB.
  684. *
  685. * @note It's DANGEROUS. This operation is not reversible.
  686. *
  687. * @param db database object
  688. */
  689. void fdb_tsl_clean(fdb_tsdb_t db)
  690. {
  691. db_lock(db);
  692. tsl_format_all(db);
  693. db_unlock(db);
  694. }
  695. /**
  696. * This function will get or set some options of the database
  697. *
  698. * @param db database object
  699. * @param cmd the control command
  700. * @param arg the argument
  701. */
  702. void fdb_tsdb_control(fdb_tsdb_t db, int cmd, void *arg)
  703. {
  704. FDB_ASSERT(db);
  705. switch (cmd) {
  706. case FDB_TSDB_CTRL_SET_SEC_SIZE:
  707. /* this change MUST before database initialization */
  708. FDB_ASSERT(db->parent.init_ok == false);
  709. db->parent.sec_size = *(uint32_t *)arg;
  710. break;
  711. case FDB_TSDB_CTRL_GET_SEC_SIZE:
  712. *(uint32_t *)arg = db->parent.sec_size;
  713. break;
  714. case FDB_TSDB_CTRL_SET_LOCK:
  715. #if !defined(__ARMCC_VERSION) && defined(__GNUC__)
  716. #pragma GCC diagnostic push
  717. #pragma GCC diagnostic ignored "-Wpedantic"
  718. #endif
  719. db->parent.lock = (void (*)(fdb_db_t db))arg;
  720. #if !defined(__ARMCC_VERSION) && defined(__GNUC__)
  721. #pragma GCC diagnostic pop
  722. #endif
  723. break;
  724. case FDB_TSDB_CTRL_SET_UNLOCK:
  725. #if !defined(__ARMCC_VERSION) && defined(__GNUC__)
  726. #pragma GCC diagnostic push
  727. #pragma GCC diagnostic ignored "-Wpedantic"
  728. #endif
  729. db->parent.unlock = (void (*)(fdb_db_t db))arg;
  730. #if !defined(__ARMCC_VERSION) && defined(__GNUC__)
  731. #pragma GCC diagnostic pop
  732. #endif
  733. break;
  734. case FDB_TSDB_CTRL_SET_ROLLOVER:
  735. /* this change MUST after database initialized */
  736. FDB_ASSERT(db->parent.init_ok == true);
  737. db->rollover = *(bool *)arg;
  738. break;
  739. case FDB_TSDB_CTRL_GET_ROLLOVER:
  740. *(bool *)arg = db->rollover;
  741. break;
  742. case FDB_TSDB_CTRL_GET_LAST_TIME:
  743. *(fdb_time_t *)arg = db->last_time;
  744. break;
  745. case FDB_TSDB_CTRL_SET_FILE_MODE:
  746. #ifdef FDB_USING_FILE_MODE
  747. /* this change MUST before database initialization */
  748. FDB_ASSERT(db->parent.init_ok == false);
  749. db->parent.file_mode = *(bool *)arg;
  750. #else
  751. FDB_INFO("Error: set file mode Failed. Please defined the FDB_USING_FILE_MODE macro.");
  752. #endif
  753. break;
  754. case FDB_TSDB_CTRL_SET_MAX_SIZE:
  755. #ifdef FDB_USING_FILE_MODE
  756. /* this change MUST before database initialization */
  757. FDB_ASSERT(db->parent.init_ok == false);
  758. db->parent.max_size = *(uint32_t *)arg;
  759. #endif
  760. break;
  761. case FDB_TSDB_CTRL_SET_NOT_FORMAT:
  762. /* this change MUST before database initialization */
  763. FDB_ASSERT(db->parent.init_ok == false);
  764. db->parent.not_formatable = *(bool *)arg;
  765. break;
  766. }
  767. }
  768. /**
  769. * The time series database initialization.
  770. *
  771. * @param db database object
  772. * @param name database name
  773. * @param path FAL mode: partition name, file mode: database saved directory path
  774. * @param get_time get current time function
  775. * @param max_len maximum length of each log
  776. * @param user_data user data
  777. *
  778. * @return result
  779. */
  780. fdb_err_t fdb_tsdb_init(fdb_tsdb_t db, const char *name, const char *path, fdb_get_time get_time, size_t max_len, void *user_data)
  781. {
  782. fdb_err_t result = FDB_NO_ERR;
  783. struct tsdb_sec_info sector;
  784. struct check_sec_hdr_cb_args check_sec_arg = { db, false, 0, 0};
  785. FDB_ASSERT(get_time);
  786. result = _fdb_init_ex((fdb_db_t)db, name, path, FDB_DB_TYPE_TS, user_data);
  787. if (result != FDB_NO_ERR) {
  788. goto __exit;
  789. }
  790. db->get_time = get_time;
  791. db->max_len = max_len;
  792. /* default rollover flag is true */
  793. db->rollover = true;
  794. db->oldest_addr = FDB_DATA_UNUSED;
  795. db->cur_sec.addr = FDB_DATA_UNUSED;
  796. /* must less than sector size */
  797. FDB_ASSERT(max_len < db_sec_size(db));
  798. /* check all sector header */
  799. sector.addr = 0;
  800. sector_iterator(db, &sector, FDB_SECTOR_STORE_UNUSED, &check_sec_arg, NULL, check_sec_hdr_cb, true);
  801. /* format all sector when check failed */
  802. if (check_sec_arg.check_failed) {
  803. if (db->parent.not_formatable) {
  804. result = FDB_READ_ERR;
  805. goto __exit;
  806. } else {
  807. tsl_format_all(db);
  808. }
  809. } else {
  810. uint32_t latest_addr;
  811. if (check_sec_arg.empty_num > 0) {
  812. latest_addr = check_sec_arg.empty_addr;
  813. } else {
  814. if (db->rollover) {
  815. latest_addr = db->cur_sec.addr;
  816. } else {
  817. /* There is no empty sector. */
  818. latest_addr = db->cur_sec.addr = db_max_size(db) - db_sec_size(db);
  819. }
  820. }
  821. /* db->cur_sec is the latest sector, and the next is the oldest sector */
  822. if (latest_addr + db_sec_size(db) >= db_max_size(db)) {
  823. /* db->cur_sec is the the bottom of the database */
  824. db->oldest_addr = 0;
  825. } else {
  826. db->oldest_addr = latest_addr + db_sec_size(db);
  827. }
  828. }
  829. FDB_DEBUG("TSDB (%s) oldest sectors is 0x%08" PRIX32 ", current using sector is 0x%08" PRIX32 ".\n", db_name(db), db->oldest_addr,
  830. db->cur_sec.addr);
  831. /* read the current using sector info */
  832. read_sector_info(db, db->cur_sec.addr, &db->cur_sec, true);
  833. /* get last save time */
  834. if (db->cur_sec.status == FDB_SECTOR_STORE_USING) {
  835. db->last_time = db->cur_sec.end_time;
  836. } else if (db->cur_sec.status == FDB_SECTOR_STORE_EMPTY && db->oldest_addr != db->cur_sec.addr) {
  837. struct tsdb_sec_info sec;
  838. uint32_t addr = db->cur_sec.addr;
  839. if (addr == 0) {
  840. addr = db_max_size(db) - db_sec_size(db);
  841. } else {
  842. addr -= db_sec_size(db);
  843. }
  844. read_sector_info(db, addr, &sec, false);
  845. db->last_time = sec.end_time;
  846. }
  847. __exit:
  848. _fdb_init_finish((fdb_db_t)db, result);
  849. return result;
  850. }
  851. /**
  852. * The time series database deinitialization.
  853. *
  854. * @param db database object
  855. *
  856. * @return result
  857. */
  858. fdb_err_t fdb_tsdb_deinit(fdb_tsdb_t db)
  859. {
  860. _fdb_deinit((fdb_db_t) db);
  861. return FDB_NO_ERR;
  862. }
  863. #endif /* defined(FDB_USING_TSDB) */