From 1b7f5c44210e77a940d99778baf38b7ee9087d07 Mon Sep 17 00:00:00 2001
From: ralfulrich <ralf.ulrich@kit.edu>
Date: Mon, 17 May 2021 07:44:22 +0200
Subject: [PATCH] improved and completed output machinery

---
 .../framework/core/ParticleProperties.inl     |   6 +
 .../detail/modules/LongitudinalProfile.inl    |  61 +-----
 corsika/detail/modules/ObservationPlane.inl   |  25 ++-
 corsika/detail/modules/ParticleCut.inl        | 104 ++++-----
 corsika/detail/modules/TrackWriter.inl        |  24 +-
 corsika/detail/modules/conex/CONEXhybrid.inl  |  50 ++++-
 .../modules/energy_loss/BetheBlochPDG.inl     | 138 ++++--------
 .../writers/EnergyLossWriterParquet.inl       | 205 ++++++++++++++++++
 .../LongitudinalProfileWriterParquet.inl      | 190 ++++++++++++++++
 .../writers/ObservationPlaneWriterParquet.inl |  57 -----
 .../modules/writers/ParticleWriterParquet.inl | 109 ++++++++++
 .../modules/writers/TrackWriterParquet.inl    |  16 +-
 corsika/detail/output/BaseOutput.inl          |  15 --
 corsika/detail/output/OutputManager.inl       | 200 +++++++----------
 corsika/detail/output/ParquetStreamer.inl     |   8 +-
 corsika/detail/output/YAMLStreamer.inl        |  29 +++
 corsika/framework/core/ParticleProperties.hpp |   3 +-
 corsika/framework/utility/FindXmax.hpp        | 120 ++++++++++
 corsika/modules/LongitudinalProfile.hpp       |  25 +--
 corsika/modules/ObservationPlane.hpp          |  13 +-
 corsika/modules/ParticleCut.hpp               |  51 +++--
 corsika/modules/TrackWriter.hpp               |  11 +-
 corsika/modules/conex/CONEXhybrid.hpp         |  51 ++++-
 corsika/modules/energy_loss/BetheBlochPDG.hpp |  35 +--
 .../modules/writers/EnergyLossWriterOff.hpp   |  69 ++++++
 .../writers/EnergyLossWriterParquet.hpp       | 102 +++++++++
 .../writers/LongitudinalProfileWriterOff.hpp  |  56 +++++
 .../LongitudinalProfileWriterParquet.hpp      | 103 +++++++++
 corsika/modules/writers/ParticleWriterOff.hpp |  39 ++++
 .../modules/writers/ParticleWriterParquet.hpp |  88 ++++++++
 ...neWriterParquet.hpp => TrackWriterOff.hpp} |  32 +--
 .../modules/writers/TrackWriterParquet.hpp    |   8 +-
 corsika/output/BaseOutput.hpp                 |  32 +--
 corsika/output/DummyOutputManager.hpp         |   2 +-
 corsika/output/NoOutput.hpp                   |   6 +-
 corsika/output/OutputManager.hpp              |  87 +++++---
 corsika/output/YAMLStreamer.hpp               |  30 +++
 tests/framework/testLogging.cpp               |   2 +-
 tests/modules/testObservationPlane.cpp        |   2 -
 tests/modules/testParticleCut.cpp             |  11 +-
 tests/output/testOutputManager.cpp            |  49 +----
 tests/output/testParquetStreamer.cpp          |   6 +-
 tests/output/testWriterObservationPlane.cpp   |  12 +-
 tests/output/testWriterTracks.cpp             |   7 +-
 44 files changed, 1635 insertions(+), 654 deletions(-)
 create mode 100644 corsika/detail/modules/writers/EnergyLossWriterParquet.inl
 create mode 100644 corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl
 delete mode 100644 corsika/detail/modules/writers/ObservationPlaneWriterParquet.inl
 create mode 100644 corsika/detail/modules/writers/ParticleWriterParquet.inl
 delete mode 100644 corsika/detail/output/BaseOutput.inl
 create mode 100644 corsika/detail/output/YAMLStreamer.inl
 create mode 100644 corsika/framework/utility/FindXmax.hpp
 create mode 100644 corsika/modules/writers/EnergyLossWriterOff.hpp
 create mode 100644 corsika/modules/writers/EnergyLossWriterParquet.hpp
 create mode 100644 corsika/modules/writers/LongitudinalProfileWriterOff.hpp
 create mode 100644 corsika/modules/writers/LongitudinalProfileWriterParquet.hpp
 create mode 100644 corsika/modules/writers/ParticleWriterOff.hpp
 create mode 100644 corsika/modules/writers/ParticleWriterParquet.hpp
 rename corsika/modules/writers/{ObservationPlaneWriterParquet.hpp => TrackWriterOff.hpp} (56%)
 create mode 100644 corsika/output/YAMLStreamer.hpp

diff --git a/corsika/detail/framework/core/ParticleProperties.inl b/corsika/detail/framework/core/ParticleProperties.inl
index 73c2e6f89..67cf78f64 100644
--- a/corsika/detail/framework/core/ParticleProperties.inl
+++ b/corsika/detail/framework/core/ParticleProperties.inl
@@ -60,12 +60,18 @@ namespace corsika {
                                 A * 10); // 10LZZZAAAI
   }
 
+  inline PDGCode constexpr get_PDG(unsigned int const A, unsigned int const Z) {
+    return PDGCode(1000000000 + Z * 10000 + A * 10);
+  }
+
   inline int16_t constexpr get_charge_number(Code const code) {
     if (is_nucleus(code)) return get_nucleus_Z(code);
     return particle::detail::electric_charges[static_cast<CodeIntType>(code)];
   }
 
   inline ElectricChargeType constexpr get_charge(Code const code) {
+    if (code == Code::Nucleus)
+      throw std::runtime_error("charge of particle::Nucleus undefined");
     return get_charge_number(code) * constants::e;
   }
 
