Improve buffer timestamping
[gst-aseq-appsrc.git] / gst-aseq-appsrc.c
index 1577ebf..e1b3418 100644 (file)
@@ -36,7 +36,6 @@
 #define DEFAULT_BUFSIZE 65536
 #define DEFAULT_CLIENT_NAME "gst-aseq-appsrc"
 #define DEFAULT_TICK_PERIOD_MS 10
-#define DEFAULT_POLL_TIMEOUT_MS (DEFAULT_TICK_PERIOD_MS / 2)
 
 GST_DEBUG_CATEGORY (mysource_debug);
 #define GST_CAT_DEFAULT mysource_debug
@@ -61,6 +60,7 @@ struct _App
   int npfds;
 
   guint64 tick;
+  guint64 delay;
 };
 
 static App s_app;
@@ -161,6 +161,33 @@ start_queue_timer (snd_seq_t *seq, int queue)
   return ret;
 }
 
+static void
+schedule_tick (App * app)
+{
+  snd_seq_event_t ev;
+  snd_seq_real_time_t time;
+  int ret;
+
+  snd_seq_ev_clear (&ev);
+  snd_seq_ev_set_source (&ev, 0);
+  snd_seq_ev_set_dest (&ev, snd_seq_client_id (app->seq), 0);
+
+  ev.type = SND_SEQ_EVENT_TICK;
+
+  GST_TIME_TO_TIMESPEC (app->tick * DEFAULT_TICK_PERIOD_MS * GST_MSECOND, time);
+  app->tick += 1;
+
+  snd_seq_ev_schedule_real (&ev, app->queue, 0, &time);
+
+  ret = snd_seq_event_output (app->seq, &ev);
+  if (ret < 0)
+    GST_ERROR ("Event output error: %s\n", snd_strerror (ret));
+
+  ret = snd_seq_drain_output (app->seq);
+  if (ret < 0)
+    GST_ERROR ("Event drain error: %s\n", snd_strerror (ret));
+}
+
 static int
 create_port (App * app)
 {
@@ -188,8 +215,6 @@ create_port (App * app)
   if (ret < 0)
     GST_ERROR ("Cannot create port - %s", snd_strerror (ret));
 
-  ret = start_queue_timer (app->seq, app->queue);
-
   return ret;
 }
 
@@ -211,17 +236,14 @@ connect_ports (App * app)
 }
 
 static void
-push_buffer (App * app, gpointer data, guint size)
+push_buffer (App * app, gpointer data, guint size, GstClockTime time)
 {
-  GstClockTime time;
   gpointer local_data;
   GstBuffer *buffer;
   int ret;
 
   buffer = gst_buffer_new ();
 
-  time = app->tick * DEFAULT_TICK_PERIOD_MS * GST_MSECOND;
-
   GST_BUFFER_DTS (buffer) = time;
   GST_BUFFER_PTS (buffer) = time;
 
@@ -237,15 +259,13 @@ push_buffer (App * app, gpointer data, guint size)
       (gpointer) buffer, app->tick, size);
   g_signal_emit_by_name (app->appsrc, "push-buffer", buffer, &ret);
   gst_buffer_unref (buffer);
-
-  app->tick += 1;
 }
 
 static void
-push_tick_buffer (App * app)
+push_tick_buffer (App * app, GstClockTime time)
 {
   app->buffer[0] = 0xf9;
-  push_buffer (app, app->buffer, 1);
+  push_buffer (app, app->buffer, 1, time);
 }
 
 /* This method is called by the need-data signal callback, we feed data into the
@@ -254,17 +274,17 @@ push_tick_buffer (App * app)
 static void
 feed_data (GstElement * appsrc, guint size, App * app)
 {
-  long size_ev = 0;
+  GstClockTime time;
+  long size_ev;
   int err;
   int ret;
 
+poll:
   snd_seq_poll_descriptors (app->seq, app->pfds, app->npfds, POLLIN);
-  ret = poll (app->pfds, app->npfds, DEFAULT_POLL_TIMEOUT_MS);
-  if (ret < 0) {
+  ret = poll (app->pfds, app->npfds, -1);
+  if (ret <= 0) {
     GST_ERROR ("ERROR in poll: %s", strerror (errno));
     gst_app_src_end_of_stream (GST_APP_SRC (appsrc));
-  } else if (ret == 0) {
-    push_tick_buffer (app);
   } else {
     do {
       snd_seq_event_t *event;
@@ -275,6 +295,18 @@ feed_data (GstElement * appsrc, guint size, App * app)
         break;
       }
       if (event) {
+        time = GST_TIMESPEC_TO_TIME (event->time.time) + app->delay;
+
+        /*
+         * Special handling is needed because decoding SND_SEQ_EVENT_TICK is
+         * not supported by the ALSA sequencer.
+         */
+        if (event->type == SND_SEQ_EVENT_TICK) {
+          push_tick_buffer (app, time);
+          schedule_tick (app);
+          break;
+        }
+
         size_ev =
             snd_midi_event_decode (app->parser, app->buffer, DEFAULT_BUFSIZE,
             event);
@@ -282,7 +314,7 @@ feed_data (GstElement * appsrc, guint size, App * app)
           /* ENOENT indicates an event that is not a MIDI message, silently skip it */
           if (-ENOENT == size_ev) {
             GST_WARNING ("Warning: Received non-MIDI message");
-            push_tick_buffer (app);
+            goto poll;
           } else {
             GST_ERROR ("Error decoding event from ALSA to output: %s",
                 strerror (-size_ev));
@@ -290,7 +322,7 @@ feed_data (GstElement * appsrc, guint size, App * app)
             break;
           }
         } else {
-          push_buffer (app, app->buffer, size_ev);
+          push_buffer (app, app->buffer, size_ev, time);
         }
       }
     } while (err > 0);
@@ -336,6 +368,39 @@ bus_message (GstBus * bus, GstMessage * message, App * app)
       g_main_loop_quit (app->loop);
       break;
     }
+    case GST_MESSAGE_STATE_CHANGED:
+    {
+      GstState old_state, new_state;
+
+      gst_message_parse_state_changed (message, &old_state, &new_state, NULL);
+      if (new_state == GST_STATE_PLAYING) {
+        GstClockTime gst_time;
+        GstClockTime base_time;
+        GstClockTime running_time;
+        GstClockTime queue_time;
+        GstClock *clock;
+        snd_seq_queue_status_t *status;
+
+        if (app->tick == 0) {
+          start_queue_timer (app->seq, app->queue);
+          schedule_tick (app);
+        }
+
+        clock = gst_element_get_clock (GST_ELEMENT (app->appsrc));
+        gst_time = gst_clock_get_time (clock);
+        gst_object_unref (clock);
+        base_time = gst_element_get_base_time (GST_ELEMENT (app->appsrc));
+        running_time = gst_time - base_time;
+
+        snd_seq_queue_status_malloc (&status);
+        snd_seq_get_queue_status (app->seq, 0, status);
+        queue_time = GST_TIMESPEC_TO_TIME (*snd_seq_queue_status_get_real_time (status));
+        snd_seq_queue_status_free (status);
+
+        app->delay = running_time - queue_time;
+      }
+      break;
+    }
     case GST_MESSAGE_EOS:
       g_main_loop_quit (app->loop);
       break;