#!/usr/bin/env python3 # # CallDistanceTransceiver - send and receive Morse using phone calls distance # # Copyright (C) 2015 Antonio Ospite # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # This hack allows MorseTranslator to be imported also when # __name__ == "__main__" try: from .MorseTranslator import MorseTranslator except SystemError: from MorseTranslator import MorseTranslator import logging import time class CallDistanceTransceiver(object): """Transmit Morse messages using the distance between calls. This is basically a pulse-distance modulation (PDM). A RING is a pulse and the Morse symbols are encoded in the pause between the first ring of the previous call and the first ring of a new call. This strategy is very slow but it can even be used with ancient analog modems which don't have call progress notifications for outgoing calls, and make it also hard to count multiple rings in the same call on the receiving side because there is no explicit notification on the receiving side of when the caller ends a calls. For GSM modems, which have a more sophisticate call report signalling, a more efficient encoding can be used (for example using ring counts to encode Morse symbols, i.e. a pulse-length modulation), but for a proof-of-concept, a slow encoding covering the most general setup is fine. Plus, supporting communications between analog modems is cool :) """ def __init__(self, modem, call_setup_average=8.5, call_setup_uncertainty=1.5, ring_distance_average=5, ring_uncertainty=0.2): """Encode the Morse symbols using the distance between calls. Args: call_setup_average: the time between when the transmitter dials the number and the receiver reports RING notifications. This is needed in order to be sure than the receiver has received _at_least_ one ring. The default chosen here should work fine even with old analog modems, provided that there is some more distance between two calls. call_setup_uncertainty: uncertainty of call_setup_average, a tolerance value ring_distance_average: the time between two consecutive RINGs in the same call, the standard interval is about 5 seconds, but some tolerance is needed to account for line delays. This is needed in order to ignore rings in the same call, and differentiate two different calls. ring_uncertainty: uncertainty of ring_distance_average, a tolerance value """ self.modem = modem self.translator = MorseTranslator() self.destination_number = "" self.call_setup_time = call_setup_average + call_setup_uncertainty self.rings_distance = ring_distance_average + ring_uncertainty # In theory the symbol distance, the distance between calls which # represent symbols, can be arbitrary, but in practice it's better to # wait at least the duration of a ring between terminating one call and # initiating the next call, as pick-up and hang-up can take some time # with old analog modems. symbol_distance = self.rings_distance def symbol_time(multiplier): return self.call_setup_time + symbol_distance * multiplier self.dot_time = symbol_time(1) self.dash_time = symbol_time(2) self.signalspace_time = symbol_time(3) self.wordspace_time = symbol_time(4) self.eom_time = symbol_time(5) self.ring_uncertainty = ring_uncertainty self.symbol_uncertainty = symbol_distance / 2. logging.debug("dot time: transmit: %.2f receive: (%.2f, %.2f)", self.dot_time, (self.dot_time - self.symbol_uncertainty), (self.dot_time + self.symbol_uncertainty)) logging.debug("dash time: transmit: %.2f receive: (%.2f, %.2f)", self.dash_time, (self.dash_time - self.symbol_uncertainty), (self.dash_time + self.symbol_uncertainty)) logging.debug("signalspace time: transmit: %.2f receive: (%.2f, %.2f)", self.signalspace_time, (self.signalspace_time - self.symbol_uncertainty), (self.signalspace_time + self.symbol_uncertainty)) logging.debug("wordspace time: transmit: %.2f receive: (%.2f, %.2f)", self.wordspace_time, (self.wordspace_time - self.symbol_uncertainty), (self.wordspace_time + self.symbol_uncertainty)) logging.debug("EOM time: transmit: %.2f receive: (%.2f, +inf)", self.eom_time, (self.eom_time - self.symbol_uncertainty)) self.previous_ring_time = -1 self.previous_call_time = -1 self.morse_message = "" self.text_message = "" self.end_of_message = False def log_symbol(self, distance, symbol, extra_info=""): logging.info("distance: %.2f Received \"%s\" %s", distance, symbol, extra_info) def receive_character(self): current_ring_time = time.time() if self.previous_ring_time == -1: self.previous_ring_time = current_ring_time self.previous_call_time = current_ring_time self.log_symbol(0, "", "(The very first ring)") return ring_distance = current_ring_time - self.previous_ring_time logging.debug("RINGs distance: %.2f", ring_distance) self.previous_ring_time = current_ring_time # Ignore multiple rings in the same call if abs(ring_distance - self.rings_distance) < self.ring_uncertainty: logging.debug("multiple rings in the same call, distance: %.2f", ring_distance) return call_distance = current_ring_time - self.previous_call_time self.previous_call_time = current_ring_time if abs(call_distance - self.dot_time) < self.symbol_uncertainty: self.log_symbol(call_distance, '.') self.morse_message += "." return if abs(call_distance - self.dash_time) < self.symbol_uncertainty: self.log_symbol(call_distance, '-') self.morse_message += "-" return if abs(call_distance - self.signalspace_time) < self.symbol_uncertainty: signal = self.morse_message.strip().split(' ')[-1] character = self.translator.signal_to_character(signal) self.log_symbol(call_distance, ' ', "got \"%s\"" % character) self.morse_message += " " return if abs(call_distance - self.wordspace_time) < self.symbol_uncertainty: signal = self.morse_message.strip().split(' ')[-1] character = self.translator.signal_to_character(signal) self.log_symbol(call_distance, '/', "got \"%s\"" % character) self.morse_message += " / " return if call_distance >= self.eom_time - self.symbol_uncertainty: signal = self.morse_message.strip().split(' ')[-1] character = self.translator.signal_to_character(signal) self.log_symbol(call_distance, 'EOM', "got \"%s\"" % character) self.end_of_message = True self.previous_ring_time = -1 self.previous_call_time = -1 return # if the code made it up to here, something fishy is going on logging.error("Unexpected distance: %.2f", call_distance) logging.error("Check the transmitter and receiver parameters") def receive_loop(self): while not self.end_of_message: self.modem.get_response("RING") self.receive_character() logging.debug("Current message: %s", self.morse_message) self.end_of_message = False self.text_message = self.translator.morse_to_text(self.morse_message) self.morse_message = "" def get_text(self): return self.text_message def transmit_symbol(self, symbol): if symbol == ".": sleep_time = self.dot_time elif symbol == "-": sleep_time = self.dash_time elif symbol == " ": sleep_time = self.signalspace_time elif symbol == "/": sleep_time = self.wordspace_time elif symbol == "EOM": sleep_time = self.eom_time elif symbol is None: # To terminate the transmission just call and hangup, with no extra # distance sleep_time = self.call_setup_time logging.info("Dial and wait %.2f seconds (transmitting '%s')", sleep_time, symbol) # Dial, then wait self.call_setup_time to make sure the receiver gets # at least one RING, and then hangup and sleep the remaining time self.modem.send_command("ATDT" + self.destination_number + ";") time.sleep(self.call_setup_time) self.modem.send_command("ATH") self.modem.get_response() time.sleep(sleep_time - self.call_setup_time) def transmit_signal(self, signal): logging.debug("Transmitting signal: %s", signal) for symbol in signal: self.transmit_symbol(symbol) def transmit(self, message, destination_number): self.destination_number = destination_number morse_message = self.translator.text_to_morse(message) signals = morse_message.split() print(signals) logging.debug("Starting the transmission") for i, signal in enumerate(signals): logging.debug("Transmitting '%s' as '%s'", message[i], signal) self.transmit_signal(signal) # Transmit a signal separator only when strictly necessary: # - after the last symbol, we are going to transmit an EOM # anyway, and that will mark the end of the last symbol. # - between words the word separator act as a symbol separator # too. if i != len(signals) - 1 and signals[i + 1] != "/" and signal != "/": self.transmit_symbol(" ") self.transmit_symbol("EOM") # Since the Morse signals are encoded in the distance between calls, an # extra call is needed in order for receiver actually get the EOM and # see that the transmission has terminated. self.transmit_symbol(None) def estimate_transmit_duration(self, message): morsemessage = self.translator.text_to_morse(message) signals = morsemessage.split() logging.debug(signals) transmitting_time = 0 for i, signal in enumerate(signals): logging.debug("signal: %s", signal) for symbol in signal: if symbol == ".": transmitting_time += self.dot_time elif symbol == "-": transmitting_time += self.dash_time elif symbol == "/": transmitting_time += self.wordspace_time if i != len(signals) - 1 and signals[i + 1] != "/" and symbol != "/": transmitting_time += self.signalspace_time transmitting_time += self.eom_time logging.debug("Estimated transmitting time: %.2f seconds", transmitting_time) def test_send_receive(): logging.basicConfig(level=logging.DEBUG) call_setup_time = 2 call_setup_uncertainty = 0.4 ring_time = 1 ring_uncertainty = 0.3 class DummyModem(object): """Always receive a '.' and then a '/', which result in 'E '.""" def __init__(self): self.ring_count = 0 def send_command(self, command): pass def get_response(self, response): if self.ring_count % 2: # received a '.' time.sleep(call_setup_time + (ring_time + ring_uncertainty)) else: # received a '/' time.sleep(call_setup_time + (ring_time + ring_uncertainty) * 4) self.ring_count += 1 xcv = CallDistanceTransceiver(DummyModem(), call_setup_time, call_setup_uncertainty, ring_time, ring_uncertainty) xcv.receive_loop() if __name__ == "__main__": test_send_receive()