logplus/Workflow/WFEngine/WorkflowEngine/include/ModuleContainer.h

810 lines
32 KiB
C
Raw Normal View History

2026-01-16 17:18:41 +08:00
/*
* ModuleContainer.h
*
* Created on: 2011-5-6
* Author: dev
*/
//TODO use composite pattern?
#ifndef PAI_FRAME_WORKFLOWENGINE_MODULECONTAINER_H
#define PAI_FRAME_WORKFLOWENGINE_MODULECONTAINER_H
#include <vector>
#include <map>
#include <algorithm>
#include "pb_gen/splitInfo.pb.h"
#include "MutiParallModule.h"
#include "ModuleContext.h"
#include "WorkflowContext.h"
#include "Log.h"
//#include "WorkflowJob.h"
#include "Thread.h"
#include "WorkflowRunner.h"
#include "ParameterItem.h"
#include "error.h"
//*************************模块参数Meta变量*****************************
#define MODULE_METADATA_IOMODULE_FILES "files"
#define MODULE_METADATA_IOMODULE_TRACESELECTION "traceselection"
#define MODULE_METADATA_IOMODULE_HASTRACESELECTION "hastraceselection"
#define MODULE_METADATA_IOMODULE_KEY "Key"
#define MODULE_METADATA_IOMODULE_START "Start"
#define MODULE_METADATA_IOMODULE_END "End"
#define MODULE_METADATA_IOMODULE_INCREMENT "Increment"
//********************************************************************
namespace pai {
namespace workflow {
using namespace pai::module;
using namespace std;
bool CompareModuleThread(CThread* one, CThread* another);
/**
* @brief ModuleContainer类用于描述工作流信息
*
* @remark
* -
* -
* --
* --
* --
*
*/
class PAI_WORKFLOWENGINE_EXPORT ModuleContainer {
public:
ModuleContainer():module_threads(), workflow(), reduce(), isClusterJob(false), isReduce(false), context(), m_split(NULL)
{
};
ModuleContainer(bool clusterJob,bool p_reduce):module_threads(), workflow(), reduce(), isClusterJob(clusterJob), isReduce(p_reduce),\
context(), m_split(NULL)
{
};
ModuleContainer(const workflow_info& p_workflow):module_threads(), workflow(p_workflow), reduce(), isClusterJob(true), isReduce(false),\
context(), m_split(NULL)
{
}
ModuleContainer(const ReduceInfo& p_reduce):module_threads(), workflow(), reduce(p_reduce), isClusterJob(true), isReduce(true), \
context(), m_split(NULL)
{
}
void SetContextSplit(const string& key, google::protobuf::Message* splitObj){
context.Put(key, splitObj, false);
}
virtual ~ModuleContainer()
{
//clear threads
std::vector<CThread*> threads = GetAllModuleThreads();
for (size_t idx = 0; idx < threads.size(); idx++) {
if(threads.at(idx)) {
delete threads.at(idx);
threads.at(idx) = NULL;
}
}
};
/**@brief 初始化工作流模块信息,主要包括:
* @remark
* -
* -
* - Module::SetModuleParameter()
* - modules容器中
*/
virtual void Initialize()
{
//从配置context中取出job_db_id
std::string strDBJobId = context.Get<std::string>(pai::module::KEY_JOB_DB_ID);
long long dbJobId = 0;
std::stringstream changeStr;
changeStr << strDBJobId;
changeStr >> dbJobId;
this->context.Put(pai::module::KEY_DATABASE_ID, dbJobId, false);
//
if(isReduce)
{
pai::log::Logger::SetLogger(pai::log::Logger::GetJobLogger(strDBJobId, reduce.taskInfo.attempt_id));
InitializeReduce();
}else{
pai::log::Logger::SetLogger(pai::log::Logger::GetJobLogger(strDBJobId, workflow.taskInfo.attempt_id));
InitializeGeneral();
}
SortModuleThreads();
};
/**
*
* 1. Module::AddNextModule()
* 2. Module::SetInputBuffer()Module::SetOutputBuffer()
*
*/
virtual void DescribeRelation();
/**
*
*/
void InitializeGeneral();
/**
*PSTM等并行模块Reduce方法
*/
void InitializeReduce();
/**
* Module::validate()
* validate()
* validate()
*/
virtual bool CheckConfiguration(){
bool result = true;
for (size_t idx = 0; idx < module_threads.size() && result; idx++) {
CModuleCheckResult me(static_cast<int>(idx));
result &= module_threads.at(idx)->GetModule()->validate(me);
}
return result;
};
/**
* 退
* 线退
*/
virtual bool IsAllModuleFinished(){
bool result = true;
for (size_t idx = 0; idx < module_threads.size(); idx++ ) {
result &= module_threads.at(idx)->GetThreadStatus() == pai::workflow::EXIT;
}
return result;
};
void SetUp(){
for (size_t idx = 0; idx < module_threads.size(); idx++) {
pai::log::Info(module_threads.at(idx)->GetModule()->GetMetaData()->GetName() + " ready setup");
module_threads.at(idx)->GetModule()->SetUp();
pai::log::Info(module_threads.at(idx)->GetModule()->GetMetaData()->GetName() + " setup finished.");
}
}
const std::vector<CThread*> GetAllModuleThreads() const { return module_threads;}
/**
*id查找Module
*/
CModule* FindModule(const std::string& runtime_id)
{
for (size_t idx = 0; idx < module_threads.size(); idx++) {
if(module_threads.at(idx)->GetModule()->GetModuleContext().module_runtime_id==runtime_id)
return module_threads.at(idx)->GetModule();
}
return NULL;
};
/**
*
*/
CModuleSelfSplitSupport* FindSelfSplitModule()
{
for (size_t idx = 0; idx < module_threads.size(); idx++) {
CModule* module = module_threads.at(idx)->GetModule();
CModuleSelfSplitSupport* pModule = dynamic_cast<CModuleSelfSplitSupport*>(module);
if(pModule!=NULL)
return pModule;
}
return NULL;
};
/**
*
*/
CMutiParallModule* FindMutiParallModule()
{
for (size_t idx = 0; idx < module_threads.size(); idx++) {
CModule* module = module_threads.at(idx)->GetModule();
CMutiParallModule* pModule = dynamic_cast<CMutiParallModule*>(module);
if(pModule!=NULL)
return pModule;
}
return NULL;
};
/**
*Module
*PSTMModule
*/
CModule* FindModuleByClassName(const std::string& moduleClassName)
{
for (size_t idx = 0; idx < module_threads.size(); idx++) {
if(module_threads.at(idx)->GetModule()->GetMetaData()->GetName()==moduleClassName)
return module_threads.at(idx)->GetModule();
}
return NULL;
};
/*
* @brief reset workflow paramitems
*/
void ResetParamItems()
{
if(!isReduce)
ResetGeneralParamItems();
else
ResetReduceParamItems();
};
/*
* @brief reset general workflow paramitems
*/
void ResetGeneralParamItems()
{
//TODO:设计不太好,General Workflow & Reduce Workflow需要在抽象一层
// 当是一个General Workflow时只能访问属于自己的对象
// 如:只能workflow object,不能访问reduce object
//Reset inputmodule
for(size_t i = 0 ;i < workflow.inputs.size();i++)
{
ResetInputParamItems(workflow.inputs.at(i));
}
//Reset outputmodule
for(size_t i = 0 ;i < workflow.outputs.size();i++)
{
ResetOutputParamItems(workflow.outputs.at(i));
}
//TODO:需要重构因为假设只有一个SelfSplitModule)
//if has slefsplit set->module
CModuleSelfSplitSupport* pModule = FindSelfSplitModule();
if((pModule!=NULL) && (m_split!=NULL)){
pModule->setSplit(m_split);
}
//if has CMutiParallModule,fill cluster parameters
//TODO:需要重构因为假设只有一个MutiParallModule)
CMutiParallModule* mutiParallModule = FindMutiParallModule();
if(mutiParallModule!=NULL){
mutiParallModule->SetCurrentPhase(pai::module::Map);
}
};
/*
* @brief reset reduce worklfow paramitems
*/
void ResetReduceParamItems()
{
//Reset outputmodule
for(size_t i = 0 ;i < reduce.outputs.size();i++)
{
ResetOutputParamItems(reduce.outputs.at(i));
}
//TODO:需要重构因为假设只有一个SelfSplitModule)
//if has slefsplit set->module
CModuleSelfSplitSupport* pModule = FindSelfSplitModule();
if((pModule!=NULL) && (m_split!=NULL)){
pModule->setSplit(m_split);
}
//if has CMutiParallModule,fill cluster parameters
//TODO:需要重构因为假设只有一个MutiParallModule)
CMutiParallModule* mutiParallModule = FindMutiParallModule();
if(mutiParallModule!=NULL){
mutiParallModule->SetCurrentPhase(pai::module::Reduce);
mutiParallModule->SetPreJobId(reduce.preJobId);
mutiParallModule->setParamItems(reduce.parameters);
}
};
void initModuleContext()
{
if(!isReduce)
initModuleContext(workflow.taskInfo);
else
initModuleContext(reduce.taskInfo);
};
void initModuleContext(const TaskInfo& taskInfo)
{
map<string, vector<string> > id2Branch;
GetBranch(id2Branch);
this->context.Put("job_id", taskInfo.job_id, false);
this->context.Put("hostname", taskInfo.hostname, false);
this->context.Put("task_id", taskInfo.attempt_id, false);
for (size_t idx = 0; idx < module_threads.size(); idx++)
{
/// deprecated begin
SModuleContext m_context = module_threads.at(idx)->GetModule()->GetModuleContext();
m_context.taskInfo = taskInfo;
m_context.module_runtime_env = "";
m_context.module_runtime_env.append("[");
m_context.module_runtime_env.append(taskInfo.hostname);
m_context.module_runtime_env.append("][");
m_context.module_runtime_env.append(taskInfo.attempt_id);
m_context.module_runtime_env.append("]");
module_threads.at(idx)->GetModule()->SetModuleContext(m_context);
/// deprecated end
if(id2Branch.count(m_context.module_runtime_id) > 0)
{
std::vector<std::string> branchs = id2Branch.find(m_context.module_runtime_id)->second;
//不能使用 module_threads.at(idx)->GetModule()->GetWorkflowContext().SetBranchs(branchs);
//GetModule()->GetWorkflowContext()返回一个const对象
context.branchs = branchs;
}else{
throw pai::error::runtime_error("not found branch in workflow!");
}
module_threads.at(idx)->GetModule()->SetWorkflowContext(this->context);
}
}
void GetBranch(map<string, vector<string> >& id2Branch)
{
vector<string> allBranchs;//存储所有分支
vector<string> tail;//存储分支的尾
map<string, vector<string>* > dest2Source;//存储detst-》【source。。。】
for(size_t i = 0; i<module_threads.size(); i++)
{
SModuleContext moduleContext = module_threads.at(i)->GetModule()->GetModuleContext();
id2Branch.insert(pair<string, vector<string> >(moduleContext.module_runtime_id, vector<string>()) );
if(moduleContext.module_indegree != 0)
{
dest2Source[moduleContext.module_runtime_id] = new vector<string>();
}
if(moduleContext.module_outdegree == 0)
{
tail.push_back(moduleContext.module_runtime_id);
}
int in_port = 0;
for(int indegree=0; indegree<moduleContext.module_indegree; indegree++)
{
//找到模块的输入buffer
CBuffer* input_buffer = NULL;
while(in_port < module_threads.at(i)->GetModule()->GetMetaData()->GetInputPortCount()
&& (input_buffer = module_threads.at(i)->GetModule()->GetInputBuffer(in_port++)) == NULL){}
//通过比较buffer是否相同找到这个模块的上游模块
for(size_t j = 0; j < module_threads.size(); j++)
{
SModuleContext moduleContext_tmp = module_threads.at(j)->GetModule()->GetModuleContext();
//模块有多个输出端口使用GetModule()->GetMetaData()->GetOutputPortCount()可以获取到,
//moduleContext_tmp.module_outdegree代表工作流中模块的实际输出个数
bool findEqualBuffer = false;
for(int outdegree = 0; outdegree < moduleContext_tmp.module_outdegree; outdegree++)
{
for(int out_port = 0; out_port < module_threads.at(j)->GetModule()->GetMetaData()->GetOutputPortCount(); out_port++)
{
if(input_buffer == module_threads.at(j)->GetModule()->GetOutputBuffer(out_port))
{
findEqualBuffer = true;
break;
}
}
if(findEqualBuffer){
dest2Source.find(moduleContext.module_runtime_id)->second->push_back(moduleContext_tmp.module_runtime_id);
break;
}
}
}
}
}
for(vector<string>::iterator it = tail.begin(); it < tail.end(); it++)
{
string workflowBranch;
TravBackwardConnect(*it, workflowBranch, dest2Source, allBranchs);
}
for(map<string, vector<string> >::iterator map_it = id2Branch.begin(); map_it != id2Branch.end(); map_it++)
{
string id = map_it->first;
for(vector<string>::iterator branchs_it = allBranchs.begin(); branchs_it != allBranchs.end(); branchs_it++)
{
if(branchs_it->substr(0, branchs_it->find_first_of("-")) == id || branchs_it->find("-"+id+"-") != string::npos
|| branchs_it->substr(branchs_it->find_last_of("-")+1) == id)
{
map_it->second.push_back(*branchs_it);
}
}
}
//print id2Branch
for(map<string, vector<string> >::iterator map_it = id2Branch.begin(); map_it != id2Branch.end(); map_it++)
{
string id = map_it->first;
vector<string> branchs = map_it->second;
std::cout<<"id is "<<id<<" [ ";
for(vector<string>::iterator v_it = branchs.begin(); v_it != branchs.end(); v_it++)
{
std::cout<<*v_it<<" ";
}
std::cout<<"]"<<std::endl;
}
for(map<string, vector<string>* >::iterator it = dest2Source.begin(); it != dest2Source.end(); it++)
{
delete it->second;
}
}
/**
* @brief .
* @WARN 使
*/
WorkflowContext GetWorkflowContext() const
{
return context;
}
void SetModuleSelfSplit(Split* split){
m_split = split;
};
Split* GetModuleSelfSplit() const{
return m_split;
}
/**
* @brief map中的key和valuecontext中
*/
void SetConfiguration(map<std::string,std::string>& config){
map<std::string,std::string>::iterator it;
for(it = config.begin(); it != config.end(); it++)
{
this->context.Put((*it).first, (*it).second, false);
}
}
protected:
std::vector<CThread*> module_threads;
workflow_info workflow;
ReduceInfo reduce;
bool isClusterJob;
bool isReduce;
private:
WorkflowContext context;
Split* m_split;
private:
ModuleContainer(const ModuleContainer & constainer);
ModuleContainer & operator=(const ModuleContainer & constainer);
/*
* @brief child paramitem
* @param[in] parentNode
* @param[in] childNode
* @param[in] eType
* @param[in] eInputType
* @param[in] strId Id
* @param[in] strValue
* @param[in] strName
* @param[in] strDescription
* @param[in] strDefault
* @param[in] strCategory
* @param[in] strMax
* @param[in] strMin
* @param[in] strInputMetaData
* @param[in] strInputData
* @param[in] bNecessary
*/
void AddParamItem(CCompositeParameterItem* parentNode,CParameterItem* childNode,
const ParameterType& eType,const ParameterInputType& eInputType,const std::string& strId,const std::string& strValue,
const std::string& strName,const std::string& strDescription,const std::string& strDefault,const std::string& strCategory,const std::string& strMax,const std::string& strMin,
const std::string& strInputMetaData,const std::string& strInputData,bool bNecessary)
{
AddParamItem(parentNode,childNode,
eType,eInputType,strId,strValue,
strName,strDescription,strDefault,strCategory,strMax,strMin,
strInputMetaData,strInputData,"","",bNecessary);
};
/*
* @brief child paramitem
* @param[in] parentNode
* @param[in] childNode
* @param[in] eType
* @param[in] eInputType
* @param[in] strId Id
* @param[in] strValue
* @param[in] strName
* @param[in] strDescription
* @param[in] strDefault
* @param[in] strCategory
* @param[in] strMax
* @param[in] strMin
* @param[in] strInputMetaData
* @param[in] strInputData
* @param[in] strInOut
* @param[in] strDataType
* @param[in] bNecessary
*/
void AddParamItem(CCompositeParameterItem* parentNode,CParameterItem* childNode,
const ParameterType& eType,const ParameterInputType& eInputType,const std::string& strId,const std::string& strValue,
const std::string& strName,const std::string& strDescription,const std::string& strDefault,const std::string& strCategory,const std::string& strMax,const std::string& strMin,
const std::string& strInputMetaData,const std::string& strInputData,const std::string& strInOut,const std::string& strDataType,bool bNecessary)
{
CParameterItem& parameterItem = *childNode;
parameterItem.SetType(eType);
parameterItem.SetInputType(eInputType);
parameterItem.SetId(strId);
parameterItem.SetStringValue(strValue);
parameterItem.SetName(strName);
parameterItem.SetDescription(strDescription);
parameterItem.SetDefault(strDefault);
parameterItem.SetCategory(strCategory);
parameterItem.SetMax(strMax);
parameterItem.SetMin(strMin);
parameterItem.SetInputMetaData(strInputMetaData);
parameterItem.SetInputData(strInputData);
parameterItem.SetIsNessary(bNecessary);
parameterItem.SetInOut(strInOut);
parameterItem.SetDataType(strDataType);
parentNode->AddParameterItem(parameterItem,false);
};
/*
* @brief outputmodule paramitems
* @param output workflow_output_info
*/
void ResetOutputParamItems(const workflow_output_info& output)
{
CModule* outputmodule = FindModule(output.module_context_id);
if(NULL==outputmodule)
throw pai::error::logic_error("ModuleContainer::ResetOutputParamItems: workflow runtime don't find runtimeid=" + output.module_context_id);
CModuleParameter* param = outputmodule->GetModuleParameter();
//Reset filter
for(size_t i = 0 ;i < output.filters.size();i++)
{
RestOutputFilter(param,output.filters.at(i));
}
};
/**
* @brief outputmodule参数项
* @param param outputmodule参数对象
* @param outFilter filter
*/
void RestOutputFilter(CModuleParameter* param,const workflow_output_filter& outFilter)
{
CParameterItem* fileNamesParamItem = GetParameterItemByRoot(param,outFilter.param_item_id);
fileNamesParamItem->SetStringValue(outFilter.filenames);
};
/**
* @brief inputmodule参数项
* @param input workflow_input_info
*/
void ResetInputParamItems(const workflow_input_info& input)
{
CModule* inputmodule = FindModule(input.module_context_id);
if(NULL==inputmodule)
throw pai::error::logic_error("ModuleContainer::ResetInputParamItems: workflow runtime don't find runtimeid=" + input.module_context_id);
CModuleParameter* param = inputmodule->GetModuleParameter();
//add filters
for(size_t i = 0 ;i < input.modify_filters.size();i++)
{
ModifyInputFilter(param,input.modify_filters.at(i));
}
//del filters
//TODO:按照moduleapi parameteritem的设计对于删除数组元素时必须倒退着删除。
if(input.del_filters.size() > 0)
{
size_t del_index = input.del_filters.size() - 1;
for(size_t i = 0 ;i < input.del_filters.size();i++)
{
DelInputFilter(param,input.del_filters.at(del_index--));
}
}
};
/**
* @brief inputmodule相关的参数项
* @param param
* @param delFilter filter
*/
void DelInputFilter(CModuleParameter* param,const workflow_del_input_filter& delFilter)
{
CCompositeParameterItem* filesParamItem = CastCCompositeParameterItem(GetParameterItemByRoot(param,MODULE_METADATA_IOMODULE_FILES));
bool bRemoveResult = filesParamItem->RemoveParameterItem(delFilter.param_item_id);
if(!bRemoveResult)
throw pai::error::logic_error("ModuleContainer::DelFilter: workflow runtime DelFilter remove paramitem error id=" + delFilter.param_item_id);
};
/**
* @brief inputmodule参数项
* @param param
* @param modifyFilter filter
*/
void ModifyInputFilter(CModuleParameter* param,const workflow_modify_input_filter& modifyFilter)
{
std::string traceselection_id = MODULE_METADATA_IOMODULE_TRACESELECTION;
//判断是否有“files[*].traceselection”参数项如果没有则添加,而且必须设置files[*].hastraceselection=true.
if(modifyFilter.modify_filter_type==NO_FILTER)
{
//设置files[*].hastraceselection=true
std::string hastraceselection = modifyFilter.param_item_id + "." + MODULE_METADATA_IOMODULE_HASTRACESELECTION;
CParameterItem* hasTraceselectionParamItem = GetParameterItemByRoot(param,hastraceselection);
hasTraceselectionParamItem->SetValue(true);
CCompositeParameterItem* fileParamItem = CastCCompositeParameterItem(GetParameterItemByRoot(param,modifyFilter.param_item_id));
CCompositeParameterItem* fileTraceSelectionParamItem = CastCCompositeParameterItem(fileParamItem->GetParameterItem(traceselection_id));
if(fileTraceSelectionParamItem==NULL)
{
CParameterItem parentItem;
parentItem.SetId(traceselection_id);
parentItem.SetType(pai::module::ARRAY);
//this is deepcopy
fileParamItem->AddParameterItem(parentItem,true);
}
else{
//因为界面保存的workflwo.json时,存在不勾选HasTraceSelection,但是里面有过滤参数项的情况,
//因此必须要清除掉这些废弃子项这些过滤子项必须通过切片重新Add
fileTraceSelectionParamItem->Clear();
}
}
//是否是添加一个过滤参数项目modify_filter_type=0,1
if(modifyFilter.modify_filter_type==NO_FILTER
|| modifyFilter.modify_filter_type==NO_ORDER_KEY)
{
std::string file_traceselection_id = "";
if(modifyFilter.modify_filter_type==NO_FILTER)
file_traceselection_id = modifyFilter.param_item_id + "." + traceselection_id;
else
file_traceselection_id = modifyFilter.param_item_id;
//获取“files[*].traceselection”参数项
CCompositeParameterItem* parentItem = CastCCompositeParameterItem(GetParameterItemByRoot(param,file_traceselection_id));
//Add FilterRow
CCompositeParameterItem filterRow;
filterRow.SetId("");//IDID"[index]"
filterRow.SetType(pai::module::CUSTOM);//CUSTOM
//Add FilterRow Cell
//Key Item
CParameterItem keyItem;
keyItem.SetId(MODULE_METADATA_IOMODULE_KEY);
keyItem.SetType(pai::module::STRING);
keyItem.SetStringValue(modifyFilter.key);
filterRow.AddParameterItem(keyItem);//Name
//Start Item
CParameterItem startItem;
startItem.SetId(MODULE_METADATA_IOMODULE_START);
startItem.SetType(pai::module::STRING);
startItem.SetStringValue(modifyFilter.start);
filterRow.AddParameterItem(startItem);//Name
//End Item
CParameterItem endItem;
endItem.SetId(MODULE_METADATA_IOMODULE_END);
endItem.SetType(pai::module::STRING);
endItem.SetStringValue(modifyFilter.end);
filterRow.AddParameterItem(endItem);//Name
//Increment Item
CParameterItem incrementItem;
incrementItem.SetId(MODULE_METADATA_IOMODULE_INCREMENT);
incrementItem.SetType(pai::module::INT);
incrementItem.SetStringValue("1");//default=1
filterRow.AddParameterItem(incrementItem);//Name
//保证切片的第一关键字插入到第一行
parentItem->InsertParameterItem(0,filterRow);
}
else if(modifyFilter.modify_filter_type==HAS_ORDER__KEY)//直接切分用户设置的道集范围
{
//获取“files[*].traceselection[*]”参数项
CCompositeParameterItem* parentItem = CastCCompositeParameterItem(GetParameterItemByRoot(param,modifyFilter.param_item_id));
//Start Item
CParameterItem* startItem = GetParameterItemByParent(parentItem,MODULE_METADATA_IOMODULE_START);
startItem->SetStringValue(modifyFilter.start);
//End Item
CParameterItem* endItem = GetParameterItemByParent(parentItem,MODULE_METADATA_IOMODULE_END);
endItem->SetStringValue(modifyFilter.end);
}
};
/**
* @brief paramItem(),
* @param paramItem
* @param param_item_id id
* @return ,exception
*/
CParameterItem* GetParameterItemByParent(const CCompositeParameterItem* parentItem,const std::string& param_item_id)
{
CParameterItem* paramItem = parentItem->GetParameterItem(param_item_id);
if(paramItem==NULL)
throw pai::error::logic_error("ModuleContainer::GetParameterItemByParent: workflow runtime don't find paramitem id=" + param_item_id);
return paramItem;
};
/**
* @brief param_item_id获取参数项
* @param param
* @param param_item_id id
* @return ,exception
*/
CParameterItem* GetParameterItemByRoot(CModuleParameter* param,const std::string& param_item_id)
{
CParameterItem* paramItem = param->GetParameterItem(param_item_id);
if(paramItem==NULL)
throw pai::error::logic_error("ModuleContainer::GetParameterItemByRoot: workflow runtime don't find paramitem id=" + param_item_id);
return paramItem;
};
/**
* @brief CParameterItem* paramItem为CCompositeParameterItem
* @param paramItem
* @return ,exception
*/
CCompositeParameterItem* CastCCompositeParameterItem(CParameterItem* paramItem)
{
CCompositeParameterItem* compositeParamItem = dynamic_cast<CCompositeParameterItem*>(paramItem);
if(compositeParamItem==NULL)
throw pai::error::logic_error("ModuleContainer::CastCCompositeParameterItem: workflow runtime CastCCompositeParameterItem exception");
return compositeParamItem;
};
/**
*
*/
void TravBackwardConnect(string& id, string& workflowBranch, map<string, vector<string>* >& dest2Source, vector<string>& branchs)
{
workflowBranch += id+"-";
if(dest2Source.count(id) > 0)
{
vector<std::string>* backV = dest2Source.find(id)->second;
for(vector<std::string>::iterator it = backV->begin(); it < backV->end(); it++)
{
TravBackwardConnect((*it), workflowBranch, dest2Source, branchs);
workflowBranch = workflowBranch.substr(0, workflowBranch.size()-string((*it)+"-").size());
}
}
else
{
branchs.push_back(workflowBranch.substr(0, workflowBranch.size()-1));
}
}
void SortModuleThreads(){
sort(module_threads.begin(), module_threads.end(), CompareModuleThread);
#ifdef DEBUG
std::vector<CThread*> threads = this->GetAllModuleThreads();
std::cout << "--------------Sorted modules--------------" << std::endl;
for (size_t i = 0; i < threads.size(); i++)
{
std::cout << threads[i]->GetModule()->GetModuleContext().module_runtime_name
<< ":sortId=" << threads[i]->GetModule()->GetModuleContext().module_sort_id
<< " indegree=" << threads[i]->GetModule()->GetModuleContext().module_indegree
<< " outdegree=" << threads[i]->GetModule()->GetModuleContext().module_outdegree << std::endl;
}
#endif
};
public:
/**
* run all module's RunBeforeJob()
*/
void BeforeJob()
{
std::vector<CThread*> threads = this->GetAllModuleThreads();
for (size_t i = 0; i < threads.size(); i++)
{
pai::log::Info("......begin " + threads[i]->GetModule()->GetMetaData()->GetName() + " BeforeJob");
threads[i]->GetModule()->RunBeforeJob();
pai::log::Info("......end " + threads[i]->GetModule()->GetMetaData()->GetName() + " BeforeJob");
}
}
/**
* run all module's RunAfterJob()
*/
void AfterJob()
{
std::vector<CThread*> threads = this->GetAllModuleThreads();
for (size_t i = 0; i < threads.size(); i++)
{
pai::log::Info("......begin " + threads[i]->GetModule()->GetMetaData()->GetName() + " AfterJob");
threads[i]->GetModule()->RunAfterJob();
pai::log::Info( "......end " + threads[i]->GetModule()->GetMetaData()->GetName() + " AfterJob");
}
}
/**
* run all module's RunAfterJobAbort(is Failed)
*/
void AbortJob(const bool isFailed)
{
std::vector<CThread*> threads = this->GetAllModuleThreads();
for (size_t i = 0; i < threads.size(); i++)
{
pai::log::Info("......begin " + threads[i]->GetModule()->GetMetaData()->GetName() + " AbortJob");
threads[i]->GetModule()->RunAfterJobAbort(isFailed);
pai::log::Info( "......end " + threads[i]->GetModule()->GetMetaData()->GetName() + " AbortJob");
}
}
/**
* run all module's RunAfterTaskAbort
*/
void AbortTask()
{
std::vector<CThread*> threads = this->GetAllModuleThreads();
for (size_t i = 0; i < threads.size(); i++)
{
pai::log::Info("......begin " + threads[i]->GetModule()->GetMetaData()->GetName() + " AbortJob");
threads[i]->GetModule()->RunAfterTaskAbort();
pai::log::Info("......end " + threads[i]->GetModule()->GetMetaData()->GetName() + " AbortJob");
}
}
};
}
}
#endif