X-Git-Url: https://git.ao2.it/gst-aseq-appsrc.git/blobdiff_plain/9f7e5fde8b002efb97504ee9a4eb464fce4072ed..d49c7665151703d8074a07d2d1834e904ec85758:/gst-aseq-appsrc.c diff --git a/gst-aseq-appsrc.c b/gst-aseq-appsrc.c index 7d4a885..e1b3418 100644 --- a/gst-aseq-appsrc.c +++ b/gst-aseq-appsrc.c @@ -36,10 +36,9 @@ #define DEFAULT_BUFSIZE 65536 #define DEFAULT_CLIENT_NAME "gst-aseq-appsrc" #define DEFAULT_TICK_PERIOD_MS 10 -#define DEFAULT_POLL_TIMEOUT_MS (DEFAULT_TICK_PERIOD_MS / 2) -GST_DEBUG_CATEGORY (mysrc_debug); -#define GST_CAT_DEFAULT mysrc_debug +GST_DEBUG_CATEGORY (mysource_debug); +#define GST_CAT_DEFAULT mysource_debug typedef struct _App App; @@ -51,8 +50,9 @@ struct _App GMainLoop *loop; snd_seq_t *seq; + int queue; int port_count; - snd_seq_addr_t *ports; + snd_seq_addr_t *seq_ports; snd_midi_event_t *parser; unsigned char *buffer; @@ -60,9 +60,10 @@ struct _App int npfds; guint64 tick; + guint64 delay; }; -App s_app; +static App s_app; static int init_seq (App * app) @@ -75,6 +76,12 @@ init_seq (App * app) goto error; } + /* + * Prevent Valgrind from reporting cached configuration as memory leaks, see: + * http://git.alsa-project.org/?p=alsa-lib.git;a=blob;f=MEMORY-LEAK;hb=HEAD + */ + snd_config_update_free_global(); + ret = snd_seq_set_client_name (app->seq, DEFAULT_CLIENT_NAME); if (ret < 0) { GST_ERROR ("Cannot set client name - %s", snd_strerror (ret)); @@ -89,67 +96,122 @@ error: return ret; } -/* parses one or more port addresses from the string */ +/* Parses one or more port addresses from the string */ static int parse_ports (const char *arg, App * app) { - char *buf, *s, *port_name; - int ret; + gchar **ports_list; + guint i; + int ret = 0; GST_DEBUG ("ports: %s", arg); - /* make a copy of the string because we're going to modify it */ - buf = strdup (arg); - if (!buf) { + /* + * Assume that ports are separated by commas. + * + * Commas are used instead of spaces because those are valid in client + * names. + */ + ports_list = g_strsplit (arg, ",", 0); + + app->port_count = g_strv_length (ports_list); + app->seq_ports = g_try_new (snd_seq_addr_t, app->port_count); + if (!app->seq_ports) { GST_ERROR ("Out of memory"); ret = -ENOMEM; - goto out; + goto out_free_ports_list; } - for (port_name = s = buf; s; port_name = s + 1) { - /* Assume that ports are separated by commas. We don't use - * spaces because those are valid in client names. */ - s = strchr (port_name, ','); - if (s) - *s = '\0'; - - app->port_count++; - app->ports = - realloc (app->ports, app->port_count * sizeof (snd_seq_addr_t)); - if (!app->ports) { - GST_ERROR ("Out of memory"); - ret = -ENOMEM; - goto out_free_buf; - } + for (i = 0; i < (guint)app->port_count; i++) { + gchar *port_name = ports_list[i]; - ret = - snd_seq_parse_address (app->seq, &app->ports[app->port_count - 1], + ret = snd_seq_parse_address (app->seq, &app->seq_ports[i], port_name); if (ret < 0) { - GST_ERROR ("Invalid port %s - %s", port_name, snd_strerror (ret)); - goto error_free_ports; + GST_ERROR ("Invalid port %s - %s", port_name, + snd_strerror (ret)); + goto error_free_seq_ports; } } - goto out_free_buf; + goto out_free_ports_list; -error_free_ports: - free (app->ports); -out_free_buf: - free (buf); -out: +error_free_seq_ports: + g_free (app->seq_ports); +out_free_ports_list: + g_strfreev (ports_list); return ret; } static int +start_queue_timer (snd_seq_t *seq, int queue) +{ + int ret; + + ret = snd_seq_start_queue (seq, queue, NULL); + if (ret < 0) { + GST_ERROR ("Timer event output error: %s\n", snd_strerror (ret)); + return ret; + } + + ret = snd_seq_drain_output (seq); + if (ret < 0) + GST_ERROR ("Drain output error: %s\n", snd_strerror (ret)); + + return ret; +} + +static void +schedule_tick (App * app) +{ + snd_seq_event_t ev; + snd_seq_real_time_t time; + int ret; + + snd_seq_ev_clear (&ev); + snd_seq_ev_set_source (&ev, 0); + snd_seq_ev_set_dest (&ev, snd_seq_client_id (app->seq), 0); + + ev.type = SND_SEQ_EVENT_TICK; + + GST_TIME_TO_TIMESPEC (app->tick * DEFAULT_TICK_PERIOD_MS * GST_MSECOND, time); + app->tick += 1; + + snd_seq_ev_schedule_real (&ev, app->queue, 0, &time); + + ret = snd_seq_event_output (app->seq, &ev); + if (ret < 0) + GST_ERROR ("Event output error: %s\n", snd_strerror (ret)); + + ret = snd_seq_drain_output (app->seq); + if (ret < 0) + GST_ERROR ("Event drain error: %s\n", snd_strerror (ret)); +} + +static int create_port (App * app) { + snd_seq_port_info_t *pinfo; int ret; - ret = snd_seq_create_simple_port (app->seq, DEFAULT_CLIENT_NAME, - SND_SEQ_PORT_CAP_WRITE | - SND_SEQ_PORT_CAP_SUBS_WRITE, - SND_SEQ_PORT_TYPE_MIDI_GENERIC | SND_SEQ_PORT_TYPE_APPLICATION); + snd_seq_port_info_alloca (&pinfo); + snd_seq_port_info_set_name (pinfo, DEFAULT_CLIENT_NAME); + snd_seq_port_info_set_type (pinfo, SND_SEQ_PORT_TYPE_MIDI_GENERIC | SND_SEQ_PORT_TYPE_APPLICATION); + snd_seq_port_info_set_capability (pinfo, SND_SEQ_PORT_CAP_WRITE | SND_SEQ_PORT_CAP_SUBS_WRITE); + + ret = snd_seq_alloc_queue (app->seq); + if (ret < 0) { + GST_ERROR ("Cannot allocate queue: %s\n", snd_strerror (ret)); + return ret; + } + + app->queue = ret; + + snd_seq_port_info_set_timestamping (pinfo, 1); + snd_seq_port_info_set_timestamp_real (pinfo, 1); + snd_seq_port_info_set_timestamp_queue (pinfo, app->queue); + + ret = snd_seq_create_port (app->seq, pinfo); if (ret < 0) GST_ERROR ("Cannot create port - %s", snd_strerror (ret)); @@ -164,53 +226,46 @@ connect_ports (App * app) for (i = 0; i < app->port_count; ++i) { ret = - snd_seq_connect_from (app->seq, 0, app->ports[i].client, - app->ports[i].port); + snd_seq_connect_from (app->seq, 0, app->seq_ports[i].client, + app->seq_ports[i].port); if (ret < 0) /* warning */ GST_WARNING ("Cannot connect from port %d:%d - %s", - app->ports[i].client, app->ports[i].port, snd_strerror (ret)); + app->seq_ports[i].client, app->seq_ports[i].port, snd_strerror (ret)); } } static void -push_buffer (App * app, gpointer data, guint size) +push_buffer (App * app, gpointer data, guint size, GstClockTime time) { + gpointer local_data; GstBuffer *buffer; - GstClockTime time; int ret; - gpointer local_data; - /* read the next chunk */ buffer = gst_buffer_new (); - time = app->tick * DEFAULT_TICK_PERIOD_MS * GST_MSECOND; - GST_BUFFER_DTS (buffer) = time; GST_BUFFER_PTS (buffer) = time; - GST_BUFFER_OFFSET (buffer) = time; - GST_BUFFER_DURATION (buffer) = DEFAULT_TICK_PERIOD_MS * GST_MSECOND; - local_data = g_malloc (size); - memcpy (local_data, data, size); + local_data = g_memdup (data, size); gst_buffer_append_memory (buffer, gst_memory_new_wrapped (0, local_data, size, 0, size, local_data, g_free)); + GST_MEMDUMP ("MIDI data:", local_data, size); + GST_DEBUG ("feed buffer %p, tick %" G_GUINT64_FORMAT " size: %u", (gpointer) buffer, app->tick, size); g_signal_emit_by_name (app->appsrc, "push-buffer", buffer, &ret); gst_buffer_unref (buffer); - - app->tick += 1; } static void -push_tick_buffer (App * app) +push_tick_buffer (App * app, GstClockTime time) { app->buffer[0] = 0xf9; - push_buffer (app, app->buffer, 1); + push_buffer (app, app->buffer, 1, time); } /* This method is called by the need-data signal callback, we feed data into the @@ -219,23 +274,39 @@ push_tick_buffer (App * app) static void feed_data (GstElement * appsrc, guint size, App * app) { - long size_ev = 0; + GstClockTime time; + long size_ev; int err; int ret; +poll: snd_seq_poll_descriptors (app->seq, app->pfds, app->npfds, POLLIN); - ret = poll (app->pfds, app->npfds, DEFAULT_POLL_TIMEOUT_MS); - if (ret < 0) { + ret = poll (app->pfds, app->npfds, -1); + if (ret <= 0) { GST_ERROR ("ERROR in poll: %s", strerror (errno)); - } else if (ret == 0) { - push_tick_buffer (app); + gst_app_src_end_of_stream (GST_APP_SRC (appsrc)); } else { do { snd_seq_event_t *event; err = snd_seq_event_input (app->seq, &event); - if (err < 0) + if (err < 0 && err != -EAGAIN) { + GST_ERROR ("Error in snd_seq_event_input: %s", snd_strerror (err)); + gst_app_src_end_of_stream (GST_APP_SRC (appsrc)); break; + } if (event) { + time = GST_TIMESPEC_TO_TIME (event->time.time) + app->delay; + + /* + * Special handling is needed because decoding SND_SEQ_EVENT_TICK is + * not supported by the ALSA sequencer. + */ + if (event->type == SND_SEQ_EVENT_TICK) { + push_tick_buffer (app, time); + schedule_tick (app); + break; + } + size_ev = snd_midi_event_decode (app->parser, app->buffer, DEFAULT_BUFSIZE, event); @@ -243,14 +314,15 @@ feed_data (GstElement * appsrc, guint size, App * app) /* ENOENT indicates an event that is not a MIDI message, silently skip it */ if (-ENOENT == size_ev) { GST_WARNING ("Warning: Received non-MIDI message"); - push_tick_buffer (app); + goto poll; } else { GST_ERROR ("Error decoding event from ALSA to output: %s", strerror (-size_ev)); + gst_app_src_end_of_stream (GST_APP_SRC (appsrc)); break; } } else { - push_buffer (app, app->buffer, size_ev); + push_buffer (app, app->buffer, size_ev, time); } } } while (err > 0); @@ -296,6 +368,39 @@ bus_message (GstBus * bus, GstMessage * message, App * app) g_main_loop_quit (app->loop); break; } + case GST_MESSAGE_STATE_CHANGED: + { + GstState old_state, new_state; + + gst_message_parse_state_changed (message, &old_state, &new_state, NULL); + if (new_state == GST_STATE_PLAYING) { + GstClockTime gst_time; + GstClockTime base_time; + GstClockTime running_time; + GstClockTime queue_time; + GstClock *clock; + snd_seq_queue_status_t *status; + + if (app->tick == 0) { + start_queue_timer (app->seq, app->queue); + schedule_tick (app); + } + + clock = gst_element_get_clock (GST_ELEMENT (app->appsrc)); + gst_time = gst_clock_get_time (clock); + gst_object_unref (clock); + base_time = gst_element_get_base_time (GST_ELEMENT (app->appsrc)); + running_time = gst_time - base_time; + + snd_seq_queue_status_malloc (&status); + snd_seq_get_queue_status (app->seq, 0, status); + queue_time = GST_TIMESPEC_TO_TIME (*snd_seq_queue_status_get_real_time (status)); + snd_seq_queue_status_free (status); + + app->delay = running_time - queue_time; + } + break; + } case GST_MESSAGE_EOS: g_main_loop_quit (app->loop); break; @@ -360,7 +465,7 @@ err_free_buffer: err_free_parser: snd_midi_event_free (app->parser); err_free_ports: - free (app->ports); + g_free (app->seq_ports); err_seq_close: snd_seq_close (app->seq); err: @@ -374,7 +479,7 @@ app_finalize (App * app) free (app->pfds); free (app->buffer); snd_midi_event_free (app->parser); - free (app->ports); + g_free (app->seq_ports); snd_seq_close (app->seq); } @@ -398,9 +503,12 @@ main (int argc, char *argv[]) GOptionContext *ctx; GError *err = NULL; gchar *ports = NULL; + gboolean verbose = FALSE; GOptionEntry options[] = { {"ports", 'p', 0, G_OPTION_ARG_STRING, &ports, "Comma separated list of sequencer ports", "client:port,..."}, + {"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, + "Output status information and property notifications", NULL}, {NULL} }; @@ -418,7 +526,7 @@ main (int argc, char *argv[]) gst_init (&argc, &argv); - GST_DEBUG_CATEGORY_INIT (mysrc_debug, "mysrc", 0, + GST_DEBUG_CATEGORY_INIT (mysource_debug, "mysource", 0, "ALSA MIDI sequencer appsrc pipeline"); ret = app_init (app, ports); @@ -439,6 +547,9 @@ main (int argc, char *argv[]) ("appsrc name=mysource ! fluiddec ! audioconvert ! autoaudiosink", NULL); g_assert (app->pipeline); + if (verbose) + g_signal_connect (app->pipeline, "deep-notify", G_CALLBACK (gst_object_default_deep_notify), NULL); + bus = gst_pipeline_get_bus (GST_PIPELINE (app->pipeline)); g_assert (bus);