3 # CallDistanceTransceiver - send and receive Morse using phone calls distance
5 # Copyright (C) 2015 Antonio Ospite <ao2@ao2.it>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 # This hack allows MorseTranslator to be imported also when
22 # __name__ == "__main__"
24 from .MorseTranslator import MorseTranslator
26 from MorseTranslator import MorseTranslator
32 class SymbolTime(object):
34 In theory the symbol distance (the distance to discriminate symbols) can
35 be arbitrary, and it's only bounded below by the stability of the period
36 time, but in practice it can be necessary to wait a predefined minimum
37 amount of time between periods because of technological limits of the
38 transmitting devices, we call this time the "minimum inter-symbol
42 # pylint: disable=too-few-public-methods
43 def __init__(self, period_min, period_max, multiplier,
44 min_inter_symbol_distance=0.0):
45 assert multiplier >= 0
46 if (period_min == period_max) and (min_inter_symbol_distance == 0):
47 raise ValueError("If (period_min == period_max) a non-zero",
48 "inter-symbol distance MUST be specified")
50 symbol_distance = 2 * (period_max - period_min)
51 if symbol_distance == 0:
52 symbol_distance = min_inter_symbol_distance
54 # The time the transmitter has to wait to disambiguate between this
55 # symbol and a different one.
56 self.dist = min_inter_symbol_distance + symbol_distance * multiplier
58 # The minimum time which represents the symbol at the receiving end.
59 self.min = min_inter_symbol_distance + period_min + \
60 symbol_distance * multiplier
62 # The maximum time which represents the symbol at the receiving end
63 self.max = min_inter_symbol_distance + period_min + \
64 symbol_distance * (multiplier + 1)
67 class MorseDistanceModulator(object):
68 def __init__(self, period_min, period_max, pulse_min, pulse_max,
69 inter_symbol_distance):
70 self.set_parameters(period_min, period_max,
72 inter_symbol_distance)
74 def set_parameters(self, period_min, period_max, pulse_min, pulse_max,
75 inter_symbol_distance):
76 self.period_min = period_min
77 self.period_max = period_max
78 self.pulse_min = pulse_min
79 self.pulse_max = pulse_max
80 self.inter_symbol_distance = inter_symbol_distance
82 self.dot_time = SymbolTime(period_min,
84 inter_symbol_distance)
85 self.dash_time = SymbolTime(period_min,
87 inter_symbol_distance)
88 self.signalspace_time = SymbolTime(period_min,
90 inter_symbol_distance)
91 self.wordspace_time = SymbolTime(period_min,
93 inter_symbol_distance)
94 self.eom_time = SymbolTime(period_min,
96 inter_symbol_distance)
98 def symbol_to_distance(self, symbol):
100 return self.dot_time.dist
102 return self.dash_time.dist
104 return self.signalspace_time.dist
105 elif symbol == "/" or symbol == " / ":
106 return self.wordspace_time.dist
107 elif symbol == "EOM":
108 return self.eom_time.dist
110 raise ValueError("Unexpected symbol %s" % symbol)
112 def is_same_period(self, distance):
113 return distance > self.pulse_min and distance <= self.pulse_max
115 def distance_to_symbol(self, distance):
116 if distance > self.dot_time.min and \
117 distance <= self.dot_time.max:
120 if distance > self.dash_time.min and \
121 distance <= self.dash_time.max:
124 if distance > self.signalspace_time.min and \
125 distance <= self.signalspace_time.max:
128 if distance > self.wordspace_time.min and \
129 distance <= self.wordspace_time.max:
132 if distance > self.eom_time.min:
135 raise ValueError("Unexpected distance %.2f" % distance)
137 def modulate(self, morse):
138 signals = morse.split(' ')
140 for i, signal in enumerate(signals):
141 for symbol in signal:
142 distances.append(self.symbol_to_distance(symbol))
144 # Transmit a signal separator only when strictly necessary.
146 # Avoid it in these cases:
147 # - after the last symbol, because EOM is going to ne transmitted
148 # anyway and that will mark the end of the last symbol.
149 # - between words, because the word separator act as a symbol
151 if i != len(signals) - 1 and signals[i + 1] != "/" and signal != "/":
152 distances.append(self.symbol_to_distance(" "))
154 distances.append(self.symbol_to_distance("EOM"))
156 # Since the Morse signals are encoded in the distance between calls, an
157 # extra call is needed in order for receiver actually get the EOM and
158 # see that the transmission has terminated.
164 class CallDistanceTransceiver(object):
165 """Transmit Morse messages using the distance between calls.
167 This is basically a pulse-distance modulation (PDM).
169 A RING is a pulse and the Morse symbols are encoded in the pause between
170 the first ring of the previous call and the first ring of a new call.
172 This strategy is very slow but it can even be used with ancient analog
173 modems which don't have call progress notifications for outgoing calls, and
174 make it also hard to count multiple rings in the same call on the receiving
175 side because there is no explicit notification on the receiving side of
176 when the caller ends a calls.
178 For GSM modems, which have a more sophisticate call report signalling,
179 a more efficient encoding can be used (for example using ring counts to
180 encode Morse symbols, i.e. a pulse-length modulation), but for
181 a proof-of-concept, a slow encoding covering the most general setup is
184 Plus, supporting communications between analog modems is cool :)
187 def __init__(self, modem,
188 call_setup_time_min=7, call_setup_time_max=15,
189 ring_time_min=4.8, ring_time_max=5.2,
190 add_inter_call_distance=True):
191 """Encode the Morse symbols using the distance between calls.
195 call_setup_time_min: the minimum time between when the transmitter
196 dials the number and the receiver reports RING notifications.
197 call_setup_time_max: the maximum time between when the transmitter
198 dials the number and the receiver reports RING notifications.
199 Waiting this time after dialing ensures that the receiver has
200 received _at_least_ one ring. The default chosen here have been
201 tested to work fine with old analog modems, provided that there
202 is some more distance between two calls, and with Android
204 ring_time_min: the minimum time between two consecutive RINGs
206 ring_time_max: the maximum time between two consecutive RINGs
207 in the same call, the standard interval is about 5 seconds, but
208 line and/or software delays can make it vary. This is needed
209 in order to ignore multiple ring in the same call when multiple
210 RING in the same call are notified, and then be able to
211 discriminate between two different calls.
212 add_inter_call_distance: specify if it is needed to wait an extra
213 fixed time between calls.
217 self.translator = MorseTranslator()
219 self.call_setup_time_max = call_setup_time_max
221 if add_inter_call_distance:
222 # Analog modems don't like to dial a new call immediately after
223 # they hung up the previous one; an extra delay of about one
224 # ring time makes them happy.
225 inter_symbol_distance = ring_time_max
227 inter_symbol_distance = 0
229 self.modulator = MorseDistanceModulator(call_setup_time_min,
233 inter_symbol_distance)
235 logging.debug("call setup time between %.2f and %.2f "
236 "--------- dot transmit time: %.2f + %.2f "
237 "receive time: between %.2f and %.2f",
238 self.modulator.period_min,
239 self.modulator.period_max,
240 self.modulator.period_max,
241 self.modulator.dot_time.dist,
242 self.modulator.dot_time.min,
243 self.modulator.dot_time.max)
244 logging.debug("call setup time between %.2f and %.2f "
245 "-------- dash transmit time: %.2f + %.2f "
246 "receive time: between %.2f and %.2f",
247 self.modulator.period_min,
248 self.modulator.period_max,
249 self.modulator.period_max,
250 self.modulator.dash_time.dist,
251 self.modulator.dash_time.min,
252 self.modulator.dash_time.max)
253 logging.debug("call setup time between %.2f and %.2f "
254 "- signalspace transmit time: %.2f + %.2f "
255 "receive time: between %.2f and %.2f",
256 self.modulator.period_min,
257 self.modulator.period_max,
258 self.modulator.period_max,
259 self.modulator.signalspace_time.dist,
260 self.modulator.signalspace_time.min,
261 self.modulator.signalspace_time.max)
262 logging.debug("call setup time between %.2f and %.2f "
263 "--- wordspace transmit time: %.2f + %.2f "
264 "receive time: between %.2f and %.2f",
265 self.modulator.period_min,
266 self.modulator.period_max,
267 self.modulator.period_max,
268 self.modulator.wordspace_time.dist,
269 self.modulator.wordspace_time.min,
270 self.modulator.wordspace_time.max)
271 logging.debug("call setup time between %.2f and %.2f "
272 "--------- EOM transmit time: %.2f + %.2f "
273 "receive time: between %.2f and inf",
274 self.modulator.period_min,
275 self.modulator.period_max,
276 self.modulator.period_max,
277 self.modulator.eom_time.dist,
278 self.modulator.eom_time.min)
280 self.previous_ring_time = -1
281 self.previous_call_time = -1
283 self.morse_message = ""
284 self.text_message = ""
286 self.end_of_message = False
288 def log_symbol(self, distance, symbol, extra_info=""):
289 logging.info("distance: %.2f Received \"%s\"%s", distance, symbol,
292 def receive_character(self):
293 current_ring_time = time.time()
295 if self.previous_ring_time == -1:
296 self.previous_ring_time = current_ring_time
297 self.previous_call_time = current_ring_time
298 self.log_symbol(0, "", "(The very first ring)")
301 ring_distance = current_ring_time - self.previous_ring_time
302 logging.debug("RINGs distance: %.2f", ring_distance)
303 self.previous_ring_time = current_ring_time
305 # Ignore multiple rings in the same call
306 if self.modulator.is_same_period(ring_distance):
307 logging.debug("multiple rings in the same call, distance: %.2f",
311 call_distance = current_ring_time - self.previous_call_time
312 self.previous_call_time = current_ring_time
315 symbol = self.modulator.distance_to_symbol(call_distance)
316 except ValueError as err:
317 logging.error("%s", err)
318 logging.error("Check the transmitter and receiver parameters")
322 if symbol in [" ", "/", "EOM"]:
323 signal = self.morse_message.strip().split(' ')[-1]
324 character = self.translator.signal_to_character(signal)
325 extra_info = " got \"%s\"" % character
327 self.log_symbol(call_distance, symbol, extra_info)
330 # Add spaces around the wordspace symbol to make it easier to split
331 # the Morse message in symbols later on
334 self.morse_message += symbol
336 self.end_of_message = True
337 self.previous_ring_time = -1
338 self.previous_call_time = -1
340 def receive_loop(self):
341 while not self.end_of_message:
342 self.modem.get_response("RING")
343 self.receive_character()
344 logging.debug("Current message: %s", self.morse_message)
346 self.end_of_message = False
347 self.text_message = self.translator.morse_to_text(self.morse_message)
348 self.morse_message = ""
351 return self.text_message
353 def transmit_symbol(self, destination_number, symbol, sleep_time):
354 logging.info("Dial and wait %.2f = %.2f + %.2f seconds "
355 "(transmitting '%s')",
356 self.call_setup_time_max + sleep_time,
357 self.call_setup_time_max,
361 # Dial, then wait self.call_setup_time_max to make sure the receiver
362 # gets at least one RING, and then hangup and sleep the time needed to
364 self.modem.send_command("ATDT" + destination_number + ";")
365 time.sleep(self.call_setup_time_max)
366 self.modem.send_command("ATH")
367 self.modem.get_response()
368 time.sleep(sleep_time)
370 def transmit(self, message, destination_number):
371 morse_message = self.translator.text_to_morse(message)
372 distances = self.modulator.modulate(morse_message)
374 logging.debug("Starting the transmission")
375 for i, distance in enumerate(distances):
376 # Use 'None' for the last call
377 if i == len(distances) - 1:
380 total_sleep_time = self.call_setup_time_max + distance
381 symbol = self.modulator.distance_to_symbol(total_sleep_time)
383 self.transmit_symbol(destination_number, symbol, distance)
385 def estimate_transmit_duration(self, message):
386 morsemessage = self.translator.text_to_morse(message)
387 logging.debug(morsemessage)
389 distances = self.modulator.modulate(morsemessage)
391 transmitting_time = 0
392 for distance in distances:
393 transmitting_time += self.call_setup_time_max
394 transmitting_time += distance
396 logging.debug("Estimated transmitting time: %.2f seconds",
400 def test_transmit_receive():
401 logging.basicConfig(level=logging.DEBUG)
402 call_setup_time_min = 0
403 call_setup_time_max = 0.01
409 class DummyModem(object):
410 """Always receive a '.', a '/' and then EOM, which results in 'E '."""
415 # Take trasmission times from a transceiver
416 self.transceiver = None
420 def send_command(self, command):
423 def get_response(self, response=None):
424 # pylint: disable=unused-argument
426 setup_time = random.uniform(self.transceiver.modulator.period_min,
427 self.transceiver.modulator.period_max)
429 if self.ring_count == 0:
432 elif self.ring_count == 1:
434 dot_time = self.transceiver.modulator.dot_time.dist
435 time.sleep(setup_time + dot_time)
436 elif self.ring_count == 2:
438 wordspace_time = self.transceiver.modulator.wordspace_time.dist
439 time.sleep(setup_time + wordspace_time)
442 eom_time = self.transceiver.modulator.eom_time.dist
443 time.sleep(setup_time + eom_time)
450 xcv = CallDistanceTransceiver(modem,
451 call_setup_time_min, call_setup_time_max,
452 ring_time_min, ring_time_max, True)
454 modem.transceiver = xcv
456 xcv.transmit("CODEX PARIS", "0")
462 print("Message received!")
463 print("\"%s\"" % xcv.get_text(), flush=True)
466 if __name__ == "__main__":
467 test_transmit_receive()