From c8146d88792f3da8613b793e321342079d640743 Mon Sep 17 00:00:00 2001 From: Alan Coleman <alanco@umich.edu> Date: Wed, 4 Dec 2024 11:02:19 +0000 Subject: [PATCH] Added compression for io --- .gitlab-ci.yml | 13 ++++--- applications/c8_air_shower.cpp | 6 ++- conanfile.py | 1 + corsika/detail/modules/radio/RadioProcess.inl | 4 ++ .../writers/EnergyLossWriterParquet.inl | 2 +- .../modules/writers/InteractionWriter.inl | 3 ++ .../LongitudinalProfileWriterParquet.inl | 2 +- .../modules/writers/ParticleWriterParquet.inl | 2 +- .../modules/writers/TrackWriterParquet.inl | 3 ++ corsika/detail/output/OutputManager.inl | 27 ++++++++++++-- corsika/output/OutputManager.hpp | 3 +- corsika/output/ParquetStreamer.hpp | 2 +- python/corsika/io/library.py | 30 +++++++++++++-- python/examples/radio_emission.py | 1 - tests/output/testOutputManager.cpp | 37 +++++++++++++++++-- tests/output/testParquetStreamer.cpp | 2 +- 16 files changed, 113 insertions(+), 25 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f049ddc71..229d2cdf9 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -215,7 +215,7 @@ build_test-clang-14: - cmake --build . --target run_examples -- -j2 - export EXE=${CI_PROJECT_DIR}/build/install/bin/c8_air_shower # Run the example scripts - export OUTPUT_DIR=${CI_PROJECT_DIR}/build/build_examples/example_outputs/ - - export CMD="$EXE --seed 1234 --energy 1e3 -p 2212 --disable-interaction-histograms --filename $OUTPUT_DIR/c8_air_shower_output" + - export CMD="$EXE --seed 1234 --energy 1e3 -p 2212 --disable-interaction-histograms --compress --filename $OUTPUT_DIR/c8_air_shower_output" - echo "Running... $CMD" - $CMD rules: @@ -235,7 +235,7 @@ build_test-clang-14: - ${CI_PROJECT_DIR}/build/test_outputs/junit*.xml paths: - ${CI_PROJECT_DIR}/build/build_examples/example_outputs/radio_em_shower_outputs #python examples need this - - ${CI_PROJECT_DIR}/build/build_examples/example_outputs/c8_air_shower_output #python examples need this + - ${CI_PROJECT_DIR}/build/build_examples/example_outputs/c8_air_shower_output.tar #python examples need this - ${CI_PROJECT_DIR}/build/test_outputs/junit*.xml - ${CI_PROJECT_DIR}/build/CMakeCache.txt #python tests need this - ${CI_PROJECT_DIR}/build #/bin #python tests need this @@ -455,12 +455,13 @@ python-examples: paths: - ${CI_PROJECT_DIR}/python/examples/example_plots script: - - export EXAMPLE_SHOWER_DIR=${CI_PROJECT_DIR}/build/build_examples/example_outputs/c8_air_shower_output + - export EXAMPLE_SHOWER=${CI_PROJECT_DIR}/build/build_examples/example_outputs/c8_air_shower_output.tar - cd ${CI_PROJECT_DIR}/python - pip3 install --user -e '.[test,examples]' - cd ${CI_PROJECT_DIR}/python/examples - - python3 particle_distribution.py --input-dir $EXAMPLE_SHOWER_DIR - - python3 shower_profile.py --input-dir $EXAMPLE_SHOWER_DIR - - python3 first_interactions.py --input-dir $EXAMPLE_SHOWER_DIR + - python3 particle_distribution.py --input-dir $EXAMPLE_SHOWER + - python3 shower_profile.py --input-dir $EXAMPLE_SHOWER + - python3 first_interactions.py --input-dir $EXAMPLE_SHOWER - export EXAMPLE_RADIO_SHOWER_DIR=${CI_PROJECT_DIR}/build/build_examples/example_outputs/radio_em_shower_outputs - python3 radio_emission.py --input-dir $EXAMPLE_RADIO_SHOWER_DIR + diff --git a/applications/c8_air_shower.cpp b/applications/c8_air_shower.cpp index a91a830d5..9b0f008c0 100644 --- a/applications/c8_air_shower.cpp +++ b/applications/c8_air_shower.cpp @@ -229,6 +229,9 @@ int main(int argc, char** argv) { ->default_val("corsika_library") ->check(CLI::NonexistentPath) ->group("Library/Output"); + bool compressOutput = false; + app.add_flag("--compress", compressOutput, "Compress the output directory to a tarball") + ->group("Library/Output"); app.add_option("-s,--seed", "The random number seed.") ->default_val(0) ->check(CLI::NonNegativeNumber) @@ -401,7 +404,8 @@ int main(int argc, char** argv) { std::stringstream args; for (int i = 0; i < argc; ++i) { args << argv[i] << " "; } // create the output manager that we then register outputs with - OutputManager output(app["--filename"]->as<std::string>(), seed, args.str()); + OutputManager output(app["--filename"]->as<std::string>(), seed, args.str(), + compressOutput); // register energy losses as output EnergyLossWriter dEdX{showerAxis, dX}; diff --git a/conanfile.py b/conanfile.py index 1a1f5d83f..28c08b731 100644 --- a/conanfile.py +++ b/conanfile.py @@ -18,6 +18,7 @@ class Pkg(ConanFile): 'arrow*:with_utf8proc': 'False', 'arrow*:with_zstd': 'False', 'arrow*:with_bz2': 'False', + 'arrow*:with_lz4': 'True', 'arrow*:with_thrift': 'True', 'arrow*:with_boost': 'True', 'boost*:without_container': 'True', diff --git a/corsika/detail/modules/radio/RadioProcess.inl b/corsika/detail/modules/radio/RadioProcess.inl index b77b78cb6..58e395051 100644 --- a/corsika/detail/modules/radio/RadioProcess.inl +++ b/corsika/detail/modules/radio/RadioProcess.inl @@ -72,6 +72,10 @@ namespace corsika { // setup the streamer output_.initStreamer((directory / ("observers.parquet")).string()); + + // enable compression with the default level + output_.enableCompression(); + // LCOV_EXCL_START // build the schema output_.addField("Time", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE, diff --git a/corsika/detail/modules/writers/EnergyLossWriterParquet.inl b/corsika/detail/modules/writers/EnergyLossWriterParquet.inl index 55b301f50..d58e5423a 100644 --- a/corsika/detail/modules/writers/EnergyLossWriterParquet.inl +++ b/corsika/detail/modules/writers/EnergyLossWriterParquet.inl @@ -31,7 +31,7 @@ namespace corsika { output_.initStreamer((directory / "dEdX.parquet").string()); // enable compression with the default level - // output_.enableCompression(); + output_.enableCompression(); // build the schema output_.addField("X", parquet::Repetition::REQUIRED, parquet::Type::FLOAT, diff --git a/corsika/detail/modules/writers/InteractionWriter.inl b/corsika/detail/modules/writers/InteractionWriter.inl index e4d23bd50..f443ecd10 100644 --- a/corsika/detail/modules/writers/InteractionWriter.inl +++ b/corsika/detail/modules/writers/InteractionWriter.inl @@ -102,6 +102,9 @@ namespace corsika { boost::filesystem::path const& directory) { output_.initStreamer((directory / ("interactions.parquet")).string()); + // enable compression with the default level + output_.enableCompression(); + output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32, parquet::ConvertedType::INT_32); output_.addField("px", parquet::Repetition::REQUIRED, parquet::Type::FLOAT, diff --git a/corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl b/corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl index 2e5801fee..1fb032686 100644 --- a/corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl +++ b/corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl @@ -31,7 +31,7 @@ namespace corsika { output_.initStreamer((directory / "profile.parquet").string()); // enable compression with the default level - // output_.enableCompression(); + output_.enableCompression(); // build the schema output_.addField("X", parquet::Repetition::REQUIRED, parquet::Type::FLOAT, diff --git a/corsika/detail/modules/writers/ParticleWriterParquet.inl b/corsika/detail/modules/writers/ParticleWriterParquet.inl index 640c1c704..b80fb4608 100644 --- a/corsika/detail/modules/writers/ParticleWriterParquet.inl +++ b/corsika/detail/modules/writers/ParticleWriterParquet.inl @@ -24,7 +24,7 @@ namespace corsika { output_.initStreamer((directory / "particles.parquet").string()); // enable compression with the default level - // output_.enableCompression(); + output_.enableCompression(); // build the schema output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32, diff --git a/corsika/detail/modules/writers/TrackWriterParquet.inl b/corsika/detail/modules/writers/TrackWriterParquet.inl index 97624dcc4..a529c319a 100644 --- a/corsika/detail/modules/writers/TrackWriterParquet.inl +++ b/corsika/detail/modules/writers/TrackWriterParquet.inl @@ -20,6 +20,9 @@ namespace corsika { // setup the streamer output_.initStreamer((directory / "tracks.parquet").string()); + // enable compression with the default level + output_.enableCompression(); + // build the schema output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32, parquet::ConvertedType::INT_32); diff --git a/corsika/detail/output/OutputManager.inl b/corsika/detail/output/OutputManager.inl index e7bebabe1..7ccc3e150 100644 --- a/corsika/detail/output/OutputManager.inl +++ b/corsika/detail/output/OutputManager.inl @@ -23,9 +23,11 @@ namespace corsika { inline OutputManager::OutputManager(std::string const& dir_path, const long& vseed = 0, - std::string const& vargs = "") + std::string const& vargs = "", + bool useCompression = false) : root_(dir_path) , cmnd_line_args_(vargs) + , useCompression_(useCompression) , count_(0) , seed_(vseed) { @@ -89,6 +91,26 @@ namespace corsika { // make sure that we gracefully close all the outputs. This is a supported // method of operation so we don't issue a warning here if (state_ == OutputState::LibraryReady) { endOfLibrary(); } + + if (useCompression_) { + auto const parent = root_.parent_path(); + auto const final_part = root_.filename(); + + // Ensure we use "./" if the parent path is empty + std::string parent_path = parent.empty() ? "./" : parent.string() + "/"; + + std::string const cmd = "tar cf " + parent_path + final_part.string() + ".tar -C " + + parent_path + " " + final_part.string(); + CORSIKA_LOG_INFO("Compressing output directory using: {}", cmd.c_str()); + int const returnCode = std::system(cmd.c_str()); + + if (returnCode) { + CORSIKA_LOG_ERROR("Compression returned with error code {}", returnCode); + } else { + // remove the original directory + boost::filesystem::remove_all(root_); + } + } } inline int OutputManager::getEventId() const { return count_; } @@ -232,6 +254,5 @@ namespace corsika { // and the library has finished state_ = OutputState::LibraryFinished; - } // namespace corsika - + } } // namespace corsika diff --git a/corsika/output/OutputManager.hpp b/corsika/output/OutputManager.hpp index dbadf4508..a169c9999 100644 --- a/corsika/output/OutputManager.hpp +++ b/corsika/output/OutputManager.hpp @@ -40,7 +40,7 @@ namespace corsika { * @param input_args The command line arguments at runtime (written to summary file) */ OutputManager(std::string const& dir_path, const long& vseed, - std::string const& input_args); + std::string const& input_args, bool useCompression); /** * Handle graceful closure of the outputs upon destruction. @@ -108,6 +108,7 @@ namespace corsika { boost::filesystem::path root_; ///< The unique output directory. OutputState state_{OutputState::NoInit}; ///< The current state of this manager. std::string const cmnd_line_args_; ///< The command line arguments used in this run + bool useCompression_; ///< compress the files to tarball int count_{0}; ///< The current ID of this shower. long seed_{0}; std::chrono::time_point<std::chrono::system_clock> const start_time{ diff --git a/corsika/output/ParquetStreamer.hpp b/corsika/output/ParquetStreamer.hpp index dcf1f547f..e27ad9a1a 100644 --- a/corsika/output/ParquetStreamer.hpp +++ b/corsika/output/ParquetStreamer.hpp @@ -47,7 +47,7 @@ namespace corsika { /** * Enable compression for this streamer. */ - void enableCompression(int const level = 3); + void enableCompression(int const level = 9); /** * Finalize the streamer construction. diff --git a/python/corsika/io/library.py b/python/corsika/io/library.py index ad1f52052..e3fadfa9e 100644 --- a/python/corsika/io/library.py +++ b/python/corsika/io/library.py @@ -11,6 +11,8 @@ import logging import os import os.path as op +import tarfile +import tempfile from typing import Any, Dict, List, Optional, Union import yaml @@ -38,7 +40,22 @@ class Library(object): If `path` does not contain a valid CORSIKA8 library. """ - # check that this is a valid library + self.tempDir = None + + # If the path is a tarball, make a temp dir to open the file into + if path.endswith(".tar"): + tar = tarfile.open(path, "r") + self.tempDir = tempfile.TemporaryDirectory(prefix="c8_temp_dir_") + msg = f"Unzipping corsika library {path} to {self.tempDir.name}" + logging.getLogger("corsika").info(msg) + tar.extractall(self.tempDir.name) + tar.close() + + path = op.join( + self.tempDir.name, os.path.basename(path).replace(".tar", "") + ) + + # Check that this is a valid library if not self.__valid_library(path): raise ValueError(f"'{path}' does not contain a valid CORSIKA8 library.") @@ -48,8 +65,9 @@ class Library(object): output_dirs = None # load the config and summary files - self.config = self.load_config(path) - self.summary = self.load_summary(path) + self.config = self.load_config(self.path) + self.summary = self.load_summary(self.path) + if self.summary is None: msg = f"Missing summary file in '{path}'." msg += " The simulation may not have finished. Will not load library" @@ -66,7 +84,11 @@ class Library(object): logging.getLogger("corsika").debug(msg) # build the list of outputs - self.__outputs = self.__build_outputs(path, output_dirs) + self.__outputs = self.__build_outputs(self.path, output_dirs) + + def __del__(self) -> None: + if self.tempDir is not None: + self.tempDir.cleanup() @property def names(self) -> List[str]: diff --git a/python/examples/radio_emission.py b/python/examples/radio_emission.py index 7d830e358..5148d09a7 100644 --- a/python/examples/radio_emission.py +++ b/python/examples/radio_emission.py @@ -153,4 +153,3 @@ for ishower in range(n_showers): plot_path = os.path.join(args.output_dir, f"ObserverWaveforms_Sh{ishower}.png") print("Saving", plot_path) fig.savefig(plot_path, bbox_inches="tight") - diff --git a/tests/output/testOutputManager.cpp b/tests/output/testOutputManager.cpp index b9e9e3fd0..fc0a562d2 100644 --- a/tests/output/testOutputManager.cpp +++ b/tests/output/testOutputManager.cpp @@ -68,7 +68,7 @@ TEST_CASE("OutputManager") { } // output manager performs nothing, no action, just interface - OutputManager output("./out_test/check", 0, ""); + OutputManager output("./out_test/check", 0, "", false); CHECK(boost::filesystem::is_directory("./out_test/check")); @@ -98,6 +98,35 @@ TEST_CASE("OutputManager") { test.endLibrary_ = false; } + SECTION("compression") { + std::string const outputDir = "./out_compressed"; + + // preparation + if (boost::filesystem::exists(outputDir)) { + boost::filesystem::remove_all(outputDir); + } + + // We make a pointer here because the compression happens at deconstruction + OutputManager* output = new OutputManager(outputDir, 0, "", true); + CHECK(boost::filesystem::is_directory(outputDir)); + CHECK( + !boost::filesystem::exists(outputDir + ".tar")); // compressed file does NOT exist + + // Make an output and open/close shower/lib + DummyOutput test; + output->add("test", test); + output->startOfLibrary(); + output->startOfShower(); + output->endOfShower(); + output->endOfLibrary(); + + // Ensure compression happens at deconstruction + CHECK( + !boost::filesystem::exists(outputDir + ".tar")); // compressed file does NOT exist + delete output; + CHECK(boost::filesystem::exists(outputDir + ".tar")); // compressed file DOES exist + } + SECTION("auto-write") { // preparation @@ -106,7 +135,7 @@ TEST_CASE("OutputManager") { } // output manager performs nothing, no action, just interface - OutputManager* output = new OutputManager("./out_test/check", 0, ""); + OutputManager* output = new OutputManager("./out_test/check", 0, "", false); CHECK(boost::filesystem::is_directory("./out_test/check")); @@ -137,8 +166,8 @@ TEST_CASE("OutputManager") { } // output manager performs nothing, no action, just interface - OutputManager output("./out_test/check", 0, ""); - CHECK_THROWS(new OutputManager("./out_test/check", 0, "")); + OutputManager output("./out_test/check", 0, "", false); + CHECK_THROWS(new OutputManager("./out_test/check", 0, "", false)); CHECK_THROWS(output.endOfLibrary()); diff --git a/tests/output/testParquetStreamer.cpp b/tests/output/testParquetStreamer.cpp index 5d1a9c677..569bdf5e9 100644 --- a/tests/output/testParquetStreamer.cpp +++ b/tests/output/testParquetStreamer.cpp @@ -37,7 +37,7 @@ TEST_CASE("ParquetStreamer") { test.addField("testfloat", parquet::Repetition::REQUIRED, parquet::Type::FLOAT, parquet::ConvertedType::NONE); - // test.enableCompression(1); needs to be enabled via conan + test.enableCompression(1); test.buildStreamer(); CHECK(test.isInit()); -- GitLab