IAP GITLAB

Skip to content
Snippets Groups Projects
Commit 74063a6f authored by Remy Prechelt's avatar Remy Prechelt
Browse files

Change member to private inheritance.

This is the first commit to generate correct, readable Parquet files.
parent f32d4da9
No related branches found
No related tags found
No related merge requests found
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
namespace corsika::output { namespace corsika::output {
class ObservationPlaneWriterParquet : public BaseOutput { class ObservationPlaneWriterParquet : public BaseOutput, private ParquetStreamer {
public: public:
/** /**
...@@ -24,7 +24,8 @@ namespace corsika::output { ...@@ -24,7 +24,8 @@ namespace corsika::output {
* @param name The name of this output. * @param name The name of this output.
*/ */
ObservationPlaneWriterParquet(std::string const& name) ObservationPlaneWriterParquet(std::string const& name)
: name_(name) : ParquetStreamer()
, name_(name)
, event_(0){}; , event_(0){};
/** /**
...@@ -33,20 +34,21 @@ namespace corsika::output { ...@@ -33,20 +34,21 @@ namespace corsika::output {
void StartOfRun(std::filesystem::path const& directory) final { void StartOfRun(std::filesystem::path const& directory) final {
// setup the streamer // setup the streamer
streamer_.Init((directory / "particles.parquet").string()); InitStreamer((directory / "particles.parquet").string());
// build the schema // build the schema
streamer_.AddField("event", parquet::Repetition::REQUIRED, parquet::Type::INT32, AddField("event", parquet::Repetition::REQUIRED, parquet::Type::INT32,
parquet::ConvertedType::INT_32); parquet::ConvertedType::INT_32);
streamer_.AddField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32, AddField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
parquet::ConvertedType::INT_32); parquet::ConvertedType::INT_32);
streamer_.AddField("energy", parquet::Repetition::REQUIRED, parquet::Type::FLOAT, AddField("energy", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE,
parquet::ConvertedType::NONE); parquet::ConvertedType::NONE);
streamer_.AddField("radius", parquet::Repetition::REQUIRED, parquet::Type::FLOAT, AddField("radius", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE,
parquet::ConvertedType::NONE); parquet::ConvertedType::NONE);
// and build the streamer // and build the streamer
streamer_.Build(); BuildStreamer();
} }
/** /**
...@@ -62,7 +64,7 @@ namespace corsika::output { ...@@ -62,7 +64,7 @@ namespace corsika::output {
/** /**
* Called at the end of each run. * Called at the end of each run.
*/ */
void EndOfRun() final { streamer_.Close(); } void EndOfRun() final { writer_.reset(); outfile_->Close(); }
/** /**
* Get final text outputs for the config file. * Get final text outputs for the config file.
...@@ -75,20 +77,17 @@ namespace corsika::output { ...@@ -75,20 +77,17 @@ namespace corsika::output {
*/ */
void Write(particles::Code const& pid, units::si::HEPEnergyType const& energy, void Write(particles::Code const& pid, units::si::HEPEnergyType const& energy,
units::si::LengthType const& distance) { units::si::LengthType const& distance) {
using namespace units::si; using namespace units::si;
// write the next row // write the next row
writer_ << event_ << static_cast<int>(particles::GetPDG(pid)) << energy / 1_eV (*writer_) << event_ << static_cast<int>(particles::GetPDG(pid)) << energy / 1_eV
<< distance / 1_m << parquet::EndRow; << distance / 1_m << parquet::EndRow;
} }
std::string const name_; ///< The name of this output. std::string const name_; ///< The name of this output.
private: private:
int event_; ///< The current event number we are processing. int event_; ///< The current event number we are processing.
ParquetStreamer streamer_; ///< A parquet stream writer helper
parquet::StreamWriter writer_; ///< The writer for this file.
}; // class ObservationPlaneWriterParquet }; // class ObservationPlaneWriterParquet
......
...@@ -21,6 +21,9 @@ namespace corsika::output { ...@@ -21,6 +21,9 @@ namespace corsika::output {
*/ */
class OutputManager final { class OutputManager final {
/**
* Indicates the current state of this manager.
*/
enum class OutputState { enum class OutputState {
RunNoInit, RunNoInit,
RunInitialized, RunInitialized,
...@@ -31,7 +34,7 @@ namespace corsika::output { ...@@ -31,7 +34,7 @@ namespace corsika::output {
OutputState state_{OutputState::RunNoInit}; ///< The current state of this manager. OutputState state_{OutputState::RunNoInit}; ///< The current state of this manager.
std::string const name_; ///< The name of this simulation file. std::string const name_; ///< The name of this simulation file.
std::filesystem::path const root_; ///< The top-level directory for the output. std::filesystem::path const root_; ///< The top-level directory for the output.
inline static auto logger{logging::GetLogger("output_manager")}; ///< A custom logger. inline static auto logger{logging::GetLogger("output")}; ///< A custom logger.
/** /**
* The outputs that have been registered with us. * The outputs that have been registered with us.
...@@ -156,7 +159,7 @@ namespace corsika::output { ...@@ -156,7 +159,7 @@ namespace corsika::output {
/** /**
* Called at the start of each run. * Called at the start of each run.
*/ */
void StartOfRun() { void StartOfRun() {
for (auto& [name, output] : outputs_) { for (auto& [name, output] : outputs_) {
// construct the path to this output subdirectory // construct the path to this output subdirectory
...@@ -173,7 +176,7 @@ namespace corsika::output { ...@@ -173,7 +176,7 @@ namespace corsika::output {
/** /**
* Called at the start of each event/shower. * Called at the start of each event/shower.
*/ */
void StartOfEvent() { void StartOfEvent() {
// if this is called but we are still in the initialized state, // if this is called but we are still in the initialized state,
// make sure that we transition to RunInProgress // make sure that we transition to RunInProgress
...@@ -186,7 +189,7 @@ namespace corsika::output { ...@@ -186,7 +189,7 @@ namespace corsika::output {
/** /**
* Called at the end of each event/shower. * Called at the end of each event/shower.
*/ */
void EndOfEvent() { void EndOfEvent() {
for (auto& [name, output] : outputs_) { output.get().EndOfEvent(); } for (auto& [name, output] : outputs_) { output.get().EndOfEvent(); }
} }
...@@ -195,10 +198,15 @@ namespace corsika::output { ...@@ -195,10 +198,15 @@ namespace corsika::output {
* Called at the end of each run. * Called at the end of each run.
*/ */
void EndOfRun() { void EndOfRun() {
for (auto& [name, output] : outputs_) { output.get().EndOfRun(); } for (auto& [name, output] : outputs_) {
output.get().EndOfRun();
}
// and the run has finished // and the run has finished
state_ = OutputState::RunFinished; state_ = OutputState::RunFinished;
// write any final state information into the config files
// for (auto& [name, output] : outputs_) { output.get().EndOfRun(); }
} }
}; // class OutputManager }; // class OutputManager
......
/* /*
* (c) Copyright 2019 CORSIKA Project, corsika-project@lists.kit.edu * (c) Copyright 2020 CORSIKA Project, corsika-project@lists.kit.edu
* *
* This software is distributed under the terms of the GNU General Public * This software is distributed under the terms of the GNU General Public
* Licence version 3 (GPL Version 3). See file LICENSE for a full version of * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
...@@ -23,12 +23,15 @@ namespace corsika::output { ...@@ -23,12 +23,15 @@ namespace corsika::output {
* This class automates the construction of simple tabular * This class automates the construction of simple tabular
* Parquet files using the parquet::StreamWriter. * Parquet files using the parquet::StreamWriter.
*/ */
class ParquetStreamer final { class ParquetStreamer {
public: public:
ParquetStreamer() {} ParquetStreamer() = default;
void Init(std::string const& filepath) { /**
* Initialize the streamer to write to a given file.
*/
void InitStreamer(std::string const& filepath) {
// open the file and connect it to our pointer // open the file and connect it to our pointer
PARQUET_ASSIGN_OR_THROW(outfile_, arrow::io::FileOutputStream::Open(filepath)); PARQUET_ASSIGN_OR_THROW(outfile_, arrow::io::FileOutputStream::Open(filepath));
...@@ -42,40 +45,29 @@ namespace corsika::output { ...@@ -42,40 +45,29 @@ namespace corsika::output {
*/ */
template <typename... TArgs> template <typename... TArgs>
void AddField(TArgs&&... args) { void AddField(TArgs&&... args) {
nodes_.push_back(parquet::schema::PrimitiveNode::Make(args...)); fields_.push_back(parquet::schema::PrimitiveNode::Make(args...));
} }
/** /**
* Finalize the streamer construction. * Finalize the streamer construction.
*/ */
void Build() { void BuildStreamer() {
// build the top level schema // build the top level schema
auto schema = std::static_pointer_cast<parquet::schema::GroupNode>( schema_ = std::static_pointer_cast<parquet::schema::GroupNode>(
parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED,
nodes_)); fields_));
// and build the writer // and build the writer
writer_ = parquet::StreamWriter( writer_ = std::make_shared<parquet::StreamWriter>(
parquet::ParquetFileWriter::Open(outfile_, schema, builder_.build())); parquet::ParquetFileWriter::Open(outfile_, schema_, builder_.build()));
} }
/** protected:
* Get a reference to the writer for this stream. std::shared_ptr<parquet::StreamWriter> writer_; ///< The stream writer to 'outfile'
*/
parquet::StreamWriter& GetWriter() { return writer_; }
/**
* Close the file.
*/
void Close() { outfile_->Close(); }
///
private:
parquet::StreamWriter writer_;
parquet::StreamWriter stream_; ///< The stream writer to 'outfile'
parquet::WriterProperties::Builder builder_; ///< The writer properties builder. parquet::WriterProperties::Builder builder_; ///< The writer properties builder.
parquet::schema::NodeVector nodes_; parquet::schema::NodeVector fields_; ///< The fields in this file.
std::shared_ptr<parquet::schema::GroupNode> schema_; ///< The schema for this file.
std::shared_ptr<arrow::io::FileOutputStream> outfile_; ///< The output file. std::shared_ptr<arrow::io::FileOutputStream> outfile_; ///< The output file.
}; // class ParquetHelper }; // class ParquetHelper
......
...@@ -35,10 +35,10 @@ namespace corsika::process::observation_plane { ...@@ -35,10 +35,10 @@ namespace corsika::process::observation_plane {
: TOutputWriter(args...) : TOutputWriter(args...)
, plane_(plane) , plane_(plane)
, deleteOnHit_(deleteOnHit) , deleteOnHit_(deleteOnHit)
, energy_ground_(0_GeV) , energy_ground_(units::si::HEPEnergyType::zero())
, count_ground_(0) {} , count_ground_(0) {}
process::EProcessReturn DoContinuous(setup::Stack::ParticleType const& particle, process::EProcessReturn DoContinuous(setup::Stack::ParticleType& particle,
setup::Trajectory const& trajectory) { setup::Trajectory const& trajectory) {
using namespace units::si; using namespace units::si;
...@@ -53,15 +53,23 @@ namespace corsika::process::observation_plane { ...@@ -53,15 +53,23 @@ namespace corsika::process::observation_plane {
return process::EProcessReturn::eOk; return process::EProcessReturn::eOk;
} }
const auto energy = particle.GetEnergy();
// write the data to the output // write the data to the output
this->Write(particle.GetPID(), particle.GetEnergy(), this->Write(particle.GetPID(), energy,
(trajectory.GetPosition(1) - plane_.GetCenter()).norm()); (trajectory.GetPosition(1) - plane_.GetCenter()).norm());
if (deleteOnHit_) { return process::EProcessReturn::eParticleAbsorbed; } if (deleteOnHit_) {
return process::EProcessReturn::eOk; count_ground_++;
energy_ground_ += energy;
particle.Delete();
return process::EProcessReturn::eParticleAbsorbed;
} else {
return process::EProcessReturn::eOk;
}
} }
units::si::LengthType MaxStepLength(setup::Stack::ParticleType const&, units::si::LengthType MaxStepLength(setup::Stack::ParticleType& particle,
setup::Trajectory const& trajectory) { setup::Trajectory const& trajectory) {
using namespace units::si; using namespace units::si;
...@@ -74,7 +82,9 @@ namespace corsika::process::observation_plane { ...@@ -74,7 +82,9 @@ namespace corsika::process::observation_plane {
} }
auto const pointOfIntersection = trajectory.GetPosition(timeOfIntersection); auto const pointOfIntersection = trajectory.GetPosition(timeOfIntersection);
return (trajectory.GetR0() - pointOfIntersection).norm() * 1.0001; auto dist = (trajectory.GetR0() - pointOfIntersection).norm() * 1.0001;
C8LOG_TRACE("ObservationPlane::MaxStepLength l={} m", dist / 1_m);
return dist;
} }
YAML::Node GetConfig() const { YAML::Node GetConfig() const {
...@@ -105,20 +115,22 @@ namespace corsika::process::observation_plane { ...@@ -105,20 +115,22 @@ namespace corsika::process::observation_plane {
return node; return node;
} }
void ObservationPlane::ShowResults() const { void ShowResults() const {
C8LOG_INFO( using namespace units::si;
" ******************************\n" C8LOG_INFO(
" ObservationPlane: \n" " ******************************\n"
" energy in ground (GeV) : {}\n" " ObservationPlane: \n"
" no. of particles in ground : {}\n" " energy in ground (GeV) : {}\n"
" ******************************", " no. of particles in ground : {}\n"
energy_ground_ / 1_GeV, count_ground_); " ******************************",
} energy_ground_ / 1_GeV, count_ground_);
}
void ObservationPlane::Reset() {
energy_ground_ = 0_GeV; void Reset() {
count_ground_ = 0; using namespace units::si;
} energy_ground_ = 0_GeV;
count_ground_ = 0;
}
corsika::units::si::HEPEnergyType GetEnergyGround() const { return energy_ground_; } corsika::units::si::HEPEnergyType GetEnergyGround() const { return energy_ground_; }
private: private:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment