diff --git a/corsika/detail/modules/ObservationPlane.inl b/corsika/detail/modules/ObservationPlane.inl index e266a357f3c89351d6126de767bc09978717a34f..16dbfd36e65fbaea2cb6c4c44846976f19e1b90d 100644 --- a/corsika/detail/modules/ObservationPlane.inl +++ b/corsika/detail/modules/ObservationPlane.inl @@ -12,20 +12,18 @@ namespace corsika { - ObservationPlane::ObservationPlane(Plane const& obsPlane, DirectionVector const& x_axis, - std::string const& filename, bool deleteOnHit) + template <typename TOutput> + ObservationPlane<TOutput>::ObservationPlane(Plane const& obsPlane, DirectionVector const& x_axis, bool deleteOnHit) + : plane_(obsPlane) - , outputStream_(filename) , deleteOnHit_(deleteOnHit) , energy_ground_(0_GeV) , count_ground_(0) , xAxis_(x_axis.normalized()) - , yAxis_(obsPlane.getNormal().cross(xAxis_)) { - outputStream_ << "#PDG code, energy / eV, x distance / m, y distance / m" - << std::endl; - } + , yAxis_(obsPlane.getNormal().cross(xAxis_)) {} - ProcessReturn ObservationPlane::doContinuous( + template <typename TOutput> + ProcessReturn ObservationPlane<TOutput>::doContinuous( corsika::setup::Stack::particle_type& particle, corsika::setup::Trajectory& trajectory) { TimeType const timeOfIntersection = @@ -42,11 +40,12 @@ namespace corsika { const auto energy = particle.getEnergy(); auto const displacement = trajectory.getPosition(1) - plane_.getCenter(); - outputStream_ << static_cast<int>(get_PDG(particle.getPID())) << ' ' << energy / 1_eV - << ' ' << displacement.dot(xAxis_) / 1_m << ' ' - << displacement.dot(yAxis_) / 1_m - << (trajectory.getPosition(1) - plane_.getCenter()).getNorm() / 1_m - << std::endl; + // add our particles to the output file stream + this->write(particle.getPID(), + energy, + displacement.dot(xAxis_), + displacement.dot(yAxis_), + (trajectory.getPosition(1) - plane_.getCenter()).getNorm()); if (deleteOnHit_) { count_ground_++; @@ -58,7 +57,8 @@ namespace corsika { } } - LengthType ObservationPlane::getMaxStepLength( + template <typename TOutput> + LengthType ObservationPlane<TOutput>::getMaxStepLength( corsika::setup::Stack::particle_type const& vParticle, corsika::setup::Trajectory const& trajectory) { @@ -140,17 +140,75 @@ namespace corsika { return dist; } - void ObservationPlane::showResults() const { + template <typename TOutput> + void ObservationPlane<TOutput>::showResults() const { CORSIKA_LOG_INFO( " ******************************\n" " ObservationPlane: \n" - " energy in ground (GeV) : {}\n" - " no. of particles in ground : {}\n" + " energy an ground (GeV) : {}\n" + " no. of particles at ground : {}\n" " ******************************", energy_ground_ / 1_GeV, count_ground_); } - void ObservationPlane::reset() { + // template <typename TOutput> + // YAML::Node ObservationPlane<TOutput>::getFinalResults() const { + + // // construct the top-level node + // YAML::Node node; + + // node["energy_at_ground"] = energy_ground_ / 1 GeV; + // node["count_at_ground"] = count_ground_; + + // return node; + + // } + + template <typename TOutput> + YAML::Node ObservationPlane<TOutput>::getConfig() const { + using namespace units::si; + + // construct the top-level node + YAML::Node node; + + // basic info + node["type"] = "ObservationPlane"; + + // the center of the plane + auto const center{plane_.getCenter()}; + + // save each component in its native coordinate system + auto const center_coords{center.getCoordinates(center.getCoordinateSystem())}; + node["plane"]["center"].push_back(center_coords.getX() / 1_m); + node["plane"]["center"].push_back(center_coords.getY() / 1_m); + node["plane"]["center"].push_back(center_coords.getZ() / 1_m); + node["plane"]["center.units"] = "m"; + + // the normal vector of the plane + auto const normal{plane_.getNormal().getComponents()}; + node["plane"]["normal"].push_back(normal.getX().magnitude()); + node["plane"]["normal"].push_back(normal.getY().magnitude()); + node["plane"]["normal"].push_back(normal.getZ().magnitude()); + + // the x-axis vector + auto const xAxis_coords{xAxis_.getComponents(xAxis_.getCoordinateSystem())}; + node["x-axis"].push_back(xAxis_coords.getX().magnitude()); + node["x-axis"].push_back(xAxis_coords.getY().magnitude()); + node["x-axis"].push_back(xAxis_coords.getZ().magnitude()); + + // the y-axis vector + auto const yAxis_coords{yAxis_.getComponents(yAxis_.getCoordinateSystem())}; + node["y-axis"].push_back(yAxis_coords.getX().magnitude()); + node["y-axis"].push_back(yAxis_coords.getY().magnitude()); + node["y-axis"].push_back(yAxis_coords.getZ().magnitude()); + + node["delete_on_hit"] = deleteOnHit_; + + return node; + } + + template <typename TOutput> + void ObservationPlane<TOutput>::reset() { energy_ground_ = 0_GeV; count_ground_ = 0; } diff --git a/corsika/detail/output/BaseOutput.inl b/corsika/detail/output/BaseOutput.inl new file mode 100644 index 0000000000000000000000000000000000000000..da71adaf0c48b0c05c3e3d87da45c41d14106c78 --- /dev/null +++ b/corsika/detail/output/BaseOutput.inl @@ -0,0 +1,15 @@ +/* + * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu + * + * This software is distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3). See file LICENSE for a full version of + * the license. + */ +#pragma once + +namespace corsika { + + BaseOutput::BaseOutput() + : event_(0){} + +} diff --git a/corsika/detail/output/OutputManager.inl b/corsika/detail/output/OutputManager.inl new file mode 100644 index 0000000000000000000000000000000000000000..b35cddfae9c5f3ed8864b50094803116ec17263c --- /dev/null +++ b/corsika/detail/output/OutputManager.inl @@ -0,0 +1,176 @@ +/* + * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu + * + * This software is distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3). See file LICENSE for a full version of + * the license. + */ +#pragma once + +#include <algorithm> +#include <fstream> +#include <functional> + +namespace corsika { + + void OutputManager::writeNode(YAML::Node const& node, + std::filesystem::path const& path) const { + + // construct a YAML emitter for this config file + YAML::Emitter out; + + // and write the node to the output + out << node; + + // open the output file - this is <output name>.json + std::ofstream file(path.string()); + + // dump the JSON to the file + file << out.c_str() << std::endl; + + // and close the efile + file.close(); + } + + void OutputManager::writeTopLevelConfig() const { + + YAML::Node config; + + // some basic info + config["name"] = name_; // the simulation name + config["creator"] = "CORSIKA8"; // a tag to identify C8 libraries + config["version"] = "8.0.0-prealpha"; // the current version + // TODO: Add current datetime as a string to the config. + + // write the node to a file + writeNode(config, root_ / ("config.yaml")); + } + + void OutputManager::initOutput(std::string const& name) const { + // construct the path to this directory + auto const path{root_ / name}; + + // create the directory for this process. + std::filesystem::create_directory(path); + + // get the config for this output + auto config = outputs_.at(name).get().getConfig(); + + // and assign the name for this output + config["name"] = name; + + // write the config for this output to the file + writeNode(config, path / "config.yaml"); + } + + OutputManager::OutputManager( + std::string const& name, + std::filesystem::path const& dir = std::filesystem::current_path()) + : name_(name) + , root_(dir / name) { + + // check if this directory already exists + if (std::filesystem::exists(root_)) { + logger->warn( + "Output directory '{}' already exists! This is currenty not supported.", + root_.string()); + throw std::runtime_error("Output directory already exists."); + } + + // construct the directory for this run + std::filesystem::create_directory(root_); + + // write the top level config file + writeTopLevelConfig(); + } + + OutputManager::~OutputManager() { + + // if we are being destructed but EndOfRun() has not been called, + // make sure that we gracefully close all the outputs + if (state_ == OutputState::EventInProgress || state_ == OutputState::RunInitialized) { + endOfRun(); + } + } + + // void OutputManager::add(std::string const& name, BaseOutput& output) { + template <typename TOutput> + void OutputManager::add(std::string const& name, TOutput& output) { + + // check if that name is already in the map + if (outputs_.count(name) > 0) { + logger->warn("'{}' is already registered. All outputs must have unique names.", + name); + return; + } + + // if we get here, the name is not already in the map + // so we create the output and register it into the map + outputs_.insert(std::make_pair(name, std::ref(output))); + + // and initialize this output + initOutput(name); + } + + void OutputManager::startOfRun() { + + // this is only valid when we haven't started a run + // or have already finished a run + if + (!(state_ == OutputState::RunNoInit || state_ == OutputState::RunFinished)) { + + throw std::runtime_error("startOfRun() called in invalid state."); + } + + // we now forward this signal to all of our outputs + for (auto& [name, output] : outputs_) { + + // construct the path to this output subdirectory + auto const path{root_ / name}; + + // and start the run + output.get().startOfRun(path); + } + + // we have now started running + state_ = OutputState::RunInitialized; + } + + void OutputManager::startOfEvent() { + + // if this is called but we are still in the initialized state, + // make sure that we transition to EventInProgress + // if (state_ == OutputState::RunNoInit) { startOfRun(); } + + // now start the event for all the outputs + for (auto& [name, output] : outputs_) { output.get().startOfEvent(); } + + // and transition to the in progress state + state_ = OutputState::EventInProgress; + } + + void OutputManager::endOfEvent() { + + for (auto& [name, output] : outputs_) { output.get().endOfEvent(); } + + // switch back to the initialized state + state_ = OutputState::RunInitialized; + } + + void OutputManager::endOfRun() { + + // we can only call endOfRun when we have already started + if (state_ == OutputState::RunNoInit) { + throw std::runtime_error("endOfRun() called in invalid state."); + } + + for (auto& [name, output] : outputs_) { output.get().endOfRun(); } + + // and the run has finished + state_ = OutputState::RunFinished; + + // write any final state information into the config files + // for (auto& [name, output] : outputs_) { output.get().endOfRun(); } + } + +} // namespace corsika diff --git a/corsika/detail/output/ParquetStreamer.inl b/corsika/detail/output/ParquetStreamer.inl new file mode 100644 index 0000000000000000000000000000000000000000..71f2b3fc4686fc6f673ad3c4d3b486ac049355af --- /dev/null +++ b/corsika/detail/output/ParquetStreamer.inl @@ -0,0 +1,52 @@ +/* + * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu + * + * This software is distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3). See file LICENSE for a full version of + * the license. + */ + +#pragma once + +namespace corsika { + + ParquetStreamer::ParquetStreamer() {} + + void ParquetStreamer::initStreamer(std::string const& filepath) { + + // open the file and connect it to our pointer + PARQUET_ASSIGN_OR_THROW(outfile_, arrow::io::FileOutputStream::Open(filepath)); + + // the default builder settings + builder_.created_by("CORSIKA8"); + + // add run and event tags to the file + addField("run", parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::INT_32); + addField("event", parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::INT_32); + } + + template <typename... TArgs> + void ParquetStreamer::addField(TArgs&&... args) { + fields_.push_back(parquet::schema::PrimitiveNode::Make(args...)); + } + + void ParquetStreamer::buildStreamer() { + + // build the top level schema + schema_ = std::static_pointer_cast<parquet::schema::GroupNode>( + parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, + fields_)); + + // and build the writer + writer_ = std::make_shared<parquet::StreamWriter>( + parquet::ParquetFileWriter::Open(outfile_, schema_, builder_.build())); + } + + void ParquetStreamer::closeStreamer() { + writer_.reset(); + outfile_->Close(); + } + +} // namespace corsika diff --git a/corsika/modules/ObservationPlane.hpp b/corsika/modules/ObservationPlane.hpp index 7effcd93b6abfda4ae1cc8b2adca07b38cb6cdc6..aec2c0086555563d1b9718a9387bbac9733c8d9f 100644 --- a/corsika/modules/ObservationPlane.hpp +++ b/corsika/modules/ObservationPlane.hpp @@ -13,6 +13,7 @@ #include <corsika/framework/process/ContinuousProcess.hpp> #include <corsika/setup/SetupStack.hpp> #include <corsika/setup/SetupTrajectory.hpp> +#include <corsika/modules/writers/ObservationPlaneWriterParquet.hpp> #include <fstream> #include <string> @@ -24,11 +25,12 @@ namespace corsika { * central point of the plane into its output file. The particles are considered * "absorbed" afterwards. */ - class ObservationPlane : public ContinuousProcess<ObservationPlane> { + template <typename TOutputWriter = ObservationPlaneWriterParquet> + class ObservationPlane : public ContinuousProcess<ObservationPlane<TOutputWriter>>, + public TOutputWriter { public: - ObservationPlane(Plane const&, DirectionVector const&, std::string const&, - bool = true); + ObservationPlane(Plane const&, DirectionVector const&, bool = true); ProcessReturn doContinuous(corsika::setup::Stack::particle_type& vParticle, corsika::setup::Trajectory& vTrajectory); @@ -39,10 +41,9 @@ namespace corsika { void showResults() const; void reset(); HEPEnergyType getEnergyGround() const { return energy_ground_; } - + YAML::Node getConfig() const; private: Plane const plane_; - std::ofstream outputStream_; bool const deleteOnHit_; HEPEnergyType energy_ground_; unsigned int count_ground_; diff --git a/corsika/modules/writers/ObservationPlaneWriterParquet.hpp b/corsika/modules/writers/ObservationPlaneWriterParquet.hpp new file mode 100644 index 0000000000000000000000000000000000000000..25a961ac28bf7c0ade1090fdd2ba2341b2766eaa --- /dev/null +++ b/corsika/modules/writers/ObservationPlaneWriterParquet.hpp @@ -0,0 +1,86 @@ +/* + * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu + * + * This software is distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3). See file LICENSE for a full version of + * the license. + */ + +#pragma once + +#include <corsika/output/BaseOutput.hpp> +#include <corsika/output/ParquetStreamer.hpp> +#include <corsika/framework/core/ParticleProperties.hpp> +#include <corsika/framework/core/PhysicalUnits.hpp> + +namespace corsika { + + class ObservationPlaneWriterParquet : public BaseOutput, private ParquetStreamer { + + public: + /** + * Write an observation plane to a directory. + * + * @param name The name of this output. + */ + ObservationPlaneWriterParquet() + : ParquetStreamer(){}; + + /** + * Called at the start of each run. + */ + void startOfRun(std::filesystem::path const& directory) final override { + + // setup the streamer + initStreamer((directory / "particles.parquet").string()); + + // build the schema + addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32, + parquet::ConvertedType::INT_32); + addField("energy", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE, + parquet::ConvertedType::NONE); + addField("x", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE, + parquet::ConvertedType::NONE); + addField("y", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE, + parquet::ConvertedType::NONE); + addField("radius", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE, + parquet::ConvertedType::NONE); + + // and build the streamer + buildStreamer(); + } + + /** + * Called at the end of each event/shower. + */ + void endOfEvent() final override { ++event_; } + + /** + * Called at the end of each run. + * + * This must also increment the run number since we override + * the default behaviour of BaseOutput. + */ + void endOfRun() final override { + closeStreamer(); + ++run_; + } + + protected: + /** + * Write a particle to the file. + */ + void write(Code const& pid, units::si::HEPEnergyType const& energy, + units::si::LengthType const& x, units::si::LengthType const& y, + units::si::LengthType const& radius) { + using namespace units::si; + + // write the next row + // NOTE: we must write run_ and then event_ first + (*writer_) << run_ << event_ << static_cast<int>(get_PDG(pid)) << energy / 1_eV + << x / 1_m << y / 1_m << radius / 1_m << parquet::EndRow; + } + + }; // class ObservationPlaneWriterParquet + +} // namespace corsika diff --git a/corsika/output/BaseOutput.hpp b/corsika/output/BaseOutput.hpp new file mode 100644 index 0000000000000000000000000000000000000000..cb75615794f193ed582858c516114f8c91612da6 --- /dev/null +++ b/corsika/output/BaseOutput.hpp @@ -0,0 +1,64 @@ +/* + * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu + * + * This software is distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3). See file LICENSE for a full version of + * the license. + */ +#pragma once + +//#include <memory> +#include <filesystem> + +#include <yaml-cpp/yaml.h> + +namespace corsika { + + /** + * This is the base class for all outputs so that they + * can be stored in homogeneous containers. + */ + // class BaseOutput : public std::enable_shared_from_this<BaseOutput> { + class BaseOutput { + + protected: + int event_{0}; ///< The current event number. + int run_{0}; ///< The current run number. + + BaseOutput(); + + public: + /** + * Called at the start of each run. + */ + virtual void startOfRun(std::filesystem::path const& directory) = 0; + + /** + * Called at the start of each event/shower. + */ + virtual void startOfEvent() {} + + /** + * Called at the end of each event/shower. + */ + virtual void endOfEvent() = 0; + + /** + * Called at the end of each run. + */ + virtual void endOfRun() = 0; + + /** + * Get the configuration of this output. + */ + virtual YAML::Node getConfig() const = 0; + + /** + * Get final text outputs for the config file. + */ + virtual YAML::Node getFinalOutput() { return YAML::Node(); }; + }; + +} // namespace corsika + +#include <corsika/detail/output/BaseOutput.inl> diff --git a/corsika/output/OutputManager.hpp b/corsika/output/OutputManager.hpp new file mode 100644 index 0000000000000000000000000000000000000000..a1a3fc144fb0c3060e650a0a08b607856357897b --- /dev/null +++ b/corsika/output/OutputManager.hpp @@ -0,0 +1,112 @@ +/* + * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu + * + * This software is distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3). See file LICENSE for a full version of + * the license. + */ +#pragma once + +#include <string> +#include <filesystem> +#include <corsika/output/BaseOutput.hpp> +#include <corsika/framework/core/Logging.hpp> + +namespace corsika { + + /*! + * Manages CORSIKA 8 output streams. + */ + class OutputManager final { + + /** + * Indicates the current state of this manager. + */ + enum class OutputState { + RunNoInit, + RunInitialized, + EventInProgress, + RunFinished, + }; + + OutputState state_{OutputState::RunNoInit}; ///< The current state of this manager. + std::string const name_; ///< The name of this simulation file. + std::filesystem::path const root_; ///< The top-level directory for the output. + inline static auto logger{get_logger("output")}; ///< A custom logger. + + /** + * The outputs that have been registered with us. + */ + // std::map<std::string, std::shared_ptr<BaseOutput>> outputs_; + std::map<std::string, std::reference_wrapper<BaseOutput>> outputs_; + + /** + * Write a YAML-node to a file. + */ + void writeNode(YAML::Node const& node, std::filesystem::path const& path) const; + + /** + * Write the top-level config of this simulation. + */ + void writeTopLevelConfig() const; + + /** + * Initialize the "registered" output with a given name. + */ + void initOutput(std::string const& name) const; + + public: + /** + * Construct an OutputManager instance with a name in a given directory. + * + * @param name The name of this output collection. + * @param dir The directory where the output directory will be stored. + */ + OutputManager(std::string const& name, std::filesystem::path const& dir); + + /** + * Handle graceful closure of the outputs upon destruction. + */ + ~OutputManager(); + + /** + * Register an existing output to this manager. + * + * @param name The unique name of this output. + * @param args... These are perfect forwarded to the + * constructor of the output. + */ + template <typename TOutput> + void add(std::string const& name, TOutput& output); + // void add(std::string const& name, BaseOutput& output); + + /** + * Called at the start of each run. + * + * This iteratively calls startOfRun on each registered output. + */ + void startOfRun(); + + /** + * Called at the start of each event/shower. + * This iteratively calls startOfEvent on each registered output. + */ + void startOfEvent(); + + /** + * Called at the end of each event/shower. + * This iteratively calls endOfEvent on each registered output. + */ + void endOfEvent(); + + /** + * Called at the end of each run. + * This iteratively calls endOfRun on each registered output. + */ + void endOfRun(); + + }; // class OutputManager + +} // namespace corsika + +#include <corsika/detail/output/OutputManager.inl> diff --git a/corsika/output/ParquetStreamer.hpp b/corsika/output/ParquetStreamer.hpp new file mode 100644 index 0000000000000000000000000000000000000000..3e5c2f1c0dbbcffc7cbe2dae73d4c66b5ec76842 --- /dev/null +++ b/corsika/output/ParquetStreamer.hpp @@ -0,0 +1,65 @@ +/* + * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu + * + * This software is distributed under the terms of the GNU General Public + * Licence version 3 (GPL Version 3). See file LICENSE for a full version of + * the license. + */ + +#pragma once + +#include <string> + +// NOTE: the order of these includes is *important* +// you will get unhelpful compiler errors about unknown +// operator definitions if these are reordered +#include <parquet/stream_writer.h> +#include <parquet/arrow/schema.h> +#include <arrow/io/file.h> + +namespace corsika { + + /** + * This class automates the construction of simple tabular + * Parquet files using the parquet::StreamWriter. + */ + class ParquetStreamer { + + protected: + std::shared_ptr<parquet::StreamWriter> writer_; ///< The stream writer to 'outfile' + parquet::WriterProperties::Builder builder_; ///< The writer properties builder. + parquet::schema::NodeVector fields_; ///< The fields in this file. + std::shared_ptr<parquet::schema::GroupNode> schema_; ///< The schema for this file. + std::shared_ptr<arrow::io::FileOutputStream> outfile_; ///< The output file. + + public: + /** + * ParquetStreamer's take no constructor arguments. + */ + ParquetStreamer(); + + /** + * Initialize the streamer to write to a given file. + */ + void initStreamer(std::string const& filepath); + + /** + * Add a field to this streamer. + */ + template <typename... TArgs> + void addField(TArgs&&... args); + + /** + * Finalize the streamer construction. + */ + void buildStreamer(); + + /** + * Finish writing this stream. + */ + void closeStreamer(); + + }; // class ParquetHelper +} // namespace corsika + +#include <corsika/detail/output/ParquetStreamer.inl>