Improve buffer timestamping
[gst-aseq-appsrc.git] / gst-aseq-appsrc.c
index 7d4a885..e1b3418 100644 (file)
 #define DEFAULT_BUFSIZE 65536
 #define DEFAULT_CLIENT_NAME "gst-aseq-appsrc"
 #define DEFAULT_TICK_PERIOD_MS 10
-#define DEFAULT_POLL_TIMEOUT_MS (DEFAULT_TICK_PERIOD_MS / 2)
 
-GST_DEBUG_CATEGORY (mysrc_debug);
-#define GST_CAT_DEFAULT mysrc_debug
+GST_DEBUG_CATEGORY (mysource_debug);
+#define GST_CAT_DEFAULT mysource_debug
 
 typedef struct _App App;
 
@@ -51,8 +50,9 @@ struct _App
   GMainLoop *loop;
 
   snd_seq_t *seq;
+  int queue;
   int port_count;
-  snd_seq_addr_t *ports;
+  snd_seq_addr_t *seq_ports;
   snd_midi_event_t *parser;
   unsigned char *buffer;
 
@@ -60,9 +60,10 @@ struct _App
   int npfds;
 
   guint64 tick;
+  guint64 delay;
 };
 
-App s_app;
+static App s_app;
 
 static int
 init_seq (App * app)
@@ -75,6 +76,12 @@ init_seq (App * app)
     goto error;
   }
 
+  /*
+   * Prevent Valgrind from reporting cached configuration as memory leaks, see:
+   * http://git.alsa-project.org/?p=alsa-lib.git;a=blob;f=MEMORY-LEAK;hb=HEAD
+   */
+  snd_config_update_free_global();
+
   ret = snd_seq_set_client_name (app->seq, DEFAULT_CLIENT_NAME);
   if (ret < 0) {
     GST_ERROR ("Cannot set client name - %s", snd_strerror (ret));
@@ -89,67 +96,122 @@ error:
   return ret;
 }
 
