diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index f049ddc719c18f74bb25476d3d944ea1829fd113..229d2cdf950ae032b7714ed96574e9725ff28ad1 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -215,7 +215,7 @@ build_test-clang-14:
     - cmake --build . --target run_examples -- -j2
     - export EXE=${CI_PROJECT_DIR}/build/install/bin/c8_air_shower # Run the example scripts
     - export OUTPUT_DIR=${CI_PROJECT_DIR}/build/build_examples/example_outputs/
-    - export CMD="$EXE --seed 1234 --energy 1e3 -p 2212 --disable-interaction-histograms --filename $OUTPUT_DIR/c8_air_shower_output"
+    - export CMD="$EXE --seed 1234 --energy 1e3 -p 2212 --disable-interaction-histograms --compress --filename $OUTPUT_DIR/c8_air_shower_output"
     - echo "Running... $CMD"
     - $CMD
   rules:
@@ -235,7 +235,7 @@ build_test-clang-14:
         - ${CI_PROJECT_DIR}/build/test_outputs/junit*.xml
     paths:
       - ${CI_PROJECT_DIR}/build/build_examples/example_outputs/radio_em_shower_outputs #python examples need this
-      - ${CI_PROJECT_DIR}/build/build_examples/example_outputs/c8_air_shower_output #python examples need this
+      - ${CI_PROJECT_DIR}/build/build_examples/example_outputs/c8_air_shower_output.tar #python examples need this
       - ${CI_PROJECT_DIR}/build/test_outputs/junit*.xml
       - ${CI_PROJECT_DIR}/build/CMakeCache.txt #python tests need this
       - ${CI_PROJECT_DIR}/build #/bin #python tests need this
@@ -455,12 +455,13 @@ python-examples:
     paths:
       - ${CI_PROJECT_DIR}/python/examples/example_plots
   script:
-    - export EXAMPLE_SHOWER_DIR=${CI_PROJECT_DIR}/build/build_examples/example_outputs/c8_air_shower_output
+    - export EXAMPLE_SHOWER=${CI_PROJECT_DIR}/build/build_examples/example_outputs/c8_air_shower_output.tar
     - cd ${CI_PROJECT_DIR}/python
     - pip3 install --user -e '.[test,examples]'
     - cd ${CI_PROJECT_DIR}/python/examples
-    - python3 particle_distribution.py --input-dir $EXAMPLE_SHOWER_DIR
-    - python3 shower_profile.py --input-dir $EXAMPLE_SHOWER_DIR
-    - python3 first_interactions.py --input-dir $EXAMPLE_SHOWER_DIR
+    - python3 particle_distribution.py --input-dir $EXAMPLE_SHOWER
+    - python3 shower_profile.py --input-dir $EXAMPLE_SHOWER
+    - python3 first_interactions.py --input-dir $EXAMPLE_SHOWER
     - export EXAMPLE_RADIO_SHOWER_DIR=${CI_PROJECT_DIR}/build/build_examples/example_outputs/radio_em_shower_outputs
     - python3 radio_emission.py --input-dir $EXAMPLE_RADIO_SHOWER_DIR
