IAP GITLAB

Skip to content
Snippets Groups Projects
Commit d2e8018b authored by Remy Prechelt's avatar Remy Prechelt
Browse files

Port TrackWriter to the Parquet data format.

parent 154dfd72
No related branches found
No related tags found
No related merge requests found
...@@ -8,53 +8,48 @@ ...@@ -8,53 +8,48 @@
#pragma once #pragma once
#include <corsika/modules/TrackWriter.hpp>
#include <corsika/framework/core/ParticleProperties.hpp> #include <corsika/framework/core/ParticleProperties.hpp>
#include <corsika/framework/core/PhysicalUnits.hpp>
#include <corsika/setup/SetupStack.hpp> // #include <corsika/setup/SetupStack.hpp>
#include <corsika/setup/SetupTrajectory.hpp> // #include <corsika/setup/SetupTrajectory.hpp>
#include <iomanip>
#include <limits> #include <limits>
namespace corsika { namespace corsika {
TrackWriter::TrackWriter(std::string const& filename) template <typename TOutput>
: filename_(filename) { TrackWriter<TOutput>::TrackWriter() {}
using namespace std::string_literals;
file_.open(filename_);
file_
<< "# PID, E / eV, start coordinates / m, displacement vector to end / m, steplength / m "s
<< '\n';
}
template <typename TOutput>
template <typename TParticle, typename TTrack> template <typename TParticle, typename TTrack>
ProcessReturn TrackWriter::doContinuous(const TParticle& vP, const TTrack& vT) { ProcessReturn TrackWriter<TOutput>::doContinuous(const TParticle& vP,
const TTrack& vT) {
auto const start = vT.getPosition(0).getCoordinates(); auto const start = vT.getPosition(0).getCoordinates();
auto const delta = vT.getPosition(1).getCoordinates() - start; auto const end = vT.getPosition(1).getCoordinates();
auto const pdg = static_cast<int>(get_PDG(vP.getPID()));
// write the track to the file
// clang-format off this->write(vP.getPID(), vP.getEnergy(), start, end);
file_ << std::setw(7) << pdg
<< std::setw(width_) << std::scientific << std::setprecision(precision_) << vP.getEnergy() / 1_eV
<< std::setw(width_) << std::scientific << std::setprecision(precision_) << start[0] / 1_m
<< std::setw(width_) << std::scientific << std::setprecision(precision_) << start[1] / 1_m
<< std::setw(width_) << std::scientific << std::setprecision(precision_) << start[2] / 1_m
<< std::setw(width_) << std::scientific << std::setprecision(precision_) << delta[0] / 1_m
<< std::setw(width_) << std::scientific << std::setprecision(precision_) << delta[1] / 1_m
<< std::setw(width_) << std::scientific << std::setprecision(precision_) << delta[2] / 1_m
<< std::setw(width_) << std::scientific << std::setprecision(precision_) << delta.getNorm() / 1_m
<< '\n';
// clang-format on
return ProcessReturn::Ok; return ProcessReturn::Ok;
} }
template <typename TOutput>
template <typename TParticle, typename TTrack> template <typename TParticle, typename TTrack>
LengthType TrackWriter::getMaxStepLength(const TParticle&, const TTrack&) { LengthType TrackWriter<TOutput>::getMaxStepLength(const TParticle&, const TTrack&) {
return meter * std::numeric_limits<double>::infinity(); return meter * std::numeric_limits<double>::infinity();
} }
template <typename TOutput>
YAML::Node TrackWriter<TOutput>::getConfig() const {
using namespace units::si;
YAML::Node node;
// add default units for values
node["type"] = "TrackWriter";
node["units"] = "GeV | m";
return node;
}
} // namespace corsika } // namespace corsika
/*
* (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
*
* This software is distributed under the terms of the GNU General Public
* Licence version 3 (GPL Version 3). See file LICENSE for a full version of
* the license.
*/
#pragma once
namespace corsika {
TrackWriterParquet::TrackWriterParquet()
: output_() {}
void TrackWriterParquet::startOfLibrary(std::filesystem::path const& directory) {
// setup the streamer
output_.initStreamer((directory / "tracks.parquet").string());
// build the schema
output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
parquet::ConvertedType::INT_32);
output_.addField("energy", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
parquet::ConvertedType::NONE);
output_.addField("start_x", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
parquet::ConvertedType::NONE);
output_.addField("start_y", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
parquet::ConvertedType::NONE);
output_.addField("start_z", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
parquet::ConvertedType::NONE);
output_.addField("end_x", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
parquet::ConvertedType::NONE);
output_.addField("end_y", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
parquet::ConvertedType::NONE);
output_.addField("end_z", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
parquet::ConvertedType::NONE);
// and build the streamer
output_.buildStreamer();
}
void TrackWriterParquet::endOfShower() { ++shower_; }
void TrackWriterParquet::endOfLibrary() { output_.closeStreamer(); }
void TrackWriterParquet::write(Code const& pid, units::si::HEPEnergyType const& energy,
QuantityVector<length_d> const& start,
QuantityVector<length_d> const& end) {
using namespace units::si;
// write the next row - we must write `shower_` first.
// clang-format off
*(output_.getWriter())
<< shower_
<< static_cast<int>(get_PDG(pid))
<< static_cast<float>(energy / 1_GeV)
<< static_cast<float>(start[0] / 1_m)
<< static_cast<float>(start[1] / 1_m)
<< static_cast<float>(start[2] / 1_m)
<< static_cast<float>(end[0] / 1_m)
<< static_cast<float>(end[1] / 1_m)
<< static_cast<float>(end[2] / 1_m)
<< parquet::EndRow;
// clang-format on
}
} // namespace corsika
...@@ -8,18 +8,17 @@ ...@@ -8,18 +8,17 @@
#pragma once #pragma once
#include <corsika/framework/core/PhysicalUnits.hpp>
#include <corsika/framework/process/ContinuousProcess.hpp> #include <corsika/framework/process/ContinuousProcess.hpp>
#include <corsika/modules/writers/TrackWriterParquet.hpp>
#include <fstream>
#include <string>
namespace corsika { namespace corsika {
class TrackWriter : public ContinuousProcess<TrackWriter> { template <typename TOutputWriter = TrackWriterParquet>
class TrackWriter : public ContinuousProcess<TrackWriter<TOutputWriter>>,
public TOutputWriter {
public: public:
TrackWriter(std::string const& filename); TrackWriter();
template <typename TParticle, typename TTrack> template <typename TParticle, typename TTrack>
ProcessReturn doContinuous(TParticle const&, TTrack const&); ProcessReturn doContinuous(TParticle const&, TTrack const&);
...@@ -27,12 +26,7 @@ namespace corsika { ...@@ -27,12 +26,7 @@ namespace corsika {
template <typename TParticle, typename TTrack> template <typename TParticle, typename TTrack>
LengthType getMaxStepLength(TParticle const&, TTrack const&); LengthType getMaxStepLength(TParticle const&, TTrack const&);
private: YAML::Node getConfig() const;
std::string const filename_;
std::ofstream file_;
int width_ = 14;
int precision_ = 6;
}; };
} // namespace corsika } // namespace corsika
......
/*
* (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
*
* This software is distributed under the terms of the GNU General Public
* Licence version 3 (GPL Version 3). See file LICENSE for a full version of
* the license.
*/
#pragma once
#include <corsika/output/BaseOutput.hpp>
#include <corsika/output/ParquetStreamer.hpp>
#include <corsika/framework/core/ParticleProperties.hpp>
#include <corsika/framework/core/PhysicalUnits.hpp>
namespace corsika {
class TrackWriterParquet : public BaseOutput {
ParquetStreamer output_; ///< The primary output file.
public:
/**
* Construct a new writer.
*
* @param name The name of this output.
*/
TrackWriterParquet();
/**
* Called at the start of each library.
*/
void startOfLibrary(std::filesystem::path const& directory) final override;
/**
* Called at the end of each shower.
*/
void endOfShower() final override;
/**
* Called at the end of each library.
*
* This must also increment the run number since we override
* the default behaviour of BaseOutput.
*/
void endOfLibrary() final override;
protected:
/**
* Write a track to the file.
*/
void write(Code const& pid, units::si::HEPEnergyType const& energy,
QuantityVector<length_d> const& start,
QuantityVector<length_d> const& end);
}; // class TrackWriterParquet
} // namespace corsika
#include <corsika/detail/modules/writers/TrackWriterParquet.inl>
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
""" """
from .observation_plane import ObservationPlane from .observation_plane import ObservationPlane
from .track_writer import TrackWriter
from .output import Output from .output import Output
__all__ = ["Output", "ObservationPlane"] __all__ = ["Output", "ObservationPlane", "TrackWriter"]
"""
Read data written by TrackWriter
(c) Copyright 2020 CORSIKA Project, corsika-project@lists.kit.edu
This software is distributed under the terms of the GNU General Public
Licence version 3 (GPL Version 3). See file LICENSE for a full version of
the license.
"""
import logging
import os.path as op
from typing import Any
import pyarrow.parquet as pq
from .output import Output
class TrackWriter(Output):
"""
Read particle data from a TrackWriter
"""
def __init__(self, path: str):
"""
Load the particle data into a parquet table.
Parameters
----------
path: str
The path to the directory containing this output.
"""
super().__init__(path)
# try and load our data
try:
self.__data = pq.read_table(op.join(path, "tracks.parquet"))
except Exception as e:
logging.getLogger("corsika").warn(
f"An error occured loading a TrackWriter: {e}"
)
def is_good(self) -> bool:
"""
Returns true if this output has been read successfully
and has the correct files/state/etc.
Returns
-------
bool:
True if this is a good output.
"""
return self.__data is not None
def astype(self, dtype: str = "pandas", **kwargs: Any) -> Any:
"""
Load the particle data from this track writer.
All additional keyword arguments are passed to `parquet.read_table`
Parameters
----------
dtype: str
The data format to return the data in (i.e. numpy, pandas, etc.)
Returns
-------
Any:
The return type of this method is determined by `dtype`.
"""
if dtype == "arrow":
return self.__data
elif dtype == "pandas":
return self.__data.to_pandas()
else:
raise ValueError(
(
f"Unknown format '{dtype}' for TrackWriter. "
"We currently only support ['arrow', 'pandas']."
)
)
def __repr__(self) -> str:
"""
Return a string representation of this class.
"""
return f"TrackWriter('{self.config['name']}')"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment