3 # CallDistanceTransceiver - send and receive Morse using phone calls distance
5 # Copyright (C) 2015 Antonio Ospite <ao2@ao2.it>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 # This hack allows MorseTranslator to be imported also when
22 # __name__ == "__main__"
24 from .MorseTranslator import MorseTranslator
26 from MorseTranslator import MorseTranslator
class SymbolTime(object):
    """Timing figures of one symbol in a pulse-distance encoding.

    In theory the symbol distance (the distance to discriminate symbols) can
    be arbitrary, and it's only bounded below by the stability of the period
    time, but in practice it can be necessary to wait a predefined minimum
    amount of time between periods because of technological limits of the
    transmitting devices; we call this time the "minimum inter-symbol
    distance".

    Attributes:
        dist: the time the transmitter has to wait to disambiguate between
            this symbol and a different one.
        min: the minimum time which represents the symbol at the receiving
            end.
        max: the maximum time which represents the symbol at the receiving
            end.
    """

    # pylint: disable=too-few-public-methods
    def __init__(self, period_min, period_max, multiplier,
                 min_inter_symbol_distance=0.0):
        """Compute the timings of the symbol identified by *multiplier*.

        Args:
            period_min: the minimum duration of a period.
            period_max: the maximum duration of a period.
            multiplier: non-negative index of the symbol; each increment
                shifts the symbol by one symbol distance.
            min_inter_symbol_distance: fixed extra time added to every
                symbol; it also acts as the symbol distance when
                period_min == period_max.

        Raises:
            ValueError: if period_min == period_max and no inter-symbol
                distance is given, because symbols could not be told apart.
        """
        assert multiplier >= 0
        if (period_min == period_max) and (min_inter_symbol_distance == 0):
            # The two literals are concatenated into ONE message; separating
            # them with a comma would hand ValueError a tuple of two strings.
            raise ValueError("If (period_min == period_max) a non-zero "
                             "inter-symbol distance MUST be specified")

        # Twice the period jitter keeps the receive windows of adjacent
        # symbols from overlapping.
        symbol_distance = 2 * (period_max - period_min)
        if symbol_distance == 0:
            symbol_distance = min_inter_symbol_distance

        # The time the transmitter has to wait to disambiguate between this
        # symbol and a different one.
        self.dist = min_inter_symbol_distance + symbol_distance * multiplier

        # The minimum time which represents the symbol at the receiving end.
        self.min = min_inter_symbol_distance + period_min + \
            symbol_distance * multiplier

        # The maximum time which represents the symbol at the receiving end.
        self.max = min_inter_symbol_distance + period_min + \
            symbol_distance * (multiplier + 1)
