From 74063a6f18706d2045df406ec314c22fcfa63801 Mon Sep 17 00:00:00 2001
From: Remy Prechelt <prechelt@hawaii.edu>
Date: Sun, 25 Oct 2020 00:50:35 -1000
Subject: [PATCH] Change member to private inheritance.

This is the first commit to generate correct, readable Parquet files.
---
 Outputs/ObservationPlaneWriterParquet.h       | 37 +++++++------
 Outputs/OutputManager.h                       | 18 +++++--
 Outputs/ParquetStreamer.h                     | 42 ++++++---------
 Processes/ObservationPlane/ObservationPlane.h | 54 +++++++++++--------
 4 files changed, 81 insertions(+), 70 deletions(-)

diff --git a/Outputs/ObservationPlaneWriterParquet.h b/Outputs/ObservationPlaneWriterParquet.h
index 8c27b099d..7a87709e2 100644
--- a/Outputs/ObservationPlaneWriterParquet.h
+++ b/Outputs/ObservationPlaneWriterParquet.h
@@ -15,7 +15,7 @@
 
 namespace corsika::output {
 
-  class ObservationPlaneWriterParquet : public BaseOutput {
+  class ObservationPlaneWriterParquet : public BaseOutput, private ParquetStreamer {
 
   public:
     /**
@@ -24,7 +24,8 @@ namespace corsika::output {
      * @param name    The name of this output.
      */
     ObservationPlaneWriterParquet(std::string const& name)
-        : name_(name)
+        : ParquetStreamer()
+        , name_(name)
         , event_(0){};
 
     /**
@@ -33,20 +34,21 @@ namespace corsika::output {
     void StartOfRun(std::filesystem::path const& directory) final {
 
       // setup the streamer
-      streamer_.Init((directory / "particles.parquet").string());
+      InitStreamer((directory / "particles.parquet").string());
 
       // build the schema
-      streamer_.AddField("event", parquet::Repetition::REQUIRED, parquet::Type::INT32,
-                         parquet::ConvertedType::INT_32);
-      streamer_.AddField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
-                         parquet::ConvertedType::INT_32);
-      streamer_.AddField("energy", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
-                         parquet::ConvertedType::NONE);
-      streamer_.AddField("radius", parquet::Repetition::REQUIRED, parquet::Type::FLOAT,
-                         parquet::ConvertedType::NONE);
+      AddField("event", parquet::Repetition::REQUIRED, parquet::Type::INT32,
+               parquet::ConvertedType::INT_32);
+      AddField("pdg", parquet::Repetition::REQUIRED, parquet::Type::INT32,
+               parquet::ConvertedType::INT_32);
+      AddField("energy", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE,
+               parquet::ConvertedType::NONE);
+      AddField("radius", parquet::Repetition::REQUIRED, parquet::Type::DOUBLE,
+               parquet::ConvertedType::NONE);
 
       // and build the streamer
-      streamer_.Build();
+      BuildStreamer();
+
     }
 
     /**
@@ -62,7 +64,7 @@ namespace corsika::output {
     /**
      * Called at the end of each run.
      */
-    void EndOfRun() final { streamer_.Close(); }
+    void EndOfRun() final { writer_.reset(); outfile_->Close(); }
 
     /**
      * Get final text outputs for the config file.
@@ -75,20 +77,17 @@ namespace corsika::output {
      */
     void Write(particles::Code const& pid, units::si::HEPEnergyType const& energy,
                units::si::LengthType const& distance) {
-
       using namespace units::si;
 
       // write the next row
-      writer_ << event_ << static_cast<int>(particles::GetPDG(pid)) << energy / 1_eV
-              << distance / 1_m << parquet::EndRow;
+      (*writer_) << event_ << static_cast<int>(particles::GetPDG(pid)) << energy / 1_eV
+                 << distance / 1_m << parquet::EndRow;
     }
 
     std::string const name_; ///< The name of this output.
 
   private:
-    int event_;                    ///< The current event number we are processing.
-    ParquetStreamer streamer_;     ///< A parquet stream writer helper
-    parquet::StreamWriter writer_; ///< The writer for this file.
+    int event_; ///< The current event number we are processing.
 
   }; // class ObservationPlaneWriterParquet
 
diff --git a/Outputs/OutputManager.h b/Outputs/OutputManager.h
index 58090fb8e..3630fcad6 100644
--- a/Outputs/OutputManager.h
+++ b/Outputs/OutputManager.h
@@ -21,6 +21,9 @@ namespace corsika::output {
    */
   class OutputManager final {
 
+    /**
+     * Indicates the current state of this manager.
+     */
     enum class OutputState {
       RunNoInit,
       RunInitialized,
@@ -31,7 +34,7 @@ namespace corsika::output {
     OutputState state_{OutputState::RunNoInit}; ///< The current state of this manager.
     std::string const name_;                    ///< The name of this simulation file.
     std::filesystem::path const root_; ///< The top-level directory for the output.
-    inline static auto logger{logging::GetLogger("output_manager")}; ///< A custom logger.
+    inline static auto logger{logging::GetLogger("output")}; ///< A custom logger.
 
     /**
      * The outputs that have been registered with us.
@@ -156,7 +159,7 @@ namespace corsika::output {
     /**
      * Called at the start of each run.
      */
-      void StartOfRun() {
+    void StartOfRun() {
       for (auto& [name, output] : outputs_) {
 
         // construct the path to this output subdirectory
@@ -173,7 +176,7 @@ namespace corsika::output {
     /**
      * Called at the start of each event/shower.
      */
-      void StartOfEvent() {
+    void StartOfEvent() {
 
       // if this is called but we are still in the initialized state,
       // make sure that we transition to RunInProgress
@@ -186,7 +189,7 @@ namespace corsika::output {
     /**
      * Called at the end of each event/shower.
      */
-      void EndOfEvent() {
+    void EndOfEvent() {
 
       for (auto& [name, output] : outputs_) { output.get().EndOfEvent(); }
     }
@@ -195,10 +198,15 @@ namespace corsika::output {
      * Called at the end of each run.
      */
     void EndOfRun() {
-      for (auto& [name, output] : outputs_) { output.get().EndOfRun(); }
+      for (auto& [name, output] : outputs_) {
+        output.get().EndOfRun();
+      }
 
       // and the run has finished
       state_ = OutputState::RunFinished;
+
+      // write any final state information into the config files
+      // for (auto& [name, output] : outputs_) { output.get().EndOfRun(); }
     }
 
   }; // class OutputManager
diff --git a/Outputs/ParquetStreamer.h b/Outputs/ParquetStreamer.h
index 4914efa81..d83525168 100644
--- a/Outputs/ParquetStreamer.h
+++ b/Outputs/ParquetStreamer.h
@@ -1,5 +1,5 @@
 /*
- * (c) Copyright 2019 CORSIKA Project, corsika-project@lists.kit.edu
+ * (c) Copyright 2020 CORSIKA Project, corsika-project@lists.kit.edu
  *
  * This software is distributed under the terms of the GNU General Public
  * Licence version 3 (GPL Version 3). See file LICENSE for a full version of
@@ -23,12 +23,15 @@ namespace corsika::output {
    * This class automates the construction of simple tabular
    * Parquet files using the parquet::StreamWriter.
    */
-  class ParquetStreamer final {
+  class ParquetStreamer {
 
   public:
-    ParquetStreamer() {}
+    ParquetStreamer() = default;
 
-    void Init(std::string const& filepath) {
+    /**
+     * Initialize the streamer to write to a given file.
+     */
+    void InitStreamer(std::string const& filepath) {
 
       // open the file and connect it to our pointer
       PARQUET_ASSIGN_OR_THROW(outfile_, arrow::io::FileOutputStream::Open(filepath));
@@ -42,40 +45,29 @@ namespace corsika::output {
      */
     template <typename... TArgs>
     void AddField(TArgs&&... args) {
-      nodes_.push_back(parquet::schema::PrimitiveNode::Make(args...));
+      fields_.push_back(parquet::schema::PrimitiveNode::Make(args...));
     }
 
     /**
      * Finalize the streamer construction.
      */
-    void Build() {
+    void BuildStreamer() {
 
       // build the top level schema
-      auto schema = std::static_pointer_cast<parquet::schema::GroupNode>(
+      schema_ = std::static_pointer_cast<parquet::schema::GroupNode>(
           parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED,
-                                           nodes_));
+                                           fields_));
 
       // and build the writer
-      writer_ = parquet::StreamWriter(
-          parquet::ParquetFileWriter::Open(outfile_, schema, builder_.build()));
+      writer_ = std::make_shared<parquet::StreamWriter>(
+          parquet::ParquetFileWriter::Open(outfile_, schema_, builder_.build()));
     }
 
-    /**
-     * Get a reference to the writer for this stream.
-     */
-    parquet::StreamWriter& GetWriter() { return writer_; }
-
-    /**
-     * Close the file.
-     */
-    void Close() { outfile_->Close(); }
-
-    ///
-  private:
-    parquet::StreamWriter writer_;
-    parquet::StreamWriter stream_;               ///< The stream writer to 'outfile'
+  protected:
+    std::shared_ptr<parquet::StreamWriter> writer_;               ///< The stream writer to 'outfile'
     parquet::WriterProperties::Builder builder_; ///< The writer properties builder.
-    parquet::schema::NodeVector nodes_;
+    parquet::schema::NodeVector fields_;         ///< The fields in this file.
+    std::shared_ptr<parquet::schema::GroupNode> schema_;   ///< The schema for this file.
     std::shared_ptr<arrow::io::FileOutputStream> outfile_; ///< The output file.
 
   }; // class ParquetHelper
diff --git a/Processes/ObservationPlane/ObservationPlane.h b/Processes/ObservationPlane/ObservationPlane.h
index 986375173..b2dff07ed 100644
--- a/Processes/ObservationPlane/ObservationPlane.h
+++ b/Processes/ObservationPlane/ObservationPlane.h
@@ -35,10 +35,10 @@ namespace corsika::process::observation_plane {
         : TOutputWriter(args...)
         , plane_(plane)
         , deleteOnHit_(deleteOnHit)
-        , energy_ground_(0_GeV)
+        , energy_ground_(units::si::HEPEnergyType::zero())
         , count_ground_(0) {}
 
-    process::EProcessReturn DoContinuous(setup::Stack::ParticleType const& particle,
+    process::EProcessReturn DoContinuous(setup::Stack::ParticleType& particle,
                                          setup::Trajectory const& trajectory) {
 
       using namespace units::si;
@@ -53,15 +53,23 @@ namespace corsika::process::observation_plane {
         return process::EProcessReturn::eOk;
       }
 
+      const auto energy = particle.GetEnergy();
+
       // write the data to the output
-      this->Write(particle.GetPID(), particle.GetEnergy(),
+      this->Write(particle.GetPID(), energy,
                   (trajectory.GetPosition(1) - plane_.GetCenter()).norm());
 
-      if (deleteOnHit_) { return process::EProcessReturn::eParticleAbsorbed; }
-      return process::EProcessReturn::eOk;
+      if (deleteOnHit_) {
+        count_ground_++;
+        energy_ground_ += energy;
+        particle.Delete();
+        return process::EProcessReturn::eParticleAbsorbed;
+      } else {
+        return process::EProcessReturn::eOk;
+      }
     }
 
-    units::si::LengthType MaxStepLength(setup::Stack::ParticleType const&,
+    units::si::LengthType MaxStepLength(setup::Stack::ParticleType& particle,
                                         setup::Trajectory const& trajectory) {
 
       using namespace units::si;
@@ -74,7 +82,9 @@ namespace corsika::process::observation_plane {
       }
 
       auto const pointOfIntersection = trajectory.GetPosition(timeOfIntersection);
-      return (trajectory.GetR0() - pointOfIntersection).norm() * 1.0001;
+      auto dist = (trajectory.GetR0() - pointOfIntersection).norm() * 1.0001;
+      C8LOG_TRACE("ObservationPlane::MaxStepLength l={} m", dist / 1_m);
+      return dist;
     }
 
     YAML::Node GetConfig() const {
@@ -105,20 +115,22 @@ namespace corsika::process::observation_plane {
       return node;
     }
 
-void ObservationPlane::ShowResults() const {
-  C8LOG_INFO(
-      " ******************************\n"
-      " ObservationPlane: \n"
-      " energy in ground (GeV)     :  {}\n"
-      " no. of particles in ground :  {}\n"
-      " ******************************",
-      energy_ground_ / 1_GeV, count_ground_);
-}
-
-void ObservationPlane::Reset() {
-  energy_ground_ = 0_GeV;
-  count_ground_ = 0;
-}
+    void ShowResults() const {
+      using namespace units::si;
+      C8LOG_INFO(
+          " ******************************\n"
+          " ObservationPlane: \n"
+          " energy in ground (GeV)     :  {}\n"
+          " no. of particles in ground :  {}\n"
+          " ******************************",
+          energy_ground_ / 1_GeV, count_ground_);
+    }
+
+    void Reset() {
+      using namespace units::si;
+      energy_ground_ = 0_GeV;
+      count_ground_ = 0;
+    }
     corsika::units::si::HEPEnergyType GetEnergyGround() const { return energy_ground_; }
 
   private:
-- 
GitLab