logplus/Workflow/WFEngine/Module/src/Buffer.cpp

532 lines
20 KiB
C++
Raw Normal View History

2026-01-16 17:18:41 +08:00
/*
* Buffer.cpp
*
* Created on: May 25, 2011
* Author: dev
*/
#include "Buffer.h"
#include "Module.h"
#include "error.h"
#include "Utils.h"
#include "Configure.h"
#include "Turtle.h"
#include <iostream>
#include <limits.h>
#include <stdio.h>
#include "MultiwaveTrace.h"
using namespace std;
namespace pai {
namespace module {
/**
 * Creates a buffer targeted at BMT_MODULE with no progress callback.
 * All remaining setup (size validation, slot allocation, timestamps) is done by Init().
 *
 * @param size requested number of slots; converted to unsigned before Init() checks it
 */
CBuffer::CBuffer(int size)
    : m_elements(NULL),
      m_size(static_cast<unsigned>(size)),
      m_elementCount(0),
      m_cbUpdateModuleProgress(NULL),
      m_processIndex(),
      m_moduleProcess(),
      m_prevGetTime(),
      m_prevSetTime(),
      m_target(BMT_MODULE),
      m_context()
{
    Init();
}
/**
 * Creates a buffer targeted at BMT_MODULE that reports progress through a callback.
 * All remaining setup (size validation, slot allocation, timestamps) is done by Init().
 *
 * @param size                   requested number of slots; converted to unsigned before Init() checks it
 * @param cbUpdateModuleProgress callback invoked from the read/write paths for TRACE elements
 */
CBuffer::CBuffer(int size, Callback cbUpdateModuleProgress)
    : m_elements(NULL),
      m_size(static_cast<unsigned>(size)),
      m_elementCount(0),
      m_cbUpdateModuleProgress(cbUpdateModuleProgress),
      m_processIndex(),
      m_moduleProcess(),
      m_prevGetTime(),
      m_prevSetTime(),
      m_target(BMT_MODULE),
      m_context()
{
    Init();
}
/**
 * Creates a buffer for an explicit target kind that reports progress through a callback.
 * All remaining setup (size validation, slot allocation, timestamps) is done by Init().
 *
 * @param size                   requested number of slots; converted to unsigned before Init() checks it
 * @param target                 kind of party using this buffer; module-specific paths only run for BMT_MODULE
 * @param cbUpdateModuleProgress callback invoked from the read/write paths for TRACE elements
 */
CBuffer::CBuffer(int size, BUFFER_MANAGER_TARGET target, Callback cbUpdateModuleProgress)
    : m_elements(NULL),
      m_size(static_cast<unsigned>(size)),
      m_elementCount(0),
      m_cbUpdateModuleProgress(cbUpdateModuleProgress),
      m_processIndex(),
      m_moduleProcess(),
      m_prevGetTime(),
      m_prevSetTime(),
      m_target(target),
      m_context()
{
    Init();
}
/**
 * Pauses the calling thread once between two polling attempts of the
 * read/write wait loops.
 *
 * The delay value 50 * 1000 * 1000 is staged in a timespec and its tv_nsec
 * field is handed to the Turtle API Sleep call.
 * NOTE(review): the unit Sleep() expects is not visible in this file; the
 * field name suggests nanoseconds (i.e. 50 ms) -- confirm.
 */
void CBuffer::Block()
{
    timespec pause;
    pause.tv_sec = 0;
    pause.tv_nsec = 50 * 1000 * 1000;
    // A clock_nanosleep(CLOCK_MONOTONIC, ...) call used to live here; the Turtle API is used instead.
    GetTurtleAPI()->Sleep(pause.tv_nsec);
}
void CBuffer::Init() {
if (m_size <= 0 || m_size > MaxBufferSize) {
m_size = MaxBufferSize;
}
pai::conf::CConfigure conf;
const string confSize = conf.GetValueByKey("MODULE_BUFFER_SIZE");
//TODO:LOGGER CONNECT HDFS ERROR,SO COMMENTS BELOW LINE(IN BUILDENGINE)
//pai::log::Info(_FLF("PAICONF MODULE_BUFFER_SIZE=" + confSize));
if(confSize!="")
m_size = unsigned(pai::utils::CUtils::StrToInt(confSize));
m_elements = new CBufferElement*[m_size];
for (unsigned i = 0; i < m_size; i++) {
m_elements[i] = NULL;
}
m_elementCount = 0;
time(&m_prevGetTime);
time(&m_prevSetTime);
}
/**
 * Releases the slot array allocated by Init().
 *
 * The elements referenced by the slots are deliberately NOT deleted: within
 * the buffer data flow the consuming modules are responsible for cleaning up
 * the CBufferElement (trace) objects. Only the pointer array itself, which
 * this class allocated with new[], is freed here -- previously it was merely
 * set to NULL and therefore leaked.
 *
 * NOTE(review): no copy constructor/assignment is visible in this file; if
 * CBuffer objects can be copied, two instances would share m_elements --
 * confirm copying is disabled in the header.
 */
CBuffer::~CBuffer() {
    if (!m_elements) {
        return;
    }
    delete[] m_elements;
    m_elements = NULL;
}
/**
 * Appends one element to the ring buffer, waiting (polling via Block())
 * until Writable() allows the write.
 *
 * A NULL element is rejected up front: the original code only logged it and
 * then dereferenced it, and a NULL stored in a slot would be dereferenced by
 * every reader as well.
 *
 * @param element  element to store; must not be NULL
 * @param moduleId writing module; when non-NULL and the buffer target is
 *                 BMT_MODULE it is treated as a CModule* to obtain the
 *                 runtime name used in diagnostics
 * @throws pai::error::invalid_argument when element is NULL
 */
void CBuffer::SetNextElement(CBufferElement* element, ModuleId moduleId) {
    string caller = "...";
    if (moduleId != NULL && this->m_target == BMT_MODULE) {
        CModule* module = reinterpret_cast<CModule*>(moduleId);
        caller = module->GetModuleContext().module_runtime_name;
    }
    if (element == NULL) {
        cerr << caller << " SetNextElement element is null" << endl;
        throw pai::error::invalid_argument("SetNextElement element is null!");
    }

    while (!Writable(caller)) {
        Block();
    }

    // Multi-component trace handling; does nothing for non multi-component data.
    SetMultiTraceToElement(moduleId, element);

    const unsigned long idx = m_elementCount % m_size;
    // A slot being reused may still hold the element written one lap earlier.
    if (m_elements[idx]) {
        delete m_elements[idx];
        m_elements[idx] = NULL;
    }
    m_elements[idx] = element;
    ++m_elementCount; // only incremented once the data is actually in the buffer

    // Report one written trace plus the seconds elapsed since the previous report.
    if (m_cbUpdateModuleProgress && moduleId && element->GetElementType() == TRACE) {
        time_t now;
        time(&now);
        m_moduleProcess[moduleId].tracenum = 1;
        m_moduleProcess[moduleId].time = int(now - m_prevSetTime);
        m_cbUpdateModuleProgress(m_moduleProcess[moduleId]);
        m_prevSetTime = now;
    }
}
/**
 * Writes a whole gather into the buffer, element by element, through
 * SetNextElement(element, moduleId).
 *
 * The gather must be non-empty, must end with a gather-split marker or an
 * end marker, and must not be longer than the buffer.
 *
 * The read position of moduleId is looked up with find() rather than
 * operator[]: operator[] would insert the writing module into the consumer
 * index with position 0, and because that position never advances,
 * Writable() would report "full" forever once m_size elements were written.
 *
 * @param gather   elements to write, in order
 * @param moduleId module performing the write
 * @throws pai::error::invalid_argument when the gather violates one of the rules above
 */
void CBuffer::SetNextElements(std::vector<CBufferElement*> gather, ModuleId moduleId)
{
    if (gather.size() <= 0)
        throw pai::error::invalid_argument("Invalid Gather.Gather is empty!");
    if (!gather[gather.size() - 1]->IsGatherSplitMarker()&&!gather[gather.size() - 1]->IsEndMarker())
        throw pai::error::invalid_argument("Invalid Gather.Gather must end with a GatherSplitMarker,Last Gather must end with a EndMarker!");
    if (gather.size() > m_size)
        throw pai::error::invalid_argument("Invalid Gather.Gather length exceed max buffer size!");

    // Read-only lookup: an unregistered module counts as position 0 without being registered.
    unsigned long current = 0;
    ModuleProcessIndex::iterator registered = m_processIndex.find(moduleId);
    if (registered != m_processIndex.end())
        current = registered->second;
    if (current + gather.size() > m_size)
        throw pai::error::invalid_argument("Invalid Gather.Gather length exceed buffer left size!");

    for (unsigned int i = 0; i < gather.size(); i++)
    {
        SetNextElement(gather[i], moduleId);
    }
}
/**
 * @brief Stores an element and, when requested, a marker element right after it.
 *
 * The element itself is always written first. If type is anything other than
 * TRACE, a marker element built by CBufferElement::CreateElement(type) is
 * written immediately behind it.
 *
 * @param[in] element  element to store
 * @param[in] moduleId module performing the write
 * @param[in] type     TRACE writes no marker; the other values seen in this
 *                     file (GATHER_ENDMARKER, LINE_ENDMARKER, DATA_ENDMARKER)
 *                     select the marker that follows the element
 */
void CBuffer::SetNextElement(CBufferElement* element,ModuleId moduleId,ElementType type)
{
    SetNextElement(element, moduleId);
    if (type == TRACE)
    {
        return;
    }
    CBufferElement* marker = CBufferElement::CreateElement(type);
    SetNextElement(marker, moduleId);
}
/**
 * Reads the next gather for a module: waits (polling via Block()) until
 * NextElementsIsReady() reports the index of the closing element, then pulls
 * every element from the module's current position up to and including that
 * index through GetNextElement().
 *
 * @param moduleID reading module
 * @return the elements obtained from GetNextElement(), in read order
 */
std::vector<CBufferElement*> CBuffer::GetNextElements(ModuleId moduleID)
{
    unsigned long lastIndex;
    for (;;)
    {
        if (NextElementsIsReady(moduleID, lastIndex))
        {
            break;
        }
        Block();
    }

    const unsigned long firstIndex = m_processIndex[moduleID];
    vector<CBufferElement*> collected;
    collected.reserve(lastIndex - firstIndex + 1);

    unsigned long position = firstIndex;
    while (position <= lastIndex)
    {
        collected.push_back(GetNextElement(moduleID));
        ++position;
    }
    return collected;
}
/**
 * Checks whether a complete gather is available for a module.
 * Requires the input module to pass the gather-split markers downstream.
 *
 * Only elements that have actually been written are examined: the scan runs
 * over logical indices from the module's read position up to m_elementCount
 * and maps each one to its slot with "% m_size". The previous version walked
 * the physical slots up to m_size, which dereferenced NULL for slots not yet
 * written and could never succeed once the read position reached m_size.
 *
 * @param moduleID                 reading module
 * @param[out] nextGatherEndIndex  logical index (same numbering as the
 *                                 module's read position) of the first
 *                                 gather-split or end element found; only
 *                                 written when true is returned
 * @return true when such an element has been written, false otherwise
 */
bool CBuffer::NextElementsIsReady(ModuleId moduleID, unsigned long & nextGatherEndIndex)
{
    const unsigned long current = m_processIndex[moduleID];
    for (unsigned long i = current; i < m_elementCount; i++)
    {
        CBufferElement* currentelement = m_elements[i % m_size];
        if (currentelement == NULL)
        {
            // Slot is unexpectedly empty; report "not ready" rather than dereference it.
            return false;
        }
        if (currentelement->IsGatherSplitMarker()||currentelement->isEnd)
        {
            nextGatherEndIndex = i;
            return true;
        }
    }
    return false;
}
/**
 * Registers a module as a consumer of this buffer with read position 0.
 *
 * A NULL id or an id that is already registered only produces a message on
 * stderr; the position is (re)set to 0 in every case.
 *
 * @param module id of the consuming module
 */
void CBuffer::AddReferenceModule(ModuleId module) {
    if (module == NULL) {
        cerr << " AddReferenceModule moduleId is null!" << endl;
    }

    const bool alreadyRegistered = (m_processIndex.count(module) > 0);
    if (alreadyRegistered) {
        cerr << "moduleId[" << module << "] AddReferenceModule is exist!" << endl;
    }

    // The stored value is the index of the element this module will process next.
    m_processIndex[module] = 0;
}
/**
 * Hands the next element to a consuming module.
 *
 * The call polls (via Block()) until Readable() reports that the module's
 * read position is behind the write count. An end-marker element is returned
 * as stored and the read position is left unchanged. Any other element is
 * returned as a newly allocated copy: CalculateCopyType() selects shallow or
 * deep copy, and after a shallow copy the stored original is Reset(),
 * deleted and its slot cleared. The read position then advances by one and,
 * for TRACE elements, the progress callback is notified.
 *
 * @param moduleId reading module; when non-NULL and the buffer target is
 *                 BMT_MODULE it is treated as a CModule* to obtain the
 *                 runtime name
 * @return the stored end marker, or a new copy of the next element
 */
CBufferElement* CBuffer::GetNextElement(ModuleId moduleId) {
    string caller("...");
    if (moduleId != NULL && this->m_target == BMT_MODULE) {
        caller = reinterpret_cast<CModule*>(moduleId)->GetModuleContext().module_runtime_name;
    }

    while (!Readable(caller, moduleId)) {
        Block();
    }

    const unsigned long readIndex = m_processIndex[moduleId];
    const unsigned long slot = readIndex % m_size;
    CBufferElement* stored = m_elements[slot];

    // End markers are passed through untouched and never consumed.
    if (stored->IsEndMarker()) {
        return stored;
    }

    const CopyType copyType = CalculateCopyType(caller, moduleId);
    CBufferElement* copy = new CBufferElement(*stored, copyType);
    if (copyType == ShallowCopy) {
        // The copy has taken over the payload; drop the stored shell.
        stored->Reset();
        delete stored;
        m_elements[slot] = NULL;
    }

    // The read position only moves once the element has really been read.
    m_processIndex[moduleId] = readIndex + 1;

    // Report one read trace plus the seconds elapsed since the previous report.
    if (m_cbUpdateModuleProgress && moduleId && copy->GetElementType() == TRACE) {
        time_t now;
        time(&now);
        m_moduleProcess[moduleId].tracenum = 1;
        m_moduleProcess[moduleId].time = static_cast<int>(now - m_prevGetTime);
        m_cbUpdateModuleProgress(m_moduleProcess[moduleId]);
        m_prevGetTime = now;
    }
    return copy;
}
/**
 * Reads elements until a trace element or the end marker shows up.
 *
 * Marker elements that are skipped on the way are deleted: GetNextElement()
 * returns every non-end element as a freshly allocated copy, so dropping the
 * pointer without deleting it leaked one object per skipped marker. The end
 * marker is never deleted here because GetNextElement() returns the
 * buffer's own stored pointer for it.
 *
 * @param moduleID reading module
 * @return the next trace element, or NULL when the end marker is reached first
 */
CBufferElement* CBuffer::GetTraceNextElement(ModuleId moduleID)
{
    CBufferElement* element = GetNextElement(moduleID);
    while(!element->IsTraceElement()
          && !element->IsEndMarker())
    {
        delete element; // skipped marker copy, owned by this function
        element = GetNextElement(moduleID);
    }
    // The loop exits only on a trace element or on the end marker.
    if(element->IsTraceElement())
    {
        return element;
    }
    return NULL;
}
/**
 * Peeks at the element a module will read next without consuming it.
 *
 * Polls (via Block()) until Readable() reports data for the module, then
 * returns the pointer stored in the corresponding slot. No copy is made and
 * the module's read position is not changed.
 *
 * @param moduleID reading module; when non-NULL and the buffer target is
 *                 BMT_MODULE it is treated as a CModule* to obtain the
 *                 runtime name
 * @return the stored element at the module's current read position
 */
CBufferElement* CBuffer::PreReadyNextElement(ModuleId moduleID)
{
    string caller("...");
    if (moduleID != NULL && this->m_target == BMT_MODULE)
    {
        caller = reinterpret_cast<CModule*>(moduleID)->GetModuleContext().module_runtime_name;
    }

    for (;;)
    {
        if (Readable(caller, moduleID))
        {
            break;
        }
        Block();
    }

    const unsigned long readIndex = m_processIndex[moduleID];
    return m_elements[readIndex % m_size];
}
/**
 * Skips over a run of consecutive marker elements at the module's read
 * position and accumulates the resulting marker type.
 *
 * Each round peeks at the next element (waiting until one is available):
 *  - a trace element ends the run and is left unread;
 *  - the end marker sets type to DATA_ENDMARKER, is skipped, and ends the run;
 *  - any other marker is skipped, and type is raised to that marker's type
 *    when it compares greater than the current value.
 *
 * @param moduleID    reading module
 * @param[in,out] type marker type seen so far; updated as described above
 */
void CBuffer::SkipContinuousMarkerElement(ModuleId moduleID, ElementType &type)
{
    for (;;)
    {
        CBufferElement* peeked = PreReadyNextElement(moduleID);
        if (peeked->IsTraceElement())
        {
            return;
        }
        if (peeked->IsEndMarker())
        {
            type = DATA_ENDMARKER;
            SkipCurrentElement(peeked, moduleID);
            return;
        }
        if (peeked->GetElementType() > type)
        {
            type = peeked->GetElementType();
        }
        SkipCurrentElement(peeked, moduleID);
    }
}
/**
 * Advances a module's read position by one without reading or copying the
 * element at that position.
 *
 * @param element  unused
 * @param moduleID module whose read position is advanced
 */
void CBuffer::SkipCurrentElement(CBufferElement* /*element*/,ModuleId moduleID)
{
    ++m_processIndex[moduleID];
}
/**
 * Determines the type of whatever follows the module's current read
 * position, consuming markers along the way.
 *
 *  - Next element is a trace: type becomes TRACE and nothing is consumed.
 *  - Next element is the end marker: type becomes DATA_ENDMARKER and the
 *    marker is skipped.
 *  - Otherwise: type becomes that marker's type, the marker is skipped, and
 *    SkipContinuousMarkerElement() processes any markers that follow.
 *
 * @param moduleID  reading module
 * @param[out] type resulting element type
 */
void CBuffer::PreReadyNextElementType(ModuleId moduleID, ElementType &type)
{
    CBufferElement* upcoming = PreReadyNextElement(moduleID);
    if (upcoming->IsTraceElement())
    {
        type = TRACE;
        return;
    }

    const bool reachedEnd = upcoming->IsEndMarker();
    if (reachedEnd)
    {
        type = DATA_ENDMARKER;
    }
    else
    {
        type = upcoming->GetElementType();
    }
    SkipCurrentElement(upcoming, moduleID);

    if (!reachedEnd)
    {
        SkipContinuousMarkerElement(moduleID, type);
    }
}
/**
 * Reads the next trace element and reports what follows it.
 *
 * Step 1: fetch the next trace via GetTraceNextElement(); if the end marker
 *         is reached first, type is set to DATA_ENDMARKER and NULL is returned.
 * Step 2: classify the element after the trace via PreReadyNextElementType().
 * Finally the multi-component handling is applied to the trace; it does
 * nothing for data that is not multi-component.
 *
 * @param moduleID  reading module
 * @param[out] type type of the element following the returned trace, or
 *                  DATA_ENDMARKER when no trace is left
 * @return the trace element, or NULL when no trace is left
 */
CBufferElement* CBuffer::GetNextElement(ModuleId moduleID, ElementType &type)
{
    CBufferElement* trace = GetTraceNextElement(moduleID);
    if (NULL == trace)
    {
        type = DATA_ENDMARKER;
        return NULL;
    }

    PreReadyNextElementType(moduleID, type);
    GetMultiTraceToElement(moduleID, trace);
    return trace;
}
/**
 * Tells whether one more element may be written.
 *
 * With no registered consumers the answer is always true. Otherwise the
 * write count must be at least the largest consumer position, and the
 * distance between the write count and the smallest consumer position must
 * be below the buffer size.
 *
 * @return true when a write is allowed right now
 */
bool CBuffer::Writable(const string& /*caller*/) {
    if (m_processIndex.empty()) {
        return true; // no module uses this buffer as input
    }

    unsigned long slowest = ULONG_MAX;
    unsigned long fastest = 0;
    for (ModuleProcessIndex::iterator it = m_processIndex.begin(); it != m_processIndex.end(); ++it) {
        const unsigned long position = it->second;
        if (position < slowest) {
            slowest = position;
        }
        if (position > fastest) {
            fastest = position;
        }
    }

    if (m_elementCount < fastest) {
        return false;
    }
    return m_elementCount - slowest < m_size;
}
// Modified by huangjun, 2011-10-08.
// Earlier algorithm: when the index of the block the calling module is about to read
// was the smallest among all modules, the caller was taken to be the last reader of
// that block (ShallowCopy).
// Problem with it: when two modules were both reading the block with the smallest
// index, this went wrong.
// Correct handling: ShallowCopy only when the caller is the [only] module reading the
// block with the smallest index.
/**
 * Chooses how GetNextElement() copies the element for the calling module.
 *
 * A single registered consumer always gets ShallowCopy. With several
 * consumers ShallowCopy is chosen only when the caller's position equals the
 * smallest position, fewer than two consumers are at that position, and the
 * smallest and largest positions differ; everything else is DeepCopy.
 *
 * @param moduleId reading module
 * @return ShallowCopy or DeepCopy
 */
CopyType CBuffer::CalculateCopyType(const string& /*caller*/, ModuleId moduleId) {
    if (m_processIndex.size() == 1) { // only one module uses the buffer as input
        return ShallowCopy;
    }

    unsigned long lowest = ULONG_MAX;
    unsigned long highest = 0;
    int readersAtLowest = 0; // number of modules positioned on the lowest index
    for (ModuleProcessIndex::iterator it = m_processIndex.begin(); it != m_processIndex.end(); ++it) {
        const unsigned long position = it->second;
        if (position > highest) {
            highest = position;
        }
        if (position < lowest) {
            lowest = position;
            readersAtLowest = 1;
        } else if (position == lowest) {
            ++readersAtLowest;
        }
    }

    const bool callerIsSlowest = (m_processIndex[moduleId] == lowest);
    if (callerIsSlowest && readersAtLowest < 2 && lowest != highest) {
        return ShallowCopy;
    }
    return DeepCopy;
}
/**
 * Tells whether the given module has an unread element available, i.e.
 * whether its read position is below the number of elements written so far.
 * Data can only be read once it has been written into the buffer.
 *
 * The original note states that this method must be mutually exclusive
 * between all threads using the buffer; no locking is performed here.
 *
 * @param moduleId reading module
 * @return true when the module may read
 */
bool CBuffer::Readable(const string& /*caller*/, ModuleId moduleId) {
    const bool hasUnread = (m_processIndex[moduleId] < m_elementCount);
    return hasUnread;
}
void CBuffer::SetMetaData(const string& key, Any anything)
{
//设置当前Buffer的MetaData
this->m_context.Put(key,anything);
if(this->m_target == BMT_MODULE){
//递归设置下游模块MetaData
ModuleProcessIndex::iterator end = m_processIndex.end();
for (ModuleProcessIndex::iterator it = m_processIndex.begin(); it != end; it++) {
CModule* module = reinterpret_cast<CModule*>(it->first);
module->SetOutputMetaData(key,anything);
}
}
}
/**
 * Reports whether this buffer's own metadata context contains the key.
 *
 * @param key metadata key to look up
 * @return result of the context's ContainsKey() lookup
 */
bool CBuffer::HasMetaData(const std::string& key)
{
    const bool present = this->m_context.ContainsKey(key);
    return present;
}
/**
 * Read-side multi-component handling: for a TRACE element with data, copies
 * the trace data and header of the component selected by the module
 * (x -> m_traceX_SR, y -> m_traceY_SV, z -> m_traceZ) into the top-level
 * m_traceData / m_traceHeader of the MultiwaveTrace.
 *
 * Nothing happens when the element is not a TRACE, carries no data, or
 * GetSelectedMultiwaveComponent() finds no selected component.
 *
 * @param moduleID reading module whose parameters select the component
 * @param element  element just read; its data is treated as a MultiwaveTrace
 */
void CBuffer::GetMultiTraceToElement(ModuleId moduleID,CBufferElement* element){
    if (element->elementType != TRACE || NULL == element->GetData()) {
        return;
    }

    // Component id (the id used in the json file) -> data type code of that component.
    std::map<string, int> codeByComponent;
    codeByComponent["x-component"] = CONST_THREE_COMPONENT_DATATYPE_X_CODE;
    codeByComponent["y-component"] = CONST_THREE_COMPONENT_DATATYPE_Y_CODE;
    codeByComponent["z-component"] = CONST_THREE_COMPONENT_DATATYPE_Z_CODE;

    // Look for the component to process; without one there is no multi-component handling.
    int selectedCode = 1;
    if (!GetSelectedMultiwaveComponent(moduleID, codeByComponent, selectedCode)) {
        return;
    }

    ::pai::ios::seis::MultiwaveTrace* trace = (::pai::ios::seis::MultiwaveTrace*)(element->GetData());
    if (selectedCode == CONST_THREE_COMPONENT_DATATYPE_X_CODE) {
        trace->m_traceData = trace->m_traceX_SR->m_traceData;
        trace->m_traceHeader = trace->m_traceX_SR->m_traceHeader;
    } else if (selectedCode == CONST_THREE_COMPONENT_DATATYPE_Y_CODE) {
        trace->m_traceData = trace->m_traceY_SV->m_traceData;
        trace->m_traceHeader = trace->m_traceY_SV->m_traceHeader;
    } else if (selectedCode == CONST_THREE_COMPONENT_DATATYPE_Z_CODE) {
        trace->m_traceData = trace->m_traceZ->m_traceData;
        trace->m_traceHeader = trace->m_traceZ->m_traceHeader;
    } else {
        std::cerr << __FILE__ << __LINE__ <<" m_dataType [ "<<selectedCode<< " ]"<<" , there is no matching component"<<std::endl;
    }
}
/**
 * Write-side multi-component handling: for a TRACE element with data, copies
 * the top-level m_traceData / m_traceHeader of the MultiwaveTrace into the
 * component selected by the module (x -> m_traceX_SR, y -> m_traceY_SV,
 * z -> m_traceZ).
 *
 * Nothing happens when the element is not a TRACE, carries no data, or
 * GetSelectedMultiwaveComponent() finds no selected component.
 *
 * @param moduleID writing module whose parameters select the component
 * @param element  element about to be stored; its data is treated as a MultiwaveTrace
 */
void CBuffer::SetMultiTraceToElement(ModuleId moduleID,CBufferElement* element){
    if (element->elementType != TRACE || NULL == element->GetData()) {
        return;
    }

    // Component id (the id used in the json file) -> data type code of that component.
    std::map<string, int> codeByComponent;
    codeByComponent["x-component"] = CONST_THREE_COMPONENT_DATATYPE_X_CODE;
    codeByComponent["y-component"] = CONST_THREE_COMPONENT_DATATYPE_Y_CODE;
    codeByComponent["z-component"] = CONST_THREE_COMPONENT_DATATYPE_Z_CODE;

    // Look for the component to process; without one there is no multi-component handling.
    int selectedCode = 1;
    if (!GetSelectedMultiwaveComponent(moduleID, codeByComponent, selectedCode)) {
        return;
    }

    ::pai::ios::seis::MultiwaveTrace* trace = (::pai::ios::seis::MultiwaveTrace*)(element->GetData());
    if (selectedCode == CONST_THREE_COMPONENT_DATATYPE_X_CODE) {
        trace->m_traceX_SR->m_traceData = trace->m_traceData;
        trace->m_traceX_SR->m_traceHeader = trace->m_traceHeader;
    } else if (selectedCode == CONST_THREE_COMPONENT_DATATYPE_Y_CODE) {
        trace->m_traceY_SV->m_traceData = trace->m_traceData;
        trace->m_traceY_SV->m_traceHeader = trace->m_traceHeader;
    } else if (selectedCode == CONST_THREE_COMPONENT_DATATYPE_Z_CODE) {
        trace->m_traceZ->m_traceData = trace->m_traceData;
        trace->m_traceZ->m_traceHeader = trace->m_traceHeader;
    } else {
        std::cerr << __FILE__ << __LINE__ <<" m_dataType [ "<<selectedCode<< " ]"<<" , there is no matching component"<<std::endl;
    }
}
/**
 * Finds the first component whose module parameter is switched on.
 *
 * Only evaluated for a non-NULL module on a BMT_MODULE buffer. The map is
 * walked in key order; for each key the module parameter item of the same
 * name is looked up, and its string value is parsed with atoi(). The first
 * item with a non-zero value wins. (When multi-component and regular modules
 * are chained, the workflow is split so that each regular module handles one
 * component, hence the search stops at the first hit.)
 *
 * @param moduleID       module whose parameters are inspected
 * @param componentMap   parameter name -> data type code
 * @param[out] dataType  code of the selected component; untouched when none is selected
 * @return true when a selected component was found
 */
bool CBuffer::GetSelectedMultiwaveComponent(ModuleId moduleID ,std::map< std::string ,int > & componentMap , int & dataType ){
    if (moduleID == NULL || this->m_target != BMT_MODULE) {
        return false;
    }

    CModuleParameter* parameters = reinterpret_cast<CModule*>(moduleID)->GetModuleParameter();
    for (std::map<string, int>::iterator entry = componentMap.begin(); entry != componentMap.end(); ++entry) {
        CParameterItem* item = parameters->GetParameterItem(entry->first);
        if (item == NULL) {
            continue;
        }
        const std::string flagText = item->ValueToString();
        if (atoi(flagText.c_str()) == 0) {
            continue; // component not selected
        }
        dataType = entry->second;
        return true;
    }
    return false;
}
}
}