class MorseDistanceModulator(object):
    """Convert Morse symbols to waiting times ("distances") and back.

    Each symbol (dot, dash, signal space, word space, end of message) owns
    a SymbolTime: its ``dist`` is what the transmitter waits, while its
    ``min``/``max`` bound what the receiver accepts for that symbol.
    """

    def __init__(self, period_min, period_max, pulse_min, pulse_max,
                 inter_symbol_distance):
        """Create a modulator; the arguments are those of set_parameters()."""
        self.set_parameters(period_min, period_max,
                            pulse_min, pulse_max,
                            inter_symbol_distance)

    def set_parameters(self, period_min, period_max, pulse_min, pulse_max,
                       inter_symbol_distance):
        """Store the timing parameters and derive the per-symbol timings.

        Args:
            period_min: the minimum duration of a period.
            period_max: the maximum duration of a period.
            pulse_min: exclusive lower bound of the distance between two
                pulses belonging to the same period.
            pulse_max: inclusive upper bound of the distance between two
                pulses belonging to the same period.
            inter_symbol_distance: minimum extra time between symbols.
        """
        self.period_min = period_min
        self.period_max = period_max
        self.pulse_min = pulse_min
        self.pulse_max = pulse_max
        self.inter_symbol_distance = inter_symbol_distance

        # NOTE(review): the multipliers 0..4 are reconstructed; consecutive
        # values are the only ones for which the receive windows tested by
        # distance_to_symbol() tile without gaps or overlaps - confirm
        # against the pristine file.
        self.dot_time = SymbolTime(period_min,
                                   period_max, 0,
                                   inter_symbol_distance)
        self.dash_time = SymbolTime(period_min,
                                    period_max, 1,
                                    inter_symbol_distance)
        self.signalspace_time = SymbolTime(period_min,
                                           period_max, 2,
                                           inter_symbol_distance)
        self.wordspace_time = SymbolTime(period_min,
                                         period_max, 3,
                                         inter_symbol_distance)
        self.eom_time = SymbolTime(period_min,
                                   period_max, 4,
                                   inter_symbol_distance)

    def symbol_to_distance(self, symbol):
        """Return the time the transmitter must wait to send *symbol*.

        Raises:
            ValueError: if *symbol* is not one of ".", "-", " ", "/",
                " / " or "EOM".
        """
        if symbol == ".":
            return self.dot_time.dist
        elif symbol == "-":
            return self.dash_time.dist
        elif symbol == " ":
            return self.signalspace_time.dist
        elif symbol == "/" or symbol == " / ":
            return self.wordspace_time.dist
        elif symbol == "EOM":
            return self.eom_time.dist

        raise ValueError("Unexpected symbol %s" % symbol)

    def is_same_period(self, distance):
        """Tell whether *distance* lies in (pulse_min, pulse_max]."""
        return self.pulse_min < distance <= self.pulse_max

    def distance_to_symbol(self, distance):
        """Return the symbol whose receive window contains *distance*.

        Every window is open below and closed above, except the
        end-of-message one which has no upper bound.

        Raises:
            ValueError: if *distance* does not exceed the lower bound of
                the dot window, i.e. it matches no symbol.
        """
        if self.dot_time.min < distance <= self.dot_time.max:
            return "."

        if self.dash_time.min < distance <= self.dash_time.max:
            return "-"

        if self.signalspace_time.min < distance <= self.signalspace_time.max:
            return " "

        if self.wordspace_time.min < distance <= self.wordspace_time.max:
            return "/"

        if distance > self.eom_time.min:
            return "EOM"

        raise ValueError("Unexpected distance %.2f" % distance)

    def modulate(self, morse):
        """Translate a Morse string into the list of distances to transmit.

        Args:
            morse: Morse text whose signals are separated by single spaces
                and whose words are separated by "/".

        Returns:
            The distances of all the symbols, followed by the end-of-message
            distance and by the distance of the closing call.
        """
        signals = morse.split(' ')
        last_index = len(signals) - 1
        distances = []
        for i, signal in enumerate(signals):
            for symbol in signal:
                distances.append(self.symbol_to_distance(symbol))

            # Transmit a signal separator only when strictly necessary.
            #
            # Avoid it in these cases:
            #  - after the last symbol, because EOM is going to be
            #    transmitted anyway and that will mark the end of the last
            #    symbol.
            #  - between words, because the word separator acts as a symbol
            #    separator too.
            if i != last_index and signals[i + 1] != "/" and signal != "/":
                distances.append(self.symbol_to_distance(" "))

        distances.append(self.symbol_to_distance("EOM"))

        # Since the Morse signals are encoded in the distance between calls,
        # an extra call is needed in order for the receiver to actually get
        # the EOM and see that the transmission has terminated.
        # NOTE(review): the value 0 is reconstructed (nothing has to be
        # waited for after the closing call) - confirm against the pristine
        # file.
        distances.append(0)

        return distances
def log_symbol(distance, symbol, extra_info=""):
    """Log a received symbol together with the distance that produced it.

    Args:
        distance: the measured distance, logged with two decimals.
        symbol: the symbol decoded from the distance.
        extra_info: optional text appended right after the quoted symbol.
    """
    # The format string has three placeholders, so all three values must be
    # passed; logging formats them lazily.
    logging.info("distance: %.2f Received \"%s\"%s", distance, symbol,
                 extra_info)
