分享

Doris源码解析[三、BE存储引擎]

levycui 2021-2-23 20:11:43 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 5522
本帖最后由 levycui 于 2021-2-23 21:06 编辑
问题导读:
1、如何理解物理存储抽象?
2、如何设计Rowset和Tablet?
3、TabletMeta和RowsetMeta的持久化存储的方式有什么不同?
4、Service 和 StorageEngine如何设计?



BE存储引擎部分代码设计文档(2019)

主要工作

  •     类名和文件名尽量一致,看代码的时候比较方便便。
  •     逻辑上分层,理理论上只能从上往下调用,不不能从下往上调用,明确每个层次的作用。
  •     划分概念,现在是乱的,看了了类名不不知道意思。
  •     写好注释,看懂了的代码就加上注释,新来的同学看代码比较容易一些,如果英文不行就使用中文注释吧。
  •     重构完之后,理理论上各个模块应该可以单独测试。
  •     本次不涉及功能裁剪,只是重新组合各个模块的功能让模块之间的职责划分更清晰,现在接口内部的行为的实现代码能不动就不动了。

主要概念

    storageengine –> tablet –> rowset –> segment group –> segment, storageengine 包含多个tablet, tablet包含多个rowset。
    将BE原来的table这个概念换成tablet,每个tablet 有唯一的tabletid和schema hash, rollup 和 schema change都是产生不同的tablet。
    rowset相当于原来的一个版本,rowset可以是一个版本的数据,rowset也可以包含多个版本的数据。
    version:只是代表一个版本号,version不能表示数据文件。
    Delta 特定指导入的一个批次,delta这个概念可能只是出现在我们的讨论中,不会出现在代码中。

接口设计
物理存储抽象


整个BE是建立在以下2个子存储系统上来完成存储的管理工作的:

  •     文件系统,这个主要是底层的rowset读写文件需要这个,比如创建文件,读取文件,删除文件,拷贝文件等。这块考虑到未来我们可能用BOS作为我们的存储系统,所以这块我们做了了一个抽象叫做 FsEnv。
  •     元数据存储系统,是底层存储tablet和rowset的元数据的一个系统,目前我们使用rocksdb实现,未来比如用BOS作为存储的时候这个可能也会变化,所以我们我们把这块也做了了抽象叫做 MetaEnv。
  •     FsEnv和MetaEnv其实都跟底层的存储介质相关,并且它们经常需要一起使用,而且需要在多个对象之 间传递,所以在FsEnv和MetaEnv之上我们构建了一个DataDir层,封装了FsEnv和MetaEnv,每个 DataDir都绑定了一个存储介质StorageMedium。下面分别介绍一下它们的主要接口。

  1. // StoreType 描述底层的物理理存储的类型 StoreType
  2. {
  3.     POSIX; // 基于本地文件系统的一种dir实现
  4.     BOS; // 基于百度对象存储系统的dir实现
  5. }
  6. // data dir 并不不仅仅封装了了FsEnv和MetaEnv, 还封装了部分磁盘录管理的逻辑,如当前我们个磁盘目录下最多允许1000个子目录
  7. // 当我们要create个新的tablet的时候,data dir就需要做下这个录的选择了
  8. // 未来汇报存储状态的时候我们也直接让DataDir来汇报就可以了了
  9. DataDir {
  10.     Status init();
  11.     // 根据tablet的元数据,创建tablet存储的文件结构,并不不修改元数据 Status create_tablet_dir(TabletMeta* tablet_meta);
  12.     // 删除tablet的物理文件结构
  13.     Status delete_tablet_dir(TabletMeta* tablet_meta); Status get_fs_env(FsEnv* fs_env);
  14.     Status get_meta_env(MetaEnv* meta_env);
  15. }
  16. // 抽象下文件系统操作的逻辑,未来我们可能使用bos作为我们的存储介质,到时候只要重新实现下fs env就可以了
  17. FsEnv
  18. {
  19.     create_file(); create_dir(); pread(); pwrite();
  20. }
  21. // 抽象tablet和rowset的元数据操作的逻辑,未来假如对接bos的话需要实现个基于bos的元数据管理模块
  22. // 比如我们基于bos做管理的时候,metastore可能实现为个件,到时候把本地的数据直接保存到bos上作为件就可以了
  23. MetaEnv
  24. {
  25.     Status get_all_tablet_meta(vector<TabletMeta*> tablet_metas);
  26.     Status get_all_rowset_meta(vector<RowsetMeta*> rowset_metas);
  27.     Status save_tablet_meta(TabletMeta* tablet_meta);
  28.     Status delete_tablet_meta(TabletMeta* tablet_meta);
  29.     Status save_rowset_meta(RowsetMeta* rowset_meta);
  30.     Status delete_rowset_meta(RowsetMeta* rowset_meta);
  31. }
