logplus/Workflow/WFEngine/WorkflowEngine/include/WorkflowRunner.h

470 lines
14 KiB
C
Raw Normal View History

2026-01-16 17:18:41 +08:00
/*
* @file WorkflowRunner.h
* @brief WorkflowRunner uses Logger (Log4cxx); Logger type: C++
* Created on: 2011-6-17
* Author: John.Huang
*/
#ifndef PAI_FRAME_WORKFLOWENGINE_WORKFLOWRUNNER_H
#define PAI_FRAME_WORKFLOWENGINE_WORKFLOWRUNNER_H
#include "Module.h"
#include "MutiParallModule.h"
#include "ModuleSelfSplitSupport.h"
#include "Buffer.h"
#include "pb_gen/splitInfo.pb.h"
#include <set>
#include <vector>
#include <string>
#include <iostream>
#include <map>
using namespace pai::module;
// Forward declarations (C-style typedef form) of workflow description structs
// that are defined further down in this header.
typedef struct workflow_modify_input_filter workflow_modify_input_filter;
typedef struct workflow_del_input_filter workflow_del_input_filter;
typedef struct workflow_input_info workflow_input_info;
typedef struct workflow_output_filter workflow_output_filter;
typedef struct workflow_info workflow_info;
/**
 * Filter type.
 */
enum ModifyFilterType
{
    // No filter parameter set is present.
    NO_FILTER = 0,
    // A filter parameter set is present, but it has no sort key.
    NO_ORDER_KEY = 1,
    // A filter parameter set is present and it has a sort key.
    HAS_ORDER__KEY = 2
};
struct workflow_modify_input_filter
{
/*
* ModifyFilter类型
* 0:filteritems
* node(id="files[*].traceselection"),;Add新的filteritem
* 1:filteritem
* node(id="files[*].traceselection")filteritem子节点
* 2:filteritem
* param_item_id(id=files[*].traceselection[*])CUSTOM节点的子节点(start,end)
*/
ModifyFilterType modify_filter_type;
std::string param_item_id;
std::string key;
std::string start;
std::string end;
//increment default;
//exclude default
//And/Or default
workflow_modify_input_filter(): modify_filter_type(), param_item_id(), key(), start(), end(){}
void Print(int index = 0)
{
std::cout << "\t\tworkflow_modify_input_filter(" << index << "):" << std::endl;
std::cout << "\t\t{" << std::endl;
std::cout << "\t\t\tmodify_filter_type=" << static_cast<int>(modify_filter_type) << std::endl;
std::cout << "\t\t\tparam_item_id=" << param_item_id << std::endl;
std::cout << "\t\t\tkey=" << key << std::endl;
std::cout << "\t\t\tstart=" << start << std::endl;
std::cout << "\t\t\tend=" << end << std::endl;
std::cout << "\t\t}" << std::endl;
};
bool equal(workflow_modify_input_filter& other)
{
if(param_item_id.compare(other.param_item_id) != 0)
return false;
if(key.compare(other.key) != 0)
return false;
if(start.compare(other.start) != 0)
return false;
if(end.compare(other.end) != 0)
return false;
if(modify_filter_type != other.modify_filter_type)
return false;
return true;
}
};
/**
 * One filter parameter item of an input module that has to be deleted.
 *
 * Print() and equal() are const member functions, and equal() accepts a const
 * reference, so both can be used on const objects and temporaries.
 */
struct workflow_del_input_filter
{
    // Id of the parameter item this entry refers to.
    std::string param_item_id;

    /** Creates an entry with an empty param_item_id. */
    workflow_del_input_filter():param_item_id(){}

    /**
     * Writes a human-readable dump of this entry to std::cout.
     * @param index number printed in the header line of the dump.
     */
    void Print(int index = 0) const
    {
        std::cout << "\t\tworkflow_del_input_filter(" << index << "):" << std::endl;
        std::cout << "\t\t{" << std::endl;
        std::cout << "\t\t\tparam_item_id=" << param_item_id << std::endl;
        std::cout << "\t\t}" << std::endl;
    }

