logplus/Workflow/WFEngine/Module/include/WorkflowContext-.h

454 lines
12 KiB
C
Raw Normal View History

2026-01-16 17:18:41 +08:00
/**
* @file WorkflowContext.h
* @brief Classes related to WorkflowContext
* @author Ben
* @date 2012-10-25
*/
#ifndef PAI_FRAME_MODULEAPI_WORKFLOW_CONTEXT_H
#define PAI_FRAME_MODULEAPI_WORKFLOW_CONTEXT_H
// #include "error.h"
#include "Turtle.h"
#include <map>
#include <stdexcept>
#include <string>
#include <typeinfo>
#include <utility>
#include <vector>
using std::map;
using std::string;
// Forward declaration of pai::workflow::ModuleContainer so that it can be
// named as a friend of WorkflowContext later in this header without
// including its definition.
namespace pai
{
namespace workflow
{
class ModuleContainer;
}
}
// for test
// Forward declaration of the global-namespace test class that
// WorkflowContext declares as a friend.
class WorkflowContextTest;
namespace pai
{
namespace module
{
/**
 * Well-known key strings used to store and look up entries in a WorkflowContext.
 */
const std::string WORKLFOWCONTEXT_KEY_DATASETINFO("DataSetInfo"); /**< key of the file header (data set info) entry */
const std::string KEY_DATABASE_ID("database_id"); /**< key of the database_id entry */
const std::string KEY_CHAIN_ID("submitJob.info.chainId"); /**< key of the job chain id entry */
const std::string KEY_HADOOP_JOB_ID("pai.workflow.job.hadoop.id"); /**< key of the hadoop job id entry */
const std::string KEY_JOB_DB_ID("pai.workflow.job.db.id"); /**< key of the workflow id entry */
const std::string INPUT_GATHER_ORDER("INPUT_GATHER_ORDER"); /**< key of the input gather order entry */
const std::string STACK_FLAG("STACK_FLAG"); /**< key of the Stack_flag entry */
const std::string KEY_SURCON_MODULE_ISAPPLY("pai.workflow.surcon.isapply"); /**< key of the isapply entry */
const std::string HADOOP_CONF_KEY_PRE_HADOOP_JOB_ID("pai.workflow.pre.job.hadoop.id"); /**< key of the previous job's hadoop id entry */
const std::string RESETDATASETID("resetDataSetID"); /**< key of the resetDataSetID entry */
const std::string CONTEXT_IDXSORTINGCODE("CONTEXT_IDXSORTINGCODE"); /**< key of the idxSortingCode entry */
const std::string KEY_MAPRED_SPLIT("split"); /**< key of the split entry */
/**
 * Keys of the per-component file entries of three-component data.
 */
const std::string MC_FILEPATH_X("mc_filepath_x"); /**< key of the x-component file entry */
const std::string MC_FILEPATH_Y("mc_filepath_y"); /**< key of the y-component file entry */
const std::string MC_FILEPATH_Z("mc_filepath_z"); /**< key of the z-component file entry */
/**
 * @class Placeholder
 * @brief Abstract, type-erased base of the value stored inside an Any.
 */
class Placeholder
{
public:
    /**
     * @brief Virtual destructor so a concrete holder can be deleted through
     *        a Placeholder pointer.
     */
    virtual ~Placeholder()
    {
    }

    /**
     * @brief Create a heap-allocated copy of this object.
     * @return a pointer to the new copy, allocated with new; Any deletes the
     *         copies it creates.
     */
    virtual Placeholder * Clone() const = 0;
};

/**
 * @class Holder
 * @brief Concrete Placeholder that stores one value of type T by value.
 */
template<typename T> class Holder: public Placeholder
{
public:
    /**
     * @brief Store a copy of t.
     * @param[in] t value to copy into the holder
     */
    Holder(const T & t) :
        m_held(t)
    {
    }

    /**
     * @brief Create a new Holder containing a copy of the held value.
     * @return a pointer to the new Holder, allocated with new
     */
    virtual Placeholder * Clone() const
    {
        return new Holder(m_held);
    }

    /**
     * @brief Access the held value.
     * @return a copy of the held value
     */
    T Get() const
    {
        return m_held;
    }

private:
    Holder & operator=(const Holder&);//unimplemented
    T m_held;
};

/**
 * @class Any
 * @brief Value type of the key-value container: holds a copy of a value of
 *        an arbitrary copyable type, or nothing.
 *
 * The Any owns its Placeholder: it is cloned on copy and deleted on
 * destruction.
 */
class Any
{
public:
    /**
     * @brief Construct an empty Any.
     */
    Any() :
        m_content(NULL)
    {
    }

