import time
class SymbolTime(object):
    """Timing windows for one symbol encoded as a distance between pulses.

    In theory the symbol distance (the distance used to discriminate
    symbols) can be arbitrary, and it is only bounded below by the stability
    of the pulse time.  In practice it can be necessary to wait a predefined
    minimum amount of time between pulses because of technological limits of
    the transmitting devices; we call this time the "minimum inter-symbol
    distance".

    Attributes:
        dist: the time the transmitter has to wait to disambiguate between
            this symbol and a different one.
        min: the minimum time which represents the symbol at the receiving
            end.
        max: the maximum time which represents the symbol at the receiving
            end.
    """

    # pylint: disable=too-few-public-methods
    def __init__(self, pulse_min, pulse_max, multiplier,
                 min_inter_symbol_distance=0.0):
        """Compute the transmit distance and the receive window of a symbol.

        Args:
            pulse_min: the minimum duration of a pulse.
            pulse_max: the maximum duration of a pulse.
            multiplier: non-negative index of the symbol; each increment
                shifts the windows by one symbol distance.
            min_inter_symbol_distance: fixed extra time added to every
                symbol; it is also used as the symbol distance when
                pulse_min == pulse_max.

        Raises:
            ValueError: if multiplier is negative, if pulse_max is smaller
                than pulse_min, or if pulse_min == pulse_max and no non-zero
                inter-symbol distance is given.
        """
        # Explicit checks instead of "assert": assertions are stripped when
        # Python runs with -O, which would let invalid values through.
        if multiplier < 0:
            raise ValueError("multiplier MUST be non-negative")
        if pulse_max < pulse_min:
            raise ValueError("pulse_max MUST NOT be smaller than pulse_min")
        if (pulse_min == pulse_max) and (min_inter_symbol_distance == 0):
            # A single string: adjacent literals are concatenated, so the
            # exception carries one readable message and not a tuple.
            raise ValueError("If (pulse_min == pulse_max) a non-zero "
                             "inter-symbol distance MUST be specified")

        # Twice the pulse jitter; with no jitter at all, fall back to the
        # minimum inter-symbol distance so that symbols stay distinguishable.
        symbol_distance = 2 * (pulse_max - pulse_min)
        if symbol_distance == 0:
            symbol_distance = min_inter_symbol_distance

        # The time the transmitter has to wait to disambiguate between this
        # symbol and a different one.
        self.dist = min_inter_symbol_distance + symbol_distance * multiplier

        # The minimum time which represents the symbol at the receiving end.
        self.min = min_inter_symbol_distance + pulse_min + \
            symbol_distance * multiplier

        # The maximum time which represents the symbol at the receiving end.
        self.max = min_inter_symbol_distance + pulse_min + \
            symbol_distance * (multiplier + 1)
