# CallDistanceTransceiver - send and receive Morse using phone calls distance
#
# Copyright (C) 2015 Antonio Ospite <ao2@ao2.it>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import random
import time

# This hack allows MorseTranslator to be imported also when
# __name__ == "__main__"
try:
    from .MorseTranslator import MorseTranslator
except (ImportError, SystemError, ValueError):
    from MorseTranslator import MorseTranslator
class SymbolTime(object):
    """Timing parameters of one symbol in a pulse-distance modulation.

    In theory the symbol distance (the distance to discriminate symbols) can
    be arbitrary, and it's only bounded below by the stability of the pulse
    time, but in practice it can be necessary to wait a predefined minimum
    amount of time between pulses because of technological limits of the
    transmitting devices, we call this time the "minimum inter-symbol
    distance".

    Attributes:
        dist: the time the transmitter has to wait to disambiguate between
            this symbol and a different one.
        min: the minimum time which represents the symbol at the receiving
            end.
        max: the maximum time which represents the symbol at the receiving
            end.
    """

    # pylint: disable=too-few-public-methods
    def __init__(self, pulse_min, pulse_max, multiplier,
                 min_inter_symbol_distance=0.0):
        """Compute the transmit distance and the receive window of a symbol.

        Args:
            pulse_min: the minimum duration of a pulse.
            pulse_max: the maximum duration of a pulse.
            multiplier: non-negative index of the symbol; each increment
                shifts the timings by one symbol distance.
            min_inter_symbol_distance: fixed time added to every timing.

        Raises:
            ValueError: if the pulse time has no spread
                (pulse_min == pulse_max) and no inter-symbol distance is
                given, because symbols could not be told apart.
        """
        assert multiplier >= 0
        if (pulse_min == pulse_max) and (min_inter_symbol_distance == 0):
            # A single message string: adjacent literals are concatenated,
            # a comma here would create a two-element args tuple instead.
            raise ValueError("If (pulse_min == pulse_max) a non-zero "
                             "inter-symbol distance MUST be specified")

        # Twice the pulse jitter keeps the receive windows of two
        # consecutive symbols from overlapping.
        symbol_distance = 2 * (pulse_max - pulse_min)
        if symbol_distance == 0:
            # No jitter at all: fall back to the fixed distance as the step.
            symbol_distance = min_inter_symbol_distance

        # The time the transmitter has to wait to disambiguate between this
        # symbol and a different one.
        self.dist = min_inter_symbol_distance + symbol_distance * multiplier

        # The minimum time which represents the symbol at the receiving end.
        self.min = min_inter_symbol_distance + pulse_min + \
            symbol_distance * multiplier

        # The maximum time which represents the symbol at the receiving end
        self.max = min_inter_symbol_distance + pulse_min + \
            symbol_distance * (multiplier + 1)