    /**
     * @brief Construct an Any holding a copy of t.
     * @param[in] t value to store
     */
    template<typename T> Any(const T & t) :
        m_content(new Holder<T> (t))
    {
    }

    /**
     * @brief Copy constructor; clones the content of other (stays empty if
     *        other is empty).
     */
    Any(const Any& other) :
        m_content(other.m_content ? other.m_content->Clone() : NULL)
    {
    }

    /**
     * @brief Destructor; releases the held value, if any.
     */
    ~Any()
    {
        delete m_content; // deleting a null pointer is a no-op
        m_content = NULL;
    }

    /**
     * @brief Exchange the contents of this Any with rhs.
     * @return a reference to this object
     */
    Any & swap(Any & rhs)
    {
        std::swap(m_content, rhs.m_content);
        return *this;
    }

    /**
     * @brief Assign a value of arbitrary type.
     *
     * Builds a temporary Any from rhs and swaps it in; the previous content
     * is released when the temporary is destroyed.
     */
    template<typename T> Any& operator=(const T & rhs)
    {
        // The original called the undeclared name `anything(rhs)` here, which
        // failed to compile as soon as this template was instantiated.
        Any(rhs).swap(*this);
        return *this;
    }

    /**
     * @brief Assign from another Any (rhs is taken by value, then swapped in).
     */
    Any & operator=(Any rhs)
    {
        rhs.swap(*this);
        return *this;
    }

    /**
     * @brief Tell whether this Any holds no value.
     * @return true if empty, false otherwise
     */
    bool empty() const
    {
        return !m_content;
    }

    /**
     * @brief Retrieve a copy of the held value as type T.
     * @return a copy of the held value
     * @throws std::bad_cast if this Any is empty or the held value is not
     *         stored as exactly a Holder<T> (previously a null dereference)
     */
    template<typename T> T Get() const
    {
        // dynamic_cast yields NULL both for a NULL m_content and for a
        // holder of a different type.
        Holder<T> * typed = dynamic_cast<Holder<T>*> (m_content);
        if (typed == NULL)
        {
            throw std::bad_cast();
        }
        return typed->Get();
    }

private:
    Placeholder * m_content;
};
/**
* @class Context
* @brief Key-Value的上线文容器
* <p>,Context,
*/
class PAI_MODULE_EXPORT Context
{
public:
/**
* @brief
*/
Context() : m_container(new map<string, Any>())
{
init ();
}
/**
*
*/
Context(const Context& another) : m_container(another.m_container)
{
init ();
}
/**
* @brief =
*/
Context& operator=(const Context& rhs)
{
if (this == &rhs)
return *this;
pthread_mutex_lock(&m_Locker);
if (m_container != NULL)
{
delete m_container;
m_container = NULL;
}
m_container = rhs.m_container;
pthread_mutex_unlock(&m_Locker);
return *this;
}
/**
* TODO
* @brief
* @remark
* -
* -
*/
virtual ~Context(){
pthread_mutex_destroy(&m_Locker);
}
/**
* @brief <code>Context</code>
* @param[in] key
* @param[in] anything
*/
void Put(const string& key, Any anything)
{
pthread_mutex_lock(&m_Locker);
(*m_container)[key] = anything;
pthread_mutex_unlock(&m_Locker);
}
/**
* @brief <code>Context</code>key的值
* @param[in] key
* @return key的值
*/
template<typename T>
T Get(const string& key)
{
pthread_mutex_lock(&m_Locker);
if (m_container->count(key) == 0)
{
pthread_mutex_unlock(&m_Locker);
// throw pai::error::key_not_found_error("Key [" + key + "] not found.");
}
T rest =(*m_container)[key].Get<T> ();
pthread_mutex_unlock(&m_Locker);
return rest;
}
/**
* @brief <code>Context</code>key的键值对
* @param[in] key
* @return truefalse
*/
bool ContainsKey(const string& key)
{
pthread_mutex_lock(&m_Locker);
bool rest = false;
if (m_container->count(key) == 0)
rest = false;
else
rest = true;
pthread_mutex_unlock(&m_Locker);
return rest;
}
private:
void init ()
{
pthread_mutex_init(&m_Locker,NULL);
}
map<string, Any> * m_container;
pthread_mutex_t m_Locker;
};
/**
 * @class BufferContext
 * @brief Context holding buffer context information.
 *
 * Adds no state or behaviour of its own; every operation forwards to Context.
 */
class BufferContext:public Context
{
public:
    /**
     * @brief Default constructor; forwards to Context().
     */
    BufferContext() : Context()
    {
    }