复制代码


Rowset设计

Rowset代表了一个或多个批次的数据,抽象它的原因是想以后能支持更多的文件存储格式。在设计过程中考虑了一下几点:

  •     tablet是不感知磁盘的,真正感知存储路径的是rowset,每个rowset都应该有一个单独的磁盘目录,比如我们在做磁盘迁移的时候,一个tablet的数据可能跨越了多个磁盘,这样实际上就要求每个rowset有&#12163;己的存储目录。
  •     rowset一旦生成了就应该是一个不可变的结构,理论上任何状态都不能改变,这样有利于控制我们系统的读写并发。
  •     rowset的元数据尽量少一些,因为一个节点可能有几十万个tablet,每个tablet&#12060;可能有几百到几千个rowset,而rowset的元数据位于内存中,如果数据量太大内存可能爆掉。
  •     rowset的设计要满&#12188;我们磁盘拷贝的需求,比如我们把一个磁盘的数据和元数据都拷贝一下到另外一个磁盘上或者把磁盘目录变更一下,系统应该是能够加载的。所以不能直接在rowset meta中存储data dir。
  •     rowset的id在一个be内要唯一, 但是在不同的be上会有重复。rowset id只是一个唯一的标识,并不代表任何含义,两个BE上如果有两个rowset的id相同,他们的数据内容不一定一样。

  1. // 我们的文件格式可能会升级,也可能引入别的文件格式,所以我们引入rowset type来表示不不同的文件格式
  2. enum RowsetType {
  3.     ALPHA_ROWSET = 0, // doris原有的列存
  4.     BETA_ROWSET = 1, // 新列存
  5. };
  6. RowsetState {
  7.     PREPARING, // 表示rowset正在写入
  8.     COMMITTED, // 表示rowset已经写入完成,但是户还不可见;这个状态下的rowset,BE不能行判断是否删除,必须由FE的指令
  9.     VISIBLE; // 表示rowset 已经对户可见
  10. }
  11. RowsetMeta {
  12.     version; // 当rowset meta更新的时候修改一下这个version,当两个元数据冲突的时候以version大的为基准
  13.     txn_id; // 当前rowset关联的txn是哪个,当系统重启的时候storage engine会加载所有的rowset,根据rowset关联的txn恢复内存中的txnmap
  14.     tablet_id;
  15.     tablet_schemahash;
  16.     rowset_id;
  17.     rowset_type;
  18.     start_version;
  19.     end_version;
  20.     row_num;
  21.     total_disk_size;
  22.     data_disk_size;
  23.     index_disk_size;
  24.     // 下这一堆最好直接封装成binary
  25.     // min/max/null等统计信息
  26.     column_meta;
  27.     delete_condition;
  28.     // 用于存储不同rowset的一些额外信息
  29.     // 为了方便扩展,Property是一个key/value对,类型都是string vector<std::string> properties;
  30. }
  31. Rowset {
  32. 方法:
  33.     // 主要用来通过rowset_id获取元素信息,初始化_rowset_meta
  34.     // 加载rowset的时候要不要做文件的check?
  35.     Status init(RowsetMeta* rowset_meta);
  36.     // 构造RowsetReader,用于读取rowset
  37.     RowsetReader* create_reader();
  38.     // 于rowset重命名使,比如迁盘
  39.     // 我们不提供move接口,move可以用copy + delete 实现
  40.     Status copy(RowsetWriter* dest_rowset_writer);
  41.     // 删除rowset底层所有的文件和其他资源
  42.     Status delete();
  43. 属性:
  44.     // rowset 记录它所在的磁盘目录的句柄,从这个句柄中可以获得metaenv和fsenv
  45.     DataDir dir;
  46.     RowsetMeta rowset_meta;
  47. };
  48. // 当要创建个rowset的时候必须调用这个api,如clone,restore,ingest
  49. // 除了系统加载之外,只有这个类能够成个新的rowset
  50. RowsetWriter {
  51. 方法:
  52.     // 初始化RowsetWriter
  53.     Status init(std::string rowset_id, const std::string& rowset_path_prefix, Sche ma* schema) = 0;
  54.     // 写一个rowblock数据
  55.     Status add_row_block(RowBlock* row_block) = 0;
  56.     // 当要向个rowset中增加个文件的时候调这个api,这个方法看着有点别扭,有更更好的方法可以替换一下
  57.     // 1. 根据传的file name,rowset writer会成个对应的文件名
  58.     // 2. 上游系统根据生成的文件名把文件拷到这个目录下
  59.     Status generate_written_path(string src_path);
  60.     // 成一个rowset,先要检查是否close了,如果close了才能调用这个API
  61.     Status build(Rowset* rowset);
  62.     // 释放所有的资源,此时代表不能写新的数据文件了,但是可以修改元数据
  63.     close();
  64. 属性:
  65.     DataDir dir;
  66. };
  67. /**
  68. * ReadContext中的参数是在要读取rowset的逻辑的地方
  69. * 拿到RowsetReader对象之后,进行init的时候传的。
  70. * 这些参数区别于构造函数中的传入参数在于构造函数的参数是在
  71. * new ReadContext的对象的时候就会确定,也就是在Rowset::new_reader()
  72. * 的时候决定
  73. */
  74. class ReadContext {
  75. private:
  76.     Schema* projection; // 投影列信息
  77.     std::unordered_map<std::string, ColumnPredicate> predicates; //过滤条件 const EncodedKey* lower_bound_key; // key下界
  78.     const EncodedKey* exclusive_upper_bound_key; // key上界
  79. };
  80. class RowsetReader {
  81. 方法:
  82.     // reader初始化函数
  83.     // init逻辑中需要判断rowset是否可以直接被统计信息过滤删除、是否被删除条件删除。
  84.     // 如果被删除,则has_next直接返回false
  85.     Status init(ReadContext* read_context) = 0;
  86.     // 判断是否还有数据 bool has_next() = 0;
  87.     // 读取下个Block的数据
  88.     Status next_block(RowBlock* row_block) = 0;
  89.     // 关闭reader
  90.     // 会触发下层数据读取逻辑的close操作,进行类似关闭文件,
  91.     // 更新统计信息等操作
  92.     void close() = 0;
  93. 属性:
  94.     DataDir dir;
  95.     RowsetMeta rowset_meta;
  96. };
