From bbe4d337e3ed1295e31200ec84d312110b311ff8 Mon Sep 17 00:00:00 2001 From: Remy Prechelt <prechelt@hawaii.edu> Date: Sat, 24 Oct 2020 23:28:23 -1000 Subject: [PATCH] Base Parquet implementation for ObservationPlane. This also further refines the YAML writing for processes, and the base interfaces. --- Documentation/Examples/CMakeLists.txt | 1 + Documentation/Examples/vertical_EAS.cc | 9 ++++ Outputs/BaseOutput.h | 1 - Outputs/CMakeLists.txt | 1 + Outputs/ObservationPlaneWriterParquet.h | 61 ++++++++++++++++--------- Outputs/OutputManager.h | 7 +-- Outputs/{helpers => }/ParquetStreamer.h | 42 ++++++++++++----- ThirdParty/CMakeLists.txt | 26 ++++------- 8 files changed, 94 insertions(+), 54 deletions(-) rename Outputs/{helpers => }/ParquetStreamer.h (66%) diff --git a/Documentation/Examples/CMakeLists.txt b/Documentation/Examples/CMakeLists.txt index f9cd5ca73..93cfee7b1 100644 --- a/Documentation/Examples/CMakeLists.txt +++ b/Documentation/Examples/CMakeLists.txt @@ -116,6 +116,7 @@ if (Pythia8_FOUND) CORSIKAgeometry CORSIKAoutput CORSIKAenvironment + CORSIKAthirdparty CORSIKAprocesssequence CORSIKAhistory # for HistoryObservationPlane ) diff --git a/Documentation/Examples/vertical_EAS.cc b/Documentation/Examples/vertical_EAS.cc index a4b3dd66c..79060edb3 100644 --- a/Documentation/Examples/vertical_EAS.cc +++ b/Documentation/Examples/vertical_EAS.cc @@ -255,8 +255,17 @@ int main(int argc, char** argv) { // to fix the point of first interaction, uncomment the following two lines: // EAS.forceInteraction(); + // start a new run + outputs.StartOfRun(); + EAS.Run(); + // and end this run + outputs.EndOfRun(); + + eLoss.PrintProfile(); // print longitudinal profile + conexSource.SolveCE(); + cut.ShowResults(); em_continuous.ShowResults(); observationLevel.ShowResults(); diff --git a/Outputs/BaseOutput.h b/Outputs/BaseOutput.h index e5813eff1..53f01347b 100644 --- a/Outputs/BaseOutput.h +++ b/Outputs/BaseOutput.h @@ -51,7 +51,6 @@ namespace corsika::output { /** * Get the configuration of this output. */ - // virtual void WriteConfig(YAML::Emitter&) const = 0; virtual YAML::Node GetConfig() const = 0; }; diff --git a/Outputs/CMakeLists.txt b/Outputs/CMakeLists.txt index 7b0b3192a..d8829a82d 100644 --- a/Outputs/CMakeLists.txt +++ b/Outputs/CMakeLists.txt @@ -2,6 +2,7 @@ MODEL_HEADERS BaseOutput.h OutputManager.h + ParquetStreamer.h ObservationPlaneWriterParquet.h ) diff --git a/Outputs/ObservationPlaneWriterParquet.h b/Outputs/ObservationPlaneWriterParquet.h index 3771cd63a..3da21460c 100644 --- a/Outputs/ObservationPlaneWriterParquet.h +++ b/Outputs/ObservationPlaneWriterParquet.h @@ -9,16 +9,13 @@ #pragma once #include <corsika/output/BaseOutput.h> +#include <corsika/output/ParquetStreamer.h> #include <corsika/particles/ParticleProperties.h> #include <corsika/units/PhysicalUnits.h> -#include <arrow/io/file.h> -#include <parquet/arrow/schema.h> -#include <parquet/stream_writer.h> - namespace corsika::output { - class ObservationPlaneWriterParquet : public BaseOutput, private ParquetStreamer { + class ObservationPlaneWriterParquet : public BaseOutput { public: /** @@ -27,50 +24,72 @@ namespace corsika::output { * @param name The name of this output. */ ObservationPlaneWriterParquet(std::string const& name) - : name_(name){}; + : name_(name) + , event_(0){}; /** * Called at the start of each run. */ void StartOfRun(std::filesystem::path const& directory) final { - // (directory / "particles.parquet").string() - - // construct the schema - auto schema = arrow::schema({arrow::field("pdg", arrow::int64()), - arrow::field("energy", arrow::float64()), - arrow::field("radius", arrow::float64())}); - - auto properties = builder.build(); + // setup the streamer + streamer_.Init((directory / "particles.parquet").string()); + + // build the schema + streamer_.AddField("event", parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::INT_32); + streamer_.AddField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::INT_32); + streamer_.AddField("energy", parquet::Repetition::REQUIRED, parquet::Type::FLOAT, + parquet::ConvertedType::NONE); + streamer_.AddField("radius", parquet::Repetition::REQUIRED, parquet::Type::FLOAT, + parquet::ConvertedType::NONE); + + // and build the streamer + streamer_.Build(); } /** * Called at the start of each event/shower. */ - void StartOfEvent() final {} + void StartOfEvent() final { ++event_; + } /** * Called at the end of each event/shower. */ - void EndOfEvent() final {} + void EndOfEvent() final { + } /** * Called at the end of each run. */ - void EndOfRun() final {} + void EndOfRun() final { streamer_.Close(); + } protected: + /** + * Write a particle to the file. + */ void Write(particles::Code const& pid, units::si::HEPEnergyType const& energy, units::si::LengthType const& distance) { - // outputStream_ << static_cast<int>(particles::GetPDG(pid)) << ' ' - // << particle.GetEnergy() / 1_eV << ' ' - // << (trajectory.GetPosition(1) - plane_.GetCenter()).norm() / 1_m - // << std::endl; + using namespace units::si; + + // write the next row + writer_ << event_ << static_cast<int>(particles::GetPDG(pid)) << energy / 1_eV + << distance / 1_m << parquet::EndRow; + } + std::string const name_; ///< The name of this output. + private: + int event_; ///< The current event number we are processing. + ParquetStreamer streamer_; ///< A parquet stream writer helper + parquet::StreamWriter writer_; ///< The writer for this file. + }; // class ObservationPlaneWriterParquet } // namespace corsika::output diff --git a/Outputs/OutputManager.h b/Outputs/OutputManager.h index 55afbdec6..391a3801a 100644 --- a/Outputs/OutputManager.h +++ b/Outputs/OutputManager.h @@ -156,7 +156,7 @@ namespace corsika::output { /** * Called at the start of each run. */ - void StartOfRun() { + void StartOfRun() { for (auto& [name, output] : outputs_) { // construct the path to this output subdirectory @@ -173,7 +173,7 @@ namespace corsika::output { /** * Called at the start of each event/shower. */ - void StartOfEvent() { + void StartOfEvent() { // if this is called but we are still in the initialized state, // make sure that we transition to RunInProgress @@ -186,7 +186,8 @@ namespace corsika::output { /** * Called at the end of each event/shower. */ - void EndOfEvent() { + void EndOfEvent() { + for (auto& [name, output] : outputs_) { output.get().EndOfEvent(); } } diff --git a/Outputs/helpers/ParquetStreamer.h b/Outputs/ParquetStreamer.h similarity index 66% rename from Outputs/helpers/ParquetStreamer.h rename to Outputs/ParquetStreamer.h index ba3c8e9cd..4914efa81 100644 --- a/Outputs/helpers/ParquetStreamer.h +++ b/Outputs/ParquetStreamer.h @@ -8,7 +8,16 @@ #pragma once -namespace corsika::output::helper { +#include <string> + +// NOTE: the order of these includes is *important* +// you will get unhelpful compiler errors about unknown +// operator definitions if these are reordered +#include <parquet/stream_writer.h> +#include <parquet/arrow/schema.h> +#include <arrow/io/file.h> + +namespace corsika::output { /** * This class automates the construction of simple tabular @@ -17,21 +26,23 @@ namespace corsika::output::helper { class ParquetStreamer final { public: - ParquetStreamer(std::string const& filepath) { + ParquetStreamer() {} + + void Init(std::string const& filepath) { // open the file and connect it to our pointer PARQUET_ASSIGN_OR_THROW(outfile_, arrow::io::FileOutputStream::Open(filepath)); // the default builder settings builder_.created_by("CORSIKA8"); - }; + } /** * Add a field to this streamer. */ - template <typename TField> - void AddField(T const& field) { - nodes_.push_back(field); + template <typename... TArgs> + void AddField(TArgs&&... args) { + nodes_.push_back(parquet::schema::PrimitiveNode::Make(args...)); } /** @@ -46,17 +57,26 @@ namespace corsika::output::helper { // and build the writer writer_ = parquet::StreamWriter( - parquet::ParquetFileWriter::Open(outfile_, schema, builder_->build())); + parquet::ParquetFileWriter::Open(outfile_, schema, builder_.build())); } - protected: + /** + * Get a reference to the writer for this stream. + */ + parquet::StreamWriter& GetWriter() { return writer_; } + + /** + * Close the file. + */ + void Close() { outfile_->Close(); } + + /// + private: parquet::StreamWriter writer_; parquet::StreamWriter stream_; ///< The stream writer to 'outfile' parquet::WriterProperties::Builder builder_; ///< The writer properties builder. - /// - private: parquet::schema::NodeVector nodes_; std::shared_ptr<arrow::io::FileOutputStream> outfile_; ///< The output file. }; // class ParquetHelper -} // namespace corsika::output::helper +} // namespace corsika::output diff --git a/ThirdParty/CMakeLists.txt b/ThirdParty/CMakeLists.txt index f27fa5ecf..7fd37d2e8 100644 --- a/ThirdParty/CMakeLists.txt +++ b/ThirdParty/CMakeLists.txt @@ -326,7 +326,10 @@ endif (ZLIB_FOUND) ########## YAML CPP ########## # add YAML-CPP as a SYSTEM directory to ignore internal warnings/errors -target_include_directories(CORSIKAthirdparty SYSTEM INTERFACE yaml-cpp/include/) +target_include_directories(CORSIKAthirdparty + SYSTEM + INTERFACE + "${CMAKE_CURRENT_SOURCE_DIR}/yaml-cpp/include/") # disable the unecessary parts of the build set(YAML_CPP_BUILD_TOOLS OFF CACHE BOOL "Disable building YAML parsing tools.") @@ -338,24 +341,11 @@ target_link_libraries(CORSIKAthirdparty INTERFACE yaml-cpp) ########## Apache Arrow ########## -# add_subdirectory(arrow/cpp/) - -# set(ARROW_COMPUTE OFF) -# set(ARROW_CSV OFF) -# set(ARROW_CUDA OFF) -# set(ARROW_FLIGHT OFF) -# set(ARROW_GANDIVA OFF) -# set(ARROW_HDFS OFF) -# set(ARROW_PLASMA OFF) -# target_link_libraries(CORSIKAthirdparty INTERFACE arrow) +find_package(Arrow REQUIRED CONFIG) +message(STATUS "Arrow version: ${ARROW_VERSION}") -target_include_directories(CORSIKAthirdparty - INTERFACE - "/home/rprechelt/.local/lib/python3.8/site-packages/pyarrow/include") -target_link_directories(CORSIKAthirdparty - INTERFACE - "/home/rprechelt/.local/lib/python3.8/site-packages/pyarrow/") target_link_libraries(CORSIKAthirdparty INTERFACE - arrow) + arrow + parquet) -- GitLab