3 # CallDistanceTransceiver - send and receive Morse using phone calls distance
5 # Copyright (C) 2015 Antonio Ospite <ao2@ao2.it>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 # This hack allows MorseTranslator to be imported also when
22 # __name__ == "__main__"
24 from .MorseTranslator import MorseTranslator
26 from MorseTranslator import MorseTranslator
class SymbolTime(object):
    """Timing of a single symbol in the call-distance encoding.

    In theory the symbol distance (the distance to discriminate symbols) can
    be arbitrary, and it's only bounded below by the stability of the period
    time, but in practice it can be necessary to wait a predefined minimum
    amount of time between periods because of technological limits of the
    transmitting devices, we call this time the "minimum inter-symbol
    distance".

    Attributes:
        dist: the time the transmitter has to wait to disambiguate between
            this symbol and a different one.
        min: the minimum time which represents the symbol at the receiving
            end (exclusive lower bound as used by the demodulator).
        max: the maximum time which represents the symbol at the receiving
            end.
    """

    # pylint: disable=too-few-public-methods
    def __init__(self, period_min, period_max, multiplier,
                 min_inter_symbol_distance=0.0):
        """Compute the transmit and receive times of a symbol.

        Args:
            period_min: the minimum duration of a period.
            period_max: the maximum duration of a period.
            multiplier: non-negative index of the symbol; each increment
                shifts the symbol by one symbol distance.
            min_inter_symbol_distance: fixed extra time added between
                periods.

        Raises:
            ValueError: if period_min == period_max and no inter-symbol
                distance is given, because symbols could not be told apart.
        """
        assert multiplier >= 0
        if (period_min == period_max) and (min_inter_symbol_distance == 0):
            # Adjacent literals are concatenated into ONE message string;
            # passing them as separate arguments would render as a tuple.
            raise ValueError("If (period_min == period_max) a non-zero "
                             "inter-symbol distance MUST be specified")

        # Twice the period jitter keeps the receive windows of adjacent
        # symbols contiguous; fall back to the inter-symbol distance when
        # the period is perfectly stable.
        symbol_distance = 2 * (period_max - period_min)
        if symbol_distance == 0:
            symbol_distance = min_inter_symbol_distance

        offset = symbol_distance * multiplier

        # The time the transmitter has to wait to disambiguate between this
        # symbol and a different one.
        self.dist = min_inter_symbol_distance + offset

        # The minimum time which represents the symbol at the receiving end.
        self.min = min_inter_symbol_distance + period_min + offset

        # The maximum time which represents the symbol at the receiving end
        self.max = min_inter_symbol_distance + period_min + \
            symbol_distance * (multiplier + 1)
class MorseDistanceModulator(object):
    """Map Morse symbols to call distances and back.

    The symbols ".", "-", " " (signal separator), "/" (word separator) and
    "EOM" (end of message) each get their own SymbolTime, with increasing
    multipliers so that their receive windows do not overlap.
    """

    def __init__(self, period_min, period_max, pulse_min, pulse_max,
                 inter_symbol_distance):
        """Create a modulator; see set_parameters() for the arguments."""
        self.set_parameters(period_min, period_max,
                            pulse_min, pulse_max,
                            inter_symbol_distance)

    def set_parameters(self, period_min, period_max, pulse_min, pulse_max,
                       inter_symbol_distance):
        """Store the timing parameters and derive the symbol times.

        Args:
            period_min: the minimum duration of a period.
            period_max: the maximum duration of a period.
            pulse_min: exclusive lower bound of the distance between two
                pulses belonging to the same period.
            pulse_max: inclusive upper bound of the distance between two
                pulses belonging to the same period.
            inter_symbol_distance: fixed extra time between periods.
        """
        self.period_min = period_min
        self.period_max = period_max
        self.pulse_min = pulse_min
        self.pulse_max = pulse_max
        self.inter_symbol_distance = inter_symbol_distance

        # Consecutive multipliers give contiguous, ordered receive windows.
        self.dot_time = SymbolTime(period_min,
                                   period_max, 0,
                                   inter_symbol_distance)
        self.dash_time = SymbolTime(period_min,
                                    period_max, 1,
                                    inter_symbol_distance)
        self.signalspace_time = SymbolTime(period_min,
                                           period_max, 2,
                                           inter_symbol_distance)
        self.wordspace_time = SymbolTime(period_min,
                                         period_max, 3,
                                         inter_symbol_distance)
        self.eom_time = SymbolTime(period_min,
                                   period_max, 4,
                                   inter_symbol_distance)

    def symbol_to_distance(self, symbol):
        """Return the time the transmitter must wait to send symbol.

        Raises:
            ValueError: if symbol is not a known Morse symbol.
        """
        if symbol == ".":
            return self.dot_time.dist
        if symbol == "-":
            return self.dash_time.dist
        if symbol == " ":
            return self.signalspace_time.dist
        if symbol == "/" or symbol == " / ":
            return self.wordspace_time.dist
        if symbol == "EOM":
            return self.eom_time.dist

        raise ValueError("Unexpected symbol %s" % symbol)

    def is_same_period(self, distance):
        """Tell whether two pulses distance apart belong to the same period."""
        return self.pulse_min < distance <= self.pulse_max

    def distance_to_symbol(self, distance):
        """Return the symbol whose receive window contains distance.

        Raises:
            ValueError: if distance does not fall in any window.
        """
        bounded_symbols = (
            (".", self.dot_time),
            ("-", self.dash_time),
            (" ", self.signalspace_time),
            ("/", self.wordspace_time),
        )
        for symbol, symbol_time in bounded_symbols:
            if symbol_time.min < distance <= symbol_time.max:
                return symbol

        # The end of message has no upper bound.
        if distance > self.eom_time.min:
            return "EOM"

        raise ValueError("Unexpected distance %.2f" % distance)

    def modulate(self, morse):
        """Convert a Morse string into the list of distances to transmit.

        Args:
            morse: Morse signals separated by single spaces, with "/" as
                the word separator.

        Returns:
            A list with one waiting time per call, terminated by the EOM
            distance and a final 0 for the closing call.
        """
        signals = morse.split(' ')
        last = len(signals) - 1
        distances = []
        for i, signal in enumerate(signals):
            for symbol in signal:
                distances.append(self.symbol_to_distance(symbol))

            # Transmit a signal separator only when strictly necessary.
            #
            # Avoid it in these cases:
            #  - after the last symbol, because EOM is going to be transmitted
            #    anyway and that will mark the end of the last symbol.
            #  - between words, because the word separator acts as a symbol
            #    separator too.
            if i != last and signals[i + 1] != "/" and signal != "/":
                distances.append(self.symbol_to_distance(" "))

        distances.append(self.symbol_to_distance("EOM"))

        # Since the Morse signals are encoded in the distance between calls, an
        # extra call is needed in order for receiver actually get the EOM and
        # see that the transmission has terminated.
        distances.append(0)

        return distances
class CallDistanceTransceiver(object):
    """Transmit Morse messages using the distance between calls.

    This is basically a pulse-distance modulation (PDM).

    A RING is a pulse and the Morse symbols are encoded in the pause between
    the first ring of the previous call and the first ring of a new call.

    This strategy is very slow but it can even be used with ancient analog
    modems which don't have call progress notifications for outgoing calls, and
    make it also hard to count multiple rings in the same call on the receiving
    side because there is no explicit notification on the receiving side of
    when the caller ends a call.

    For GSM modems, which have a more sophisticated call report signalling,
    a more efficient encoding can be used (for example using ring counts to
    encode Morse symbols, i.e. a pulse-length modulation), but for
    a proof-of-concept, a slow encoding covering the most general setup is
    used.

    Plus, supporting communications between analog modems is cool :)
    """

    def __init__(self, modem,
                 call_setup_time_min=7, call_setup_time_max=15,
                 ring_time_min=4.8, ring_time_max=5.2,
                 add_inter_call_distance=True):
        """Encode the Morse symbols using the distance between calls.

        Args:
            modem: object used to place and detect calls; it must provide
                send_command(command) and get_response(response=None).
            call_setup_time_min: the minimum time between when the transmitter
                dials the number and the receiver reports RING notifications.
            call_setup_time_max: the maximum time between when the transmitter
                dials the number and the receiver reports RING notifications.
                Waiting this time after dialing ensures that the receiver has
                received _at_least_ one ring. The defaults chosen here have
                been tested to work fine with old analog modems, provided that
                there is some more distance between two calls, and with
                Android phones.
            ring_time_min: the minimum time between two consecutive RINGs
                in the same call.
            ring_time_max: the maximum time between two consecutive RINGs
                in the same call, the standard interval is about 5 seconds, but
                line and/or software delays can make it vary. This is needed
                in order to ignore multiple rings in the same call when
                multiple RINGs in the same call are notified, and then be able
                to discriminate between two different calls.
            add_inter_call_distance: specify if it is needed to wait an extra
                fixed time between calls.
        """
        self.modem = modem
        self.translator = MorseTranslator()

        self.destination_number = ""

        self.call_setup_time_max = call_setup_time_max

        if add_inter_call_distance:
            # Analog modems don't like to dial a new call immediately after
            # they hung up the previous one; an extra delay of about one
            # ring time makes them happy.
            inter_symbol_distance = ring_time_max
        else:
            inter_symbol_distance = 0

        self.modulator = MorseDistanceModulator(call_setup_time_min,
                                                call_setup_time_max,
                                                ring_time_min,
                                                ring_time_max,
                                                inter_symbol_distance)

        # Labels are left-padded with dashes so the log columns line up.
        bounded_symbols = (
            ("--------- dot", self.modulator.dot_time),
            ("-------- dash", self.modulator.dash_time),
            ("- signalspace", self.modulator.signalspace_time),
            ("--- wordspace", self.modulator.wordspace_time),
        )
        for label, symbol_time in bounded_symbols:
            logging.debug("call setup time between %.2f and %.2f "
                          "%s transmit time: %.2f + %.2f "
                          "receive time: between %.2f and %.2f",
                          self.modulator.period_min,
                          self.modulator.period_max,
                          label,
                          self.modulator.period_max,
                          symbol_time.dist,
                          symbol_time.min,
                          symbol_time.max)
        # The end of message has no upper bound on the receiving side.
        logging.debug("call setup time between %.2f and %.2f "
                      "--------- EOM transmit time: %.2f + %.2f "
                      "receive time: between %.2f and inf",
                      self.modulator.period_min,
                      self.modulator.period_max,
                      self.modulator.period_max,
                      self.modulator.eom_time.dist,
                      self.modulator.eom_time.min)

        # -1 means "no ring seen yet for the current message".
        self.previous_ring_time = -1
        self.previous_call_time = -1

        self.morse_message = ""
        self.text_message = ""

        self.end_of_message = False

    def log_symbol(self, distance, symbol, extra_info=""):
        """Log a received symbol together with the measured distance."""
        logging.info("distance: %.2f Received \"%s\"%s", distance, symbol,
                     extra_info)

    def receive_character(self):
        """Handle one RING: decode the call distance into a Morse symbol.

        Must be called right after a RING has been detected. Rings belonging
        to the same call are ignored, undecodable distances are logged and
        dropped, and EOM sets self.end_of_message and resets the ring state.
        """
        current_ring_time = time.time()

        if self.previous_ring_time == -1:
            # Nothing to measure against yet, just record the reference time.
            self.previous_ring_time = current_ring_time
            self.previous_call_time = current_ring_time
            self.log_symbol(0, "", "(The very first ring)")
            return

        ring_distance = current_ring_time - self.previous_ring_time
        logging.debug("RINGs distance: %.2f", ring_distance)
        self.previous_ring_time = current_ring_time

        # Ignore multiple rings in the same call
        if self.modulator.is_same_period(ring_distance):
            logging.debug("multiple rings in the same call, distance: %.2f",
                          ring_distance)
            return

        call_distance = current_ring_time - self.previous_call_time
        self.previous_call_time = current_ring_time

        try:
            symbol = self.modulator.distance_to_symbol(call_distance)
        except ValueError as err:
            logging.error("%s", err)
            logging.error("Check the transmitter and receiver parameters")
            return

        extra_info = ""
        if symbol in (" ", "/", "EOM"):
            # A separator completes the last signal, report its character.
            signal = self.morse_message.strip().split(' ')[-1]
            character = self.translator.signal_to_character(signal)
            extra_info = " got \"%s\"" % character

        self.log_symbol(call_distance, symbol, extra_info)

        if symbol != "EOM":
            # Add spaces around the wordspace symbol to make it easier to split
            # the Morse message in symbols later on
            if symbol == "/":
                symbol = " / "
            self.morse_message += symbol
        else:
            self.end_of_message = True
            self.previous_ring_time = -1
            self.previous_call_time = -1

    def receive_loop(self):
        """Block until a whole message is received, then decode it.

        The decoded text is stored in self.text_message and the receiver
        state is reset so that the loop can be run again.
        """
        while not self.end_of_message:
            self.modem.get_response("RING")
            self.receive_character()
            logging.debug("Current message: %s", self.morse_message)

        self.end_of_message = False
        self.text_message = self.translator.morse_to_text(self.morse_message)
        self.morse_message = ""

    def get_text(self):
        """Return the text of the last message decoded by receive_loop()."""
        return self.text_message

    def transmit_symbol(self, symbol, sleep_time):
        """Place one call and wait sleep_time seconds after hanging up.

        Args:
            symbol: the symbol being transmitted, only used for logging.
            sleep_time: the time to wait after the call has been hung up.
        """
        logging.info("Dial and wait %.2f = %.2f + %.2f seconds "
                     "(transmitting '%s')",
                     self.call_setup_time_max + sleep_time,
                     self.call_setup_time_max,
                     sleep_time,
                     symbol)

        # Dial, then wait self.call_setup_time_max to make sure the receiver
        # gets at least one RING, and then hangup and sleep the time needed to
        # transmit a symbol.
        self.modem.send_command("ATDT" + self.destination_number + ";")
        time.sleep(self.call_setup_time_max)
        self.modem.send_command("ATH")
        self.modem.get_response()
        time.sleep(sleep_time)

    def transmit(self, message, destination_number):
        """Transmit message as Morse by calling destination_number."""
        self.destination_number = destination_number

        morse_message = self.translator.text_to_morse(message)
        distances = self.modulator.modulate(morse_message)

        logging.debug("Starting the transmission")
        last = len(distances) - 1
        for i, distance in enumerate(distances):
            # Use 'None' for the last call
            if i == last:
                symbol = None
            else:
                total_sleep_time = self.call_setup_time_max + distance
                symbol = self.modulator.distance_to_symbol(total_sleep_time)

            self.transmit_symbol(symbol, distance)

    def estimate_transmit_duration(self, message):
        """Estimate how long transmitting message is going to take.

        Returns:
            The estimated time in seconds: for every call, the maximum call
            setup time plus the distance to wait afterwards.
        """
        morsemessage = self.translator.text_to_morse(message)
        logging.debug(morsemessage)

        distances = self.modulator.modulate(morsemessage)

        transmitting_time = 0
        for distance in distances:
            transmitting_time += self.call_setup_time_max
            transmitting_time += distance

        logging.debug("Estimated transmitting time: %.2f seconds",
                      transmitting_time)

        # Hand the estimate back to the caller instead of only logging it.
        return transmitting_time
