IAP GITLAB

Skip to content
Snippets Groups Projects
Commit bbe4d337 authored by Remy Prechelt's avatar Remy Prechelt
Browse files

Base Parquet implementation for ObservationPlane.

This also further refines the YAML writing for processes,
and the base interfaces.
parent 79173d1b
No related branches found
No related tags found
1 merge request: !317 — Output infrastructure and Python analysis library.
......@@ -116,6 +116,7 @@ if (Pythia8_FOUND)
CORSIKAgeometry
CORSIKAoutput
CORSIKAenvironment
CORSIKAthirdparty
CORSIKAprocesssequence
CORSIKAhistory # for HistoryObservationPlane
)
......
......@@ -255,8 +255,17 @@ int main(int argc, char** argv) {
// to fix the point of first interaction, uncomment the following two lines:
// EAS.forceInteraction();
// start a new run
outputs.StartOfRun();
EAS.Run();
// and end this run
outputs.EndOfRun();
eLoss.PrintProfile(); // print longitudinal profile
conexSource.SolveCE();
cut.ShowResults();
em_continuous.ShowResults();
observationLevel.ShowResults();
......
......@@ -51,7 +51,6 @@ namespace corsika::output {
/**
* Get the configuration of this output.
*/
// virtual void WriteConfig(YAML::Emitter&) const = 0;
virtual YAML::Node GetConfig() const = 0;
};
......
......@@ -2,6 +2,7 @@
MODEL_HEADERS
BaseOutput.h
OutputManager.h
ParquetStreamer.h
ObservationPlaneWriterParquet.h
)
......
......@@ -9,16 +9,13 @@
#pragma once

#include <corsika/output/BaseOutput.h>
#include <corsika/output/ParquetStreamer.h>
#include <corsika/particles/ParticleProperties.h>
#include <corsika/units/PhysicalUnits.h>

namespace corsika::output {

  /**
   * Writes the particles crossing an observation plane into a
   * Parquet file ("particles.parquet") in this output's directory.
   *
   * One row is written per particle crossing, tagged with the
   * current event/shower number.
   */
  class ObservationPlaneWriterParquet : public BaseOutput {

  public:
    /**
     * Construct the writer.
     *
     * @param name The name of this output.
     */
    ObservationPlaneWriterParquet(std::string const& name)
        : name_(name)
        , event_(0) {}

    /**
     * Called at the start of each run.
     *
     * Opens the output file and defines the table schema.
     *
     * @param directory The directory assigned to this output.
     */
    void StartOfRun(std::filesystem::path const& directory) final {

      // setup the streamer
      streamer_.Init((directory / "particles.parquet").string());

      // build the schema: one row per particle crossing the plane
      streamer_.AddField("event", parquet::Repetition::REQUIRED, parquet::Type::INT32,
                         parquet::ConvertedType::INT_32);
      streamer_.AddField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
                         parquet::ConvertedType::INT_32);
      streamer_.AddField("energy", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
                         parquet::ConvertedType::NONE);
      streamer_.AddField("radius", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
                         parquet::ConvertedType::NONE);

      // and build the streamer
      streamer_.Build();
    }

    /**
     * Called at the start of each event/shower.
     */
    void StartOfEvent() final { ++event_; }

    /**
     * Called at the end of each event/shower.
     */
    void EndOfEvent() final {}

    /**
     * Called at the end of each run.
     */
    void EndOfRun() final { streamer_.Close(); }

  protected:
    /**
     * Write a particle crossing to the file.
     *
     * @param pid      The particle ID.
     * @param energy   The particle energy.
     * @param distance The distance from the center of the plane.
     */
    void Write(particles::Code const& pid, units::si::HEPEnergyType const& energy,
               units::si::LengthType const& distance) {
      using namespace units::si;

      // write the next row. NOTE: we stream through the writer owned by
      // `streamer_` (the previous separate StreamWriter member was never
      // connected to an open file), and we cast to float explicitly since
      // parquet::StreamWriter rejects a double streamed into a FLOAT column.
      streamer_.GetWriter() << event_ << static_cast<int>(particles::GetPDG(pid))
                            << static_cast<float>(energy / 1_eV)
                            << static_cast<float>(distance / 1_m) << parquet::EndRow;
    }

    std::string const name_; ///< The name of this output.

