logplus/Workflow/WFEngine/WorkflowEngine/include/WorkFlowDecompose.h

315 lines
10 KiB
C
Raw Normal View History

2026-01-16 17:18:41 +08:00
/*
* WorkFlowDecompose.h
*
* Created on: Feb 27, 2015
* Author: dev
*/
#ifndef PAI_FRAME_WORKFLOWENGINE_WORKFLOWDECOMPOSE_H
#define PAI_FRAME_WORKFLOWENGINE_WORKFLOWDECOMPOSE_H
#include "Utils.h"
#include "WorkFlowFileWrapper.h"
#include "WorkFlowFile.h"
#include "PISortWorkFlowFile.h"
#include "StatApplyWorkFlowFile.h"
#include "OffsetWorkFlowFile.h"
#include "TempDataFormat.h"
#include <string>
#include <vector>
#include <set>
#include <map>
class ConcatenateDecomposeTest;
class SpecialModuleDecomposeTest;
class SortServiceDecomposeTest;
using namespace pai::utils;
namespace pai{
namespace workflow{
/**
 * @brief Couples one input module with one output module.
 *
 * Passed by reference to CWorkFlowDecompose::GetInputOutputModulePair(),
 * which receives it as an in/out argument.
 *
 * NOTE(review): both members are plain pointers with no default value, and
 * this header does not show who owns the pointed-to objects - confirm
 * against the implementation before deleting or sharing them.
 */
struct InputOutputModulePair
{
    /** Input-side module of the pair. */
    CModuleInformation* input;