+
class CallDistanceTransceiver(object):
"""Transmit Morse messages using the distance between calls.
fine.
Plus, supporting communications between analog modems is cool :)
-
"""
def __init__(self, modem,
- call_setup_average=8.5, call_setup_uncertainty=1.5,
- ring_distance_average=5, ring_uncertainty=0.2):
+ call_setup_time_min=7, call_setup_time_max=15,
+ ring_time_min=4.8, ring_time_max=5.2,
+ add_inter_call_distance=True):
"""Encode the Morse symbols using the distance between calls.
Args:
- call_setup_average: the time between when the transmitter dials
- the number and the receiver reports RING notifications. This is
- needed in order to be sure than the receiver has received
- _at_least_ one ring. The default chosen here should work fine
- even with old analog modems, provided that there is some more
- distance between two calls.
- call_setup_uncertainty: uncertainty of call_setup_average,
- a tolerance value
- ring_distance_average: the time between two consecutive RINGs
+
+ call_setup_time_min: the minimum time between when the transmitter
+ dials the number and the receiver reports RING notifications.
+ call_setup_time_max: the maximum time between when the transmitter
+ dials the number and the receiver reports RING notifications.
+ Waiting this time after dialing ensures that the receiver has
+ received _at_least_ one ring. The defaults chosen here have been
+ tested to work fine with old analog modems, provided that there
+ is some more distance between two calls, and with Android
+ receivers.
+ ring_time_min: the minimum time between two consecutive RINGs
+ in the same call.
+ ring_time_max: the maximum time between two consecutive RINGs
in the same call, the standard interval is about 5 seconds, but
- some tolerance is needed to account for line delays.
- This is needed in order to ignore rings in the same call, and
- differentiate two different calls.
- ring_uncertainty: uncertainty of ring_distance_average, a tolerance
- value
- """
+ line and/or software delays can make it vary. This is needed
+ in order to ignore the additional RINGs notified within the
+ same call, and thus be able to discriminate between two
+ different calls.
+ add_inter_call_distance: specify if it is needed to wait an extra
+ fixed time between calls.
+ """
self.modem = modem
self.translator = MorseTranslator()
self.destination_number = ""
- self.call_setup_time = call_setup_average + call_setup_uncertainty
- self.rings_distance = ring_distance_average + ring_uncertainty
-
- # In theory the symbol distance, the distance between calls which
- # represent symbols, can be arbitrary, but in practice it's better to
- # wait at least the duration of a ring between terminating one call and
- # initiating the next call, as pick-up and hang-up can take some time
- # with old analog modems.
- symbol_distance = self.rings_distance
-
- def symbol_time(multiplier):
- return self.call_setup_time + symbol_distance * multiplier
-
- self.dot_time = symbol_time(1)
- self.dash_time = symbol_time(2)
- self.signalspace_time = symbol_time(3)
- self.wordspace_time = symbol_time(4)
- self.eom_time = symbol_time(5)
-
- self.ring_uncertainty = ring_uncertainty
- self.symbol_uncertainty = symbol_distance / 2.
-
- logging.debug("dot time: transmit: %.2f receive: (%.2f, %.2f)",
- self.dot_time,
- (self.dot_time - self.symbol_uncertainty),
- (self.dot_time + self.symbol_uncertainty))
- logging.debug("dash time: transmit: %.2f receive: (%.2f, %.2f)",
- self.dash_time,
- (self.dash_time - self.symbol_uncertainty),
- (self.dash_time + self.symbol_uncertainty))
- logging.debug("signalspace time: transmit: %.2f receive: (%.2f, %.2f)",
- self.signalspace_time,
- (self.signalspace_time - self.symbol_uncertainty),
- (self.signalspace_time + self.symbol_uncertainty))
- logging.debug("wordspace time: transmit: %.2f receive: (%.2f, %.2f)",
- self.wordspace_time,
- (self.wordspace_time - self.symbol_uncertainty),
- (self.wordspace_time + self.symbol_uncertainty))
- logging.debug("EOM time: transmit: %.2f receive: (%.2f, +inf)",
- self.eom_time,
- (self.eom_time - self.symbol_uncertainty))
+ self.call_setup_time_min = call_setup_time_min
+ self.call_setup_time_max = call_setup_time_max
+
+ self.ring_time_min = ring_time_min
+ self.ring_time_max = ring_time_max
+
+ if add_inter_call_distance:
+ # Analog modems don't like to dial a new call immediately after
+ # they hung up the previous one; an extra delay of about one
+ # ring time makes them happy.
+ inter_symbol_distance = ring_time_max
+ else:
+ inter_symbol_distance = 0
+
+ self.dot_time = SymbolTime(call_setup_time_min,
+ call_setup_time_max, 0,
+ inter_symbol_distance)
+ self.dash_time = SymbolTime(call_setup_time_min,
+ call_setup_time_max, 1,
+ inter_symbol_distance)
+ self.signalspace_time = SymbolTime(call_setup_time_min,
+ call_setup_time_max, 2,
+ inter_symbol_distance)
+ self.wordspace_time = SymbolTime(call_setup_time_min,
+ call_setup_time_max, 3,
+ inter_symbol_distance)
+ self.eom_time = SymbolTime(call_setup_time_min,
+ call_setup_time_max, 4,
+ inter_symbol_distance)
+
+ logging.debug("call setup time between %.2f and %.2f "
+ "--------- dot transmit time: %.2f + %.2f "
+ "receive time: between %.2f and %.2f",
+ self.call_setup_time_min,
+ self.call_setup_time_max,
+ self.call_setup_time_max,
+ self.dot_time.dist,
+ self.dot_time.min,
+ self.dot_time.max)
+ logging.debug("call setup time between %.2f and %.2f "
+ "-------- dash transmit time: %.2f + %.2f "
+ "receive time: between %.2f and %.2f",
+ self.call_setup_time_min,
+ self.call_setup_time_max,
+ self.call_setup_time_max,
+ self.dash_time.dist,
+ self.dash_time.min,
+ self.dash_time.max)
+ logging.debug("call setup time between %.2f and %.2f "
+ "- signalspace transmit time: %.2f + %.2f "
+ "receive time: between %.2f and %.2f",
+ self.call_setup_time_min,
+ self.call_setup_time_max,
+ self.call_setup_time_max,
+ self.signalspace_time.dist,
+ self.signalspace_time.min,
+ self.signalspace_time.max)
+ logging.debug("call setup time between %.2f and %.2f "
+ "--- wordspace transmit time: %.2f + %.2f "
+ "receive time: between %.2f and %.2f",
+ self.call_setup_time_min,
+ self.call_setup_time_max,
+ self.call_setup_time_max,
+ self.wordspace_time.dist,
+ self.wordspace_time.min,
+ self.wordspace_time.max)
+ logging.debug("call setup time between %.2f and %.2f "
+ "--------- EOM transmit time: %.2f + %.2f "
+ "receive time: between %.2f and inf",
+ self.call_setup_time_min,
+ self.call_setup_time_max,
+ self.call_setup_time_max,
+ self.eom_time.dist,
+ self.eom_time.min)
self.previous_ring_time = -1
self.previous_call_time = -1
self.end_of_message = False
def log_symbol(self, distance, symbol, extra_info=""):
- logging.info("distance: %.2f Received \"%s\" %s", distance, symbol,
+ logging.info("distance: %.2f Received \"%s\"%s", distance, symbol,
extra_info)
def receive_character(self):
self.previous_ring_time = current_ring_time
# Ignore multiple rings in the same call
- if abs(ring_distance - self.rings_distance) < self.ring_uncertainty:
+ if ring_distance > self.ring_time_min and \
+ ring_distance <= self.ring_time_max:
logging.debug("multiple rings in the same call, distance: %.2f",
ring_distance)
return
call_distance = current_ring_time - self.previous_call_time
self.previous_call_time = current_ring_time
- if abs(call_distance - self.dot_time) < self.symbol_uncertainty:
+ if call_distance > self.dot_time.min and \
+ call_distance <= self.dot_time.max:
self.log_symbol(call_distance, '.')
self.morse_message += "."
return
- if abs(call_distance - self.dash_time) < self.symbol_uncertainty:
+ if call_distance > self.dash_time.min and \
+ call_distance <= self.dash_time.max:
self.log_symbol(call_distance, '-')
self.morse_message += "-"
return
- if abs(call_distance - self.signalspace_time) < self.symbol_uncertainty:
+ if call_distance > self.signalspace_time.min and \
+ call_distance <= self.signalspace_time.max:
signal = self.morse_message.strip().split(' ')[-1]
character = self.translator.signal_to_character(signal)
- self.log_symbol(call_distance, ' ', "got \"%s\"" % character)
+ self.log_symbol(call_distance, ' ', " got \"%s\"" % character)
self.morse_message += " "
return
- if abs(call_distance - self.wordspace_time) < self.symbol_uncertainty:
+ if call_distance > self.wordspace_time.min and \
+ call_distance <= self.wordspace_time.max:
signal = self.morse_message.strip().split(' ')[-1]
character = self.translator.signal_to_character(signal)
- self.log_symbol(call_distance, '/', "got \"%s\"" % character)
+ self.log_symbol(call_distance, '/', " got \"%s\"" % character)
self.morse_message += " / "
return
- if call_distance >= self.eom_time - self.symbol_uncertainty:
+ if call_distance > self.eom_time.min:
signal = self.morse_message.strip().split(' ')[-1]
character = self.translator.signal_to_character(signal)
- self.log_symbol(call_distance, 'EOM', "got \"%s\"" % character)
+ self.log_symbol(call_distance, 'EOM', " got \"%s\"" % character)
self.end_of_message = True
self.previous_ring_time = -1
self.previous_call_time = -1
return
# if the code made it up to here, something fishy is going on
- logging.error("Unexpected distance: %.2f", ring_distance)
+ logging.error("Unexpected distance: %.2f", call_distance)
logging.error("Check the transmitter and receiver parameters")
def receive_loop(self):
def transmit_symbol(self, symbol):
if symbol == ".":
- sleep_time = self.dot_time
+ sleep_time = self.dot_time.dist
elif symbol == "-":
- sleep_time = self.dash_time
+ sleep_time = self.dash_time.dist
elif symbol == " ":
- sleep_time = self.signalspace_time
+ sleep_time = self.signalspace_time.dist
elif symbol == "/":
- sleep_time = self.wordspace_time
+ sleep_time = self.wordspace_time.dist
elif symbol == "EOM":
- sleep_time = self.eom_time
+ sleep_time = self.eom_time.dist
elif symbol is None:
# To terminate the transmission just call and hangup, with no extra
# distance
- sleep_time = self.call_setup_time
-
- logging.info("Dial and wait %.2f seconds (transmitting '%s')",
- sleep_time, symbol)
-
- # Dial, then wait self.call_setup_time to make sure the receiver gets
- # at least one RING, and then hangup and sleep the remaining time
+ sleep_time = 0
+
+ logging.info("Dial and wait %.2f = %.2f + %.2f seconds "
+ "(transmitting '%s')",
+ self.call_setup_time_max + sleep_time,
+ self.call_setup_time_max,
+ sleep_time,
+ symbol)
+
+ # Dial, then wait self.call_setup_time_max to make sure the receiver
+ # gets at least one RING, and then hangup and sleep the time needed to
+ # transmit a symbol.
self.modem.send_command("ATDT" + self.destination_number + ";")
- time.sleep(self.call_setup_time)
+ time.sleep(self.call_setup_time_max)
self.modem.send_command("ATH")
self.modem.get_response()
- time.sleep(sleep_time - self.call_setup_time)
+ time.sleep(sleep_time)
def transmit_signal(self, signal):
logging.debug("Transmitting signal: %s", signal)
morse_message = self.translator.text_to_morse(message)
signals = morse_message.split()
- print(signals)
logging.debug("Starting the transmission")
for i, signal in enumerate(signals):
logging.debug("signal: %s", signal)
for symbol in signal:
+ transmitting_time += self.call_setup_time_max
if symbol == ".":
- transmitting_time += self.dot_time
+ transmitting_time += self.dot_time.dist
elif symbol == "-":
- transmitting_time += self.dash_time
+ transmitting_time += self.dash_time.dist
elif symbol == "/":
- transmitting_time += self.wordspace_time
+ transmitting_time += self.wordspace_time.dist
if i != len(signals) - 1 and signals[i + 1] != "/" and symbol != "/":
- transmitting_time += self.signalspace_time
+ transmitting_time += self.call_setup_time_max
+ transmitting_time += self.signalspace_time.dist
+
+ transmitting_time += self.call_setup_time_max
+ transmitting_time += self.eom_time.dist
- transmitting_time += self.eom_time
+ # The final call needed for the receiver to get the EOM
+ transmitting_time += self.call_setup_time_max
- logging.debug("Estimated transmitting time: %d seconds",
+ logging.debug("Estimated transmitting time: %.2f seconds",
transmitting_time)
def test_send_receive():
logging.basicConfig(level=logging.DEBUG)
- call_setup_time = 2
- call_setup_uncertainty = 0.4
- ring_time = 1
- ring_uncertainty = 0.3
+ call_setup_time_min = 0
+ call_setup_time_max = 0.01
+ ring_time_min = 0
+ ring_time_max = 0
+
+ import random
class DummyModem(object):
- """Always receive a '.' and then a '/', which result in 'E '."""
+ """Always receive a '.', a '/' and then EOM, which results in 'E '."""
def __init__(self):
self.ring_count = 0
+ # Take transmission times from a transceiver
+ self.transceiver = None
+
+ random.seed(None)
+
def send_command(self, command):
pass
def get_response(self, response):
- if self.ring_count % 2:
+ # pylint: disable=unused-argument
+
+ setup_time = random.uniform(self.transceiver.call_setup_time_min,
+ self.transceiver.call_setup_time_max)
+
+ if self.ring_count == 0:
+ # dummy ring
+ pass
+ elif self.ring_count == 1:
# received a '.'
- time.sleep(call_setup_time + (ring_time + ring_uncertainty))
- else:
+ time.sleep(setup_time + self.transceiver.dot_time.dist)
+ elif self.ring_count == 2:
# received a '/'
- time.sleep(call_setup_time + (ring_time + ring_uncertainty) * 4)
+ time.sleep(setup_time + self.transceiver.wordspace_time.dist)
+ else:
+ # received an 'EOM'
+ time.sleep(setup_time + self.transceiver.eom_time.dist)
self.ring_count += 1
+ self.ring_count %= 4
+
+ modem = DummyModem()
+
+ xcv = CallDistanceTransceiver(modem,
+ call_setup_time_min, call_setup_time_max,
+ ring_time_min, ring_time_max, True)
+
+ modem.transceiver = xcv
- xcv = CallDistanceTransceiver(DummyModem(), call_setup_time,
- call_setup_uncertainty, ring_time,
- ring_uncertainty)
- xcv.receive_loop()
+ while True:
+ xcv.receive_loop()
+ modem.ring_count = 0
+ print()
+ print("Message received!")
+ print("\"%s\"" % xcv.get_text(), flush=True)
if __name__ == "__main__":