复制代码



Tablet的设计

    在新的架构里tablet只是一个rowset的容器器,tablet可以根据rowset创建相应的reader。
    tablet里所有的rowset都是生效的rowset,不生效的rowset不放在这&#12197;面,这样简化了tablet的逻辑。

  1. TabletState
  2. {
  3. }
  4. TabletMeta
  5. {
  6.     int64_t version; // 表示meta修改的版本号,当有多个属于同一个tablet的meta存在时,选择version最的那个
  7.     tableid; // 前没想到有什么用
  8.     partition_id; // publish version的时候需要根据partition id来获取所有关联的rowset, 因为version是绑定在partition上的,不能按照tablet来partition的原因是怕tabletid太多了
  9.     shardid; // tablet 所在的shard的id,这个是跟磁盘管理理相关的逻辑,是个optional
  10.     TabletState state;
  11.     vector<RowsetMeta> rowsets;
  12.     serialize();
  13.     deserialize();
  14. }
  15. Tablet {
  16. 方法:
  17.     // 当compaction、写结束、clone结束的时候调用这个api来修改
  18.     // 这把要增加的rowset和要删除的rowset放在起可以保持个原子性
  19.     // 当compaction的时候可能会增加些rowset,删除些rowset,并且希望这个原性
  20.     Status modify_rowsets(vector<Rowset*> added_rowsets, vector<Rowset*> deleted_r
  21.     owsets);
  22.     Status create_snapshot();
  23.    
  24.     Status release_snapshot();
  25.     // 这个API 只有在垃圾清理理的时候用
  26.     Status get_all_rowsets(vector<Rowset*> rowsets)
  27.     {
  28.         // 根据文件列列表的前缀返回所有的独立的rowsetid
  29.     }
  30. 属性:
  31.     DataDir data_dir; // 当前tablet 默认的data dir,当写入新的rowset的时候,需要用这个dat
  32. adir来存储数据
  33. }
  34. TabletReader
  35. {
  36.     set_read_version(int64_t end_version);
  37.     set_start_key();
  38.     set_end_key();
  39.     set_filter_expr();
  40. }
复制代码