    /**
     * @brief Copy constructor; forwards to the Context copy constructor.
     */
    BufferContext(const BufferContext& another) : Context(another)
    {
    }

    /**
     * @brief Assignment; forwards to Context::operator= unless self-assigned.
     * @return a reference to this object
     */
    BufferContext& operator=(const BufferContext& rhs)
    {
        if (this != &rhs)
        {
            Context::operator=(rhs);
        }
        return *this;
    }

    /**
     * @brief Destructor; nothing to release beyond what Context handles.
     */
    virtual ~BufferContext()
    {
    }
};
/**
* @class WorkflowContext
* @brief
* <p>WorkflowContext获取全局性的信息以及共享信息.
*/
class WorkflowContext:public Context
{
private:
friend class ::WorkflowContextTest;//for test
friend class pai::workflow::ModuleContainer;
//module 所在的分支
std::vector<std::string> branchs;
/**
* need_branch == true使put操作期望这样的结果
* (branch-id)-key anything保存多分
*
* need_branch == false使
* key anything保存多分
*/
void Put(const string& key, Any anything, bool need_branch)
{
if(need_branch)
{
if(branchs.size() == 0){
throw /*pai::error::runtime_error*/(" Illegal use of context or program error!");
}
for(std::vector<std::string>::iterator it = branchs.begin(); it != branchs.end(); it++)
{
std::string key_tmp = (*it) + "-" + key;
Context::Put(key_tmp, anything);
}
}
else
{
Context::Put(key, anything);
}
}
public:
/**
* @brief
*/
WorkflowContext() : Context(), branchs()
{
}
/**
* @brief
*/
WorkflowContext(const WorkflowContext& another) : Context(another), branchs(another.branchs)
{
}
/**
* @brief <code>WorkflowContext</code>
* @param[in] key
* @param[in] anything
*/
void Put(const string& key, Any anything)
{
Put(key, anything, true);
}
/**
* @brief <code>WorkflowContext</code>
* <p>get操作时
*
* @param[in] key
* @return
*/
// template<typename T>
// T Get(const string& key)
// {
// try{
// return Context::Get<T>(key);
// }catch(pai::error::key_not_found_error& e){
// for(std::vector<std::string>::iterator it = branchs.begin(); it != branchs.end(); it++)
// {
// std::string key_tmp = (*it) + "-" + key;
// try{
// return Context::Get<T>(key_tmp);
// }catch(pai::error::key_not_found_error& e){
// continue;
// }
// }
// }
// throw pai::error::key_not_found_error("Key [" + key + "] not found.");
// }
/**
* @brief <code>WorkflowContext</code>key的键值对
* @param[in] key
* @return truefalse
*/
bool ContainsKey(const string& key)
{
bool res = Context::ContainsKey(key);
for(std::vector<std::string>::iterator it = branchs.begin(); it != branchs.end(); it++)
{
const std::string key_tmp = (*it) + "-" + key;
res = (res || Context::ContainsKey(key_tmp));
}
return res;
}
/**
* @brief =
*/
WorkflowContext& operator=(const WorkflowContext& rhs)
{
if (this == &rhs)
return *this;
branchs = rhs.branchs;
Context::operator=(rhs);
return *this;
}
/**
* @brief
* @remark
* -
* -
*/
virtual ~WorkflowContext(){}
};
}
}
#endif