123 lines
4.6 KiB
C
123 lines
4.6 KiB
C
|
|
/*
|
|||
|
|
* MultiJobDecompose.h
|
|||
|
|
*
|
|||
|
|
* Created on: Mar 14, 2015
|
|||
|
|
* Author: dev
|
|||
|
|
*/
|
|||
|
|
|
|||
|
|
#ifndef PAI_FRAME_WORKFLOWENGINE_MULTIJOBDECOMPOSE_H
|
|||
|
|
#define PAI_FRAME_WORKFLOWENGINE_MULTIJOBDECOMPOSE_H
|
|||
|
|
|
|||
|
|
#include "WorkFlowFileWrapper.h"
|
|||
|
|
#include "WorkFlowFile.h"
|
|||
|
|
#include "WorkFlowDecompose.h"
|
|||
|
|
|
|||
|
|
#include <vector>
|
|||
|
|
#include <string>
|
|||
|
|
|
|||
|
|
class CMultiJobDecomposeDecomposeTest;
|
|||
|
|
|
|||
|
|
namespace pai{
|
|||
|
|
namespace workflow{
|
|||
|
|
|
|||
|
|
class CMultiJobDecompose:public CWorkFlowDecompose
|
|||
|
|
{
|
|||
|
|
|
|||
|
|
public:
|
|||
|
|
CMultiJobDecompose(const int &startModuleId);
|
|||
|
|
virtual ~CMultiJobDecompose();
|
|||
|
|
/**
|
|||
|
|
* @brief 拆分工作流的入口函数,其他类需要实现此函数进行分类处理
|
|||
|
|
* @param[in] workflowFileWrapper
|
|||
|
|
*/
|
|||
|
|
virtual void DecomposeWorkFlowFile(CWorkFlowFileWrapper* workflowFileWrapper);
|
|||
|
|
private:
|
|||
|
|
/**
|
|||
|
|
* @brief 处理特殊模块
|
|||
|
|
*@param[in] workflowFileWrapper 原始的CWorkFlowFile的封装对象
|
|||
|
|
*/
|
|||
|
|
void InternalDecomposeWorkFlowFile(CWorkFlowFile* workflowFile);
|
|||
|
|
/**
|
|||
|
|
* @brief sourceWorkflowFile中包含静矫正应用类型的模块,
|
|||
|
|
* 从sourceWorkflowFile中moduleID及其上游的模块复制出一个新的工作流,并返回。
|
|||
|
|
*@param[in] sourceWorkflowFile
|
|||
|
|
*@param[in] moduleID 特殊模块的id
|
|||
|
|
*@return 返回新建的workflowfile对象
|
|||
|
|
*/
|
|||
|
|
CStatApplyWorkFlowFile* ProcessStaticApplyJobType(CWorkFlowFile* workflowFile, const int moduleID);
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* @brief sourceWorkflowFile中包含偏移多hadoop作业类型的模块,
|
|||
|
|
* 从sourceWorkflowFile中moduleID及其上游的模块拆分出一个新的工作流,
|
|||
|
|
* 并返回,sourceWorkflowFile中剩下moduleID及其其他模块。
|
|||
|
|
*@param[in] sourceWorkflowFile
|
|||
|
|
*@param[in] moduleID 特殊模块的id
|
|||
|
|
*@param[in] isMultiJob 是否是多作业类型
|
|||
|
|
*@return 返回新建的workflowfile对象
|
|||
|
|
*/
|
|||
|
|
COffsetWorkFlowFile* ProcessOffsetJobType(CWorkFlowFile* workflowFile, const int moduleID, bool isMultiJob);
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* @brief 从workflowFile中获取所有特殊模块
|
|||
|
|
*@param[in] workflowFile
|
|||
|
|
*@return
|
|||
|
|
*/
|
|||
|
|
std::vector<int> GetSpecialModuleIDs(CWorkFlowFile* workflowFile);
|
|||
|
|
/**
|
|||
|
|
* @brief 获取特殊模块的处理方法
|
|||
|
|
*@param[in] workflowFile
|
|||
|
|
*@param[in] moduleID 特殊模块的id
|
|||
|
|
*@return 1 处理统计-应用类型模块的工作流 2 处理偏移多hadoop作业类型模块的工作流 3 处理偏移单hadoop作业类型模块的工作流
|
|||
|
|
*/
|
|||
|
|
virtual int GetSpecialModuleProcessType(CWorkFlowFile* workflowFile, const int moduleID);
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* @brief 获取工作流中的模块id作为key,模块信息作为value的映射;
|
|||
|
|
* @param[in] workflow 工作流对象
|
|||
|
|
* @param[out] moduleMap 模块信息映射
|
|||
|
|
* pair<moduleId, CModuleInformation* >
|
|||
|
|
*/
|
|||
|
|
void GetModulesMap(const CWorkFlowFile *workflow, map<int, CModuleInformation *> &moduleMap);
|
|||
|
|
/**
|
|||
|
|
* @brief 获取工作流中以模块id作为key,该模块作为dest模块的连接作为value的向上的连接信息映射;
|
|||
|
|
* 获取工作流中以模块id作为key,该模块作为source模块的连接作为value的向下的连接信息映射;
|
|||
|
|
* @param[in] workflow 工作流对象
|
|||
|
|
* @param[out] parentMap 工作流中各个模块向上的连接信息映射
|
|||
|
|
* @param[out] childMap 工作流中各个模块向下的连接信息映射
|
|||
|
|
*/
|
|||
|
|
void GetParentAndChildConnMap(const CWorkFlowFile *workflow, map<int, vector<CModuleConnection *> > &parentMap, map<int, vector<CModuleConnection *> > &childMap);
|
|||
|
|
|
|||
|
|
/**
|
|||
|
|
* @brief 将一个工作流中的模块和连接填充到另一个工作流中
|
|||
|
|
* @param[in] src 源工作流
|
|||
|
|
* @param[out] dest 目标工作流
|
|||
|
|
*/
|
|||
|
|
void FillOriModulesAndConns(CWorkFlowFile *src, CWorkFlowFile *dest);
|
|||
|
|
/**
|
|||
|
|
* @brief 向工作流中填充以指定模块为基础的上游所有模块和连接
|
|||
|
|
* @param[in] id 模块id
|
|||
|
|
* @param[in] connMap 工作流中各个模块向上的连接信息映射
|
|||
|
|
* @param[in] moduleMap 工作流模块信息映射
|
|||
|
|
* @param[out] workflow 需要填充的工作流
|
|||
|
|
*/
|
|||
|
|
void FillParentModulesAndConns(const int &id, map<int, vector<CModuleConnection *> > &connMap, map<int, CModuleInformation *> &moduleMap, CWorkFlowFile *workflow);
|
|||
|
|
/**
|
|||
|
|
* @brief 向工作流中填充以指定模块为基础的上游所有模块和连接
|
|||
|
|
* @param[in] id 模块id
|
|||
|
|
* @param[in] connMap 工作流中各个模块向下的连接信息映射
|
|||
|
|
* @param[in] moduleMap 工作流模块信息映射
|
|||
|
|
* @param[out] workflow 需要填充的工作流
|
|||
|
|
*/
|
|||
|
|
void FillChildModulesAndConns(const int &id, map<int, vector<CModuleConnection *> > &connMap, map<int, CModuleInformation *> &moduleMap, CWorkFlowFile *workflow);
|
|||
|
|
private:
|
|||
|
|
std::vector<std::string> spcialModuleNameList;
|
|||
|
|
|
|||
|
|
friend class ::CMultiJobDecomposeDecomposeTest;
|
|||
|
|
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
#endif /* MULTIJOBDECOMPOSE_H_ */
|