class CallDistanceTransceiver(object):
    """Transmit Morse messages using the distance between calls.

    This is basically a pulse-distance modulation (PDM).

    A RING is a pulse and the Morse symbols are encoded in the pause between
    the first ring of the previous call and the first ring of a new call.

    This strategy is very slow but it can even be used with ancient analog
    modems which don't have call progress notifications for outgoing calls, and
    make it also hard to count multiple rings in the same call on the receiving
    side because there is no explicit notification on the receiving side of
    when the caller ends a call.

    For GSM modems, which have a more sophisticated call report signalling,
    a more efficient encoding can be used (for example using ring counts to
    encode Morse symbols, i.e. a pulse-length modulation), but for
    a proof-of-concept, a slow encoding covering the most general setup is
    enough.

    Plus, supporting communications between analog modems is cool :)
    """

    def __init__(self, modem,
                 call_setup_time_min=7, call_setup_time_max=15,
                 ring_time_min=4.8, ring_time_max=5.2,
                 add_inter_call_distance=True):
        """Encode the Morse symbols using the distance between calls.

        Args:
            modem: the object used to talk to the line; this class calls its
                send_command(command) and get_response(...) methods.
            call_setup_time_min: the minimum time between when the transmitter
                dials the number and the receiver reports RING notifications.
            call_setup_time_max: the maximum time between when the transmitter
                dials the number and the receiver reports RING notifications.
                Waiting this time after dialing ensures that the receiver has
                received _at_least_ one ring. The defaults chosen here have
                been tested to work fine with old analog modems, provided that
                there is some more distance between two calls, and with Android
                phones.
            ring_time_min: the minimum time between two consecutive RINGs
                in the same call.
            ring_time_max: the maximum time between two consecutive RINGs
                in the same call, the standard interval is about 5 seconds, but
                line and/or software delays can make it vary. This is needed
                in order to ignore multiple rings in the same call when
                multiple RINGs in the same call are notified, and then be able
                to discriminate between two different calls.
            add_inter_call_distance: specify if it is needed to wait an extra
                fixed time between calls.
        """
        self.modem = modem
        self.translator = MorseTranslator()

        self.destination_number = ""

        self.call_setup_time_min = call_setup_time_min
        self.call_setup_time_max = call_setup_time_max

        self.ring_time_min = ring_time_min
        self.ring_time_max = ring_time_max

        if add_inter_call_distance:
            # Analog modems don't like to dial a new call immediately after
            # they hung up the previous one; an extra delay of about one
            # ring time makes them happy.
            inter_symbol_distance = ring_time_max
        else:
            inter_symbol_distance = 0

        # One timing per symbol; the multiplier orders them by distance:
        # dot < dash < signal space < word space < end of message.
        self.dot_time = SymbolTime(call_setup_time_min,
                                   call_setup_time_max, 0,
                                   inter_symbol_distance)
        self.dash_time = SymbolTime(call_setup_time_min,
                                    call_setup_time_max, 1,
                                    inter_symbol_distance)
        self.signalspace_time = SymbolTime(call_setup_time_min,
                                           call_setup_time_max, 2,
                                           inter_symbol_distance)
        self.wordspace_time = SymbolTime(call_setup_time_min,
                                         call_setup_time_max, 3,
                                         inter_symbol_distance)
        self.eom_time = SymbolTime(call_setup_time_min,
                                   call_setup_time_max, 4,
                                   inter_symbol_distance)

        # The labels carry their own dash padding so that the log lines
        # stay column-aligned.
        bounded_symbols = (
            ("--------- dot", self.dot_time),
            ("-------- dash", self.dash_time),
            ("- signalspace", self.signalspace_time),
            ("--- wordspace", self.wordspace_time),
        )
        for label, symbol_time in bounded_symbols:
            logging.debug("call setup time between %.2f and %.2f "
                          "%s transmit time: %.2f + %.2f "
                          "receive time: between %.2f and %.2f",
                          self.call_setup_time_min,
                          self.call_setup_time_max,
                          label,
                          self.call_setup_time_max,
                          symbol_time.dist,
                          symbol_time.min,
                          symbol_time.max)

        # The end of message has no upper bound at the receiving end.
        logging.debug("call setup time between %.2f and %.2f "
                      "--------- EOM transmit time: %.2f + %.2f "
                      "receive time: between %.2f and inf",
                      self.call_setup_time_min,
                      self.call_setup_time_max,
                      self.call_setup_time_max,
                      self.eom_time.dist,
                      self.eom_time.min)

        # -1 means "no ring received yet".
        self.previous_ring_time = -1
        self.previous_call_time = -1

        self.morse_message = ""
        self.text_message = ""

        self.end_of_message = False

    def log_symbol(self, distance, symbol, extra_info=""):
        """Log a received symbol together with the measured call distance."""
        logging.info("distance: %.2f Received \"%s\"%s", distance, symbol,
                     extra_info)

    def _last_character(self):
        """Translate the last Morse signal received so far to a character."""
        signal = self.morse_message.strip().split(' ')[-1]
        return self.translator.signal_to_character(signal)

    def receive_character(self):
        """Handle one RING and decode the symbol it completes, if any.

        Must be called right after a RING has been notified: the current
        time is compared with the time of the previous call and the distance
        is matched against the receive window of each symbol.
        """
        current_ring_time = time.time()

        if self.previous_ring_time == -1:
            # Nothing to measure against yet, this ring is the reference.
            self.previous_ring_time = current_ring_time
            self.previous_call_time = current_ring_time
            self.log_symbol(0, "", "(The very first ring)")
            return

        ring_distance = current_ring_time - self.previous_ring_time
        logging.debug("RINGs distance: %.2f", ring_distance)
        self.previous_ring_time = current_ring_time

        # Ignore multiple rings in the same call
        if self.ring_time_min < ring_distance <= self.ring_time_max:
            logging.debug("multiple rings in the same call, distance: %.2f",
                          ring_distance)
            return

        call_distance = current_ring_time - self.previous_call_time
        self.previous_call_time = current_ring_time

        if self.dot_time.min < call_distance <= self.dot_time.max:
            self.log_symbol(call_distance, '.')
            self.morse_message += "."
            return

        if self.dash_time.min < call_distance <= self.dash_time.max:
            self.log_symbol(call_distance, '-')
            self.morse_message += "-"
            return

        if self.signalspace_time.min < call_distance <= \
                self.signalspace_time.max:
            character = self._last_character()
            self.log_symbol(call_distance, ' ', " got \"%s\"" % character)
            self.morse_message += " "
            return

        if self.wordspace_time.min < call_distance <= \
                self.wordspace_time.max:
            character = self._last_character()
            self.log_symbol(call_distance, '/', " got \"%s\"" % character)
            self.morse_message += " / "
            return

        if call_distance > self.eom_time.min:
            character = self._last_character()
            self.log_symbol(call_distance, 'EOM', " got \"%s\"" % character)
            self.end_of_message = True
            # Start from scratch with the next message.
            self.previous_ring_time = -1
            self.previous_call_time = -1
            return

        # if the code made it up to here, something fishy is going on
        logging.error("Unexpected distance: %.2f", call_distance)
        logging.error("Check the transmitter and receiver parameters")

    def receive_loop(self):
        """Receive symbols until an end of message, then decode the text.

        The decoded text is stored in self.text_message, and the receiving
        state is reset so that the loop can be run again.
        """
        while not self.end_of_message:
            self.modem.get_response("RING")
            self.receive_character()
            logging.debug("Current message: %s", self.morse_message)

        self.end_of_message = False
        self.text_message = self.translator.morse_to_text(self.morse_message)
        self.morse_message = ""

    def get_text(self):
        """Return the text decoded by the last completed receive_loop()."""
        return self.text_message

    def transmit_symbol(self, symbol):
        """Place one call and then wait the distance which encodes symbol.

        Args:
            symbol: one of ".", "-", " " (signal space), "/" (word space),
                "EOM" (end of message), or None for a closing call with no
                extra wait.

        Raises:
            ValueError: if symbol is none of the above.
        """
        if symbol == ".":
            sleep_time = self.dot_time.dist
        elif symbol == "-":
            sleep_time = self.dash_time.dist
        elif symbol == " ":
            sleep_time = self.signalspace_time.dist
        elif symbol == "/":
            sleep_time = self.wordspace_time.dist
        elif symbol == "EOM":
            sleep_time = self.eom_time.dist
        elif symbol is None:
            # To terminate the transmission just call and hangup, with no extra
            # delay.
            sleep_time = 0
        else:
            # Fail before dialing rather than with an unbound sleep_time.
            raise ValueError("Unknown Morse symbol: %r" % (symbol,))

        logging.info("Dial and wait %.2f = %.2f + %.2f seconds "
                     "(transmitting '%s')",
                     self.call_setup_time_max + sleep_time,
                     self.call_setup_time_max,
                     sleep_time,
                     symbol)

        # Dial, then wait self.call_setup_time_max to make sure the receiver
        # gets at least one RING, and then hangup and sleep the time needed to
        # encode the symbol.
        self.modem.send_command("ATDT" + self.destination_number + ";")
        time.sleep(self.call_setup_time_max)
        self.modem.send_command("ATH")
        self.modem.get_response()
        time.sleep(sleep_time)

    def transmit_signal(self, signal):
        """Transmit every symbol of a Morse signal, in order."""
        logging.debug("Transmitting signal: %s", signal)
        for symbol in signal:
            self.transmit_symbol(symbol)

    def transmit(self, message, destination_number):
        """Translate message to Morse and transmit it to destination_number.

        Args:
            message: the text to transmit.
            destination_number: the phone number to dial for every symbol.
        """
        self.destination_number = destination_number

        morse_message = self.translator.text_to_morse(message)
        signals = morse_message.split()

        logging.debug("Starting the transmission")
        for i, signal in enumerate(signals):
            logging.debug("Transmitting '%s' as '%s'", message[i], signal)
            self.transmit_signal(signal)

            # Transmit a signal separator only when strictly necessary:
            #  - after the last symbol, we are going to transmit an EOM
            #    anyway, and that will mark the end of the last symbol.
            #  - between words the word separator act as a symbol separator
            #    too.
            if i != len(signals) - 1 and signals[i + 1] != "/" and signal != "/":
                self.transmit_symbol(" ")

        self.transmit_symbol("EOM")

        # Since the Morse signals are encoded in the distance between calls, an
        # extra call is needed in order for receiver actually get the EOM and
        # see that the transmission has terminated.
        self.transmit_symbol(None)

    def estimate_transmit_duration(self, message):
        """Estimate how long transmit() would take for message.

        The estimate adds up, for every call transmit() would place, the call
        setup time and the distance of the transmitted symbol.

        Args:
            message: the text which would be transmitted.

        Returns:
            The estimated duration in seconds.
        """
        morsemessage = self.translator.text_to_morse(message)
        signals = morsemessage.split()

        logging.debug(signals)

        transmitting_time = 0
        for i, signal in enumerate(signals):
            logging.debug("signal: %s", signal)

            for symbol in signal:
                transmitting_time += self.call_setup_time_max
                if symbol == ".":
                    transmitting_time += self.dot_time.dist
                elif symbol == "-":
                    transmitting_time += self.dash_time.dist
                elif symbol == "/":
                    transmitting_time += self.wordspace_time.dist

            # Same separator rule as in transmit().
            if i != len(signals) - 1 and signals[i + 1] != "/" and signal != "/":
                transmitting_time += self.call_setup_time_max
                transmitting_time += self.signalspace_time.dist

        transmitting_time += self.call_setup_time_max
        transmitting_time += self.eom_time.dist

        # The final call needed for the receiver to get the EOM
        transmitting_time += self.call_setup_time_max

        logging.debug("Estimated transmitting time: %.2f seconds",
                      transmitting_time)

        return transmitting_time
