Improve buffer timestamping
[gst-aseq-appsrc.git] / gst-aseq-appsrc.c
index d15ed6e..e1b3418 100644 (file)
 #define DEFAULT_BUFSIZE 65536
 #define DEFAULT_CLIENT_NAME "gst-aseq-appsrc"
 #define DEFAULT_TICK_PERIOD_MS 10
-#define DEFAULT_POLL_TIMEOUT_MS (DEFAULT_TICK_PERIOD_MS / 2)
 
-GST_DEBUG_CATEGORY (mysrc_debug);
-#define GST_CAT_DEFAULT mysrc_debug
+GST_DEBUG_CATEGORY (mysource_debug);
+#define GST_CAT_DEFAULT mysource_debug
 
 typedef struct _App App;
 
@@ -51,6 +50,7 @@ struct _App
   GMainLoop *loop;
 
   snd_seq_t *seq;
+  int queue;
   int port_count;
   snd_seq_addr_t *seq_ports;
   snd_midi_event_t *parser;
@@ -60,9 +60,10 @@ struct _App
   int npfds;
 
   guint64 tick;
+  guint64 delay;
 };
 
-App s_app;
+static App s_app;
 
 static int
 init_seq (App * app)
@@ -75,6 +76,12 @@ init_seq (App * app)
     goto error;
   }
 
+  /*
+   * Prevent Valgrind from reporting cached configuration as memory leaks, see:
+   * http://git.alsa-project.org/?p=alsa-lib.git;a=blob;f=MEMORY-LEAK;hb=HEAD
+   */
+  snd_config_update_free_global();
+
   ret = snd_seq_set_client_name (app->seq, DEFAULT_CLIENT_NAME);
   if (ret < 0) {
     GST_ERROR ("Cannot set client name - %s", snd_strerror (ret));
@@ -137,14 +144,74 @@ out_free_ports_list:
 }
 
 static int
+start_queue_timer (snd_seq_t *seq, int queue)
+{
+  int ret;
+
+  ret = snd_seq_start_queue (seq, queue, NULL);
+  if (ret < 0) {
+    GST_ERROR ("Timer event output error: %s\n", snd_strerror (ret));
+    return ret;
+  }
+
+  ret = snd_seq_drain_output (seq);
+  if (ret < 0)
+    GST_ERROR ("Drain output error: %s\n", snd_strerror (ret));
+
+  return ret;
+}
+
+static void
+schedule_tick (App * app)
+{
+  snd_seq_event_t ev;
+  snd_seq_real_time_t time;
+  int ret;
+
+  snd_seq_ev_clear (&ev);
+  snd_seq_ev_set_source (&ev, 0);
+  snd_seq_ev_set_dest (&ev, snd_seq_client_id (app->seq), 0);
+
+  ev.type = SND_SEQ_EVENT_TICK;
+
+  GST_TIME_TO_TIMESPEC (app->tick * DEFAULT_TICK_PERIOD_MS * GST_MSECOND, time);
+  app->tick += 1;
+
+  snd_seq_ev_schedule_real (&ev, app->queue, 0, &time);
+
+  ret = snd_seq_event_output (app->seq, &ev);
+  if (ret < 0)
+    GST_ERROR ("Event output error: %s\n", snd_strerror (ret));
+
+  ret = snd_seq_drain_output (app->seq);
+  if (ret < 0)
+    GST_ERROR ("Event drain error: %s\n", snd_strerror (ret));
+}
+
+static int
 create_port (App * app)
 {
+  snd_seq_port_info_t *pinfo;
   int ret;
 
-  ret = snd_seq_create_simple_port (app->seq, DEFAULT_CLIENT_NAME,
-      SND_SEQ_PORT_CAP_WRITE |
-      SND_SEQ_PORT_CAP_SUBS_WRITE,
-      SND_SEQ_PORT_TYPE_MIDI_GENERIC | SND_SEQ_PORT_TYPE_APPLICATION);
+  snd_seq_port_info_alloca (&pinfo);
+  snd_seq_port_info_set_name (pinfo, DEFAULT_CLIENT_NAME);
+  snd_seq_port_info_set_type (pinfo, SND_SEQ_PORT_TYPE_MIDI_GENERIC | SND_SEQ_PORT_TYPE_APPLICATION);
+  snd_seq_port_info_set_capability (pinfo, SND_SEQ_PORT_CAP_WRITE | SND_SEQ_PORT_CAP_SUBS_WRITE);
+
+  ret = snd_seq_alloc_queue (app->seq);
+  if (ret < 0) {
+    GST_ERROR ("Cannot allocate queue: %s\n", snd_strerror (ret));
+    return ret;
+  }
+
+  app->queue = ret;
+
+  snd_seq_port_info_set_timestamping (pinfo, 1);
+  snd_seq_port_info_set_timestamp_real (pinfo, 1);
+  snd_seq_port_info_set_timestamp_queue (pinfo, app->queue);
+
+  ret = snd_seq_create_port (app->seq, pinfo);
   if (ret < 0)
     GST_ERROR ("Cannot create port - %s", snd_strerror (ret));
 
@@ -169,43 +236,36 @@ connect_ports (App * app)
 }
 
 static void