-/* parses one or more port addresses from the string */
+/* Parses one or more port addresses from the string */
 static int
 parse_ports (const char *arg, App * app)
 {
-  char *buf, *s, *port_name;
-  int ret;
+  gchar **ports_list;
+  guint i;
+  int ret = 0;
 
   GST_DEBUG ("ports: %s", arg);
 
-  /* make a copy of the string because we're going to modify it */
-  buf = strdup (arg);
-  if (!buf) {
+  /*
+   * Assume that ports are separated by commas.
+   *
+   * Commas are used instead of spaces because those are valid in client
+   * names.
+   */
+  ports_list = g_strsplit (arg, ",", 0);
+
+  app->port_count = g_strv_length (ports_list);
+  app->seq_ports = g_try_new (snd_seq_addr_t, app->port_count);
+  if (!app->seq_ports) {
     GST_ERROR ("Out of memory");
     ret = -ENOMEM;
-    goto out;
+    goto out_free_ports_list;
   }
 
-  for (port_name = s = buf; s; port_name = s + 1) {
-    /* Assume that ports are separated by commas.  We don't use
-     * spaces because those are valid in client names. */
-    s = strchr (port_name, ',');
-    if (s)
-      *s = '\0';
-
-    app->port_count++;
-    app->ports =
-        realloc (app->ports, app->port_count * sizeof (snd_seq_addr_t));
-    if (!app->ports) {
-      GST_ERROR ("Out of memory");
-      ret = -ENOMEM;
-      goto out_free_buf;
-    }
+  for (i = 0; i < (guint)app->port_count; i++) {
+    gchar *port_name = ports_list[i];
 
-    ret =
-        snd_seq_parse_address (app->seq, &app->ports[app->port_count - 1],
+    ret = snd_seq_parse_address (app->seq, &app->seq_ports[i],
         port_name);
     if (ret < 0) {
-      GST_ERROR ("Invalid port %s - %s", port_name, snd_strerror (ret));
-      goto error_free_ports;
+      GST_ERROR ("Invalid port %s - %s", port_name,
+          snd_strerror (ret));
+      goto error_free_seq_ports;
     }
   }
 
-  goto out_free_buf;
+  goto out_free_ports_list;
 
-error_free_ports:
-  free (app->ports);
-out_free_buf:
-  free (buf);
-out:
+error_free_seq_ports:
+  g_free (app->seq_ports);
+out_free_ports_list:
+  g_strfreev (ports_list);
   return ret;
 }
 
 static int
+start_queue_timer (snd_seq_t *seq, int queue)
+{
+  int ret;
+
+  ret = snd_seq_start_queue (seq, queue, NULL);
+  if (ret < 0) {
+    GST_ERROR ("Timer event output error: %s\n", snd_strerror (ret));
+    return ret;
+  }
+
+  ret = snd_seq_drain_output (seq);
+  if (ret < 0)
+    GST_ERROR ("Drain output error: %s\n", snd_strerror (ret));
+
+  return ret;
+}
+
+static void
+schedule_tick (App * app)
+{
+  snd_seq_event_t ev;
+  snd_seq_real_time_t time;
+  int ret;
+
+  snd_seq_ev_clear (&ev);
+  snd_seq_ev_set_source (&ev, 0);
+  snd_seq_ev_set_dest (&ev, snd_seq_client_id (app->seq), 0);
+
+  ev.type = SND_SEQ_EVENT_TICK;
+
+  GST_TIME_TO_TIMESPEC (app->tick * DEFAULT_TICK_PERIOD_MS * GST_MSECOND, time);
+  app->tick += 1;
+
+  snd_seq_ev_schedule_real (&ev, app->queue, 0, &time);
+
+  ret = snd_seq_event_output (app->seq, &ev);
+  if (ret < 0)
+    GST_ERROR ("Event output error: %s\n", snd_strerror (ret));
+
+  ret = snd_seq_drain_output (app->seq);
+  if (ret < 0)
+    GST_ERROR ("Event drain error: %s\n", snd_strerror (ret));
+}
+
+static int
 create_port (App * app)
 {
+  snd_seq_port_info_t *pinfo;
   int ret;
 
-  ret = snd_seq_create_simple_port (app->seq, DEFAULT_CLIENT_NAME,
-      SND_SEQ_PORT_CAP_WRITE |
-      SND_SEQ_PORT_CAP_SUBS_WRITE,
-      SND_SEQ_PORT_TYPE_MIDI_GENERIC | SND_SEQ_PORT_TYPE_APPLICATION);
+  snd_seq_port_info_alloca (&pinfo);
+  snd_seq_port_info_set_name (pinfo, DEFAULT_CLIENT_NAME);
+  snd_seq_port_info_set_type (pinfo, SND_SEQ_PORT_TYPE_MIDI_GENERIC | SND_SEQ_PORT_TYPE_APPLICATION);
+  snd_seq_port_info_set_capability (pinfo, SND_SEQ_PORT_CAP_WRITE | SND_SEQ_PORT_CAP_SUBS_WRITE);
+
+  ret = snd_seq_alloc_queue (app->seq);
+  if (ret < 0) {
+    GST_ERROR ("Cannot allocate queue: %s\n", snd_strerror (ret));
+    return ret;
+  }
+
+  app->queue = ret;
+
+  snd_seq_port_info_set_timestamping (pinfo, 1);
+  snd_seq_port_info_set_timestamp_real (pinfo, 1);
+  snd_seq_port_info_set_timestamp_queue (pinfo, app->queue);
+
+  ret = snd_seq_create_port (app->seq, pinfo);
   if (ret < 0)
     GST_ERROR ("Cannot create port - %s", snd_strerror (ret));
 
@@ -164,53 +226,46 @@ connect_ports (App * app)
 
   for (i = 0; i < app->port_count; ++i) {
     ret =
-        snd_seq_connect_from (app->seq, 0, app->ports[i].client,
-        app->ports[i].port);
+        snd_seq_connect_from (app->seq, 0, app->seq_ports[i].client,
+        app->seq_ports[i].port);
     if (ret < 0)
       /* warning */
       GST_WARNING ("Cannot connect from port %d:%d - %s",
-          app->ports[i].client, app->ports[i].port, snd_strerror (ret));
+          app->seq_ports[i].client, app->seq_ports[i].port, snd_strerror (ret));
   }
 }
 
 static void
-push_buffer (App * app, gpointer data, guint size)
+push_buffer (App * app, gpointer data, guint size, GstClockTime time)
 {
+  gpointer local_data;
   GstBuffer *buffer;
-  GstClockTime time;
   int ret;
-  gpointer local_data;
 
-  /* read the next chunk */
   buffer = gst_buffer_new ();
 
-  time = app->tick * DEFAULT_TICK_PERIOD_MS * GST_MSECOND;
-
   GST_BUFFER_DTS (buffer) = time;
   GST_BUFFER_PTS (buffer) = time;
-  GST_BUFFER_OFFSET (buffer) = time;
-  GST_BUFFER_DURATION (buffer) = DEFAULT_TICK_PERIOD_MS * GST_MSECOND;
 
-  local_data = g_malloc (size);
-  memcpy (local_data, data, size);
+  local_data = g_memdup (data, size);
 
   gst_buffer_append_memory (buffer,
       gst_memory_new_wrapped (0, local_data, size, 0, size, local_data,
           g_free));
 
+  GST_MEMDUMP ("MIDI data:", local_data, size);
+
   GST_DEBUG ("feed buffer %p, tick %" G_GUINT64_FORMAT " size: %u",
       (gpointer) buffer, app->tick, size);
   g_signal_emit_by_name (app->appsrc, "push-buffer", buffer, &ret);
   gst_buffer_unref (buffer);
-
-  app->tick += 1;
 }
 
 static void
-push_tick_buffer (App * app)
+push_tick_buffer (App * app, GstClockTime time)
 {
   app->buffer[0] = 0xf9;
-  push_buffer (app, app->buffer, 1);
+  push_buffer (app, app->buffer, 1, time);
 }
 
 /* This method is called by the need-data signal callback, we feed data into the
@@ -219,23 +274,39 @@ push_tick_buffer (App * app)
 static void
 feed_data (GstElement * appsrc, guint size, App * app)
 {
-  long size_ev = 0;
+  GstClockTime time;
+  long size_ev;
   int err;
   int ret;
 
+poll:
   snd_seq_poll_descriptors (app->seq, app->pfds, app->npfds, POLLIN);
-  ret = poll (app->pfds, app->npfds, DEFAULT_POLL_TIMEOUT_MS);
-  if (ret < 0) {
+  ret = poll (app->pfds, app->npfds, -1);
+  if (ret <= 0) {
     GST_ERROR ("ERROR in poll: %s", strerror (errno));
-  } else if (ret == 0) {
-    push_tick_buffer (app);
+    gst_app_src_end_of_stream (GST_APP_SRC (appsrc));
   } else {
     do {
       snd_seq_event_t *event;
       err = snd_seq_event_input (app->seq, &event);
-      if (err < 0)
+      if (err < 0 && err != -EAGAIN) {
+        GST_ERROR ("Error in snd_seq_event_input: %s", snd_strerror (err));
+        gst_app_src_end_of_stream (GST_APP_SRC (appsrc));
         break;
+      }
       if (event) {
+        time = GST_TIMESPEC_TO_TIME (event->time.time) + app->delay;
+
+        /*
+         * Special handling is needed because decoding SND_SEQ_EVENT_TICK is
+         * not supported by the ALSA sequencer.
+         */
+        if (event->type == SND_SEQ_EVENT_TICK) {
+          push_tick_buffer (app, time);
+          schedule_tick (app);
+          break;
+        }
+
         size_ev =
             snd_midi_event_decode (app->parser, app->buffer, DEFAULT_BUFSIZE,
             event);
@@ -243,14 +314,15 @@ feed_data (GstElement * appsrc, guint size, App * app)
           /* ENOENT indicates an event that is not a MIDI message, silently skip it */
           if (-ENOENT == size_ev) {
             GST_WARNING ("Warning: Received non-MIDI message");
-            push_tick_buffer (app);
+            goto poll;
           } else {
             GST_ERROR ("Error decoding event from ALSA to output: %s",
                 strerror (-size_ev));
+            gst_app_src_end_of_stream (GST_APP_SRC (appsrc));
             break;
           }
         } else {
-          push_buffer (app, app->buffer, size_ev);
+          push_buffer (app, app->buffer, size_ev, time);
         }
       }
     } while (err > 0);