TabletMeta和RowsetMeta的持久化存储的&#12101;式:

    当前存在的问题
  •         过去tablet meta中保存了一个rowset meta的列表,每次更新rowset都需要更新整个tablet meta,这样写放大非常严重。
  •         在这种&#12101;方式下未生效的rowset【COMMITTED】和已生效的rowset【VISIBLE】都存储在tablet meta中,比较混乱。
    所以这次我们计划更新tablet meta和rowset meta的管理方式,具体逻辑如下:
  •         当rowset commit的时候,需要修改rowset的状态为COMMITTED,需要把rowsetmeta信息持久化到meta store中。
  •         当rowset publish的时候,需要修改rowset的状态为VISIBLE,修改rowset的version,把rowset信息持久化到meta env中。然后把rowset meta添加到tablet meta中,此时tablet meta可以不持久化。
  •         这会带来一个问题,就是系统重启的时候如果rowset特别多,那么启动会非常慢。所以需要engine定期的把tablet meta持久化,一旦持久化,那么就把已经visible的rowset meta从meta store中删除,因为他们已经保存到tablet meta中了。 这样做还带来一个好处,当rowset向tablet中增加时,只是内存操作,加一个lock就可以,不涉及io,速度可以做到&#12206;常快。

系统启动加载过程

在上述结构下,整个系统重启的时候的tablet和rowset加载过程如下:

  •     根据系统的配置文件读取data dir,目前doris的架构&#12197;,每个data dir就是配置文件的一个磁盘目录。
  •     读取每个data dir下的meta store中的tablet meta和rowset meta,根据tablet meta中的shard id构建 rowset对象的shard id信息。
  •     根据加载后的所有的tablet meta构建tablet对象
        如果在不同的data dir目录下存在多个tablet meta,那么选择version最大的tablet meta构建tablet对象。
        如果rowset的状态是PUBLISHED,那么就把他追加到对应的tablet里。
        如果rowset的状态是COMMITTED,那么就根据txn id,把他加到txn map里。

Service 和 StorageEngine设计

