logplus/Workflow/WFEngine/Module/include/Module.h

442 lines
15 KiB
C
Raw Normal View History

2026-01-16 17:18:41 +08:00
/**
* @file Module.h
* @brief
* @author dev
* @date 2011-4-28
*/
#ifndef PAI_FRAME_MODULEAPI_MODULE_H
#define PAI_FRAME_MODULEAPI_MODULE_H
#include "Turtle.h"
// #include "Log.h"
#include "Buffer.h"
#include "ModuleCheckResult.h"
#include "ModuleFactory.h"
#include "ModuleMetaData.h"
#include "ModuleParameter.h"
// #include "WorkflowContext.h"
// #include "DataSetInfo.h"
#include <string>
#include <vector>
#include <assert.h>
namespace pai
{
namespace module
{
// using namespace pai::ios::seis;
/**
* @brief
*/
enum STATUS
{
WAIT_INPUT, /**< 等待buffer中的有可读的数据 */
OVER /**< 模块运行完毕 */
};
/**
* @brief
*/
enum Phase{
Map, /**< 模块在hadoop的map阶段运行 */
Reduce /**< 模块在hadoop的reduce阶段运行 */
};
const std::string CONST_PROP_WORKFLOW_CLUSTER_TMP = "WORKFLOW_CLUSTER_TMP"; /**< 集群临时目录常量 */
const std::string CONST_PROP_ATTEMPTASK_DIR = "mapred.local.dir"; /**< hadoop Task的本地临时目录常量 */
const std::string CONST_FILEPATH_SEPARATOR = "/"; /**< 文件分隔符常量 */
/**
* @class CModule
* @brief
* <br>
*
* <li>1. <code>CModule</code></li>
* <li>2. Run()validate()</li>
* <li>3. </li>
* <li>4. </li>
*
*
* <p>
* UT中创建Module的时候使ModuleManager中的工厂方法(CreateModuleByClassName)
*
*/
class PAI_MODULE_EXPORT CModule
{
private:
CModule& operator=(const CModule& moudle);
public:
/**
* @brief
*/
CModule();
/**
* @brief
*/
CModule(const CModule& srcModule);
//welllog 特殊模块使用
virtual CModule* Clone(){return NULL;};
/**
* @brief
*/
virtual ~CModule();
/**
* @brief buffer
* @param[in] port 01
* @return <code>CBuffer</code>
*/
virtual CBuffer* GetInputBuffer(int port)
{
return inputBuffers[port];
}
/**
* @brief
* @param[in] port 01
* @return <code>CBuffer</code>
*/
virtual CBuffer* GetOutputBuffer(int port)
{
return outputBuffers[port];
}
/**
* @brief buffer
* @param[in] port
* @param[in] buffer <code>CBuffer</code>
*/
// void SetInputBuffer(int port, CBuffer* buffer);
/**
* @brief
* @param[in] port
* @param[in] buffer <code>CBuffer</code>
*/
void SetOutputBuffer(int port, CBuffer* buffer);
// /**
// * 设置模块的上下文信息
// * @param context 模块的上下文信息
// */
// void SetModuleContext(SModuleContext& context);
//
// WorkflowContext& SetWorkflowContext() const;
/**
* @brief map阶段执行
* <p>
* <li>1. </li>
* <li>2. WAIT_INPUT</li>
* <li>3. WAIT_INPUT</li>
* <li>4. OVER</li>
* <li>5. </li>
* @return
*/
virtual STATUS Run() = 0;
/**
* @brief inputmodule/outputmodule来overrideoverride
*/
virtual void RunBeforeJob()
{
}
/**
* @brief inputmodule/outputmodule来overrideoverride
*/
virtual void RunAfterJob()
{
}
/**
* @brief killed之后执行的方法
* @param[isFailed] true为失败false为被杀死
*/
virtual void RunAfterJobAbort(bool /*isFailed*/)
{
}
/**
* @brief task失败或者killed之后执行的方法killed的task都會执行一次
*/
virtual void RunAfterTaskAbort()
{
}
/**
* @brief , 使
*/
virtual void RunBeforSubmit()
{
}
/**
* @brief
* @param[in] _parammmmeter <code>CModuleParameter</code>
*/
virtual void SetParameters(CModuleParameter* _parameter)
{
parameter = _parameter;
}
/**
* @brief
* @return <code>CModuleParameter</code>
*/
virtual CModuleParameter* GetModuleParameter()
{
return parameter;
}
/**
* @brief
* <p>
* @param[in] moduleCheckResult
* @return
*/
virtual bool validate(CModuleCheckResult& moduleCheckResult) = 0;
/**
* @brief
* <p>
*/
virtual void Reduce()
{
}
/**
* @brief ,Run和Reduce方法之前自动被调用
*/
virtual void SetUp();
/**
* @brief Run和Reduce方法结束后自动被调用
*/
virtual void CleanUp();
/**
* @brief
*/
virtual void WriteHistory() = 0;
/**
* @brief .
*/
// void SetWorkflowContext(const WorkflowContext& context)
// {
// m_context = context;
// }
/**
* @brief .
* @WARN 使
*/
// WorkflowContext GetWorkflowContext() const
// {
// return m_context;
// }
/**
* @brief
*/
// void SetModuleContext(const SModuleContext& context)
// {
// m_modulecontext = context;
// }
/**
* @brief , 使
*/
// SModuleContext GetModuleContext() const
// {
// return m_modulecontext;
// }
/**
* @brief
* @return <code>CModuleMetaData</code>
*/
CModuleMetaData* GetMetaData();
/**
* @brief
* @return
*/
virtual std::string GetClassName()
{
return std::string("CModule");
}
/**
*@brief
*@param logPriority
*@param logmsg
*/
// void WriteLog(pai::log::Priority logPriority, std::string logmsg);
/**
* @brief
* @return truefalse
*/
// bool HasProjectDataGenerated() const;
/**
* @brief metaData信息的初始化
* ModuleManager创建模块的时候会自动被调用
*/
void Initialize();
/**
* @deprecated
* @brief , 使WorkflowContext中
* @param[in] fileHeader
*/
// void SetFileHeader(Any anything);
/**
* @brief , 使WorkflowContext中
* @return
*/
template<typename T>
T GetFileHeader()
{
return this->GetFileHeader<T>(0);
};
/**
* @deprecated
* @brief 使WorkflowContext中
* @param[in] port
* @return
*/
template<typename T>
T GetFileHeader(const int& port)
{
return this->GetInputMetaData<T>(port,WORKLFOWCONTEXT_KEY_DATASETINFO);
};
/**
* @brief
* @param[in] key
* @param[in] anything
*/
// void SetOutputMetaData(const string& key, Any anything);
/**
* @brief
* @param[in] key
* @return
*/
// template<typename T>
// T GetInputMetaData(const string& key)
// {
// return this->GetInputMetaData<T>(0,key);
// };
/**
* @brief
* @param port[in]
* @param key [in]
* @return
*/
// template<typename T>
// T GetInputMetaData(const int& port,const string& key)
// {
// assert(port < m_nInputPortCount);
// CBuffer* buffer = this->GetInputBuffer(port);
// if(buffer==NULL)
// {
// std::stringstream ss;
// ss << this->m_modulecontext.module_runtime_name
// << " input port:" << port << "doesn't exist!"
// << std::endl;
// throw pai::error::invalid_argument(ss.str());
// }
// return buffer->GetMetaData<T>(key);
// };
/**
* @brief
* @param port[in]
* @param key [in]
* @exception pai::error::invalid_argument
* @return
*/
// bool HasInputMetaData(const int& port,const std::string& key);
/**
* @brief
* @param key [in]
* @return
* @exception pai::error::invalid_argument
*/
// bool HasInputMetaData(const std::string& key);
/**
* @brief HadoopJobId
* @return
*/
std::string GetPreJobId()const{
return this->preJobId;
};
/**
* @brief HadoopJobId
* @param[in] pJobId HadoopJobId
*/
void SetPreJobId(const std::string& pJobId){
this->preJobId = pJobId;
};
/**
* @biref GUI排查内存增长问题
* @return
*/
unsigned long GetMemorySize();
protected:
/**
* @brief hadoop计算的本地目录map-site.xml中通过mapred.local.dir配置
*/
std::string GetLocalAttemptDir();
/**
*@brief /workflow_tmp/jobId
*@param[in] jobid jobId
*@return
*/
std::string GetClusterJobTempDir(const std::string& jobid);
/**
* @brief Task的临时目录路径/workflow_tmp/jobId/attempt_id
* @return
*/
// std::string GetClusterCurrentJobAttemptDir();
/**
* @brief /workflow_tmp/hadoopJobId
* @return :/workflow_tmp
*/
std::string GetClusterPreJobAttemptDir();
/**
* @brief , 使
* @param[in] path
* @return truefalse
*/
// bool CleanClusterJobTempDir(const std::string& path);
protected:
int m_nInputPortCount; /**< 模块输入端口数 */
int m_nOutputPortCount; /**< 模块输出端口数 */
CBuffer** inputBuffers; /**< 模块输入Buffer */
CBuffer** outputBuffers; /**< 模块输出Buffer */
int* m_usedInputPort; /**< 模块已经使用的输入端口 */
int* m_usedOutputPort; /**< 模块已经使用的输出端口 */
int m_usedInputPortCount; /**< 模块已经使用的输入端口数 */
int m_usedOutputPortCount; /**< 模块已经使用的输出端口数 */
CModuleParameter* parameter; /**< 模块参数 */
std::string m_strID; /**< 模块id */
// SModuleContext m_modulecontext; /**< 模块上下文,存储模块的运行时信息 */
CModuleMetaData m_metaData; /**< 模块的元数据信息 */
std::stringstream _pai_log_common_ss; /**< 日志信息,模块开发人员勿使用此变量 */
private:
// WorkflowContext m_context;
// pai::log::LoggerPtr m_logger;
std::string m_workflowClusterTmp;
std::string preJobId;
};
/*
* @def WRITE_MODULE_LOG(PRIORITY,MSG,VALUE)
* @brief
*/
#define WRITE_MODULE_LOG(PRIORITY,MSG,VALUE) _pai_log_common_ss << __FILE__ <<":"<<__LINE__ << ":" << __FUNCTION__ << "\t" << (MSG) << (VALUE) << std::endl;WriteLog((PRIORITY),_pai_log_common_ss.str());_pai_log_common_ss.str("");;
}
}
#endif