@@ -296,6 +368,39 @@ bus_message (GstBus * bus, GstMessage * message, App * app)
       g_main_loop_quit (app->loop);
       break;
     }
+    case GST_MESSAGE_STATE_CHANGED:
+    {
+      GstState old_state, new_state;
+
+      gst_message_parse_state_changed (message, &old_state, &new_state, NULL);
+      if (new_state == GST_STATE_PLAYING) {
+        GstClockTime gst_time;
+        GstClockTime base_time;
+        GstClockTime running_time;
+        GstClockTime queue_time;
+        GstClock *clock;
+        snd_seq_queue_status_t *status;
+
+        if (app->tick == 0) {
+          start_queue_timer (app->seq, app->queue);
+          schedule_tick (app);
+        }
+
+        clock = gst_element_get_clock (GST_ELEMENT (app->appsrc));
+        gst_time = gst_clock_get_time (clock);
+        gst_object_unref (clock);
+        base_time = gst_element_get_base_time (GST_ELEMENT (app->appsrc));
+        running_time = gst_time - base_time;
+
+        snd_seq_queue_status_malloc (&status);
+        snd_seq_get_queue_status (app->seq, 0, status);
+        queue_time = GST_TIMESPEC_TO_TIME (*snd_seq_queue_status_get_real_time (status));
+        snd_seq_queue_status_free (status);
+
+        app->delay = running_time - queue_time;
+      }
+      break;
+    }
     case GST_MESSAGE_EOS:
       g_main_loop_quit (app->loop);
       break;
