X-Git-Url: https://git.ao2.it/SaveMySugar/python3-savemysugar.git/blobdiff_plain/fb6bc3a9f2e122a7c45c7188f1e0df13a810e420..a7908afd8b3ddc42aa74ab6c20ba8d8a20ea4b9a:/src/savemysugar/CallDistanceTransceiver.py diff --git a/src/savemysugar/CallDistanceTransceiver.py b/src/savemysugar/CallDistanceTransceiver.py index c251673..bfae0b6 100755 --- a/src/savemysugar/CallDistanceTransceiver.py +++ b/src/savemysugar/CallDistanceTransceiver.py @@ -29,6 +29,138 @@ import logging import time +class SymbolTime(object): + """ + In theory the symbol distance (the distance to discriminate symbols) can + be arbitrary, and it's only bounded below by the stability of the period + time, but in practice it can be necessary to wait a predefined minimum + amount of time between periods because of technological limits of the + transmitting devices, we call this time the "minimum inter-symbol + distance". + """ + + # pylint: disable=too-few-public-methods + def __init__(self, period_min, period_max, multiplier, + min_inter_symbol_distance=0.0): + assert multiplier >= 0 + if (period_min == period_max) and (min_inter_symbol_distance == 0): + raise ValueError("If (period_min == period_max) a non-zero", + "inter-symbol distance MUST be specified") + + symbol_distance = 2 * (period_max - period_min) + if symbol_distance == 0: + symbol_distance = min_inter_symbol_distance + + # The time the transmitter has to wait to disambiguate between this + # symbol and a different one. + self.dist = min_inter_symbol_distance + symbol_distance * multiplier + + # The minimum time which represents the symbol at the receiving end. + self.min = min_inter_symbol_distance + period_min + \ + symbol_distance * multiplier + + # The maximum time which represents the symbol at the receiving end + self.max = min_inter_symbol_distance + period_min + \ + symbol_distance * (multiplier + 1) + + +class MorseDistanceModulator(object): + def __init__(self, period_min, period_max, pulse_min, pulse_max, + inter_symbol_distance): + self.set_parameters(period_min, period_max, + pulse_min, pulse_max, + inter_symbol_distance) + + def set_parameters(self, period_min, period_max, pulse_min, pulse_max, + inter_symbol_distance): + self.period_min = period_min + self.period_max = period_max + self.pulse_min = pulse_min + self.pulse_max = pulse_max + self.inter_symbol_distance = inter_symbol_distance + + self.dot_time = SymbolTime(period_min, + period_max, 0, + inter_symbol_distance) + self.dash_time = SymbolTime(period_min, + period_max, 1, + inter_symbol_distance) + self.signalspace_time = SymbolTime(period_min, + period_max, 2, + inter_symbol_distance) + self.wordspace_time = SymbolTime(period_min, + period_max, 3, + inter_symbol_distance) + self.eom_time = SymbolTime(period_min, + period_max, 4, + inter_symbol_distance) + + def symbol_to_distance(self, symbol): + if symbol == ".": + return self.dot_time.dist + elif symbol == "-": + return self.dash_time.dist + elif symbol == " ": + return self.signalspace_time.dist + elif symbol == "/" or symbol == " / ": + return self.wordspace_time.dist + elif symbol == "EOM": + return self.eom_time.dist + + raise ValueError("Unexpected symbol %s" % symbol) + + def is_same_period(self, distance): + return distance > self.pulse_min and distance <= self.pulse_max + + def distance_to_symbol(self, distance): + if distance > self.dot_time.min and \ + distance <= self.dot_time.max: + return "." + + if distance > self.dash_time.min and \ + distance <= self.dash_time.max: + return "-" + + if distance > self.signalspace_time.min and \ + distance <= self.signalspace_time.max: + return " " + + if distance > self.wordspace_time.min and \ + distance <= self.wordspace_time.max: + return "/" + + if distance > self.eom_time.min: + return "EOM" + + raise ValueError("Unexpected distance %.2f" % distance) + + def modulate(self, morse): + signals = morse.split(' ') + distances = [] + for i, signal in enumerate(signals): + for symbol in signal: + distances.append(self.symbol_to_distance(symbol)) + + # Transmit a signal separator only when strictly necessary. + # + # Avoid it in these cases: + # - after the last symbol, because EOM is going to ne transmitted + # anyway and that will mark the end of the last symbol. + # - between words, because the word separator act as a symbol + # separator too. + if i != len(signals) - 1 and signals[i + 1] != "/" and signal != "/": + distances.append(self.symbol_to_distance(" ")) + + distances.append(self.symbol_to_distance("EOM")) + + # Since the Morse signals are encoded in the distance between calls, an + # extra call is needed in order for receiver actually get the EOM and + # see that the transmission has terminated. + distances.append(0) + + return distances + + class CallDistanceTransceiver(object): """Transmit Morse messages using the distance between calls. @@ -50,78 +182,102 @@ class CallDistanceTransceiver(object): fine. Plus, supporting communications between analog modems is cool :) - """ def __init__(self, modem, - call_setup_average=8.5, call_setup_uncertainty=1.5, - ring_distance_average=5, ring_uncertainty=0.2): + call_setup_time_min=7, call_setup_time_max=15, + ring_time_min=4.8, ring_time_max=5.2, + add_inter_call_distance=True): """Encode the Morse symbols using the distance between calls. Args: - call_setup_average: the time between when the transmitter dials - the number and the receiver reports RING notifications. This is - needed in order to be sure than the receiver has received - _at_least_ one ring. The default chosen here should work fine - even with old analog modems, provided that there is some more - distance between two calls. - call_setup_uncertainty: uncertainty of call_setup_average, - a tolerance value - ring_distance_average: the time between two consecutive RINGs + + call_setup_time_min: the minimum time between when the transmitter + dials the number and the receiver reports RING notifications. + call_setup_time_max: the maximum time between when the transmitter + dials the number and the receiver reports RING notifications. + Waiting this time after dialing ensures that the receiver has + received _at_least_ one ring. The default chosen here have been + tested to work fine with old analog modems, provided that there + is some more distance between two calls, and with Android + receivers. + ring_time_min: the minimum time between two consecutive RINGs + in the same call. + ring_time_max: the maximum time between two consecutive RINGs in the same call, the standard interval is about 5 seconds, but - some tolerance is needed to account for line delays. - This is needed in order to ignore rings in the same call, and - differentiate two different calls. - ring_uncertainty: uncertainty of ring_distance_average, a tolerance - value - """ + line and/or software delays can make it vary. This is needed + in order to ignore multiple ring in the same call when multiple + RING in the same call are notified, and then be able to + discriminate between two different calls. + add_inter_call_distance: specify if it is needed to wait an extra + fixed time between calls. + """ self.modem = modem self.translator = MorseTranslator() self.destination_number = "" - self.call_setup_time = call_setup_average + call_setup_uncertainty - self.rings_distance = ring_distance_average + ring_uncertainty - - # In theory the symbol distance, the distance between calls which - # represent symbols, can be arbitrary, but in practice it's better to - # wait at least the duration of a ring between terminating one call and - # initiating the next call, as pick-up and hang-up can take some time - # with old analog modems. - symbol_distance = self.rings_distance - - def symbol_time(multiplier): - return self.call_setup_time + symbol_distance * multiplier - - self.dot_time = symbol_time(1) - self.dash_time = symbol_time(2) - self.signalspace_time = symbol_time(3) - self.wordspace_time = symbol_time(4) - self.eom_time = symbol_time(5) - - self.ring_uncertainty = ring_uncertainty - self.symbol_uncertainty = symbol_distance / 2. - - logging.debug("dot time: transmit: %.2f receive: (%.2f, %.2f)", - self.dot_time, - (self.dot_time - self.symbol_uncertainty), - (self.dot_time + self.symbol_uncertainty)) - logging.debug("dash time: transmit: %.2f receive: (%.2f, %.2f)", - self.dash_time, - (self.dash_time - self.symbol_uncertainty), - (self.dash_time + self.symbol_uncertainty)) - logging.debug("signalspace time: transmit: %.2f receive: (%.2f, %.2f)", - self.signalspace_time, - (self.signalspace_time - self.symbol_uncertainty), - (self.signalspace_time + self.symbol_uncertainty)) - logging.debug("wordspace time: transmit: %.2f receive: (%.2f, %.2f)", - self.wordspace_time, - (self.wordspace_time - self.symbol_uncertainty), - (self.wordspace_time + self.symbol_uncertainty)) - logging.debug("EOM time: transmit: %.2f receive: (%.2f, +inf)", - self.eom_time, - (self.eom_time - self.symbol_uncertainty)) + self.call_setup_time_max = call_setup_time_max + + if add_inter_call_distance: + # Analog modems don't like to dial a new call immediately after + # they hung up the previous one; an extra delay of about one + # ring time makes them happy. + inter_symbol_distance = ring_time_max + else: + inter_symbol_distance = 0 + + self.modulator = MorseDistanceModulator(call_setup_time_min, + call_setup_time_max, + ring_time_min, + ring_time_max, + inter_symbol_distance) + + logging.debug("call setup time between %.2f and %.2f " + "--------- dot transmit time: %.2f + %.2f " + "receive time: between %.2f and %.2f", + self.modulator.period_min, + self.modulator.period_max, + self.modulator.period_max, + self.modulator.dot_time.dist, + self.modulator.dot_time.min, + self.modulator.dot_time.max) + logging.debug("call setup time between %.2f and %.2f " + "-------- dash transmit time: %.2f + %.2f " + "receive time: between %.2f and %.2f", + self.modulator.period_min, + self.modulator.period_max, + self.modulator.period_max, + self.modulator.dash_time.dist, + self.modulator.dash_time.min, + self.modulator.dash_time.max) + logging.debug("call setup time between %.2f and %.2f " + "- signalspace transmit time: %.2f + %.2f " + "receive time: between %.2f and %.2f", + self.modulator.period_min, + self.modulator.period_max, + self.modulator.period_max, + self.modulator.signalspace_time.dist, + self.modulator.signalspace_time.min, + self.modulator.signalspace_time.max) + logging.debug("call setup time between %.2f and %.2f " + "--- wordspace transmit time: %.2f + %.2f " + "receive time: between %.2f and %.2f", + self.modulator.period_min, + self.modulator.period_max, + self.modulator.period_max, + self.modulator.wordspace_time.dist, + self.modulator.wordspace_time.min, + self.modulator.wordspace_time.max) + logging.debug("call setup time between %.2f and %.2f " + "--------- EOM transmit time: %.2f + %.2f " + "receive time: between %.2f and inf", + self.modulator.period_min, + self.modulator.period_max, + self.modulator.period_max, + self.modulator.eom_time.dist, + self.modulator.eom_time.min) self.previous_ring_time = -1 self.previous_call_time = -1 @@ -132,7 +288,7 @@ class CallDistanceTransceiver(object): self.end_of_message = False def log_symbol(self, distance, symbol, extra_info=""): - logging.info("distance: %.2f Received \"%s\" %s", distance, symbol, + logging.info("distance: %.2f Received \"%s\"%s", distance, symbol, extra_info) def receive_character(self): @@ -149,7 +305,7 @@ class CallDistanceTransceiver(object): self.previous_ring_time = current_ring_time # Ignore multiple rings in the same call - if abs(ring_distance - self.rings_distance) < self.ring_uncertainty: + if self.modulator.is_same_period(ring_distance): logging.debug("multiple rings in the same call, distance: %.2f", ring_distance) return @@ -157,42 +313,31 @@ class CallDistanceTransceiver(object): call_distance = current_ring_time - self.previous_call_time self.previous_call_time = current_ring_time - if abs(call_distance - self.dot_time) < self.symbol_uncertainty: - self.log_symbol(call_distance, '.') - self.morse_message += "." - return - - if abs(call_distance - self.dash_time) < self.symbol_uncertainty: - self.log_symbol(call_distance, '-') - self.morse_message += "-" + try: + symbol = self.modulator.distance_to_symbol(call_distance) + except ValueError as err: + logging.error("%s", err) + logging.error("Check the transmitter and receiver parameters") return - if abs(call_distance - self.signalspace_time) < self.symbol_uncertainty: + extra_info = "" + if symbol in [" ", "/", "EOM"]: signal = self.morse_message.strip().split(' ')[-1] character = self.translator.signal_to_character(signal) - self.log_symbol(call_distance, ' ', "got \"%s\"" % character) - self.morse_message += " " - return + extra_info = " got \"%s\"" % character - if abs(call_distance - self.wordspace_time) < self.symbol_uncertainty: - signal = self.morse_message.strip().split(' ')[-1] - character = self.translator.signal_to_character(signal) - self.log_symbol(call_distance, '/', "got \"%s\"" % character) - self.morse_message += " / " - return + self.log_symbol(call_distance, symbol, extra_info) - if call_distance >= self.eom_time - self.symbol_uncertainty: - signal = self.morse_message.strip().split(' ')[-1] - character = self.translator.signal_to_character(signal) - self.log_symbol(call_distance, 'EOM', "got \"%s\"" % character) + if symbol != "EOM": + # Add spaces around the wordspace symbol to make it easier to split + # the Morse message in symbols later on + if symbol == "/": + symbol = " / " + self.morse_message += symbol + else: self.end_of_message = True self.previous_ring_time = -1 self.previous_call_time = -1 - return - - # if the code made it up to here, something fishy is going on - logging.error("Unexpected distance: %.2f", ring_distance) - logging.error("Check the transmitter and receiver parameters") def receive_loop(self): while not self.end_of_message: @@ -207,122 +352,119 @@ class CallDistanceTransceiver(object): def get_text(self): return self.text_message - def transmit_symbol(self, symbol): - if symbol == ".": - sleep_time = self.dot_time - elif symbol == "-": - sleep_time = self.dash_time - elif symbol == " ": - sleep_time = self.signalspace_time - elif symbol == "/": - sleep_time = self.wordspace_time - elif symbol == "EOM": - sleep_time = self.eom_time - elif symbol is None: - # To terminate the transmission just call and hangup, with no extra - # distance - sleep_time = self.call_setup_time - - logging.info("Dial and wait %.2f seconds (transmitting '%s')", - sleep_time, symbol) - - # Dial, then wait self.call_setup_time to make sure the receiver gets - # at least one RING, and then hangup and sleep the remaining time - self.modem.send_command("ATDT" + self.destination_number) - time.sleep(self.call_setup_time) + def transmit_symbol(self, symbol, sleep_time): + logging.info("Dial and wait %.2f = %.2f + %.2f seconds " + "(transmitting '%s')", + self.call_setup_time_max + sleep_time, + self.call_setup_time_max, + sleep_time, + symbol) + + # Dial, then wait self.call_setup_time_max to make sure the receiver + # gets at least one RING, and then hangup and sleep the time needed to + # transmit a symbol. + self.modem.send_command("ATDT" + self.destination_number + ";") + time.sleep(self.call_setup_time_max) self.modem.send_command("ATH") self.modem.get_response() - time.sleep(sleep_time - self.call_setup_time) - - def transmit_signal(self, signal): - logging.debug("Transmitting signal: %s", signal) - for symbol in signal: - self.transmit_symbol(symbol) + time.sleep(sleep_time) def transmit(self, message, destination_number): self.destination_number = destination_number morse_message = self.translator.text_to_morse(message) - signals = morse_message.split() - print(signals) + distances = self.modulator.modulate(morse_message) logging.debug("Starting the transmission") - for i, signal in enumerate(signals): - logging.debug("Transmitting '%s' as '%s'", message[i], signal) - self.transmit_signal(signal) - - # Transmit a signal separator only when strictly necessary: - # - after the last symbol, we are going to transmit an EOM - # anyway, and that will mark the end of the last symbol. - # - between words the word separator act as a symbol separator - # too. - if i != len(signals) - 1 and signals[i + 1] != "/" and signal != "/": - self.transmit_symbol(" ") - - self.transmit_symbol("EOM") + for i, distance in enumerate(distances): + # Use 'None' for the last call + if i == len(distances) - 1: + symbol = None + else: + total_sleep_time = self.call_setup_time_max + distance + symbol = self.modulator.distance_to_symbol(total_sleep_time) - # Since the Morse signals are encoded in the distance between calls, an - # extra call is needed in order for receiver actually get the EOM and - # see that the transmission has terminated. - self.transmit_symbol(None) + self.transmit_symbol(symbol, distance) def estimate_transmit_duration(self, message): morsemessage = self.translator.text_to_morse(message) - signals = morsemessage.split() + logging.debug(morsemessage) - logging.debug(signals) + distances = self.modulator.modulate(morsemessage) transmitting_time = 0 - for i, signal in enumerate(signals): - logging.debug("signal: %s", signal) - - for symbol in signal: - if symbol == ".": - transmitting_time += self.dot_time - elif symbol == "-": - transmitting_time += self.dash_time - elif symbol == "/": - transmitting_time += self.wordspace_time - - if i != len(signals) - 1 and signals[i + 1] != "/" and symbol != "/": - transmitting_time += self.signalspace_time + for distance in distances: + transmitting_time += self.call_setup_time_max + transmitting_time += distance - transmitting_time += self.eom_time - - logging.debug("Estimated transmitting time: %d seconds", + logging.debug("Estimated transmitting time: %.2f seconds", transmitting_time) def test_send_receive(): logging.basicConfig(level=logging.DEBUG) - call_setup_time = 2 - call_setup_uncertainty = 0.4 - ring_time = 1 - ring_uncertainty = 0.3 + call_setup_time_min = 0 + call_setup_time_max = 0.01 + ring_time_min = 0 + ring_time_max = 0 + + import random class DummyModem(object): - """Always receive a '.' and then a '/', which result in 'E '.""" + """Always receive a '.', a '/' and then EOM, which results in 'E '.""" def __init__(self): self.ring_count = 0 + # Take trasmission times from a transceiver + self.transceiver = None + + random.seed(None) + def send_command(self, command): pass - def get_response(self, response): - if self.ring_count % 2: + def get_response(self, response=None): + # pylint: disable=unused-argument + + setup_time = random.uniform(self.transceiver.modulator.period_min, + self.transceiver.modulator.period_max) + + if self.ring_count == 0: + # dummy ring + pass + elif self.ring_count == 1: # received a '.' - time.sleep(call_setup_time + (ring_time + ring_uncertainty)) - else: + dot_time = self.transceiver.modulator.dot_time.dist + time.sleep(setup_time + dot_time) + elif self.ring_count == 2: # received a '/' - time.sleep(call_setup_time + (ring_time + ring_uncertainty) * 4) + wordspace_time = self.transceiver.modulator.wordspace_time.dist + time.sleep(setup_time + wordspace_time) + else: + # received an 'EOM' + eom_time = self.transceiver.modulator.eom_time.dist + time.sleep(setup_time + eom_time) self.ring_count += 1 + self.ring_count %= 4 + + modem = DummyModem() + + xcv = CallDistanceTransceiver(modem, + call_setup_time_min, call_setup_time_max, + ring_time_min, ring_time_max, True) + + modem.transceiver = xcv + + xcv.transmit("CODEX PARIS", "0") - xcv = CallDistanceTransceiver(DummyModem(), call_setup_time, - call_setup_uncertainty, ring_time, - ring_uncertainty) - xcv.receive_loop() + while True: + modem.ring_count = 0 + xcv.receive_loop() + print() + print("Message received!") + print("\"%s\"" % xcv.get_text(), flush=True) if __name__ == "__main__":