From e61571468cad844ff26cfa686f4bd78b578cc9dc Mon Sep 17 00:00:00 2001
From: Glenn Bradford <glenn.bradford@unimelb.edu.au>
Date: Sun, 6 Mar 2022 16:23:10 +1100
Subject: [PATCH] added command port and receive stream tagging

---
 grc/gen_bladerf_blocks.py       |   3 +
 lib/bladerf/bladerf_common.cc   |  13 ++++
 lib/bladerf/bladerf_common.h    |   3 +
 lib/bladerf/bladerf_sink_c.cc   |  63 ++++++++++++++++++
 lib/bladerf/bladerf_sink_c.h    |   5 ++
 lib/bladerf/bladerf_source_c.cc | 114 ++++++++++++++++++++++++++++++--
 lib/bladerf/bladerf_source_c.h  |   8 +++
 lib/sink_impl.cc                |   2 +
 lib/source_impl.cc              |   2 +
 9 files changed, 208 insertions(+), 5 deletions(-)

diff --git a/grc/gen_bladerf_blocks.py b/grc/gen_bladerf_blocks.py
index 10711b7..e341930 100644
--- a/grc/gen_bladerf_blocks.py
+++ b/grc/gen_bladerf_blocks.py
@@ -173,6 +173,9 @@ parameters:
 ${params}
 
 inputs:
+- domain: message
+  id: command
+  optional: true
 - domain: message
   id: pmic_in
   optional: true
diff --git a/lib/bladerf/bladerf_common.cc b/lib/bladerf/bladerf_common.cc
index ab38a6f..c3df5bb 100644
--- a/lib/bladerf/bladerf_common.cc
+++ b/lib/bladerf/bladerf_common.cc
@@ -1521,3 +1521,16 @@ bool bladerf_common::is_antenna_valid(bladerf_direction dir,
 
   return false;
 }
+
+uint64_t bladerf_common::get_timestamp(bladerf_direction dir)
+{
+  int status;
+  bladerf_timestamp ts;
+
+  status = bladerf_get_timestamp(_dev.get(), dir, &ts);
+  if (status != 0) {
+    BLADERF_THROW_STATUS(status, "Failed to read current timestamp");
+  }
+
+  return static_cast<uint64_t>(ts);
+}
diff --git a/lib/bladerf/bladerf_common.h b/lib/bladerf/bladerf_common.h
index 0ea6a01..f2d8749 100644
--- a/lib/bladerf/bladerf_common.h
+++ b/lib/bladerf/bladerf_common.h
@@ -275,6 +275,9 @@ protected:
 
   void set_lpf_mode(bladerf_channel ch,  bladerf_lpf_mode mode);
 
+  /* Get current timestamp */
+  uint64_t get_timestamp(bladerf_direction dir);
+
 
   /*****************************************************************************
    * Protected members
diff --git a/lib/bladerf/bladerf_sink_c.cc b/lib/bladerf/bladerf_sink_c.cc
index d7b5634..415d6c8 100644
--- a/lib/bladerf/bladerf_sink_c.cc
+++ b/lib/bladerf/bladerf_sink_c.cc
@@ -44,6 +44,13 @@
 
 using namespace boost::assign;
 
+const pmt::pmt_t CMD_TIME_KEY = pmt::mp("time");
+const pmt::pmt_t CMD_CHAN_KEY = pmt::mp("chan");
+const pmt::pmt_t CMD_GAIN_KEY = pmt::mp("gain");
+const pmt::pmt_t CMD_FREQ_KEY = pmt::mp("freq");
+const pmt::pmt_t CMD_RATE_KEY = pmt::mp("rate");
+const pmt::pmt_t CMD_BW_KEY   = pmt::mp("bandwidth");
+
 /******************************************************************************
  * Functions
  ******************************************************************************/
@@ -74,6 +81,12 @@ bladerf_sink_c::bladerf_sink_c(const std::string &args) :
   _running(false)
 {
   setup_blade_messaging();
+  
+  /* add command port */
+  message_port_register_in(pmt::mp("command"));
+  set_msg_handler(pmt::mp("command"),
+    [this](const pmt::pmt_t& msg){ handle_command(msg); });
+
   dict_t dict = params_to_dict(args);
 
   /* Perform src/sink agnostic initializations */
@@ -603,3 +616,53 @@ void bladerf_sink_c::set_biastee_mode(const std::string &mode)
     BLADERF_THROW_STATUS(status, "Failed to set bias-tee");
   }
 }