@@ -360,7 +465,7 @@ err_free_buffer:
 err_free_parser:
   snd_midi_event_free (app->parser);
 err_free_ports:
-  free (app->ports);
+  g_free (app->seq_ports);
 err_seq_close:
   snd_seq_close (app->seq);
 err:
@@ -374,7 +479,7 @@ app_finalize (App * app)
   free (app->pfds);
   free (app->buffer);
   snd_midi_event_free (app->parser);
-  free (app->ports);
+  g_free (app->seq_ports);
   snd_seq_close (app->seq);
 }
 
@@ -398,9 +503,12 @@ main (int argc, char *argv[])
   GOptionContext *ctx;
   GError *err = NULL;
   gchar *ports = NULL;
+  gboolean verbose = FALSE;
   GOptionEntry options[] = {
     {"ports", 'p', 0, G_OPTION_ARG_STRING, &ports,
         "Comma separated list of sequencer ports", "client:port,..."},
+    {"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose,
+         "Output status information and property notifications", NULL},
     {NULL}
   };
 
@@ -418,7 +526,7 @@ main (int argc, char *argv[])
 
   gst_init (&argc, &argv);
 
-  GST_DEBUG_CATEGORY_INIT (mysrc_debug, "mysrc", 0,
+  GST_DEBUG_CATEGORY_INIT (mysource_debug, "mysource", 0,
       "ALSA MIDI sequencer appsrc pipeline");
 
   ret = app_init (app, ports);
@@ -439,6 +547,9 @@ main (int argc, char *argv[])
       ("appsrc name=mysource ! fluiddec ! audioconvert ! autoaudiosink", NULL);
   g_assert (app->pipeline);
 
+  if (verbose)
+    g_signal_connect (app->pipeline, "deep-notify", G_CALLBACK (gst_object_default_deep_notify), NULL);
+
   bus = gst_pipeline_get_bus (GST_PIPELINE (app->pipeline));
   g_assert (bus);