logplus/Workflow/WFEngine/Module/include/Buffer.h

338 lines
13 KiB
C
Raw Normal View History

2026-01-16 17:18:41 +08:00
/**
* @file Buffer.h
* @brief BufferBuffer相关的操作
* @author Dev
* @date 2011-5-25
*/
#ifndef PAI_FRAME_MODULEAPI_BUFFER_H
#define PAI_FRAME_MODULEAPI_BUFFER_H
// #include "BufferElement.h"
// #include "ModuleContext.h"
// #include "WorkflowContext.h"
#include <time.h>
#include <iostream>
#include <map>
#include <vector>
class BufferTest;
namespace pai {
namespace module {
/**
* Buffer可能会被模块用来作为输入和输出
*/
const int BufferAsInput = 0; /**< 输入Buffer标识*/
const int BufferAsOutput = 1; /**< 输出Buffer标识*/
const unsigned int MaxBufferSize = 2000; /**< 默认Buffer的大小*/
/**
* @var typedef void* ModuleId
* @brief
*/
typedef void* ModuleId;
/**
* @brief
*/
struct SProcessInfo {
std::string module_runtime_id; /**< 处理的模块运行时ID */
int port; /**< 模块链接端口 */
int tracenum; /**< 处理道数 */
int iotype; /**< 输入和输出类型 */
int time; /**< 两次操作间到时间差*/
SProcessInfo():module_runtime_id(""),port(0),tracenum(0),iotype(0),time(0){}
};
/**
* @brief Buffer管理的的目标
*/
enum BUFFER_MANAGER_TARGET {
BMT_MODULE, /**< 模块使用的Buffer */
BMT_OTHER /**< 非模块使用的Buffer */
};
/**
* @var typedef std::map<const ModuleId, unsigned long> ModuleProcessIndex
* @brief ,
*/
typedef std::map<const ModuleId, unsigned long> ModuleProcessIndex;
/**
* @var typedef std::map<const ModuleId, SProcessInfo> ModuleProcess
* @brief Buffer关联的所有读写模块
*/
typedef std::map<const ModuleId, SProcessInfo> ModuleProcess;
/**
* @var typedef void (*Callback)(const SProcessInfo info)
* @brief
*/
typedef void (*Callback)(const SProcessInfo info);
class CModule;
/**
* @class CBuffer
* @brief
*/
class PAI_MODULE_EXPORT CBuffer {
private:
CBuffer(const CBuffer& buffer);
CBuffer& operator=(const CBuffer& buffer);
public:
/**
* @brief .
* @param[in] size
* @param[in] target Buffer管理的目标类型
* @param[in] cbUpdateModuleProgress
*/
CBuffer(int size, BUFFER_MANAGER_TARGET target, Callback cbUpdateModuleProgress);
/**
* @brief .
* @param[in] size
* @param[in] cbUpdateModuleProgress
*/
CBuffer(int size, Callback cbUpdateModuleProgress);
/**
* @brief .
* @param[in] size
*/
CBuffer(int size);
/**
* @brief
*/
virtual ~CBuffer();
/**
* @deprecated
* @see void SetNextElement(CBufferElement* element,ModuleId moduleId,ElementType type);
* @brief buffer元素到bufferElementType
* @param[in] moduleID id使this
*/
// void SetNextElement(CBufferElement* element, ModuleId moduleId);
/**
* @deprecated
* @see CBufferElement* GetNextElement(ModuleId moduleID, ElementType &type);
* @brief buffer元素TraceElementType
* 使
* @param[in] moduleID id使this
* @return
*/
// CBufferElement* GetNextElement(ModuleId moduleID);
/**
* @brief (N道)buffer元素N道是Any Marker就一直Skip到TRACE(Skip到DATA_END_MARKER)
* TRACE后(DATA_END_MARKER)
* a.N+1TRACE,TRACE
* b.N+1DATA_END_MARKER,DATA_END_MARKERSkip这个Element
* c.N+1GATHER_END_MARKER/LINE_END_MARKER,Skip这个Element
* N+2TRACEN+1Marker的类型
* N+2Any MARKERSkip(N+2,N+3...)TRACE为止或者到DATA_END_MARKER
* MARKER这种情况MARKER(GATHER_END_MARKER < LINE_END_MARKER < DATA_END_MARKER)
* buffer元素之后type返回道集线
* @remark Module实现的逻辑来说Get一定会有数据NULL是异常情况OVER
* Buffer里没有输入任何数据/DATA_END_MARKERGet只能返回NULL,Type=DATA_END_MARKER
* @param[in] moduleID id
* @param[in/out] type
* type为TRACE
* type为GATHER_ENDMARKER
* type为LINE_ENDMARKER线线
* type为DATA_ENDMARKER
* @return (DATA_END_MARKER)NULL
*/
// CBufferElement* GetNextElement(ModuleId moduleID, ElementType &type);
/**
* @brief buffer元素Tracebuffer元素之后type类型保存不同的分隔符type
* type为TRACE
* type为GATHER_ENDMARKER
* type为LINE_ENDMARKER线
* type为DATA_ENDMARKER
*
* @param[in] element
* @param[in] moduleID id
* @param[in] type TRACE--> GATHER_ENDMARKER--> LINE_ENDMARKER-->线 DATA_ENDMARKER-->
*/
// void SetNextElement(CBufferElement* element,ModuleId moduleId,ElementType type);
////////////////////////////////////////////////////////////////////////
//FIXME:GetNextElements/SetNextElements 目前没有人调用,而且这个两个方法目前
//不太严谨当一个Gather大于BufferSize的时候就会出现deadlock,建议不要继续使用
////////////////////////////////////////////////////////////////////////
/**
* @deprecated
* @brief
* 1. processIndex到m_size之间gatherSplitMarker存在
* @param[in] gather
* @param[in] moduleId 使this
* @exception pai::error::invalid_argument
*/
// void SetNextElements(std::vector<CBufferElement*> gather, ModuleId moduleId );
/**
* @deprecated
* @brief
* @param[in] moduleID 使this
* @return
*/
// std::vector<CBufferElement*> GetNextElements(ModuleId moduleID);
/**
* @brief buffer的长度
* @return buffer的长度
*/
int Size() const {
return static_cast<int>(m_size);
}
/**
* @brief Buffer是否为空
* @return buffer中没有元素truefalse
*/
bool IsEmpty() const {
return m_size == 0;
}
/**
* @brief workflow调用使
* @param[in] moduleID
*/
// void AddReferenceModule(ModuleId module);
/***
* @brief workflow调用
* @param[in] moduleID
* @param[in] info
*/
void AddProcessContext(ModuleId moudle, const SProcessInfo& info) {
this->m_moduleProcess[moudle] = info;
}
/**
* @brief Buffer的元数据信息
* @param[in] key
* @param[in] anything
*/
// void SetMetaData(const std::string& key, Any anything);
/**
* @brief Buffer元数据信息
* @param[in] key
* @return
*/
template<typename T>
T GetMetaData(const std::string& key)
{
return this->m_context.Get<T>(key);
};
/**
* @brief
* @param[in] key key
* @return BufferContext中含有指定key的元信息truefalse
*/
bool HasMetaData(const std::string& key);
private:
/**
* @brief Element
*/
// CBufferElement* PreReadyNextElement(ModuleId moduleID);
/**
* @brief skip连续的Marker(element是marker)
*/
// void SkipContinuousMarkerElement(ModuleId moduleID, ElementType &type);
/**
* @brief Skip&free Element.
*/
// void SkipCurrentElement(CBufferElement* element,ModuleId moduleID);
/**
* @brief Element的数据类型
*/
// void PreReadyNextElementType(ModuleId moduleID, ElementType &type);
/**
* @brief TraceDATA_END_MARKER都没有获取到TraceNULL
*/
// CBufferElement* GetTraceNextElement(ModuleId moduleID);
// CopyType CalculateCopyType(const std::string& caller, ModuleId);
/**
* Readable & Writable become inline as they both are invoked frequently in
* limited place. although they are not small method, they will not increase
* too much size in final executable.
*/
/**
* Module is allow to read from the buffer, the conditions must satisfy are:
* 1. module.current - inputIndex < 0
*/
bool Readable(const std::string& caller, ModuleId moduleId);
/**
* InputMoudle is allow to write to the buffer, the conditions must satisfy are:
* 1. inputIndex - MAX(processIndex) >=0
* 2. inputIndex - MIN(processIndex) < size //buffer size
*/
bool Writable(const std::string& caller);
/***
* @brief buffer不能满足读取或者写入条件的时候线
*/
inline void Block();
/***
* @brief
*/
void Init();
/**
* @brief
* processIndex到m_size之间gatherSplitMarker存在
* @param[in] moduleID ,使this
* @param[out] nextGatherEndIndex
* @return bool
*/
bool NextElementsIsReady(ModuleId moduleID,unsigned long & nextGatherEndIndex);
/**
* @brief
* @param[in] moduleID
* @param[out] element buffer指针
*/
// void GetMultiTraceToElement(ModuleId moduleID,CBufferElement* element);
/**
* @brief
* @param[in] moduleID
* @param[out] element buffer指针
*/
// void SetMultiTraceToElement(ModuleId moduleID,CBufferElement* element);
/**
* @brief
* @param[in] moduleID
* @param[in] componentMap ID及分量对应的数据类型值
* @param[out] dataType
* @return bool truefalse
*/
bool GetSelectedMultiwaveComponent(ModuleId moduleID ,std::map< std::string ,int > & componentMap , int & dataType );
private:
//TODO remove this after finding a better solution
friend class ::BufferTest;
// CBufferElement** m_elements;
unsigned m_size;
volatile unsigned m_elementCount;
/**
*
*/
Callback m_cbUpdateModuleProgress;
/**
* the processIndex must be initialized for each module using it when the Buffer is created
*/
ModuleProcessIndex m_processIndex;
/**
*
*/
ModuleProcess m_moduleProcess;
/**
* elementCount is the number of elements has been written to the buffer by previous modules
*/
time_t m_prevGetTime;
time_t m_prevSetTime;
BUFFER_MANAGER_TARGET m_target;
// BufferContext m_context;
};
}
}
#endif