-push_buffer (App * app, gpointer data, guint size)
+push_buffer (App * app, gpointer data, guint size, GstClockTime time)
 {
+  gpointer local_data;
   GstBuffer *buffer;
-  GstClockTime time;
   int ret;
-  gpointer local_data;
 
-  /* read the next chunk */
   buffer = gst_buffer_new ();
 
-  time = app->tick * DEFAULT_TICK_PERIOD_MS * GST_MSECOND;
-
   GST_BUFFER_DTS (buffer) = time;
   GST_BUFFER_PTS (buffer) = time;
-  GST_BUFFER_OFFSET (buffer) = time;
-  GST_BUFFER_DURATION (buffer) = DEFAULT_TICK_PERIOD_MS * GST_MSECOND;
 
-  local_data = g_malloc (size);
-  memcpy (local_data, data, size);
+  local_data = g_memdup (data, size);
 
   gst_buffer_append_memory (buffer,
       gst_memory_new_wrapped (0, local_data, size, 0, size, local_data,
           g_free));
 
+  GST_MEMDUMP ("MIDI data:", local_data, size);
+
   GST_DEBUG ("feed buffer %p, tick %" G_GUINT64_FORMAT " size: %u",
       (gpointer) buffer, app->tick, size);
   g_signal_emit_by_name (app->appsrc, "push-buffer", buffer, &ret);
   gst_buffer_unref (buffer);
-
-  app->tick += 1;
 }
 
 static void
-push_tick_buffer (App * app)
+push_tick_buffer (App * app, GstClockTime time)
 {
   app->buffer[0] = 0xf9;
-  push_buffer (app, app->buffer, 1);
+  push_buffer (app, app->buffer, 1, time);
 }
 
 /* This method is called by the need-data signal callback, we feed data into the
@@ -214,23 +274,39 @@ push_tick_buffer (App * app)
 static void
 feed_data (GstElement * appsrc, guint size, App * app)
 {
-  long size_ev = 0;
+  GstClockTime time;
+  long size_ev;
   int err;
   int ret;
 
+poll:
   snd_seq_poll_descriptors (app->seq, app->pfds, app->npfds, POLLIN);
-  ret = poll (app->pfds, app->npfds, DEFAULT_POLL_TIMEOUT_MS);
-  if (ret < 0) {
+  ret = poll (app->pfds, app->npfds, -1);
+  if (ret <= 0) {
     GST_ERROR ("ERROR in poll: %s", strerror (errno));
-  } else if (ret == 0) {
-    push_tick_buffer (app);
+    gst_app_src_end_of_stream (GST_APP_SRC (appsrc));
   } else {
     do {
       snd_seq_event_t *event;
       err = snd_seq_event_input (app->seq, &event);
-      if (err < 0)
+      if (err < 0 && err != -EAGAIN) {
+        GST_ERROR ("Error in snd_seq_event_input: %s", snd_strerror (err));
+        gst_app_src_end_of_stream (GST_APP_SRC (appsrc));
         break;
+      }
       if (event) {
+        time = GST_TIMESPEC_TO_TIME (event->time.time) + app->delay;
+
+        /*
+         * Special handling is needed because decoding SND_SEQ_EVENT_TICK is
+         * not supported by the ALSA sequencer.
+         */
+        if (event->type == SND_SEQ_EVENT_TICK) {
+          push_tick_buffer (app, time);
+          schedule_tick (app);
+          break;
+        }
+
         size_ev =
             snd_midi_event_decode (app->parser, app->buffer, DEFAULT_BUFSIZE,
             event);
