X-Git-Url: https://git.ao2.it/SaveMySugar/python3-savemysugar.git/blobdiff_plain/78fd1e817fd08b35b4e04d3315374c9c9bc0d898..4ec1ed78e91a0a9049b6a4a9803b8bb954084dd7:/src/savemysugar/CallDistanceTransceiver.py diff --git a/src/savemysugar/CallDistanceTransceiver.py b/src/savemysugar/CallDistanceTransceiver.py index 588c985..15fcdae 100755 --- a/src/savemysugar/CallDistanceTransceiver.py +++ b/src/savemysugar/CallDistanceTransceiver.py @@ -18,147 +18,22 @@ # along with this program. If not, see . -# This hack allows MorseTranslator to be imported also when +# This hack allows importing local modules also when # __name__ == "__main__" try: + from .MorseDistanceModulator import MorseDistanceModulator from .MorseTranslator import MorseTranslator except SystemError: + from MorseDistanceModulator import MorseDistanceModulator from MorseTranslator import MorseTranslator import logging import time -class SymbolTime(object): - """ - In theory the symbol distance (the distance to discriminate symbols) can - be arbitrary, and it's only bounded below by the stability of the period - time, but in practice it can be necessary to wait a predefined minimum - amount of time between periods because of technological limits of the - transmitting devices, we call this time the "minimum inter-symbol - distance". - """ - - # pylint: disable=too-few-public-methods - def __init__(self, period_min, period_max, multiplier, - min_inter_symbol_distance=0.0): - assert multiplier >= 0 - if (period_min == period_max) and (min_inter_symbol_distance == 0): - raise ValueError("If (period_min == period_max) a non-zero", - "inter-symbol distance MUST be specified") - - symbol_distance = 2 * (period_max - period_min) - if symbol_distance == 0: - symbol_distance = min_inter_symbol_distance - - # The time the transmitter has to wait to disambiguate between this - # symbol and a different one. - self.dist = min_inter_symbol_distance + symbol_distance * multiplier - - # The minimum time which represents the symbol at the receiving end. - self.min = min_inter_symbol_distance + period_min + \ - symbol_distance * multiplier - - # The maximum time which represents the symbol at the receiving end - self.max = min_inter_symbol_distance + period_min + \ - symbol_distance * (multiplier + 1) - - -class MorseDistanceModulator(object): - def __init__(self, period_min, period_max, pulse_min, pulse_max, - inter_symbol_distance): - self.set_parameters(period_min, period_max, - pulse_min, pulse_max, - inter_symbol_distance) - - def set_parameters(self, period_min, period_max, pulse_min, pulse_max, - inter_symbol_distance): - self.period_min = period_min - self.period_max = period_max - self.pulse_min = pulse_min - self.pulse_max = pulse_max - self.inter_symbol_distance = inter_symbol_distance - - self.dot_time = SymbolTime(period_min, - period_max, 0, - inter_symbol_distance) - self.dash_time = SymbolTime(period_min, - period_max, 1, - inter_symbol_distance) - self.signalspace_time = SymbolTime(period_min, - period_max, 2, - inter_symbol_distance) - self.wordspace_time = SymbolTime(period_min, - period_max, 3, - inter_symbol_distance) - self.eom_time = SymbolTime(period_min, - period_max, 4, - inter_symbol_distance) - - def symbol_to_distance(self, symbol): - if symbol == ".": - return self.dot_time.dist - elif symbol == "-": - return self.dash_time.dist - elif symbol == " ": - return self.signalspace_time.dist - elif symbol == "/" or symbol == " / ": - return self.wordspace_time.dist - elif symbol == "EOM": - return self.eom_time.dist - - raise ValueError("Unexpected symbol %s" % symbol) - - def is_same_period(self, distance): - return distance > self.pulse_min and distance <= self.pulse_max - - def distance_to_symbol(self, distance): - if distance > self.dot_time.min and \ - distance <= self.dot_time.max: - return "." - - if distance > self.dash_time.min and \ - distance <= self.dash_time.max: - return "-" - - if distance > self.signalspace_time.min and \ - distance <= self.signalspace_time.max: - return " " - - if distance > self.wordspace_time.min and \ - distance <= self.wordspace_time.max: - return "/" - - if distance > self.eom_time.min: - return "EOM" - - raise ValueError("Unexpected distance %.2f" % distance) - - def modulate(self, morse): - signals = morse.split(' ') - distances = [] - for i, signal in enumerate(signals): - for symbol in signal: - distances.append(self.symbol_to_distance(symbol)) - - # Transmit a signal separator only when strictly necessary. - # - # Avoid it in these cases: - # - after the last symbol, because EOM is going to ne transmitted - # anyway and that will mark the end of the last symbol. - # - between words, because the word separator act as a symbol - # separator too. - if i != len(signals) - 1 and signals[i + 1] != "/" and signal != "/": - distances.append(self.symbol_to_distance(" ")) - - distances.append(self.symbol_to_distance("EOM")) - - # Since the Morse signals are encoded in the distance between calls, an - # extra call is needed in order for receiver actually get the EOM and - # see that the transmission has terminated. - distances.append(0) - - return distances +def log_symbol(distance, symbol, extra_info=""): + logging.info("distance: %.2f Received \"%s\"%s", distance, symbol, + extra_info) class CallDistanceTransceiver(object): @@ -185,7 +60,7 @@ class CallDistanceTransceiver(object): """ def __init__(self, modem, - call_setup_time_min=7, call_setup_time_max=15, + call_setup_time_min=7, call_setup_time_max=16.5, ring_time_min=4.8, ring_time_max=5.2, add_inter_call_distance=True): """Encode the Morse symbols using the distance between calls. @@ -216,8 +91,6 @@ class CallDistanceTransceiver(object): self.modem = modem self.translator = MorseTranslator() - self.destination_number = "" - self.call_setup_time_max = call_setup_time_max if add_inter_call_distance: @@ -287,17 +160,13 @@ class CallDistanceTransceiver(object): self.end_of_message = False - def log_symbol(self, distance, symbol, extra_info=""): - logging.info("distance: %.2f Received \"%s\"%s", distance, symbol, - extra_info) - - def receive_character(self): + def receive_symbol(self): current_ring_time = time.time() if self.previous_ring_time == -1: self.previous_ring_time = current_ring_time self.previous_call_time = current_ring_time - self.log_symbol(0, "", "(The very first ring)") + log_symbol(0, "", "(The very first ring)") return ring_distance = current_ring_time - self.previous_ring_time @@ -326,7 +195,7 @@ class CallDistanceTransceiver(object): character = self.translator.signal_to_character(signal) extra_info = " got \"%s\"" % character - self.log_symbol(call_distance, symbol, extra_info) + log_symbol(call_distance, symbol, extra_info) if symbol != "EOM": # Add spaces around the wordspace symbol to make it easier to split @@ -342,7 +211,7 @@ class CallDistanceTransceiver(object): def receive_loop(self): while not self.end_of_message: self.modem.get_response("RING") - self.receive_character() + self.receive_symbol() logging.debug("Current message: %s", self.morse_message) self.end_of_message = False @@ -352,7 +221,7 @@ class CallDistanceTransceiver(object): def get_text(self): return self.text_message - def transmit_symbol(self, symbol, sleep_time): + def transmit_symbol(self, destination_number, symbol, sleep_time): logging.info("Dial and wait %.2f = %.2f + %.2f seconds " "(transmitting '%s')", self.call_setup_time_max + sleep_time, @@ -363,15 +232,27 @@ class CallDistanceTransceiver(object): # Dial, then wait self.call_setup_time_max to make sure the receiver # gets at least one RING, and then hangup and sleep the time needed to # transmit a symbol. - self.modem.send_command("ATDT" + self.destination_number + ";") + time_before = time.time() + self.modem.send_command("ATDT" + destination_number + ";") time.sleep(self.call_setup_time_max) self.modem.send_command("ATH") self.modem.get_response() - time.sleep(sleep_time) + time_after = time.time() - def transmit(self, message, destination_number): - self.destination_number = destination_number + # Account for possible delays in order to be as adherent as + # possible to the nominal total symbol transmission distance. + delay = (time_after - time_before) - self.call_setup_time_max + logging.debug("Delay %.2f", delay) + remaining_sleep_time = sleep_time - delay + if remaining_sleep_time < 0: + remaining_sleep_time = 0 + + logging.debug("Should sleep %.2f. Will sleep %.2f", sleep_time, + remaining_sleep_time) + time.sleep(remaining_sleep_time) + + def transmit(self, message, destination_number): morse_message = self.translator.text_to_morse(message) distances = self.modulator.modulate(morse_message) @@ -384,7 +265,7 @@ class CallDistanceTransceiver(object): total_sleep_time = self.call_setup_time_max + distance symbol = self.modulator.distance_to_symbol(total_sleep_time) - self.transmit_symbol(symbol, distance) + self.transmit_symbol(destination_number, symbol, distance) def estimate_transmit_duration(self, message): morsemessage = self.translator.text_to_morse(message) @@ -401,7 +282,7 @@ class CallDistanceTransceiver(object): transmitting_time) -def test_send_receive(): +def test_transmit_receive(): logging.basicConfig(level=logging.DEBUG) call_setup_time_min = 0 call_setup_time_max = 0.01 @@ -424,7 +305,7 @@ def test_send_receive(): def send_command(self, command): pass - def get_response(self, response): + def get_response(self, response=None): # pylint: disable=unused-argument setup_time = random.uniform(self.transceiver.modulator.period_min, @@ -457,13 +338,15 @@ def test_send_receive(): modem.transceiver = xcv + xcv.transmit("CODEX PARIS", "0") + while True: - xcv.receive_loop() modem.ring_count = 0 + xcv.receive_loop() print() print("Message received!") print("\"%s\"" % xcv.get_text(), flush=True) if __name__ == "__main__": - test_send_receive() + test_transmit_receive()