class CallDistanceTransceiver(object):
    """Transmit Morse messages using the distance between calls.

    This is basically a pulse-distance modulation (PDM).

    A RING is a pulse and the Morse symbols are encoded in the pause between
    the first ring of the previous call and the first ring of a new call.

    This strategy is very slow but it can even be used with ancient analog
    modems which don't have call progress notifications for outgoing calls,
    and which also make it hard to count multiple rings in the same call on
    the receiving side, because there is no explicit notification on the
    receiving side of when the caller ends a call.

    For GSM modems, which have a more sophisticated call report signalling,
    a more efficient encoding can be used (for example using ring counts to
    encode Morse symbols, i.e. a pulse-length modulation), but for
    a proof-of-concept a slow encoding covering the most general setup is
    used here.

    Plus, supporting communications between analog modems is cool :)
    """

    def __init__(self, modem,
                 call_setup_time_min=7, call_setup_time_max=15,
                 ring_time_min=4.8, ring_time_max=5.2,
                 add_inter_call_distance=True):
        """Encode the Morse symbols using the distance between calls.

        Args:
            modem: the object used to place calls and to wait for rings; it
                must provide send_command(command) and
                get_response(response=None).
            call_setup_time_min: the minimum time between when the
                transmitter dials the number and the receiver reports RING
                notifications.
            call_setup_time_max: the maximum time between when the
                transmitter dials the number and the receiver reports RING
                notifications. Waiting this time after dialing ensures that
                the receiver has received _at_least_ one ring. The defaults
                chosen here have been tested to work fine with old analog
                modems, provided that there is some more distance between
                two calls, and with Android.
            ring_time_min: the minimum time between two consecutive RINGs
                in the same call.
            ring_time_max: the maximum time between two consecutive RINGs
                in the same call; the standard interval is about 5 seconds,
                but line and/or software delays can make it vary. This is
                needed in order to ignore multiple RINGs notified in the
                same call, and then be able to discriminate between two
                different calls.
            add_inter_call_distance: specify if it is needed to wait an
                extra fixed time between calls.
        """
        self.modem = modem
        self.translator = MorseTranslator()

        self.call_setup_time_max = call_setup_time_max

        if add_inter_call_distance:
            # Analog modems don't like to dial a new call immediately after
            # they hung up the previous one; an extra delay of about one
            # ring time makes them happy.
            inter_symbol_distance = ring_time_max
        else:
            inter_symbol_distance = 0

        self.modulator = MorseDistanceModulator(call_setup_time_min,
                                                call_setup_time_max,
                                                ring_time_min,
                                                ring_time_max,
                                                inter_symbol_distance)

        logging.debug("call setup time between %.2f and %.2f "
                      "--------- dot transmit time: %.2f + %.2f "
                      "receive time: between %.2f and %.2f",
                      self.modulator.period_min,
                      self.modulator.period_max,
                      self.modulator.period_max,
                      self.modulator.dot_time.dist,
                      self.modulator.dot_time.min,
                      self.modulator.dot_time.max)
        logging.debug("call setup time between %.2f and %.2f "
                      "-------- dash transmit time: %.2f + %.2f "
                      "receive time: between %.2f and %.2f",
                      self.modulator.period_min,
                      self.modulator.period_max,
                      self.modulator.period_max,
                      self.modulator.dash_time.dist,
                      self.modulator.dash_time.min,
                      self.modulator.dash_time.max)
        logging.debug("call setup time between %.2f and %.2f "
                      "- signalspace transmit time: %.2f + %.2f "
                      "receive time: between %.2f and %.2f",
                      self.modulator.period_min,
                      self.modulator.period_max,
                      self.modulator.period_max,
                      self.modulator.signalspace_time.dist,
                      self.modulator.signalspace_time.min,
                      self.modulator.signalspace_time.max)
        logging.debug("call setup time between %.2f and %.2f "
                      "--- wordspace transmit time: %.2f + %.2f "
                      "receive time: between %.2f and %.2f",
                      self.modulator.period_min,
                      self.modulator.period_max,
                      self.modulator.period_max,
                      self.modulator.wordspace_time.dist,
                      self.modulator.wordspace_time.min,
                      self.modulator.wordspace_time.max)
        # The end-of-message window has no upper bound.
        logging.debug("call setup time between %.2f and %.2f "
                      "--------- EOM transmit time: %.2f + %.2f "
                      "receive time: between %.2f and inf",
                      self.modulator.period_min,
                      self.modulator.period_max,
                      self.modulator.period_max,
                      self.modulator.eom_time.dist,
                      self.modulator.eom_time.min)

        # -1 means "no ring seen yet in the current message".
        self.previous_ring_time = -1
        self.previous_call_time = -1

        self.morse_message = ""
        self.text_message = ""

        self.end_of_message = False

    def receive_character(self):
        """Handle one RING: decode the symbol carried by the call distance.

        Must be called right after a RING has been notified. The very first
        ring only starts the clock; further rings of the same call are
        ignored; any other ring is decoded into a symbol which is appended
        to the Morse message, or which terminates it when it is "EOM".
        """
        current_ring_time = time.time()

        if self.previous_ring_time == -1:
            self.previous_ring_time = current_ring_time
            self.previous_call_time = current_ring_time
            log_symbol(0, "", "(The very first ring)")
            return

        ring_distance = current_ring_time - self.previous_ring_time
        logging.debug("RINGs distance: %.2f", ring_distance)
        self.previous_ring_time = current_ring_time

        # Ignore multiple rings in the same call
        if self.modulator.is_same_period(ring_distance):
            logging.debug("multiple rings in the same call, distance: %.2f",
                          ring_distance)
            return

        call_distance = current_ring_time - self.previous_call_time
        self.previous_call_time = current_ring_time

        try:
            symbol = self.modulator.distance_to_symbol(call_distance)
        except ValueError as err:
            logging.error("%s", err)
            logging.error("Check the transmitter and receiver parameters")
            return

        # A separator or the end of the message completes a signal: report
        # the character it decodes to.
        extra_info = ""
        if symbol in [" ", "/", "EOM"]:
            signal = self.morse_message.strip().split(' ')[-1]
            character = self.translator.signal_to_character(signal)
            extra_info = " got \"%s\"" % character

        log_symbol(call_distance, symbol, extra_info)

        if symbol != "EOM":
            # Add spaces around the wordspace symbol to make it easier to
            # split the Morse message in symbols later on
            if symbol == "/":
                symbol = " / "
            self.morse_message += symbol
        else:
            self.end_of_message = True
            self.previous_ring_time = -1
            self.previous_call_time = -1

    def receive_loop(self):
        """Block until a whole message has been received, then decode it.

        The decoded text is stored and can be read with get_text(); the
        receiving state is reset so that the loop can be run again.
        """
        while not self.end_of_message:
            self.modem.get_response("RING")
            self.receive_character()
            logging.debug("Current message: %s", self.morse_message)

        self.end_of_message = False
        self.text_message = self.translator.morse_to_text(self.morse_message)
        self.morse_message = ""

    def get_text(self):
        """Return the text of the last message decoded by receive_loop()."""
        return self.text_message

    def transmit_symbol(self, destination_number, symbol, sleep_time):
        """Place one call and then wait the time which encodes *symbol*.

        Args:
            destination_number: the phone number to dial.
            symbol: the symbol being transmitted, only used for logging.
            sleep_time: the time to wait after hanging up.
        """
        logging.info("Dial and wait %.2f = %.2f + %.2f seconds "
                     "(transmitting '%s')",
                     self.call_setup_time_max + sleep_time,
                     self.call_setup_time_max,
                     sleep_time,
                     symbol)

        # Dial, then wait self.call_setup_time_max to make sure the receiver
        # gets at least one RING, and then hang up and sleep the time needed
        # to encode the symbol.
        self.modem.send_command("ATDT" + destination_number + ";")
        time.sleep(self.call_setup_time_max)
        self.modem.send_command("ATH")
        self.modem.get_response()
        time.sleep(sleep_time)

    def transmit(self, message, destination_number):
        """Translate *message* to Morse and transmit it, one call per symbol.

        Args:
            message: the text to transmit.
            destination_number: the phone number to dial.
        """
        morse_message = self.translator.text_to_morse(message)
        distances = self.modulator.modulate(morse_message)

        logging.debug("Starting the transmission")
        last_index = len(distances) - 1
        for i, distance in enumerate(distances):
            # Use 'None' for the last call
            if i == last_index:
                symbol = None
            else:
                total_sleep_time = self.call_setup_time_max + distance
                symbol = self.modulator.distance_to_symbol(total_sleep_time)

            self.transmit_symbol(destination_number, symbol, distance)

    def estimate_transmit_duration(self, message):
        """Estimate how long transmitting *message* takes.

        Every symbol costs one call setup time plus the symbol distance.

        Returns:
            The estimated duration in seconds; it is also logged at debug
            level.
        """
        morsemessage = self.translator.text_to_morse(message)
        # The message is data, never a format string.
        logging.debug("%s", morsemessage)

        distances = self.modulator.modulate(morsemessage)

        transmitting_time = 0
        for distance in distances:
            transmitting_time += self.call_setup_time_max
            transmitting_time += distance

        logging.debug("Estimated transmitting time: %.2f seconds",
                      transmitting_time)

        return transmitting_time