    /**
     * @param other entry to compare against.
     * @return true when both entries carry the same param_item_id.
     */
    bool equal(const workflow_del_input_filter& other) const
    {
        return param_item_id == other.param_item_id;
    }
};
struct workflow_input_info
{
//inputmodule runtime_id
std::string module_context_id;
//inputmodule 需要修改的filter参数项目
std::vector<workflow_modify_input_filter> modify_filters;
//inputmodule 需要删除的filter参数项目
std::vector<workflow_del_input_filter> del_filters;
workflow_input_info():module_context_id(), modify_filters(), del_filters(){}
void Print(int index = 0)
{
std::cout << "\tworkflow_input_info(" << index << "):" << std::endl;
std::cout << "\t{" << std::endl;
std::cout << "\t\tmodule_context_id=" << module_context_id << std::endl;
std::cout << "\t\tmodify_filters:" << std::endl;
std::cout << "\t\t[" << std::endl;
size_t modify_size = modify_filters.size();
for(size_t i = 0;i < modify_size;i++)
{
workflow_modify_input_filter modify = modify_filters.at(i);
modify.Print(static_cast<int>(i));
}
std::cout << "\t\t]" << std::endl;
std::cout << "\t\tdel_filters:" << std::endl;
std::cout << "\t\t[" << std::endl;
size_t del_size = del_filters.size();
for(size_t i = 0;i < del_size;i++)
{
workflow_del_input_filter del = del_filters.at(i);
del.Print(static_cast<int>(i));
}
std::cout << "\t\t]" << std::endl;
std::cout << "\t}" << std::endl;
};
bool equal(workflow_input_info& other)
{
if(module_context_id.compare(other.module_context_id)!=0)
return false;
if(modify_filters.size() != other.modify_filters.size())
return false;
else if(modify_filters.size() > 0)
for(size_t i = 0;i < modify_filters.size();i++)
{
workflow_modify_input_filter modify = modify_filters.at(i);
if(!modify.equal(other.modify_filters.at(i)))
return false;
}
if(del_filters.size() != other.del_filters.size())
return false;
else if(del_filters.size() > 0)
for(size_t i = 0;i < del_filters.size();i++)
{
workflow_del_input_filter del = del_filters.at(i);
if(!del.equal(other.del_filters.at(i)))
return false;
}
return true;
}
};
/**
 * One filter parameter item of an output module together with its file names.
 *
 * Print() and equal() are const member functions, and equal() accepts a const
 * reference, so both can be used on const objects and temporaries.
 */
struct workflow_output_filter
{
    // Id of the parameter item this entry refers to.
    std::string param_item_id;
    // File name(s) stored as a single string; the exact format is not visible here.
    std::string filenames;

    /** Creates an entry with both strings empty. */
    workflow_output_filter():param_item_id(), filenames(){}

    /**
     * Writes a human-readable dump of this entry to std::cout.
     * @param index number printed in the header line of the dump.
     */
    void Print(int index = 0) const
    {
        std::cout << "\t\tworkflow_output_filter(" << index << "):" << std::endl;
        std::cout << "\t\t{" << std::endl;
        std::cout << "\t\t\tparam_item_id=" << param_item_id << std::endl;
        std::cout << "\t\t\tfilenames=" << filenames << std::endl;
        std::cout << "\t\t}" << std::endl;
    }

