#!/usr/bin/env python3 # # CallDistanceTransceiver - send and receive Morse using phone calls distance # # Copyright (C) 2015 Antonio Ospite # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # This hack allows MorseTranslator to be imported also when # __name__ == "__main__" try: from .MorseTranslator import MorseTranslator except SystemError: from MorseTranslator import MorseTranslator import logging import time class SymbolTime(object): """ In theory the symbol distance (the distance to discriminate symbols) can be arbitrary, and it's only bounded below by the stability of the pulse time, but in practice it can be necessary to wait a predefined minimum amount of time between pulses because of technological limits of the transmitting devices, we call this time the "minimum inter-symbol distance". """ # pylint: disable=too-few-public-methods def __init__(self, pulse_min, pulse_max, multiplier, min_inter_symbol_distance=0.0): assert multiplier >= 0 if (pulse_min == pulse_max) and (min_inter_symbol_distance == 0): raise ValueError("If (pulse_min == pulse_max) a non-zero", "inter-symbol distance MUST be specified") symbol_distance = 2 * (pulse_max - pulse_min) if symbol_distance == 0: symbol_distance = min_inter_symbol_distance # The time the transmitter has to wait to disambiguate between this # symbol and a different one. self.dist = min_inter_symbol_distance + symbol_distance * multiplier # The minimum time which represents the symbol at the receiving end. self.min = min_inter_symbol_distance + pulse_min + \ symbol_distance * multiplier # The maximum time which represents the symbol at the receiving end self.max = min_inter_symbol_distance + pulse_min + \ symbol_distance * (multiplier + 1) class CallDistanceTransceiver(object): """Transmit Morse messages using the distance between calls. This is basically a pulse-distance modulation (PDM). A RING is a pulse and the Morse symbols are encoded in the pause between the first ring of the previous call and the first ring of a new call. This strategy is very slow but it can even be used with ancient analog modems which don't have call progress notifications for outgoing calls, and make it also hard to count multiple rings in the same call on the receiving side because there is no explicit notification on the receiving side of when the caller ends a calls. For GSM modems, which have a more sophisticate call report signalling, a more efficient encoding can be used (for example using ring counts to encode Morse symbols, i.e. a pulse-length modulation), but for a proof-of-concept, a slow encoding covering the most general setup is fine. Plus, supporting communications between analog modems is cool :) """ def __init__(self, modem, call_setup_time_min=7, call_setup_time_max=15, ring_time_min=4.8, ring_time_max=5.2, add_inter_call_distance=True): """Encode the Morse symbols using the distance between calls. Args: call_setup_time_min: the minimum time between when the transmitter dials the number and the receiver reports RING notifications. call_setup_time_max: the maximum time between when the transmitter dials the number and the receiver reports RING notifications. Waiting this time after dialing ensures that the receiver has received _at_least_ one ring. The default chosen here have been tested to work fine with old analog modems, provided that there is some more distance between two calls, and with Android receivers. ring_time_min: the minimum time between two consecutive RINGs in the same call. ring_time_max: the maximum time between two consecutive RINGs in the same call, the standard interval is about 5 seconds, but line and/or software delays can make it vary. This is needed in order to ignore multiple ring in the same call when multiple RING in the same call are notified, and then be able to discriminate between two different calls. add_inter_call_distance: specify if it is needed to wait an extra fixed time between calls. """ self.modem = modem self.translator = MorseTranslator() self.destination_number = "" self.call_setup_time_min = call_setup_time_min self.call_setup_time_max = call_setup_time_max self.ring_time_min = ring_time_min self.ring_time_max = ring_time_max if add_inter_call_distance: # Analog modems don't like to dial a new call immediately after # they hung up the previous one; an extra delay of about one # ring time makes them happy. inter_symbol_distance = ring_time_max else: inter_symbol_distance = 0 self.dot_time = SymbolTime(call_setup_time_min, call_setup_time_max, 0, inter_symbol_distance) self.dash_time = SymbolTime(call_setup_time_min, call_setup_time_max, 1, inter_symbol_distance) self.signalspace_time = SymbolTime(call_setup_time_min, call_setup_time_max, 2, inter_symbol_distance) self.wordspace_time = SymbolTime(call_setup_time_min, call_setup_time_max, 3, inter_symbol_distance) self.eom_time = SymbolTime(call_setup_time_min, call_setup_time_max, 4, inter_symbol_distance) logging.debug("call setup time between %.2f and %.2f " "--------- dot transmit time: %.2f + %.2f " "receive time: between %.2f and %.2f", self.call_setup_time_min, self.call_setup_time_max, self.call_setup_time_max, self.dot_time.dist, self.dot_time.min, self.dot_time.max) logging.debug("call setup time between %.2f and %.2f " "-------- dash transmit time: %.2f + %.2f " "receive time: between %.2f and %.2f", self.call_setup_time_min, self.call_setup_time_max, self.call_setup_time_max, self.dash_time.dist, self.dash_time.min, self.dash_time.max) logging.debug("call setup time between %.2f and %.2f " "- signalspace transmit time: %.2f + %.2f " "receive time: between %.2f and %.2f", self.call_setup_time_min, self.call_setup_time_max, self.call_setup_time_max, self.signalspace_time.dist, self.signalspace_time.min, self.signalspace_time.max) logging.debug("call setup time between %.2f and %.2f " "--- wordspace transmit time: %.2f + %.2f " "receive time: between %.2f and %.2f", self.call_setup_time_min, self.call_setup_time_max, self.call_setup_time_max, self.wordspace_time.dist, self.wordspace_time.min, self.wordspace_time.max) logging.debug("call setup time between %.2f and %.2f " "--------- EOM transmit time: %.2f + %.2f " "receive time: between %.2f and inf", self.call_setup_time_min, self.call_setup_time_max, self.call_setup_time_max, self.eom_time.dist, self.eom_time.min) self.previous_ring_time = -1 self.previous_call_time = -1 self.morse_message = "" self.text_message = "" self.end_of_message = False def log_symbol(self, distance, symbol, extra_info=""): logging.info("distance: %.2f Received \"%s\" %s", distance, symbol, extra_info) def receive_character(self): current_ring_time = time.time() if self.previous_ring_time == -1: self.previous_ring_time = current_ring_time self.previous_call_time = current_ring_time self.log_symbol(0, "", "(The very first ring)") return ring_distance = current_ring_time - self.previous_ring_time logging.debug("RINGs distance: %.2f", ring_distance) self.previous_ring_time = current_ring_time # Ignore multiple rings in the same call if ring_distance > self.ring_time_min and \ ring_distance <= self.ring_time_max: logging.debug("multiple rings in the same call, distance: %.2f", ring_distance) return call_distance = current_ring_time - self.previous_call_time self.previous_call_time = current_ring_time if call_distance > self.dot_time.min and \ call_distance <= self.dot_time.max: self.log_symbol(call_distance, '.') self.morse_message += "." return if call_distance > self.dash_time.min and \ call_distance <= self.dash_time.max: self.log_symbol(call_distance, '-') self.morse_message += "-" return if call_distance > self.signalspace_time.min and \ call_distance <= self.signalspace_time.max: signal = self.morse_message.strip().split(' ')[-1] character = self.translator.signal_to_character(signal) self.log_symbol(call_distance, ' ', "got \"%s\"" % character) self.morse_message += " " return if call_distance > self.wordspace_time.min and \ call_distance <= self.wordspace_time.max: signal = self.morse_message.strip().split(' ')[-1] character = self.translator.signal_to_character(signal) self.log_symbol(call_distance, '/', "got \"%s\"" % character) self.morse_message += " / " return if call_distance > self.eom_time.min: signal = self.morse_message.strip().split(' ')[-1] character = self.translator.signal_to_character(signal) self.log_symbol(call_distance, 'EOM', "got \"%s\"" % character) self.end_of_message = True self.previous_ring_time = -1 self.previous_call_time = -1 return # if the code made it up to here, something fishy is going on logging.error("Unexpected distance: %.2f", call_distance) logging.error("Check the transmitter and receiver parameters") def receive_loop(self): while not self.end_of_message: self.modem.get_response("RING") self.receive_character() logging.debug("Current message: %s", self.morse_message) self.end_of_message = False self.text_message = self.translator.morse_to_text(self.morse_message) self.morse_message = "" def get_text(self): return self.text_message def transmit_symbol(self, symbol): if symbol == ".": sleep_time = self.dot_time.dist elif symbol == "-": sleep_time = self.dash_time.dist elif symbol == " ": sleep_time = self.signalspace_time.dist elif symbol == "/": sleep_time = self.wordspace_time.dist elif symbol == "EOM": sleep_time = self.eom_time.dist elif symbol is None: # To terminate the transmission just call and hangup, with no extra # distance sleep_time = 0 logging.info("Dial and wait %.2f = %.2f + %.2f seconds " "(transmitting '%s')", self.call_setup_time_max + sleep_time, self.call_setup_time_max, sleep_time, symbol) # Dial, then wait self.call_setup_time_max to make sure the receiver # gets at least one RING, and then hangup and sleep the time needed to # transmit a symbol. self.modem.send_command("ATDT" + self.destination_number + ";") time.sleep(self.call_setup_time_max) self.modem.send_command("ATH") self.modem.get_response() time.sleep(sleep_time) def transmit_signal(self, signal): logging.debug("Transmitting signal: %s", signal) for symbol in signal: self.transmit_symbol(symbol) def transmit(self, message, destination_number): self.destination_number = destination_number morse_message = self.translator.text_to_morse(message) signals = morse_message.split() logging.debug("Starting the transmission") for i, signal in enumerate(signals): logging.debug("Transmitting '%s' as '%s'", message[i], signal) self.transmit_signal(signal) # Transmit a signal separator only when strictly necessary: # - after the last symbol, we are going to transmit an EOM # anyway, and that will mark the end of the last symbol. # - between words the word separator act as a symbol separator # too. if i != len(signals) - 1 and signals[i + 1] != "/" and signal != "/": self.transmit_symbol(" ") self.transmit_symbol("EOM") # Since the Morse signals are encoded in the distance between calls, an # extra call is needed in order for receiver actually get the EOM and # see that the transmission has terminated. self.transmit_symbol(None) def estimate_transmit_duration(self, message): morsemessage = self.translator.text_to_morse(message) signals = morsemessage.split() logging.debug(signals) transmitting_time = 0 for i, signal in enumerate(signals): logging.debug("signal: %s", signal) for symbol in signal: transmitting_time += self.call_setup_time_max if symbol == ".": transmitting_time += self.dot_time.dist elif symbol == "-": transmitting_time += self.dash_time.dist elif symbol == "/": transmitting_time += self.wordspace_time.dist if i != len(signals) - 1 and signals[i + 1] != "/" and symbol != "/": transmitting_time += self.call_setup_time_max transmitting_time += self.signalspace_time.dist transmitting_time += self.call_setup_time_max transmitting_time += self.eom_time.dist # The final call needed for the receiver to get the EOM transmitting_time += self.call_setup_time_max logging.debug("Estimated transmitting time: %.2f seconds", transmitting_time) def test_send_receive(): logging.basicConfig(level=logging.DEBUG) call_setup_time_min = 0 call_setup_time_max = 0.01 ring_time_min = 0 ring_time_max = 0 import random class DummyModem(object): """Always receive a '.', a '/' and then EOM, which results in 'E '.""" def __init__(self): self.ring_count = 0 # Take trasmission times from a transceiver self.transceiver = None random.seed(None) def send_command(self, command): pass def get_response(self, response): # pylint: disable=unused-argument setup_time = random.uniform(self.transceiver.call_setup_time_min, self.transceiver.call_setup_time_max) if self.ring_count == 0: # dummy ring pass elif self.ring_count == 1: # received a '.' time.sleep(setup_time + self.transceiver.dot_time.dist) elif self.ring_count == 2: # received a '/' time.sleep(setup_time + self.transceiver.wordspace_time.dist) else: # received an 'EOM' time.sleep(setup_time + self.transceiver.eom_time.dist) self.ring_count += 1 self.ring_count %= 4 modem = DummyModem() xcv = CallDistanceTransceiver(modem, call_setup_time_min, call_setup_time_max, ring_time_min, ring_time_max, True) modem.transceiver = xcv while True: xcv.receive_loop() modem.ring_count = 0 print() print("Message received!") print("\"%s\"" % xcv.get_text(), flush=True) if __name__ == "__main__": test_send_receive()