@@ -238,14 +314,15 @@ feed_data (GstElement * appsrc, guint size, App * app)
           /* ENOENT indicates an event that is not a MIDI message, silently skip it */
           if (-ENOENT == size_ev) {
             GST_WARNING ("Warning: Received non-MIDI message");
-            push_tick_buffer (app);
+            goto poll;
           } else {
             GST_ERROR ("Error decoding event from ALSA to output: %s",
                 strerror (-size_ev));
+            gst_app_src_end_of_stream (GST_APP_SRC (appsrc));
             break;
           }
         } else {
-          push_buffer (app, app->buffer, size_ev);
+          push_buffer (app, app->buffer, size_ev, time);
         }
       }
     } while (err > 0);
@@ -291,6 +368,39 @@ bus_message (GstBus * bus, GstMessage * message, App * app)
       g_main_loop_quit (app->loop);
       break;
     }
+    case GST_MESSAGE_STATE_CHANGED:
+    {
+      GstState old_state, new_state;
+
+      gst_message_parse_state_changed (message, &old_state, &new_state, NULL);
+      if (new_state == GST_STATE_PLAYING) {
+        GstClockTime gst_time;
+        GstClockTime base_time;
+        GstClockTime running_time;
+        GstClockTime queue_time;
+        GstClock *clock;
+        snd_seq_queue_status_t *status;
+
+        if (app->tick == 0) {
+          start_queue_timer (app->seq, app->queue);
+          schedule_tick (app);
+        }
+
+        clock = gst_element_get_clock (GST_ELEMENT (app->appsrc));
+        gst_time = gst_clock_get_time (clock);
+        gst_object_unref (clock);
+        base_time = gst_element_get_base_time (GST_ELEMENT (app->appsrc));
+        running_time = gst_time - base_time;
+
+        snd_seq_queue_status_malloc (&status);
+        snd_seq_get_queue_status (app->seq, 0, status);
+        queue_time = GST_TIMESPEC_TO_TIME (*snd_seq_queue_status_get_real_time (status));
+        snd_seq_queue_status_free (status);
+
+        app->delay = running_time - queue_time;
+      }
+      break;
+    }
     case GST_MESSAGE_EOS:
       g_main_loop_quit (app->loop);
       break;
@@ -393,9 +503,12 @@ main (int argc, char *argv[])
   GOptionContext *ctx;
   GError *err = NULL;
   gchar *ports = NULL;
+  gboolean verbose = FALSE;
   GOptionEntry options[] = {
     {"ports", 'p', 0, G_OPTION_ARG_STRING, &ports,
         "Comma separated list of sequencer ports", "client:port,..."},
+    {"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose,
+         "Output status information and property notifications", NULL},
     {NULL}
   };
 
@@ -413,7 +526,7 @@ main (int argc, char *argv[])
 
   gst_init (&argc, &argv);
 
-  GST_DEBUG_CATEGORY_INIT (mysrc_debug, "mysrc", 0,
+  GST_DEBUG_CATEGORY_INIT (mysource_debug, "mysource", 0,
       "ALSA MIDI sequencer appsrc pipeline");
 
   ret = app_init (app, ports);
@@ -434,6 +547,9 @@ main (int argc, char *argv[])
       ("appsrc name=mysource ! fluiddec ! audioconvert ! autoaudiosink", NULL);
   g_assert (app->pipeline);
 
+  if (verbose)
+    g_signal_connect (app->pipeline, "deep-notify", G_CALLBACK (gst_object_default_deep_notify), NULL);
+
   bus = gst_pipeline_get_bus (GST_PIPELINE (app->pipeline));
   g_assert (bus);