
Doris Source Code Analysis [Part 2: Schema Change as an Asynchronous Task]

hyj · 2021-02-22 07:21:57 · posted in Code Analysis

Guiding questions

1. Which tasks in Doris are executed asynchronously?
2. What are the main steps of an asynchronous job?
3. Which states does a schema change job go through on the FE side?


How Doris Submits a Schema Change
In Doris, many tasks are executed asynchronously, for example creating tablets, dropping tablets, schema changes, Broker Load, and so on. The main flow of this kind of job is:

  • On the FE side, tasks are divided into several kinds by task_type. AgentTask is the parent class of all tasks; AgentTask and its subclasses encapsulate task information such as backendId, signature and taskType. When making the RPC call, the FE invokes toAgentTaskRequest to convert the task into a TAgentTaskRequest, and then calls the RPC interface submit_tasks to send the request down to a BE for execution.
  • When AgentServer is initialized on the BE, it calls the start method of each of its std::unique_ptr<TaskWorkerPool> instances (one per task_type). start binds a different callback function to each instance and, according to the configuration, spawns a given number of threads per task type to execute the corresponding tasks. When the BE receives a TAgentTaskRequest, AgentServer::submit_tasks parses it and, based on task_type, forwards it to the matching TaskWorkerPool instance, whose TaskWorkerPool::submit_task method pushes the TAgentTaskRequest onto a std::deque<TAgentTaskRequest> queue to wait for scheduling. After a task is enqueued, _worker_thread_condition_lock.notify() is called to wake up one consumer thread to execute it.
  • When the condition variable inside a worker thread's callback is signalled (i.e. worker_pool_this->_worker_thread_condition_lock.wait() returns), the thread starts executing the task. When it finishes, it makes an RPC call in TaskWorkerPool::_finish_task to report the result to the FE as a TFinishTaskRequest, and finally calls TaskWorkerPool::_remove_task_info to remove the task from the queue, completing the whole Schema Change task flow. (A minimal sketch of this producer-consumer pattern follows this list.)
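To make the flow above concrete, here is a minimal, self-contained Java sketch of the same producer-consumer pattern: a task queue guarded by a lock and a condition, worker threads that block until a task arrives, execute it, and then report completion. The class and method names (SimpleTaskWorkerPool, submitTask, reportFinished) are illustrative only; the real BE implementation is the C++ TaskWorkerPool shown later in this article.

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified illustration of the worker-pool pattern described above.
// It is NOT Doris code: the real BE pool is C++ and reports results to FE over thrift RPC.
public class SimpleTaskWorkerPool {
    private final Deque<Runnable> tasks = new ArrayDeque<>();
    private final Object lock = new Object();

    public SimpleTaskWorkerPool(int workerNum) {
        for (int i = 0; i < workerNum; i++) {
            Thread worker = new Thread(this::workerLoop, "worker-" + i);
            worker.setDaemon(true);
            worker.start();
        }
    }

    // Producer side: analogous to TaskWorkerPool::submit_task -- enqueue and wake one worker.
    public void submitTask(Runnable task) {
        synchronized (lock) {
            tasks.addLast(task);
            lock.notify();               // like _worker_thread_condition_lock.notify()
        }
    }

    // Consumer side: analogous to the *_worker_thread_callback functions.
    private void workerLoop() {
        while (true) {
            Runnable task;
            synchronized (lock) {
                while (tasks.isEmpty()) {
                    try {
                        lock.wait();     // like _worker_thread_condition_lock.wait()
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                task = tasks.pollFirst();
            }
            task.run();                  // execute the task
            reportFinished(task);        // like _finish_task(): report the result back
        }
    }

    private void reportFinished(Runnable task) {
        System.out.println("task finished: " + task);
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleTaskWorkerPool pool = new SimpleTaskWorkerPool(2);
        pool.submitTask(() -> System.out.println("create replica ..."));
        pool.submitTask(() -> System.out.println("alter tablet ..."));
        Thread.sleep(200);               // give the daemon workers time to run in this demo
    }
}

In the real system the producer side is AgentServer::submit_tasks on the BE (fed by the FE over thrift), and the consumer side is one of the *_worker_thread_callback functions.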



FE-Side Logic
The rest of this article walks through the execution logic at the source level, starting with the FE. When a user alters a table's schema, the job goes through four states: pending, waitingTxn, running, and finished. In the pending phase, Doris first sends tasks to the BEs to create the shadow replicas. The code is as follows:


protected void runPendingJob() throws AlterCancelException {
    Preconditions.checkState(jobState == JobState.PENDING, jobState);
    LOG.info("begin to send create replica tasks. job: {}", jobId);
    Database db = Catalog.getCurrentCatalog().getDb(dbId);
    if (db == null) {
        throw new AlterCancelException("Database " + dbId + " does not exist");
    }
    // 1. create replicas
    AgentBatchTask batchTask = new AgentBatchTask();
    // count total replica num
    int totalReplicaNum = 0;
    for (MaterializedIndex shadowIdx : partitionIndexMap.values()) {
        for (Tablet tablet : shadowIdx.getTablets()) {
            totalReplicaNum += tablet.getReplicas().size();
        }
    }
    MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<>(totalReplicaNum);
    db.readLock();
    try {
        OlapTable tbl = (OlapTable) db.getTable(tableId);
        if (tbl == null) {
            throw new AlterCancelException("Table " + tableId + " does not exist");
        }
        boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(),
                Catalog.getCurrentCatalog().getTabletScheduler(),
                db.getClusterName());
        if (!isStable) {
            errMsg = "table is unstable";
            LOG.warn("doing schema change job: " + jobId + " while table is not stable.");
            return;
        }
        Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
        for (long partitionId : partitionIndexMap.rowKeySet()) {
            Partition partition = tbl.getPartition(partitionId);
            if (partition == null) {
                continue;
            }
            TStorageMedium storageMedium = tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();

            Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
            for (Map.Entry<Long, MaterializedIndex> entry : shadowIndexMap.entrySet()) {
                long shadowIdxId = entry.getKey();
                MaterializedIndex shadowIdx = entry.getValue();

                short shadowShortKeyColumnCount = indexShortKeyMap.get(shadowIdxId);
                List<Column> shadowSchema = indexSchemaMap.get(shadowIdxId);
                int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).second;
                int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));

                for (Tablet shadowTablet : shadowIdx.getTablets()) {
                    long shadowTabletId = shadowTablet.getId();
                    List<Replica> shadowReplicas = shadowTablet.getReplicas();
                    for (Replica shadowReplica : shadowReplicas) {
                        long backendId = shadowReplica.getBackendId();
                        countDownLatch.addMark(backendId, shadowTabletId);
                        CreateReplicaTask createReplicaTask = new CreateReplicaTask(
                                backendId, dbId, tableId, partitionId, shadowIdxId, shadowTabletId,
                                shadowShortKeyColumnCount, shadowSchemaHash,
                                Partition.PARTITION_INIT_VERSION, Partition.PARTITION_INIT_VERSION_HASH,
                                tbl.getKeysType(), TStorageType.COLUMN, storageMedium,
                                shadowSchema, bfColumns, bfFpp, countDownLatch);
                        createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId), originSchemaHash);

                        batchTask.addTask(createReplicaTask);
                    } // end for rollupReplicas
                } // end for rollupTablets
            }
        }
    } finally {
        db.readUnlock();
    }
    if (!FeConstants.runningUnitTest) {
        // send all tasks and wait them finished
        AgentTaskQueue.addBatchTask(batchTask);
        AgentTaskExecutor.submit(batchTask);
        // max timeout is 1 min
        long timeout = Math.min(Config.tablet_create_timeout_second * 1000L * totalReplicaNum, 60000);
        boolean ok = false;
        try {
            ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.warn("InterruptedException: ", e);
            ok = false;
        }
        if (!ok) {
            // create replicas failed. just cancel the job
            // clear tasks and show the failed replicas to user
            AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
            String errMsg = null;
            if (!countDownLatch.getStatus().ok()) {
                errMsg = countDownLatch.getStatus().getErrorMsg();
            } else {
                List<Entry<Long, Long>> unfinishedMarks = countDownLatch.getLeftMarks();
                // only show at most 3 results
                List<Entry<Long, Long>> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 3));
                errMsg = "Error replicas:" + Joiner.on(", ").join(subList);
            }
            LOG.warn("failed to create replicas for job: {}, {}", jobId, errMsg);
            throw new AlterCancelException("Create replicas failed. Error: " + errMsg);
        }
    }
    // create all replicas success.
    // add all shadow indexes to catalog
    db.writeLock();
    try {
        OlapTable tbl = (OlapTable) db.getTable(tableId);
        if (tbl == null) {
            throw new AlterCancelException("Table " + tableId + " does not exist");
        }
        Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
        addShadowIndexToCatalog(tbl);
    } finally {
        db.writeUnlock();
    }
    this.watershedTxnId = Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId();
    this.jobState = JobState.WAITING_TXN;
    // write edit log
    Catalog.getCurrentCatalog().getEditLog().logAlterJob(this);
    LOG.info("transfer schema change job {} state to {}, watershed txn id: {}", jobId, this.jobState, watershedTxnId);
}
The main logic of this code: first check the db, the table, the replica count, and the table state in the metadata. Since one BE may need to create multiple tablets, a MarkedCountDownLatch is used to track the number of outstanding replicas. A CreateReplicaTask is generated for each replica, and all of them are added to private Map<Long, List<AgentTask>> backendIdToTasks, a private member of AgentBatchTask (a class that implements Runnable). In AgentBatchTask.run, each List<AgentTask> is converted into a List<TAgentTaskRequest> and sent to the BE via the thrift interface AgentService.TAgentResult submit_tasks(1:list<AgentService.TAgentTaskRequest> tasks);. When the BE finishes, it calls the thrift interface MasterService.TMasterResult finishTask(1:MasterService.TFinishTaskRequest request); to report the task result back to the FE.
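AgentBatchTask itself is not listed in this article, so the following is a rough, hypothetical Java sketch of the grouping-by-backend idea just described. Only the backendIdToTasks field name and the overall shape come from the description above; the request types and the RPC call are replaced by simple stand-ins. Refer to the real FE class AgentBatchTask for the actual implementation.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the batching described above; not the actual AgentBatchTask source.
class AgentBatchTaskSketch implements Runnable {
    // Tasks are grouped by the BE (backendId) that should execute them.
    private final Map<Long, List<AgentTaskSketch>> backendIdToTasks = new HashMap<>();

    public void addTask(AgentTaskSketch task) {
        backendIdToTasks.computeIfAbsent(task.getBackendId(), k -> new ArrayList<>()).add(task);
    }

    @Override
    public void run() {
        // For each backend, convert its tasks into thrift-style requests and send them in one
        // call, mirroring "AgentService.submit_tasks(list<TAgentTaskRequest>)" described above.
        for (Map.Entry<Long, List<AgentTaskSketch>> entry : backendIdToTasks.entrySet()) {
            long backendId = entry.getKey();
            List<String> requests = new ArrayList<>();   // stand-in for List<TAgentTaskRequest>
            for (AgentTaskSketch task : entry.getValue()) {
                requests.add(task.toAgentTaskRequest());  // stand-in for toAgentTaskRequest()
            }
            sendToBackend(backendId, requests);           // stand-in for the thrift submit_tasks RPC
        }
    }

    private void sendToBackend(long backendId, List<String> requests) {
        System.out.println("send " + requests.size() + " task(s) to backend " + backendId);
    }

    public static void main(String[] args) {
        AgentBatchTaskSketch batch = new AgentBatchTaskSketch();
        batch.addTask(new AgentTaskSketch(10001L));
        batch.addTask(new AgentTaskSketch(10001L));
        batch.addTask(new AgentTaskSketch(10002L));
        batch.run();   // in the FE this Runnable is handed to AgentTaskExecutor instead
    }
}

// Minimal stand-in for AgentTask and its subclasses (e.g. CreateReplicaTask).
class AgentTaskSketch {
    private final long backendId;
    AgentTaskSketch(long backendId) { this.backendId = backendId; }
    long getBackendId() { return backendId; }
    String toAgentTaskRequest() { return "TAgentTaskRequest(backend=" + backendId + ")"; }
}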

The method then calls AgentTaskQueue.addBatchTask(batchTask); and AgentTaskExecutor.submit(batchTask); to record and submit the batch task. Internally, AgentTaskExecutor uses Executors.newCachedThreadPool to create the threads that run the Runnable AgentBatchTask.

After the task-sending threads have been spawned, the main thread calls ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);, blocking until every create-replica task has been counted down (i.e. reported as finished) or the timeout expires.
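A small sketch of this submit-and-wait pattern, assuming an illustrative replica count: a cached thread pool runs the batch (as AgentTaskExecutor does via Executors.newCachedThreadPool) while the main thread blocks on a CountDownLatch with a timeout. In Doris the latch is actually a MarkedCountDownLatch that is counted down as each BE reports a replica as created and that remembers which backend/tablet pairs are still outstanding; here the countdown is simply simulated inside the batch.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SubmitAndWaitSketch {
    public static void main(String[] args) throws InterruptedException {
        final int REPLICA_NUM = 3;                       // illustrative replica count
        CountDownLatch latch = new CountDownLatch(REPLICA_NUM);

        // AgentTaskExecutor uses a cached thread pool to run the Runnable AgentBatchTask.
        ExecutorService executor = Executors.newCachedThreadPool();
        Runnable batch = () -> {
            // In Doris this would be AgentBatchTask.run() sending RPCs; the latch is really
            // counted down later, when each BE reports its replica as created.
            for (int i = 0; i < REPLICA_NUM; i++) {
                latch.countDown();
            }
        };
        executor.submit(batch);

        // The main thread blocks until every replica is acknowledged or the timeout expires,
        // mirroring "ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS)".
        boolean ok = latch.await(60, TimeUnit.SECONDS);
        if (!ok) {
            System.out.println("create replicas timed out, the job would be cancelled");
        }
        executor.shutdown();
    }
}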


BE-Side Logic
On the BE side, start with the AgentServer class. Its main private members and constructor are shown below. The class holds one std::unique_ptr<TaskWorkerPool> instance per task type; when AgentServer is initialized, it calls TaskWorkerPool::start on each instance, which, according to the configuration, spawns the specified number of threads and binds the corresponding callback function (a sketch of this start pattern follows the constructor listing below).

std::unique_ptr<TaskWorkerPool> _create_tablet_workers;
std::unique_ptr<TaskWorkerPool> _drop_tablet_workers;
std::unique_ptr<TaskWorkerPool> _push_workers;
std::unique_ptr<TaskWorkerPool> _publish_version_workers;
std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
std::unique_ptr<TaskWorkerPool> _delete_workers;
std::unique_ptr<TaskWorkerPool> _alter_tablet_workers;
std::unique_ptr<TaskWorkerPool> _clone_workers;
std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_workers;
std::unique_ptr<TaskWorkerPool> _check_consistency_workers;
// These 3 worker-pools do not accept tasks from FE.
// They are triggered periodically and report to the FE master.
std::unique_ptr<TaskWorkerPool> _report_task_workers;
std::unique_ptr<TaskWorkerPool> _report_disk_state_workers;
std::unique_ptr<TaskWorkerPool> _report_tablet_workers;
std::unique_ptr<TaskWorkerPool> _upload_workers;
std::unique_ptr<TaskWorkerPool> _download_workers;
std::unique_ptr<TaskWorkerPool> _make_snapshot_workers;
std::unique_ptr<TaskWorkerPool> _release_snapshot_workers;
std::unique_ptr<TaskWorkerPool> _move_dir_workers;
std::unique_ptr<TaskWorkerPool> _recover_tablet_workers;
std::unique_ptr<TaskWorkerPool> _update_tablet_meta_info_workers;

AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) :
        _exec_env(exec_env),
        _master_info(master_info),
        _topic_subscriber(new TopicSubscriber()) {
    for (auto& path : exec_env->store_paths()) {
        try {
            string dpp_download_path_str = path.path + DPP_PREFIX;
            boost::filesystem::path dpp_download_path(dpp_download_path_str);
            if (boost::filesystem::exists(dpp_download_path)) {
                boost::filesystem::remove_all(dpp_download_path);
            }
        } catch (...) {
            LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path;
        }
    }
    // It is the same code to create workers of each type, so we use a macro
    // to make code to be more readable.
#ifndef BE_TEST
#define CREATE_AND_START_POOL(type, pool_name)         \
    pool_name.reset(new TaskWorkerPool(                \
                TaskWorkerPool::TaskWorkerType::type,  \
                _exec_env,                             \
                master_info));                         \
    pool_name->start();
#else
#define CREATE_AND_START_POOL(type, pool_name)
#endif // BE_TEST
    CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
    CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
    // Both PUSH and REALTIME_PUSH type use _push_workers
    CREATE_AND_START_POOL(PUSH, _push_workers);
    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers);
    CREATE_AND_START_POOL(DELETE, _delete_workers);
    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
    CREATE_AND_START_POOL(CLONE, _clone_workers);
    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers);
    CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
    CREATE_AND_START_POOL(REPORT_TASK, _report_task_workers);
    CREATE_AND_START_POOL(REPORT_DISK_STATE, _report_disk_state_workers);
    CREATE_AND_START_POOL(REPORT_OLAP_TABLE, _report_tablet_workers);
    CREATE_AND_START_POOL(UPLOAD, _upload_workers);
    CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
    CREATE_AND_START_POOL(MAKE_SNAPSHOT, _make_snapshot_workers);
    CREATE_AND_START_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers);
    CREATE_AND_START_POOL(MOVE, _move_dir_workers);
    CREATE_AND_START_POOL(RECOVER_TABLET, _recover_tablet_workers);
    CREATE_AND_START_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers);
#undef CREATE_AND_START_POOL
#ifndef BE_TEST
    // Add subscriber here and register listeners
    TopicListener* user_resource_listener = new UserResourceListener(exec_env, master_info);
    LOG(INFO) << "Register user resource listener";
    _topic_subscriber->register_listener(doris::TTopicType::type::RESOURCE, user_resource_listener);
#endif
}
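TaskWorkerPool::start itself is not listed in this article. Conceptually, it picks the callback that matches the pool's task type, reads the configured worker count for that type, and spawns that many threads running the callback. The Java sketch below only illustrates that pattern; the real implementation is C++, and the names WorkerPoolStartSketch, callbackFor and workerCountFor are hypothetical.

// Hypothetical illustration of the TaskWorkerPool::start pattern: one pool per task type,
// each pool spawning a configured number of threads bound to a type-specific callback.
public class WorkerPoolStartSketch {
    enum TaskType { CREATE_TABLE, DROP_TABLE, ALTER_TABLE }

    private final TaskType type;

    WorkerPoolStartSketch(TaskType type) {
        this.type = type;
    }

    void start() {
        Runnable callback = callbackFor(type);            // like binding *_worker_thread_callback
        int workerCount = workerCountFor(type);           // like a per-type worker-count config value
        for (int i = 0; i < workerCount; i++) {
            new Thread(callback, type + "-worker-" + i).start();
        }
    }

    private Runnable callbackFor(TaskType type) {
        switch (type) {
            case CREATE_TABLE: return () -> System.out.println("create tablet worker started");
            case DROP_TABLE:   return () -> System.out.println("drop tablet worker started");
            default:           return () -> System.out.println("alter tablet worker started");
        }
    }

    private int workerCountFor(TaskType type) {
        return 3;                                         // illustrative fixed count
    }

    public static void main(String[] args) {
        new WorkerPoolStartSketch(TaskType.ALTER_TABLE).start();
    }
}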


The most important method in AgentServer is AgentServer::submit_tasks, the concrete implementation of the thrift interface. It receives a vector<TAgentTaskRequest> and, based on each task's task_type, forwards the request to the corresponding TaskWorkerPool::submit_task method:
void AgentServer::submit_tasks(TAgentResult& agent_result, const vector<TAgentTaskRequest>& tasks) {
    Status ret_st;
    // TODO check master_info here if it is the same with that of heartbeat rpc
    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
        ret_st.to_thrift(&agent_result.status);
        return;
    }
    for (auto task : tasks) {
        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
        TTaskType::type task_type = task.task_type;
        int64_t signature = task.signature;
#define HANDLE_TYPE(t_task_type, work_pool, req_member)                         \
    case t_task_type:                                                           \
        if (task.__isset.req_member) {                                          \
            work_pool->submit_task(task);                                       \
        } else {                                                                \
            ret_st = Status::InvalidArgument(strings::Substitute(               \
                    "task(signature=$0) has wrong request member", signature)); \
        }                                                                       \
        break;
        // TODO(lingbin): It still too long, divided these task types into several categories
        switch (task_type) {
        HANDLE_TYPE(TTaskType::CREATE, _create_tablet_workers, create_tablet_req);
        HANDLE_TYPE(TTaskType::DROP, _drop_tablet_workers, drop_tablet_req);
        HANDLE_TYPE(TTaskType::PUBLISH_VERSION, _publish_version_workers, publish_version_req);
        HANDLE_TYPE(TTaskType::CLEAR_TRANSACTION_TASK,
                    _clear_transaction_task_workers,
                    clear_transaction_task_req);
        HANDLE_TYPE(TTaskType::CLONE, _clone_workers, clone_req);
        HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE,
                    _storage_medium_migrate_workers,
                    storage_medium_migrate_req);
        HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY,
                    _check_consistency_workers,
                    check_consistency_req);
        HANDLE_TYPE(TTaskType::UPLOAD, _upload_workers, upload_req);
        HANDLE_TYPE(TTaskType::DOWNLOAD, _download_workers, download_req);
        HANDLE_TYPE(TTaskType::MAKE_SNAPSHOT, _make_snapshot_workers, snapshot_req);
        HANDLE_TYPE(TTaskType::RELEASE_SNAPSHOT, _release_snapshot_workers, release_snapshot_req);
        HANDLE_TYPE(TTaskType::MOVE, _move_dir_workers, move_dir_req);
        HANDLE_TYPE(TTaskType::RECOVER_TABLET, _recover_tablet_workers, recover_tablet_req);
        HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO,
                    _update_tablet_meta_info_workers,
                    update_tablet_meta_info_req);
        case TTaskType::REALTIME_PUSH:
        case TTaskType::PUSH:
            if (!task.__isset.push_req) {
                ret_st = Status::InvalidArgument(strings::Substitute(
                        "task(signature=$0) has wrong request member", signature));
                break;
            }
            if (task.push_req.push_type == TPushType::LOAD
                    || task.push_req.push_type == TPushType::LOAD_DELETE) {
                _push_workers->submit_task(task);
            } else if (task.push_req.push_type == TPushType::DELETE) {
                _delete_workers->submit_task(task);
            } else {
                ret_st = Status::InvalidArgument(strings::Substitute(
                        "task(signature=$0, type=$1, push_type=$2) has wrong push_type",
                        signature, task_type, task.push_req.push_type));
            }
            break;
        case TTaskType::ALTER:
            if (task.__isset.alter_tablet_req || task.__isset.alter_tablet_req_v2) {
                _alter_tablet_workers->submit_task(task);
            } else {
                ret_st = Status::InvalidArgument(strings::Substitute(
                        "task(signature=$0) has wrong request member", signature));
            }
            break;
        default:
            ret_st = Status::InvalidArgument(strings::Substitute(
                    "task(signature=$0, type=$1) has wrong task type", signature, task_type));
            break;
        }
#undef HANDLE_TYPE
        if (!ret_st.ok()) {
            LOG(WARNING) << "fail to submit task. reason: " << ret_st.get_error_msg()
                    << ", task: " << task;
            // For now, all tasks in the batch share one status, so if any task
            // was failed to submit, we can only return error to FE(even when some
            // tasks have already been successfully submitted).
            // However, Fe does not check the return status of submit_tasks() currently,
            // and it is not sure that FE will retry when something is wrong, so here we
            // only print an warning log and go on(i.e. do not break current loop),
            // to ensure every task can be submitted once. It is OK for now, because the
            // ret_st can be error only when it encounters an wrong task_type and
            // req-member in TAgentTaskRequest, which is basically impossible.
            // TODO(lingbin): check the logic in FE again later.
        }
    }
    ret_st.to_thrift(&agent_result.status);
}
Turning to the TaskWorkerPool class, it declares the following static callback functions; TaskWorkerPool::start binds each worker thread to the one matching its task type:
static void* _create_tablet_worker_thread_callback(void* arg_this);
static void* _drop_tablet_worker_thread_callback(void* arg_this);
static void* _push_worker_thread_callback(void* arg_this);
static void* _publish_version_worker_thread_callback(void* arg_this);
static void* _clear_transaction_task_worker_thread_callback(void* arg_this);
static void* _alter_tablet_worker_thread_callback(void* arg_this);
static void* _clone_worker_thread_callback(void* arg_this);
static void* _storage_medium_migrate_worker_thread_callback(void* arg_this);
static void* _check_consistency_worker_thread_callback(void* arg_this);
static void* _report_task_worker_thread_callback(void* arg_this);
static void* _report_disk_state_worker_thread_callback(void* arg_this);
static void* _report_tablet_worker_thread_callback(void* arg_this);
static void* _upload_worker_thread_callback(void* arg_this);
static void* _download_worker_thread_callback(void* arg_this);
static void* _make_snapshot_thread_callback(void* arg_this);
static void* _release_snapshot_thread_callback(void* arg_this);
static void* _move_dir_thread_callback(void* arg_this);
static void* _recover_tablet_thread_callback(void* arg_this);
static void* _update_tablet_meta_worker_thread_callback(void* arg_this);
Finally, TaskWorkerPool::submit_task is the method that AgentServer::submit_tasks forwards to. Each task is appended to the std::deque<TAgentTaskRequest> _tasks; queue (first in, first out), and _worker_thread_condition_lock.notify(); is called to wake up one worker thread to execute it: a classic producer-consumer model.

void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
    const TTaskType::type task_type = task.task_type;
    int64_t signature = task.signature;
    std::string type_str;
    EnumToString(TTaskType, task_type, type_str);
    LOG(INFO) << "submitting task. type=" << type_str << ", signature=" << signature;
    if (_register_task_info(task_type, signature)) {
        // Set the receiving time of task so that we can determine whether it is timed out later
        (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
        size_t task_count_in_queue = 0;
        {
            lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
            _tasks.push_back(task);
            task_count_in_queue = _tasks.size();
            _worker_thread_condition_lock.notify();
        }
        LOG(INFO) << "success to submit task. type=" << type_str << ", signature=" << signature
                << ", task_count_in_queue=" << task_count_in_queue;
    } else {
        LOG(INFO) << "fail to register task. type=" << type_str << ", signature=" << signature;
    }
}
Take _alter_tablet_worker_thread_callback as an example. The callback is a consumer: when the queue is empty it blocks in worker_pool_this->_worker_thread_condition_lock.wait();, and once woken by a producer it executes the actual task logic, calls TaskWorkerPool::_finish_task to report the result to the FE, and finally removes the task from the std::deque<TAgentTaskRequest> _tasks; queue via TaskWorkerPool::_remove_task_info. At this point the BE-side handling of the task (for example, creating the replica) is complete.

void* TaskWorkerPool::_alter_tablet_worker_thread_callback(void* arg_this) {
    TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this;
#ifndef BE_TEST
    while (true) {
#endif
        TAgentTaskRequest agent_task_req;
        {
            lock_guard<Mutex> worker_thread_lock(worker_pool_this->_worker_thread_lock);
            while (worker_pool_this->_tasks.empty()) {
                worker_pool_this->_worker_thread_condition_lock.wait();
            }
            agent_task_req = worker_pool_this->_tasks.front();
            worker_pool_this->_tasks.pop_front();
        }
        int64_t signature = agent_task_req.signature;
        LOG(INFO) << "get alter table task, signature: " << agent_task_req.signature;
        bool is_task_timeout = false;
        if (agent_task_req.__isset.recv_time) {
            int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
            if (time_elapsed > config::report_task_interval_seconds * 20) {
                LOG(INFO) << "task elapsed " << time_elapsed
                          << " seconds since it is inserted to queue, it is timeout";
                is_task_timeout = true;
            }
        }
        if (!is_task_timeout) {
            TFinishTaskRequest finish_task_request;
            TTaskType::type task_type = agent_task_req.task_type;
            switch (task_type) {
            case TTaskType::ALTER:
                worker_pool_this->_alter_tablet(worker_pool_this,
                                            agent_task_req,
                                            signature,
                                            task_type,
                                            &finish_task_request);
                break;
            default:
                // pass
                break;
            }
            worker_pool_this->_finish_task(finish_task_request);
        }
        worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
#ifndef BE_TEST
    }
#endif
    return (void*)0;
}
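TaskWorkerPool::_finish_task is not listed above either. Based on the flow described in this article, it fills a TFinishTaskRequest with the task's type, signature and result status and reports it to the FE master through the thrift interface finishTask. The Java sketch below is only a rough illustration of that reporting step; FinishTaskRequestSketch, MasterClientSketch and reportWithRetry are hypothetical names, not Doris APIs, and the retry loop is just a plausible way to make the report robust.

// Hypothetical illustration of the "report the finished task back to FE" step.
public class FinishTaskReportSketch {

    // Stand-in for TFinishTaskRequest: identifies the task and carries its result status.
    static class FinishTaskRequestSketch {
        final String taskType;
        final long signature;
        final boolean success;
        FinishTaskRequestSketch(String taskType, long signature, boolean success) {
            this.taskType = taskType;
            this.signature = signature;
            this.success = success;
        }
    }

    // Stand-in for a thrift client of MasterService.finishTask().
    interface MasterClientSketch {
        boolean finishTask(FinishTaskRequestSketch request);   // true if the FE accepted the report
    }

    // Report the result, retrying a few times on failure.
    static void reportWithRetry(MasterClientSketch client, FinishTaskRequestSketch request) {
        final int maxRetries = 3;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (client.finishTask(request)) {
                return;
            }
            System.out.println("finish task report failed, attempt " + attempt);
        }
        System.out.println("give up reporting task " + request.signature);
    }

    public static void main(String[] args) {
        MasterClientSketch client = request -> true;            // pretend the FE always accepts
        reportWithRetry(client, new FinishTaskRequestSketch("ALTER", 10086L, true));
    }
}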






Original article
https://wingsgo.github.io/2020/0 ... ma_change_task.html