我们把BE向外部暴露的能力封装为一个的Service,Service有两类一类是轮询fe获得task,然后执行 task;另一类是作为rpc的服务端供rpc调用的。Service假如需要访问rowset或者tablet的时候,我们都首先注册一个task,由storageengine负责生成task的各种状态,然后暴露对应的task给service,然后service调用 task的execute方法来执行。这样就把所有的并发和状态控制都封装在StorageEngine中,简单的逻辑如下:

  •     service模块调用StorageEngine对应的&#12101;法create一个task,在create task中storageengine做一系列的检查,加锁的逻辑
  •     service拿到task之后调用task的execute方法,来执行具体的&#12079;作。service调用storageengine的finish方法结束一个task。
  •     如果task执行失败,那么调用cancel task方法来取消task,storageengine需要清除对应的状态。在上述逻辑下,所有的状态维护,锁的维护都由storageengine完成,外部不再看到锁,看到状态。以目前为例我们有以下&#12047;种task:

  1.     ` IngestService { // batch ingest的逻辑 storageengine->create_rowset_writer();while(1) { download_file(); rowset_writer.write_block(); } storageengine->finish_rowset_writer(rowset_writer);
  2.     // 流式导的逻辑 storageengine->create_rowset_writer(); // 每次写都调 rowset_writer.write_block(); // 写完之后 storageengine->finish_rowset_writer(rowset_writer); }
  3. AlterService { storageengine->create_alter_Task(); alter_task.execute() { } storageengine->finish_alter_Task(alter_Task); }
  4. // 从文件恢复一个rowset CloneService { // 当全量clone的时候,tablet不存在,那么先调create tablet创建一个tablet // 增量clone的时候,tablet存在,不需要create tablet if (tablet not exists) { storageengine->create_tablet(); } storageengine->create_clone_task(); clone_Task.execute(); storageengine->finish_clone_task(clone_task); }
  5. AlterTask { Status execute() { // 1. 读取老老的数据 // 2. 把数据写入到新的tablet中 }
  6. Status get_rowsets(vector<RowSet*> rowsets);  }
  7. CloneTask { // 开始执一个clone Task // 1. 调远程的节点,make snapshot // 2. 从远程节点下载文件,放到本地临时文件夹,包括meta,index,data Status execute() { make_snapshot(); }
  8. // 当clone任务结束后,上层调用这个API,这个API返回clone的rowset的列表
  9. // 上层API把对应的rowset根据条件添加到tablet中
  10. Status get_rowsets(vector<RowSet*> rowsets); }
  11. MigrateTask { Status execute() { // 修改tablet的meta,把tablet meta持久化到目标的 // 1. 遍历migrate task中的rowset 列列表 // 2. 一个个rowset进行行转化。 }
  12. Status get_rowsets(vector<RowSet*> rowsets);  }
  13. CompactionTask { Status execute(); Status get_rowsets(vector<RowSet*> rowsets); }
  14. ### StorageEngine部分
  15. StorageEngine { // 当上游的导入任务需要导一份数据的时候调这个api,这个api会创建个rowset writer给上层 // 上层调用这个api完成数据的写入 Status open_rowset_writer(int64_t tablet_id, int64_t schema_hash, int64_t txnId, RowsetWriter* rowset_writer) { // 1. 检测当前tablet是否在做schema change,如果在做,那么就把schema change的tablet也拿出来 // 2. 成rowset id 和 rowset writer // 3. 根据tablet的属性,设置rowset writer写文件的录 // 4. 在txn map中注册下txn与rowset,tablet的关系 // 5. 在一些场景下【clone,schema change,rollup】的时候没有txn id,此时就不需要建立关联了。 }
  16. // 当向个rowset writer 写完数据之后调用这个API
  17. Status finish_rowset_writer(RowsetWriter* rowset_writer)
  18. {
  19.     // 1. 生成对应的rowset, 如果有alter task在执那么就生成两个tablet的rowset
  20.     // 2. 将rowset的状态变更为committed状态。
  21.     // 3. 当户调用finish的时候,需要把rowset元数据持久化,因为调完finish之后,户会调用txn.commit
  22.     // 旦commit成功,那么数据就不能丢失了。
  23. }
  24. // 当写错误的时候调用这个API,表示取消一个rowset writer
  25. Status cancel_rowset_writer(RowsetWriter* rowset_writer)
  26. {
  27.     // 1. 关闭rowset 打开的文件句柄
  28.     // 2. 把rowset 从txn map中移除
  29.     // 3. 这可以不删除rowset对应的件,因为垃圾清理线程会清理它们
  30. }
  31. // 当service 层收到publish version task的时候调用这个api
  32. Status publish_txn(int64_t txn_id)
  33. {
  34.     // 1. 找到txn对应的rowset,把rowset的状态变更更为 VISIBLE
  35.     // 2. 持久化rowset meta到meta store中
  36.     // 3. 把rowset添加到tablet对应的rowset 列列表中
  37. }
  38. // 清理掉txn关联的rowset
  39. Status clear_txn(int64_t txn_id) {
  40.     // 1. 如果txn 关联的rowset meta已经持久化到meta store中,那么需要从meta store中清理。
  41. }
  42. // 创建个alter task, 在be端不区分schema change和rollup,统一用alter task来区分。
  43. Status create_alter_task(AlterTableReq* req, AlterTask* alter_task)
  44. {
  45.     // 1. 选择个data dir
  46.     // 2. 在data dir下创建个tablet
  47.     // 3. 把tablet信息填充到 alter_task
  48.     // 4. 把tablet信息添加到storage engine的map
  49.     // 5. 在tablet的meta中记录一个状态,这个状态要持久化,否则会有问题。
  50. }
  51. // 当alter task结束后,调用这个api,把新的rowset添加到tablet的rowset列列表中
  52. Status finish_alter_task(AlterTask* alter_task)
  53. {
  54.     // 1. 读取创建的新的rowset,把他们加入到新的tablet中
  55.     // 2. 清理一下alter task的状态
  56.     // 3. 持久化2个tablet的meta状态
  57. }
  58. Status cancel_alter_task(AlterTask* alter_task) {
  59.     // 1. 删除创建的新的tablet
  60.     // 2. 清理正在操作的tablet的alter status
  61. }
  62. // 清除一个tablet上标记的alter状态
  63. Status clear_alter_status(int64_t tablet_id);
  64. Status create_clone_task(CloneTask* clone_task)
  65. {
  66. }
  67. // clone task本质上是从远端复制一堆rowset到本地,当cloneTask结束的时候,调用这个api
  68. // 这个api会拿到clone Task成的rowset,把他们加入到tablet的rowset列列表中。
  69. Status finish_clone_task(CloneTask* clone_task)
  70. {
  71.     // 1. 取出clone task中生成的rowset 把rowset 添加到tablet的meta中
  72.     // 2. 持久化tabletmeta
  73.     // 3. 把rowset 添加到tablet的active rowset列表中
  74. }
  75. Status make_snapshot(int64_t tablet_id, TabletSnapshot* snapshot)
  76. {
  77.     // 创建一个snapshot
  78. }
  79. // 把个tablet从一个录移动到另外一个录。
  80. Status create_migrate_task(int64_t tablet_id, Path dest_path)
  81. {
  82.     // 1. 修改tablet的default dir,这样新来的数据就都可以往新的dir写了
  83.     // 2. 持久化tablet meta
  84.     // 3. 返回所有visible的rowset和正在写的rowset。
  85.     // 4. 把要转化的rowset的信息填充到migrate task中。
  86. }
  87. Status finish_migrate_task(MigrateTask* migrate_task);
  88. Status create_compaction_task(CompactionTask* compaction_task)
  89. {
  90.     // 根据compaction policy选择个最合适的tablet
  91.     // 选择一堆rowset,创建一个rowset writer 填充到task中
  92. }
  93. Status finish_compaction_task()
  94. {
  95.     // 读取compaction task 成的rowset
  96.     // 变更tablet meta,持久化tabletmeta
  97.     // 变更tablet 中的rowset列表
  98. }
  99. Status cancel_compaction_task();
  100. Status drop_tablet()
  101. {
  102.     tablet_meta->set_state(TabletState.DELETING);
  103.     store = tablet_info->get_store();
  104.     store->save_meta(tablet_meta);
  105.     store->delete_tablet(tablet_meta);
  106.     tablet_meta->set_state(TabletState.DELETED);
  107.     store->save_meta(tablet_meta);
  108. }
  109. Status create_tablet(CreateTabletReq* req)
  110. {
  111.     OlapStore* store = _select_store();
  112.     TabletMeta* tablet_meta = _create_new_meta(req);
  113.     store->create_tablet(tablet_meta);store->save_meta(tablet_meta);
  114. }
  115. // 系统启动的时候只是根据data dir的配置,读取meta store中所有的tablet和rowset的元数据信息
  116. // 并没有读取具体的物理件的信息,因为读取物理文件信息在rowset规模特别大的时候会非常慢
  117. Status load()
  118. {
  119.     vector<DataDir> data_dirs = 根据配置文件读取所有的dir
  120.     for (DataDir dir : data_dirs) {
  121.         dir.load_tablets(vector<TabletMeta*> tablets);
  122.         for (TabletMeta* talbet_meta : tablets) {
  123.             // 构建tablet 对象
  124.             for (RowsetMeta* rowset_meta : tablet_meta -> get_rowsets) {
  125.                 // 构建txn map
  126.             }
  127.         }
  128.     }
  129. }
  130. // 这是个垃圾回收的操作,BE运过程中会产许多垃圾,如当rowset在写的时候,写了部分,此时文件系统core掉了会产
  131. // rowset 的垃圾文件;当tablet被删除的时候可能会产生tablet的垃圾文件
  132. //
  133. Status garbge_collection()
  134. {
  135.     for (TabletInfo* tablet_info : tablet_infos)
  136.     {
  137.         // 清理理所有的垃圾的tablet
  138.         if (tablet_info->tablet_meta->tablet_state == TabletState.DELETING)
  139.         {
  140.             tablet_info->store()->delete_tablet(tablet_meta);
  141.             tablet_meta->set_state(TabletState.DELETED);
  142.             store->save_meta(tablet_meta);
  143.             continue;
  144.         }
  145.         // 清理tablet中用的rowset文件, get all tablets 的实现目前考虑的就是根据文件名的前缀作为rowsetid
  146.         rowset_ids = tablet_info->get_tablet()->get_all_rowsets();
  147.         for (int64_t rowset_id : rowset_ids)
  148.         {
  149.             if (rowset_id not in committed rowsets
  150.                 && rowset id not in prepare rowsets
  151.                 && rowset id not in writing rowsets
  152.             )
  153.             {
  154.                 tablet_info->get_tablet()->delete_rowset(rowset_id);
  155.             }
  156.         }
  157.     }
  158.     // 垃圾的txn不要在这清理, storage engine会**定时的把txn向fe汇报,fe会下发清理txn的任务,收到clear txn任务的时候清理**
  159. } } ```
复制代码



错误处理逻辑

抽象一个错误处理的队列,发生错误后,如果在当前线程中不能直接处理,那么需要把错误消息封装一下直接发送给错误队列,由各个不同的任务线程,从队列中读取错误信息来处理。

最新经典文章,欢迎关注公众号


作者:wingsgo
来源:https://wingsgo.github.io/2020/0 ... _refactor_2019.html


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条