  private:
    int event_;                ///< The current event number we are processing.
    ParquetStreamer streamer_; ///< The Parquet stream writer helper.

  }; // class ObservationPlaneWriterParquet

} // namespace corsika::output
......@@ -156,7 +156,7 @@ namespace corsika::output {
/**
* Called at the start of each run.
*/
void StartOfRun() {
void StartOfRun() {
for (auto& [name, output] : outputs_) {
// construct the path to this output subdirectory
......@@ -173,7 +173,7 @@ namespace corsika::output {
/**
* Called at the start of each event/shower.
*/
void StartOfEvent() {
void StartOfEvent() {
// if this is called but we are still in the initialized state,
// make sure that we transition to RunInProgress
......@@ -186,7 +186,8 @@ namespace corsika::output {
/**
* Called at the end of each event/shower.
*/
void EndOfEvent() {
void EndOfEvent() {
for (auto& [name, output] : outputs_) { output.get().EndOfEvent(); }
}
......
......@@ -8,7 +8,16 @@
#pragma once

#include <memory>
#include <string>
#include <utility>

// NOTE: the order of these includes is *important*
// you will get unhelpful compiler errors about unknown
// operator definitions if these are reordered
#include <parquet/stream_writer.h>
#include <parquet/arrow/schema.h>
#include <arrow/io/file.h>

namespace corsika::output {

  /**
   * This class automates the construction of simple tabular
   * Parquet files via parquet::StreamWriter.
   *
   * Usage: Init() -> AddField()... -> Build() -> GetWriter() rows -> Close().
   */
  class ParquetStreamer final {

  public:
    ParquetStreamer() {}

    /**
     * Open the output file and prepare the writer properties.
     *
     * @param filepath The path of the Parquet file to create.
     */
    void Init(std::string const& filepath) {

      // open the file and connect it to our pointer
      PARQUET_ASSIGN_OR_THROW(outfile_, arrow::io::FileOutputStream::Open(filepath));

      // the default builder settings
      builder_.created_by("CORSIKA8");
    }

    /**
     * Add a field to this streamer.
     *
     * All arguments are forwarded unchanged to
     * parquet::schema::PrimitiveNode::Make.
     */
    template <typename... TArgs>
    void AddField(TArgs&&... args) {
      // perfect-forward so reference categories are preserved
      nodes_.push_back(parquet::schema::PrimitiveNode::Make(std::forward<TArgs>(args)...));
    }

    /**
     * Finalize the schema and construct the stream writer.
     *
     * Must be called after all AddField() calls and before any rows
     * are written via GetWriter().
     */
    void Build() {

      // construct the top-level schema from the registered fields
      // NOTE(review): reconstructed from the visible tail of this function —
      // confirm against the full file.
      auto schema = std::static_pointer_cast<parquet::schema::GroupNode>(
          parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, nodes_));

      // and build the writer
      writer_ = parquet::StreamWriter(
          parquet::ParquetFileWriter::Open(outfile_, schema, builder_.build()));
    }

    /**
     * Get a reference to the writer for this stream.
     *
     * Public (not protected) since users of this class hold it by
     * composition, not inheritance.
     */
    parquet::StreamWriter& GetWriter() { return writer_; }

    /**
     * Flush any remaining data and close the file.
     */
    void Close() {
      // destroy the stream writer FIRST so it flushes its buffered
      // row group and writes the file footer before the underlying
      // file is closed; closing `outfile_` directly would truncate.
      writer_ = parquet::StreamWriter();
      outfile_->Close();
    }

    ///
    private:
    parquet::StreamWriter writer_;               ///< The stream writer to 'outfile'.
    parquet::WriterProperties::Builder builder_; ///< The writer properties builder.
    parquet::schema::NodeVector nodes_;          ///< The fields registered via AddField().
    std::shared_ptr<arrow::io::FileOutputStream> outfile_; ///< The output file.

  }; // class ParquetStreamer

} // namespace corsika::output
......@@ -326,7 +326,10 @@ endif (ZLIB_FOUND)
########## YAML CPP ##########
# add YAML-CPP as a SYSTEM directory to ignore internal warnings/errors
target_include_directories(CORSIKAthirdparty SYSTEM INTERFACE yaml-cpp/include/)
target_include_directories(CORSIKAthirdparty
SYSTEM
INTERFACE
"${CMAKE_CURRENT_SOURCE_DIR}/yaml-cpp/include/")
# disable the unnecessary parts of the build
set(YAML_CPP_BUILD_TOOLS OFF CACHE BOOL "Disable building YAML parsing tools.")
......@@ -338,24 +341,11 @@ target_link_libraries(CORSIKAthirdparty INTERFACE yaml-cpp)
########## Apache Arrow ##########

# Locate a system-wide installation of Apache Arrow and Parquet via their
# CMake package configs. The imported targets carry the correct include
# directories and link locations, so no absolute paths are hard-coded
# (the previous version pointed into one developer's local pyarrow install,
# which breaks the build on every other machine).
find_package(Arrow REQUIRED CONFIG)
message(STATUS "Arrow version: ${ARROW_VERSION}")

# the Parquet config ships alongside the Arrow config
find_package(Parquet REQUIRED CONFIG PATHS "${Arrow_DIR}")

target_link_libraries(CORSIKAthirdparty
  INTERFACE
  arrow_shared
  parquet_shared)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment