logplus/Workflow/WFEngine/Module/src/Module.cpp

315 lines
9.4 KiB
C++
Raw Normal View History

2026-01-16 17:18:41 +08:00
/*
* Module.cpp
*
* Created on: 2011-4-28
* Author: dev
*/
#include "Module.h"
#include "ModuleMetaData.h"
// #include "DataService.h"
// #include "Configure.h"
// #include "ErrorCode.h"
// #include "VFSFactory.h"
#include <algorithm>
#include <iostream>
#include <fstream>
#include <string>
#include <cstdlib>
// #include "Utils.h"
using namespace std;
// using namespace pai::ios;
// using namespace pai::log;
// using namespace pai::utils;
// using namespace pai::ios::vfs;
// using namespace pai::error;
// using namespace pai::conf;
namespace pai
{
namespace module
{
// Lower bound (and fallback) for the number of input/output port slots that
// CModule::Initialize() allocates: it is used when the metadata reports no
// input/output, and any smaller reported count is raised to it.
// NOTE(review): "PROT" looks like a misspelling of "PORT"; the name is kept
// because other code in this file refers to it.
const int DEFAULT_PROT_COUNT = 1;
/**
 * Default constructor.
 *
 * Leaves the module without any port tables (counts are 0 and all table
 * pointers are NULL); Initialize() is what allocates them. A fresh, default
 * constructed CModuleParameter is allocated and held in `parameter`.
 *
 * The initializers for m_modulecontext, m_context and m_logger are currently
 * disabled in this file and therefore do not appear in the list below.
 */
CModule::CModule()
    : m_nInputPortCount(0),
      m_nOutputPortCount(0),
      inputBuffers(NULL),
      outputBuffers(NULL),
      m_usedInputPort(NULL),
      m_usedOutputPort(NULL),
      m_usedInputPortCount(0),
      m_usedOutputPortCount(0),
      parameter(new CModuleParameter()),
      m_strID(""),
      m_metaData(),
      _pai_log_common_ss(),
      m_workflowClusterTmp(""),
      preJobId("")
{
    // Reading the cluster temp directory from the global configuration is
    // disabled, so m_workflowClusterTmp stays empty after construction:
    // CConfigure globalConfigure;
    // m_workflowClusterTmp = globalConfigure.GetValueByKey(CONST_PROP_WORKFLOW_CLUSTER_TMP);
}
void CModule::Initialize()
{
//一个模块默认至少有一个IO Buffer数组指针(并没有开Buffer)否则一些UT将失败
//也许有些模块没有INPUT/OUTPUT PORT但这里申请了也不会有影响。
m_nInputPortCount = this->m_metaData.GetHasInput() ?
this->m_metaData.GetInputPortCount() : DEFAULT_PROT_COUNT;
m_nInputPortCount = (m_nInputPortCount < DEFAULT_PROT_COUNT) ? DEFAULT_PROT_COUNT:m_nInputPortCount;
m_nOutputPortCount = this->m_metaData.GetHasOutput() ?
this->m_metaData.GetOutputPortCount(): DEFAULT_PROT_COUNT;
m_nOutputPortCount = (m_nOutputPortCount < DEFAULT_PROT_COUNT) ? DEFAULT_PROT_COUNT:m_nOutputPortCount;
//FIXME: InputMoudle should delete this explicitly in the constructor
inputBuffers = new CBuffer*[m_nInputPortCount];
for (int i = 0; i < m_nInputPortCount; ++i)
{
inputBuffers[i] = NULL;
}
outputBuffers = new CBuffer*[m_nOutputPortCount];
for (int i = 0; i < m_nOutputPortCount; ++i)
{
outputBuffers[i] = NULL;
}
m_usedInputPort = new int[m_nInputPortCount];
m_usedOutputPort = new int[m_nOutputPortCount];
}
/**
 * Copy constructor.
 *
 * Copies only the module's identity and configuration: the parameter object
 * (deep-copied), the ID and the metadata. Port tables are NOT copied - the
 * new module starts with zero port counts and NULL tables, exactly like a
 * default-constructed one. m_workflowClusterTmp and preJobId also start
 * empty instead of being taken from srcModule.
 *
 * The rest of this class treats `parameter` as possibly NULL (the destructor
 * and GetMemorySize() both check it), so the source's parameter is only
 * dereferenced when it is non-NULL; a NULL parameter is mirrored as NULL
 * rather than dereferenced.
 *
 * @param srcModule module whose parameter, ID and metadata are copied.
 */
CModule::CModule(const CModule& srcModule)
    : m_nInputPortCount(0),
      m_nOutputPortCount(0),
      inputBuffers(NULL),
      outputBuffers(NULL),
      m_usedInputPort(NULL),
      m_usedOutputPort(NULL),
      m_usedInputPortCount(0),
      m_usedOutputPortCount(0),
      parameter(srcModule.parameter != NULL ? new CModuleParameter(*srcModule.parameter) : NULL),
      m_strID(srcModule.m_strID),
      m_metaData(srcModule.m_metaData),
      _pai_log_common_ss(),
      m_workflowClusterTmp(""),
      preJobId("")
{
}
/**
 * Destructor.
 *
 * Releases what this class allocates:
 *  - every CBuffer still attached to an output slot, then the output
 *    pointer table itself;
 *  - the input pointer table (only the table - the input CBuffer objects
 *    are not deleted here, matching the previous behaviour);
 *  - both "used port" index arrays;
 *  - the parameter object.
 *
 * Previously the two pointer tables allocated with new[] in Initialize()
 * were never freed, and the output loop indexed outputBuffers without
 * checking that the table exists.
 *
 * NOTE(review): confirm that no subclass frees inputBuffers/outputBuffers
 * itself without resetting the pointer to NULL, otherwise the delete[]
 * below would release the table a second time.
 */
CModule::~CModule()
{
    if (outputBuffers != NULL)
    {
        for (int i = 0; i < m_nOutputPortCount; ++i)
        {
            // delete on a NULL pointer is a no-op, so empty slots are fine.
            delete outputBuffers[i];
            outputBuffers[i] = NULL;
        }
        delete[] outputBuffers;
        outputBuffers = NULL;
    }

    // Only the table of pointers is released for the input side.
    delete[] inputBuffers;
    inputBuffers = NULL;

    delete[] m_usedInputPort;
    m_usedInputPort = NULL;

    delete[] m_usedOutputPort;
    m_usedOutputPort = NULL;

    delete parameter;
    parameter = NULL;

    // // START To investigate workflow crashes, temporarily record the stack
    // // trace when the related classes are destroyed, so that a crash can be
    // // traced afterwards.
    // std::string logPath;
    // std::ostringstream log;
    // log << "workflow_coredump_debug_log_pid" << getpid() << ".log";
    // std::string logName = log.str();
    // char *pEnv = (char*)pai::utils::CUtils::GetPaiHome().c_str();//getenv("PAI_HOME");
    // if(pEnv == NULL)
    // {
    // logPath = std::string("/home/") + logName;
    // }
    // else
    // {
    // logPath = std::string(pEnv) + "/log/" + logName;
    // }
    /*
    std::ofstream fcout(logPath.c_str(), std::ofstream::app);
    if(fcout)
    {
    fcout << "#####start_CModule#####" << std::endl;
    std::vector<std::string> stack_trace = GetStackTrace();
    for (size_t i = 0; i < stack_trace.size(); ++i)
    {
    fcout << stack_trace[i] << std::endl;
    }
    fcout << "#####end_CModule#####" << std::endl;
    fcout.close();
    }
    */
    // END
}
// void CModule::SetInputBuffer(int port, CBuffer* buffer)
// {
// assert(port < m_nInputPortCount);
// inputBuffers[port] = buffer;
// buffer->AddReferenceModule(this);
// // Set the progress context information
// SProcessInfo info;
// // info.module_runtime_id = this->GetModuleContext().module_runtime_id;
// info.port = port;
// info.iotype = BufferAsInput;
// buffer->AddProcessContext(this, info);
// m_usedInputPort[m_usedInputPortCount++] = port;
// }
/**
 * Attaches a buffer to one of the module's output slots.
 *
 * The buffer pointer is stored in outputBuffers[port], the buffer is told
 * about this module via AddProcessContext() with an SProcessInfo describing
 * the port as an output, and the port index is recorded in the list of used
 * output ports.
 *
 * Fixes over the previous version:
 *  - negative ports and NULL buffers are rejected (previously only the upper
 *    bound was asserted, and the buffer was dereferenced unchecked);
 *  - the checks also hold in builds where assert() is compiled out: invalid
 *    arguments leave the module untouched instead of writing out of bounds;
 *  - a port is recorded in m_usedOutputPort only once, so setting the same
 *    port repeatedly can no longer run past the end of that array, which
 *    holds m_nOutputPortCount entries.
 *
 * @param port   zero-based output slot; must be in [0, m_nOutputPortCount).
 * @param buffer buffer to attach; must not be NULL. A buffer already stored
 *               in the slot is overwritten, not deleted.
 */
void CModule::SetOutputBuffer(int port, CBuffer* buffer)
{
    assert(port >= 0 && port < m_nOutputPortCount);
    assert(buffer != NULL);
    if (port < 0 || port >= m_nOutputPortCount || buffer == NULL)
    {
        return;
    }

    outputBuffers[port] = buffer;

    // Set the progress context information.
    SProcessInfo info;
    // info.module_runtime_id = this->GetModuleContext().module_runtime_id;
    info.port = port;
    info.iotype = BufferAsOutput;
    buffer->AddProcessContext(this, info);

    // Remember the port as used, but never twice.
    int* const usedEnd = m_usedOutputPort + m_usedOutputPortCount;
    if (std::find(m_usedOutputPort, usedEnd, port) == usedEnd)
    {
        m_usedOutputPort[m_usedOutputPortCount++] = port;
    }
}
/**
 * Gives access to the metadata object embedded in this module.
 *
 * @return pointer to the module's own m_metaData member; the module keeps
 *         ownership and the pointer is never NULL.
 */
CModuleMetaData* CModule::GetMetaData()
{
    CModuleMetaData* const metaData = &(this->m_metaData);
    return metaData;
}
/**
 * Set-up hook; this implementation intentionally does nothing.
 * NOTE(review): presumably overridden by concrete modules - confirm in Module.h.
 */
void CModule::SetUp()
{
    // Nothing to prepare at this level.
}
/**
 * Clean-up hook; this implementation intentionally does nothing.
 * NOTE(review): presumably overridden by concrete modules - confirm in Module.h.
 */
void CModule::CleanUp()
{
    // Nothing to release at this level.
}
// void CModule::WriteLog(pai::log::Priority logPriority, std::string logmsg)
// {
// std::string modulename=GetMetaData()->GetName();
// std::string moduleruntimename = "[Module][" +modulename+"]["+ GetModuleContext().module_runtime_name + "]";
// logmsg = moduleruntimename + logmsg;
// std::cout << logmsg + "\n";
// if (m_logger == NULL && (!GetModuleContext().taskInfo.job_id.empty()))
// {
// try
// {
// long long dbId = GetWorkflowContext().Get<long long>(pai::module::KEY_DATABASE_ID);
// std::stringstream dbIdString;
// dbIdString << dbId;
// std::cout << "KEY_DATABASE_ID: " << dbIdString.str() << '\n';
// m_logger = Logger::GetJobLogger(dbIdString.str(), GetModuleContext().module_runtime_env);
// } catch (exception &e)
// {
// std::cerr << "[" << GetModuleContext().taskInfo.job_id << "][" << GetModuleContext().module_runtime_env
// << "] pai::log::Logger instance creation fails.\n";
// return;
// }
// }
// if ((m_logger != NULL)&&(!logmsg.empty()))
// {
// if (logPriority == pai::log::PAI_ERROR)
// m_logger->Error(logmsg);
// if (logPriority == pai::log::PAI_INFO)
// m_logger->Info(logmsg);
// if (logPriority == pai::log::PAI_DEBUG)
// m_logger->Debug(logmsg);
// }
// }
// bool CModule::HasProjectDataGenerated() const
// {
// std::vector<pai::module::OutputType> outputTypes = m_metaData.GetOutputTypes();
// return find(outputTypes.begin(),outputTypes.end(),None)==outputTypes.end();
// }
// void CModule::SetOutputMetaData(const string& key, Any anything)
// {
// for(unsigned int i = 0;i < unsigned(m_usedOutputPortCount);i++)
// outputBuffers[m_usedOutputPort[i]]->SetMetaData(key,anything);
// }
// bool CModule::HasInputMetaData(const int& port,const string& key)
// {
// assert(port < m_nInputPortCount);
// CBuffer* buffer = this->GetInputBuffer(port);
// if(buffer==NULL)
// {
// std::stringstream ss;
// // ss << this->m_modulecontext.module_runtime_name
// // << " input port:" << port << "doesn't exist!"
// // << std::endl;
// // throw pai::error::invalid_argument(ss.str());
// }
// return buffer->HasMetaData(key);
// }
// bool CModule::HasInputMetaData(const string& key)
// {
// return this->HasInputMetaData(0,key);
// }
// void CModule::SetFileHeader(Any anything)
// {
// this->SetOutputMetaData(WORKLFOWCONTEXT_KEY_DATASETINFO,anything);
// }
// std::string CModule::GetLocalAttemptDir()
// {
// std::string localAttempDir = this->GetWorkflowContext().Get<std::string>(CONST_PROP_ATTEMPTASK_DIR);
// size_t pos = localAttempDir.find_first_of(",");
// if (pos != std::string::npos)
// {
// localAttempDir = localAttempDir.substr(0,pos);
// }
// return localAttempDir;
// }
/**
 * Builds the cluster temp directory path for a job.
 *
 * The result is m_workflowClusterTmp, followed by CONST_FILEPATH_SEPARATOR,
 * followed by the job id. No check is made that the directory exists.
 * NOTE(review): both constructors in this file leave m_workflowClusterTmp
 * empty; unless it is assigned elsewhere the path starts with the separator.
 *
 * @param jobid id of the job whose directory path is wanted.
 * @return the concatenated path.
 */
std::string CModule::GetClusterJobTempDir(const std::string& jobid)
{
    std::string jobDir(m_workflowClusterTmp);
    jobDir += CONST_FILEPATH_SEPARATOR;
    jobDir += jobid;
    return jobDir;
}
// std::string CModule::GetClusterCurrentJobAttemptDir()
// {
// std::string clusterAttempDir = m_workflowClusterTmp + CONST_FILEPATH_SEPARATOR
// + this->GetModuleContext().taskInfo.job_id + CONST_FILEPATH_SEPARATOR
// + this->GetModuleContext().taskInfo.attempt_id;
// return clusterAttempDir;
// }
std::string CModule::GetClusterPreJobAttemptDir()
{
return GetClusterJobTempDir(this->preJobId);
}
// bool CModule::CleanClusterJobTempDir(const std::string& path)
// {
// std::auto_ptr<File> cp = VFSFactory::Instance()->GetFile(path);
// return (cp->Delete() == ERR_SUCCESS);
// }
unsigned long CModule::GetMemorySize()
{
unsigned long total = 0;
total = sizeof(*this);
if(parameter)
{
total += sizeof(*parameter);
}
if(inputBuffers)
{
total += sizeof(**inputBuffers);
}
if(outputBuffers)
{
total += sizeof(**outputBuffers);
}
if(m_usedInputPort)
{
total += sizeof(*m_usedInputPort);
}
if(m_usedOutputPort)
{
total += sizeof(*m_usedOutputPort);
}
return total;
}
}
}