diff --git a/corsika/detail/modules/LongitudinalProfile.inl b/corsika/detail/modules/LongitudinalProfile.inl
index 5c9b54c60..c16ad2f45 100644
--- a/corsika/detail/modules/LongitudinalProfile.inl
+++ b/corsika/detail/modules/LongitudinalProfile.inl
@@ -17,63 +17,18 @@
 
 namespace corsika {
 
-  inline LongitudinalProfile::LongitudinalProfile(ShowerAxis const& shower_axis,
-                                                  GrammageType dX)
-      : dX_(dX)
-      , shower_axis_{shower_axis}
-      , profiles_{static_cast<unsigned int>(shower_axis.getMaximumX() / dX_) + 1} {}
+  template <typename TOutput>
+  inline LongitudinalProfile<TOutput>::LongitudinalProfile(TOutput& output)
+      : output_(output) {}
 
+  template <typename TOutput>
   template <typename TParticle, typename TTrack>
-  inline ProcessReturn LongitudinalProfile::doContinuous(TParticle const& vP,
-                                                         TTrack const& vTrack,
-                                                         bool const) {
-    auto const pid = vP.getPID();
-
-    GrammageType const grammageStart = shower_axis_.getProjectedX(vTrack.getPosition(0));
-    GrammageType const grammageEnd = shower_axis_.getProjectedX(vTrack.getPosition(1));
-
-    CORSIKA_LOG_DEBUG("longprof: pos1={} m, pos2={}, X1={} g/cm2, X2={} g/cm2",
-                      vTrack.getPosition(0).getCoordinates() / 1_m,
-                      vTrack.getPosition(1).getCoordinates() / 1_m,
-                      grammageStart / 1_g * square(1_cm),
-                      grammageEnd / 1_g * square(1_cm));
-
-    // Note: particle may go also "upward", thus, grammageEnd<grammageStart
-    int const binStart = std::ceil(grammageStart / dX_);
-    int const binEnd = std::floor(grammageEnd / dX_);
-
-    for (int b = binStart; b <= binEnd; ++b) {
-      if (pid == Code::Photon) {
-        profiles_.at(b)[ProfileIndex::Photon]++;
-      } else if (pid == Code::Positron) {
-        profiles_.at(b)[ProfileIndex::Positron]++;
-      } else if (pid == Code::Electron) {
-        profiles_.at(b)[ProfileIndex::Electron]++;
-      } else if (pid == Code::MuPlus) {
-        profiles_.at(b)[ProfileIndex::MuPlus]++;
-      } else if (pid == Code::MuMinus) {
-        profiles_.at(b)[ProfileIndex::MuMinus]++;
-      } else if (is_hadron(pid)) {
-        profiles_.at(b)[ProfileIndex::Hadron]++;
-      } else if (is_neutrino(pid)) {
-        profiles_.at(b)[ProfileIndex::Invisible]++;
-      }
-    }
+  inline ProcessReturn LongitudinalProfile<TOutput>::doContinuous(
+      TParticle const& particle, TTrack const& track, bool const) {
 
+    auto const pid = particle.getPID();
+    output_.write(track, pid, 1.0); // weight hardcoded so far
     return ProcessReturn::Ok;
   }
 
-  inline void LongitudinalProfile::save(std::string const& filename, const int width,
-                                        const int precision) {
-    CORSIKA_LOG_DEBUG("Write longprof to {}", filename);
-    std::ofstream f{filename};
-    f << "# X / g·cm¯², photon, e+, e-, mu+, mu-, all hadrons, neutrinos" << std::endl;
-    for (size_t b = 0; b < profiles_.size(); ++b) {
-      f << std::setprecision(5) << std::setw(11) << b * (dX_ / (1_g / 1_cm / 1_cm));
-      for (auto const& N : profiles_.at(b)) {
-        f << std::setw(width) << std::setprecision(precision) << std::scientific << N;
-      }
-      f << std::endl;
-    }
-  }
 } // namespace corsika
diff --git a/corsika/detail/modules/ObservationPlane.inl b/corsika/detail/modules/ObservationPlane.inl
index 7100db181..2f0bdab98 100644
--- a/corsika/detail/modules/ObservationPlane.inl
+++ b/corsika/detail/modules/ObservationPlane.inl
@@ -8,11 +8,12 @@
 
 namespace corsika {
 
-  template <typename TTracking, typename TOutput>
-  ObservationPlane<TTracking, TOutput>::ObservationPlane(Plane const& obsPlane,
-                                                         DirectionVector const& x_axis,
-                                                         bool deleteOnHit)
+  template <typename TOutput>
+  ObservationPlane<TOutput>::ObservationPlane(Plane const& obsPlane,
+                                              DirectionVector const& x_axis,
+                                              TOutput& output, bool deleteOnHit)
       : plane_(obsPlane)
+      , output_(output)
       , deleteOnHit_(deleteOnHit)
       , energy_ground_(0_GeV)
       , count_ground_(0)
@@ -49,9 +50,17 @@ namespace corsika {
     Point const pointOfIntersection = step.getPosition(1);
     Vector const displacement = pointOfIntersection - plane_.getCenter();
 
-    // add our particles to the output file stream
-    this->write(particle.getPID(), energy, displacement.dot(xAxis_),
-                displacement.dot(yAxis_), particle.getTime());
+    double const weight = 1.0;
+    Code const pid = particle.getPID();
+    if (pid == Code::Nucleus) {
+      // add our particles to the output file stream
+      output_.write(particle.getNuclearA(), particle.getNuclearZ(), energy,
+                    displacement.dot(xAxis_), displacement.dot(yAxis_), weight);
+    } else {
+      // add our particles to the output file stream
+      output_.write(particle.getPID(), energy, displacement.dot(xAxis_),
+                    displacement.dot(yAxis_), weight);
+    }
 
     CORSIKA_LOG_TRACE("Particle detected absorbed={}", deleteOnHit_);
 
@@ -93,7 +102,7 @@ namespace corsika {
   template <typename TTracking, typename TOutput>
   inline void ObservationPlane<TTracking, TOutput>::showResults() const {
     CORSIKA_LOG_INFO(
-        " ******************************\n"
+        "\n ******************************\n"
         " ObservationPlane: \n"
         " energy an ground (GeV)     :  {}\n"
         " no. of particles at ground :  {}\n"
diff --git a/corsika/detail/modules/ParticleCut.inl b/corsika/detail/modules/ParticleCut.inl
index 0358bbb5a..b5853df3b 100644
--- a/corsika/detail/modules/ParticleCut.inl
+++ b/corsika/detail/modules/ParticleCut.inl
@@ -12,15 +12,15 @@
 
 namespace corsika {
 
-  inline ParticleCut::ParticleCut(HEPEnergyType const eEleCut,
-                                  HEPEnergyType const ePhoCut,
-                                  HEPEnergyType const eHadCut, HEPEnergyType const eMuCut,
-                                  bool const inv)
-      : doCutEm_(false)
+  template <typename TOutput>
+  inline ParticleCut<TOutput>::ParticleCut(TOutput& output, HEPEnergyType const eEleCut,
+                                           HEPEnergyType const ePhoCut,
+                                           HEPEnergyType const eHadCut,
+                                           HEPEnergyType const eMuCut, bool const inv)
+      : output_(output)
       , doCutInv_(inv)
       , energy_cut_(0_GeV)
       , energy_timecut_(0_GeV)
-      , energy_emcut_(0_GeV)
       , energy_invcut_(0_GeV)
       , em_count_(0)
       , inv_count_(0)
@@ -43,55 +43,14 @@ namespace corsika {
         eEleCut / 1_GeV, ePhoCut / 1_GeV, eHadCut / 1_GeV, eMuCut / 1_GeV);
   }
 
-  inline ParticleCut::ParticleCut(HEPEnergyType const eHadCut, HEPEnergyType const eMuCut,
-                                  bool const inv)
-      : doCutEm_(true)
-      , doCutInv_(inv)
-      , energy_cut_(0_GeV)
-      , energy_timecut_(0_GeV)
-      , energy_emcut_(0_GeV)
-      , energy_invcut_(0_GeV)
-      , em_count_(0)
-      , inv_count_(0)
-      , energy_count_(0) {
-
-    for (auto p : get_all_particles()) {
-      if (is_hadron(p))
-        set_kinetic_energy_threshold(p, eHadCut);
-      else if (is_muon(p))
-        set_kinetic_energy_threshold(p, eMuCut);
-    }
-    set_kinetic_energy_threshold(Code::Nucleus, eHadCut);
-    CORSIKA_LOG_DEBUG(
-        "setting thresholds: hadrons = {} GeV, "
-        "muons = {} GeV",
-        eHadCut / 1_GeV, eMuCut / 1_GeV);
-  }
-
-  inline ParticleCut::ParticleCut(HEPEnergyType const eCut, bool const em, bool const inv)
-      : doCutEm_(em)
-      , doCutInv_(inv)
-      , energy_cut_(0_GeV)
-      , energy_timecut_(0_GeV)
-      , energy_emcut_(0_GeV)
-      , energy_invcut_(0_GeV)
-      , em_count_(0)
-      , inv_count_(0)
-      , energy_count_(0) {
-    for (auto p : get_all_particles()) set_kinetic_energy_threshold(p, eCut);
-    set_kinetic_energy_threshold(Code::Nucleus, eCut);
-    CORSIKA_LOG_DEBUG("setting kinetic energy threshold for all particles to {} GeV",
-                      eCut / 1_GeV);
-  }
-
-  inline ParticleCut::ParticleCut(
-      std::unordered_map<Code const, HEPEnergyType const> const& eCuts, bool const em,
+  template <typename TOutput>
+  inline ParticleCut<TOutput>::ParticleCut(
+      TOutput& output, std::unordered_map<Code const, HEPEnergyType const> const& eCuts,
       bool const inv)
-      : doCutEm_(em)
+      : output_(output)
       , doCutInv_(inv)
       , energy_cut_(0_GeV)
       , energy_timecut_(0_GeV)
-      , energy_emcut_(0_GeV)
       , energy_invcut_(0_GeV)
       , em_count_(0)
       , inv_count_(0)
@@ -100,8 +59,9 @@ namespace corsika {
     CORSIKA_LOG_DEBUG("setting threshold particles individually");
   }
 
+  template <typename TOutput>
   template <typename TParticle>
-  inline bool ParticleCut::isBelowEnergyCut(TParticle const& vP) const {
+  inline bool ParticleCut<TOutput>::isBelowEnergyCut(TParticle const& vP) const {
     auto const energyLab = vP.getKineticEnergy();
     auto const pid = vP.getPID();
     // nuclei
@@ -114,12 +74,14 @@ namespace corsika {
     }
   }
 
-  inline bool ParticleCut::isInvisible(Code const& vCode) const {
+  template <typename TOutput>
+  inline bool ParticleCut<TOutput>::isInvisible(Code const& vCode) const {
     return is_neutrino(vCode);
   }
 
+  template <typename TOutput>
   template <typename TParticle>
-  inline bool ParticleCut::checkCutParticle(TParticle const& particle) {
+  inline bool ParticleCut<TOutput>::checkCutParticle(TParticle const& particle) {
 
     Code const pid = particle.getPID();
     HEPEnergyType const kine_energy = particle.getKineticEnergy();
@@ -158,22 +120,29 @@ namespace corsika {
     return false; // this particle will not be removed/cut
   }
 
-  template <typename TStackView>
-  inline void ParticleCut::doSecondaries(TStackView& vS) {
+
+  template <typename TOutput>
+  inline void ParticleCut<TOutput>::doSecondaries(TStackView& vS) {
     energy_event_ = 0_GeV; // per event counting for printout
     auto particle = vS.begin();
     while (particle != vS.end()) {
-      if (checkCutParticle(particle)) { particle.erase(); }
+      if (checkCutParticle(particle)) {
+        output_.write(particle.getPosition(), particle.getPID(),
+                      particle.getKineticEnergy());
+        particle.erase();
+      }
       ++particle; // next entry in SecondaryView
     }
     CORSIKA_LOG_DEBUG("Event cut: {} GeV", energy_event_ / 1_GeV);
   }
 
   template <typename TParticle, typename TTrajectory>
-  inline ProcessReturn ParticleCut::doContinuous(TParticle& particle, TTrajectory const&,
+  template <typename TOutput>
+  inline ProcessReturn ParticleCut<TOutput>::doContinuous(TParticle& particle, TTrajectory const&,
                                                  bool const) {
-    CORSIKA_LOG_TRACE("ParticleCut::DoContinuous");
     if (checkCutParticle(particle)) {
+      output_.write(particle.getPosition(), particle.getPID(),
+                    particle.getKineticEnergy());
       CORSIKA_LOG_TRACE("removing during continuous");
       // signal to upstream code that this particle was deleted
       return ProcessReturn::ParticleAbsorbed;
@@ -181,15 +150,17 @@ namespace corsika {
     return ProcessReturn::Ok;
   }
 
-  inline void ParticleCut::printThresholds() {
+  template <typename TOutput>
+  inline void ParticleCut<TOutput>::printThresholds() const {
     for (auto p : get_all_particles()) {
-      auto const Eth = calculate_kinetic_energy_threshold(p);
-      CORSIKA_LOG_INFO("kinetic energy threshold for particle {} is {} GeV", p,
-                       Eth / 1_GeV);
+      auto const Eth = get_kinetic_energy_threshold(p);
+      CORSIKA_LOG_DEBUG("kinetic energy threshold for particle {} is {} GeV", p,
+                        Eth / 1_GeV);
     }
   }
 
-  inline void ParticleCut::showResults() {
+  template <typename TOutput>
+  inline void ParticleCut<TOutput>::showResults() const {
     CORSIKA_LOG_INFO(
         "\n ******************************\n "
         " kinetic energy removed by cut of electromagnetic (GeV): {} (number: {})\n "
@@ -201,9 +172,8 @@ namespace corsika {
         energy_cut_ / 1_GeV, energy_count_, energy_timecut_ / 1_GeV);
   }
 
-  inline void ParticleCut::reset() {
-    energy_emcut_ = 0_GeV;
-    em_count_ = 0;
+  template <typename TOutput>
+  inline void ParticleCut<TOutput>::reset() {
     energy_invcut_ = 0_GeV;
     inv_count_ = 0;
     energy_cut_ = 0_GeV;
diff --git a/corsika/detail/modules/TrackWriter.inl b/corsika/detail/modules/TrackWriter.inl
index a1102ce2c..e8e0ac7ad 100644
--- a/corsika/detail/modules/TrackWriter.inl
+++ b/corsika/detail/modules/TrackWriter.inl
@@ -15,34 +15,34 @@
 
 namespace corsika {
 
-  template <typename TOutputWriter>
-  inline TrackWriter<TOutputWriter>::TrackWriter() {}
+  template <typename TOutput>
+  inline TrackWriter<TOutput>::TrackWriter(TOutput& output)
+      : output_(output) {}
 
-  template <typename TOutputWriter>
+  template <typename TOutput>
   template <typename TParticle, typename TTrack>
-  inline ProcessReturn TrackWriter<TOutputWriter>::doContinuous(TParticle const& vP,
-                                                                TTrack const& vT,
-                                                                bool const) {
+  inline ProcessReturn TrackWriter<TOutput>::doContinuous(TParticle const& vP,
+                                                          TTrack const& vT, bool const) {
 
     auto const start = vT.getPosition(0).getCoordinates();
     auto const end = vT.getPosition(1).getCoordinates();
 
     // write the track to the file
-    this->write(vP.getPID(), vP.getEnergy(), start, vP.getTime() - vT.getDuration(), end,
+    this->write(vP.getPID(), vP.getEnergy(), vP.getWeight(), start, vP.getTime() - vT.getDuration(), end,
                 vP.getTime());
 
     return ProcessReturn::Ok;
   }
 
-  template <typename TOutputWriter>
+  template <typename TOutput>
   template <typename TParticle, typename TTrack>
-  inline LengthType TrackWriter<TOutputWriter>::getMaxStepLength(TParticle const&,
-                                                                 TTrack const&) {
+  inline LengthType TrackWriter<TOutput>::getMaxStepLength(TParticle const&,
+                                                           TTrack const&) {
     return meter * std::numeric_limits<double>::infinity();
   }
 
-  template <typename TOutputWriter>
-  YAML::Node TrackWriter<TOutputWriter>::getConfig() const {
+  template <typename TOutput>
+  YAML::Node TrackWriter<TOutput>::getConfig() const {
     using namespace units::si;
 
     YAML::Node node;
diff --git a/corsika/detail/modules/conex/CONEXhybrid.inl b/corsika/detail/modules/conex/CONEXhybrid.inl
index 00e79005e..62b19516c 100644
--- a/corsika/detail/modules/conex/CONEXhybrid.inl
+++ b/corsika/detail/modules/conex/CONEXhybrid.inl
@@ -22,10 +22,14 @@
 
 namespace corsika {
 
-  inline CONEXhybrid::CONEXhybrid(Point center, ShowerAxis const& showerAxis,
-                                  LengthType groundDist, LengthType injectionHeight,
-                                  HEPEnergyType primaryEnergy, PDGCode primaryPDG)
-      : center_{center}
+  template <typename TOutput, typename TProfileOutput>
+  inline CONEXhybrid<TOutput, TProfileOutput>::CONEXhybrid(
+      TOutput& output, TProfileOutput& profileOutput, Point const& center,
+      ShowerAxis const& showerAxis, LengthType groundDist, LengthType injectionHeight,
+      HEPEnergyType primaryEnergy, PDGCode primaryPDG)
+      : output_{output}
+      , profileOutput_(profileOutput)
+      , center_{center}
       , showerAxis_{showerAxis}
       , groundDist_{groundDist}
       , injectionHeight_{injectionHeight}
@@ -133,7 +137,8 @@ namespace corsika {
   }
 
   template <typename TStackView>
-  inline void CONEXhybrid::doSecondaries(TStackView& vS) {
+  template <typename TOutput, typename TProfileOutput>
+  inline void CONEXhybrid<TOutput, TProfileOutput>::doSecondaries(TStackView& vS) {
     auto p = vS.begin();
     while (p != vS.end()) {
       Code const pid = p.getPID();
@@ -145,9 +150,10 @@ namespace corsika {
     }
   }
 
-  inline bool CONEXhybrid::addParticle(Code pid, HEPEnergyType energy, HEPEnergyType mass,
-                                       Point const& position,
-                                       DirectionVector const& direction, TimeType t) {
+  template <typename TOutput, typename TProfileOutput>
+  inline bool CONEXhybrid<TOutput, TProfileOutput>::addParticle(
+      Code pid, HEPEnergyType energy, HEPEnergyType mass, Point const& position,
+      DirectionVector const& direction, TimeType t) {
 
     auto const it = std::find_if(egs_em_codes_.cbegin(), egs_em_codes_.cend(),
                                  [=](auto const& p) { return pid == p.first; });
@@ -182,7 +188,8 @@ namespace corsika {
     double const v = direction.dot(x_sf_).magnitude();
     double const w = direction.dot(showerAxis_.getDirection()).magnitude();
 
-    double const weight = 1; // NEEDS TO BE CHANGED WHEN WE HAVE WEIGHTS!
+    double const weight =
+        1; // particle.getWeight(); // NEEDS TO BE CHANGED WHEN WE HAVE WEIGHTS!
 
     // generation, TO BE CHANGED WHEN WE HAVE THAT INFORMATION AVAILABLE
     int const latchin = 1;
@@ -233,7 +240,8 @@ namespace corsika {
   }
 
   template <typename TStack>
-  inline void CONEXhybrid::doCascadeEquations(TStack&) {
+  template <typename TOutput, typename TProfileOutput>
+  inline void CONEXhybrid<TOutput, TProfileOutput>::doCascadeEquations(TStack&) {
 
     ::conex::conexcascade_();
 
@@ -268,6 +276,18 @@ namespace corsika {
     ::conex::get_shower_electron_(icute, nX, Electrons[0]);
     ::conex::get_shower_hadron_(icuth, nX, Hadrons[0]);
 
+    // make sure CONEX binning is same to C8:
+    GrammageType dX = (X[1] - X[0]) * 1_g / square(1_cm);
+
+    for (int i = 0; i < nX; ++i) {
+      GrammageType curX = X[i] * 1_g / square(1_cm);
+      output_.write(curX, curX + dX, dEdX[i] * 1_GeV / 1_g * square(1_cm) * dX);
+      profileOutput_.write(curX, curX + dX, Code::Photon, Photon[i]);
+      profileOutput_.write(curX, curX + dX, Code::Proton /*hadron*/, Hadrons[i]);
+      profileOutput_.write(curX, curX + dX, Code::Electron, Electrons[i]);
+      profileOutput_.write(curX, curX + dX, Code::MuMinus, Mu[i]);
+    }
+
     std::ofstream file{"conex_output.txt"};
     file << fmt::format("#{:>10} {:>13} {:>13} {:>13} {:>13} {:>13} {:>13} {:>13}\n", "X",
                         "N", "dEdX", "Mu", "dMu", "Photon", "El", "Had");
@@ -293,8 +313,14 @@ namespace corsika {
     fitout << fitpars[13 - 1] << " # ???" << std::endl;
   }
 
-  inline HEPEnergyType CONEXhybrid::getEnergyEM() const { return energy_em_; }
+  template <typename TOutput, typename TProfileOutput>
+  inline HEPEnergyType CONEXhybrid<TOutput, TProfileOutput>::getEnergyEM() const {
+    return energy_em_;
+  }
 
-  inline void CONEXhybrid::reset() { energy_em_ = 0_GeV; }
+  template <typename TOutput, typename TProfileOutput>
+  inline void CONEXhybrid<TOutput, TProfileOutput>::reset() {
+    energy_em_ = 0_GeV;
+  }
 
 } // namespace corsika
diff --git a/corsika/detail/modules/energy_loss/BetheBlochPDG.inl b/corsika/detail/modules/energy_loss/BetheBlochPDG.inl
index 066997364..f42760a27 100644
--- a/corsika/detail/modules/energy_loss/BetheBlochPDG.inl
+++ b/corsika/detail/modules/energy_loss/BetheBlochPDG.inl
@@ -23,15 +23,14 @@ namespace corsika {
     return sqrt((Elab - m) * (Elab + m));
   };
 
-  inline BetheBlochPDG::BetheBlochPDG(ShowerAxis const& shower_axis)
-      : dX_(10_g / square(1_cm)) // profile binning
-      , dX_threshold_(0.0001_g / square(1_cm))
-      , shower_axis_(shower_axis)
-      , profile_(int(shower_axis.getMaximumX() / dX_) + 1) {}
+  template <typename TOutput>
+  inline BetheBlochPDG<TOutput>::BetheBlochPDG(TOutput& output)
+      : output_(output) {}
 
   template <typename TParticle>
-  inline HEPEnergyType BetheBlochPDG::getBetheBloch(TParticle const& p,
-                                                    GrammageType const dX) {
+  template <typename TOutput>
+  inline HEPEnergyType BetheBlochPDG<TOutput>::getBetheBloch(
+      TParticle const& p, GrammageType const dX) {
 
     // all these are material constants and have to come through Environment
     // right now: values for nitrogen_D
@@ -125,8 +124,9 @@ namespace corsika {
 
   // radiation losses according to PDG 2018, ch. 33 ref. [5]
   template <typename TParticle>
-  inline HEPEnergyType BetheBlochPDG::getRadiationLosses(TParticle const& vP,
-                                                         GrammageType const vDX) {
+  template <typename TOutput>
+  inline HEPEnergyType BetheBlochPDG<TOutput>::getRadiationLosses(
+      TParticle const& vP, GrammageType const vDX) {
     // simple-minded hard-coded value for b(E) inspired by data from
     // http://pdg.lbl.gov/2018/AtomicNuclearProperties/ for N and O.
     auto constexpr b = 3.0 * 1e-6 * square(1_cm) / 1_g;
@@ -134,14 +134,16 @@ namespace corsika {
   }
 
   template <typename TParticle>
-  inline HEPEnergyType BetheBlochPDG::getTotalEnergyLoss(TParticle const& vP,
-                                                         GrammageType const vDX) {
+  template <typename TOutput>
+  inline HEPEnergyType BetheBlochPDG<TOutput>::getTotalEnergyLoss(
+      TParticle const& vP, GrammageType const vDX) {
     return getBetheBloch(vP, vDX) + getRadiationLosses(vP, vDX);
   }
 
   template <typename TParticle, typename TTrajectory>
-  inline ProcessReturn BetheBlochPDG::doContinuous(TParticle& p, TTrajectory const& t,
-                                                   bool const) {
+  template <typename TOutput>
+  inline ProcessReturn BetheBlochPDG<TOutput>::doContinuous(
+      TParticle& particle, TTrajectory const& track, bool const) {
 
     // if this step was limiting the CORSIKA stepping, the particle is lost
     /* see Issue https://gitlab.ikp.kit.edu/AirShowerPhysics/corsika/-/issues/389
@@ -152,25 +154,28 @@ namespace corsika {
     }
     */
 
-    if (p.getChargeNumber() == 0) return ProcessReturn::Ok;
-
-    GrammageType const dX = p.getNode()->getModelProperties().getIntegratedGrammage(t);
-    CORSIKA_LOG_TRACE("EnergyLoss pid={}, z={}, dX={} g/cm2", p.getPID(),
-                      p.getChargeNumber(), dX / 1_g * square(1_cm));
-    HEPEnergyType dE = getTotalEnergyLoss(p, dX);
-    auto E = p.getEnergy();
-    [[maybe_unused]] const auto Ekin = E - p.getMass();
-    auto Enew = E + dE;
-    CORSIKA_LOG_TRACE("EnergyLoss  dE={} MeV, E={} GeV, Ekin={} GeV, Enew={} GeV",
-                      dE / 1_MeV, E / 1_GeV, Ekin / 1_GeV, Enew / 1_GeV);
-    p.setEnergy(Enew); // kinetic energy on stack
-    fillProfile(t, dE);
+    if (particle.getChargeNumber() == 0) return ProcessReturn::Ok;
+
+    GrammageType const dX =
+        particle.getNode()->getModelProperties().getIntegratedGrammage(track,
+                                                                       track.getLength());
+    CORSIKA_LOG_TRACE("EnergyLoss pid={}, z={}, dX={} g/cm2", particle.getPID(),
+                      particle.getChargeNumber(), dX / 1_g * square(1_cm));
+    HEPEnergyType const dE = getTotalEnergyLoss(particle, dX);
+    [[maybe_unused]] const auto Ekin = particle.getKineticEnergy();
+    auto EkinNew = Ekin + dE;
+    CORSIKA_LOG_TRACE("EnergyLoss  dE={} MeV, Ekin={} GeV, EkinNew={} GeV", dE / 1_MeV,
+                      Ekin / 1_GeV, EkinNew / 1_GeV);
+    particle.setKineticEnergy(EkinNew);
+    // also send to output
+    output_.write(track, particle.getPID(), -dE);
     return ProcessReturn::Ok;
   }
 
   template <typename TParticle, typename TTrajectory>
-  inline LengthType BetheBlochPDG::getMaxStepLength(TParticle const& vParticle,
-                                                    TTrajectory const& vTrack) const {
+  template <typename TOutput>
+  inline LengthType BetheBlochPDG<TOutput>::getMaxStepLength(TParticle const& vParticle,
+                                                             TTrajectory const& vTrack) const {
     if (vParticle.getChargeNumber() == 0) {
       return meter * std::numeric_limits<double>::infinity();
     }
@@ -193,80 +198,15 @@ namespace corsika {
         vTrack, maxGrammage);
   }
 
-  template <typename TTrajectory>
-  inline void BetheBlochPDG::fillProfile(TTrajectory const& vTrack,
-                                         const HEPEnergyType dE) {
-
-    GrammageType grammageStart = shower_axis_.getProjectedX(vTrack.getPosition(0));
-    GrammageType grammageEnd = shower_axis_.getProjectedX(vTrack.getPosition(1));
-
-    if (grammageStart > grammageEnd) { // particle going upstream
-      std::swap(grammageStart, grammageEnd);
-    }
-
-    GrammageType const deltaX = grammageEnd - grammageStart;
-
-    if (deltaX < dX_threshold_) return;
-
-    // only register the range that is covered by the profile
-    int const maxBin = int(profile_.size() - 1);
-    int binStart = grammageStart / dX_;
-    if (binStart < 0) binStart = 0;
-    if (binStart > maxBin) binStart = maxBin;
-    int binEnd = grammageEnd / dX_;
-    if (binEnd < 0) binEnd = 0;
-    if (binEnd > maxBin) binEnd = maxBin;
-
-    CORSIKA_LOG_TRACE("energy deposit of -dE={} between {} and {}", -dE, grammageStart,
-                      grammageEnd);
-
-    auto energyCount = HEPEnergyType::zero();
-
-    auto const factor = -dE / deltaX;
-    auto fill = [&](int const bin, GrammageType const weight) {
-      auto const increment = factor * weight;
-      profile_[bin] += increment;
-      energyCount += increment;
 
-      CORSIKA_LOG_TRACE("filling bin {} with weight {} : {} ", bin, weight, increment);
-    };
-
-    // fill longitudinal profile
-    if (binStart == binEnd) {
-      fill(binStart, deltaX);
-    } else {
-      fill(binStart, ((1 + binStart) * dX_ - grammageStart));
-      fill(binEnd, (grammageEnd - binEnd * dX_));
-      for (int bin = binStart + 1; bin < binEnd; ++bin) { fill(bin, dX_); }
-    }
-
-    CORSIKA_LOG_TRACE("total energy added to histogram: {} ", energyCount);
+  template <typename TOutput>
+  inline void BetheBlochPDG<TOutput>::showResults() const {
+    CORSIKA_LOG_INFO("energy lost dE (GeV)      : {}  ", energy_lost_ / 1_GeV);
   }
 
-  inline void BetheBlochPDG::printProfile() const {
-    std::ofstream file("EnergyLossProfile.dat");
-    file << "# EnergyLoss profile" << std::endl
-         << "# lower X bin edge [g/cm2]  dE/dX [GeV/g/cm2]\n";
-    double const deltaX = dX_ / 1_g * square(1_cm);
-    for (size_t i = 0; i < profile_.size(); ++i) {
-      file << std::scientific << std::setw(15) << i * deltaX << std::setw(15)
-           << profile_.at(i) / (deltaX * 1_GeV) << '\n';
-    }
-    file.close();
+  template <typename TOutput>
+  inline void BetheBlochPDG<TOutput>::reset() {
+    energy_lost_ = 0_GeV;
   }
 
-  inline HEPEnergyType BetheBlochPDG::getTotal() const {
-    return std::accumulate(profile_.cbegin(), profile_.cend(), HEPEnergyType::zero());
-  }
-
-  inline void BetheBlochPDG::showResults() const {
-    CORSIKA_LOG_INFO(
-        " ******************************\n"
-        " PROCESS::ContinuousProcess: \n"
-        " energy lost dE (GeV)      : {}\n  ",
-        energy_lost_ / 1_GeV);
-  }
-
-  inline void BetheBlochPDG::reset() { energy_lost_ = 0_GeV; }
-
 } // namespace corsika
diff --git a/corsika/detail/modules/writers/EnergyLossWriterParquet.inl b/corsika/detail/modules/writers/EnergyLossWriterParquet.inl
new file mode 100644
index 000000000..7148bdf8e
--- /dev/null
+++ b/corsika/detail/modules/writers/EnergyLossWriterParquet.inl
@@ -0,0 +1,205 @@
+/*
+ * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+#include <corsika/framework/core/ParticleProperties.hpp>
+#include <corsika/framework/core/PhysicalUnits.hpp>
+#include <corsika/framework/utility/FindXmax.hpp>
+
+#include <corsika/media/ShowerAxis.hpp>
+
+#include <exception>
+
+namespace corsika {
+
+  inline EnergyLossWriterParquet::EnergyLossWriterParquet(ShowerAxis const& showerAxis,
+                                                          GrammageType const dX,
+                                                          unsigned int const nBins,
+                                                          GrammageType const dX_threshold)
+      : showerAxis_(showerAxis)
+      , dX_(dX) // profile binning
+      , nBins_(nBins)
+      , dX_threshold_(dX_threshold) {}
+
+  inline void EnergyLossWriterParquet::startOfLibrary(
+      boost::filesystem::path const& directory) {
+
+    // setup the streamer
+    output_.initStreamer((directory / "dEdX.parquet").string());
+
+    // enable compression with the default level
+    // output_.enableCompression();
+
+    // build the schema
+    output_.addField("X", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+    output_.addField("total", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+
+    // and build the streamer
+    output_.buildStreamer();
+  }
+
+  inline void EnergyLossWriterParquet::startOfShower(unsigned int const) {
+
+    // initialize profile
+    profile_.clear();
+    profile_.resize(nBins_);
+  }
+
+  inline void EnergyLossWriterParquet::endOfShower(unsigned int const showerId) {
+
+    int iRow = 0;
+    for (Profile const& row : profile_) {
+
+      double const dX = dX_ / 1_g * square(1_cm); // g/cm2
+
+      // clang-format off
+      *(output_.getWriter()) << showerId << static_cast<float>(iRow * dX)
+                             << static_cast<float>(row.at(static_cast<int>(ProfileIndex::Total)) / 1_GeV / dX)
+                             << parquet::EndRow;
+      // clang-format on
+      iRow++;
+    }
+  }
+
+  inline void EnergyLossWriterParquet::endOfLibrary() { output_.closeStreamer(); }
+
+  template <typename TTrack>
+  inline void EnergyLossWriterParquet::write(TTrack const& track, Code const PID,
+                                             HEPEnergyType const dE) {
+
+    GrammageType grammageStart = showerAxis_.getProjectedX(track.getPosition(0));
+    GrammageType grammageEnd = showerAxis_.getProjectedX(track.getPosition(1));
+
+    if (grammageStart > grammageEnd) { // particle going upstream
+      std::swap(grammageStart, grammageEnd);
+    }
+
+    GrammageType const deltaX = grammageEnd - grammageStart;
+
+    if (deltaX < dX_threshold_) {
+      write(track.getPosition(0), PID, dE);
+      return;
+    }
+
+    // only register the range that is covered by the profile
+    int const maxBin = int(profile_.size() - 1);
+    int binStart = grammageStart / dX_;
+    if (binStart < 0) binStart = 0;
+    if (binStart > maxBin) binStart = maxBin;
+    int binEnd = grammageEnd / dX_;
+    if (binEnd < 0) binEnd = 0;
+    if (binEnd > maxBin) binEnd = maxBin;
+
+    CORSIKA_LOGGER_TRACE(getLogger(), "energy deposit of dE={} GeV between {} and {}",
+                         dE / 1_GeV, grammageStart / 1_g * square(1_cm),
+                         grammageEnd / 1_g * square(1_cm));
+
+    auto energyCount = HEPEnergyType::zero();
+
+    auto const factor = dE / deltaX;
+    auto fill = [&](int const bin, GrammageType const weight) {
+      auto const increment = factor * weight;
+      profile_[bin][static_cast<int>(ProfileIndex::Total)] += increment;
+      energyCount += increment;
+
+      CORSIKA_LOGGER_TRACE(getLogger(), "filling bin={} with weight {} : dE={} GeV ", bin,
+                           weight, increment / 1_GeV);
+    };
+
+    // fill longitudinal profile
+    if (binStart == binEnd) {
+      fill(binStart, deltaX);
+    } else {
+      fill(binStart, ((1 + binStart) * dX_ - grammageStart));
+      fill(binEnd, (grammageEnd - binEnd * dX_));
+      for (int bin = binStart + 1; bin < binEnd; ++bin) { fill(bin, dX_); }
+    }
+
+    CORSIKA_LOGGER_TRACE(getLogger(), "total energy added to histogram: {} GeV ",
+                         energyCount / 1_GeV);
+  }
+
+  inline void EnergyLossWriterParquet::write(Point const& point, Code const PID,
+                                             HEPEnergyType const dE) {
+    GrammageType grammage = showerAxis_.getProjectedX(point);
+    int const maxBin = int(profile_.size() - 1);
+    int bin = grammage / dX_;
+    if (bin < 0) bin = 0;
+    if (bin > maxBin) bin = maxBin;
+
+    CORSIKA_LOGGER_TRACE(getLogger(), "add local energy loss bin={} dE={} GeV ", bin,
+                         dE / 1_GeV);
+
+    profile_[bin][static_cast<int>(ProfileIndex::Total)] += dE;
+  }
+
+  inline void EnergyLossWriterParquet::write(GrammageType const Xstart,
+                                             GrammageType const Xend,
+                                             HEPEnergyType const dE) {
+    double const bstart = Xstart / dX_;
+    double const bend = Xend / dX_;
+
+    if (abs(bstart - floor(bstart + 0.5)) > 1e-2 ||
+        abs(bend - floor(bend + 0.5)) > 1e-2 || abs(bend - bstart - 1) > 1e-2) {
+      CORSIKA_LOGGER_ERROR(getLogger(),
+                           "CONEX and Corsika8 dX grammage binning are not the same! "
+                           "Xstart={} Xend={} dX={}",
+                           Xstart / 1_g * square(1_cm), Xend / 1_g * square(1_cm),
+                           dX_ / 1_g * square(1_cm));
+      throw std::runtime_error(
+          "CONEX and Corsika8 dX grammage binning are not the same!");
+    }
+
+    int const bin = int((bend + bstart) / 2);
+    CORSIKA_LOGGER_TRACE(getLogger(), "add binned energy loss {} {} bin={} dE={} GeV ",
+                         bstart, bend, bin, dE / 1_GeV);
+    profile_[bin][static_cast<int>(ProfileIndex::Total)] += dE;
+  }
+
+  inline HEPEnergyType EnergyLossWriterParquet::getTotal() const {
+    HEPEnergyType tot = HEPEnergyType::zero();
+    for (Profile const& row : profile_)
+      tot += row.at(static_cast<int>(ProfileIndex::Total));
+    return tot;
+  }
+
+  inline YAML::Node EnergyLossWriterParquet::getSummary() const {
+
+    // determined Xmax and dEdXmax from quadratic interpolation
+    double maximum = 0;
+    unsigned int iMaximum = 0;
+    for (unsigned int i = 0; i < profile_.size() - 3; ++i) {
+      double value = (profile_[i + 0].at(static_cast<int>(ProfileIndex::Total)) +
+                      profile_[i + 1].at(static_cast<int>(ProfileIndex::Total)) +
+                      profile_[i + 2].at(static_cast<int>(ProfileIndex::Total))) /
+                     1_GeV;
+      if (value > maximum) {
+        maximum = value;
+        iMaximum = i;
+      }
+    }
+
+    double const dX = dX_ / 1_g * square(1_cm);
+
+    auto [Xmax, dEdXmax] = FindXmax::interpolateProfile(
+        dX * (0.5 + iMaximum), dX * (1.5 + iMaximum), dX * (2.5 + iMaximum),
+        profile_[iMaximum + 0].at(static_cast<int>(ProfileIndex::Total)) / 1_GeV,
+        profile_[iMaximum + 1].at(static_cast<int>(ProfileIndex::Total)) / 1_GeV,
+        profile_[iMaximum + 2].at(static_cast<int>(ProfileIndex::Total)) / 1_GeV);
+
+    YAML::Node summary;
+    summary["total"] = getTotal() / 1_GeV;
+    summary["Xmax"] = Xmax;
+    summary["dEdXmax"] = dEdXmax;
+    return summary;
+  }
+
+} // namespace corsika
diff --git a/corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl b/corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl
new file mode 100644
index 000000000..aba83d01c
--- /dev/null
+++ b/corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl
@@ -0,0 +1,190 @@
+/*
+ * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+#include <corsika/framework/core/ParticleProperties.hpp>
+#include <corsika/framework/core/PhysicalUnits.hpp>
+#include <corsika/framework/utility/FindXmax.hpp>
+
+#include <corsika/media/ShowerAxis.hpp>
+
+#include <string>
+#include <exception>
+
+namespace corsika {
+
+  inline LongitudinalProfileWriterParquet::LongitudinalProfileWriterParquet(
+      ShowerAxis const& showerAxis, GrammageType const dX, unsigned int const nBins)
+      : showerAxis_(showerAxis)
+      , dX_(dX) // profile binning
+      , nBins_(nBins) {}
+
+  inline void LongitudinalProfileWriterParquet::startOfLibrary(
+      boost::filesystem::path const& directory) {
+    // setup the streamer
+    output_.initStreamer((directory / "profile.parquet").string());
+
+    // enable compression with the default level
+    // output_.enableCompression();
+
+    // build the schema
+    output_.addField("X", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+    output_.addField("charged", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+    output_.addField("hadron", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+    output_.addField("muminus", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+    output_.addField("muplus", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+    output_.addField("photon", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+    output_.addField("electron", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+    output_.addField("positron", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+
+    // and build the streamer
+    output_.buildStreamer();
+  }
+
+  inline void LongitudinalProfileWriterParquet::startOfShower(unsigned int const) {
+    // initialize profile
+    profile_.clear();
+    profile_.resize(nBins_);
+  }
+
+  inline void LongitudinalProfileWriterParquet::endOfShower(unsigned int const showerId) {
+    int iRow = 0;
+    for (ProfileData const& row : profile_) {
+
+      double const dX = dX_ / 1_g * square(1_cm); // g/cm2
+
+      // clang-format off
+      *(output_.getWriter()) << showerId << static_cast<float>(iRow * dX)
+                             << static_cast<float>(row.at(static_cast<int>(ProfileIndex::Charged)))
+                             << static_cast<float>(row.at(static_cast<int>(ProfileIndex::Hadron)))
+                             << static_cast<float>(row.at(static_cast<int>(ProfileIndex::MuMinus)))
+                             << static_cast<float>(row.at(static_cast<int>(ProfileIndex::MuPlus)))
+                             << static_cast<float>(row.at(static_cast<int>(ProfileIndex::Photon)))
+                             << static_cast<float>(row.at(static_cast<int>(ProfileIndex::Electron)))
+                             << static_cast<float>(row.at(static_cast<int>(ProfileIndex::Positron)))
+                             << parquet::EndRow;
+      // clang-format on
+      ++iRow;
+    }
+  }
+
+  inline void LongitudinalProfileWriterParquet::endOfLibrary() {
+    output_.closeStreamer();
+  }
+
+  template <typename TTrack>
+  inline void LongitudinalProfileWriterParquet::write(TTrack const& track, Code const pid,
+                                                      double const weight) {
+    GrammageType const grammageStart = showerAxis_.getProjectedX(track.getPosition(0));
+    GrammageType const grammageEnd = showerAxis_.getProjectedX(track.getPosition(1));
+
+    // Note: particle may go also "upward", thus, grammageEnd<grammageStart
+    int const binStart = std::ceil(grammageStart / dX_);
+    int const binEnd = std::floor(grammageEnd / dX_);
+
+    for (int bin = binStart; bin <= binEnd; ++bin) {
+      if (pid == Code::Photon) {
+        profile_.at(bin)[static_cast<int>(ProfileIndex::Photon)] += weight;
+      } else if (pid == Code::Positron) {
+        profile_.at(bin)[static_cast<int>(ProfileIndex::Positron)] += weight;
+      } else if (pid == Code::Electron) {
+        profile_.at(bin)[static_cast<int>(ProfileIndex::Electron)] += weight;
+      } else if (pid == Code::MuPlus) {
+        profile_.at(bin)[static_cast<int>(ProfileIndex::MuPlus)] += weight;
+      } else if (pid == Code::MuMinus) {
+        profile_.at(bin)[static_cast<int>(ProfileIndex::MuMinus)] += weight;
+      } else if (is_hadron(pid)) {
+        profile_.at(bin)[static_cast<int>(ProfileIndex::Hadron)] += weight;
+      }
+      if (is_charged(pid)) {
+        profile_[bin][static_cast<int>(ProfileIndex::Charged)] += weight;
+      }
+    }
+  }
+
+  inline void LongitudinalProfileWriterParquet::write(GrammageType const Xstart,
+                                                      GrammageType const Xend,
+                                                      Code const pid,
+                                                      double const weight) {
+    double const bstart = Xstart / dX_;
+    double const bend = Xend / dX_;
+
+    if (abs(bstart - floor(bstart + 0.5)) > 1e-2 ||
+        abs(bend - floor(bend + 0.5)) > 1e-2 || abs(bend - bstart - 1) > 1e-2) {
+      CORSIKA_LOGGER_ERROR(getLogger(),
+                           "CONEX and Corsika8 dX grammage binning are not the same! "
+                           "Xstart={} Xend={} dX={}",
+                           Xstart / 1_g * square(1_cm), Xend / 1_g * square(1_cm),
+                           dX_ / 1_g * square(1_cm));
+      throw std::runtime_error(
+          "CONEX and Corsika8 dX grammage binning are not the same!");
+    }
+
+    int const bin = int((bend + bstart) / 2);
+
+    if (pid == Code::Photon) {
+      profile_.at(bin)[static_cast<int>(ProfileIndex::Photon)] += weight;
+    } else if (pid == Code::Positron) {
+      profile_.at(bin)[static_cast<int>(ProfileIndex::Positron)] += weight;
+    } else if (pid == Code::Electron) {
+      profile_.at(bin)[static_cast<int>(ProfileIndex::Electron)] += weight;
+    } else if (pid == Code::MuPlus) {
+      profile_.at(bin)[static_cast<int>(ProfileIndex::MuPlus)] += weight;
+    } else if (pid == Code::MuMinus) {
+      profile_.at(bin)[static_cast<int>(ProfileIndex::MuMinus)] += weight;
+    } else if (is_hadron(pid)) {
+      profile_.at(bin)[static_cast<int>(ProfileIndex::Hadron)] += weight;
+    }
+    if (is_charged(pid)) {
+      profile_[bin][static_cast<int>(ProfileIndex::Charged)] += weight;
+    }
+  }
+
+  inline YAML::Node LongitudinalProfileWriterParquet::getSummary() const {
+    // determined Xmax and dEdXmax from quadratic interpolation
+
+    YAML::Node summary;
+
+    for (int index = 0; index < static_cast<int>(ProfileIndex::Entries); ++index) {
+      // first find highest 3-boxcar sum
+      double maximum = 0;
+      unsigned int iMaximum = 0;
+      for (unsigned int i = 0; i < profile_.size() - 3; ++i) {
+        double value = profile_[i + 0].at(index) + profile_[i + 1].at(index) +
+                       profile_[i + 2].at(index);
+        if (value > maximum) {
+          maximum = value;
+          iMaximum = i;
+        }
+      }
+
+      double const dX = dX_ / 1_g * square(1_cm);
+
+      // quadratic interpolation of maximum in 3 highest points
+      auto [Xmax, Nmax] = FindXmax::interpolateProfile(
+          dX * (0.5 + iMaximum), dX * (1.5 + iMaximum), dX * (2.5 + iMaximum),
+          profile_[iMaximum + 0].at(index), profile_[iMaximum + 1].at(index),
+          profile_[iMaximum + 2].at(index));
+
+      std::string const name = ProfileIndexNames[index];
+      summary[name]["Xmax"] = Xmax;
+      summary[name]["Nmax"] = Nmax;
+    }
+    return summary;
+  }
+
+} // namespace corsika
diff --git a/corsika/detail/modules/writers/ObservationPlaneWriterParquet.inl b/corsika/detail/modules/writers/ObservationPlaneWriterParquet.inl
deleted file mode 100644
index 5eef28a41..000000000
--- a/corsika/detail/modules/writers/ObservationPlaneWriterParquet.inl
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
- *
- * This software is distributed under the terms of the GNU General Public
- * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
- * the license.
- */
-
-#pragma once
-
-namespace corsika {
-
-  inline ObservationPlaneWriterParquet::ObservationPlaneWriterParquet()
-      : output_() {}
-
-  inline void ObservationPlaneWriterParquet::startOfLibrary(
-      boost::filesystem::path const& directory) {
-
-    // setup the streamer
-    output_.initStreamer((directory / "particles.parquet").string());
-
-    // enable compression with the default level
-    output_.enableCompression();
-
-    // build the schema
-    output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
-                     parquet::ConvertedType::INT_32);
-    output_.addField("energy", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
-                     parquet::ConvertedType::NONE);
-    output_.addField("x", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
-                     parquet::ConvertedType::NONE);
-    output_.addField("y", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
-                     parquet::ConvertedType::NONE);
-    output_.addField("t", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
-                     parquet::ConvertedType::NONE);
-
-    // and build the streamer
-    output_.buildStreamer();
-  }
-
-  inline void ObservationPlaneWriterParquet::endOfShower() { ++shower_; }
-
-  inline void ObservationPlaneWriterParquet::endOfLibrary() { output_.closeStreamer(); }
-
-  inline void ObservationPlaneWriterParquet::write(Code const& pid,
-                                                   HEPEnergyType const& energy,
-                                                   LengthType const& x,
-                                                   LengthType const& y,
-                                                   TimeType const& t) {
-    // write the next row - we must write `shower_` first.
-    *(output_.getWriter()) << shower_ << static_cast<int>(get_PDG(pid))
-                           << static_cast<float>(energy / 1_GeV)
-                           << static_cast<float>(x / 1_m) << static_cast<float>(y / 1_m)
-                           << static_cast<float>(t / 1_ns) << parquet::EndRow;
-  }
-
-} // namespace corsika
diff --git a/corsika/detail/modules/writers/ParticleWriterParquet.inl b/corsika/detail/modules/writers/ParticleWriterParquet.inl
new file mode 100644
index 000000000..d89db474f
--- /dev/null
+++ b/corsika/detail/modules/writers/ParticleWriterParquet.inl
@@ -0,0 +1,109 @@
+/*
+ * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+#include <corsika/framework/core/Logging.hpp>
+
+namespace corsika {
+
+  inline ParticleWriterParquet::ParticleWriterParquet()
+      : output_()
+      , showerId_(0)
+      , energyGround_(0_eV) {}
+
+  inline void ParticleWriterParquet::startOfLibrary(
+      boost::filesystem::path const& directory) {
+
+    // setup the streamer
+    output_.initStreamer((directory / "particles.parquet").string());
+
+    // enable compression with the default level
+    // output_.enableCompression();
+
+    // build the schema
+    output_.addField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
+                     parquet::ConvertedType::INT_32);
+    output_.addField("energy", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+    output_.addField("x", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+    output_.addField("y", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+    output_.addField("weight", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
+
+    // and build the streamer
+    output_.buildStreamer();
+
+    showerId_ = 0;
+    energyGround_ = 0_eV;
+    countHadrons_ = 0;
+    countOthers_ = 0;
+    countEM_ = 0;
+    countMuons_ = 0;
+  }
+
+  inline void ParticleWriterParquet::startOfShower(unsigned int const showerId) {
+    showerId_ = showerId;
+  }
+
+  inline void ParticleWriterParquet::endOfShower(unsigned int const) {}
+
+  inline void ParticleWriterParquet::endOfLibrary() { output_.closeStreamer(); }
+
+  inline void ParticleWriterParquet::write(Code const& pid, HEPEnergyType const& energy,
+                                           LengthType const& x, LengthType const& y,
+                                           double const weight) {
+
+    // write the next row - we must write `shower_` first.
+    *(output_.getWriter()) << showerId_ << static_cast<int>(get_PDG(pid))
+                           << static_cast<float>(energy / 1_GeV)
+                           << static_cast<float>(x / 1_m) << static_cast<float>(y / 1_m)
+                           << static_cast<float>(weight) << parquet::EndRow;
+
+    energyGround_ += energy;
+
+    if (is_hadron(pid)) {
+      ++countHadrons_;
+    } else if (is_muon(pid)) {
+      ++countMuons_;
+    } else if (is_em(pid)) {
+      ++countEM_;
+    } else {
+      ++countOthers_;
+    }
+  }
+
+  inline void ParticleWriterParquet::write(unsigned int const A, unsigned int const Z,
+                                           HEPEnergyType const& energy,
+                                           LengthType const& x, LengthType const& y,
+                                           double const weight) {
+    // write the next row - we must write `shower_` first.
+    *(output_.getWriter()) << showerId_ << static_cast<int>(get_PDG(A, Z))
+                           << static_cast<float>(energy / 1_GeV)
+                           << static_cast<float>(x / 1_m) << static_cast<float>(y / 1_m)
+                           << static_cast<float>(weight) << parquet::EndRow;
+    energyGround_ += energy;
+
+    ++countHadrons_;
+  }
+
+  /**
+   * Return collected library-level summary for output.
+   */
+  YAML::Node ParticleWriterParquet::getSummary() const {
+    YAML::Node summary;
+    summary["hadrons"] = countHadrons_;
+    summary["muons"] = countMuons_;
+    summary["em"] = countEM_;
+    summary["others"] = countOthers_;
+    return summary;
+  }
+
+} // namespace corsika
diff --git a/corsika/detail/modules/writers/TrackWriterParquet.inl b/corsika/detail/modules/writers/TrackWriterParquet.inl
index 90e0ba2c7..f233ca318 100644
--- a/corsika/detail/modules/writers/TrackWriterParquet.inl
+++ b/corsika/detail/modules/writers/TrackWriterParquet.inl
@@ -11,7 +11,8 @@
 namespace corsika {
 
   inline TrackWriterParquet::TrackWriterParquet()
-      : output_() {}
+      : output_()
+      , showerId_(0) {}
 
   inline void TrackWriterParquet::startOfLibrary(
       boost::filesystem::path const& directory) {
@@ -24,6 +25,8 @@ namespace corsika {
                      parquet::ConvertedType::INT_32);
     output_.addField("energy", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
                      parquet::ConvertedType::NONE);
+    output_.addField("weight", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
+                     parquet::ConvertedType::NONE);
     output_.addField("start_x", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
                      parquet::ConvertedType::NONE);
     output_.addField("start_y", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
@@ -43,13 +46,19 @@ namespace corsika {
 
     // and build the streamer
     output_.buildStreamer();
+    showerId_ = 0;
+  }
+
+  inline void TrackWriterParquet::startOfShower(unsigned int const showerId) {
+    showerId_ = showerId;
   }
 
-  inline void TrackWriterParquet::endOfShower() { ++shower_; }
+  inline void TrackWriterParquet::endOfShower(unsigned int const) {}
 
   inline void TrackWriterParquet::endOfLibrary() { output_.closeStreamer(); }
 
   inline void TrackWriterParquet::write(Code const& pid, HEPEnergyType const& energy,
+                                        double const weight,
                                         QuantityVector<length_d> const& start,
                                         TimeType const& t_start,
                                         QuantityVector<length_d> const& end,
@@ -58,9 +67,10 @@ namespace corsika {
     // write the next row - we must write `shower_` first.
     // clang-format off
     *(output_.getWriter())
-        << shower_
+        << showerId_
         << static_cast<int>(get_PDG(pid))
         << static_cast<float>(energy / 1_GeV)
+        << static_cast<float>(weight)
         << static_cast<float>(start[0] / 1_m)
         << static_cast<float>(start[1] / 1_m)
         << static_cast<float>(start[2] / 1_m)
diff --git a/corsika/detail/output/BaseOutput.inl b/corsika/detail/output/BaseOutput.inl
deleted file mode 100644
index 24ec861ac..000000000
--- a/corsika/detail/output/BaseOutput.inl
+++ /dev/null
@@ -1,15 +0,0 @@
-/*
- * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
- *
- * This software is distributed under the terms of the GNU General Public
- * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
- * the license.
- */
-#pragma once
-
-namespace corsika {
-
-  inline BaseOutput::BaseOutput()
-      : shower_(0) {}
-
-} // namespace corsika
diff --git a/corsika/detail/output/OutputManager.inl b/corsika/detail/output/OutputManager.inl
index 1fa7e0267..2cc5e9780 100644
--- a/corsika/detail/output/OutputManager.inl
+++ b/corsika/detail/output/OutputManager.inl
@@ -22,23 +22,69 @@
 
 namespace corsika {
 
-  inline void OutputManager::writeNode(YAML::Node const& node,
-                                       boost::filesystem::path const& path) const {
+  inline OutputManager::OutputManager(
+      std::string const& name,
+      boost::filesystem::path const& dir = boost::filesystem::current_path())
+      : root_(dir / name)
+      , name_(name)
+      , count_(0) {
+
+    // check if this directory already exists
+    if (boost::filesystem::exists(root_)) {
+      CORSIKA_LOGGER_ERROR(logger_,
+                           "Output directory '{}' already exists! Do not overwrite!.",
+                           root_.string());
+      throw std::runtime_error("Output directory already exists.");
+    }
+
+    // construct the directory for this library
+    boost::filesystem::create_directories(root_);
+
+    CORSIKA_LOGGER_INFO(logger_, fmt::format("Output library: \"{}\"", root_.string()));
+    writeYAML(getConfig(), root_ / ("config.yaml"));
+  }
+
+  template <typename TOutput>
+  inline void OutputManager::add(std::string const& name, TOutput& output) {
+    // check if that name is already in the map
+    if (outputs_.count(name) > 0) {
+      CORSIKA_LOGGER_ERROR(
+          logger_, "'{}' is already registered. All outputs must have unique names.",
+          name);
+      throw std::runtime_error("Output already exists. Do not overwrite!");
+    }
+
+    // if we get here, the name is not already in the map
+    // so we create the output and register it into the map
+    outputs_.insert(std::make_pair(name, std::ref(output)));
+
+    // create the directory for this process.
+    boost::filesystem::create_directory(root_ / name);
+  }
 
-    // construct a YAML emitter for this config file
-    YAML::Emitter out;
+  inline OutputManager::~OutputManager() {
 
-    // and write the node to the output
-    out << node;
+    if (state_ == OutputState::ShowerInProgress) {
+      // if this the destructor is called before the shower has been explicitly
+      // ended, print a warning and end the shower before continuing.
+      CORSIKA_LOGGER_WARN(logger_,
+                          "OutputManager was destroyed before endOfShower() called."
+                          " The last shower in this libray may be incomplete.");
+      endOfShower();
+    }
 
-    // open the output file - this is <output name>.yaml
-    boost::filesystem::ofstream file(path);
+    // write the top level summary file (summary.yaml)
+    writeSummary();
 
-    // dump the YAML to the file
-    file << out.c_str() << std::endl;
+    // if we are being destructed but EndOfLibrary() has not been called,
+    // make sure that we gracefully close all the outputs. This is a supported
+    // method of operation so we don't issue a warning here
+    if (state_ == OutputState::LibraryReady) { endOfLibrary(); }
   }
 
-  inline void OutputManager::writeTopLevelConfig() const {
+  inline int OutputManager::getEventId() const { return count_; }
+
+  inline YAML::Node OutputManager::getConfig() const {
 
     YAML::Node config;
 
@@ -47,16 +93,15 @@ namespace corsika {
     config["creator"] = "CORSIKA8";       // a tag to identify C8 libraries
     config["version"] = "8.0.0-prealpha"; // the current version
 
-    // write the node to a file
-    writeNode(config, root_ / ("config.yaml"));
+    return config;
   }
 
-  inline void OutputManager::writeTopLevelSummary() const {
+  inline YAML::Node OutputManager::getSummary() const {
 
-    YAML::Node config;
+    YAML::Node summary;
 
     // the total number of showers contained in the library
-    config["showers"] = count_;
+    summary["showers"] = count_;
 
     // this next section handles writing some time and duration information
 
@@ -81,87 +126,17 @@ namespace corsika {
     auto runtime{end_time - start_time};
 
     // add the time and duration info
-    config["start time"] = timeToString(start_time);
-    config["end time"] = timeToString(end_time);
-    config["runtime"] = fmt::format("{:%H:%M:%S}", runtime);
-
-    // write the node to a file
-    writeNode(config, root_ / ("summary.yaml"));
-  }
-
-  inline void OutputManager::initOutput(std::string const& name) const {
-    // construct the path to this directory
-    auto const path{root_ / name};
-
-    // create the directory for this process.
-    boost::filesystem::create_directory(path);
-
-    // get the config for this output
-    auto config = outputs_.at(name).get().getConfig();
-
-    // and assign the name for this output
-    config["name"] = name;
-
-    // write the config for this output to the file
-    writeNode(config, path / "config.yaml");
-  }
-
-  inline OutputManager::OutputManager(
-      std::string const& name,
-      boost::filesystem::path const& dir = boost::filesystem::current_path())
-      : name_(name)
-      , root_(dir / name) {
-
-    // check if this directory already exists
-    if (boost::filesystem::exists(root_)) {
-      logger->error("Output directory '{}' already exists! Do not overwrite!.",
-                    root_.string());
-      throw std::runtime_error("Output directory already exists.");
-    }
-
-    // construct the directory for this library
-    boost::filesystem::create_directories(root_);
+    summary["start time"] = timeToString(start_time);
+    summary["end time"] = timeToString(end_time);
+    summary["runtime"] = fmt::format("{:%H:%M:%S}", runtime);
 
-    // write the top level config file
-    writeTopLevelConfig();
+    return summary;
   }
 
-  inline OutputManager::~OutputManager() {
-
-    if (state_ == OutputState::ShowerInProgress) {
-      // if this the destructor is called before the shower has been explicitly
-      // ended, print a warning and end the shower before continuing.
-      logger->warn(
-          "OutputManager was destroyed before endOfShower() called."
-          " The last shower in this libray may be incomplete.");
-      endOfShower();
-    }
-
-    // write the top level summary file (summary.yaml)
-    writeTopLevelSummary();
-
-    // if we are being destructed but EndOfLibrary() has not been called,
-    // make sure that we gracefully close all the outputs. This is a supported
-    // method of operation so we don't issue a warning here
-    if (state_ == OutputState::LibraryReady) { endOfLibrary(); }
-  }
-
-  template <typename TOutput>
-  inline void OutputManager::add(std::string const& name, TOutput& output) {
-
-    // check if that name is already in the map
-    if (outputs_.count(name) > 0) {
-      logger->error("'{}' is already registered. All outputs must have unique names.",
-                    name);
-      throw std::runtime_error("Output already exists. Do not overwrite!");
-    }
-
-    // if we get here, the name is not already in the map
-    // so we create the output and register it into the map
-    outputs_.insert(std::make_pair(name, std::ref(output)));
+  inline void OutputManager::writeSummary() const {
 
-    // and initialize this output
-    initOutput(name);
+    // write the node to a file
+    writeYAML(getSummary(), root_ / ("summary.yaml"));
   }
 
   inline void OutputManager::startOfLibrary() {
@@ -176,15 +151,13 @@ namespace corsika {
     // we now forward this signal to all of our outputs
     for (auto& [name, output] : outputs_) {
 
-      // construct the path to this output subdirectory
-      auto const path{root_ / name};
-
       // and start the library
-      output.get().startOfLibrary(path);
+      output.get().startOfLibrary(root_ / name);
     }
 
     // we have now started running
     state_ = OutputState::LibraryReady;
+    count_ = 0; // event counter
   }
 
   inline void OutputManager::startOfShower() {
@@ -194,15 +167,7 @@ namespace corsika {
     if (state_ == OutputState::NoInit) { startOfLibrary(); }
 
     // now start the event for all the outputs
-    for (auto& [name, output] : outputs_) {
-      {
-        [[maybe_unused]] auto const& dummy_name = name;
-      }
-      output.get().startOfShower();
-    }
-
-    // increment our shower count
-    ++count_;
+    for (auto& [name, output] : outputs_) { output.get().startOfShower(count_); }
 
     // and transition to the in progress state
     state_ = OutputState::ShowerInProgress;
@@ -210,15 +175,13 @@ namespace corsika {
 
   inline void OutputManager::endOfShower() {
 
-    for (auto& [name, output] : outputs_) {
-      {
-        [[maybe_unused]] auto const& dummy_name = name;
-      }
-      output.get().endOfShower();
-    }
+    for (auto& [name, output] : outputs_) { output.get().endOfShower(count_); }
 
     // switch back to the initialized state
     state_ = OutputState::LibraryReady;
+
+    // increment our shower count
+    ++count_;
   }
 
   inline void OutputManager::endOfLibrary() {
@@ -230,12 +193,11 @@ namespace corsika {
 
     // write the summary for each output and forward the endOfLibrary call()
     for (auto& [name, output] : outputs_) {
-
-      // we get the summary for each output as a YAML node
-      auto summary{outputs_.at(name).get().getSummary()};
-
-      // write the summary for this output to the file
-      writeNode(summary, root_ / name / "summary.yaml");
+      // save eventual YAML summary
+      YAML::Node const summary = output.get().getSummary();
+      if (!summary.IsNull()) {
+        writeYAML(output.get().getSummary(), root_ / name / ("summary.yaml"));
+      }
 
       // and forward the end of library call
       output.get().endOfLibrary();
@@ -243,6 +205,6 @@ namespace corsika {
 
     // and the library has finished
     state_ = OutputState::LibraryFinished;
-  }
+  } // namespace corsika
 
 } // namespace corsika
diff --git a/corsika/detail/output/ParquetStreamer.inl b/corsika/detail/output/ParquetStreamer.inl
index d0bb2f9e0..ddef90eed 100644
--- a/corsika/detail/output/ParquetStreamer.inl
+++ b/corsika/detail/output/ParquetStreamer.inl
@@ -23,7 +23,7 @@ namespace corsika {
 
     // add run and event tags to the file
     addField("shower", parquet::Repetition::REQUIRED, parquet::Type::INT32,
-             parquet::ConvertedType::INT_32);
+             parquet::ConvertedType::UINT_32);
   }
 
   template <typename... TArgs>
@@ -31,9 +31,9 @@ namespace corsika {
     fields_.push_back(parquet::schema::PrimitiveNode::Make(args...));
   }
 
-  inline void ParquetStreamer::enableCompression(int const /*level*/) {
-    // builder_.compression(parquet::Compression::ZSTD);
-    // builder_.compression_level(level);
+  inline void ParquetStreamer::enableCompression(int const level) {
+    builder_.compression(parquet::Compression::LZ4);
+    builder_.compression_level(level);
   }
 
   inline void ParquetStreamer::buildStreamer() {
diff --git a/corsika/detail/output/YAMLStreamer.inl b/corsika/detail/output/YAMLStreamer.inl
new file mode 100644
index 000000000..bdca88ac2
--- /dev/null
+++ b/corsika/detail/output/YAMLStreamer.inl
@@ -0,0 +1,29 @@
+/*
+ * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+namespace corsika {
+
+  inline void YAMLStreamer::writeYAML(YAML::Node const& node,
+                                      boost::filesystem::path const& path) const {
+
+    // construct a YAML emitter for this config file
+    YAML::Emitter out;
+
+    // and write the node to the output
+    out << node;
+
+    // open the output file - this is <output name>.yaml
+    boost::filesystem::ofstream file(path);
+
+    // dump the YAML to the file
+    file << out.c_str() << std::endl;
+  }
+
+} // namespace corsika
diff --git a/corsika/framework/core/ParticleProperties.hpp b/corsika/framework/core/ParticleProperties.hpp
index 1b3449541..267c67838 100644
--- a/corsika/framework/core/ParticleProperties.hpp
+++ b/corsika/framework/core/ParticleProperties.hpp
@@ -109,7 +109,7 @@ namespace corsika {
 
   //! Particle code according to PDG, "Monte Carlo Particle Numbering Scheme"
   PDGCode constexpr get_PDG(Code const);
-
+  PDGCode constexpr get_PDG(unsigned int const A, unsigned int const Z);
   std::string_view constexpr get_name(Code const); //!< name of the particle as string
   TimeType constexpr get_lifetime(Code const);     //!< lifetime
 
@@ -117,6 +117,7 @@ namespace corsika {
   bool constexpr is_em(Code const); //!< true if particle is electron, positron or photon
   bool constexpr is_muon(Code const);     //!< true if particle is mu+ or mu-
   bool constexpr is_neutrino(Code const); //!< true if particle is (anti-) neutrino
+  bool constexpr is_charged(Code const); //!< true if particle is charged
 
   /**
    * @brief Creates the Code for a nucleus of type 10LZZZAAAI.
diff --git a/corsika/framework/utility/FindXmax.hpp b/corsika/framework/utility/FindXmax.hpp
new file mode 100644
index 000000000..5b3e1e6b9
--- /dev/null
+++ b/corsika/framework/utility/FindXmax.hpp
@@ -0,0 +1,120 @@
+/*
+ * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+#include <corsika/framework/core/Logging.hpp>
+
+#include <tuple>
+
+namespace corsika {
+
+  /**
+   * Interpolates profiles around maximum.
+   *
+   * The maximum of profiles can be robustly estimated from the three maximal points by
+   * quadratic interpolation. This code is copied from CONEX and is awaiting full adaption
+   * into CORSIKA8. This is a temporary solution (just copied).
+   */
+
+  class FindXmax {
+
+    static bool invert3by3(double A[3][3]) {
+      const double kSmall = 1e-80;
+
+      double determinant = A[0][0] * (A[1][1] * A[2][2] - A[1][2] * A[2][1]) -
+                           A[0][1] * (A[1][0] * A[2][2] - A[1][2] * A[2][0]) +
+                           A[0][2] * (A[1][0] * A[2][1] - A[1][1] * A[2][0]);
+
+      double absDet = fabs(determinant);
+
+      if (absDet < kSmall) {
+        CORSIKA_LOG_WARN("invert3by3: Error-matrix singular (absDet={})", absDet);
+        return false;
+      }
+
+      double B[3][3];
+
+      B[0][0] = A[1][1] * A[2][2] - A[1][2] * A[2][1];
+      B[1][0] = A[1][2] * A[2][0] - A[2][2] * A[1][0];
+      B[2][0] = A[1][0] * A[2][1] - A[1][1] * A[2][0];
+
+      B[0][1] = A[0][2] * A[2][1] - A[2][2] * A[0][1];
+      B[1][1] = A[0][0] * A[2][2] - A[2][0] * A[0][2];
+      B[2][1] = A[0][1] * A[2][0] - A[0][0] * A[2][1];
+
+      B[0][2] = A[0][1] * A[1][2] - A[1][1] * A[0][2];
+      B[1][2] = A[0][2] * A[1][0] - A[1][2] * A[0][0];
+      B[2][2] = A[0][0] * A[1][1] - A[0][1] * A[1][0];
+
+      for (int i = 0; i < 3; i++)
+        for (int j = 0; j < 3; j++) A[i][j] = B[i][j] / determinant;
+
+      return true;
+    }
+
+    /****************************************************
+     *
+     * solves linear system
+     *
+     *   / y[0] \   / A[0][0] A[0][1] A[0][2] \   / x[0] \
+     *   | y[1] | = | A[1][0] A[1][1] A[1][2] | * | x[1] |
+     *   \ y[2] /   \ A[2][0] A[2][1] A[2][2] /   \ x[2] /
+     *
+     *
+     * Input:  y[3] and A[3][3]
+     * Output: returns true when succeded (i.e. A is not singular)
+     *         A is overwritten with its inverse
+     *
+     * M.Unger 12/1/05
+     *
+     ****************************************************/
+    static bool solve3by3(double y[3], double A[3][3], double x[3]) {
+      if (invert3by3(A)) {
+
+        for (int i = 0; i < 3; i++)
+          x[i] = A[i][0] * y[0] + A[i][1] * y[1] + A[i][2] * y[2];
+
+        return true;
+
+      } else
+        return false;
+    }
+
+  public:
+    static std::tuple<double, double> interpolateProfile(double x1, double x2, double x3,
+                                                         double y1, double y2,
+                                                         double y3) {
+
+      // quadratic "fit" around maximum to get dEdXmax and Xmax
+      double x[3] = {x1, x2, x3};
+      double y[3] = {y1, y2, y3};
+
+      double A[3][3];
+      A[0][0] = x[0] * x[0];
+      A[0][1] = x[0];
+      A[0][2] = 1.;
+      A[1][0] = x[1] * x[1];
+      A[1][1] = x[1];
+      A[1][2] = 1.;
+      A[2][0] = x[2] * x[2];
+      A[2][1] = x[2];
+      A[2][2] = 1.;
+
+      double a[3];
+
+      solve3by3(y, A, a);
+
+      if (a[0] < 0.) {
+        double const Xmax = -a[1] / (2. * a[0]);
+        return std::make_tuple(Xmax, a[0] * Xmax * Xmax + a[1] * Xmax + a[2]);
+      }
+      return std::make_tuple(0, 0);
+    }
+  };
+} // namespace corsika
\ No newline at end of file
diff --git a/corsika/modules/LongitudinalProfile.hpp b/corsika/modules/LongitudinalProfile.hpp
index b5d199fb8..2319a3866 100644
--- a/corsika/modules/LongitudinalProfile.hpp
+++ b/corsika/modules/LongitudinalProfile.hpp
@@ -12,6 +12,9 @@
 #include <corsika/framework/process/ContinuousProcess.hpp>
 #include <corsika/framework/core/PhysicalUnits.hpp>
 
+#include <corsika/modules/writers/LongitudinalProfileWriterParquet.hpp>
+#include <corsika/modules/writers/LongitudinalProfileWriterOff.hpp>
+
 #include <array>
 #include <fstream>
 #include <limits>
@@ -33,11 +36,11 @@ namespace corsika {
    * boundaries.
    */
 
-  class LongitudinalProfile : public ContinuousProcess<LongitudinalProfile> {
+  template <typename TOutput>
+  class LongitudinalProfile : public ContinuousProcess<LongitudinalProfile<TOutput>> {
 
   public:
-    LongitudinalProfile(ShowerAxis const&,
-                        GrammageType dX = 10_g / square(1_cm)); // profile binning);
+    LongitudinalProfile(TOutput& output);
 
     template <typename TParticle, typename TTrack>
     ProcessReturn doContinuous(
@@ -49,22 +52,8 @@ namespace corsika {
       return meter * std::numeric_limits<double>::infinity();
     }
 
-    void save(std::string const&, int const width = 14, int const precision = 6);
-
   private:
-    GrammageType const dX_;
-    ShowerAxis const& shower_axis_;
-    using ProfileEntry = std::array<uint32_t, 7>;
-    enum ProfileIndex {
-      Photon = 0,
-      Positron = 1,
-      Electron = 2,
-      MuPlus = 3,
-      MuMinus = 4,
-      Hadron = 5,
-      Invisible = 6,
-    };
-    std::vector<ProfileEntry> profiles_; // longitudinal profile
+    TOutput& output_;
   };
 
 } // namespace corsika
diff --git a/corsika/modules/ObservationPlane.hpp b/corsika/modules/ObservationPlane.hpp
index 6d71d8f76..bf1798c18 100644
--- a/corsika/modules/ObservationPlane.hpp
+++ b/corsika/modules/ObservationPlane.hpp
@@ -36,7 +36,16 @@ namespace corsika {
         public TOutputWriter {
 
   public:
-    ObservationPlane(Plane const&, DirectionVector const&, bool = true);
+    ObservationPlane(Plane const&, DirectionVector const&, TOutput& output, bool = true);
+
+    ObservationPlane(Plane const& p, DirectionVector const& d, bool f = true)
+        : ObservationPlane(p, d, *(new ParticleWriterOff()), f) {
+      ownOutput_ = true;
+    }
+
+    ~ObservationPlane() {
+      if (ownOutput_) delete &output_;
+    }
 
     template <typename TParticle, typename TTrajectory>
     ProcessReturn doContinuous(TParticle& vParticle, TTrajectory& vTrajectory,
@@ -52,6 +61,8 @@ namespace corsika {
 
   private:
     Plane const plane_;
+    TOutput& output_;
+    bool ownOutput_ = false;
     bool const deleteOnHit_;
     HEPEnergyType energy_ground_;
     unsigned int count_ground_;
diff --git a/corsika/modules/ParticleCut.hpp b/corsika/modules/ParticleCut.hpp
index 159186e1d..3db5d0d30 100644
--- a/corsika/modules/ParticleCut.hpp
+++ b/corsika/modules/ParticleCut.hpp
@@ -24,30 +24,40 @@ namespace corsika {
   for each particle. Special constructors for cuts by the following groups are
   implemented: (electrons,positrons), photons, hadrons and muons.
    **/
-  class ParticleCut : public SecondariesProcess<ParticleCut>,
-                      public ContinuousProcess<ParticleCut> {
+  template <typename TOutput = EnergyLossWriterOff>
+  class ParticleCut : public SecondariesProcess<ParticleCut<TOutput>>,
+                      public ContinuousProcess<ParticleCut<TOutput>> {
 
   public:
     /**
      * particle cut with kinetic energy thresholds for electrons, photons,
      *    hadrons (including nuclei with energy per nucleon) and muons
      *    invisible particles (neutrinos) can be cut or not
-     **/
-    ParticleCut(HEPEnergyType const eEleCut, HEPEnergyType const ePhoCut,
+     */
+    ParticleCut(TOutput& output, HEPEnergyType const eEleCut, HEPEnergyType const ePhoCut,
                 HEPEnergyType const eHadCut, HEPEnergyType const eMuCut, bool const inv);
 
-    //! simple cut. hadrons and muons are cut by threshold. EM particles are all
-    //! discarded.
-    ParticleCut(HEPEnergyType const eHadCut, HEPEnergyType const euCut, bool const inv);
-
-    //! simplest cut. all particles have same threshold. EM particles can be set to be
-    //! discarded altogether.
-    ParticleCut(HEPEnergyType const eCut, bool const em, bool const inv);
+    /**
+     * Same, but with no output.
+     */
+    ParticleCut(HEPEnergyType const eEleCut, HEPEnergyType const ePhoCut,
+                HEPEnergyType const eHadCut, HEPEnergyType const eMuCut, bool const inv)
+        : ParticleCut(*(new EnergyLossWriterOff()), eEleCut, ePhoCut, eHadCut, eMuCut,
+                      inv) {}
 
-    //! threshold for specific particles redefined. EM and invisible particles can be set
-    //! to be discarded altogether.
+    /**
+     * Threshold for specific particles redefined. EM and invisible particles can be set
+     * to be discarded altogether.
+     */
+    ParticleCut(TOutput& output,
+                std::unordered_map<Code const, HEPEnergyType const> const& eCuts,
+                bool const inv);
+    /**
+     * Same, but with no output.
+     */
     ParticleCut(std::unordered_map<Code const, HEPEnergyType const> const& eCuts,
-                bool const em, bool const inv);
+                bool const inv)
+        : ParticleCut(*(new EnergyLossWriterOff()), eCuts, inv) {}
 
     template <typename TStackView>
     void doSecondaries(TStackView&);
@@ -62,8 +72,8 @@ namespace corsika {
       return meter * std::numeric_limits<double>::infinity();
     }
 
-    void printThresholds();
-    void showResults(); // LCOV_EXCL_LINE
+    void printThresholds() const;
+    void showResults() const; // LCOV_EXCL_LINE
     void reset();
 
     HEPEnergyType getElectronKineticECut() const {
@@ -84,11 +94,6 @@ namespace corsika {
     HEPEnergyType getTimeCutEnergy() const { return energy_timecut_; }
     //! returns total energy of particles that were removed by cut in kinetic energy
     HEPEnergyType getCutEnergy() const { return energy_cut_; }
-    //! returns total energy of particles that were removed by cut for electromagnetic
-    //! particles
-    HEPEnergyType getEmEnergy() const { return energy_emcut_; }
-    //! returns number of electromagnetic particles
-    unsigned int getNumberEmParticles() const { return em_count_; }
     //! returns number of invisible particles
     unsigned int getNumberInvParticles() const { return inv_count_; }
 
@@ -103,13 +108,11 @@ namespace corsika {
     bool isInvisible(Code const&) const;
 
   private:
-    bool doCutEm_;
+    TOutput& output_;
     bool doCutInv_;
     HEPEnergyType energy_cut_ = 0 * electronvolt;
     HEPEnergyType energy_timecut_ = 0 * electronvolt;
-    HEPEnergyType energy_emcut_ = 0 * electronvolt;
     HEPEnergyType energy_invcut_ = 0 * electronvolt;
-    unsigned int em_count_ = 0;
     unsigned int inv_count_ = 0;
     unsigned int energy_count_ = 0;
 
diff --git a/corsika/modules/TrackWriter.hpp b/corsika/modules/TrackWriter.hpp
index 79e1dfc91..cf21939d8 100644
--- a/corsika/modules/TrackWriter.hpp
+++ b/corsika/modules/TrackWriter.hpp
@@ -9,16 +9,16 @@
 #pragma once
 
 #include <corsika/framework/process/ContinuousProcess.hpp>
+#include <corsika/modules/writers/TrackWriterOff.hpp>
 #include <corsika/modules/writers/TrackWriterParquet.hpp>
 
 namespace corsika {
 
-  template <typename TOutputWriter = TrackWriterParquet>
-  class TrackWriter : public ContinuousProcess<TrackWriter<TOutputWriter>>,
-                      public TOutputWriter {
+  template <typename TOutput = TrackWriterOff>
+  class TrackWriter : public ContinuousProcess<TrackWriter<TOutput>> {
 
   public:
-    TrackWriter();
+    TrackWriter(TOutput& output);
 
     template <typename TParticle, typename TTrack>
     ProcessReturn doContinuous(TParticle const&, TTrack const&, bool const limitFlag);
@@ -27,6 +27,9 @@ namespace corsika {
     LengthType getMaxStepLength(TParticle const&, TTrack const&);
 
     YAML::Node getConfig() const;
+
+  private:
+    TOutput& output_;
   };
 
 } // namespace corsika
diff --git a/corsika/modules/conex/CONEXhybrid.hpp b/corsika/modules/conex/CONEXhybrid.hpp
index 0d3878e47..3fb23c95b 100644
--- a/corsika/modules/conex/CONEXhybrid.hpp
+++ b/corsika/modules/conex/CONEXhybrid.hpp
@@ -17,6 +17,12 @@
 #include <corsika/framework/geometry/Vector.hpp>
 #include <corsika/media/ShowerAxis.hpp>
 
+#include <corsika/modules/writers/EnergyLossWriterOff.hpp>
+#include <corsika/modules/writers/EnergyLossWriterParquet.hpp>
+
+#include <corsika/modules/writers/LongitudinalProfileWriterOff.hpp>
+#include <corsika/modules/writers/LongitudinalProfileWriterParquet.hpp>
+
 #include <corsika/modules/conex/CONEX_f.hpp>
 
 namespace corsika {
@@ -25,18 +31,48 @@ namespace corsika {
     LengthType constexpr earthRadius{6371315 * meter};
   } // namespace conex
 
-  class CONEXhybrid : public CascadeEquationsProcess<CONEXhybrid>,
-                      public SecondariesProcess<CONEXhybrid> {
+  template <typename TOutput = EnergyLossWriterOff,
+            typename TProfileOutput = LongitudinalProfileWriterOff>
+  class CONEXhybrid : public CascadeEquationsProcess<CONEXhybrid<TOutput, TProfileOutput>>,
+		      public SecondariesProcess<CONEXhybrid<TOutput, TProfileOutput>> {
+
 
   public:
-    CONEXhybrid(Point center, ShowerAxis const& showerAxis, LengthType groundDist,
+    /**
+     * Constructor with output sink specified.
+     *
+     * The output will be generated by the output sink.
+     *
+     * @param TOutput output data sink.
+     * @param center center of earth.
+     * @param showerAxis shower axis to convert geometry to grammage.
+     * @param groundDist distance to ground.
+     * @param injectionHeight height of injection.
+     * @param primaryEnergy energy of primary particle
+     * @param pdg type of primary particle.
+     */
+    CONEXhybrid(TOutput& output, TProfileOutput& outProfile, Point const& center,
+                ShowerAxis const& showerAxis, LengthType groundDist,
                 LengthType injectionHeight, HEPEnergyType primaryEnergy, PDGCode pdg);
 
     /**
-     * Main entry point to pass new particle data towards CONEX. If a
-     * particles is selected for CONEX, it is removed from the CORSIKA
-     * 8 stack.
+     * Constructor with no output sink specified.
+     *
+     * There will be no output generated.
+     *
+     * @param center center of earth.
+     * @param showerAxis shower axis to convert geometry to grammage.
+     * @param groundDist distance to ground.
+     * @param injectionHeight height of injection.
+     * @param primaryEnergy energy of primary particle
+     * @param pdg type of primary particle.
      */
+    CONEXhybrid(Point const& center, ShowerAxis const& showerAxis, LengthType groundDist,
+                LengthType injectionHeight, HEPEnergyType primaryEnergy, PDGCode pdg)
+        : CONEXhybrid(*(new EnergyLossWriterOff()), *(new LongitudinalProfileWriterOff()),
+                      center, showerAxis, groundDist, injectionHeight, primaryEnergy,
+                      pdg) {}
+
     template <typename TStackView>
     void doSecondaries(TStackView&);
 
@@ -66,6 +102,9 @@ namespace corsika {
     void reset();
 
   private:
+    TOutput& output_;
+    TProfileOutput& profileOutput_;
+
     // data members
     //! CONEX e.m. particle codes
     static std::array<std::pair<Code, int>, 3> constexpr egs_em_codes_{
diff --git a/corsika/modules/energy_loss/BetheBlochPDG.hpp b/corsika/modules/energy_loss/BetheBlochPDG.hpp
index 5f606b82d..83ad434a2 100644
--- a/corsika/modules/energy_loss/BetheBlochPDG.hpp
+++ b/corsika/modules/energy_loss/BetheBlochPDG.hpp
@@ -12,7 +12,10 @@
 #include <corsika/framework/geometry/Point.hpp>
 #include <corsika/framework/geometry/Vector.hpp>
 #include <corsika/framework/process/ContinuousProcess.hpp>
-#include <corsika/media/ShowerAxis.hpp>
+
+
+#include <corsika/modules/writers/EnergyLossWriterParquet.hpp>
+#include <corsika/modules/writers/EnergyLossWriterOff.hpp>
 
 #include <map>
 
@@ -33,20 +36,24 @@ namespace corsika {
    *
    */
 
-  class BetheBlochPDG : public ContinuousProcess<BetheBlochPDG> {
+  template <typename TOutput = EnergyLossWriterOff>
+  class BetheBlochPDG : public ContinuousProcess<BetheBlochPDG<TOutput>> {
 
     using MeVgcm2 = decltype(1e6 * electronvolt / gram * square(1e-2 * meter));
 
   public:
-    BetheBlochPDG(ShowerAxis const& showerAxis);
+    BetheBlochPDG(TOutput& output);
+
+    BetheBlochPDG()
+        : BetheBlochPDG(*(new EnergyLossWriterOff())) {}
 
-    /** clang-format-off
+    /**
      * Interface function of ContinuousProcess.
      *
-     * \param particle The particle to process in its current state
-     * \param track The trajectory in space of this particle, on which doContinuous should
-     *        act
-     * \param limitFlag flag to identify, if BetheBlochPDG::getMaxStepLength is the
+     * @param particle The particle to process in its current state
+     * @param track The trajectory in space of this particle, on which doContinuous
+     *should act
+     * @param limitFlag flag to identify, if BetheBlochPDG::getMaxStepLength is the
      *        globally limiting factor (or not)
      clang-format-on **/
     template <typename TParticle, typename TTrajectory>
@@ -55,9 +62,8 @@ namespace corsika {
 
     template <typename TParticle, typename TTrajectory>
     LengthType getMaxStepLength(TParticle const&,
-                                TTrajectory const&)
-        const; //! limited by the energy threshold! By default the limit is the particle
-               //! rest mass, i.e. kinetic energy is zero
+                                TTrajectory const&);
+    
     template <typename TParticle>
     static HEPEnergyType getBetheBloch(TParticle const&, const GrammageType);
 
@@ -70,8 +76,6 @@ namespace corsika {
     void showResults() const;
     void reset();
     HEPEnergyType getEnergyLost() const { return energy_lost_; }
-    void printProfile() const;
-    HEPEnergyType getTotal() const;
 
   private:
     template <typename TParticle>
@@ -80,11 +84,8 @@ namespace corsika {
     template <typename TTrajectory>
     void fillProfile(TTrajectory const&, HEPEnergyType);
 
-    GrammageType const dX_ = 10_g / square(1_cm); // profile binning
-    GrammageType const dX_threshold_ = 0.0001_g / square(1_cm);
-    ShowerAxis const& shower_axis_;
+    TOutput& output_;
     HEPEnergyType energy_lost_ = HEPEnergyType::zero();
-    std::vector<HEPEnergyType> profile_; // longitudinal profile
   };
 
 } // namespace corsika
diff --git a/corsika/modules/writers/EnergyLossWriterOff.hpp b/corsika/modules/writers/EnergyLossWriterOff.hpp
new file mode 100644
index 000000000..f921e2b86
--- /dev/null
+++ b/corsika/modules/writers/EnergyLossWriterOff.hpp
@@ -0,0 +1,69 @@
+/*
+ * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+#include <corsika/output/BaseOutput.hpp>
+#include <corsika/framework/core/ParticleProperties.hpp>
+#include <corsika/framework/core/PhysicalUnits.hpp>
+#include <corsika/media/ShowerAxis.hpp>
+
+namespace corsika {
+
+  class EnergyLossWriterOff : public BaseOutput {
+
+  public:
+    /**
+     * Construct a new writer.
+     */
+    EnergyLossWriterOff() = default;
+
+    /**
+     * Called at the start of each library.
+     */
+    void startOfLibrary(boost::filesystem::path const&) final override {}
+
+    /**
+     * Called at the start of each shower.
+     */
+    void startOfShower(unsigned int const) final override {}
+
+    /**
+     * Called at the end of each shower.
+     */
+    void endOfShower(unsigned int const) final override {}
+
+    /**
+     * Called at the end of each library.
+     *
+     * This must also increment the run number since we override
+     * the default behaviour of BaseOutput.
+     */
+    void endOfLibrary() final override {}
+
+    /**
+     * Write a track to the file.
+     */
+    template <typename TTrack>
+    void write(TTrack const&, Code const, HEPEnergyType const) {}
+
+    /**
+     * Add localized energy loss.
+     */
+    void write(Point const&, Code const, HEPEnergyType const) {}
+
+    /**
+     * Add binned energy loss.
+     */
+    void write(GrammageType const, GrammageType const, HEPEnergyType const) {}
+
+    HEPEnergyType getTotal() const { return 0_GeV; }
+
+  }; // namespace corsika
+
+} // namespace corsika
diff --git a/corsika/modules/writers/EnergyLossWriterParquet.hpp b/corsika/modules/writers/EnergyLossWriterParquet.hpp
new file mode 100644
index 000000000..b200c8deb
--- /dev/null
+++ b/corsika/modules/writers/EnergyLossWriterParquet.hpp
@@ -0,0 +1,102 @@
+/*
+ * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+#include <corsika/output/BaseOutput.hpp>
+#include <corsika/output/ParquetStreamer.hpp>
+#include <corsika/framework/core/ParticleProperties.hpp>
+#include <corsika/framework/core/PhysicalUnits.hpp>
+#include <corsika/media/ShowerAxis.hpp>
+
+#include <vector>
+#include <array>
+
+namespace corsika {
+
+  class EnergyLossWriterParquet : public BaseOutput {
+
+    enum class ProfileIndex { Total, Entries };
+    typedef std::array<HEPEnergyType, static_cast<int>(ProfileIndex::Entries)> Profile;
+
+  public:
+    /**
+     * Construct a new writer.
+     */
+    EnergyLossWriterParquet(
+        ShowerAxis const& axis,
+        GrammageType dX = 10_g / square(1_cm),                // profile binning
+        unsigned int const nBins = 200,                       // number of bins
+        GrammageType dX_threshold = 0.0001_g / square(1_cm)); // ignore too short tracks
+
+    /**
+     * Called at the start of each library.
+     */
+    void startOfLibrary(boost::filesystem::path const& directory) final override;
+
+    /**
+     * Called at the start of each shower.
+     */
+    void startOfShower(unsigned int const showerId) final override;
+
+    /**
+     * Called at the end of each shower.
+     */
+    void endOfShower(unsigned int const showerId) final override;
+
+    /**
+     * Called at the end of each library.
+     *
+     * This must also increment the run number since we override
+     * the default behaviour of BaseOutput.
+     */
+    void endOfLibrary() final override;
+
+    /**
+     * Add continuous energy loss.
+     */
+    template <typename TTrack>
+    void write(TTrack const& track, Code const PID, HEPEnergyType const dE);
+
+    /**
+     * Add localized energy loss.
+     */
+    void write(Point const& point, Code const PID, HEPEnergyType const dE);
+
+    /**
+     * Add binned energy loss.
+     */
+    void write(GrammageType const Xstart, GrammageType const Xend,
+               HEPEnergyType const dE);
+
+    /**
+     * Get total observed energy loss.
+     *
+     * @return HEPEnergyType The total energy.
+     */
+    HEPEnergyType getTotal() const;
+
+    /**
+     * Returns library-wide summary.
+     */
+    YAML::Node getSummary() const;
+
+  public:
+    ParquetStreamer output_; ///< The primary output file.
+
+    ShowerAxis const& showerAxis_; ///< conversion between geometry and grammage
+    GrammageType dX_;              ///< binning of profile.
+    unsigned int nBins_;           ///< number of profile bins.
+    GrammageType dX_threshold_;    ///< too short tracks are discarded.
+    std::vector<Profile> profile_; // longitudinal profile
+
+  }; // namespace corsika
+
+} // namespace corsika
+
+#include <corsika/detail/modules/writers/EnergyLossWriterParquet.inl>
diff --git a/corsika/modules/writers/LongitudinalProfileWriterOff.hpp b/corsika/modules/writers/LongitudinalProfileWriterOff.hpp
new file mode 100644
index 000000000..2ebe6de12
--- /dev/null
+++ b/corsika/modules/writers/LongitudinalProfileWriterOff.hpp
@@ -0,0 +1,56 @@
+/*
+ * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+#include <corsika/output/BaseOutput.hpp>
+#include <corsika/framework/core/ParticleProperties.hpp>
+#include <corsika/framework/core/PhysicalUnits.hpp>
+#include <corsika/media/ShowerAxis.hpp>
+
+namespace corsika {
+
+  class LongitudinalProfileWriterOff : public BaseOutput {
+
+  public:
+    /**
+     * Construct a new writer.
+     */
+    LongitudinalProfileWriterOff() = default;
+
+    /**
+     * Called at the start of each library.
+     */
+    void startOfLibrary(boost::filesystem::path const&) final override {}
+
+    /**
+     * Called at the start of each shower.
+     */
+    void startOfShower(unsigned int const) final override {}
+
+    /**
+     * Called at the end of each shower.
+     */
+    void endOfShower(unsigned int const) final override {}
+
+    /**
+     * Called at the end of each library.
+     *
+     * This must also increment the run number since we override
+     * the default behaviour of BaseOutput.
+     */
+    void endOfLibrary() final override {}
+
+    template <typename TTrack>
+    void write(TTrack const&, Code const, double const) {}
+
+    void write(GrammageType const, GrammageType const, Code const, double const) {}
+
+  }; // namespace corsika
+
+} // namespace corsika
diff --git a/corsika/modules/writers/LongitudinalProfileWriterParquet.hpp b/corsika/modules/writers/LongitudinalProfileWriterParquet.hpp
new file mode 100644
index 000000000..7c9e7253b
--- /dev/null
+++ b/corsika/modules/writers/LongitudinalProfileWriterParquet.hpp
@@ -0,0 +1,103 @@
+/*
+ * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+#include <corsika/output/BaseOutput.hpp>
+#include <corsika/output/ParquetStreamer.hpp>
+#include <corsika/framework/core/ParticleProperties.hpp>
+#include <corsika/framework/core/PhysicalUnits.hpp>
+#include <corsika/media/ShowerAxis.hpp>
+
+#include <vector>
+#include <array>
+#include <string>
+
+namespace corsika {
+
+  class LongitudinalProfileWriterParquet : public BaseOutput {
+
+    enum class ProfileIndex {
+      Charged,
+      Hadron,
+      Photon,
+      Electron,
+      Positron,
+      MuPlus,
+      MuMinus,
+      Entries
+    };
+
+    static std::array<
+        char const*, static_cast<int>(ProfileIndex::Entries)> constexpr ProfileIndexNames{
+        {"charged", "hadron", "photon", "electron", "positron", "muplus", "muminus"}};
+
+    typedef std::array<double, static_cast<int>(ProfileIndex::Entries)> ProfileData;
+
+  public:
+    /**
+     * Construct a new writer.
+     */
+    LongitudinalProfileWriterParquet(ShowerAxis const& axis,
+                                     GrammageType dX = 10_g /
+                                                       square(1_cm),  // profile binning
+                                     unsigned int const nBins = 200); // number of bins
+
+    /**
+     * Called at the start of each library.
+     */
+    void startOfLibrary(boost::filesystem::path const& directory) final override;
+
+    /**
+     * Called at the start of each shower.
+     */
+    void startOfShower(unsigned int const showerId) final override;
+
+    /**
+     * Called at the end of each shower.
+     */
+    void endOfShower(unsigned int const showerId) final override;
+
+    /**
+     * Called at the end of each library.
+     *
+     * This must also increment the run number since we override
+     * the default behaviour of BaseOutput.
+     */
+    void endOfLibrary() final override;
+
+    /**
+     * Add continuous profile.
+     */
+    template <typename TTrack>
+    void write(TTrack const& track, Code const PID, double const weight);
+
+    /**
+     * Add binned profile.
+     */
+    void write(GrammageType const Xstart, GrammageType const Xend, Code const PID,
+               double const weight);
+
+    /**
+     * Returns library-wide summary.
+     */
+    YAML::Node getSummary() const;
+
+  public:
+    ParquetStreamer output_; ///< The primary output file.
+
+    ShowerAxis const& showerAxis_;     ///< conversion between geometry and grammage
+    GrammageType dX_;                  ///< binning of profile.
+    unsigned int nBins_;               ///< number of profile bins.
+    std::vector<ProfileData> profile_; // longitudinal profile
+
+  }; // namespace corsika
+
+} // namespace corsika
+
+#include <corsika/detail/modules/writers/LongitudinalProfileWriterParquet.inl>
diff --git a/corsika/modules/writers/ParticleWriterOff.hpp b/corsika/modules/writers/ParticleWriterOff.hpp
new file mode 100644
index 000000000..6f036a74b
--- /dev/null
+++ b/corsika/modules/writers/ParticleWriterOff.hpp
@@ -0,0 +1,39 @@
+/*
+ * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+#include <corsika/output/BaseOutput.hpp>
+#include <corsika/framework/core/PhysicalUnits.hpp>
+#include <corsika/framework/core/ParticleProperties.hpp>
+
+namespace corsika {
+
+  class ParticleWriterOff : public BaseOutput {
+
+  public:
+    ParticleWriterOff() {}
+    virtual ~ParticleWriterOff() {}
+
+    void startOfLibrary(boost::filesystem::path const&) final override {}
+
+    void endOfShower(unsigned int const) final override {}
+
+    void endOfLibrary() final override {}
+
+    // for pdg particles
+    void write(Code const&, HEPEnergyType const&, LengthType const&, LengthType const&,
+               double const) {}
+
+    // for nuclei
+    void write(unsigned int const, unsigned int const, HEPEnergyType const&,
+               LengthType const&, LengthType const&, double const) {}
+
+  }; // class ParticleWriterOff
+
+} // namespace corsika
diff --git a/corsika/modules/writers/ParticleWriterParquet.hpp b/corsika/modules/writers/ParticleWriterParquet.hpp
new file mode 100644
index 000000000..dd5bd1c94
--- /dev/null
+++ b/corsika/modules/writers/ParticleWriterParquet.hpp
@@ -0,0 +1,88 @@
+/*
+ * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+#include <corsika/output/BaseOutput.hpp>
+#include <corsika/output/ParquetStreamer.hpp>
+#include <corsika/framework/core/ParticleProperties.hpp>
+#include <corsika/framework/core/PhysicalUnits.hpp>
+
+namespace corsika {
+
+  class ParticleWriterParquet : public BaseOutput {
+
+  public:
+    /**
+     * Construct an ObservationPlane.
+     */
+    ParticleWriterParquet();
+
+    /**
+     * Called at the start of each library.
+     */
+    void startOfLibrary(boost::filesystem::path const& directory) final override;
+
+    /**
+     * Called at the beginning of each shower.
+     */
+    void startOfShower(unsigned int const showerId) final override;
+
+    /**
+     * Called at the end of each shower.
+     */
+    void endOfShower(unsigned int const showerId) final override;
+
+    /**
+     * Called at the end of each library.
+     *
+     * This must also increment the run number since we override
+     * the default behaviour of BaseOutput.
+     */
+    void endOfLibrary() final override;
+
+    /**
+     * Write a PDG/corsika::Code particle to the file.
+     */
+    void write(Code const& pid, units::si::HEPEnergyType const& energy,
+               units::si::LengthType const& x, units::si::LengthType const& y,
+               const double weight);
+
+    /**
+     * Write a Code::Nucleus particle to the file.
+     */
+    void write(unsigned int const A, unsigned int const Z,
+               units::si::HEPEnergyType const& energy, units::si::LengthType const& x,
+               units::si::LengthType const& y, const double weight);
+
+    /**
+     * Return collected library-level summary for output.
+     */
+    YAML::Node getSummary() const final override;
+
+    /**
+     * If plane is absorbing particles: return the total energy absorbed.
+     */
+    HEPEnergyType getEnergyGround() const { return energyGround_; }
+
+  private:
+    ParquetStreamer output_; ///< The primary output file.
+    unsigned int showerId_;  ///< current shower Id
+
+    double countHadrons_ = 0; ///< count hadrons hitting plane
+    double countMuons_ = 0;   ///< count muons hitting plane
+    double countEM_ = 0;      ///< count EM particles hitting plane.
+    double countOthers_ = 0;  ///< count othe types of particles hitting plane
+
+    HEPEnergyType energyGround_; ///< energy absorbed in ground.
+
+  }; // class ParticleWriterParquet
+
+} // namespace corsika
+
+#include <corsika/detail/modules/writers/ParticleWriterParquet.inl>
diff --git a/corsika/modules/writers/ObservationPlaneWriterParquet.hpp b/corsika/modules/writers/TrackWriterOff.hpp
similarity index 56%
rename from corsika/modules/writers/ObservationPlaneWriterParquet.hpp
rename to corsika/modules/writers/TrackWriterOff.hpp
index 5963aa100..cbee5584a 100644
--- a/corsika/modules/writers/ObservationPlaneWriterParquet.hpp
+++ b/corsika/modules/writers/TrackWriterOff.hpp
@@ -9,33 +9,36 @@
 #pragma once
 
 #include <corsika/output/BaseOutput.hpp>
-#include <corsika/output/ParquetStreamer.hpp>
 #include <corsika/framework/core/ParticleProperties.hpp>
 #include <corsika/framework/core/PhysicalUnits.hpp>
+#include <corsika/framework/geometry/QuantityVector.hpp>
 
 namespace corsika {
 
-  class ObservationPlaneWriterParquet : public BaseOutput {
-
-    ParquetStreamer output_; ///< The primary output file.
+  class TrackWriterOff : public BaseOutput {
 
   public:
     /**
-     * Construct an ObservationPlane.
+     * Construct a new writer.
      *
      * @param name    The name of this output.
      */
-    ObservationPlaneWriterParquet();
+    TrackWriterOff() = default;
 
     /**
      * Called at the start of each library.
      */
-    void startOfLibrary(boost::filesystem::path const& directory) final override;
+    void startOfLibrary(boost::filesystem::path const&) final override {}
+
+    /**
+     * Called at the start of each shower.
+     */
+    void startOfShower(unsigned int const) final override {}
 
     /**
      * Called at the end of each shower.
      */
-    void endOfShower() final override;
+    void endOfShower(unsigned int const) final override {}
 
     /**
      * Called at the end of each library.
@@ -43,18 +46,15 @@ namespace corsika {
      * This must also increment the run number since we override
      * the default behaviour of BaseOutput.
      */
-    void endOfLibrary() final override;
+    void endOfLibrary() final override {}
 
-  protected:
     /**
-     * Write a particle to the file.
+     * Write a track to the file.
      */
     void write(Code const& pid, units::si::HEPEnergyType const& energy,
-               units::si::LengthType const& x, units::si::LengthType const& y,
-               units::si::TimeType const& t);
+               double const weight, QuantityVector<length_d> const& start,
+               QuantityVector<length_d> const& end) {}
 
-  }; // class ObservationPlaneWriterParquet
+  }; // class TrackWriterOff
 
 } // namespace corsika
-
-#include <corsika/detail/modules/writers/ObservationPlaneWriterParquet.inl>
diff --git a/corsika/modules/writers/TrackWriterParquet.hpp b/corsika/modules/writers/TrackWriterParquet.hpp
index 8473cbf7f..b1f519519 100644
--- a/corsika/modules/writers/TrackWriterParquet.hpp
+++ b/corsika/modules/writers/TrackWriterParquet.hpp
@@ -31,10 +31,15 @@ namespace corsika {
      */
     void startOfLibrary(boost::filesystem::path const& directory) final override;
 
+    /**
+     * Called at the start of each shower.
+     */
+    void startOfShower(unsigned int const showerId) final override;
+
     /**
      * Called at the end of each shower.
      */
-    void endOfShower() final override;
+    void endOfShower(unsigned int const showerId) final override;
 
     /**
      * Called at the end of each library.
@@ -54,6 +59,7 @@ namespace corsika {
 
   private:
     ParquetStreamer output_; ///< The primary output file.
+    unsigned int showerId_;  ///< event Id counter
 
   }; // class TrackWriterParquet
 
diff --git a/corsika/output/BaseOutput.hpp b/corsika/output/BaseOutput.hpp
index 0024c5b7a..8581f1fa4 100644
--- a/corsika/output/BaseOutput.hpp
+++ b/corsika/output/BaseOutput.hpp
@@ -7,8 +7,8 @@
  */
 #pragma once
 
+#include <corsika/framework/core/Logging.hpp>
 #include <boost/filesystem.hpp>
-
 #include <yaml-cpp/yaml.h>
 
 namespace corsika {
@@ -20,7 +20,8 @@ namespace corsika {
   class BaseOutput {
 
   protected:
-    BaseOutput();
+    BaseOutput() = default;
+    virtual ~BaseOutput() = default;
 
   public:
     /**
@@ -30,13 +31,17 @@ namespace corsika {
 
     /**
      * Called at the start of each event/shower.
+     *
+     * @param showerId Shower counter.
      */
-    virtual void startOfShower() {}
+    virtual void startOfShower(unsigned int const /*showerId*/) {}
 
     /**
      * Called at the end of each event/shower.
+     *
+     * @param showerId Shower counter.
      */
-    virtual void endOfShower() = 0;
+    virtual void endOfShower(unsigned int const showerId) = 0;
 
     /**
      * Called at the end of each run.
@@ -44,19 +49,19 @@ namespace corsika {
     virtual void endOfLibrary() = 0;
 
     /**
-     * Get the configuration of this output.
+     * Flag to indicate readiness.
      */
-    virtual YAML::Node getConfig() const = 0;
+    bool isInit() const { return is_init_; }
 
     /**
-     * Get any summary information for the entire library.
+     * The output logger.
      */
-    virtual YAML::Node getSummary() { return YAML::Node(); }
+    static auto getLogger() { return logger_; }
 
     /**
-     * Flag to indicate readiness.
+     * Provide YAML Summary for this BaseOutput.
      */
-    bool isInit() const { return is_init_; }
+    virtual YAML::Node getSummary() const { return YAML::Node(); }
 
   protected:
     /**
@@ -64,12 +69,9 @@ namespace corsika {
      */
     void setInit(bool const v) { is_init_ = v; }
 
-    int shower_{0}; ///< The current event number.
-
   private:
-    bool is_init_{false}; ///< flag to indicate readiness
+    bool is_init_{false};                             ///< flag to indicate readiness
+    inline static auto logger_{get_logger("output")}; ///< A custom logger.
   };
 
 } // namespace corsika
-
-#include <corsika/detail/output/BaseOutput.inl>
diff --git a/corsika/output/DummyOutputManager.hpp b/corsika/output/DummyOutputManager.hpp
index fc5cf4647..e7bc1fa76 100644
--- a/corsika/output/DummyOutputManager.hpp
+++ b/corsika/output/DummyOutputManager.hpp
@@ -12,7 +12,7 @@ namespace corsika {
   /*!
    * An output manager that does nothing.
    */
-  class DummyOutputManager final {
+  class DummyOutputManager {
 
   public:
     /**
diff --git a/corsika/output/NoOutput.hpp b/corsika/output/NoOutput.hpp
index 788149b70..ddcabc8ef 100644
--- a/corsika/output/NoOutput.hpp
+++ b/corsika/output/NoOutput.hpp
@@ -30,12 +30,12 @@ namespace corsika {
     /**
      * Called at the start of each event/shower.
      */
-    void startOfShower() final override {}
+    void startOfShower(unsigned int const) final override {}
 
     /**
      * Called at the end of each event/shower.
      */
-    void endOfShower() final override {}
+    void endOfShower(unsigned int const) final override {}
 
     /**
      * Called at the end of each run.
@@ -58,5 +58,3 @@ namespace corsika {
   };
 
 } // namespace corsika
-
-#include <corsika/detail/output/BaseOutput.inl>
diff --git a/corsika/output/OutputManager.hpp b/corsika/output/OutputManager.hpp
index c3e570749..a96983ed6 100644
--- a/corsika/output/OutputManager.hpp
+++ b/corsika/output/OutputManager.hpp
@@ -10,15 +10,16 @@
 #include <chrono>
 #include <string>
 #include <boost/filesystem.hpp>
-#include <corsika/output/BaseOutput.hpp>
 #include <corsika/framework/core/Logging.hpp>
+#include <corsika/output/BaseOutput.hpp>
+#include <corsika/output/YAMLStreamer.hpp>
 
 namespace corsika {
 
   /*!
    * Manages CORSIKA 8 output streams.
    */
-  class OutputManager final {
+  class OutputManager : public YAMLStreamer {
 
     /**
      * Indicates the current state of this manager.
@@ -30,65 +31,61 @@ namespace corsika {
       LibraryFinished,
     };
 
-    OutputState state_{OutputState::NoInit}; ///< The current state of this manager.
-    std::string const name_;                 ///< The name of this simulation file.
-    boost::filesystem::path const root_;     ///< The top-level directory for the output.
-    int count_{0};                           ///< The current ID of this shower.
-    std::chrono::time_point<std::chrono::system_clock> const start_time{
-        std::chrono::system_clock::now()};           ///< The time the manager is created.
-    inline static auto logger{get_logger("output")}; ///< A custom logger.
+  public:
     /**
-     * The outputs that have been registered with us.
+     * Construct an OutputManager instance with a name in a given directory.
+     *
+     * @param name    The name of this output collection.
+     * @param dir     The directory where the output directory will be stored.
      */
-    std::map<std::string, std::reference_wrapper<BaseOutput>> outputs_;
+    OutputManager(std::string const& name, boost::filesystem::path const& dir);
 
     /**
-     * Write a YAML-node to a file.
+     * Handle graceful closure of the outputs upon destruction.
      */
-    void writeNode(YAML::Node const& node, boost::filesystem::path const& path) const;
+    ~OutputManager();
 
-    /**
-     * Write the top-level config of this simulation.
-     */
-    void writeTopLevelConfig() const;
+    template <typename TOutput>
+    void add(std::string const& name, TOutput& output);
 
     /**
-     * Initialize the "registered" output with a given name.
+     * Produces the summary YAML.
+     *
+     * @return YAML::Node
      */
-    void initOutput(std::string const& name) const;
+    YAML::Node getSummary() const;
 
     /**
-     * Write the top-level summary of this library.
+     * Produces the config YAML.
+     *
+     * @return YAML::Node
      */
-    void writeTopLevelSummary() const;
+    YAML::Node getConfig() const;
 
-  public:
+  private:
     /**
-     * Construct an OutputManager instance with a name in a given directory.
-     *
-     * @param name    The name of this output collection.
-     * @param dir     The directory where the output directory will be stored.
+     * Write the top-level config of this simulation.
      */
-    OutputManager(std::string const& name, boost::filesystem::path const& dir);
+    void writeConfig() const;
 
     /**
-     * Handle graceful closure of the outputs upon destruction.
+     * Write the top-level summary of this library.
      */
-    ~OutputManager();
+    void writeSummary() const;
 
     /**
-     * Register an existing output to this manager.
+     * Called at the start of each library.
+     *
+     * This iteratively calls startOfLibrary on each registered output relative to the
+     * library direcotry.
      *
-     * @param name   The unique name of this output.
-     * @param output The output module.
+     * @param dir location of library
      */
-    template <typename TOutput>
-    void add(std::string const& name, TOutput& output);
+    void startOfLibrary(boost::filesystem::path const& dir);
 
+  public:
     /**
      * Called at the start of each library.
-     *
-     * This iteratively calls startOfLibrary on each registered output.
      */
     void startOfLibrary();
 
@@ -110,6 +107,24 @@ namespace corsika {
      */
     void endOfLibrary();
 
+    /**
+     * Return current event number.
+     */
+    int getEventId() const;
+
+  private:
+    boost::filesystem::path root_;           ///< The unique output directory.
+    OutputState state_{OutputState::NoInit}; ///< The current state of this manager.
+    std::string const name_;                 ///< The name of this simulation file.
+    int count_{0};                           ///< The current ID of this shower.
+    std::chrono::time_point<std::chrono::system_clock> const start_time{
+        std::chrono::system_clock::now()}; ///< The time the manager is created.
+    inline static auto logger_{get_logger("output")}; ///< A custom logger.
+    /**
+     * The outputs that have been registered here.
+     */
+    std::map<std::string, std::reference_wrapper<BaseOutput>> outputs_;
+
   }; // class OutputManager
 
 } // namespace corsika
diff --git a/corsika/output/YAMLStreamer.hpp b/corsika/output/YAMLStreamer.hpp
new file mode 100644
index 000000000..e0b333ccc
--- /dev/null
+++ b/corsika/output/YAMLStreamer.hpp
@@ -0,0 +1,30 @@
+
+/*
+ * (c) Copyright 2021 CORSIKA Project, corsika-project@lists.kit.edu
+ *
+ * This software is distributed under the terms of the GNU General Public
+ * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
+ * the license.
+ */
+
+#pragma once
+
+#include <boost/filesystem.hpp>
+#include <yaml-cpp/yaml.h>
+#include <string>
+
+namespace corsika {
+
+  /**
+   * This class automates the construction of simple tabular
+   * YAML files using the YAML::StreamWriter.
+   */
+  class YAMLStreamer {
+  public:
+    inline void writeYAML(YAML::Node const& node,
+                          boost::filesystem::path const& path) const;
+
+  }; // class YAMLStreamer
+} // namespace corsika
+
+#include <corsika/detail/output/YAMLStreamer.inl>
diff --git a/tests/framework/testLogging.cpp b/tests/framework/testLogging.cpp
index bf92cf8df..5b4fc2233 100644
--- a/tests/framework/testLogging.cpp
+++ b/tests/framework/testLogging.cpp
@@ -107,7 +107,7 @@ TEST_CASE("Logging", "[Logging]") {
 
     // these print with the "loggerE" logger
     CORSIKA_LOGGER_INFO(logger, "(8) test macro style logging");
-    CORSIKA_LOGGER_WARN(logger, "(8) test macro style logging");
+    CORSIKA_LOGGER_WARN(logger, "(8) test macro {} logging", "style");
 
     // reset the logging pattern
     logging::reset_pattern(logger);
diff --git a/tests/modules/testObservationPlane.cpp b/tests/modules/testObservationPlane.cpp
index 6411d9738..5457cd622 100644
--- a/tests/modules/testObservationPlane.cpp
+++ b/tests/modules/testObservationPlane.cpp
@@ -17,8 +17,6 @@
 #include <corsika/framework/core/ParticleProperties.hpp>
 #include <corsika/framework/core/PhysicalUnits.hpp>
 
-#include <corsika/output/NoOutput.hpp>
-
 #include <SetupTestEnvironment.hpp>
 #include <SetupTestStack.hpp>
 #include <SetupTestTrajectory.hpp>
diff --git a/tests/modules/testParticleCut.cpp b/tests/modules/testParticleCut.cpp
index ef2b6c636..a1803af75 100644
--- a/tests/modules/testParticleCut.cpp
+++ b/tests/modules/testParticleCut.cpp
@@ -52,7 +52,7 @@ TEST_CASE("ParticleCut", "process,continuous,secondary") {
   SECTION("cut on particle type: inv") {
 
     // particle cut with 20GeV threshold for all, also cut invisible
-    ParticleCut cut(20_GeV, false, true);
+    ParticleCut cut(20_GeV, 20_GeV, 20_GeV, 20_GeV, true);
     CHECK(cut.getHadronKineticECut() == 20_GeV);
 
     // add primary particle to stack
@@ -105,7 +105,7 @@ TEST_CASE("ParticleCut", "process,continuous,secondary") {
   }
 
   SECTION("cut low energy") {
-    ParticleCut cut(20_GeV, true, true);
+    ParticleCut cut(20_GeV, 20_GeV, 20_GeV, 20_GeV, true);
 
     // add primary particle to stack
     auto particle = stack.addParticle(std::make_tuple(
@@ -176,7 +176,7 @@ TEST_CASE("ParticleCut", "process,continuous,secondary") {
   }
 
   SECTION("cut on time") {
-    ParticleCut cut(20_GeV, false, false);
+    ParticleCut cut(20_GeV, 20_GeV, 20_GeV, 20_GeV, false);
     const TimeType too_late = 1_s;
 
     // add primary particle to stack
@@ -206,10 +206,9 @@ TEST_CASE("ParticleCut", "process,continuous,secondary") {
       Line{point0, VelocityVector{rootCS, {0_m / second, 0_m / second, -constants::c}}},
       12_m / constants::c);
 
-  SECTION("cut on DoContinous, just invisibles") {
+  SECTION("cut on doContinous, just invisibles") {
 
-    ParticleCut cut(20_GeV, false, true);
-    CHECK(cut.getHadronKineticECut() == 20_GeV);
+    ParticleCut cut(20_GeV, 20_GeV, 20_GeV, 20_GeV, true);
 
     // add particles, all with energies above the threshold
     // only cut is by species
diff --git a/tests/output/testOutputManager.cpp b/tests/output/testOutputManager.cpp
index c9194abfa..b57a8be95 100644
--- a/tests/output/testOutputManager.cpp
+++ b/tests/output/testOutputManager.cpp
@@ -20,43 +20,35 @@ using namespace corsika;
 struct DummyNoOutput : public NoOutput {
   void check() {
     NoOutput::startOfLibrary("./");
-    NoOutput::startOfShower();
-    NoOutput::endOfShower();
+    NoOutput::startOfShower(0);
+    NoOutput::endOfShower(0);
     NoOutput::endOfLibrary();
-    NoOutput::getConfig();
-    NoOutput::getSummary();
   }
   void checkWrite() { NoOutput::write(Code::Unknown, 1_eV, 1_m, 1_m, 1_ns); }
 };
 
 struct DummyOutput : public BaseOutput {
 
-  mutable bool isConfig_ = false;
-  mutable bool isSummary_ = false;
   bool startLibrary_ = false;
   bool startShower_ = false;
   bool endLibrary_ = false;
   bool endShower_ = false;
 
-  void startOfLibrary(boost::filesystem::path const&) { startLibrary_ = true; }
+  void startOfLibrary(boost::filesystem::path const&) override { startLibrary_ = true; }
 
-  void startOfShower() {
-    BaseOutput::startOfShower();
+  void startOfShower(unsigned int const shower = 0) override {
+    BaseOutput::startOfShower(shower);
     startShower_ = true;
   }
 
-  void endOfShower() { endShower_ = true; }
+  void endOfShower(unsigned int const) override { endShower_ = true; }
 
-  void endOfLibrary() { endLibrary_ = true; }
+  void endOfLibrary() override { endLibrary_ = true; }
 
-  YAML::Node getConfig() const {
-    isConfig_ = true;
-    return YAML::Node();
-  }
-
-  YAML::Node getSummary() {
-    isSummary_ = true;
-    return BaseOutput::getSummary();
+  YAML::Node getSummary() const final override {
+    YAML::Node summary;
+    summary["test"] = "test";
+    return summary;
   }
 };
 
@@ -83,9 +75,6 @@ TEST_CASE("OutputManager") {
         "test",
         test)); // should emit warning which cannot be catched, but no action or failure
 
-    CHECK(test.isConfig_);
-    test.isConfig_ = false;
-
     output.startOfLibrary();
     CHECK(test.startLibrary_);
     test.startLibrary_ = false;
@@ -100,11 +89,7 @@ TEST_CASE("OutputManager") {
 
     output.endOfLibrary();
     CHECK(test.endLibrary_);
-    CHECK(test.isSummary_);
-    test.isSummary_ = false;
     test.endLibrary_ = false;
-
-    CHECK(boost::filesystem::exists("./out_test/check/test/summary.yaml"));
   }
 
   SECTION("auto-write") {
@@ -144,33 +129,21 @@ TEST_CASE("OutputManager") {
     OutputManager output("check", "./out_test");
     CHECK_THROWS(new OutputManager("check", "./out_test"));
 
-    // CHECK_THROWS(output.startOfShower());
-    // CHECK_THROWS(output.endOfShower());
     CHECK_THROWS(output.endOfLibrary());
 
     output.startOfLibrary();
 
     CHECK_THROWS(output.startOfLibrary());
-    // CHECK_THROWS(output.endOfShower());
-    // CHECK_THROWS(output.endOfLibrary());
 
     output.startOfShower();
 
     CHECK_THROWS(output.startOfLibrary());
-    // CHECK_THROWS(output.startOfShower());
-    // CHECK_THROWS(output.endOfLibrary());
 
     output.endOfShower();
 
     CHECK_THROWS(output.startOfLibrary());
-    // CHECK_THROWS(output.startOfShower());
-    // CHECK_THROWS(output.endOfShower());
 
     output.endOfLibrary();
-
-    // CHECK_THROWS(output.endOfShower());
-    // CHECK_THROWS(output.startOfShower());
-    // CHECK_THROWS(output.endOfLibrary());
   }
 
   SECTION("NoOutput") {
diff --git a/tests/output/testParquetStreamer.cpp b/tests/output/testParquetStreamer.cpp
index 387e5cade..13ade2359 100644
--- a/tests/output/testParquetStreamer.cpp
+++ b/tests/output/testParquetStreamer.cpp
@@ -36,17 +36,17 @@ TEST_CASE("ParquetStreamer") {
     test.addField("testfloat", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
                   parquet::ConvertedType::NONE);
 
-    test.enableCompression(1);
+    //    test.enableCompression(1); needs to be enabled via conan
 
     test.buildStreamer();
     CHECK(test.isInit());
 
+    unsigned int testId = 2;
     int testint = 1;
     double testfloat = 2.0;
 
     std::shared_ptr<parquet::StreamWriter> writer = test.getWriter();
-    (*writer) << static_cast<int>(testint) << static_cast<int>(testint)
-              << static_cast<float>(testfloat) << parquet::EndRow;
+    (*writer) << testId << testint << static_cast<float>(testfloat) << parquet::EndRow;
 
     test.closeStreamer();
     CHECK_THROWS(test.getWriter());
diff --git a/tests/output/testWriterObservationPlane.cpp b/tests/output/testWriterObservationPlane.cpp
index 7289ccc81..71fefabaa 100644
--- a/tests/output/testWriterObservationPlane.cpp
+++ b/tests/output/testWriterObservationPlane.cpp
@@ -10,20 +10,18 @@
 
 #include <boost/filesystem.hpp>
 
-#include <corsika/modules/writers/ObservationPlaneWriterParquet.hpp>
+#include <corsika/modules/writers/ParticleWriterParquet.hpp>
 
 #include <corsika/framework/core/Logging.hpp>
 #include <corsika/framework/geometry/QuantityVector.hpp>
 
 using namespace corsika;
 
-struct TestWriterPlane : public ObservationPlaneWriterParquet {
+struct TestWriterPlane : public ParticleWriterParquet {
 
   YAML::Node getConfig() const { return YAML::Node(); }
 
-  void checkWrite() {
-    ObservationPlaneWriterParquet::write(Code::Unknown, 1_eV, 2_m, 3_m, 4_ns);
-  }
+  void checkWrite() { ParticleWriterParquet::write(Code::Unknown, 1_eV, 2_m, 3_m, 1.0); }
 };
 
 TEST_CASE("ObservationPlaneWriterParquet") {
@@ -40,9 +38,9 @@ TEST_CASE("ObservationPlaneWriterParquet") {
 
     TestWriterPlane test;
     test.startOfLibrary("./output_dir");
-    test.startOfShower();
+    test.startOfShower(0);
     test.checkWrite();
-    test.endOfShower();
+    test.endOfShower(0);
     test.endOfLibrary();
 
     CHECK(boost::filesystem::exists("./output_dir/particles.parquet"));
diff --git a/tests/output/testWriterTracks.cpp b/tests/output/testWriterTracks.cpp
index 43fbf8870..ea1cbcda8 100644
--- a/tests/output/testWriterTracks.cpp
+++ b/tests/output/testWriterTracks.cpp
@@ -21,8 +21,7 @@ struct TestWriterTrack : public TrackWriterParquet {
   YAML::Node getConfig() const { return YAML::Node(); }
 
   void checkWrite() {
-    TrackWriterParquet::write(Code::Unknown, 1_eV, {2_m, 3_m, 4_m}, 5_s, {6_m, 7_m, 8_m},
-                              9_s);
+    TrackWriterParquet::write(Code::Unknown, 1_eV, 1.0, {2_m, 3_m, 4_m}, {5_m, 6_m, 7_m});
   }
 };
 
@@ -40,9 +39,9 @@ TEST_CASE("TrackWriterParquet") {
 
     TestWriterTrack test;
     test.startOfLibrary("./output_dir_tracks");
-    test.startOfShower();
+    test.startOfShower(0);
     test.checkWrite();
-    test.endOfShower();
+    test.endOfShower(0);
     test.endOfLibrary();
 
     CHECK(boost::filesystem::exists("./output_dir_tracks/tracks.parquet"));
-- 
GitLab