+
+uint64_t bladerf_sink_c::get_timestamp(void)
+{
+  return bladerf_common::get_timestamp(BLADERF_TX);
+}
+
+void bladerf_sink_c::handle_command(const pmt::pmt_t& msg)
+{
+  // commands must be in PMT dictionary
+  if (!pmt::is_dict(msg)) {
+    BLADERF_WARNING("Command messages must be PMT dictionary.");
+    return;
+  }
+  
+  // schedule timed commands
+  if (pmt::dict_has_key(msg, CMD_TIME_KEY)) {
+    BLADERF_WARNING("No support for timestamp synced commands.");
+    // TODO - push command on queue
+  }
+  
+  // determine channel
+  auto chan = pmt::to_long(pmt::dict_ref(msg,
+                                         CMD_CHAN_KEY,
+                                         pmt::from_long(0)));
+  
+  // execute commands
+  pmt::pmt_t cmds = pmt::dict_items(msg);
+  for (size_t i = 0; i < pmt::length(cmds); i++) {
+    pmt::pmt_t key = pmt::car(pmt::nth(i, cmds));
+    pmt::pmt_t val = pmt::cdr(pmt::nth(i, cmds));
+    
+    if (key == CMD_GAIN_KEY) {
+      double gain = pmt::to_double(val);
+      set_gain(gain, chan);
+    } else if (key == CMD_FREQ_KEY) {
+      double freq = pmt::to_double(val);
+      set_center_freq(freq, chan);
+    } else if (key == CMD_RATE_KEY) {
+      double rate = pmt::to_double(val);
+      set_sample_rate(rate);
+    } else if (key == CMD_BW_KEY) {
+      double bw = pmt::to_double(val);
+      set_bandwidth(bw, chan);
+    } else if (key == CMD_TIME_KEY || key == CMD_CHAN_KEY) {
+      // skip time and channel keys
+    } else {
+      BLADERF_WARNING("Unknown command (" << key << ")");
+    }
+  }
+}
diff --git a/lib/bladerf/bladerf_sink_c.h b/lib/bladerf/bladerf_sink_c.h
index ad0765c..6d858e4 100644
--- a/lib/bladerf/bladerf_sink_c.h
+++ b/lib/bladerf/bladerf_sink_c.h
@@ -116,6 +116,8 @@ public:
 
   void set_biastee_mode(const std::string &mode);
 
+  uint64_t get_timestamp(void);
+
 private:
   int transmit_with_tags(int16_t const *samples, int noutput_items);
 
@@ -131,6 +133,9 @@ private:
 
   /* Scaling factor used when converting from float to int16_t */
   const float SCALING_FACTOR = 2048.0f;
+
+  /* command port */
+  void handle_command(const pmt::pmt_t& msg);
 };
 
 #endif // INCLUDED_BLADERF_SINK_C_H
diff --git a/lib/bladerf/bladerf_source_c.cc b/lib/bladerf/bladerf_source_c.cc
index d936916..c585327 100644
--- a/lib/bladerf/bladerf_source_c.cc
+++ b/lib/bladerf/bladerf_source_c.cc
@@ -44,6 +44,18 @@
 
 using namespace boost::assign;
 
+const pmt::pmt_t CMD_TIME_KEY = pmt::mp("time");
+const pmt::pmt_t CMD_CHAN_KEY = pmt::mp("chan");
+const pmt::pmt_t CMD_GAIN_KEY = pmt::mp("gain");
+const pmt::pmt_t CMD_FREQ_KEY = pmt::mp("freq");
+const pmt::pmt_t CMD_RATE_KEY = pmt::mp("rate");
+const pmt::pmt_t CMD_BW_KEY   = pmt::mp("bandwidth");
+const pmt::pmt_t CMD_TAG_KEY  = pmt::mp("tag");
+
+const pmt::pmt_t TAG_RX_TIME  = pmt::mp("rx_time");
+const pmt::pmt_t TAG_RX_RATE  = pmt::mp("rx_rate");
+const pmt::pmt_t TAG_RX_FREQ  = pmt::mp("rx_freq");
+
 /******************************************************************************
  * Functions
  ******************************************************************************/
@@ -71,7 +83,8 @@ bladerf_source_c::bladerf_source_c(const std::string &args) :
   _16icbuf(NULL),
   _32fcbuf(NULL),
   _running(false),
