Improve buffer timestamping
[gst-aseq-appsrc.git] / gst-aseq-appsrc.c
1 /* GStreamer
2  *
3  * gst-aseq-appsrc: a GStreamer appsrc for the ALSA MIDI sequencer API
4  *
5  * Copyright (C) 2014  Antonio Ospite <ao2@ao2.it>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /* Code _heavily_ inspired to aseqdump and amidicat:
24  * http://git.alsa-project.org/?p=alsa-utils.git;a=tree;f=seq/aseqdump
25  * http://krellan.com/amidicat/
26  */
27
28 #include <gst/gst.h>
29 #include <gst/app/gstappsrc.h>
30
31 #include <string.h>
32 #include <sys/poll.h>
33 #include <alsa/asoundlib.h>
34 #include <glib-unix.h>
35
36 #define DEFAULT_BUFSIZE 65536
37 #define DEFAULT_CLIENT_NAME "gst-aseq-appsrc"
38 #define DEFAULT_TICK_PERIOD_MS 10
39
40 GST_DEBUG_CATEGORY (mysource_debug);
41 #define GST_CAT_DEFAULT mysource_debug
42
43 typedef struct _App App;
44
45 struct _App
46 {
47   GstElement *pipeline;
48   GstElement *appsrc;
49
50   GMainLoop *loop;
51
52   snd_seq_t *seq;
53   int queue;
54   int port_count;
55   snd_seq_addr_t *seq_ports;
56   snd_midi_event_t *parser;
57   unsigned char *buffer;
58
59   struct pollfd *pfds;
60   int npfds;
61
62   guint64 tick;
63   guint64 delay;
64 };
65
66 static App s_app;
67
68 static int
69 init_seq (App * app)
70 {
71   int ret;
72
73   ret = snd_seq_open (&app->seq, "default", SND_SEQ_OPEN_DUPLEX, 0);
74   if (ret < 0) {
75     GST_ERROR ("Cannot open sequencer - %s", snd_strerror (ret));
76     goto error;
77   }
78
79   /*
80    * Prevent Valgrind from reporting cached configuration as memory leaks, see:
81    * http://git.alsa-project.org/?p=alsa-lib.git;a=blob;f=MEMORY-LEAK;hb=HEAD
82    */
83   snd_config_update_free_global();
84
85   ret = snd_seq_set_client_name (app->seq, DEFAULT_CLIENT_NAME);
86   if (ret < 0) {
87     GST_ERROR ("Cannot set client name - %s", snd_strerror (ret));
88     goto error_seq_close;
89   }
90
91   return 0;
92
93 error_seq_close:
94   snd_seq_close (app->seq);
95 error:
96   return ret;
97 }
98
99 /* Parses one or more port addresses from the string */
100 static int
101 parse_ports (const char *arg, App * app)
102 {
103   gchar **ports_list;
104   guint i;
105   int ret = 0;
106
107   GST_DEBUG ("ports: %s", arg);
108
109   /*
110    * Assume that ports are separated by commas.
111    *
112    * Commas are used instead of spaces because those are valid in client
113    * names.
114    */
115   ports_list = g_strsplit (arg, ",", 0);
116
117   app->port_count = g_strv_length (ports_list);
118   app->seq_ports = g_try_new (snd_seq_addr_t, app->port_count);
119   if (!app->seq_ports) {
120     GST_ERROR ("Out of memory");
121     ret = -ENOMEM;
122     goto out_free_ports_list;
123   }
124
125   for (i = 0; i < (guint)app->port_count; i++) {
126     gchar *port_name = ports_list[i];
127
128     ret = snd_seq_parse_address (app->seq, &app->seq_ports[i],
129         port_name);
130     if (ret < 0) {
131       GST_ERROR ("Invalid port %s - %s", port_name,
132           snd_strerror (ret));
133       goto error_free_seq_ports;
134     }
135   }
136
137   goto out_free_ports_list;
138
139 error_free_seq_ports:
140   g_free (app->seq_ports);
141 out_free_ports_list:
142   g_strfreev (ports_list);
143   return ret;
144 }
145
146 static int
147 start_queue_timer (snd_seq_t *seq, int queue)
148 {
149   int ret;
150
151   ret = snd_seq_start_queue (seq, queue, NULL);
152   if (ret < 0) {
153     GST_ERROR ("Timer event output error: %s\n", snd_strerror (ret));
154     return ret;
155   }
156
157   ret = snd_seq_drain_output (seq);
158   if (ret < 0)
159     GST_ERROR ("Drain output error: %s\n", snd_strerror (ret));
160
161   return ret;
162 }
163
164 static void
165 schedule_tick (App * app)
166 {
167   snd_seq_event_t ev;
168   snd_seq_real_time_t time;
169   int ret;
170
171   snd_seq_ev_clear (&ev);
172   snd_seq_ev_set_source (&ev, 0);
173   snd_seq_ev_set_dest (&ev, snd_seq_client_id (app->seq), 0);
174
175   ev.type = SND_SEQ_EVENT_TICK;
176
177   GST_TIME_TO_TIMESPEC (app->tick * DEFAULT_TICK_PERIOD_MS * GST_MSECOND, time);
178   app->tick += 1;
179
180   snd_seq_ev_schedule_real (&ev, app->queue, 0, &time);
181
182   ret = snd_seq_event_output (app->seq, &ev);
183   if (ret < 0)
184     GST_ERROR ("Event output error: %s\n", snd_strerror (ret));
185
186   ret = snd_seq_drain_output (app->seq);
187   if (ret < 0)
188     GST_ERROR ("Event drain error: %s\n", snd_strerror (ret));
189 }
190
191 static int
192 create_port (App * app)
193 {
194   snd_seq_port_info_t *pinfo;
195   int ret;
196
197   snd_seq_port_info_alloca (&pinfo);
198   snd_seq_port_info_set_name (pinfo, DEFAULT_CLIENT_NAME);
199   snd_seq_port_info_set_type (pinfo, SND_SEQ_PORT_TYPE_MIDI_GENERIC | SND_SEQ_PORT_TYPE_APPLICATION);
200   snd_seq_port_info_set_capability (pinfo, SND_SEQ_PORT_CAP_WRITE | SND_SEQ_PORT_CAP_SUBS_WRITE);
201
202   ret = snd_seq_alloc_queue (app->seq);
203   if (ret < 0) {
204     GST_ERROR ("Cannot allocate queue: %s\n", snd_strerror (ret));
205     return ret;
206   }
207
208   app->queue = ret;
209
210   snd_seq_port_info_set_timestamping (pinfo, 1);
211   snd_seq_port_info_set_timestamp_real (pinfo, 1);
212   snd_seq_port_info_set_timestamp_queue (pinfo, app->queue);
213
214   ret = snd_seq_create_port (app->seq, pinfo);
215   if (ret < 0)
216     GST_ERROR ("Cannot create port - %s", snd_strerror (ret));
217
218   return ret;
219 }
220
221 static void
222 connect_ports (App * app)
223 {
224   int i;
225   int ret;
226
227   for (i = 0; i < app->port_count; ++i) {
228     ret =
229         snd_seq_connect_from (app->seq, 0, app->seq_ports[i].client,
230         app->seq_ports[i].port);
231     if (ret < 0)
232       /* warning */
233       GST_WARNING ("Cannot connect from port %d:%d - %s",
234           app->seq_ports[i].client, app->seq_ports[i].port, snd_strerror (ret));
235   }
236 }
237
238 static void
239 push_buffer (App * app, gpointer data, guint size, GstClockTime time)
240 {
241   gpointer local_data;
242   GstBuffer *buffer;
243   int ret;
244
245   buffer = gst_buffer_new ();
246
247   GST_BUFFER_DTS (buffer) = time;
248   GST_BUFFER_PTS (buffer) = time;
249
250   local_data = g_memdup (data, size);
251
252   gst_buffer_append_memory (buffer,
253       gst_memory_new_wrapped (0, local_data, size, 0, size, local_data,
254           g_free));
255
256   GST_MEMDUMP ("MIDI data:", local_data, size);
257
258   GST_DEBUG ("feed buffer %p, tick %" G_GUINT64_FORMAT " size: %u",
259       (gpointer) buffer, app->tick, size);
260   g_signal_emit_by_name (app->appsrc, "push-buffer", buffer, &ret);
261   gst_buffer_unref (buffer);
262 }
263
264 static void
265 push_tick_buffer (App * app, GstClockTime time)
266 {
267   app->buffer[0] = 0xf9;
268   push_buffer (app, app->buffer, 1, time);
269 }
270
271 /* This method is called by the need-data signal callback, we feed data into the
272  * appsrc.
273  */
274 static void
275 feed_data (GstElement * appsrc, guint size, App * app)
276 {
277   GstClockTime time;
278   long size_ev;
279   int err;
280   int ret;
281
282 poll:
283   snd_seq_poll_descriptors (app->seq, app->pfds, app->npfds, POLLIN);
284   ret = poll (app->pfds, app->npfds, -1);
285   if (ret <= 0) {
286     GST_ERROR ("ERROR in poll: %s", strerror (errno));
287     gst_app_src_end_of_stream (GST_APP_SRC (appsrc));
288   } else {
289     do {
290       snd_seq_event_t *event;
291       err = snd_seq_event_input (app->seq, &event);
292       if (err < 0 && err != -EAGAIN) {
293         GST_ERROR ("Error in snd_seq_event_input: %s", snd_strerror (err));
294         gst_app_src_end_of_stream (GST_APP_SRC (appsrc));
295         break;
296       }
297       if (event) {
298         time = GST_TIMESPEC_TO_TIME (event->time.time) + app->delay;
299
300         /*
301          * Special handling is needed because decoding SND_SEQ_EVENT_TICK is
302          * not supported by the ALSA sequencer.
303          */
304         if (event->type == SND_SEQ_EVENT_TICK) {
305           push_tick_buffer (app, time);
306           schedule_tick (app);
307           break;
308         }
309
310         size_ev =
311             snd_midi_event_decode (app->parser, app->buffer, DEFAULT_BUFSIZE,
312             event);
313         if (size_ev < 0) {
314           /* ENOENT indicates an event that is not a MIDI message, silently skip it */
315           if (-ENOENT == size_ev) {
316             GST_WARNING ("Warning: Received non-MIDI message");
317             goto poll;
318           } else {
319             GST_ERROR ("Error decoding event from ALSA to output: %s",
320                 strerror (-size_ev));
321             gst_app_src_end_of_stream (GST_APP_SRC (appsrc));
322             break;
323           }
324         } else {
325           push_buffer (app, app->buffer, size_ev, time);
326         }
327       }
328     } while (err > 0);
329   }
330
331   return;
332 }
333
334 /* this callback is called when pipeline has constructed a source object to read
335  * from. Since we provided the appsrc:// uri to pipeline, this will be the
336  * appsrc that we must handle. We set up a signals to push data into appsrc. */
337 static void
338 found_source (GObject * object, GObject * orig, GParamSpec * pspec, App * app)
339 {
340   /* get a handle to the appsrc */
341   g_object_get (orig, pspec->name, &app->appsrc, NULL);
342
343   GST_DEBUG ("got appsrc %p", (gpointer) app->appsrc);
344
345   /* configure the appsrc, we will push a buffer to appsrc when it needs more
346    * data */
347   g_signal_connect (app->appsrc, "need-data", G_CALLBACK (feed_data), app);
348 }
349
350 static gboolean
351 bus_message (GstBus * bus, GstMessage * message, App * app)
352 {
353   GST_DEBUG ("got message %s",
354       gst_message_type_get_name (GST_MESSAGE_TYPE (message)));
355
356   switch (GST_MESSAGE_TYPE (message)) {
357     case GST_MESSAGE_ERROR:
358     {
359       GError *error = NULL;
360       gchar *dbg_info = NULL;
361
362       gst_message_parse_error (message, &error, &dbg_info);
363       g_printerr ("ERROR from element %s: %s",
364           GST_OBJECT_NAME (message->src), error->message);
365       g_printerr ("Debugging info: %s", (dbg_info) ? dbg_info : "none");
366       g_error_free (error);
367       g_free (dbg_info);
368       g_main_loop_quit (app->loop);
369       break;
370     }
371     case GST_MESSAGE_STATE_CHANGED:
372     {
373       GstState old_state, new_state;
374
375       gst_message_parse_state_changed (message, &old_state, &new_state, NULL);
376       if (new_state == GST_STATE_PLAYING) {
377         GstClockTime gst_time;
378         GstClockTime base_time;
379         GstClockTime running_time;
380         GstClockTime queue_time;
381         GstClock *clock;
382         snd_seq_queue_status_t *status;
383
384         if (app->tick == 0) {
385           start_queue_timer (app->seq, app->queue);
386           schedule_tick (app);
387         }
388
389         clock = gst_element_get_clock (GST_ELEMENT (app->appsrc));
390         gst_time = gst_clock_get_time (clock);
391         gst_object_unref (clock);
392         base_time = gst_element_get_base_time (GST_ELEMENT (app->appsrc));
393         running_time = gst_time - base_time;
394
395         snd_seq_queue_status_malloc (&status);
396         snd_seq_get_queue_status (app->seq, 0, status);
397         queue_time = GST_TIMESPEC_TO_TIME (*snd_seq_queue_status_get_real_time (status));
398         snd_seq_queue_status_free (status);
399
400         app->delay = running_time - queue_time;
401       }
402       break;
403     }
404     case GST_MESSAGE_EOS:
405       g_main_loop_quit (app->loop);
406       break;
407     default:
408       break;
409   }
410   return TRUE;
411 }
412
413 static int
414 app_init (App * app, char *ports)
415 {
416   int ret;
417
418   app->tick = 0;
419
420   ret = init_seq (app);
421   if (ret < 0)
422     goto err;
423
424   if (ports) {
425     ret = parse_ports (ports, app);
426     if (ret < 0)
427       goto err_seq_close;
428   }
429
430   ret = create_port (app);
431   if (ret < 0)
432     goto err_free_ports;
433
434   connect_ports (app);
435
436   ret = snd_seq_nonblock (app->seq, 1);
437   if (ret < 0) {
438     GST_ERROR ("Cannot set nonblock mode - %s", snd_strerror (ret));
439     goto err_free_ports;
440   }
441
442   snd_midi_event_new (DEFAULT_BUFSIZE, &app->parser);
443   snd_midi_event_init (app->parser);
444   snd_midi_event_reset_decode (app->parser);
445
446   snd_midi_event_no_status (app->parser, 1);
447
448   app->buffer = malloc (DEFAULT_BUFSIZE);
449   if (app->buffer == NULL) {
450     ret = -ENOMEM;
451     goto err_free_parser;
452   }
453
454   app->npfds = snd_seq_poll_descriptors_count (app->seq, POLLIN);
455   app->pfds = malloc (sizeof (*app->pfds) * app->npfds);
456   if (app->pfds == NULL) {
457     ret = -ENOMEM;
458     goto err_free_buffer;
459   }
460
461   return 0;
462
463 err_free_buffer:
464   free (app->buffer);
465 err_free_parser:
466   snd_midi_event_free (app->parser);
467 err_free_ports:
468   g_free (app->seq_ports);
469 err_seq_close:
470   snd_seq_close (app->seq);
471 err:
472   return ret;
473 }
474
475 static void
476 app_finalize (App * app)
477 {
478   /* free the resources */
479   free (app->pfds);
480   free (app->buffer);
481   snd_midi_event_free (app->parser);
482   g_free (app->seq_ports);
483   snd_seq_close (app->seq);
484 }
485
486 static gboolean
487 on_sigint (gpointer user_data)
488 {
489   GMainLoop *loop = (GMainLoop *) user_data;
490   g_message ("Caught SIGINT. Initiating shutdown.");
491   g_main_loop_quit (loop);
492   return FALSE;
493 }
494
495 int
496 main (int argc, char *argv[])
497 {
498   App *app = &s_app;
499   GstBus *bus;
500   GstCaps *caps;
501   int ret;
502
503   GOptionContext *ctx;
504   GError *err = NULL;
505   gchar *ports = NULL;
506   gboolean verbose = FALSE;
507   GOptionEntry options[] = {
508     {"ports", 'p', 0, G_OPTION_ARG_STRING, &ports,
509         "Comma separated list of sequencer ports", "client:port,..."},
510     {"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose,
511          "Output status information and property notifications", NULL},
512     {NULL}
513   };
514
515   ctx = g_option_context_new (NULL);
516   g_option_context_add_main_entries (ctx, options, NULL);
517   g_option_context_add_group (ctx, gst_init_get_option_group ());
518   if (!g_option_context_parse (ctx, &argc, &argv, &err)) {
519     if (err)
520       g_printerr ("Error initializing: %s\n", GST_STR_NULL (err->message));
521     else
522       g_printerr ("Error initializing: Unknown error!\n");
523     exit (1);
524   }
525   g_option_context_free (ctx);
526
527   gst_init (&argc, &argv);
528
529   GST_DEBUG_CATEGORY_INIT (mysource_debug, "mysource", 0,
530       "ALSA MIDI sequencer appsrc pipeline");
531
532   ret = app_init (app, ports);
533   if (ret < 0)
534     return ret;
535   free (ports);
536
537   if (app->port_count > 0)
538     printf ("Waiting for data.\n");
539   else
540     printf ("Waiting for data at port %d:0.\n", snd_seq_client_id (app->seq));
541
542   /* create a mainloop to get messages */
543   app->loop = g_main_loop_new (NULL, FALSE);
544
545   app->pipeline =
546       gst_parse_launch
547       ("appsrc name=mysource ! fluiddec ! audioconvert ! autoaudiosink", NULL);
548   g_assert (app->pipeline);
549
550   if (verbose)
551     g_signal_connect (app->pipeline, "deep-notify", G_CALLBACK (gst_object_default_deep_notify), NULL);
552
553   bus = gst_pipeline_get_bus (GST_PIPELINE (app->pipeline));
554   g_assert (bus);
555
556   /* add watch for messages */
557   gst_bus_add_watch (bus, (GstBusFunc) bus_message, app);
558
559   /* get the appsrc */
560   app->appsrc = gst_bin_get_by_name (GST_BIN (app->pipeline), "mysource");
561   g_assert (app->appsrc);
562   g_assert (GST_IS_APP_SRC (app->appsrc));
563   g_signal_connect (app->appsrc, "need-data", G_CALLBACK (feed_data), app);
564
565   g_object_set (app->appsrc, "format", GST_FORMAT_TIME, NULL);
566   g_object_set (app->appsrc, "is-live", TRUE, NULL);
567
568   /* set the caps on the source */
569   caps = gst_caps_new_simple ("audio/x-midi-event", NULL, NULL);
570   gst_app_src_set_caps (GST_APP_SRC (app->appsrc), caps);
571   gst_caps_unref (caps);
572
573   /* get notification when the source is created so that we get a handle to it
574    * and can configure it */
575   g_signal_connect (app->pipeline, "deep-notify::source",
576       (GCallback) found_source, app);
577
578   /* go to playing and wait in a mainloop. */
579   gst_element_set_state (app->pipeline, GST_STATE_PLAYING);
580
581   /* this mainloop is stopped when we receive an error or EOS, or on SIGINT */
582   g_unix_signal_add (SIGINT, on_sigint, app->loop);
583   g_main_loop_run (app->loop);
584
585   g_main_loop_unref (app->loop);
586
587   GST_DEBUG ("stopping");
588
589   gst_element_set_state (app->pipeline, GST_STATE_NULL);
590
591   gst_object_unref (app->appsrc);
592
593   gst_object_unref (bus);
594
595   gst_object_unref (app->pipeline);
596
597   app_finalize (app);
598
599   return 0;
600 }