073b74229cc614067b27bb38fddc57c668ab39fa
[SaveMySugar/python3-savemysugar.git] / src / savemysugar / Modem.py
1 #!/usr/bin/env python3
2 #
3 # Modem - encapsulate serial modem functionalities
4 #
5 # Copyright (C) 2015  Antonio Ospite <ao2@ao2.it>
6 #
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20 import logging
21 import re
22 import serial
23
24
25 class Modem(object):
26     def __init__(self, serialport, baudrate=9600):
27         self.timeout = 0.1
28         self.init_commands = ["Z", "X3"]
29
30         self.callbacks = {}
31
32         self.open(serialport, baudrate)
33
34     @staticmethod
35     def scan():
36         """Scan for available hardware serial ports.
37
38         The code does not scan for virtual serial ports (e.g. /dev/ttyACM*,
39         /dev/ttyUSB* on linux).
40
41         Return:
42             a dict {portname: num}
43         """
44         available = {}
45         for i in range(256):
46             try:
47                 port = serial.Serial(i)
48                 available[port.portstr] = i
49                 port.close()
50             except (OSError, serial.SerialException):
51                 pass
52         return available
53
54     def open(self, port, baudrate):
55         self.serial = serial.Serial(port, baudrate=baudrate,
56                                     timeout=self.timeout,
57                                     rtscts=0, xonxoff=0)
58         self.reset()
59
60     def close(self):
61         self.reset()
62         self.serial.close()
63
64     def reset(self):
65         for command in self.init_commands:
66             self.send_command(command)
67             self.get_response("OK")
68
69     def flush(self):
70         self.serial.flushInput()
71         self.serial.flushOutput()
72
73     def convert_cr_lf(self, command):
74         if command.endswith("\r\n"):
75             pass
76         elif command.endswith("\n"):
77             command = command[:-1] + "\r\n"
78         elif command.endswith("\r"):
79             command += "\n"
80         else:
81             command += "\r\n"
82
83         return command
84
85     def send_command(self, command):
86         if command.upper().startswith("AT"):
87             command = command[2:]
88
89         command = "AT" + command
90         command = self.convert_cr_lf(command)
91         logging.debug("AT command: %s", repr(command))
92
93         self.flush()
94         self.serial.write(bytes(command, 'UTF-8'))
95
96     def get_response(self, expected=None):
97         std_responses = ["OK", "RING", "ERROR", "BUSY", "NO CARRIER",
98                          "NO DIALTONE"]
99
100         resultlist = []
101         line = ""
102         while True:
103             data = self.serial.read(1)
104             if len(data):
105                 line += data.decode('UTF-8')
106                 if line.endswith("\r\n"):
107                     # strip '\r', '\n' and other spaces
108                     line = re.sub(r"\s+$", "", line)
109                     line = re.sub(r"^\s+", "", line)
110
111                     if len(line):
112                         resultlist.append(line)
113
114                         # Only react on std_responses when NOT asked explicitly
115                         # for an expected response
116                         if (expected and (line == expected)) or \
117                                 (not expected and (line in std_responses)):
118                             if line in self.callbacks:
119                                 func = self.callbacks[line]
120                                 func()
121                             return resultlist
122
123                         line = ""
124
125     def get_response_loop(self):
126         while True:
127             self.get_response()
128
129     def register_callback(self, response, callback_func):
130         self.callbacks[response] = callback_func
131
132
133 def test(port, number):
134     logging.basicConfig(level=logging.DEBUG)
135
136     def on_ok():
137         print("Received an OK")
138
139     def on_ring():
140         print("Phone Ringing")
141
142     import time
143
144     def test_send(port, number):
145         modem = Modem(port)
146
147         modem.register_callback("OK", on_ok)
148
149         time1 = 0
150         time2 = 0
151
152         for i in range(3):
153             time_difference = time2 - time1
154             print("time difference: %.2f" % time_difference)
155             time1 = time.time()
156             modem.send_command("ATDT" + number)
157
158             call_setup_time = 9
159             inter_call_sleep = 5
160
161             print("sleep %.2f" % call_setup_time)
162             time.sleep(call_setup_time)
163
164             modem.send_command("ATH")
165             modem.get_response()
166
167             print("sleep %.2f" % inter_call_sleep)
168             time.sleep(inter_call_sleep)
169
170             time2 = time.time()
171
172     def test_receive(port):
173         modem = Modem(port)
174
175         modem.register_callback("OK", on_ok)
176
177         modem.send_command("I1")
178         response = modem.get_response()
179         print("response:", response)
180
181         modem.register_callback("RING", on_ring)
182         modem.get_response_loop()
183
184     test_send(port, number)
185     test_receive(port)
186
187
188 if __name__ == "__main__":
189     import sys
190     print(Modem.scan())
191     if len(sys.argv) != 3:
192         print("usage: %s <serial port> <destination number>" % sys.argv[0])
193         sys.exit(1)
194     test(sys.argv[1], sys.argv[2])