-  _agcmode(BLADERF_GAIN_DEFAULT)
+  _agcmode(BLADERF_GAIN_DEFAULT),
+  _add_tag(false)
 {
   dict_t dict = params_to_dict(args);
   /* Perform src/sink agnostic initializations */
@@ -79,6 +92,11 @@ bladerf_source_c::bladerf_source_c(const std::string &args) :
 
   setup_blade_messaging();
 
+  /* add command port */
+  message_port_register_in(pmt::mp("command"));
+  set_msg_handler(pmt::mp("command"),
+    [this](const pmt::pmt_t& msg) { handle_command(msg); });
+
   /* Loopback */
   set_loopback_mode(dict.count("loopback") ? dict["loopback"] : "none");
 
@@ -203,6 +221,7 @@ bool bladerf_source_c::start()
   _32fcbuf = reinterpret_cast<gr_complex *>(volk_malloc(_samples_per_buffer*sizeof(gr_complex), alignment));
 
   _running = true;
+  _add_tag = true;
   fire_trigger();
   return true;
 }
@@ -281,10 +300,18 @@ int bladerf_source_c::work(int noutput_items,
     _failures = 0;
   }
 
+  // check metadata status for overrun
+  bool overrun = false;
+  int actual_count = noutput_items;
+  if (meta_ptr != NULL && (meta.status & BLADERF_META_STATUS_OVERRUN)) {
+    overrun = true;
+    actual_count = meta.actual_count;
+  }
+
   // convert from int16_t to float
   // output_items is gr_complex (2x float), so num_points is 2*noutput_items
   volk_16i_s32f_convert_32f(reinterpret_cast<float *>(_32fcbuf), _16icbuf,
-                            SCALING_FACTOR, 2*noutput_items);
+                            SCALING_FACTOR, 2*actual_count);
 
   // copy the samples into output_items
   gr_complex **out = reinterpret_cast<gr_complex **>(&output_items[0]);
@@ -293,17 +320,40 @@ int bladerf_source_c::work(int noutput_items,
     // we need to deinterleave the multiplex as we copy
     gr_complex const *deint_in = _32fcbuf;
 
-    for (size_t i = 0; i < (noutput_items/nstreams); ++i) {
+    for (size_t i = 0; i < (actual_count/nstreams); ++i) {
       for (size_t n = 0; n < nstreams; ++n) {
         memcpy(out[n]++, deint_in++, sizeof(gr_complex));
       }
     }
   } else {
     // no deinterleaving to do: simply copy everything
-    memcpy(out[0], _32fcbuf, sizeof(gr_complex) * noutput_items);
+    memcpy(out[0], _32fcbuf, sizeof(gr_complex) * actual_count);
+  }
+
+  // tag output stream on start, overrun, and rate/freq changes
+  if (meta_ptr != NULL && _add_tag) {
+    double rate = get_sample_rate();
+    for (size_t n = 0; n < nstreams; n++) {
+      double freq = get_center_freq(n);
+      add_item_tag(n, nitems_written(n),
+                   TAG_RX_TIME,
+                   pmt::from_uint64(meta.timestamp));
+      add_item_tag(n, nitems_written(n),
+                   TAG_RX_RATE,
+                   pmt::from_double(rate));
+      add_item_tag(n, nitems_written(n),
+                   TAG_RX_FREQ,
+                   pmt::from_double(freq));
+    }
+
+    _add_tag = false;
+  }
+
+  if (overrun) {
+    _add_tag = true;
   }
 
-  return noutput_items;
+  return actual_count;
 }
 
 osmosdr::meta_range_t bladerf_source_c::get_sample_rates()
@@ -313,6 +363,7 @@ osmosdr::meta_range_t bladerf_source_c::get_sample_rates()
 
 double bladerf_source_c::set_sample_rate(double rate)
 {
+  _add_tag = true;
   return bladerf_common::set_sample_rate(rate, chan2channel(BLADERF_RX, 0));
 }
 
@@ -328,6 +379,7 @@ osmosdr::freq_range_t bladerf_source_c::get_freq_range(size_t chan)
 
 double bladerf_source_c::set_center_freq(double freq, size_t chan)
 {
+  _add_tag = true;
   return bladerf_common::set_center_freq(freq, chan2channel(BLADERF_RX, chan));
 }
 
@@ -637,3 +689,55 @@ void bladerf_source_c::set_agc_mode(const std::string &agcmode)
   }
 #endif
 }
