IAP GITLAB

Skip to content
Snippets Groups Projects
Commit c8146d88 authored by Alan Coleman, committed by Marvin Gottowik
Browse files

Added compression for io

parent ac396934
No related branches found
No related tags found
1 merge request: !656 Added compression for io
Showing
with 113 additions and 25 deletions
...@@ -215,7 +215,7 @@ build_test-clang-14: ...@@ -215,7 +215,7 @@ build_test-clang-14:
- cmake --build . --target run_examples -- -j2 - cmake --build . --target run_examples -- -j2
- export EXE=${CI_PROJECT_DIR}/build/install/bin/c8_air_shower # Run the example scripts - export EXE=${CI_PROJECT_DIR}/build/install/bin/c8_air_shower # Run the example scripts
- export OUTPUT_DIR=${CI_PROJECT_DIR}/build/build_examples/example_outputs/ - export OUTPUT_DIR=${CI_PROJECT_DIR}/build/build_examples/example_outputs/
- export CMD="$EXE --seed 1234 --energy 1e3 -p 2212 --disable-interaction-histograms --filename $OUTPUT_DIR/c8_air_shower_output" - export CMD="$EXE --seed 1234 --energy 1e3 -p 2212 --disable-interaction-histograms --compress --filename $OUTPUT_DIR/c8_air_shower_output"
- echo "Running... $CMD" - echo "Running... $CMD"
- $CMD - $CMD
rules: rules:
...@@ -235,7 +235,7 @@ build_test-clang-14: ...@@ -235,7 +235,7 @@ build_test-clang-14:
- ${CI_PROJECT_DIR}/build/test_outputs/junit*.xml - ${CI_PROJECT_DIR}/build/test_outputs/junit*.xml
paths: paths:
- ${CI_PROJECT_DIR}/build/build_examples/example_outputs/radio_em_shower_outputs #python examples need this - ${CI_PROJECT_DIR}/build/build_examples/example_outputs/radio_em_shower_outputs #python examples need this
- ${CI_PROJECT_DIR}/build/build_examples/example_outputs/c8_air_shower_output #python examples need this - ${CI_PROJECT_DIR}/build/build_examples/example_outputs/c8_air_shower_output.tar #python examples need this
- ${CI_PROJECT_DIR}/build/test_outputs/junit*.xml - ${CI_PROJECT_DIR}/build/test_outputs/junit*.xml
- ${CI_PROJECT_DIR}/build/CMakeCache.txt #python tests need this - ${CI_PROJECT_DIR}/build/CMakeCache.txt #python tests need this
- ${CI_PROJECT_DIR}/build #/bin #python tests need this - ${CI_PROJECT_DIR}/build #/bin #python tests need this
...@@ -455,12 +455,13 @@ python-examples: ...@@ -455,12 +455,13 @@ python-examples:
paths: paths:
- ${CI_PROJECT_DIR}/python/examples/example_plots - ${CI_PROJECT_DIR}/python/examples/example_plots
script: script:
- export EXAMPLE_SHOWER_DIR=${CI_PROJECT_DIR}/build/build_examples/example_outputs/c8_air_shower_output - export EXAMPLE_SHOWER=${CI_PROJECT_DIR}/build/build_examples/example_outputs/c8_air_shower_output.tar
- cd ${CI_PROJECT_DIR}/python - cd ${CI_PROJECT_DIR}/python
- pip3 install --user -e '.[test,examples]' - pip3 install --user -e '.[test,examples]'
- cd ${CI_PROJECT_DIR}/python/examples - cd ${CI_PROJECT_DIR}/python/examples
- python3 particle_distribution.py --input-dir $EXAMPLE_SHOWER_DIR - python3 particle_distribution.py --input-dir $EXAMPLE_SHOWER
- python3 shower_profile.py --input-dir $EXAMPLE_SHOWER_DIR - python3 shower_profile.py --input-dir $EXAMPLE_SHOWER
- python3 first_interactions.py --input-dir $EXAMPLE_SHOWER_DIR - python3 first_interactions.py --input-dir $EXAMPLE_SHOWER
- export EXAMPLE_RADIO_SHOWER_DIR=${CI_PROJECT_DIR}/build/build_examples/example_outputs/radio_em_shower_outputs - export EXAMPLE_RADIO_SHOWER_DIR=${CI_PROJECT_DIR}/build/build_examples/example_outputs/radio_em_shower_outputs
- python3 radio_emission.py --input-dir $EXAMPLE_RADIO_SHOWER_DIR - python3 radio_emission.py --input-dir $EXAMPLE_RADIO_SHOWER_DIR
...@@ -229,6 +229,9 @@ int main(int argc, char** argv) { ...@@ -229,6 +229,9 @@ int main(int argc, char** argv) {
->default_val("corsika_library") ->default_val("corsika_library")
->check(CLI::NonexistentPath) ->check(CLI::NonexistentPath)
->group("Library/Output"); ->group("Library/Output");
bool compressOutput = false;
app.add_flag("--compress", compressOutput, "Compress the output directory to a tarball")
->group("Library/Output");
app.add_option("-s,--seed", "The random number seed.") app.add_option("-s,--seed", "The random number seed.")
->default_val(0) ->default_val(0)
->check(CLI::NonNegativeNumber) ->check(CLI::NonNegativeNumber)
...@@ -401,7 +404,8 @@ int main(int argc, char** argv) { ...@@ -401,7 +404,8 @@ int main(int argc, char** argv) {
std::stringstream args; std::stringstream args;
for (int i = 0; i < argc; ++i) { args << argv[i] << " "; } for (int i = 0; i < argc; ++i) { args << argv[i] << " "; }
// create the output manager that we then register outputs with // create the output manager that we then register outputs with
OutputManager output(app["--filename"]->as<std::string>(), seed, args.str()); OutputManager output(app["--filename"]->as<std::string>(), seed, args.str(),
compressOutput);
// register energy losses as output // register energy losses as output
EnergyLossWriter dEdX{showerAxis, dX}; EnergyLossWriter dEdX{showerAxis, dX};
......
...@@ -18,6 +18,7 @@ class Pkg(ConanFile): ...@@ -18,6 +18,7 @@ class Pkg(ConanFile):
'arrow*:with_utf8proc': 'False', 'arrow*:with_utf8proc': 'False',
'arrow*:with_zstd': 'False', 'arrow*:with_zstd': 'False',
'arrow*:with_bz2': 'False', 'arrow*:with_bz2': 'False',
'arrow*:with_lz4': 'True',
'arrow*:with_thrift': 'True', 'arrow*:with_thrift': 'True',
'arrow*:with_boost': 'True', 'arrow*:with_boost': 'True',
'boost*:without_container': 'True', 'boost*:without_container': 'True',
......
...@@ -72,6 +72,10 @@ namespace corsika { ...@@ -72,6 +72,10 @@ namespace corsika {
// setup the streamer // setup the streamer
output_.initStreamer((directory / ("observers.parquet")).string()); output_.initStreamer((directory / ("observers.parquet")).string());
// enable compression with the default level
output_.enableCompression();
// LCOV_EXCL_START // LCOV_EXCL_START
// build the schema // build the schema
output_.addField("Time", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE, output_.addField("Time", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE,
......
...@@ -31,7 +31,7 @@ namespace corsika { ...@@ -31,7 +31,7 @@ namespace corsika {
output_.initStreamer((directory / "dEdX.parquet").string()); output_.initStreamer((directory / "dEdX.parquet").string());
// enable compression with the default level // enable compression with the default level
// output_.enableCompression(); output_.enableCompression();
// build the schema // build the schema
output_.addField("X", parquet::Repetition::REQUIRED, parquet::Type::FLOAT, output_.addField("X", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
......
...@@ -102,6 +102,9 @@ namespace corsika { ...@@ -102,6 +102,9 @@ namespace corsika {
boost::filesystem::path const& directory) { boost::filesystem::path const& directory) {
output_.initStreamer((directory / ("interactions.parquet")).string()); output_.initStreamer((directory / ("interactions.parquet")).string());
// enable compression with the default level
output_.enableCompression();
output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32, output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
parquet::ConvertedType::INT_32); parquet::ConvertedType::INT_32);
output_.addField("px", parquet::Repetition::REQUIRED, parquet::Type::FLOAT, output_.addField("px", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
......
...@@ -31,7 +31,7 @@ namespace corsika { ...@@ -31,7 +31,7 @@ namespace corsika {
output_.initStreamer((directory / "profile.parquet").string()); output_.initStreamer((directory / "profile.parquet").string());
// enable compression with the default level // enable compression with the default level
// output_.enableCompression(); output_.enableCompression();
// build the schema // build the schema
output_.addField("X", parquet::Repetition::REQUIRED, parquet::Type::FLOAT, output_.addField("X", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
......
...@@ -24,7 +24,7 @@ namespace corsika { ...@@ -24,7 +24,7 @@ namespace corsika {
output_.initStreamer((directory / "particles.parquet").string()); output_.initStreamer((directory / "particles.parquet").string());
// enable compression with the default level // enable compression with the default level
// output_.enableCompression(); output_.enableCompression();
// build the schema // build the schema
output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32, output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
......
...@@ -20,6 +20,9 @@ namespace corsika { ...@@ -20,6 +20,9 @@ namespace corsika {
// setup the streamer // setup the streamer
output_.initStreamer((directory / "tracks.parquet").string()); output_.initStreamer((directory / "tracks.parquet").string());
// enable compression with the default level
output_.enableCompression();
// build the schema // build the schema
output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32, output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
parquet::ConvertedType::INT_32); parquet::ConvertedType::INT_32);
......
...@@ -23,9 +23,11 @@ ...@@ -23,9 +23,11 @@
namespace corsika { namespace corsika {
inline OutputManager::OutputManager(std::string const& dir_path, const long& vseed = 0, inline OutputManager::OutputManager(std::string const& dir_path, const long& vseed = 0,
std::string const& vargs = "") std::string const& vargs = "",
bool useCompression = false)
: root_(dir_path) : root_(dir_path)
, cmnd_line_args_(vargs) , cmnd_line_args_(vargs)
, useCompression_(useCompression)
, count_(0) , count_(0)
, seed_(vseed) { , seed_(vseed) {
...@@ -89,6 +91,26 @@ namespace corsika { ...@@ -89,6 +91,26 @@ namespace corsika {
// make sure that we gracefully close all the outputs. This is a supported // make sure that we gracefully close all the outputs. This is a supported
// method of operation so we don't issue a warning here // method of operation so we don't issue a warning here
if (state_ == OutputState::LibraryReady) { endOfLibrary(); } if (state_ == OutputState::LibraryReady) { endOfLibrary(); }
if (useCompression_) {
auto const parent = root_.parent_path();
auto const final_part = root_.filename();
// Ensure we use "./" if the parent path is empty
std::string parent_path = parent.empty() ? "./" : parent.string() + "/";
std::string const cmd = "tar cf " + parent_path + final_part.string() + ".tar -C " +
parent_path + " " + final_part.string();
CORSIKA_LOG_INFO("Compressing output directory using: {}", cmd.c_str());
int const returnCode = std::system(cmd.c_str());
if (returnCode) {
CORSIKA_LOG_ERROR("Compression returned with error code {}", returnCode);
} else {
// remove the original directory
boost::filesystem::remove_all(root_);
}
}
} }
inline int OutputManager::getEventId() const { return count_; } inline int OutputManager::getEventId() const { return count_; }
...@@ -232,6 +254,5 @@ namespace corsika { ...@@ -232,6 +254,5 @@ namespace corsika {
// and the library has finished // and the library has finished
state_ = OutputState::LibraryFinished; state_ = OutputState::LibraryFinished;
} // namespace corsika }
} // namespace corsika } // namespace corsika
...@@ -40,7 +40,7 @@ namespace corsika { ...@@ -40,7 +40,7 @@ namespace corsika {
* @param input_args The command line arguments at runtime (written to summary file) * @param input_args The command line arguments at runtime (written to summary file)
*/ */
OutputManager(std::string const& dir_path, const long& vseed, OutputManager(std::string const& dir_path, const long& vseed,
std::string const& input_args); std::string const& input_args, bool useCompression);
/** /**
* Handle graceful closure of the outputs upon destruction. * Handle graceful closure of the outputs upon destruction.
...@@ -108,6 +108,7 @@ namespace corsika { ...@@ -108,6 +108,7 @@ namespace corsika {
boost::filesystem::path root_; ///< The unique output directory. boost::filesystem::path root_; ///< The unique output directory.
OutputState state_{OutputState::NoInit}; ///< The current state of this manager. OutputState state_{OutputState::NoInit}; ///< The current state of this manager.
std::string const cmnd_line_args_; ///< The command line arguments used in this run std::string const cmnd_line_args_; ///< The command line arguments used in this run
bool useCompression_; ///< compress the files to tarball
int count_{0}; ///< The current ID of this shower. int count_{0}; ///< The current ID of this shower.
long seed_{0}; long seed_{0};
std::chrono::time_point<std::chrono::system_clock> const start_time{ std::chrono::time_point<std::chrono::system_clock> const start_time{
......
...@@ -47,7 +47,7 @@ namespace corsika { ...@@ -47,7 +47,7 @@ namespace corsika {
/** /**
* Enable compression for this streamer. * Enable compression for this streamer.
*/ */
void enableCompression(int const level = 3); void enableCompression(int const level = 9);
/** /**
* Finalize the streamer construction. * Finalize the streamer construction.
......
...@@ -11,6 +11,8 @@ ...@@ -11,6 +11,8 @@
import logging import logging
import os import os
import os.path as op import os.path as op
import tarfile
import tempfile
from typing import Any, Dict, List, Optional, Union from typing import Any, Dict, List, Optional, Union
import yaml import yaml
...@@ -38,7 +40,22 @@ class Library(object): ...@@ -38,7 +40,22 @@ class Library(object):
If `path` does not contain a valid CORSIKA8 library. If `path` does not contain a valid CORSIKA8 library.
""" """
# check that this is a valid library self.tempDir = None
# If the path is a tarball, make a temp dir to open the file into
if path.endswith(".tar"):
tar = tarfile.open(path, "r")
self.tempDir = tempfile.TemporaryDirectory(prefix="c8_temp_dir_")
msg = f"Unzipping corsika library {path} to {self.tempDir.name}"
logging.getLogger("corsika").info(msg)
tar.extractall(self.tempDir.name)
tar.close()
path = op.join(
self.tempDir.name, os.path.basename(path).replace(".tar", "")
)
# Check that this is a valid library
if not self.__valid_library(path): if not self.__valid_library(path):
raise ValueError(f"'{path}' does not contain a valid CORSIKA8 library.") raise ValueError(f"'{path}' does not contain a valid CORSIKA8 library.")
...@@ -48,8 +65,9 @@ class Library(object): ...@@ -48,8 +65,9 @@ class Library(object):
output_dirs = None output_dirs = None
# load the config and summary files # load the config and summary files
self.config = self.load_config(path) self.config = self.load_config(self.path)
self.summary = self.load_summary(path) self.summary = self.load_summary(self.path)
if self.summary is None: if self.summary is None:
msg = f"Missing summary file in '{path}'." msg = f"Missing summary file in '{path}'."
msg += " The simulation may not have finished. Will not load library" msg += " The simulation may not have finished. Will not load library"
...@@ -66,7 +84,11 @@ class Library(object): ...@@ -66,7 +84,11 @@ class Library(object):
logging.getLogger("corsika").debug(msg) logging.getLogger("corsika").debug(msg)
# build the list of outputs # build the list of outputs
self.__outputs = self.__build_outputs(path, output_dirs) self.__outputs = self.__build_outputs(self.path, output_dirs)
# Finalizer: clean up the temporary directory that a ".tar" library was
# extracted into. `tempDir` holds a tempfile.TemporaryDirectory only when the
# given path ended in ".tar"; otherwise it is None and nothing is removed.
def __del__(self) -> None:
if self.tempDir is not None:
self.tempDir.cleanup()
@property @property
def names(self) -> List[str]: def names(self) -> List[str]:
......
...@@ -153,4 +153,3 @@ for ishower in range(n_showers): ...@@ -153,4 +153,3 @@ for ishower in range(n_showers):
plot_path = os.path.join(args.output_dir, f"ObserverWaveforms_Sh{ishower}.png") plot_path = os.path.join(args.output_dir, f"ObserverWaveforms_Sh{ishower}.png")
print("Saving", plot_path) print("Saving", plot_path)
fig.savefig(plot_path, bbox_inches="tight") fig.savefig(plot_path, bbox_inches="tight")
...@@ -68,7 +68,7 @@ TEST_CASE("OutputManager") { ...@@ -68,7 +68,7 @@ TEST_CASE("OutputManager") {
} }
// output manager performs nothing, no action, just interface // output manager performs nothing, no action, just interface
OutputManager output("./out_test/check", 0, ""); OutputManager output("./out_test/check", 0, "", false);
CHECK(boost::filesystem::is_directory("./out_test/check")); CHECK(boost::filesystem::is_directory("./out_test/check"));
...@@ -98,6 +98,35 @@ TEST_CASE("OutputManager") { ...@@ -98,6 +98,35 @@ TEST_CASE("OutputManager") {
test.endLibrary_ = false; test.endLibrary_ = false;
} }
SECTION("compression") {
  // Verifies that an OutputManager created with useCompression=true leaves the
  // output as a plain directory while it is alive and replaces it with
  // "<dir>.tar" when it is destroyed.
  std::string const outputDir = "./out_compressed";
  std::string const tarball = outputDir + ".tar";

  // preparation: clear leftovers from an earlier run. The tarball must be
  // removed as well, otherwise the "does NOT exist" checks below fail on
  // every rerun of this test.
  if (boost::filesystem::exists(outputDir)) {
    boost::filesystem::remove_all(outputDir);
  }
  if (boost::filesystem::exists(tarball)) { boost::filesystem::remove(tarball); }

  // Declared before the manager so the registered output outlives it
  DummyOutput test;

  {
    // The manager gets its own scope because the tarball is only written
    // when the manager is destroyed
    OutputManager output(outputDir, 0, "", true);

    CHECK(boost::filesystem::is_directory(outputDir));
    CHECK(!boost::filesystem::exists(tarball)); // compressed file does NOT exist

    // Make an output and open/close shower/lib
    output.add("test", test);
    output.startOfLibrary();
    output.startOfShower();
    output.endOfShower();
    output.endOfLibrary();

    // Closing the library alone must not trigger the archiving
    CHECK(!boost::filesystem::exists(tarball)); // compressed file does NOT exist
  } // <- manager destroyed here, which creates the tarball

  CHECK(boost::filesystem::exists(tarball)); // compressed file DOES exist
  // on success the manager removes the original directory
  CHECK(!boost::filesystem::exists(outputDir));
}
SECTION("auto-write") { SECTION("auto-write") {
// preparation // preparation
...@@ -106,7 +135,7 @@ TEST_CASE("OutputManager") { ...@@ -106,7 +135,7 @@ TEST_CASE("OutputManager") {
} }
// output manager performs nothing, no action, just interface // output manager performs nothing, no action, just interface
OutputManager* output = new OutputManager("./out_test/check", 0, ""); OutputManager* output = new OutputManager("./out_test/check", 0, "", false);
CHECK(boost::filesystem::is_directory("./out_test/check")); CHECK(boost::filesystem::is_directory("./out_test/check"));
...@@ -137,8 +166,8 @@ TEST_CASE("OutputManager") { ...@@ -137,8 +166,8 @@ TEST_CASE("OutputManager") {
} }
// output manager performs nothing, no action, just interface // output manager performs nothing, no action, just interface
OutputManager output("./out_test/check", 0, ""); OutputManager output("./out_test/check", 0, "", false);
CHECK_THROWS(new OutputManager("./out_test/check", 0, "")); CHECK_THROWS(new OutputManager("./out_test/check", 0, "", false));
CHECK_THROWS(output.endOfLibrary()); CHECK_THROWS(output.endOfLibrary());
......
...@@ -37,7 +37,7 @@ TEST_CASE("ParquetStreamer") { ...@@ -37,7 +37,7 @@ TEST_CASE("ParquetStreamer") {
test.addField("testfloat", parquet::Repetition::REQUIRED, parquet::Type::FLOAT, test.addField("testfloat", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
parquet::ConvertedType::NONE); parquet::ConvertedType::NONE);
// test.enableCompression(1); needs to be enabled via conan test.enableCompression(1);
test.buildStreamer(); test.buildStreamer();
CHECK(test.isInit()); CHECK(test.isInit());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment