From 79173d1b833664532741757c4efe48f46132b7ed Mon Sep 17 00:00:00 2001
From: Remy Prechelt <prechelt@hawaii.edu>
Date: Sat, 24 Oct 2020 23:27:58 -1000
Subject: [PATCH] Initial draft of output hierarchy and writers.

---
 .gitmodules                                   |   7 +
 CMakeLists.txt                                |   1 +
 Documentation/Examples/CMakeLists.txt         |   6 +
 Documentation/Examples/vertical_EAS.cc        |  19 +-
 Framework/Cascade/Cascade.h                   |  23 +-
 Outputs/BaseOutput.h                          |  58 +++++
 Outputs/CMakeLists.txt                        |  37 ++++
 Outputs/ObservationPlaneWriterParquet.h       |  76 +++++++
 Outputs/OutputManager.cc                      |  10 +
 Outputs/OutputManager.h                       | 205 ++++++++++++++++++
 Outputs/YAML.hpp                              |  22 ++
 Outputs/helpers/ParquetStreamer.h             |  62 ++++++
 Processes/ObservationPlane/CMakeLists.txt     |   2 +
 .../ObservationPlane/ObservationPlane.cc      |  71 ------
 Processes/ObservationPlane/ObservationPlane.h | 106 +++++++--
 .../ObservationPlane/testObservationPlane.cc  |   4 +-
 ThirdParty/CMakeLists.txt                     |  54 ++++-
 ThirdParty/arrow                              |   1 +
 ThirdParty/yaml-cpp                           |   1 +
 19 files changed, 673 insertions(+), 92 deletions(-)
 create mode 100644 Outputs/BaseOutput.h
 create mode 100644 Outputs/CMakeLists.txt
 create mode 100644 Outputs/ObservationPlaneWriterParquet.h
 create mode 100644 Outputs/OutputManager.cc
 create mode 100644 Outputs/OutputManager.h
 create mode 100644 Outputs/YAML.hpp
 create mode 100644 Outputs/helpers/ParquetStreamer.h
 create mode 160000 ThirdParty/arrow
 create mode 160000 ThirdParty/yaml-cpp

diff --git a/.gitmodules b/.gitmodules
index 185d42cff..e437835b0 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -9,4 +9,11 @@
 [submodule "Processes/Proposal/PROPOSAL"]
 	path = Processes/Proposal/PROPOSAL
 	url = https://github.com/tudo-astroparticlephysics/PROPOSAL.git
+[submodule "ThirdParty/yaml-cpp"]
+	path = ThirdParty/yaml-cpp
+	url = https://github.com/jbeder/yaml-cpp
+	shallow = true
+[submodule "ThirdParty/arrow"]
+	path = ThirdParty/arrow
+	url = https://github.com/apache/arrow
 	shallow = true
diff --git a/CMakeLists.txt b/CMakeLists.txt
index af8d04ff8..06963c0e3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -156,6 +156,7 @@ add_subdirectory (Framework)
 add_subdirectory (Environment)
 add_subdirectory (Stack)
 add_subdirectory (Setup)
+add_subdirectory (Outputs)
 add_subdirectory (Processes)
 add_subdirectory (Documentation)
 add_subdirectory (Main)
diff --git a/Documentation/Examples/CMakeLists.txt b/Documentation/Examples/CMakeLists.txt
index 62c9d6bd3..f9cd5ca73 100644
--- a/Documentation/Examples/CMakeLists.txt
+++ b/Documentation/Examples/CMakeLists.txt
@@ -28,6 +28,7 @@ target_link_libraries (boundary_example
   CORSIKAparticles
   CORSIKAgeometry
   CORSIKAenvironment
+  CORSIKAoutput
   CORSIKAprocesssequence
   C8::ext::boost # boost::histogram
   )
@@ -53,6 +54,7 @@ target_link_libraries (cascade_example
   CORSIKAparticles
   CORSIKAgeometry
   CORSIKAenvironment
+  CORSIKAoutput
   CORSIKAprocesssequence
   )
 
@@ -80,6 +82,7 @@ if (Pythia8_FOUND)
     CORSIKAcascade
     CORSIKAparticles
     CORSIKAgeometry
+    CORSIKAoutput
     CORSIKAenvironment
     CORSIKAprocesssequence
     )
@@ -111,6 +114,7 @@ if (Pythia8_FOUND)
     CORSIKAcascade
     CORSIKAparticles
     CORSIKAgeometry
+    CORSIKAoutput
     CORSIKAenvironment
     CORSIKAprocesssequence
     CORSIKAhistory # for HistoryObservationPlane
@@ -125,6 +129,7 @@ target_link_libraries (stopping_power
   ProcessEnergyLoss
   CORSIKAparticles
   CORSIKAgeometry
+  CORSIKAoutput
   CORSIKAenvironment
   )
 
@@ -132,6 +137,7 @@ CORSIKA_ADD_EXAMPLE (staticsequence_example)
 target_link_libraries (staticsequence_example
   CORSIKAprocesssequence
   CORSIKAunits
+  CORSIKAoutput
   CORSIKAgeometry
   CORSIKAlogging)
 
diff --git a/Documentation/Examples/vertical_EAS.cc b/Documentation/Examples/vertical_EAS.cc
index 14eeea8b9..a4b3dd66c 100644
--- a/Documentation/Examples/vertical_EAS.cc
+++ b/Documentation/Examples/vertical_EAS.cc
@@ -21,6 +21,7 @@
 #include <corsika/geometry/Plane.h>
 #include <corsika/geometry/Sphere.h>
 #include <corsika/logging/Logging.h>
+#include <corsika/output/OutputManager.h>
 #include <corsika/process/ProcessSequence.h>
 #include <corsika/process/SwitchProcessSequence.h>
 #include <corsika/process/StackProcess.h>
@@ -55,6 +56,7 @@ using namespace corsika::particles;
 using namespace corsika::random;
 using namespace corsika::setup;
 using namespace corsika::geometry;
+using namespace corsika::output;
 using namespace corsika::environment;
 
 using namespace std;
@@ -166,6 +168,9 @@ int main(int argc, char** argv) {
   environment::ShowerAxis const showerAxis{injectionPos,
                                            (showerCore - injectionPos) * 1.5, env};
 
+  // setup the output manager
+  OutputManager outputs("vertical_EAS_outputs");
+
   // setup processes, decays and interactions
 
   process::sibyll::Interaction sibyll;
@@ -209,8 +214,16 @@ int main(int argc, char** argv) {
   process::longitudinal_profile::LongitudinalProfile longprof{showerAxis};
 
   Plane const obsPlane(showerCore, Vector<dimensionless_d>(rootCS, {0., 0., 1.}));
-  process::observation_plane::ObservationPlane observationLevel(obsPlane,
-                                                                "particles.dat");
+  process::observation_plane::ObservationPlane observationLevel(obsPlane, false,
+                                                                "particles");
+
+  // outputs.Create<process::observation_plane::ObservationPlane>("particles", obsPlane, false, "particles");
+
+  // auto observationLevel = outputs.Get<process::observation_plane::ObservationPlane>("plane");
+
+  // register the observation plane with the manager
+  outputs.Register("obsplane", observationLevel);
+
 
   process::UrQMD::UrQMD urqmd;
   process::interaction_counter::InteractionCounter urqmdCounted{urqmd};
@@ -237,7 +250,7 @@ int main(int argc, char** argv) {
 
   // define air shower object, run simulation
   tracking_line::TrackingLine tracking;
-  cascade::Cascade EAS(env, tracking, sequence, stack);
+  cascade::Cascade EAS(env, tracking, sequence, stack, outputs);
 
   // to fix the point of first interaction, uncomment the following two lines:
   //  EAS.forceInteraction();
diff --git a/Framework/Cascade/Cascade.h b/Framework/Cascade/Cascade.h
index 43a527543..aabf11d40 100644
--- a/Framework/Cascade/Cascade.h
+++ b/Framework/Cascade/Cascade.h
@@ -60,6 +60,7 @@ namespace corsika::cascade {
    */
 
   template <typename TTracking, typename TProcessList, typename TStack,
+            typename TOutput,
             /*
               TStackView is needed as explicit template parameter because
               of issue 161 and the
@@ -92,16 +93,18 @@ namespace corsika::cascade {
      * list of physics processes for configuration at construct time.
      */
     Cascade(corsika::environment::Environment<MediumInterface> const& env, TTracking& tr,
-            TProcessList& pl, TStack& stack)
+            TProcessList& pl, TStack& stack, TOutput& output)
         : environment_(env)
         , tracking_(tr)
         , process_sequence_(pl)
         , stack_(stack)
+          , output_(output)
         , count_(0) {
       C8LOG_INFO(c8_ascii_);
       if constexpr (TStackView::has_event) {
         C8LOG_INFO(" - With full cascade HISTORY.");
       }
+
     }
 
     /**
@@ -111,6 +114,9 @@ namespace corsika::cascade {
     void Run() {
       setNodes();
 
+      // start a new event
+      fOutput.StartOfEvent();
+
       while (!stack_.IsEmpty()) {
         while (!stack_.IsEmpty()) {
           C8LOG_TRACE("Stack: {}", stack_.as_string());
@@ -128,6 +134,9 @@ namespace corsika::cascade {
         // thus, the double loop
         // DoCascadeEquations();
       }
+
+      // and end the event
+      fOutput.EndOfEvent();
     }
 
     /**
@@ -336,6 +345,7 @@ namespace corsika::cascade {
       return returnCode;
     }
 
+<<<<<<< HEAD
     /**
      * set the nodes for all particles on the stack according to their numerical
      * position
@@ -367,5 +377,16 @@ Y8,            Y8,        ,8P  88    `8b            `8b  88  88P   Y8b       d8"
   `"Y8888Y"'     `"Y8888Y"'    88      `8b   "Y88888P"   88  88       Y8b  d8'          `8b       "Y88888P"
 	)V0G0N";
   };
+=======
+  private:
+    corsika::environment::Environment<MediumInterface> const& fEnvironment;
+    TTracking& fTracking;
+    TProcessList& fProcessSequence;
+    TStack& fStack;
+    TOutput& fOutput;
+    corsika::random::RNG& fRNG =
+        corsika::random::RNGManager::GetInstance().GetRandomStream("cascade");
+  }; // namespace corsika::cascade
+>>>>>>> d1b56063... Initial draft of output hierarchy and writers.
 
 } // namespace corsika::cascade
diff --git a/Outputs/BaseOutput.h b/Outputs/BaseOutput.h
new file mode 100644
index 000000000..e5813eff1
--- /dev/null
+++ b/Outputs/BaseOutput.h
@@ -0,0 +1,58 @@
+/*
+ * (c) Copyright 2020 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+#pragma once
+
+#include <memory>
+
+#include <filesystem>
+
+#include <yaml-cpp/yaml.h>
+
+namespace corsika::output {
+
+  /**
+   * This is the base class for all outputs so that they
+   * can be stored in homogeneous containers.
+   */
+  class BaseOutput : public std::enable_shared_from_this<BaseOutput> {
+
+    int const event_{0}; ///< The current event number.
+
+  protected:
+    BaseOutput()
+        : event_(0){};
+
+  public:
+    /**
+     * Called at the start of each run.
+     */
+    virtual void StartOfRun(std::filesystem::path const& directory) = 0;
+
+    /**
+     * Called at the start of each event/shower.
+     */
+    virtual void StartOfEvent() = 0;
+
+    /**
+     * Called at the end of each event/shower.
+     */
+    virtual void EndOfEvent() = 0;
+
+    /**
+     * Called at the end of each run.
+     */
+    virtual void EndOfRun() = 0;
+
+    /**
+     * Get the configuration of this output.
+     */
+    // virtual void WriteConfig(YAML::Emitter&) const = 0;
+    virtual YAML::Node GetConfig() const = 0;
+  };
+
+} // namespace corsika::output
diff --git a/Outputs/CMakeLists.txt b/Outputs/CMakeLists.txt
new file mode 100644
index 000000000..7b0b3192a
--- /dev/null
+++ b/Outputs/CMakeLists.txt
@@ -0,0 +1,37 @@
+ set (
+  MODEL_HEADERS
+  BaseOutput.h
+  OutputManager.h
+  ObservationPlaneWriterParquet.h
+  )
+
+set (
+  MODEL_SOURCES
+  OutputManager.cc
+  )
+
+set (
+  MODEL_NAMESPACE
+  corsika/output/
+  )
+
+add_library (CORSIKAoutput STATIC ${MODEL_SOURCES})
+CORSIKA_COPY_HEADERS_TO_NAMESPACE (CORSIKAoutput ${MODEL_NAMESPACE} ${MODEL_HEADERS})
+
+#target dependencies on other libraries(also the header onlys)
+target_link_libraries (
+  CORSIKAoutput
+  CORSIKAgeometry
+  CORSIKAlogging
+  CORSIKAthirdparty
+  CORSIKAprocesssequence
+  )
+
+target_include_directories (
+  CORSIKAoutput
+  INTERFACE
+  $<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/include>
+  $<INSTALL_INTERFACE:include/include>
+  )
+
+install (FILES ${MODEL_HEADERS} DESTINATION include/${MODEL_NAMESPACE})
diff --git a/Outputs/ObservationPlaneWriterParquet.h b/Outputs/ObservationPlaneWriterParquet.h
new file mode 100644
index 000000000..3771cd63a
--- /dev/null
+++ b/Outputs/ObservationPlaneWriterParquet.h
@@ -0,0 +1,76 @@
+/*
+ * (c) Copyright 2020 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+#include <corsika/output/BaseOutput.h>
+#include <corsika/particles/ParticleProperties.h>
+#include <corsika/units/PhysicalUnits.h>
+
+#include <arrow/io/file.h>
+#include <parquet/arrow/schema.h>
+#include <parquet/stream_writer.h>
+
+namespace corsika::output {
+
+  class ObservationPlaneWriterParquet : public BaseOutput, private ParquetStreamer {
+
+  public:
+    /**
+     * Write an observation plane to a directory.
+     *
+     * @param name    The name of this output.
+     */
+    ObservationPlaneWriterParquet(std::string const& name)
+        : name_(name){};
+
+    /**
+     * Called at the start of each run.
+     */
+    void StartOfRun(std::filesystem::path const& directory) final {
+
+      // (directory / "particles.parquet").string()
+
+      // construct the schema
+      auto schema = arrow::schema({arrow::field("pdg", arrow::int64()),
+                                   arrow::field("energy", arrow::float64()),
+                                   arrow::field("radius", arrow::float64())});
+
+      auto properties = builder.build();
+    }
+
+    /**
+     * Called at the start of each event/shower.
+     */
+    void StartOfEvent() final {}
+
+    /**
+     * Called at the end of each event/shower.
+     */
+    void EndOfEvent() final {}
+
+    /**
+     * Called at the end of each run.
+     */
+    void EndOfRun() final {}
+
+  protected:
+    void Write(particles::Code const& pid, units::si::HEPEnergyType const& energy,
+               units::si::LengthType const& distance) {
+
+      // outputStream_ << static_cast<int>(particles::GetPDG(pid)) << ' '
+      //               << particle.GetEnergy() / 1_eV << ' '
+      //               << (trajectory.GetPosition(1) - plane_.GetCenter()).norm() / 1_m
+      //               << std::endl;
+    }
+
+    std::string const name_; ///< The name of this output.
+
+  }; // class ObservationPlaneWriterParquet
+
+} // namespace corsika::output
diff --git a/Outputs/OutputManager.cc b/Outputs/OutputManager.cc
new file mode 100644
index 000000000..4f983df9e
--- /dev/null
+++ b/Outputs/OutputManager.cc
@@ -0,0 +1,10 @@
+/*
+ * (c) Copyright 2020 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+
+#include <corsika/output/OutputManager.h>
diff --git a/Outputs/OutputManager.h b/Outputs/OutputManager.h
new file mode 100644
index 000000000..55afbdec6
--- /dev/null
+++ b/Outputs/OutputManager.h
@@ -0,0 +1,205 @@
+/*
+ * (c) Copyright 2020 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+#pragma once
+
+#include <algorithm>
+#include <fstream>
+#include <functional>
+
+#include <corsika/logging/Logging.h>
+#include <corsika/output/BaseOutput.h>
+
+namespace corsika::output {
+
+  /*!
+   * Manage random number generators.
+   */
+  class OutputManager final {
+
+    enum class OutputState {
+      RunNoInit,
+      RunInitialized,
+      RunInProgress,
+      RunFinished,
+    };
+
+    OutputState state_{OutputState::RunNoInit}; ///< The current state of this manager.
+    std::string const name_;                    ///< The name of this simulation file.
+    std::filesystem::path const root_; ///< The top-level directory for the output.
+    inline static auto logger{logging::GetLogger("output_manager")}; ///< A custom logger.
+
+    /**
+     * The outputs that have been registered with us.
+     */
+    // std::map<std::string, std::shared_ptr<BaseOutput>> outputs_;
+    std::map<std::string, std::reference_wrapper<BaseOutput>> outputs_;
+
+    /**
+     * Write a YAML-node to a file.
+     */
+    void WriteNode(YAML::Node const& node, std::filesystem::path const& path) const {
+
+      // construct a YAML emitter for this config file
+      YAML::Emitter out;
+
+      // and write the node to the output
+      out << node;
+
+      // open the output file - this is <output name>.json
+      std::ofstream file(path.string());
+
+      // dump the JSON to the file
+      file << out.c_str() << std::endl;
+
+      // and close the efile
+      file.close();
+    }
+
+    /**
+     * Write the top-level config of this simulation.
+     */
+    void WriteTopLevelConfig() const {
+
+      YAML::Node config;
+
+      // some basic info
+      config["name"] = name_;               // the simulation name
+      config["version"] = "8.0.0-prealpha"; // the current version
+
+      // write the node to a file
+      WriteNode(config, root_ / (name_ + ".yaml"));
+    }
+
+    void InitOutput(std::string const& name) const {
+      // construct the path to this directory
+      auto const path{root_ / name};
+
+      // create the directory for this process.
+      std::filesystem::create_directory(path);
+
+      // write the config for this output to the file
+      WriteNode(outputs_.at(name).get().GetConfig(), path / (name + ".yaml"));
+    }
+
+  public:
+    /**
+     * Construct an OutputManager instance with a name in a given directory.
+     *
+     * @param name    The name of this output collection.
+     * @param dir     The directory where the output directory will be stored.
+     */
+    OutputManager(std::string const& name,
+                  std::filesystem::path const& dir = std::filesystem::current_path())
+        : name_(name)
+        , root_(dir / name) {
+
+      // check if this directory already exists
+      if (std::filesystem::exists(root_)) {
+        logger->warn(
+            "Output directory '{}' already exists! This is currenty not supported.",
+            root_.string());
+        throw std::runtime_error("Output directory already exists.");
+      }
+
+      // construct the directory for this run
+      std::filesystem::create_directory(root_);
+
+      // write the top level config file
+      WriteTopLevelConfig();
+
+      // we are now initialized
+      state_ = OutputState::RunInitialized;
+    }
+
+    /**
+     * Handle graceful closure of the outputs upon destruction.
+     */
+    ~OutputManager() {
+
+      // if we are being destructed but EndOfRun() has not been called,
+      // make sure that we gracefully close all the outputs
+      if (state_ == OutputState::RunInProgress || state_ == OutputState::RunInitialized) {
+        EndOfRun();
+      }
+    }
+
+    /**
+     * Register an existing output to this manager.
+     *
+     * @param name    The unique name of this output.
+     * @param args... These are perfect forwarded to the
+     *                constructor of the output.
+     */
+    template <typename TOutput>
+    void Register(std::string const name, TOutput& output) {
+
+      // check if that name is already in the map
+      if (outputs_.count(name) > 0) {
+        logger->warn("'{}' is already registered. All outputs must have unique names.",
+                     name);
+        return;
+      }
+
+      // if we get here, the name is not already in the map
+      // so we create the output and register it into the map
+      outputs_.insert(std::make_pair(name, std::ref(output)));
+
+      // and initialize this output
+      InitOutput(name);
+    }
+
+    /**
+     * Called at the start of each run.
+     */
+    void StartOfRun() {
+      for (auto& [name, output] : outputs_) {
+
+        // construct the path to this output subdirectory
+        auto const path{root_ / name};
+
+        // and start the run
+        output.get().StartOfRun(path);
+      }
+
+      // we have now started running
+      state_ = OutputState::RunInProgress;
+    };
+
+    /**
+     * Called at the start of each event/shower.
+     */
+    void StartOfEvent() {
+
+      // if this is called but we are still in the initialized state,
+      // make sure that we transition to RunInProgress
+      if (state_ == OutputState::RunInitialized) { StartOfRun(); }
+
+      // now start the event for all the outputs
+      for (auto& [name, output] : outputs_) { output.get().StartOfEvent(); }
+    }
+
+    /**
+     * Called at the end of each event/shower.
+     */
+    void EndOfEvent() {
+      for (auto& [name, output] : outputs_) { output.get().EndOfEvent(); }
+    }
+
+    /**
+     * Called at the end of each run.
+     */
+    void EndOfRun() {
+      for (auto& [name, output] : outputs_) { output.get().EndOfRun(); }
+
+      // and the run has finished
+      state_ = OutputState::RunFinished;
+    }
+
+  }; // class OutputManager
+
+} // namespace corsika::output
diff --git a/Outputs/YAML.hpp b/Outputs/YAML.hpp
new file mode 100644
index 000000000..9d1232d91
--- /dev/null
+++ b/Outputs/YAML.hpp
@@ -0,0 +1,22 @@
+/*
+ * (c) Copyright 2020 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+#pragma once
+
+#include <corsika/geometry/QuantityVector.h>
+
+namespace corsika {
+
+  template <typename TDim>
+  YAML::Emitter& operator<<(YAML::Emitter& out,
+                            geometry::QuantityVector<TDim> const& vec) {
+    out << YAML::Flow;
+    out << YAML::BeginSeq << v.x << v.y << v.z << YAML::EndSeq;
+    return out;
+  }
+
+} // namespace corsika
diff --git a/Outputs/helpers/ParquetStreamer.h b/Outputs/helpers/ParquetStreamer.h
new file mode 100644
index 000000000..ba3c8e9cd
--- /dev/null
+++ b/Outputs/helpers/ParquetStreamer.h
@@ -0,0 +1,62 @@
+/*
+ * (c) Copyright 2019 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+namespace corsika::output::helper {
+
+  /**
+   * This class automates the construction of simple tabular
+   * Parquet files using the parquet::StreamWriter.
+   */
+  class ParquetStreamer final {
+
+  public:
+    ParquetStreamer(std::string const& filepath) {
+
+      // open the file and connect it to our pointer
+      PARQUET_ASSIGN_OR_THROW(outfile_, arrow::io::FileOutputStream::Open(filepath));
+
+      // the default builder settings
+      builder_.created_by("CORSIKA8");
+    };
+
+    /**
+     * Add a field to this streamer.
+     */
+    template <typename TField>
+    void AddField(T const& field) {
+      nodes_.push_back(field);
+    }
+
+    /**
+     * Finalize the streamer construction.
+     */
+    void Build() {
+
+      // build the top level schema
+      auto schema = std::static_pointer_cast<parquet::schema::GroupNode>(
+          parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED,
+                                           nodes_));
+
+      // and build the writer
+      writer_ = parquet::StreamWriter(
+          parquet::ParquetFileWriter::Open(outfile_, schema, builder_->build()));
+    }
+
+  protected:
+    parquet::StreamWriter writer_;
+    parquet::StreamWriter stream_;               ///< The stream writer to 'outfile'
+    parquet::WriterProperties::Builder builder_; ///< The writer properties builder.
+                                                 ///
+  private:
+    parquet::schema::NodeVector nodes_;
+    std::shared_ptr<arrow::io::FileOutputStream> outfile_; ///< The output file.
+
+  }; // class ParquetHelper
+} // namespace corsika::output::helper
diff --git a/Processes/ObservationPlane/CMakeLists.txt b/Processes/ObservationPlane/CMakeLists.txt
index 800c5dbd6..99da01c50 100644
--- a/Processes/ObservationPlane/CMakeLists.txt
+++ b/Processes/ObservationPlane/CMakeLists.txt
@@ -20,6 +20,7 @@ CORSIKA_COPY_HEADERS_TO_NAMESPACE (ProcessObservationPlane ${MODEL_NAMESPACE} ${
 target_link_libraries (
   ProcessObservationPlane
   CORSIKAgeometry
+  CORSIKAthirdparty
   CORSIKAprocesssequence
   CORSIKAlogging
   )
@@ -40,5 +41,6 @@ target_link_libraries (
   testObservationPlane
   ProcessObservationPlane
   CORSIKAstackinterface
+  CORSIKAoutput
   CORSIKAthirdparty # for catch2
 )
diff --git a/Processes/ObservationPlane/ObservationPlane.cc b/Processes/ObservationPlane/ObservationPlane.cc
index ab3a80232..80b40eec8 100644
--- a/Processes/ObservationPlane/ObservationPlane.cc
+++ b/Processes/ObservationPlane/ObservationPlane.cc
@@ -9,76 +9,5 @@
 #include <corsika/process/observation_plane/ObservationPlane.h>
 #include <corsika/logging/Logging.h>
 
-#include <fstream>
-
 using namespace corsika::process::observation_plane;
 using namespace corsika::units::si;
-
-ObservationPlane::ObservationPlane(geometry::Plane const& obsPlane,
-                                   std::string const& filename, bool deleteOnHit)
-    : plane_(obsPlane)
-    , outputStream_(filename)
-    , deleteOnHit_(deleteOnHit)
-    , energy_ground_(0_GeV)
-    , count_ground_(0) {
-  outputStream_ << "#PDG code, energy / eV, distance to center / m" << std::endl;
-}
-
-corsika::process::EProcessReturn ObservationPlane::DoContinuous(
-    setup::Stack::ParticleType& particle, setup::Trajectory const& trajectory) {
-  TimeType const timeOfIntersection =
-      (plane_.GetCenter() - trajectory.GetR0()).dot(plane_.GetNormal()) /
-      trajectory.GetV0().dot(plane_.GetNormal());
-
-  if (timeOfIntersection < TimeType::zero()) { return process::EProcessReturn::eOk; }
-
-  if (plane_.IsAbove(trajectory.GetR0()) == plane_.IsAbove(trajectory.GetPosition(1))) {
-    return process::EProcessReturn::eOk;
-  }
-
-  const auto energy = particle.GetEnergy();
-  outputStream_ << static_cast<int>(particles::GetPDG(particle.GetPID())) << ' '
-                << energy / 1_eV << ' '
-                << (trajectory.GetPosition(1) - plane_.GetCenter()).norm() / 1_m
-                << std::endl;
-
-  if (deleteOnHit_) {
-    count_ground_++;
-    energy_ground_ += energy;
-    particle.Delete();
-    return process::EProcessReturn::eParticleAbsorbed;
-  } else {
-    return process::EProcessReturn::eOk;
-  }
-}
-
-LengthType ObservationPlane::MaxStepLength(setup::Stack::ParticleType const&,
-                                           setup::Trajectory const& trajectory) {
-  TimeType const timeOfIntersection =
-      (plane_.GetCenter() - trajectory.GetR0()).dot(plane_.GetNormal()) /
-      trajectory.GetV0().dot(plane_.GetNormal());
-
-  if (timeOfIntersection < TimeType::zero()) {
-    return std::numeric_limits<double>::infinity() * 1_m;
-  }
-
-  auto const pointOfIntersection = trajectory.GetPosition(timeOfIntersection);
-  auto dist = (trajectory.GetR0() - pointOfIntersection).norm() * 1.0001;
-  C8LOG_TRACE("ObservationPlane::MaxStepLength l={} m", dist / 1_m);
-  return dist;
-}
-
-void ObservationPlane::ShowResults() const {
-  C8LOG_INFO(
-      " ******************************\n"
-      " ObservationPlane: \n"
-      " energy in ground (GeV)     :  {}\n"
-      " no. of particles in ground :  {}\n"
-      " ******************************",
-      energy_ground_ / 1_GeV, count_ground_);
-}
-
-void ObservationPlane::Reset() {
-  energy_ground_ = 0_GeV;
-  count_ground_ = 0;
-}
diff --git a/Processes/ObservationPlane/ObservationPlane.h b/Processes/ObservationPlane/ObservationPlane.h
index 86c5ba0f2..986375173 100644
--- a/Processes/ObservationPlane/ObservationPlane.h
+++ b/Processes/ObservationPlane/ObservationPlane.h
@@ -9,12 +9,12 @@
 #pragma once
 
 #include <corsika/geometry/Plane.h>
+#include <corsika/output/ObservationPlaneWriterParquet.h>
 #include <corsika/process/ContinuousProcess.h>
 #include <corsika/setup/SetupStack.h>
 #include <corsika/setup/SetupTrajectory.h>
 #include <corsika/units/PhysicalUnits.h>
-
-#include <fstream>
+#include <corsika/logging/Logging.h>
 
 namespace corsika::process::observation_plane {
 
@@ -23,26 +23,106 @@ namespace corsika::process::observation_plane {
    * central point of the plane into its output file. The particles are considered
    * "absorbed" afterwards.
    */
-  class ObservationPlane : public corsika::process::ContinuousProcess<ObservationPlane> {
+  template <typename TOutputWriter = output::ObservationPlaneWriterParquet>
+  class ObservationPlane final
+      : public process::ContinuousProcess<ObservationPlane<TOutputWriter>>,
+        public TOutputWriter {
 
   public:
-    ObservationPlane(geometry::Plane const&, std::string const&, bool = true);
+    template <typename... TArgs>
+    ObservationPlane(geometry::Plane const& plane, bool const deleteOnHit,
+                     TArgs&&... args)
+        : TOutputWriter(args...)
+        , plane_(plane)
+        , deleteOnHit_(deleteOnHit)
+        , energy_ground_(0_GeV)
+        , count_ground_(0) {}
+
+    process::EProcessReturn DoContinuous(setup::Stack::ParticleType const& particle,
+                                         setup::Trajectory const& trajectory) {
+
+      using namespace units::si;
+      TimeType const timeOfIntersection =
+          (plane_.GetCenter() - trajectory.GetR0()).dot(plane_.GetNormal()) /
+          trajectory.GetV0().dot(plane_.GetNormal());
+
+      if (timeOfIntersection < TimeType::zero()) { return process::EProcessReturn::eOk; }
+
+      if (plane_.IsAbove(trajectory.GetR0()) ==
+          plane_.IsAbove(trajectory.GetPosition(1))) {
+        return process::EProcessReturn::eOk;
+      }
+
+      // write the data to the output
+      this->Write(particle.GetPID(), particle.GetEnergy(),
+                  (trajectory.GetPosition(1) - plane_.GetCenter()).norm());
+
+      if (deleteOnHit_) { return process::EProcessReturn::eParticleAbsorbed; }
+      return process::EProcessReturn::eOk;
+    }
+
+    units::si::LengthType MaxStepLength(setup::Stack::ParticleType const&,
+                                        setup::Trajectory const& trajectory) {
+
+      using namespace units::si;
+      TimeType const timeOfIntersection =
+          (plane_.GetCenter() - trajectory.GetR0()).dot(plane_.GetNormal()) /
+          trajectory.GetV0().dot(plane_.GetNormal());
+
+      if (timeOfIntersection < TimeType::zero()) {
+        return std::numeric_limits<double>::infinity() * 1_m;
+      }
+
+      auto const pointOfIntersection = trajectory.GetPosition(timeOfIntersection);
+      return (trajectory.GetR0() - pointOfIntersection).norm() * 1.0001;
+    }
+
+    YAML::Node GetConfig() const {
+      using namespace units::si;
+
+      // construct the top-level node
+      YAML::Node node;
+
+      // basic info
+      node["name"] = this->name_;
+      node["type"] = "ObservationPlane";
+
+      // the center of the plane
+      auto const center{plane_.GetCenter()};
+      node["plane"]["center"].push_back(center.GetX() / 1_m);
+      node["plane"]["center"].push_back(center.GetY() / 1_m);
+      node["plane"]["center"].push_back(center.GetZ() / 1_m);
+      node["plane"]["center.units"] = "m";
+
+      // the normal vector
+      auto const normal{plane_.GetNormal().GetComponents()};
+      node["plane"]["normal"].push_back(normal.GetX().magnitude());
+      node["plane"]["normal"].push_back(normal.GetY().magnitude());
+      node["plane"]["normal"].push_back(normal.GetZ().magnitude());
+
+      node["delete_on_hit"] = deleteOnHit_;
 
-    corsika::process::EProcessReturn DoContinuous(
-        corsika::setup::Stack::ParticleType& vParticle,
-        corsika::setup::Trajectory const& vTrajectory);
+      return node;
+    }
 
-    corsika::units::si::LengthType MaxStepLength(
-        corsika::setup::Stack::ParticleType const&,
-        corsika::setup::Trajectory const& vTrajectory);
+void ObservationPlane::ShowResults() const {
+  C8LOG_INFO(
+      " ******************************\n"
+      " ObservationPlane: \n"
+      " energy in ground (GeV)     :  {}\n"
+      " no. of particles in ground :  {}\n"
+      " ******************************",
+      energy_ground_ / 1_GeV, count_ground_);
+}
 
-    void ShowResults() const;
-    void Reset();
+void ObservationPlane::Reset() {
+  energy_ground_ = 0_GeV;
+  count_ground_ = 0;
+}
     corsika::units::si::HEPEnergyType GetEnergyGround() const { return energy_ground_; }
 
   private:
     geometry::Plane const plane_;
-    std::ofstream outputStream_;
     bool const deleteOnHit_;
 
     units::si::HEPEnergyType energy_ground_ = 0 * units::si::electronvolt;
diff --git a/Processes/ObservationPlane/testObservationPlane.cc b/Processes/ObservationPlane/testObservationPlane.cc
index 7f3f183a6..0b54668f8 100644
--- a/Processes/ObservationPlane/testObservationPlane.cc
+++ b/Processes/ObservationPlane/testObservationPlane.cc
@@ -62,7 +62,7 @@ TEST_CASE("ContinuousProcess interface", "[proccesses][observation_plane]") {
 
     Plane const obsPlane(Point(rootCS, {0_m, 0_m, 0_m}),
                          Vector<dimensionless_d>(rootCS, {0., 0., 1.}));
-    ObservationPlane obs(obsPlane, "particles.dat", true);
+    ObservationPlane obs(obsPlane, true, "obsplane");
 
     const LengthType length = obs.MaxStepLength(particle, track);
     const process::EProcessReturn ret = obs.DoContinuous(particle, track);
@@ -83,7 +83,7 @@ TEST_CASE("ContinuousProcess interface", "[proccesses][observation_plane]") {
   SECTION("transparent plane") {
     Plane const obsPlane(Point(rootCS, {0_m, 0_m, 0_m}),
                          Vector<dimensionless_d>(rootCS, {0., 0., 1.}));
-    ObservationPlane obs(obsPlane, "particles.dat", false);
+    ObservationPlane obs(obsPlane, false, "obsplane");
 
     const LengthType length = obs.MaxStepLength(particle, track);
     const process::EProcessReturn ret = obs.DoContinuous(particle, track);
diff --git a/ThirdParty/CMakeLists.txt b/ThirdParty/CMakeLists.txt
index 19cc00b37..f27fa5ecf 100644
--- a/ThirdParty/CMakeLists.txt
+++ b/ThirdParty/CMakeLists.txt
@@ -34,7 +34,7 @@ message (STATUS "USE_BOOST_C8='${USE_BOOST_C8}'")
 
 add_library (C8::ext::boost INTERFACE IMPORTED GLOBAL)
 if ("x_${USE_BOOST_C8}" STREQUAL "x_SYSTEM")
-  find_package (Boost REQUIRED COMPONENTS mp11 iterator core format interval optional type_index histogram multi_array)
+  find_package (Boost REQUIRED COMPONENTS mp11 iterator core format interval optional type_index histogram multi_array filesystem)
 
   message (STATUS "Using system-level boost version ${Boost_VERSION} at ${Boost_INCLUDE_DIR}")
   set_target_properties (
@@ -304,8 +304,58 @@ endif (Boost_IOSTREAMS_FOUND)
 find_package (ZLIB QUIET)
 
 if (ZLIB_FOUND)
-  message (STATUS "Found ZLIB. Build cnpy for SaveHistograms")  
+  message (STATUS "Found ZLIB. Build cnpy for SaveHistograms")
+
+  # cnpy reports warnings and errors with our compiler settings
+  # We add the cnpy include as a SYSTEM directory *before*
+  # adding the subdirectory will silence cnpy-only warnings
+  target_include_directories(CORSIKAthirdparty
+  SYSTEM
+  INTERFACE
+  "${CMAKE_CURRENT_SOURCE_DIR}/cnpy/"
+)
+
+  # now we can add cnpy
   add_subdirectory (cnpy)
+
 else (ZLIB_FOUND)
   message (WARNING "Did not find ZLIB. Cannot build cnpy for SaveHistograms")  
 endif (ZLIB_FOUND)
+
+
+########## YAML CPP ##########
+
+# add YAML-CPP as a SYSTEM directory to ignore internal warnings/errors
+target_include_directories(CORSIKAthirdparty SYSTEM INTERFACE yaml-cpp/include/)
+
+# disable the unecessary parts of the build
+set(YAML_CPP_BUILD_TOOLS OFF CACHE BOOL "Disable building YAML parsing tools.")
+set(YAML_CPP_BUILD_CONTRIB OFF CACHE BOOL "Disable YAML contrib/ tools.")
+add_subdirectory (yaml-cpp)
+
+# and link it into CORSIKAthirdparty
+target_link_libraries(CORSIKAthirdparty INTERFACE yaml-cpp)
+
+########## Apache Arrow ##########
+
+# add_subdirectory(arrow/cpp/)
+
+# set(ARROW_COMPUTE OFF)
+# set(ARROW_CSV OFF)
+# set(ARROW_CUDA OFF)
+# set(ARROW_FLIGHT OFF)
+# set(ARROW_GANDIVA OFF)
+# set(ARROW_HDFS OFF)
+# set(ARROW_PLASMA OFF)
+# target_link_libraries(CORSIKAthirdparty INTERFACE arrow)
+
+
+target_include_directories(CORSIKAthirdparty
+  INTERFACE
+  "/home/rprechelt/.local/lib/python3.8/site-packages/pyarrow/include")
+target_link_directories(CORSIKAthirdparty
+  INTERFACE
+  "/home/rprechelt/.local/lib/python3.8/site-packages/pyarrow/")
+target_link_libraries(CORSIKAthirdparty
+  INTERFACE
+  arrow)
diff --git a/ThirdParty/arrow b/ThirdParty/arrow
new file mode 160000
index 000000000..04660f8af
--- /dev/null
+++ b/ThirdParty/arrow
@@ -0,0 +1 @@
+Subproject commit 04660f8afbf5bee052f8881b36b5c59a8c8ca7b5
diff --git a/ThirdParty/yaml-cpp b/ThirdParty/yaml-cpp
new file mode 160000
index 000000000..98acc5a88
--- /dev/null
+++ b/ThirdParty/yaml-cpp
@@ -0,0 +1 @@
+Subproject commit 98acc5a8874faab28b82c28936f4b400b389f5d6
-- 
GitLab