+
diff --git a/applications/c8_air_shower.cpp b/applications/c8_air_shower.cpp
index a91a830d59ed93149e86bd9cf85d82de9c329ff1..9b0f008c0b457175163ac1584e22f8514b136587 100644
--- a/applications/c8_air_shower.cpp
+++ b/applications/c8_air_shower.cpp
@@ -229,6 +229,9 @@ int main(int argc, char** argv) {
       ->default_val("corsika_library")
       ->check(CLI::NonexistentPath)
       ->group("Library/Output");
+  bool compressOutput = false;
+  app.add_flag("--compress", compressOutput, "Compress the output directory to a tarball")
+      ->group("Library/Output");
   app.add_option("-s,--seed", "The random number seed.")
       ->default_val(0)
       ->check(CLI::NonNegativeNumber)
@@ -401,7 +404,8 @@ int main(int argc, char** argv) {
   std::stringstream args;
   for (int i = 0; i < argc; ++i) { args << argv[i] << " "; }
   // create the output manager that we then register outputs with
-  OutputManager output(app["--filename"]->as<std::string>(), seed, args.str());
+  OutputManager output(app["--filename"]->as<std::string>(), seed, args.str(),
+                       compressOutput);
 
   // register energy losses as output
   EnergyLossWriter dEdX{showerAxis, dX};
diff --git a/conanfile.py b/conanfile.py
index 1a1f5d83fe795b00e8f1e0d8e561a899ebc9171e..28c08b731761103bda0f08ab45577640624477fe 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -18,6 +18,7 @@ class Pkg(ConanFile):
         'arrow*:with_utf8proc': 'False',
         'arrow*:with_zstd': 'False',
         'arrow*:with_bz2': 'False',
+        'arrow*:with_lz4': 'True',
         'arrow*:with_thrift': 'True',
         'arrow*:with_boost': 'True',
         'boost*:without_container': 'True',
diff --git a/corsika/detail/modules/radio/RadioProcess.inl b/corsika/detail/modules/radio/RadioProcess.inl
index b77b78cb6f1b687c75144efe845e84ea71ac98cd..58e395051be5773fa6243932aedcd3f6e7f29508 100644
--- a/corsika/detail/modules/radio/RadioProcess.inl
+++ b/corsika/detail/modules/radio/RadioProcess.inl
@@ -72,6 +72,10 @@ namespace corsika {
 
     // setup the streamer
     output_.initStreamer((directory / ("observers.parquet")).string());
+
+    // enable compression with the default level
+    output_.enableCompression();
+
     // LCOV_EXCL_START
     // build the schema
     output_.addField("Time", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE,
diff --git a/corsika/detail/modules/writers/EnergyLossWriterParquet.inl b/corsika/detail/modules/writers/EnergyLossWriterParquet.inl
index 55b301f509bc094a878e529e8ebb58360dc6568a..d58e5423a6e63230e86c858af49e44a22d481da2 100644
--- a/corsika/detail/modules/writers/EnergyLossWriterParquet.inl
+++ b/corsika/detail/modules/writers/EnergyLossWriterParquet.inl
@@ -31,7 +31,7 @@ namespace corsika {
     output_.initStreamer((directory / "dEdX.parquet").string());
 
     // enable compression with the default level
-    // output_.enableCompression();
+    output_.enableCompression();
 
     // build the schema
     output_.addField("X", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
diff --git a/corsika/detail/modules/writers/InteractionWriter.inl b/corsika/detail/modules/writers/InteractionWriter.inl
index e4d23bd50f587f25454c73f574175a988f3d07e5..f443ecd105b412227b99ac67d8b0f63021fe056b 100644
--- a/corsika/detail/modules/writers/InteractionWriter.inl
+++ b/corsika/detail/modules/writers/InteractionWriter.inl
@@ -102,6 +102,9 @@ namespace corsika {
       boost::filesystem::path const& directory) {
     output_.initStreamer((directory / ("interactions.parquet")).string());
 
+    // enable compression with the default level
+    output_.enableCompression();
+
     output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
                      parquet::ConvertedType::INT_32);
     output_.addField("px", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
diff --git a/corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl b/corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl
index 2e5801fee1882cc0ee44c522ae63fc1b205a5f4b..1fb03268679933b3d54f0add8f27a1f891ed3e4b 100644
--- a/corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl
+++ b/corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl
@@ -31,7 +31,7 @@ namespace corsika {
     output_.initStreamer((directory / "profile.parquet").string());
 
     // enable compression with the default level
-    // output_.enableCompression();
+    output_.enableCompression();
 
     // build the schema
     output_.addField("X", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
diff --git a/corsika/detail/modules/writers/ParticleWriterParquet.inl b/corsika/detail/modules/writers/ParticleWriterParquet.inl
index 640c1c704b783cd4bd2bcbaab2391abd5817d931..b80fb4608efb2877b71cd00babe77a4ace3b0b8f 100644
--- a/corsika/detail/modules/writers/ParticleWriterParquet.inl
+++ b/corsika/detail/modules/writers/ParticleWriterParquet.inl
@@ -24,7 +24,7 @@ namespace corsika {
     output_.initStreamer((directory / "particles.parquet").string());
 
     // enable compression with the default level
-    // output_.enableCompression();
+    output_.enableCompression();
 
     // build the schema
     output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
diff --git a/corsika/detail/modules/writers/TrackWriterParquet.inl b/corsika/detail/modules/writers/TrackWriterParquet.inl
index 97624dcc4540d7893002e1c1bed54c012e999842..a529c319ad140d554a535f95e7bcc2a102ab1394 100644
--- a/corsika/detail/modules/writers/TrackWriterParquet.inl
+++ b/corsika/detail/modules/writers/TrackWriterParquet.inl
@@ -20,6 +20,9 @@ namespace corsika {
     // setup the streamer
     output_.initStreamer((directory / "tracks.parquet").string());
 
+    // enable compression with the default level
+    output_.enableCompression();
+
     // build the schema
     output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
                      parquet::ConvertedType::INT_32);
diff --git a/corsika/detail/output/OutputManager.inl b/corsika/detail/output/OutputManager.inl
index e7bebabe1e4415132a7942be93fd139ca784de7b..7ccc3e15049a3ab6ce6f58c9fe77fe05e99d8f2b 100644
--- a/corsika/detail/output/OutputManager.inl
+++ b/corsika/detail/output/OutputManager.inl
@@ -23,9 +23,11 @@ namespace corsika {
 
   inline OutputManager::OutputManager(std::string const& dir_path,
                                       const long& vseed = 0,
-                                      std::string const& vargs = "")
+                                      std::string const& vargs = "",
+                                      bool useCompression = false)
       : root_(dir_path)
       , cmnd_line_args_(vargs)
+      , useCompression_(useCompression)
       , count_(0)
       , seed_(vseed) {
 
@@ -89,6 +91,26 @@ namespace corsika {
     // make sure that we gracefully close all the outputs. This is a supported
     // method of operation so we don't issue a warning here
     if (state_ == OutputState::LibraryReady) { endOfLibrary(); }
+
+    if (useCompression_) {
+      auto const parent = root_.parent_path();
+      auto const final_part = root_.filename();
+
+      // Ensure we use "./" if the parent path is empty
+      std::string parent_path = parent.empty() ? "./" : parent.string() + "/";
+
+      std::string const cmd = "tar cf " + parent_path + final_part.string() + ".tar -C " +
+                              parent_path + " " + final_part.string();
+      CORSIKA_LOG_INFO("Compressing output directory using: {}", cmd.c_str());
+      int const returnCode = std::system(cmd.c_str());
+
+      if (returnCode) {
+        CORSIKA_LOG_ERROR("Compression returned with error code {}", returnCode);
+      } else {
+        // remove the original directory
+        boost::filesystem::remove_all(root_);
+      }
+    }
   }
 
   inline int OutputManager::getEventId() const { return count_; }
@@ -232,6 +254,5 @@ namespace corsika {
 
     // and the library has finished
     state_ = OutputState::LibraryFinished;
-  } // namespace corsika
-
+  }
 } // namespace corsika
diff --git a/corsika/output/OutputManager.hpp b/corsika/output/OutputManager.hpp
index dbadf4508445c27526f60817beebb5176427827a..a169c9999e162c6b98fcf4658e70920f0f490ec7 100644
--- a/corsika/output/OutputManager.hpp
+++ b/corsika/output/OutputManager.hpp
@@ -40,7 +40,8 @@ namespace corsika {
      * @param input_args The command line arguments at runtime (written to summary file)
+     * @param useCompression If true, the output directory is tarred upon destruction
      */
     OutputManager(std::string const& dir_path, const long& vseed,
-                  std::string const& input_args);
+                  std::string const& input_args, bool useCompression);
 
     /**
      * Handle graceful closure of the outputs upon destruction.
@@ -108,6 +109,7 @@ namespace corsika {
     boost::filesystem::path root_;           ///< The unique output directory.
     OutputState state_{OutputState::NoInit}; ///< The current state of this manager.
     std::string const cmnd_line_args_; ///< The command line arguments used in this run
+    bool useCompression_; ///< If true, tar the output directory upon destruction
     int count_{0};        ///< The current ID of this shower.
     long seed_{0};
     std::chrono::time_point<std::chrono::system_clock> const start_time{
diff --git a/corsika/output/ParquetStreamer.hpp b/corsika/output/ParquetStreamer.hpp
index dcf1f547f72338b45459583a591150487de937c8..e27ad9a1a662da4ce92d6df893fe3b2075319ae8 100644
--- a/corsika/output/ParquetStreamer.hpp
+++ b/corsika/output/ParquetStreamer.hpp
@@ -47,7 +47,7 @@ namespace corsika {
     /**
      * Enable compression for this streamer.
      */
-    void enableCompression(int const level = 3);
+    void enableCompression(int const level = 9);
 
     /**
      * Finalize the streamer construction.
diff --git a/python/corsika/io/library.py b/python/corsika/io/library.py
index ad1f520524f2a685ab1c8d37b146fec3fe3fce12..e3fadfa9e62a3701882d983a988a74f7c99531a1 100644
--- a/python/corsika/io/library.py
+++ b/python/corsika/io/library.py
@@ -11,6 +11,8 @@
 import logging
 import os
 import os.path as op
+import tarfile
+import tempfile
 from typing import Any, Dict, List, Optional, Union
 
 import yaml
@@ -38,7 +40,22 @@ class Library(object):
             If `path` does not contain a valid CORSIKA8 library.
         """
 
-        # check that this is a valid library
+        self.tempDir = None
+
+        # If the path is a tarball, make a temp dir to open the file into
+        if path.endswith(".tar"):
+            # NOTE(review): extractall() trusts member paths; use trusted tarballs only
+            with tarfile.open(path, "r") as tar:  # handle is closed even on error
+                self.tempDir = tempfile.TemporaryDirectory(prefix="c8_temp_dir_")
+                msg = f"Unzipping corsika library {path} to {self.tempDir.name}"
+                logging.getLogger("corsika").info(msg)
+                tar.extractall(self.tempDir.name)
+
+            # Strip only the trailing ".tar": str.replace() would also drop any
+            # earlier ".tar" inside the name (e.g. "a.tar.b.tar" -> "a.b")
+            path = op.join(self.tempDir.name, os.path.basename(path)[: -len(".tar")])
+
+        # Check that this is a valid library
         if not self.__valid_library(path):
             raise ValueError(f"'{path}' does not contain a valid CORSIKA8 library.")
 
@@ -48,8 +65,9 @@ class Library(object):
         output_dirs = None
 
         # load the config and summary files
-        self.config = self.load_config(path)
-        self.summary = self.load_summary(path)
+        self.config = self.load_config(self.path)
+        self.summary = self.load_summary(self.path)
+
         if self.summary is None:
             msg = f"Missing summary file in '{path}'."
             msg += " The simulation may not have finished. Will not load library"
@@ -66,7 +84,11 @@ class Library(object):
             logging.getLogger("corsika").debug(msg)
 
         # build the list of outputs
-        self.__outputs = self.__build_outputs(path, output_dirs)
+        self.__outputs = self.__build_outputs(self.path, output_dirs)
+
+    def __del__(self) -> None:
+        if self.tempDir is not None:
+            self.tempDir.cleanup()
 
     @property
     def names(self) -> List[str]:
diff --git a/python/examples/radio_emission.py b/python/examples/radio_emission.py
index 7d830e358afc117b90f705189a8939a7a421ba05..5148d09a7b6dd2d492979c95bc6998f48b30598a 100644
--- a/python/examples/radio_emission.py
+++ b/python/examples/radio_emission.py
@@ -153,4 +153,3 @@ for ishower in range(n_showers):
     plot_path = os.path.join(args.output_dir, f"ObserverWaveforms_Sh{ishower}.png")
     print("Saving", plot_path)
     fig.savefig(plot_path, bbox_inches="tight")
-
diff --git a/tests/output/testOutputManager.cpp b/tests/output/testOutputManager.cpp
index b9e9e3fd09267b5c8f68be9f3fd9fa6738fb2254..fc0a562d2ad9a2c693bfec0cc9fd82c1f969c8ac 100644
--- a/tests/output/testOutputManager.cpp
+++ b/tests/output/testOutputManager.cpp
@@ -68,7 +68,7 @@ TEST_CASE("OutputManager") {
     }
 
     // output manager performs nothing, no action, just interface
-    OutputManager output("./out_test/check", 0, "");
+    OutputManager output("./out_test/check", 0, "", false);
 
     CHECK(boost::filesystem::is_directory("./out_test/check"));
 
@@ -98,6 +98,35 @@ TEST_CASE("OutputManager") {
     test.endLibrary_ = false;
   }
 
+  SECTION("compression") {
+    std::string const outputDir = "./out_compressed";
+
+    // preparation: remove leftovers of a previous run. The tarball must go too,
+    // otherwise the "does NOT exist" checks below fail when the test is re-run
+    boost::filesystem::remove_all(outputDir);
+    boost::filesystem::remove_all(outputDir + ".tar");
+
+    // We make a pointer here because the tarball is only written upon destruction
+    OutputManager* output = new OutputManager(outputDir, 0, "", true);
+    CHECK(boost::filesystem::is_directory(outputDir));
+    CHECK(
+        !boost::filesystem::exists(outputDir + ".tar")); // compressed file does NOT exist
+
+    // Make an output and open/close shower/lib
+    DummyOutput test;
+    output->add("test", test);
+    output->startOfLibrary();
+    output->startOfShower();
+    output->endOfShower();
+    output->endOfLibrary();
+
+    // Ensure compression happens at destruction, not at endOfLibrary()
+    CHECK(
+        !boost::filesystem::exists(outputDir + ".tar")); // compressed file does NOT exist
+    delete output;
+    CHECK(boost::filesystem::exists(outputDir + ".tar")); // compressed file DOES exist
+  }
+
   SECTION("auto-write") {
 
     // preparation
@@ -106,7 +135,7 @@ TEST_CASE("OutputManager") {
     }
 
     // output manager performs nothing, no action, just interface
-    OutputManager* output = new OutputManager("./out_test/check", 0, "");
+    OutputManager* output = new OutputManager("./out_test/check", 0, "", false);
 
     CHECK(boost::filesystem::is_directory("./out_test/check"));
 
@@ -137,8 +166,8 @@ TEST_CASE("OutputManager") {
     }
 
     // output manager performs nothing, no action, just interface
-    OutputManager output("./out_test/check", 0, "");
-    CHECK_THROWS(new OutputManager("./out_test/check", 0, ""));
+    OutputManager output("./out_test/check", 0, "", false);
+    CHECK_THROWS(new OutputManager("./out_test/check", 0, "", false));
 
     CHECK_THROWS(output.endOfLibrary());
 
diff --git a/tests/output/testParquetStreamer.cpp b/tests/output/testParquetStreamer.cpp
index 5d1a9c677184a58e674aded5ee1fc12c62dbf235..569bdf5e9fd762c7f2502d30004cfadce3f7bef0 100644
--- a/tests/output/testParquetStreamer.cpp
+++ b/tests/output/testParquetStreamer.cpp
@@ -37,7 +37,7 @@ TEST_CASE("ParquetStreamer") {
     test.addField("testfloat", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
                   parquet::ConvertedType::NONE);
 
-    // test.enableCompression(1); needs to be enabled via conan
+    test.enableCompression(1);
     test.buildStreamer();
 
     CHECK(test.isInit());