+
+uint64_t bladerf_source_c::get_timestamp(void)
+{
+  return bladerf_common::get_timestamp(BLADERF_RX);
+}
+
+void bladerf_source_c::handle_command(const pmt::pmt_t& msg)
+{
+  // commands must be in PMT dictionary
+  if (!pmt::is_dict(msg)) {
+    BLADERF_WARNING("Command messages must be PMT dictionary.");
+    return;
+  }
+
+  // schedule timed commands
+  if (pmt::dict_has_key(msg, CMD_TIME_KEY)) {
+    BLADERF_WARNING("No support for timestamp synced commands.");
+    // TODO push on command queue
+  }
+
+  // determine channel
+  int chan = pmt::to_long(pmt::dict_ref(msg,
+                                        CMD_CHAN_KEY,
+                                        pmt::from_long(0)));
+
+  // execute commands
+  pmt::pmt_t cmds = pmt::dict_items(msg);
+  for (size_t i = 0; i < pmt::length(cmds); i++) {
+    auto key = pmt::car(pmt::nth(i, cmds));
+    auto val = pmt::cdr(pmt::nth(i, cmds));
+
+    if (key == CMD_GAIN_KEY) {
+      double gain = pmt::to_double(val);
+      set_gain(gain, chan);
+    } else if (key == CMD_FREQ_KEY) {
+      double freq = pmt::to_double(val);
+      set_center_freq(freq, chan);
+    } else if (key == CMD_RATE_KEY) {
+      double rate = pmt::to_double(val);
+      set_sample_rate(rate);
+    } else if (key == CMD_BW_KEY) {
+      double bw = pmt::to_double(val);
+      set_bandwidth(bw, chan);
+    } else if (key == CMD_TAG_KEY) {
+      _add_tag = true;
+    } else if (key == CMD_TIME_KEY || key == CMD_CHAN_KEY) {
+      // skip time and channel keys
+    } else {
+      BLADERF_WARNING("Unknown command (" << key << ")");
+    }
+  }
+}
diff --git a/lib/bladerf/bladerf_source_c.h b/lib/bladerf/bladerf_source_c.h
index ae4bf62..40a42e0 100644
--- a/lib/bladerf/bladerf_source_c.h
+++ b/lib/bladerf/bladerf_source_c.h
@@ -122,6 +122,8 @@ public:
   void set_rx_mux_mode(const std::string &rxmux);
   void set_agc_mode(const std::string &agcmode);
 
+  uint64_t get_timestamp(void);
+
 private:
   // Sample-handling buffers
   int16_t *_16icbuf;              /**< raw samples from bladeRF */
@@ -135,6 +137,12 @@ private:
 
   /* Scaling factor used when converting from int16_t to float */
   const float SCALING_FACTOR = 2048.0f;
+
+  /* Rx stream tags */
+  bool _add_tag;
+
+  /* command port */
+  void handle_command(const pmt::pmt_t& cmd);
 };
 
 #endif // INCLUDED_BLADERF_SOURCE_C_H
diff --git a/lib/sink_impl.cc b/lib/sink_impl.cc
index b337a99..17e2971 100644
--- a/lib/sink_impl.cc
+++ b/lib/sink_impl.cc
@@ -27,6 +27,7 @@ namespace gr {
       , sample_rate_(0)
     {
         message_port_register_hier_in(pmt::mp("pmic_in"));
+        message_port_register_hier_in(pmt::mp("command"));
         message_port_register_hier_out(pmt::mp("pmic_out"));
 
         auto dev_list = bladerf_sink_c::get_devices();
@@ -41,6 +42,7 @@ namespace gr {
         }
 
         msg_connect(self(), pmt::mp("pmic_in"), device_, pmt::mp("pmic_in"));
+        msg_connect(self(), pmt::mp("command"), device_, pmt::mp("command"));
         msg_connect(device_, pmt::mp("pmic_out"), self(), pmt::mp("pmic_out"));
     }
 
diff --git a/lib/source_impl.cc b/lib/source_impl.cc
index 2ea2ed2..17cfdab 100644
--- a/lib/source_impl.cc
+++ b/lib/source_impl.cc
@@ -25,6 +25,7 @@ namespace gr {
               args_to_io_signature(args))
     {
         message_port_register_hier_in(pmt::mp("pmic_in"));
+        message_port_register_hier_in(pmt::mp("command"));
         message_port_register_hier_out(pmt::mp("pmic_out"));
 
         auto dev_list = bladerf_source_c::get_devices();
@@ -52,6 +53,7 @@ namespace gr {
 
         }
         msg_connect(self(), pmt::mp("pmic_in"), device_, pmt::mp("pmic_in"));
+        msg_connect(self(), pmt::mp("command"), device_, pmt::mp("command"));
         msg_connect(device_, pmt::mp("pmic_out"), self(), pmt::mp("pmic_out"));
      }
 
-- 
GitLab