def test_send_receive():
    """Exercise a transmit/receive round trip with a simulated modem.

    The dummy modem does not talk to any hardware: it only sleeps for the
    time a real call would take, using the symbol times of the transceiver
    under test. The receiving part loops forever, printing each message.
    """
    import random

    logging.basicConfig(level=logging.DEBUG)
    call_setup_time_min = 0
    call_setup_time_max = 0.01
    # NOTE(review): the ring time bounds are not visible in the reviewed
    # source; 0 is assumed so that no ring is treated as a repeated ring of
    # the same call -- confirm.
    ring_time_min = 0
    ring_time_max = 0

    class DummyModem(object):
        """Always receive a '.', a '/' and then EOM, which results in 'E '."""

        def __init__(self):
            self.ring_count = 0

            # Take transmission times from a transceiver
            self.transceiver = None

        def send_command(self, command):
            pass

        def get_response(self, response=None):
            # pylint: disable=unused-argument

            modulator = self.transceiver.modulator
            setup_time = random.uniform(modulator.period_min,
                                        modulator.period_max)

            if self.ring_count == 0:
                # The very first ring carries no symbol, return immediately.
                pass
            elif self.ring_count == 1:
                # Simulate the pause encoding a dot.
                dot_time = modulator.dot_time.dist
                time.sleep(setup_time + dot_time)
            elif self.ring_count == 2:
                # Simulate the pause encoding a word separator.
                wordspace_time = modulator.wordspace_time.dist
                time.sleep(setup_time + wordspace_time)
            else:
                # Simulate the pause encoding the end of message.
                eom_time = modulator.eom_time.dist
                time.sleep(setup_time + eom_time)

            # Cycle through the four rings of the simulated message.
            self.ring_count += 1
            self.ring_count %= 4

    modem = DummyModem()

    xcv = CallDistanceTransceiver(modem,
                                  call_setup_time_min, call_setup_time_max,
                                  ring_time_min, ring_time_max, True)

    modem.transceiver = xcv

    xcv.transmit("CODEX PARIS", "0")

    # Transmitting also consumes modem responses; restart the simulated
    # message from its first ring before receiving.
    # NOTE(review): the reset and the endless loop are reconstructed, they
    # are not visible in the reviewed source -- confirm.
    modem.ring_count = 0

    while True:
        xcv.receive_loop()
        print("Message received!")
        print("\"%s\"" % xcv.get_text(), flush=True)
470 if __name__ == "__main__":