3 # Modem - encapsulate serial modem functionalities
5 # Copyright (C) 2015 Antonio Ospite <ao2@ao2.it>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
# Set up the modem object and open the given serial port right away.
#
# serialport -- port identifier, handed unchanged to open()
# baudrate   -- line speed handed to open(); defaults to 9600
26 def __init__(self, serialport, baudrate=9600):
# Commands that open() sends, one by one, through send_command().
28 self.init_commands = ["Z", "X3"]
32 self.open(serialport, baudrate)
36 """Scan for available hardware serial ports.
38 The code does not scan for virtual serial ports (e.g. /dev/ttyACM*,
39 /dev/ttyUSB* on Linux).
42 a dict {portname: num}
47 port = serial.Serial(i)
48 available[port.portstr] = i
50 except (OSError, serial.SerialException):
# Open the serial port and run the modem initialization sequence.
#
# port     -- port identifier passed to serial.Serial
# baudrate -- line speed passed to serial.Serial
54 def open(self, port, baudrate):
55 self.serial = serial.Serial(port, baudrate=baudrate,
# Send every initialization command and, after each one, read the
# modem output expecting an "OK" response.
65 for command in self.init_commands:
66 self.send_command(command)
67 self.get_response("OK")
70 self.serial.flushInput()
71 self.serial.flushOutput()
# Inspect the line terminator of a command string; a command ending in a
# bare "\n" gets that character replaced by "\r\n".
73 def convert_cr_lf(self, command):
# Checked first because a string ending in "\r\n" also ends in "\n".
74 if command.endswith("\r\n"):
# Bare line feed: drop it and append CR+LF instead.
76 elif command.endswith("\n"):
77 command = command[:-1] + "\r\n"
# Bare carriage return.
78 elif command.endswith("\r"):
# Write a command to the serial port.
#
# command -- command text; it is passed through convert_cr_lf() and
#            encoded as UTF-8 before being written
85 def send_command(self, command):
# The prefix check is case-insensitive because of upper().
86 if command.upper().startswith("AT"):
89 command = "AT" + command
90 command = self.convert_cr_lf(command)
# repr() keeps control characters such as "\r\n" visible in the log.
91 logging.debug("AT command: %s", repr(command))
# The string is converted to UTF-8 encoded bytes for the write.
94 self.serial.write(bytes(command, 'UTF-8'))
# Read the serial port one byte at a time, assemble the data into lines
# and check each complete line against the response being looked for.
#
# expected -- when given, only a line equal to it is treated as the
#             response; when omitted, any line found in std_responses
#             is treated as one
96 def get_response(self, expected=None):
97 std_responses = ["OK", "RING", "ERROR", "BUSY", "NO CARRIER",
# Read at most one byte and append its decoded text to the current line.
103 data = self.serial.read(1)
105 line += data.decode('UTF-8')
# "\r\n" marks the end of a line.
106 if line.endswith("\r\n"):
107 # strip '\r', '\n' and other spaces
108 line = re.sub(r"\s+$", "", line)
109 line = re.sub(r"^\s+", "", line)
# Collect the stripped line in resultlist.
112 resultlist.append(line)
114 # Only react on std_responses when NOT asked explicitly
115 # for an expected response
116 if (expected and (line == expected)) or \
117 (not expected and (line in std_responses)):
# Look up the callback registered for this response line, if any.
118 if line in self.callbacks:
119 func = self.callbacks[line]
125 def get_response_loop(self):
# Associate a callback with a response string.
#
# response      -- response line used as the key in self.callbacks
#                  (get_response() looks callbacks up by the stripped line)
# callback_func -- object stored under that key
129 def register_callback(self, response, callback_func):
130 self.callbacks[response] = callback_func
# Manual test routine for the modem code.
#
# port   -- serial port to use
# number -- number appended to the "ATDT" command
133 def test(port, number):
# Debug level makes the logging.debug() output of send_command() visible.
134 logging.basicConfig(level=logging.DEBUG)
137 print("Received an OK")
140 print("Phone Ringing")
# Send test: issue "ATDT" + number, sleep, issue "ATH", sleep again.
144 def test_send(port, number):
147 modem.register_callback("OK", on_ok)
# Report the elapsed time between the two measurements.
153 time_difference = time2 - time1
154 print("time difference: %.2f" % time_difference)
156 modem.send_command("ATDT" + number)
# Pause for call_setup_time seconds before sending "ATH".
161 print("sleep %.2f" % call_setup_time)
162 time.sleep(call_setup_time)
164 modem.send_command("ATH")
# Pause for inter_call_sleep seconds.
167 print("sleep %.2f" % inter_call_sleep)
168 time.sleep(inter_call_sleep)
# Receive test: send "I1" and print the response, then register the
# "RING" callback and enter the response loop.
172 def test_receive(port):
175 modem.register_callback("OK", on_ok)
177 modem.send_command("I1")
178 response = modem.get_response()
179 print("response:", response)
181 modem.register_callback("RING", on_ring)
182 modem.get_response_loop()
# Run the send test with the arguments given to test().
184 test_send(port, number)
# Script entry point: expects exactly two command line arguments, the
# serial port and the destination number, and passes them to test().
188 if __name__ == "__main__":
# sys.argv[0] is the program name, hence the comparison with 3.
191 if len(sys.argv) != 3:
192 print("usage: %s <serial port> <destination number>" % sys.argv[0])
194 test(sys.argv[1], sys.argv[2])