#!/usr/bin/env python3
#
# Modem - encapsulate serial modem functionalities
#
# Copyright (C) 2015 Antonio Ospite
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see https://www.gnu.org/licenses/.

import logging
import re
import time

import serial


class Modem(object):
    """Thin wrapper around a serial port that speaks AT commands.

    The port is opened (and the modem reset with ``init_commands``) as soon
    as the object is constructed.  Callbacks can be registered for specific
    response lines, see :meth:`register_callback`.
    """

    def __init__(self, serialport, baudrate=9600):
        """Open *serialport* at *baudrate* and reset the modem."""
        # Read timeout in seconds; get_response() polls the port with it.
        self.timeout = 0.1
        # Commands sent (with the "AT" prefix added) by reset().
        self.init_commands = ["Z", "X3"]
        # Maps a response line (e.g. "RING") to a zero-argument callable.
        self.callbacks = {}
        self.open(serialport, baudrate)

    @staticmethod
    def scan():
        """Scan for available hardware serial ports.

        The code does not scan for virtual serial ports (e.g. /dev/ttyACM*,
        /dev/ttyUSB* on linux).

        Ports are probed by number (0..255).  pySerial >= 3.0 no longer
        accepts numeric ports and raises ValueError for them; that is
        treated like any other unavailable port, so on such versions the
        result is simply empty instead of a crash.

        Return: a dict {portname: num}
        """
        available = {}
        for i in range(256):
            try:
                port = serial.Serial(i)
            except (OSError, ValueError, serial.SerialException):
                continue
            try:
                available[port.portstr] = i
            finally:
                port.close()
        return available

    def open(self, port, baudrate):
        """Open the serial *port* without flow control and reset the modem."""
        self.serial = serial.Serial(port,
                                    baudrate=baudrate,
                                    timeout=self.timeout,
                                    rtscts=0,
                                    xonxoff=0)
        self.reset()

    def close(self):
        """Reset the modem and close the serial port.

        The port is closed even when the reset fails or is interrupted.
        """
        try:
            self.reset()
        finally:
            self.serial.close()

    def reset(self):
        """Send every init command, waiting for "OK" after each one.

        NOTE: this blocks until the modem answers "OK" (see get_response).
        """
        for command in self.init_commands:
            self.send_command(command)
            self.get_response("OK")

    def flush(self):
        """Discard pending data in both serial buffers."""
        self.serial.flushInput()
        self.serial.flushOutput()

    def convert_cr_lf(self, command):
        """Return *command* terminated by exactly one trailing CR LF pair.

        A bare trailing LF is replaced by CR LF, a bare trailing CR gets an
        LF appended, and an unterminated command gets CR LF appended.
        """
        if command.endswith("\r\n"):
            return command
        if command.endswith("\n"):
            return command[:-1] + "\r\n"
        if command.endswith("\r"):
            return command + "\n"
        return command + "\r\n"

    def send_command(self, command):
        """Send an AT command to the modem.

        *command* may be given with or without the leading "AT" (in any
        case); the prefix is normalized to upper-case "AT" and the line
        terminator to CR LF.  Both serial buffers are flushed before the
        command is written.
        """
        if command.upper().startswith("AT"):
            command = command[2:]
        command = self.convert_cr_lf("AT" + command)

        logging.debug("AT command: %s", repr(command))
        self.flush()
        self.serial.write(command.encode("UTF-8"))

    def get_response(self, expected=None):
        """Read lines from the modem until a terminating response arrives.

        When *expected* is given, reading stops only at a line equal to
        it; otherwise reading stops at the first standard result code
        ("OK", "RING", "ERROR", "BUSY", "NO CARRIER", "NO DIALTONE").
        If a callback is registered for the terminating line it is called
        before returning.

        NOTE: there is no overall timeout; the call keeps polling the port
        until a terminating line is received.

        Return: the list of non-empty, stripped lines received, the
        terminating one included.
        """
        std_responses = ("OK", "RING", "ERROR", "BUSY", "NO CARRIER",
                         "NO DIALTONE")

        resultlist = []
        # Raw bytes are buffered and decoded one full line at a time:
        # decoding byte by byte fails on any multi-byte or invalid sequence.
        buf = bytearray()
        while True:
            data = self.serial.read(1)
            if not data:
                # Read timed out with nothing received; keep polling.
                continue

            buf += data
            if not buf.endswith(b"\r\n"):
                continue

            # Undecodable bytes (e.g. line noise) become U+FFFD instead of
            # raising UnicodeDecodeError.
            line = buf.decode("UTF-8", errors="replace")
            buf = bytearray()

            # strip '\r', '\n' and other spaces
            line = re.sub(r"\s+$", "", line)
            line = re.sub(r"^\s+", "", line)
            if not line:
                continue

            resultlist.append(line)

            # Only react on std_responses when NOT asked explicitly
            # for an expected response
            if expected:
                is_final = line == expected
            else:
                is_final = line in std_responses

            if is_final:
                func = self.callbacks.get(line)
                if func is not None:
                    func()
                return resultlist

    def get_response_loop(self):
        """Process responses forever, firing registered callbacks."""
        while True:
            self.get_response()

    def register_callback(self, response, callback_func):
        """Call *callback_func* (no arguments) when *response* ends a read.

        A later registration for the same *response* replaces the earlier
        one.
        """
        self.callbacks[response] = callback_func


def test(port, number):
    """Manual test: dial *number* three times on *port*, then wait for rings.

    The receive part loops forever and has to be interrupted by the user.
    """
    logging.basicConfig(level=logging.DEBUG)

    def on_ok():
        print("Received an OK")

    def on_ring():
        print("Phone Ringing")

    def test_send(port, number):
        call_setup_time = 9
        inter_call_sleep = 5

        modem = Modem(port)
        try:
            modem.register_callback("OK", on_ok)

            time1 = 0
            time2 = 0
            for _ in range(3):
                time_difference = time2 - time1
                print("time difference: %.2f" % time_difference)
                time1 = time.time()

                modem.send_command("ATDT" + number + ";")

                print("sleep %.2f" % call_setup_time)
                time.sleep(call_setup_time)

                modem.send_command("ATH")
                modem.get_response()

                print("sleep %.2f" % inter_call_sleep)
                time.sleep(inter_call_sleep)
                time2 = time.time()
        finally:
            # Release the port so test_receive() can reopen it.
            modem.close()

    def test_receive(port):
        modem = Modem(port)
        try:
            modem.register_callback("OK", on_ok)

            modem.send_command("I1")
            response = modem.get_response()
            print("response:", response)

            modem.register_callback("RING", on_ring)
            modem.get_response_loop()
        finally:
            modem.close()

    test_send(port, number)
    test_receive(port)


if __name__ == "__main__":
    import sys

    print(Modem.scan())
    if len(sys.argv) != 3:
        print("usage: %s SERIAL_PORT NUMBER" % sys.argv[0])
        sys.exit(1)

    test(sys.argv[1], sys.argv[2])