diff --git a/Detectors/TOF/workflow/src/CompressedAnalysisTask.cxx b/Detectors/TOF/workflow/src/CompressedAnalysisTask.cxx index 2757b27ef959c..92507feafda85 100644 --- a/Detectors/TOF/workflow/src/CompressedAnalysisTask.cxx +++ b/Detectors/TOF/workflow/src/CompressedAnalysisTask.cxx @@ -82,7 +82,7 @@ void CompressedAnalysisTask::run(ProcessingContext& pc) } /** loop over input parts **/ - for (auto const& ref : iit) { + for (auto const& ref : iit.parts()) { const auto* headerIn = DataRefUtils::getHeader(ref); auto payloadIn = ref.payload; diff --git a/Detectors/TOF/workflow/src/CompressedInspectorTask.cxx b/Detectors/TOF/workflow/src/CompressedInspectorTask.cxx index 968d26b6aebd0..21985f2217741 100644 --- a/Detectors/TOF/workflow/src/CompressedInspectorTask.cxx +++ b/Detectors/TOF/workflow/src/CompressedInspectorTask.cxx @@ -100,7 +100,7 @@ void CompressedInspectorTask::run(ProcessingContext& pc) } /** loop over input parts **/ - for (auto const& ref : iit) { + for (auto const& ref : iit.parts()) { const auto* headerIn = DataRefUtils::getHeader(ref); auto payloadIn = ref.payload; diff --git a/Detectors/TPC/workflow/readers/include/TPCReaderWorkflow/TPCSectorCompletionPolicy.h b/Detectors/TPC/workflow/readers/include/TPCReaderWorkflow/TPCSectorCompletionPolicy.h index 315737df3ec97..644572f42161a 100644 --- a/Detectors/TPC/workflow/readers/include/TPCReaderWorkflow/TPCSectorCompletionPolicy.h +++ b/Detectors/TPC/workflow/readers/include/TPCReaderWorkflow/TPCSectorCompletionPolicy.h @@ -101,9 +101,10 @@ class TPCSectorCompletionPolicy size_t nMaxPartsPerRoute = 0; int inputType = -1; for (auto it = inputs.begin(), end = inputs.end(); it != end; ++it) { - nMaxPartsPerRoute = it.size() > nMaxPartsPerRoute ? it.size() : nMaxPartsPerRoute; + auto parts = it.parts(); + nMaxPartsPerRoute = parts.size() > nMaxPartsPerRoute ? parts.size() : nMaxPartsPerRoute; bool haveActivePart = false; - for (auto const& ref : it) { + for (auto const& ref : parts) { if (!framework::DataRefUtils::isValid(ref)) { continue; } diff --git a/Framework/Core/include/Framework/InputRecord.h b/Framework/Core/include/Framework/InputRecord.h index d2e152c1bcacc..0aa59787671c0 100644 --- a/Framework/Core/include/Framework/InputRecord.h +++ b/Framework/Core/include/Framework/InputRecord.h @@ -668,6 +668,11 @@ class InputRecord return mPosition; } + [[nodiscard]] auto parts() const + { + return mParent->parts(mPosition); + } + private: size_t mPosition; size_t mSize; @@ -688,32 +693,12 @@ class InputRecord using reference = typename BaseType::reference; using pointer = typename BaseType::pointer; using ElementType = typename std::remove_const::type; - using iterator = InputSpan::Iterator; - using const_iterator = InputSpan::Iterator; - InputRecordIterator(InputRecord const* parent, bool isEnd = false) : BaseType(parent, isEnd) { } - /// Initial indices for part-level iteration: first part starts at {headerIdx=0, payloadIdx=1}. - [[nodiscard]] DataRefIndices initialIndices() const { return {0, 1}; } - /// Sentinel used by nextIndicesGetter to signal end-of-slot. - [[nodiscard]] DataRefIndices endIndices() const { return {size_t(-1), size_t(-1)}; } - - /// Get element at the given raw message indices in O(1). - [[nodiscard]] ElementType getAtIndices(DataRefIndices indices) const - { - return this->parent()->getAtIndices(this->position(), indices); - } - - /// Advance @a current to the next part's indices in O(1). - [[nodiscard]] DataRefIndices nextIndices(DataRefIndices current) const - { - return this->parent()->nextIndices(this->position(), current); - } - - /// Check if slot is valid, index of part is not used + /// Check if slot is valid [[nodiscard]] bool isValid(size_t = 0) const { if (this->position() < this->parent()->size()) { @@ -722,21 +707,6 @@ class InputRecord return false; } - /// Get number of parts in input slot - [[nodiscard]] size_t size() const - { - return this->parent()->getNofParts(this->position()); - } - - [[nodiscard]] const_iterator begin() const - { - return const_iterator(this, size() == 0); - } - - [[nodiscard]] const_iterator end() const - { - return const_iterator(this, true); - } }; using iterator = InputRecordIterator; @@ -752,6 +722,24 @@ class InputRecord return {this, true}; } + /// A range over the parts of a single slot that sets ref.spec on each DataRef. + struct PartRange { + InputRecord const* record; + size_t slot; + + [[nodiscard]] DataRefIndices initialIndices() const { return {0, 1}; } + [[nodiscard]] DataRefIndices endIndices() const { return {size_t(-1), size_t(-1)}; } + [[nodiscard]] DataRef getAtIndices(DataRefIndices idx) const { return record->getAtIndices((int)slot, idx); } + [[nodiscard]] DataRefIndices nextIndices(DataRefIndices idx) const { return record->nextIndices((int)slot, idx); } + [[nodiscard]] size_t size() const { return record->getNofParts((int)slot); } + + [[nodiscard]] InputSpan::Iterator begin() const { return {this, size() == 0}; } + [[nodiscard]] InputSpan::Iterator end() const { return {this, true}; } + }; + + /// Return an iterable range over all parts in slot @a pos (DataRef objects have spec set). + [[nodiscard]] PartRange parts(size_t pos) const { return {this, pos}; } + InputSpan& span() { return mSpan; diff --git a/Framework/Core/include/Framework/InputRecordWalker.h b/Framework/Core/include/Framework/InputRecordWalker.h index 4d36a1f17bc82..d885fd704c974 100644 --- a/Framework/Core/include/Framework/InputRecordWalker.h +++ b/Framework/Core/include/Framework/InputRecordWalker.h @@ -73,16 +73,22 @@ class InputRecordWalker // the iterator over the input routes using input_iterator = decltype(std::declval().begin()); + // the range over parts in one slot — must be stored as a member so the + // part_iterator (which holds a pointer into it) does not dangle + using part_range = InputRecord::PartRange; + using part_iterator = InputSpan::Iterator; + Iterator() = delete; - Iterator(InputRecord& parent, input_iterator it, input_iterator end, std::vector const& filterSpecs) - : mParent(parent), mInputIterator(it), mEnd(end), mCurrent(mInputIterator.begin()), mFilterSpecs(filterSpecs) + Iterator(input_iterator it, input_iterator end, std::vector const& filterSpecs) + : mInputIterator(it), mEnd(end), + mCurrentRange(it.parts()), + mCurrent(mCurrentRange.begin()), + mFilterSpecs(filterSpecs) { next(true); } - ~Iterator() = default; - // prefix increment self_type& operator++() { @@ -104,9 +110,7 @@ class InputRecordWalker // comparison bool operator==(const self_type& other) const { - bool result = mInputIterator == other.mInputIterator; - result = result && mCurrent == other.mCurrent; - return result; + return mInputIterator == other.mInputIterator && mCurrent == other.mCurrent; } bool operator!=(const self_type& rh) const @@ -115,19 +119,13 @@ class InputRecordWalker } private: - // the iterator over the parts in one channel - using part_iterator = typename input_iterator::const_iterator; - bool next(bool isInitialPart = false) { + if (!isInitialPart) { + ++mCurrent; + } while (mInputIterator != mEnd) { - while (mCurrent != mInputIterator.end()) { - // increment on the level of one input - if (!isInitialPart && (mCurrent == mInputIterator.end() || ++mCurrent == mInputIterator.end())) { - // no more parts, go to next input - break; - } - isInitialPart = false; + for (; mCurrent != mCurrentRange.end(); ++mCurrent) { // check filter rules if (mFilterSpecs.size() > 0) { bool isSelected = false; @@ -143,15 +141,15 @@ class InputRecordWalker return true; } ++mInputIterator; - mCurrent = mInputIterator.begin(); - isInitialPart = true; + mCurrentRange = mInputIterator.parts(); + mCurrent = mCurrentRange.begin(); } // end loop over record return false; } - InputRecord& mParent; input_iterator mInputIterator; input_iterator mEnd; + part_range mCurrentRange; // declared before mCurrent — initialized first part_iterator mCurrent; std::vector const& mFilterSpecs; }; @@ -160,12 +158,12 @@ class InputRecordWalker const_iterator begin() const { - return const_iterator(mRecord, mRecord.begin(), mRecord.end(), mFilterSpecs); + return const_iterator(mRecord.begin(), mRecord.end(), mFilterSpecs); } const_iterator end() const { - return const_iterator(mRecord, mRecord.end(), mRecord.end(), mFilterSpecs); + return const_iterator(mRecord.end(), mRecord.end(), mFilterSpecs); } private: diff --git a/Framework/Core/include/Framework/InputSpan.h b/Framework/Core/include/Framework/InputSpan.h index dbe270f0e030d..d708d2e2f5dde 100644 --- a/Framework/Core/include/Framework/InputSpan.h +++ b/Framework/Core/include/Framework/InputSpan.h @@ -179,69 +179,40 @@ class InputSpan return mCurrentIndices.headerIdx; } + // return an iterable range over all parts in the current slot + // only available for slot-level iterators whose parent has parts(size_t) + [[nodiscard]] auto parts() const + requires requires(ParentType const* p, size_t i) { p->parts(i); } + { + return mParent->parts(mCurrentIndices.headerIdx); + } + private: ParentType const* mParent; DataRefIndices mCurrentIndices; ElementType mElement; }; - /// @class InputSpanIterator - /// An iterator over the input slots. - /// It supports an iterator interface to access the parts in the slot. - template - class InputSpanIterator : public Iterator - { - public: - using SelfType = InputSpanIterator; - using BaseType = Iterator; - using value_type = typename BaseType::value_type; - using reference = typename BaseType::reference; - using pointer = typename BaseType::pointer; - using ElementType = typename std::remove_const::type; - using iterator = Iterator; - using const_iterator = Iterator; - - InputSpanIterator(InputSpan const* parent, bool isEnd = false) - : BaseType(parent, isEnd) - { - } + /// A range over the parts of a single slot, supporting range-based for. + struct PartRange { + InputSpan const* span; + size_t slot; - /// Initial indices for part-level iteration: first part starts at {headerIdx=0, payloadIdx=1}. [[nodiscard]] DataRefIndices initialIndices() const { return {0, 1}; } - /// Sentinel used by nextIndicesGetter to signal end-of-slot. [[nodiscard]] DataRefIndices endIndices() const { return {size_t(-1), size_t(-1)}; } + [[nodiscard]] DataRef getAtIndices(DataRefIndices idx) const { return span->getAtIndices(slot, idx); } + [[nodiscard]] DataRefIndices nextIndices(DataRefIndices idx) const { return span->nextIndices(slot, idx); } + [[nodiscard]] size_t size() const { return span->getNofParts(slot); } - /// Get element at the given raw message indices in O(1). - [[nodiscard]] ElementType getAtIndices(DataRefIndices indices) const - { - return this->parent()->getAtIndices(this->position(), indices); - } - - /// Advance @a current to the next part's indices in O(1). - [[nodiscard]] DataRefIndices nextIndices(DataRefIndices current) const - { - return this->parent()->nextIndices(this->position(), current); - } - - /// Get number of parts in input slot - [[nodiscard]] size_t size() const - { - return this->parent()->getNofParts(this->position()); - } - - [[nodiscard]] const_iterator begin() const - { - return const_iterator(this, size() == 0); - } - - [[nodiscard]] const_iterator end() const - { - return const_iterator(this, true); - } + [[nodiscard]] Iterator begin() const { return {this, size() == 0}; } + [[nodiscard]] Iterator end() const { return {this, true}; } }; - using iterator = InputSpanIterator; - using const_iterator = InputSpanIterator; + /// Return an iterable range over all parts in slot @a i. + [[nodiscard]] PartRange parts(size_t i) const { return {this, i}; } + + using const_iterator = Iterator; + using iterator = const_iterator; // supporting read-only access and returning const_iterator [[nodiscard]] const_iterator begin() const diff --git a/Framework/Core/src/CompletionPolicyHelpers.cxx b/Framework/Core/src/CompletionPolicyHelpers.cxx index e5a91ae58f899..d2c63cbf4c90b 100644 --- a/Framework/Core/src/CompletionPolicyHelpers.cxx +++ b/Framework/Core/src/CompletionPolicyHelpers.cxx @@ -343,7 +343,7 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAnyWithAllConditions(const } if (canConsume || conditionMissing) { for (auto it = inputs.begin(), end = inputs.end(); it != end; ++it) { - for (auto const& ref : it) { + for (auto const& ref : it.parts()) { if (!framework::DataRefUtils::isValid(ref)) { continue; } diff --git a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx index d4ee776986184..6162d222649b9 100644 --- a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx +++ b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx @@ -1028,8 +1028,7 @@ DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name, return adaptStateless([lastDataProcessingHeader](InputRecord& inputs) { for (auto it = inputs.begin(); it != inputs.end(); it++) { - for (auto indices = it.initialIndices(); indices != it.endIndices(); indices = it.nextIndices(indices)) { - auto part = it.getAtIndices(indices); + for (auto const& part : it.parts()) { const auto* dph = o2::header::get(part.header); if (dph) { // FIXME: should we implement an assignment operator for DataProcessingHeader? @@ -1164,8 +1163,7 @@ DataProcessorSpec specifyFairMQDeviceMultiOutputProxy(char const* name, // as forward routes but we need to keep a copy of the last DataProcessingHeader // for sending the EOS for (auto it = inputs.begin(); it != inputs.end(); it++) { - for (auto indices = it.initialIndices(); indices != it.endIndices(); indices = it.nextIndices(indices)) { - auto part = it.getAtIndices(indices); + for (auto const& part : it.parts()) { const auto* dph = o2::header::get(part.header); if (dph) { // FIXME: should we implement an assignment operator for DataProcessingHeader? diff --git a/Framework/Core/test/test_DataAllocator.cxx b/Framework/Core/test/test_DataAllocator.cxx index fefb6438b98d5..5b89feed8f3dd 100644 --- a/Framework/Core/test/test_DataAllocator.cxx +++ b/Framework/Core/test/test_DataAllocator.cxx @@ -220,9 +220,9 @@ DataProcessorSpec getSinkSpec() dumpStack(dh); if ((*iit).spec->binding == "inputMP") { - LOG(info) << "inputMP with " << iit.size() << " part(s)"; + LOG(info) << "inputMP with " << iit.parts().size() << " part(s)"; int nPart = 0; - for (auto const& ref : iit) { + for (auto const& ref : iit.parts()) { LOG(info) << "accessing part " << nPart++ << " of input slot 'inputMP':" << pc.inputs().get(ref); ASSERT_ERROR(pc.inputs().get(ref) == nPart * 10); @@ -407,8 +407,8 @@ DataProcessorSpec getSpectatorSinkSpec() << " payload size " << dh->payloadSize; if ((*iit).spec->binding == "inputMP") { - LOG(info) << "inputMP with " << iit.size() << " part(s)"; - for (auto const& ref : iit) { + LOG(info) << "inputMP with " << iit.parts().size() << " part(s)"; + for (auto const& ref : iit.parts()) { LOG(info) << "accessing part " << nPart << " of input slot 'inputMP':" << pc.inputs().get(ref); nPart++; diff --git a/Framework/Core/test/test_InputRecord.cxx b/Framework/Core/test/test_InputRecord.cxx index 355e52539ea5a..5dff09409325f 100644 --- a/Framework/Core/test/test_InputRecord.cxx +++ b/Framework/Core/test/test_InputRecord.cxx @@ -160,11 +160,11 @@ TEST_CASE("TestInputRecord") // the 2-level iterator to access inputs and their parts // all inputs have 1 part, we check the first input - REQUIRE(record.begin().size() == 1); + REQUIRE(record.begin().parts().size() == 1); // the end-instance of the inputs has no parts - REQUIRE(record.end().size() == 0); + REQUIRE(record.end().parts().size() == 0); // thus there is no element and begin == end - REQUIRE(record.end().begin() == record.end().end()); + REQUIRE(record.end().parts().begin() == record.end().parts().end()); } // TODO: diff --git a/Framework/Core/test/test_InputSpan.cxx b/Framework/Core/test/test_InputSpan.cxx index dc31085e741fd..041a14c8916a4 100644 --- a/Framework/Core/test/test_InputSpan.cxx +++ b/Framework/Core/test/test_InputSpan.cxx @@ -61,8 +61,9 @@ TEST_CASE("TestInputSpan") routeNo = 0; for (auto it = span.begin(), end = span.end(); it != end; ++it) { size_t partNo = 0; - REQUIRE(it.size() * 2 == inputs[routeNo].size()); - for (auto const& ref : it) { + auto parts = it.parts(); + REQUIRE(parts.size() * 2 == inputs[routeNo].size()); + for (auto const& ref : parts) { REQUIRE(inputs[routeNo].at(partNo++) == ref.header); REQUIRE(inputs[routeNo].at(partNo++) == ref.payload); INFO(ref.header << " " << ref.payload); diff --git a/Framework/Utils/include/DPLUtils/DPLRawParser.h b/Framework/Utils/include/DPLUtils/DPLRawParser.h index 5fa0775025deb..903d8667e12f7 100644 --- a/Framework/Utils/include/DPLUtils/DPLRawParser.h +++ b/Framework/Utils/include/DPLUtils/DPLRawParser.h @@ -105,14 +105,12 @@ class DPLRawParser Iterator() = delete; Iterator(InputRecord& parent, input_iterator it, input_iterator end, std::vector const& filterSpecs, fair::Severity sev = fair::Severity::alarm, size_t maxErrMsg = -1, size_t* cntErrMsg = nullptr) - : mParent(parent), mInputIterator(it), mEnd(end), mPartIterator(mInputIterator.begin()), mParser(std::make_unique(reinterpret_cast(&initializer), sizeof(initializer))), mCurrent(mParser->begin()), mFilterSpecs(filterSpecs), mMaxFailureMessages(maxErrMsg), mExtFailureCounter(cntErrMsg), mSeverity(sev) + : mParent(parent), mInputIterator(it), mEnd(end), mCurrentRange(it.parts()), mPartIterator(mCurrentRange.begin()), mParser(std::make_unique(reinterpret_cast(&initializer), sizeof(initializer))), mCurrent(mParser->begin()), mFilterSpecs(filterSpecs), mMaxFailureMessages(maxErrMsg), mExtFailureCounter(cntErrMsg), mSeverity(sev) { mParser.reset(); next(); } - ~Iterator() = default; - // prefix increment self_type& operator++() { @@ -205,7 +203,7 @@ class DPLRawParser friend std::ostream& operator<<(std::ostream& os, self_type const& it) { - if (it.mInputIterator != it.mEnd && it.mPartIterator != it.mInputIterator.end() && it.mParser != nullptr) { + if (it.mInputIterator != it.mEnd && it.mPartIterator != it.mCurrentRange.end() && it.mParser != nullptr) { os << it.mCurrent; } return os; @@ -223,7 +221,7 @@ class DPLRawParser friend std::ostream& operator<<(std::ostream& os, Fmt const& fmt) { auto const& it = fmt.it; - if (it.mInputIterator != it.mEnd && it.mPartIterator != it.mInputIterator.end() && it.mParser != nullptr) { + if (it.mInputIterator != it.mEnd && it.mPartIterator != it.mCurrentRange.end() && it.mParser != nullptr) { if constexpr (FmtCtrl == raw_parser::FormatSpec::Info) { // TODO: need to propagate the format spec also on the RawParser object // for now this operation prints the RDH version info and the table header @@ -236,8 +234,10 @@ class DPLRawParser } private: - // the iterator over the parts in one channel - using part_iterator = typename input_iterator::const_iterator; + // the range over parts in one slot — must be stored as a member so the + // part_iterator (which holds a pointer into it) does not dangle + using part_range = InputRecord::PartRange; + using part_iterator = InputSpan::Iterator; // the iterator over the over the parser pages using parser_iterator = typename parser_type::const_iterator; @@ -265,7 +265,7 @@ class DPLRawParser while (mInputIterator != mEnd) { bool isInitial = mParser == nullptr; - while (mPartIterator != mInputIterator.end()) { + while (mPartIterator != mCurrentRange.end()) { // first increment on the parser level if (mParser && mCurrent != mParser->end() && ++mCurrent != mParser->end()) { // we have an active parser and there is still data at the incremented iterator @@ -273,7 +273,7 @@ class DPLRawParser } // now increment on the level of one input mParser.reset(); - if (!isInitial && (mPartIterator == mInputIterator.end() || ++mPartIterator == mInputIterator.end())) { + if (!isInitial && ++mPartIterator == mCurrentRange.end()) { // no more parts, go to next input break; } @@ -312,7 +312,8 @@ class DPLRawParser } } // end loop over parts on one input ++mInputIterator; - mPartIterator = mInputIterator.begin(); + mCurrentRange = mInputIterator.parts(); + mPartIterator = mCurrentRange.begin(); } // end loop over inputs return false; } @@ -320,6 +321,7 @@ class DPLRawParser InputRecord& mParent; input_iterator mInputIterator; input_iterator mEnd; + part_range mCurrentRange; // declared before mPartIterator — initialized first part_iterator mPartIterator; std::unique_ptr mParser; parser_iterator mCurrent; diff --git a/Steer/DigitizerWorkflow/src/TPCDigitizerSpec.cxx b/Steer/DigitizerWorkflow/src/TPCDigitizerSpec.cxx index 68476c3a92a6d..3737a1e84e26f 100644 --- a/Steer/DigitizerWorkflow/src/TPCDigitizerSpec.cxx +++ b/Steer/DigitizerWorkflow/src/TPCDigitizerSpec.cxx @@ -260,7 +260,7 @@ class TPCDPLDigitizerTask : public BaseDPLDigitizer } for (auto it = pc.inputs().begin(), end = pc.inputs().end(); it != end; ++it) { - for (auto const& inputref : it) { + for (auto const& inputref : it.parts()) { if (inputref.spec->lifetime == o2::framework::Lifetime::Condition) { // process does not need conditions continue; } diff --git a/Utilities/DataSampling/src/Dispatcher.cxx b/Utilities/DataSampling/src/Dispatcher.cxx index 22dd457a1211a..362ef73ecef7e 100644 --- a/Utilities/DataSampling/src/Dispatcher.cxx +++ b/Utilities/DataSampling/src/Dispatcher.cxx @@ -137,7 +137,7 @@ void Dispatcher::run(ProcessingContext& ctx) if (auto route = policy->match(inputMatcher); route != nullptr && policy->decide(firstPart)) { auto routeAsConcreteDataType = DataSpecUtils::asConcreteDataTypeMatcher(*route); auto dsheader = prepareDataSamplingHeader(*policy, *firstInputHeader); - for (const auto& part : inputIt) { + for (const auto& part : inputIt.parts()) { if (part.header != nullptr) { // We copy every header which is not DataHeader or DataProcessingHeader, // so that custom data-dependent headers are passed forward,