    /**
     * @param other entry to compare against.
     * @return true when param_item_id and filenames are both equal.
     */
    bool equal(const workflow_output_filter& other) const
    {
        return param_item_id == other.param_item_id
            && filenames == other.filenames;
    }
};
struct workflow_output_info
{
//outputmodule runtime_id
std::string module_context_id;
//outputmodule 需要重新设置过滤参数项
std::vector<workflow_output_filter> filters;
workflow_output_info():module_context_id(), filters(){}
void Print(int index = 0)
{
std::cout << "\tworkflow_output_info(" << index << "):" << std::endl;
std::cout << "\t{" << std::endl;
std::cout << "\t\tmodule_context_id=" << module_context_id << std::endl;
std::cout << "\t\tfilters:" << std::endl;
std::cout << "\t\t[" << std::endl;
size_t size = filters.size();
for(size_t i = 0;i < size;i++)
{
workflow_output_filter filter = filters.at(i);
filter.Print(static_cast<int>(i));
}
std::cout << "\t\t]" << std::endl;
std::cout << "\t}" << std::endl;
};
bool equal(workflow_output_info& other)
{
if(module_context_id.compare(other.module_context_id) != 0)
return false;
if(filters.size() != other.filters.size())
return false;
else if(filters.size() > 0)
for(size_t i=0; i<filters.size(); i++)
{
workflow_output_filter filter = filters.at(i);
if(!filter.equal(other.filters.at(i)))
return false;
}
return true;
}
};
struct workflow_info {
std::string id;
TaskInfo taskInfo;
//inputmodule 因为切片重新设置的参数项信息
std::vector<workflow_input_info> inputs;
//outputmodule 因为切片重新设置的参数项信息
std::vector<workflow_output_info> outputs;
//用于统计到回调函数
Callback cb_update_module_progress;
workflow_info():id(), taskInfo(), inputs(), outputs(), cb_update_module_progress(){}
void Print()
{
std::cout << "workflow_info:" << std::endl;
std::cout << "{" << std::endl;
std::cout << "\tid=" << id << std::endl;
std::cout << "\tjob_id=" << taskInfo.job_id << std::endl;
std::cout << "\tattempt_id=" << taskInfo.attempt_id << std::endl;
std::cout << "\thostname=" << taskInfo.hostname << std::endl;
std::cout << "\tinputs:" << std::endl;
std::cout << "\t[" << std::endl;
size_t inputs_size = inputs.size();
for( size_t i = 0;i < inputs_size;i++)
{
workflow_input_info input = inputs.at(i);
input.Print(static_cast<int>(i));
}
std::cout << "\t]" << std::endl;
std::cout << "\toutputs:" << std::endl;
std::cout << "\t[" << std::endl;
size_t outputs_size = outputs.size();
for( size_t i = 0;i < outputs_size;i++)
{
workflow_output_info output = outputs.at(i);
output.Print(static_cast<int>(i));
}
std::cout << "\t]" << std::endl;
std::cout << "}" << std::endl;
};
};
struct ReduceInfo
{
TaskInfo taskInfo;
//outputmodule 因为切片重新设置的参数项信息
std::vector<workflow_output_info> outputs;
//prevision hadoop jobid
std::string preJobId;
std::map<std::string,std::string> parameters;
//用于统计到回调函数
Callback cb_update_module_progress;
ReduceInfo():taskInfo(), outputs(), preJobId(), parameters(), cb_update_module_progress()
{}
void Print()
{
std::cout << "ReduceInfo:" << std::endl;
std::cout << "{" << std::endl;
std::cout << "\tjob_id=" << taskInfo.job_id << std::endl;
std::cout << "\tattempt_id=" << taskInfo.attempt_id << std::endl;
std::cout << "\thostname=" << taskInfo.hostname << std::endl;
std::cout << "\tpreJobId=" << preJobId << std::endl;
std::cout << "\tparameters size=" << parameters.size() << std::endl;
for(std::map<std::string,std::string>::iterator iter = parameters.begin();iter!=parameters.end();++iter){
std::cout << "\toparam[" << iter->first <<"]=" << iter->second << std::endl;
}
std::cout << "\toutputs size=" << outputs.size() << std::endl;
std::cout << "\t{" << std::endl;
size_t outputs_size = outputs.size();
for( size_t i = 0;i < outputs_size;i++)
{
workflow_output_info output = outputs.at(i);
output.Print(static_cast<int>(i));
}
std::cout << "\t}" << std::endl;
std::cout << "}" << std::endl;
};
};
struct JobInfo
{
//outputmodule 因为切片重新设置的参数项信息
std::vector<workflow_output_info> outputs;
bool isReduceWorkflow;
TaskInfo taskInfo;
JobInfo():outputs(), isReduceWorkflow(), taskInfo(){}
void Print()
{
std::cout << "JobInfo:" << std::endl;
std::cout << "{" << std::endl;
std::cout << "\tisReduceWorkflow=" << isReduceWorkflow << std::endl;
std::cout << "\tjob_id=" << taskInfo.job_id << std::endl;
std::cout << "\tattempt_id=" << taskInfo.attempt_id << std::endl;
std::cout << "\thostname=" << taskInfo.hostname << std::endl;
std::cout << "\toutputs size=" << outputs.size() << std::endl;
std::cout << "\t{" << std::endl;
for(size_t i=0; i<outputs.size(); i++)
{
outputs.at(i).Print(static_cast<int>(i));
}
std::cout << "\t}" << std::endl;
std::cout << "}" << std::endl;
};
bool equal(JobInfo& other)
{
if(isReduceWorkflow != other.isReduceWorkflow)
return false;
if(taskInfo.job_id.compare(other.taskInfo.job_id) != 0)
return false;
if(taskInfo.attempt_id.compare(other.taskInfo.attempt_id) != 0)
return false;
if(taskInfo.hostname.compare(other.taskInfo.hostname) != 0)
return false;
//compare outputs
if(outputs.size() != other.outputs.size())
return false;
for(size_t i=0; i<outputs.size(); i++)
{
if(!outputs.at(i).equal(other.outputs.at(i)))
{
return false;
}
}
return true;
}
};
/**
* Global-scope job hook; only the declaration is visible in this header.
* NOTE(review): presumably invoked before a job starts - confirm against the
* implementation.
* @param workflow job description (outputs, reduce flag, task info).
*/
void BeforeJob(const JobInfo& workflow);
/**
* Global-scope job hook; only the declaration is visible in this header.
* NOTE(review): presumably invoked after a job has finished - confirm against
* the implementation.
* @param workflow job description (outputs, reduce flag, task info).
*/
void AfterJob(const JobInfo& workflow);
/**
* Reduce entry point (declaration only).
* @param reduce description of the reduce run.
*/
void RunReduce(const ReduceInfo& reduce);
/**
* abortJob (declaration only).
* @param job job description.
* @param isFailed NOTE(review): presumably tells a failed job apart from a
*        killed one, as documented on WorkflowRunner::AbortJob - confirm.
*/
void AbortJob(const JobInfo& job,const bool isFailed);
/**
* abortTask (declaration only).
* @param job job description.
*/
void AbortTask(const JobInfo& job);
//****************************************************************
// below refactor part of workflowrunner
//****************************************************************
namespace pai {
namespace workflow {
/**
* @brief workflow执行器
* Job提供执行服务
*/
class WorkflowRunner
{
public:
WorkflowRunner():m_workflow(),m_reduce(),m_config(),m_split(NULL),firstStartedModules(),split_msg_(NULL){
};
void SetWorkflowInfo(const workflow_info& workflow){
this->m_workflow = workflow;
};
workflow_info GetWorkflowInfo() const{
return this->m_workflow;
};
Split* GetModuleSelfSplit() const{
return this->m_split;
}
void SetModuleSelfSplit(Split* split){
m_split = split;
};
void SetReduceInfo(const ReduceInfo& reduce){
this->m_reduce = reduce;
};
ReduceInfo GetReduceInfo() const{
return this->m_reduce;
}
void SetConfiguration(const std::map<std::string,std::string>& config){
this->m_config = config;
};
inline void SetSplitMessage(google::protobuf::Message* s){
split_msg_=s;
}
/**
* Map工作流
*/
void RunWorkflow();
/**
* Reduce工作流
*/
void RunReduce();
void BeforeJob(const JobInfo& job);
void AfterJob(const JobInfo& job);
/**
* abortJob:
* @param[job] the job's info
* @param[isFailed] if the job is failed or killed
*/
void AbortJob(const JobInfo& job,const bool isFailed);
void AbortTask(const JobInfo& job);
private:
bool IsFirstStartModule(CModule* module);
bool IsOutputModule(CModule* module){
if(module->GetClassName() == "COutputModule")
return true;
else
return false;
};
bool IsUseModuleSelfSplit(){
if(m_split!=NULL)
return true;
else
return false;
};
WorkflowRunner(const WorkflowRunner & runner);
WorkflowRunner & operator=(const WorkflowRunner & runner);
private:
workflow_info m_workflow;
ReduceInfo m_reduce;
std::map<std::string,std::string> m_config;
Split* m_split;
std::set<std::string> firstStartedModules;
//it's created by split factory,so need destry it.
google::protobuf::Message* split_msg_;
};
}
}
#endif