def test_send_receive():
    """Run one receive loop against a simulated modem and print the text.

    The simulated modem does not touch any hardware: it only sleeps for the
    time a real transmitter would let pass between calls, so that the
    receiving code decodes a fixed sequence of symbols.
    """
    logging.basicConfig(level=logging.DEBUG)
    call_setup_time_min = 0
    call_setup_time_max = 0.01
    # NOTE(review): the ring time values used by this test could not be
    # recovered; these are small placeholders which keep the test fast,
    # confirm them against the intended setup.
    ring_time_min = 0.001
    ring_time_max = 0.002

    class DummyModem(object):
        """Always receive a '.', a '/' and then EOM, which results in 'E '."""

        def __init__(self):
            # Number of RINGs reported so far in the current sequence.
            self.ring_count = 0

            # Take transmission times from a transceiver
            self.transceiver = None

        def send_command(self, command):
            """Accept and ignore any command, nothing is transmitted."""
            # pylint: disable=unused-argument

        def get_response(self, response):
            """Block for as long as the next simulated RING takes to arrive."""
            # pylint: disable=unused-argument

            # A real call takes a variable time to be set up.
            setup_time = random.uniform(self.transceiver.call_setup_time_min,
                                        self.transceiver.call_setup_time_max)

            if self.ring_count == 0:
                # The very first ring carries no symbol, report it at once.
                pass
            elif self.ring_count == 1:
                # A dot
                time.sleep(setup_time + self.transceiver.dot_time.dist)
            elif self.ring_count == 2:
                # A word space
                time.sleep(setup_time + self.transceiver.wordspace_time.dist)
            else:
                # The end of message
                time.sleep(setup_time + self.transceiver.eom_time.dist)

            self.ring_count += 1

    modem = DummyModem()

    xcv = CallDistanceTransceiver(modem,
                                  call_setup_time_min, call_setup_time_max,
                                  ring_time_min, ring_time_max, True)

    modem.transceiver = xcv

    xcv.receive_loop()

    print("Message received!")
    print("\"%s\"" % xcv.get_text(), flush=True)
# Run the self test when the module is executed as a script.
if __name__ == "__main__":
    test_send_receive()