class SymbolTime(object):
"""
In theory the symbol distance (the distance to discriminate symbols) can
- be arbitrary, and it's only bounded below by the stability of the pulse
+ be arbitrary, and it's only bounded below by the stability of the period
time, but in practice it can be necessary to wait a predefined minimum
- amount of time between pulses because of technological limits of the
+ amount of time between periods because of technological limits of the
transmitting devices, we call this time the "minimum inter-symbol
distance".
"""
# pylint: disable=too-few-public-methods
- def __init__(self, pulse_min, pulse_max, multiplier,
+ def __init__(self, period_min, period_max, multiplier,
min_inter_symbol_distance=0.0):
assert multiplier >= 0
- if (pulse_min == pulse_max) and (min_inter_symbol_distance == 0):
- raise ValueError("If (pulse_min == pulse_max) a non-zero",
+ if (period_min == period_max) and (min_inter_symbol_distance == 0):
+ raise ValueError("If (period_min == period_max) a non-zero",
"inter-symbol distance MUST be specified")
- symbol_distance = 2 * (pulse_max - pulse_min)
+ symbol_distance = 2 * (period_max - period_min)
if symbol_distance == 0:
symbol_distance = min_inter_symbol_distance
self.dist = min_inter_symbol_distance + symbol_distance * multiplier
# The minimum time which represents the symbol at the receiving end.
- self.min = min_inter_symbol_distance + pulse_min + \
+ self.min = min_inter_symbol_distance + period_min + \
symbol_distance * multiplier
# The maximum time which represents the symbol at the receiving end
- self.max = min_inter_symbol_distance + pulse_min + \
+ self.max = min_inter_symbol_distance + period_min + \
symbol_distance * (multiplier + 1)
+class MorseDistanceModulator(object):
+ def __init__(self, period_min, period_max, pulse_min, pulse_max,
+ inter_symbol_distance):
+ self.set_parameters(period_min, period_max,
+ pulse_min, pulse_max,
+ inter_symbol_distance)
+
+ def set_parameters(self, period_min, period_max, pulse_min, pulse_max,
+ inter_symbol_distance):
+ self.period_min = period_min
+ self.period_max = period_max
+ self.pulse_min = pulse_min
+ self.pulse_max = pulse_max
+ self.inter_symbol_distance = inter_symbol_distance
+
+ self.dot_time = SymbolTime(period_min,
+ period_max, 0,
+ inter_symbol_distance)
+ self.dash_time = SymbolTime(period_min,
+ period_max, 1,
+ inter_symbol_distance)
+ self.signalspace_time = SymbolTime(period_min,
+ period_max, 2,
+ inter_symbol_distance)
+ self.wordspace_time = SymbolTime(period_min,
+ period_max, 3,
+ inter_symbol_distance)
+ self.eom_time = SymbolTime(period_min,
+ period_max, 4,
+ inter_symbol_distance)
+
+ def symbol_to_distance(self, symbol):
+ if symbol == ".":
+ return self.dot_time.dist
+ elif symbol == "-":
+ return self.dash_time.dist
+ elif symbol == " ":
+ return self.signalspace_time.dist
+ elif symbol == "/" or symbol == " / ":
+ return self.wordspace_time.dist
+ elif symbol == "EOM":
+ return self.eom_time.dist
+
+ raise ValueError("Unexpected symbol %s" % symbol)
+
+ def is_same_period(self, distance):
+ return distance > self.pulse_min and distance <= self.pulse_max
+
+ def distance_to_symbol(self, distance):
+ if distance > self.dot_time.min and \
+ distance <= self.dot_time.max:
+ return "."
+
+ if distance > self.dash_time.min and \
+ distance <= self.dash_time.max:
+ return "-"
+
+ if distance > self.signalspace_time.min and \
+ distance <= self.signalspace_time.max:
+ return " "
+
+ if distance > self.wordspace_time.min and \
+ distance <= self.wordspace_time.max:
+ return "/"
+
+ if distance > self.eom_time.min:
+ return "EOM"
+
+ raise ValueError("Unexpected distance %.2f" % distance)
+
+ def modulate(self, morse):
+ signals = morse.split(' ')
+ distances = []
+ for i, signal in enumerate(signals):
+ for symbol in signal:
+ distances.append(self.symbol_to_distance(symbol))
+
+ # Transmit a signal separator only when strictly necessary.
+ #
+ # Avoid it in these cases:
+ # - after the last symbol, because EOM is going to ne transmitted
+ # anyway and that will mark the end of the last symbol.
+ # - between words, because the word separator act as a symbol
+ # separator too.
+ if i != len(signals) - 1 and signals[i + 1] != "/" and signal != "/":
+ distances.append(self.symbol_to_distance(" "))
+
+ distances.append(self.symbol_to_distance("EOM"))
+
+ # Since the Morse signals are encoded in the distance between calls, an
+ # extra call is needed in order for receiver actually get the EOM and
+ # see that the transmission has terminated.
+ distances.append(0)
+
+ return distances
+
+
class CallDistanceTransceiver(object):
"""Transmit Morse messages using the distance between calls.
self.destination_number = ""
- self.call_setup_time_min = call_setup_time_min
self.call_setup_time_max = call_setup_time_max
- self.ring_time_min = ring_time_min
- self.ring_time_max = ring_time_max
-
if add_inter_call_distance:
# Analog modems don't like to dial a new call immediately after
# they hung up the previous one; an extra delay of about one
else:
inter_symbol_distance = 0
- self.dot_time = SymbolTime(call_setup_time_min,
- call_setup_time_max, 0,
- inter_symbol_distance)
- self.dash_time = SymbolTime(call_setup_time_min,
- call_setup_time_max, 1,
- inter_symbol_distance)
- self.signalspace_time = SymbolTime(call_setup_time_min,
- call_setup_time_max, 2,
- inter_symbol_distance)
- self.wordspace_time = SymbolTime(call_setup_time_min,
- call_setup_time_max, 3,
- inter_symbol_distance)
- self.eom_time = SymbolTime(call_setup_time_min,
- call_setup_time_max, 4,
- inter_symbol_distance)
+ self.modulator = MorseDistanceModulator(call_setup_time_min,
+ call_setup_time_max,
+ ring_time_min,
+ ring_time_max,
+ inter_symbol_distance)
logging.debug("call setup time between %.2f and %.2f "
"--------- dot transmit time: %.2f + %.2f "
"receive time: between %.2f and %.2f",
- self.call_setup_time_min,
- self.call_setup_time_max,
- self.call_setup_time_max,
- self.dot_time.dist,
- self.dot_time.min,
- self.dot_time.max)
+ self.modulator.period_min,
+ self.modulator.period_max,
+ self.modulator.period_max,
+ self.modulator.dot_time.dist,
+ self.modulator.dot_time.min,
+ self.modulator.dot_time.max)
logging.debug("call setup time between %.2f and %.2f "
"-------- dash transmit time: %.2f + %.2f "
"receive time: between %.2f and %.2f",
- self.call_setup_time_min,
- self.call_setup_time_max,
- self.call_setup_time_max,
- self.dash_time.dist,
- self.dash_time.min,
- self.dash_time.max)
+ self.modulator.period_min,
+ self.modulator.period_max,
+ self.modulator.period_max,
+ self.modulator.dash_time.dist,
+ self.modulator.dash_time.min,
+ self.modulator.dash_time.max)
logging.debug("call setup time between %.2f and %.2f "
"- signalspace transmit time: %.2f + %.2f "
"receive time: between %.2f and %.2f",
- self.call_setup_time_min,
- self.call_setup_time_max,
- self.call_setup_time_max,
- self.signalspace_time.dist,
- self.signalspace_time.min,
- self.signalspace_time.max)
+ self.modulator.period_min,
+ self.modulator.period_max,
+ self.modulator.period_max,
+ self.modulator.signalspace_time.dist,
+ self.modulator.signalspace_time.min,
+ self.modulator.signalspace_time.max)
logging.debug("call setup time between %.2f and %.2f "
"--- wordspace transmit time: %.2f + %.2f "
"receive time: between %.2f and %.2f",
- self.call_setup_time_min,
- self.call_setup_time_max,
- self.call_setup_time_max,
- self.wordspace_time.dist,
- self.wordspace_time.min,
- self.wordspace_time.max)
+ self.modulator.period_min,
+ self.modulator.period_max,
+ self.modulator.period_max,
+ self.modulator.wordspace_time.dist,
+ self.modulator.wordspace_time.min,
+ self.modulator.wordspace_time.max)
logging.debug("call setup time between %.2f and %.2f "
"--------- EOM transmit time: %.2f + %.2f "
"receive time: between %.2f and inf",
- self.call_setup_time_min,
- self.call_setup_time_max,
- self.call_setup_time_max,
- self.eom_time.dist,
- self.eom_time.min)
+ self.modulator.period_min,
+ self.modulator.period_max,
+ self.modulator.period_max,
+ self.modulator.eom_time.dist,
+ self.modulator.eom_time.min)
self.previous_ring_time = -1
self.previous_call_time = -1
self.end_of_message = False
def log_symbol(self, distance, symbol, extra_info=""):
- logging.info("distance: %.2f Received \"%s\" %s", distance, symbol,
+ logging.info("distance: %.2f Received \"%s\"%s", distance, symbol,
extra_info)
def receive_character(self):
self.previous_ring_time = current_ring_time
# Ignore multiple rings in the same call
- if ring_distance > self.ring_time_min and \
- ring_distance <= self.ring_time_max:
+ if self.modulator.is_same_period(ring_distance):
logging.debug("multiple rings in the same call, distance: %.2f",
ring_distance)
return
call_distance = current_ring_time - self.previous_call_time
self.previous_call_time = current_ring_time
- if call_distance > self.dot_time.min and \
- call_distance <= self.dot_time.max:
- self.log_symbol(call_distance, '.')
- self.morse_message += "."
- return
-
- if call_distance > self.dash_time.min and \
- call_distance <= self.dash_time.max:
- self.log_symbol(call_distance, '-')
- self.morse_message += "-"
+ try:
+ symbol = self.modulator.distance_to_symbol(call_distance)
+ except ValueError as err:
+ logging.error("%s", err)
+ logging.error("Check the transmitter and receiver parameters")
return
- if call_distance > self.signalspace_time.min and \
- call_distance <= self.signalspace_time.max:
+ extra_info = ""
+ if symbol in [" ", "/", "EOM"]:
signal = self.morse_message.strip().split(' ')[-1]
character = self.translator.signal_to_character(signal)
- self.log_symbol(call_distance, ' ', "got \"%s\"" % character)
- self.morse_message += " "
- return
+ extra_info = " got \"%s\"" % character
- if call_distance > self.wordspace_time.min and \
- call_distance <= self.wordspace_time.max:
- signal = self.morse_message.strip().split(' ')[-1]
- character = self.translator.signal_to_character(signal)
- self.log_symbol(call_distance, '/', "got \"%s\"" % character)
- self.morse_message += " / "
- return
+ self.log_symbol(call_distance, symbol, extra_info)
- if call_distance > self.eom_time.min:
- signal = self.morse_message.strip().split(' ')[-1]
- character = self.translator.signal_to_character(signal)
- self.log_symbol(call_distance, 'EOM', "got \"%s\"" % character)
+ if symbol != "EOM":
+ # Add spaces around the wordspace symbol to make it easier to split
+ # the Morse message in symbols later on
+ if symbol == "/":
+ symbol = " / "
+ self.morse_message += symbol
+ else:
self.end_of_message = True
self.previous_ring_time = -1
self.previous_call_time = -1
- return
-
- # if the code made it up to here, something fishy is going on
- logging.error("Unexpected distance: %.2f", call_distance)
- logging.error("Check the transmitter and receiver parameters")
def receive_loop(self):
while not self.end_of_message:
def get_text(self):
return self.text_message
- def transmit_symbol(self, symbol):
- if symbol == ".":
- sleep_time = self.dot_time.dist
- elif symbol == "-":
- sleep_time = self.dash_time.dist
- elif symbol == " ":
- sleep_time = self.signalspace_time.dist
- elif symbol == "/":
- sleep_time = self.wordspace_time.dist
- elif symbol == "EOM":
- sleep_time = self.eom_time.dist
- elif symbol is None:
- # To terminate the transmission just call and hangup, with no extra
- # distance
- sleep_time = 0
-
+ def transmit_symbol(self, symbol, sleep_time):
logging.info("Dial and wait %.2f = %.2f + %.2f seconds "
"(transmitting '%s')",
self.call_setup_time_max + sleep_time,
self.modem.get_response()
time.sleep(sleep_time)
- def transmit_signal(self, signal):
- logging.debug("Transmitting signal: %s", signal)
- for symbol in signal:
- self.transmit_symbol(symbol)
-
def transmit(self, message, destination_number):
self.destination_number = destination_number
morse_message = self.translator.text_to_morse(message)
- signals = morse_message.split()
+ distances = self.modulator.modulate(morse_message)
logging.debug("Starting the transmission")
- for i, signal in enumerate(signals):
- logging.debug("Transmitting '%s' as '%s'", message[i], signal)
- self.transmit_signal(signal)
-
- # Transmit a signal separator only when strictly necessary:
- # - after the last symbol, we are going to transmit an EOM
- # anyway, and that will mark the end of the last symbol.
- # - between words the word separator act as a symbol separator
- # too.
- if i != len(signals) - 1 and signals[i + 1] != "/" and signal != "/":
- self.transmit_symbol(" ")
+ for i, distance in enumerate(distances):
+ # Use 'None' for the last call
+ if i == len(distances) - 1:
+ symbol = None
+ else:
+ total_sleep_time = self.call_setup_time_max + distance
+ symbol = self.modulator.distance_to_symbol(total_sleep_time)
- self.transmit_symbol("EOM")
-
- # Since the Morse signals are encoded in the distance between calls, an
- # extra call is needed in order for receiver actually get the EOM and
- # see that the transmission has terminated.
- self.transmit_symbol(None)
+ self.transmit_symbol(symbol, distance)
def estimate_transmit_duration(self, message):
morsemessage = self.translator.text_to_morse(message)
- signals = morsemessage.split()
+ logging.debug(morsemessage)
- logging.debug(signals)
+ distances = self.modulator.modulate(morsemessage)
transmitting_time = 0
- for i, signal in enumerate(signals):
- logging.debug("signal: %s", signal)
-
- for symbol in signal:
- transmitting_time += self.call_setup_time_max
- if symbol == ".":
- transmitting_time += self.dot_time.dist
- elif symbol == "-":
- transmitting_time += self.dash_time.dist
- elif symbol == "/":
- transmitting_time += self.wordspace_time.dist
-
- if i != len(signals) - 1 and signals[i + 1] != "/" and symbol != "/":
- transmitting_time += self.call_setup_time_max
- transmitting_time += self.signalspace_time.dist
-
- transmitting_time += self.call_setup_time_max
- transmitting_time += self.eom_time.dist
-
- # The final call needed for the receiver to get the EOM
- transmitting_time += self.call_setup_time_max
+ for distance in distances:
+ transmitting_time += self.call_setup_time_max
+ transmitting_time += distance
logging.debug("Estimated transmitting time: %.2f seconds",
transmitting_time)
def test_send_receive():
logging.basicConfig(level=logging.DEBUG)
- call_setup_time = 2
- call_setup_uncertainty = 0.4
- ring_time = 1
- ring_uncertainty = 0.3
+ call_setup_time_min = 0
+ call_setup_time_max = 0.01
+ ring_time_min = 0
+ ring_time_max = 0
+
+ import random
class DummyModem(object):
- """Always receive a '.' and then a '/', which result in 'E '."""
+ """Always receive a '.', a '/' and then EOM, which results in 'E '."""
def __init__(self):
self.ring_count = 0
+ # Take trasmission times from a transceiver
+ self.transceiver = None
+
+ random.seed(None)
+
def send_command(self, command):
pass
- def get_response(self, response):
- if self.ring_count % 2:
+ def get_response(self, response=None):
+ # pylint: disable=unused-argument
+
+ setup_time = random.uniform(self.transceiver.modulator.period_min,
+ self.transceiver.modulator.period_max)
+
+ if self.ring_count == 0:
+ # dummy ring
+ pass
+ elif self.ring_count == 1:
# received a '.'
- time.sleep(call_setup_time + (ring_time + ring_uncertainty))
- else:
+ dot_time = self.transceiver.modulator.dot_time.dist
+ time.sleep(setup_time + dot_time)
+ elif self.ring_count == 2:
# received a '/'
- time.sleep(call_setup_time + (ring_time + ring_uncertainty) * 4)
+ wordspace_time = self.transceiver.modulator.wordspace_time.dist
+ time.sleep(setup_time + wordspace_time)
+ else:
+ # received an 'EOM'
+ eom_time = self.transceiver.modulator.eom_time.dist
+ time.sleep(setup_time + eom_time)
self.ring_count += 1
+ self.ring_count %= 4
+
+ modem = DummyModem()
+
+ xcv = CallDistanceTransceiver(modem,
+ call_setup_time_min, call_setup_time_max,
+ ring_time_min, ring_time_max, True)
+
+ modem.transceiver = xcv
+
+ xcv.transmit("CODEX PARIS", "0")
- xcv = CallDistanceTransceiver(DummyModem(), call_setup_time,
- call_setup_uncertainty, ring_time,
- ring_uncertainty)
- xcv.receive_loop()
+ while True:
+ xcv.receive_loop()
+ modem.ring_count = 0
+ print()
+ print("Message received!")
+ print("\"%s\"" % xcv.get_text(), flush=True)
if __name__ == "__main__":