    /** Output-side module of the pair. */
    CModuleInformation* output;
};
class CWorkFlowDecompose
{
public:
CWorkFlowDecompose(const int &startModuleId);
virtual ~CWorkFlowDecompose();
/**
* @brief
* @param[in] workflowFileWrapper CWorkFlowFileWrapper
*/
virtual void DecomposeWorkFlowFile(CWorkFlowFileWrapper* workflowFileWrapper);
/**
* @brief stepid
* @return id
*/
int GetNewStepModuleID();
/**
* @brief
* @return
*/
int GetWorkflowFilePriority();
/**
* @brief
*/
void ClearWorkflowFilePriority();
/**
* @brief
* @param[in] port2UpStreamModuleIDs
* @param[in] conflictPortsList
* @param[in/out] workflowList
*/
void GetUpStreamWorkflowList(std::map<int, std::set<int> >& port2UpStreamModuleIDs,
std::vector<std::set<int> >& conflictPortsList, std::vector<std::set<int> >& workflowList);
/**
* @brief key2UpStreamModuleIDs value中是否有交集key
* @param[in] key2UpStreamModuleIDs key为模块端口或moduleidvalue为流入这个端口或模块的上游模块列表
* @param[in/out] conflictKeyList key的集合
*/
void GetConflictList(std::map<int, std::set<int> >& key2UpStreamModuleIDs,
std::vector<std::set<int> >& conflictKeyList);
/**
* @brief id最大值
* @param[in] workflowFile workflowFile对象
* @return id最大值
*/
static int GetMaxStepIdInWorkFlowFile(CWorkFlowFile* workflowFile);
/**
* @brief workflowFile中指定moduleID的moduleinformation
* @param[in] workflowFile
* @param[in] moduleID
*/
static CModuleInformation* CloneModuleInformationByModuleID(CWorkFlowFile* workflowFile, const int moduleID);
/**
* @brief workflowFile中指定moduleID的moduleinformation
* @param[in] workflowFile
* @param[in] moduleID
*/
static bool RemoveModuleInformationByModuleID(CWorkFlowFile* workflowFile, const int moduleID);
/**
* @brief
* @param[in] workflow
* @return
*/
static CWorkFlowFile* CloneWorkFlowFile(CWorkFlowFile *workflow);
/**
* @brief CWorkFlowFile对象
* @param[in] workflowFile
* @return
*/
static CWorkFlowFile* GetNewWorkFlowFile(CWorkFlowFile* workflowFile);
/**
* @brief offset的workflowfile对象
* @param[in] workflowFile
* @return
*/
static COffsetWorkFlowFile* GetNewOffsetWorkFlowFile(CWorkFlowFile* workflowFile);
/**
* @brief sort的workflowfile对象
* @param[in] workflowFile
* @return
*/
static CPISortWorkFlowFile* GetNewSortWorkFlowFile(CWorkFlowFile* workflowFile, PISortType type);
/**
* @brief sort的workflowfile对象
* @param[in] workflowFile
* @return
*/
static CStatApplyWorkFlowFile* GetNewStatisApplyWorkFlowFile(CWorkFlowFile* workflowFile);
/**
* @brief source的CModuleConnection
* @param[in] source CModuleConnection信息
*/
static CModuleConnection* CloneModuleConnection(CModuleConnection* source);
/**
* @brief connectionupStreamConnList
* @param[in] connections
* @param[in] moduleID id
* @param[in] ports
* @param[in/out] upStreamConnList
*/
void GetUpStreamConnectionByModuleIDAndPorts(std::vector<CModuleConnection*>* connections, const int moduleID,
std::set<int>& ports, std::vector<CModuleConnection*>& upStreamConnList);
/**
* @brief input和output模块
* @param[in/out] pair input和output的结构提
* @return
*/
std::string GetInputOutputModulePair(InputOutputModulePair& pair);
/**
* @brief conn的输出信息
* @param[in] conn
* @param[in] destModuleID id
* @param[in] inputPort
*/
void EditConnectionOuputInfo(CModuleConnection* conn, int destModuleID, int inputPort);
/**
* @brief conn的输入信息
* @param[in] conn
* @param[in] sourceModuleID id
* @param[in] inputPort
*/
void EditConnectionInputInfo(CModuleConnection* conn, int sourceModuleID, int outputPort);
/**
* @brief CModuleConnection对象
* @param[in] destModuleID
* @param[in] inputPort
* @param[in] sourceModuleID
* @param[in] outputPort
* @return
*/
CModuleConnection* GetNewConnection(int destModuleID, int inputPort,
int sourceModuleID, int outputPort);
/**
* @brief set集合是否有交集
* @param[in] set1 moduleid
* @param[in] set2 moduleid
* @return set有相同元素truefalse
*/
static bool CheckSetIntersection(const std::set<int>& set1,
const std::set<int>& set2);
/**
* @brief workflowfile到指定的优先级上,priority2WkfWrapper
* @param[in] priority
* @param[in] wkfWrapper CWorkFlowFileWrapper对象
*/
void InsertPriority2WorkflowFileWrapper(const int priority, CWorkFlowFileWrapper* wkfWrapper);
/**
* @brief priority2WkfWrapper的内容
*/
void CleanPriority2WorkflowFileWrapper();
/**
* @brief outputID2InputID的内容
*/
void CleanOutputID2InputID();
/**
* @brief priority2WkfWrapper
*/
std::map<int, std::vector<CWorkFlowFileWrapper*> >& GetPriority2WorkflowFileWrapper();
/**
* @brief assignModuleList中的模块在workflowfile中深度depth2ModuleID
* @param[in] assignModuleList
* @param[in] connections workflowfile对象的连接信息
* @param[in/out] depth2ModuleID key为深度value为当前深度上的模块id
*/
void GetModuleDepthByModuleIDs(std::vector<CModuleInformation*>& assignModuleList,
std::vector<CModuleConnection*>* connections, std::map<int, std::vector<int> >& depth2ModuleID);
/**
* @brief
* @return
*/
static std::string GetPreOutputPath();
/**
* @brief 使preOutputPath做前缀
* @return string
*/
std::string GetNewPath();
/**
* @brief concatenate模块的所有端口的上游模块port2UpStreamModuleIDs中
* @param[in] connections workflowFile的连线信息
* @param[in] moduleID concatenate模块的id
* @param[in/out] port2UpStreamModuleIDs key为concatenate模块的输入端口value为上游模块集合
*/
void GetInputPortUpStreamModuleIDs(std::vector<CModuleConnection*>* connections, int moduleID,
std::map<int, std::set<int> >& port2UpStreamModuleIDs);
/**
* @brief source中复制值到target
* @param[in] target
* @param[in] source
*/
void CloneSet(std::set<int>& target, const std::set<int>& source);
/**
*@brief
* @param[in] outputID id
* @param[in] inputID id
*/
void InsertOutputID2InputID(const int outputID, const int inputID)
{
outputID2InputID.insert(make_pair(outputID, inputID));
}
map<int, int> GetOutputID2InputID()
{
return outputID2InputID;
}
protected:
/**
* @brief
* @return
*/
CModuleInformation* CreateOutputModule();
/**
* @brief
* @return
*/
CModuleInformation* CreateInputModule();
/**
* @brief PISort模块
* @return PISort模块
*/
CModuleInformation* CreatePiSortModule();
/**
* @brief map转换成拆分切面
* @return
*/
vector<DecomposeAspect *> ConvertIOPair2DecomposeAspects(map<int, int> &ioPairs, bool insertMark=false);
private:
/**
* @brief connectionupStreamConnList
* @param[in] connections
* @param[in] moduleID
* @param[in/out] upStreamConnList
*/
void InternalGetUpStreamConnectionByModuleIDAndPorts(std::vector<CModuleConnection*>* connections,
const int moduleID, std::vector<CModuleConnection*>& upStreamConnList);
/**
* @brief
* @param[in] connections workflowFile的连线信息
* @param[in] moduleID concatenate模块的一个输入端口到跟节点之间的一个moduleid
* @param[in/out] upStreamModuleIDs
*/
void InternalGetInputPortUpStreamModuleIDs(std::vector<CModuleConnection*>* connections,
const int moduleID, std::set<int>& upStreamModuleIDs);
/**
* @brief source的基本信息复制到dest中
* @param[in] dest
* @param[in] source
*/
static void SetWorkFlowInfo(CWorkFlowFile* dest, CWorkFlowFile* source);
private:
static long long currTime;
static std::string preOutputPath;
int workflowFilePriority;//工作流优先级
int nextModuleId;
std::map<int, int> outputID2InputID;//存储新创建的output和input的id的映射
std::map<int, std::vector<CWorkFlowFileWrapper*> > priority2WkfWrapper;//存储拆分后的工作流
// std::vector<CWorkflowDecompose* > workflowDecomposes;//
// CWorkFlowFile* tempWorkflowFile;//用于拆分的工作流对象
// friend class ::WorkFlowDecomposeTest;//友元类,单元测试使用
friend class ::ConcatenateDecomposeTest;
friend class ::SpecialModuleDecomposeTest;
friend class ::SortServiceDecomposeTest;
};
}
}
#endif /* PAI_FRAME_WORKFLOWENGINE_WORKFLOWDECOMPOSE_H */