# Self-test: wires a CallDistanceTransceiver to a DummyModem, transmits
# "CODEX PARIS" to the number "0" and prints the text returned by get_text().
#
# NOTE(review): the embedded line numbers jump (404 -> 410, 417 -> 421,
# 444 -> 451, 457 -> 463, ...), so statements are missing from this view:
# the definitions of ring_time_min, ring_time_max and modem, the body of
# DummyModem.__init__ and the code updating ring_count are not visible.
# Restore them from the pristine file before running this function.
401 def test_transmit_receive():
402 logging.basicConfig(level=logging.DEBUG)
# Tiny call setup times, presumably to keep the self-test fast - TODO confirm.
403 call_setup_time_min = 0
404 call_setup_time_max = 0.01
# Stand-in for a real modem: it exposes the send_command() and
# get_response() methods that the transceiver calls.
410 class DummyModem(object):
411 """Always receive a '.', a '/' and then EOM, which results in 'E '."""
416 # Take transmission times from a transceiver
417 self.transceiver = None
# NOTE(review): the body of send_command is not visible in this view.
421 def send_command(self, command):
# Simulates waiting for a ring: sleeps for a random call setup time plus the
# distance of the symbol selected by ring_count.
424 def get_response(self, response=None):
425 # pylint: disable=unused-argument
# Random setup time within the period bounds of the transceiver's modulator.
427 setup_time = random.uniform(self.transceiver.modulator.period_min,
428 self.transceiver.modulator.period_max)
# NOTE(review): the body of the ring_count == 0 branch is not visible.
430 if self.ring_count == 0:
433 elif self.ring_count == 1:
# Second ring: wait the time that encodes a dot.
435 dot_time = self.transceiver.modulator.dot_time.dist
436 time.sleep(setup_time + dot_time)
437 elif self.ring_count == 2:
# Third ring: wait the time that encodes a word space.
439 wordspace_time = self.transceiver.modulator.wordspace_time.dist
440 time.sleep(setup_time + wordspace_time)
# NOTE(review): the line guarding this end-of-message wait (presumably an
# else branch) is not visible.
443 eom_time = self.transceiver.modulator.eom_time.dist
444 time.sleep(setup_time + eom_time)
451 xcv = CallDistanceTransceiver(modem,
452 call_setup_time_min, call_setup_time_max,
453 ring_time_min, ring_time_max, True)
# The modem needs the transceiver in order to read its symbol timings.
455 modem.transceiver = xcv
457 xcv.transmit("CODEX PARIS", "0")
# NOTE(review): the code that receives the message before it is printed is
# not visible in this view.
463 print("Message received!")
464 print("\"%s\"" % xcv.get_text(), flush=True)
# Run the self-test only when the module is executed as a script.
if __name__ == "__main__":
    test_transmit_receive()