+#!/usr/bin/env python3
+#
+# CallDistanceTransceiver - send and receive Morse using phone calls distance
+#
+# Copyright (C) 2015 Antonio Ospite <ao2@ao2.it>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+# This hack allows MorseTranslator to be imported also when
+# __name__ == "__main__"
+try:
+ from .MorseTranslator import MorseTranslator
+except SystemError:
+ from MorseTranslator import MorseTranslator
+
+import logging
+import time
+
+
+class CallDistanceTransceiver(object):
+ """Transmit Morse messages using the distance between calls.
+
+ This is basically a pulse-distance modulation (PDM).
+
+ A RING is a pulse and the Morse symbols are encoded in the pause between
+ the first ring of the previous call and the first ring of a new call.
+
+ This strategy is very slow but it can even be used with ancient analog
+ modems which don't have call progress notifications for outgoing calls, and
+ make it also hard to count multiple rings in the same call on the receiving
+ side because there is no explicit notification on the receiving side of
+ when the caller ends a calls.
+
+ For GSM modems, which have a more sophisticate call report signalling,
+ a more efficient encoding can be used (for example using ring counts to
+ encode Morse symbols, i.e. a pulse-length modulation), but for
+ a proof-of-concept, a slow encoding covering the most general setup is
+ fine.
+
+ Plus, supporting communications between analog modems is cool :)
+
+ """
+
+ def __init__(self, modem,
+ call_setup_average=8.5, call_setup_uncertainty=1.5,
+ ring_distance_average=5, ring_uncertainty=0.2):
+ """Encode the Morse symbols using the distance between calls.
+
+ Args:
+ call_setup_average: the time between when the transmitter dials
+ the number and the receiver reports RING notifications. This is
+ needed in order to be sure than the receiver has received
+ _at_least_ one ring. The default chosen here should work fine
+ even with old analog modems, provided that there is some more
+ distance between two calls.
+ call_setup_uncertainty: uncertainty of call_setup_average,
+ a tolerance value
+ ring_distance_average: the time between two consecutive RINGs
+ in the same call, the standard interval is about 5 seconds, but
+ some tolerance is needed to account for line delays.
+ This is needed in order to ignore rings in the same call, and
+ differentiate two different calls.
+ ring_uncertainty: uncertainty of ring_distance_average, a tolerance
+ value
+ """
+
+ self.modem = modem
+ self.translator = MorseTranslator()
+
+ self.destination_number = ""
+
+ self.call_setup_time = call_setup_average + call_setup_uncertainty
+ self.rings_distance = ring_distance_average + ring_uncertainty
+
+ # In theory the symbol distance, the distance between calls which
+ # represent symbols, can be arbitrary, but in practice it's better to
+ # wait at least the duration of a ring between terminating one call and
+ # initiating the next call, as pick-up and hang-up can take some time
+ # with old analog modems.
+ symbol_distance = self.rings_distance
+
+ def symbol_time(multiplier):
+ return self.call_setup_time + symbol_distance * multiplier
+
+ self.dot_time = symbol_time(1)
+ self.dash_time = symbol_time(2)
+ self.signalspace_time = symbol_time(3)
+ self.wordspace_time = symbol_time(4)
+ self.eom_time = symbol_time(5)
+
+ self.ring_uncertainty = ring_uncertainty
+ self.symbol_uncertainty = symbol_distance / 2.
+
+ logging.debug("dot time: transmit: %.2f receive: (%.2f, %.2f)",
+ self.dot_time,
+ (self.dot_time - self.symbol_uncertainty),
+ (self.dot_time + self.symbol_uncertainty))
+ logging.debug("dash time: transmit: %.2f receive: (%.2f, %.2f)",
+ self.dash_time,
+ (self.dash_time - self.symbol_uncertainty),
+ (self.dash_time + self.symbol_uncertainty))
+ logging.debug("signalspace time: transmit: %.2f receive: (%.2f, %.2f)",
+ self.signalspace_time,
+ (self.signalspace_time - self.symbol_uncertainty),
+ (self.signalspace_time + self.symbol_uncertainty))
+ logging.debug("wordspace time: transmit: %.2f receive: (%.2f, %.2f)",
+ self.wordspace_time,
+ (self.wordspace_time - self.symbol_uncertainty),
+ (self.wordspace_time + self.symbol_uncertainty))
+ logging.debug("EOM time: transmit: %.2f receive: (%.2f, +inf)",
+ self.eom_time,
+ (self.eom_time - self.symbol_uncertainty))
+
+ self.previous_ring_time = -1
+ self.previous_call_time = -1
+
+ self.morse_message = ""
+ self.text_message = ""
+
+ self.end_of_message = False
+
+ def log_symbol(self, distance, symbol, extra_info=""):
+ logging.info("distance: %.2f Received \"%s\" %s", distance, symbol,
+ extra_info)
+
+ def receive_character(self):
+ current_ring_time = time.time()
+
+ if self.previous_ring_time == -1:
+ self.previous_ring_time = current_ring_time
+ self.previous_call_time = current_ring_time
+ self.log_symbol(0, "", "(The very first ring)")
+ return
+ else:
+ ring_distance = current_ring_time - self.previous_ring_time
+ logging.debug("RINGs distance: %.2f", ring_distance)
+ self.previous_ring_time = current_ring_time
+
+ # Ignore multiple rings in the same call
+ if abs(ring_distance - self.rings_distance) < self.ring_uncertainty:
+ logging.debug("multiple rings in the same call, distance: %.2f",
+ ring_distance)
+ return
+
+ call_distance = current_ring_time - self.previous_call_time
+ self.previous_call_time = current_ring_time
+
+ if abs(call_distance - self.dot_time) < self.symbol_uncertainty:
+ self.log_symbol(call_distance, '.')
+ self.morse_message += "."
+ return
+
+ if abs(call_distance - self.dash_time) < self.symbol_uncertainty:
+ self.log_symbol(call_distance, '-')
+ self.morse_message += "-"
+ return
+
+ if abs(call_distance - self.signalspace_time) < self.symbol_uncertainty:
+ signal = self.morse_message.strip().split(' ')[-1]
+ character = self.translator.signal_to_character(signal)
+ self.log_symbol(call_distance, ' ', "got \"%s\"" % character)
+ self.morse_message += " "
+ return
+
+ if abs(call_distance - self.wordspace_time) < self.symbol_uncertainty:
+ signal = self.morse_message.strip().split(' ')[-1]
+ character = self.translator.signal_to_character(signal)
+ self.log_symbol(call_distance, '/', "got \"%s\"" % character)
+ self.morse_message += " / "
+ return
+
+ if call_distance >= self.eom_time - self.symbol_uncertainty:
+ signal = self.morse_message.strip().split(' ')[-1]
+ character = self.translator.signal_to_character(signal)
+ self.log_symbol(call_distance, 'EOM', "got \"%s\"" % character)
+ self.end_of_message = True
+ self.previous_ring_time = -1
+ self.previous_call_time = -1
+ return
+
+ # if the code made it up to here, something fishy is going on
+ logging.error("Unexpected distance: %.2f", ring_distance)
+ logging.error("Check the transmitter and receiver parameters")
+
+ def receive_loop(self):
+ while not self.end_of_message:
+ self.modem.get_response("RING")
+ self.receive_character()
+ logging.debug("Current message: %s", self.morse_message)
+
+ self.end_of_message = False
+ self.text_message = self.translator.morse_to_text(self.morse_message)
+ self.morse_message = ""
+
+ def get_text(self):
+ return self.text_message
+
+ def transmit_symbol(self, symbol):
+ if symbol == ".":
+ sleep_time = self.dot_time
+ elif symbol == "-":
+ sleep_time = self.dash_time
+ elif symbol == " ":
+ sleep_time = self.signalspace_time
+ elif symbol == "/":
+ sleep_time = self.wordspace_time
+ elif symbol == "EOM":
+ sleep_time = self.eom_time
+ elif symbol is None:
+ # To terminate the transmission just call and hangup, with no extra
+ # distance
+ sleep_time = self.call_setup_time
+
+ logging.info("Dial and wait %.2f seconds (transmitting '%s')",
+ sleep_time, symbol)
+
+ # Dial, then wait self.call_setup_time to make sure the receiver gets
+ # at least one RING, and then hangup and sleep the remaining time
+ self.modem.send_command("ATDT" + self.destination_number)
+ time.sleep(self.call_setup_time)
+ self.modem.send_command("ATH")
+ self.modem.get_response()
+ time.sleep(sleep_time - self.call_setup_time)
+
+ def transmit_signal(self, signal):
+ logging.debug("Transmitting signal: %s", signal)
+ for symbol in signal:
+ self.transmit_symbol(symbol)
+
+ def transmit(self, message, destination_number):
+ self.destination_number = destination_number
+
+ morse_message = self.translator.text_to_morse(message)
+ signals = morse_message.split()
+ print(signals)
+
+ logging.debug("Starting the transmission")
+ for i, signal in enumerate(signals):
+ logging.debug("Transmitting '%s' as '%s'", message[i], signal)
+ self.transmit_signal(signal)
+
+ # Transmit a signal separator only when strictly necessary:
+ # - after the last symbol, we are going to transmit an EOM
+ # anyway, and that will mark the end of the last symbol.
+ # - between words the word separator act as a symbol separator
+ # too.
+ if i != len(signals) - 1 and signals[i + 1] != "/" and signal != "/":
+ self.transmit_symbol(" ")
+
+ self.transmit_symbol("EOM")
+
+ # Since the Morse signals are encoded in the distance between calls, an
+ # extra call is needed in order for receiver actually get the EOM and
+ # see that the transmission has terminated.
+ self.transmit_symbol(None)
+
+ def estimate_transmit_duration(self, message):
+ morsemessage = self.translator.text_to_morse(message)
+ signals = morsemessage.split()
+
+ logging.debug(signals)
+
+ transmitting_time = 0
+ for i, signal in enumerate(signals):
+ logging.debug("signal: %s", signal)
+
+ for symbol in signal:
+ if symbol == ".":
+ transmitting_time += self.dot_time
+ elif symbol == "-":
+ transmitting_time += self.dash_time
+ elif symbol == "/":
+ transmitting_time += self.wordspace_time
+
+ if i != len(signals) - 1 and signals[i + 1] != "/" and symbol != "/":
+ transmitting_time += self.signalspace_time
+
+ transmitting_time += self.eom_time
+
+ logging.debug("Estimated transmitting time: %d seconds",
+ transmitting_time)
+
+
+def test_send_receive():
+ logging.basicConfig(level=logging.DEBUG)
+ call_setup_time = 2
+ call_setup_uncertainty = 0.4
+ ring_time = 1
+ ring_uncertainty = 0.3
+
+ class DummyModem(object):
+ """Always receive a '.' and then a '/', which result in 'E '."""
+
+ def __init__(self):
+ self.ring_count = 0
+
+ def send_command(self, command):
+ pass
+
+ def get_response(self, response):
+ if self.ring_count % 2:
+ # received a '.'
+ time.sleep(call_setup_time + (ring_time + ring_uncertainty))
+ else:
+ # received a '/'
+ time.sleep(call_setup_time + (ring_time + ring_uncertainty) * 4)
+
+ self.ring_count += 1
+
+ xcv = CallDistanceTransceiver(DummyModem(), call_setup_time,
+ call_setup_uncertainty, ring_time,
+ ring_uncertainty)
+ xcv.receive_loop()
+
+
+if __name__ == "__main__":
+ test_send_receive()