#!/usr/bin/env python3 # # CallDistanceTransceiver - send and receive Morse using phone calls distance # # Copyright (C) 2015 Antonio Ospite # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # This hack allows MorseTranslator to be imported also when # __name__ == "__main__" try: from .MorseTranslator import MorseTranslator except SystemError: from MorseTranslator import MorseTranslator import logging import time class SymbolTime(object): """ In theory the symbol distance (the distance to discriminate symbols) can be arbitrary, and it's only bounded below by the stability of the period time, but in practice it can be necessary to wait a predefined minimum amount of time between periods because of technological limits of the transmitting devices, we call this time the "minimum inter-symbol distance". """ # pylint: disable=too-few-public-methods def __init__(self, period_min, period_max, multiplier, min_inter_symbol_distance=0.0): assert multiplier >= 0 if (period_min == period_max) and (min_inter_symbol_distance == 0): raise ValueError("If (period_min == period_max) a non-zero", "inter-symbol distance MUST be specified") symbol_distance = 2 * (period_max - period_min) if symbol_distance == 0: symbol_distance = min_inter_symbol_distance # The time the transmitter has to wait to disambiguate between this # symbol and a different one. self.dist = min_inter_symbol_distance + symbol_distance * multiplier # The minimum time which represents the symbol at the receiving end. self.min = min_inter_symbol_distance + period_min + \ symbol_distance * multiplier # The maximum time which represents the symbol at the receiving end self.max = min_inter_symbol_distance + period_min + \ symbol_distance * (multiplier + 1) class MorseDistanceModulator(object): def __init__(self, period_min, period_max, pulse_min, pulse_max, inter_symbol_distance): self.set_parameters(period_min, period_max, pulse_min, pulse_max, inter_symbol_distance) def set_parameters(self, period_min, period_max, pulse_min, pulse_max, inter_symbol_distance): self.period_min = period_min self.period_max = period_max self.pulse_min = pulse_min self.pulse_max = pulse_max self.inter_symbol_distance = inter_symbol_distance self.dot_time = SymbolTime(period_min, period_max, 0, inter_symbol_distance) self.dash_time = SymbolTime(period_min, period_max, 1, inter_symbol_distance) self.signalspace_time = SymbolTime(period_min, period_max, 2, inter_symbol_distance) self.wordspace_time = SymbolTime(period_min, period_max, 3, inter_symbol_distance) self.eom_time = SymbolTime(period_min, period_max, 4, inter_symbol_distance) def symbol_to_distance(self, symbol): if symbol == ".": return self.dot_time.dist elif symbol == "-": return self.dash_time.dist elif symbol == " ": return self.signalspace_time.dist elif symbol == "/" or symbol == " / ": return self.wordspace_time.dist elif symbol == "EOM": return self.eom_time.dist raise ValueError("Unexpected symbol %s" % symbol) def is_same_period(self, distance): return distance > self.pulse_min and distance <= self.pulse_max def distance_to_symbol(self, distance): if distance > self.dot_time.min and \ distance <= self.dot_time.max: return "." if distance > self.dash_time.min and \ distance <= self.dash_time.max: return "-" if distance > self.signalspace_time.min and \ distance <= self.signalspace_time.max: return " " if distance > self.wordspace_time.min and \ distance <= self.wordspace_time.max: return "/" if distance > self.eom_time.min: return "EOM" raise ValueError("Unexpected distance %.2f" % distance) def modulate(self, morse): signals = morse.split(' ') distances = [] for i, signal in enumerate(signals): for symbol in signal: distances.append(self.symbol_to_distance(symbol)) # Transmit a signal separator only when strictly necessary. # # Avoid it in these cases: # - after the last symbol, because EOM is going to ne transmitted # anyway and that will mark the end of the last symbol. # - between words, because the word separator act as a symbol # separator too. if i != len(signals) - 1 and signals[i + 1] != "/" and signal != "/": distances.append(self.symbol_to_distance(" ")) distances.append(self.symbol_to_distance("EOM")) # Since the Morse signals are encoded in the distance between calls, an # extra call is needed in order for receiver actually get the EOM and # see that the transmission has terminated. distances.append(0) return distances class CallDistanceTransceiver(object): """Transmit Morse messages using the distance between calls. This is basically a pulse-distance modulation (PDM). A RING is a pulse and the Morse symbols are encoded in the pause between the first ring of the previous call and the first ring of a new call. This strategy is very slow but it can even be used with ancient analog modems which don't have call progress notifications for outgoing calls, and make it also hard to count multiple rings in the same call on the receiving side because there is no explicit notification on the receiving side of when the caller ends a calls. For GSM modems, which have a more sophisticate call report signalling, a more efficient encoding can be used (for example using ring counts to encode Morse symbols, i.e. a pulse-length modulation), but for a proof-of-concept, a slow encoding covering the most general setup is fine. Plus, supporting communications between analog modems is cool :) """ def __init__(self, modem, call_setup_time_min=7, call_setup_time_max=15, ring_time_min=4.8, ring_time_max=5.2, add_inter_call_distance=True): """Encode the Morse symbols using the distance between calls. Args: call_setup_time_min: the minimum time between when the transmitter dials the number and the receiver reports RING notifications. call_setup_time_max: the maximum time between when the transmitter dials the number and the receiver reports RING notifications. Waiting this time after dialing ensures that the receiver has received _at_least_ one ring. The default chosen here have been tested to work fine with old analog modems, provided that there is some more distance between two calls, and with Android receivers. ring_time_min: the minimum time between two consecutive RINGs in the same call. ring_time_max: the maximum time between two consecutive RINGs in the same call, the standard interval is about 5 seconds, but line and/or software delays can make it vary. This is needed in order to ignore multiple ring in the same call when multiple RING in the same call are notified, and then be able to discriminate between two different calls. add_inter_call_distance: specify if it is needed to wait an extra fixed time between calls. """ self.modem = modem self.translator = MorseTranslator() self.destination_number = "" self.call_setup_time_max = call_setup_time_max if add_inter_call_distance: # Analog modems don't like to dial a new call immediately after # they hung up the previous one; an extra delay of about one # ring time makes them happy. inter_symbol_distance = ring_time_max else: inter_symbol_distance = 0 self.modulator = MorseDistanceModulator(call_setup_time_min, call_setup_time_max, ring_time_min, ring_time_max, inter_symbol_distance) logging.debug("call setup time between %.2f and %.2f " "--------- dot transmit time: %.2f + %.2f " "receive time: between %.2f and %.2f", self.modulator.period_min, self.modulator.period_max, self.modulator.period_max, self.modulator.dot_time.dist, self.modulator.dot_time.min, self.modulator.dot_time.max) logging.debug("call setup time between %.2f and %.2f " "-------- dash transmit time: %.2f + %.2f " "receive time: between %.2f and %.2f", self.modulator.period_min, self.modulator.period_max, self.modulator.period_max, self.modulator.dash_time.dist, self.modulator.dash_time.min, self.modulator.dash_time.max) logging.debug("call setup time between %.2f and %.2f " "- signalspace transmit time: %.2f + %.2f " "receive time: between %.2f and %.2f", self.modulator.period_min, self.modulator.period_max, self.modulator.period_max, self.modulator.signalspace_time.dist, self.modulator.signalspace_time.min, self.modulator.signalspace_time.max) logging.debug("call setup time between %.2f and %.2f " "--- wordspace transmit time: %.2f + %.2f " "receive time: between %.2f and %.2f", self.modulator.period_min, self.modulator.period_max, self.modulator.period_max, self.modulator.wordspace_time.dist, self.modulator.wordspace_time.min, self.modulator.wordspace_time.max) logging.debug("call setup time between %.2f and %.2f " "--------- EOM transmit time: %.2f + %.2f " "receive time: between %.2f and inf", self.modulator.period_min, self.modulator.period_max, self.modulator.period_max, self.modulator.eom_time.dist, self.modulator.eom_time.min) self.previous_ring_time = -1 self.previous_call_time = -1 self.morse_message = "" self.text_message = "" self.end_of_message = False def log_symbol(self, distance, symbol, extra_info=""): logging.info("distance: %.2f Received \"%s\"%s", distance, symbol, extra_info) def receive_character(self): current_ring_time = time.time() if self.previous_ring_time == -1: self.previous_ring_time = current_ring_time self.previous_call_time = current_ring_time self.log_symbol(0, "", "(The very first ring)") return ring_distance = current_ring_time - self.previous_ring_time logging.debug("RINGs distance: %.2f", ring_distance) self.previous_ring_time = current_ring_time # Ignore multiple rings in the same call if self.modulator.is_same_period(ring_distance): logging.debug("multiple rings in the same call, distance: %.2f", ring_distance) return call_distance = current_ring_time - self.previous_call_time self.previous_call_time = current_ring_time try: symbol = self.modulator.distance_to_symbol(call_distance) except ValueError as err: logging.error("%s", err) logging.error("Check the transmitter and receiver parameters") return extra_info = "" if symbol in [" ", "/", "EOM"]: signal = self.morse_message.strip().split(' ')[-1] character = self.translator.signal_to_character(signal) extra_info = " got \"%s\"" % character self.log_symbol(call_distance, symbol, extra_info) if symbol != "EOM": # Add spaces around the wordspace symbol to make it easier to split # the Morse message in symbols later on if symbol == "/": symbol = " / " self.morse_message += symbol else: self.end_of_message = True self.previous_ring_time = -1 self.previous_call_time = -1 def receive_loop(self): while not self.end_of_message: self.modem.get_response("RING") self.receive_character() logging.debug("Current message: %s", self.morse_message) self.end_of_message = False self.text_message = self.translator.morse_to_text(self.morse_message) self.morse_message = "" def get_text(self): return self.text_message def transmit_symbol(self, symbol, sleep_time): logging.info("Dial and wait %.2f = %.2f + %.2f seconds " "(transmitting '%s')", self.call_setup_time_max + sleep_time, self.call_setup_time_max, sleep_time, symbol) # Dial, then wait self.call_setup_time_max to make sure the receiver # gets at least one RING, and then hangup and sleep the time needed to # transmit a symbol. self.modem.send_command("ATDT" + self.destination_number + ";") time.sleep(self.call_setup_time_max) self.modem.send_command("ATH") self.modem.get_response() time.sleep(sleep_time) def transmit(self, message, destination_number): self.destination_number = destination_number morse_message = self.translator.text_to_morse(message) distances = self.modulator.modulate(morse_message) logging.debug("Starting the transmission") for i, distance in enumerate(distances): # Use 'None' for the last call if i == len(distances) - 1: symbol = None else: total_sleep_time = self.call_setup_time_max + distance symbol = self.modulator.distance_to_symbol(total_sleep_time) self.transmit_symbol(symbol, distance) def estimate_transmit_duration(self, message): morsemessage = self.translator.text_to_morse(message) logging.debug(morsemessage) distances = self.modulator.modulate(morsemessage) transmitting_time = 0 for distance in distances: transmitting_time += self.call_setup_time_max transmitting_time += distance logging.debug("Estimated transmitting time: %.2f seconds", transmitting_time) def test_send_receive(): logging.basicConfig(level=logging.DEBUG) call_setup_time_min = 0 call_setup_time_max = 0.01 ring_time_min = 0 ring_time_max = 0 import random class DummyModem(object): """Always receive a '.', a '/' and then EOM, which results in 'E '.""" def __init__(self): self.ring_count = 0 # Take trasmission times from a transceiver self.transceiver = None random.seed(None) def send_command(self, command): pass def get_response(self, response): # pylint: disable=unused-argument setup_time = random.uniform(self.transceiver.call_setup_time_min, self.transceiver.call_setup_time_max) if self.ring_count == 0: # dummy ring pass elif self.ring_count == 1: # received a '.' time.sleep(setup_time + self.transceiver.dot_time.dist) elif self.ring_count == 2: # received a '/' time.sleep(setup_time + self.transceiver.wordspace_time.dist) else: # received an 'EOM' time.sleep(setup_time + self.transceiver.eom_time.dist) self.ring_count += 1 self.ring_count %= 4 modem = DummyModem() xcv = CallDistanceTransceiver(modem, call_setup_time_min, call_setup_time_max, ring_time_min, ring_time_max, True) modem.transceiver = xcv while True: xcv.receive_loop() modem.ring_count = 0 print() print("Message received!") print("\"%s\"" % xcv.get_text(), flush=True) if __name__ == "__main__": test_send_receive()