#!/usr/bin/env python
###########################################################################
# odb_io.py
#
# Copyright 2004 Donour Sizemore (donour@uchicago.edu)
# Copyright 2009 Secons Ltd. (www.obdtester.com)
#
# This file is part of pyOBD.
#
# pyOBD is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# pyOBD is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with pyOBD; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
###########################################################################

import serial
import string
import time
from math import ceil
import wx  # due to debugEvent messaging

import obd_sensors

from obd_sensors import hex_to_int

GET_DTC_COMMAND = "03"
CLEAR_DTC_COMMAND = "04"
GET_FREEZE_DTC_COMMAND = "07"

from debugEvent import *

# NOTE(review): the serial I/O below compares and writes plain ``str``
# values, as the original Python 2 code did; it has not been ported to the
# bytes-based pyserial API of Python 3.

# Letter prefix of a DTC, selected by the top two bits of the 16-bit value.
_DTC_LETTERS = ("P", "C", "B", "U")


def _format_dtc(val):
    """Formats a 16-bit DTC value as its 5-character code (0x0133 -> P0133).

    Bits 15-14 select the letter, bits 13-12 the first digit and the low
    12 bits are rendered as three upper-case hex digits.
    """
    letter = _DTC_LETTERS[(val >> 14) & 0x3]
    return "%s%d%03X" % (letter, (val >> 12) & 0x3, val & 0x0FFF)


def _is_hex_byte(text):
    """Returns True when text is exactly two hexadecimal digits."""
    return len(text) == 2 and all(ch in string.hexdigits for ch in text)


def _parse_dtc_response(res):
    """Decodes up to three DTC strings from one mode 03/07 response line.

    The two bytes of DTC number k (k = 0..2) are read from the character
    ranges [3+6k:5+6k] and [6+6k:8+6k] of res.  Decoding stops at the first
    all-zero DTC (padding of the last packet) or at the first slot that is
    truncated or not hexadecimal.
    """
    codes = []
    for slot in range(3):
        high = res[3 + slot * 6:5 + slot * 6]
        low = res[6 + slot * 6:8 + slot * 6]
        if not (_is_hex_byte(high) and _is_hex_byte(low)):
            break  # short or non-hex reply: nothing more to decode
        val = (hex_to_int(high) << 8) + hex_to_int(low)  # DTC val as int
        if val == 0:  # skip fill of last packet
            break
        codes.append(_format_dtc(val))
    return codes


#__________________________________________________________________________
def decrypt_dtc_code(code):
    """Returns the 5-digit DTC codes from hex encoding.

    code is consumed four hex characters at a time, three times, so it must
    hold at least 12 characters.  Returns a list of three strings such as
    "P0133".  Raises ValueError when fewer than four characters remain for
    one of the three codes.
    """
    dtc = []
    current = code
    for _ in range(3):
        if len(current) < 4:
            raise ValueError("Tried to decode bad DTC: %s" % code)

        first = obd_sensors.hex_to_int(current[0])
        tc = first >> 2  # typecode
        if not 0 <= tc < len(_DTC_LETTERS):
            raise ValueError("Tried to decode bad DTC: %s" % code)

        # Digits are emitted as hex so that e.g. 'A' stays one character.
        digits = "%d" % (first & 3)
        for ch in current[1:4]:
            digits += "%X" % obd_sensors.hex_to_int(ch)
        dtc.append(_DTC_LETTERS[tc] + digits)
        current = current[4:]
    return dtc
#__________________________________________________________________________


class OBDPort:
    """ OBDPort abstracts all communication with OBD-II device."""

    def __init__(self, portnum, _notify_window, SERTIMEOUT, RECONNATTEMPTS):
        """Opens the serial port and tries to connect to the ECU.

        portnum        -- port passed to serial.Serial
        _notify_window -- wx window that receives DebugEvent messages
        SERTIMEOUT     -- serial read timeout passed to serial.Serial
        RECONNATTEMPTS -- number of reconnection attempts before giving up

        On failure self.State is set to 0; no exception is raised for a
        serial.SerialException while opening the port or sending "atz".
        """
        # These should really be set by the user.
        baud = 9600
        databits = 8
        par = serial.PARITY_NONE  # parity
        sb = 1                    # stop bits
        to = SERTIMEOUT
        self.ELMver = "Unknown"
        # state SERIAL is 1 connected, 0 disconnected (connection failed)
        self.State = 1
        # Defined up front so close() is safe even if the open below fails.
        self.port = None

        self._notify_window = _notify_window
        wx.PostEvent(self._notify_window,
                     DebugEvent([1, "Opening interface (serial port)"]))

        try:
            self.port = serial.Serial(portnum, baud,
                                      parity=par, stopbits=sb,
                                      bytesize=databits, timeout=to)
        except serial.SerialException:
            self.State = 0
            return None

        wx.PostEvent(self._notify_window,
                     DebugEvent([1, "Interface successfully " + self.port.portstr + " opened"]))
        wx.PostEvent(self._notify_window,
                     DebugEvent([1, "Connecting to ECU..."]))

        count = 0
        while 1:  # until error is returned try to connect
            try:
                self.send_command("atz")  # initialize
            except serial.SerialException:
                self.State = 0
                return None

            self.ELMver = self.get_result()
            wx.PostEvent(self._notify_window,
                         DebugEvent([2, "atz response:" + self.ELMver]))
            self.send_command("ate0")  # echo off
            wx.PostEvent(self._notify_window,
                         DebugEvent([2, "ate0 response:" + self.get_result()]))
            self.send_command("0100")
            ready = self.get_result()
            wx.PostEvent(self._notify_window,
                         DebugEvent([2, "0100 response1:" + ready]))
            if ready == "BUSINIT: ...OK":
                ready = self.get_result()
                wx.PostEvent(self._notify_window,
                             DebugEvent([2, "0100 response2:" + ready]))
                return None

            # Expecting error message: BUSINIT:.ERROR
            wx.PostEvent(self._notify_window,
                         DebugEvent([2, "Connection attempt failed:" + ready]))
            time.sleep(5)
            # ">=" rather than "==" so an out-of-range limit cannot loop forever.
            if count >= RECONNATTEMPTS:
                self.close()
                self.State = 0
                return None
            wx.PostEvent(self._notify_window,
                         DebugEvent([2, "Connection attempt:" + str(count)]))
            count = count + 1

    def close(self):
        """ Resets device and closes all associated filehandles"""
        if self.port is not None:
            try:
                # Only talk to the device while the connection is considered up.
                if self.State == 1:
                    self.send_command("atz")
            finally:
                # Always release the handle, even when State is 0 or the
                # reset command failed.
                self.port.close()

        self.port = None
        self.ELMver = "Unknown"

    def send_command(self, cmd):
        """Internal use only: not a public interface

        Flushes both serial buffers, writes cmd one character at a time
        followed by "\\r\\n", and posts a level-3 DebugEvent.  Does nothing
        when no port is open.
        """
        if self.port:
            self.port.flushOutput()
            self.port.flushInput()
            for c in cmd:
                self.port.write(c)
            self.port.write("\r\n")
            wx.PostEvent(self._notify_window,
                         DebugEvent([3, "Send command:" + cmd]))

    def interpret_result(self, code):
        """Internal use only: not a public interface

        Strips a raw device response down to its payload: takes the text
        before the first "\\r", removes all whitespace and drops the first
        four characters.  Returns "NODATA" when the cleaned response starts
        with that text.  Raises ValueError("BogusCode") for responses
        shorter than 7 characters.
        """
        # Code will be the string returned from the device.
        # It should look something like this:
        #  '41 11 0 0\r\r'

        # anything shorter than 7 characters is rejected as invalid
        if len(code) < 7:
            raise ValueError("BogusCode")

        # get the first thing returned, echo should be off
        first_line = code.split("\r")[0]

        # remove whitespace
        compact = "".join(first_line.split())

        # cables can behave differently
        if compact[:6] == "NODATA":  # there is no such sensor
            return "NODATA"

        # first 4 characters are code from ELM
        return compact[4:]

    def get_result(self):
        """Internal use only: not a public interface

        Reads one response from the device, character by character, up to
        the first "\\r" that follows some content.  A leading ">" is skipped.
        Returns the text read so far if a read returns nothing (serial
        timeout), and None when no port is open.
        """
        time.sleep(0.1)
        if not self.port:
            wx.PostEvent(self._notify_window,
                         DebugEvent([3, "NO self.port!"]))
            return None

        received = ""
        while 1:
            c = self.port.read(1)
            if not c:
                # Empty read means the timeout expired; stop instead of
                # spinning forever on a silent device.
                break
            if c == '\r' and len(received) > 0:
                break
            # if something is in buffer, add everything
            if received != "" or c != ">":
                received = received + c
        wx.PostEvent(self._notify_window,
                     DebugEvent([3, "Get result:" + received]))
        return received

    # get sensor value from command
    def get_sensor_value(self, sensor):
        """Internal use only: not a public interface

        Sends sensor.cmd and decodes the reply with sensor.value().  Returns
        "NORESPONSE" when nothing was read and "NODATA" when the device
        reported no data.
        """
        self.send_command(sensor.cmd)
        data = self.get_result()

        if not data:
            return "NORESPONSE"

        data = self.interpret_result(data)
        if data != "NODATA":
            data = sensor.value(data)
        return data

    # return string of sensor name and value from sensor index
    def sensor(self, sensor_index):
        """Returns 3-tuple of given sensors. 3-tuple consists of
        (Sensor Name (string), Sensor Value (string), Sensor Unit (string) ) """
        sensor = obd_sensors.SENSORS[sensor_index]
        r = self.get_sensor_value(sensor)
        return (sensor.name, r, sensor.unit)

    def sensor_names(self):
        """Internal use only: not a public interface

        Returns the names of all entries in obd_sensors.SENSORS, in order.
        """
        return [s.name for s in obd_sensors.SENSORS]

    def get_tests_MIL(self):
        """Returns the value of sensor 1 translated to text.

        The result is a list: the DTC count as a string, "Off"/"On" for the
        MIL, then one status text per remaining element of the sensor value.
        """
        statusText = ["Unsupported", "Supported - Completed",
                      "Unsupported", "Supported - Incompleted"]

        statusRes = self.sensor(1)[1]  # GET values

        # translate values to text
        statusTrans = [str(statusRes[0])]  # DTCs

        if statusRes[1] == 0:  # MIL
            statusTrans.append("Off")
        else:
            statusTrans.append("On")

        for flag in statusRes[2:]:  # Tests
            statusTrans.append(statusText[flag])

        return statusTrans

    #
    # fixme: j1979 specifies that the program should poll until the number
    # of returned DTCs matches the number indicated by a call to PID 01
    #
    def get_dtc(self):
        """Returns a list of DTC codes read with commands 03 and 07.

        Each element is a 2-item list [status, code] where status is
        "Active" (from GET_DTC_COMMAND) or "Passive" (from
        GET_FREEZE_DTC_COMMAND) and code is a string such as "P0133".
        """
        r = self.sensor(1)[1]  # data
        dtcNumber = r[0]
        mil = r[1]
        DTCCodes = []

        print("Number of stored DTC:" + str(dtcNumber) + " MIL: " + str(mil))
        # get all DTC, 3 per mesg response
        for _ in range((dtcNumber + 2) // 3):
            self.send_command(GET_DTC_COMMAND)
            res = self.get_result()
            if not res:  # no port or nothing received
                break
            print("DTC result:" + res)
            for dtc in _parse_dtc_response(res):
                DTCCodes.append(["Active", dtc])

        # read mode 7
        self.send_command(GET_FREEZE_DTC_COMMAND)
        res = self.get_result()

        if not res or res[:7] == "NO DATA":  # no freeze frame
            return DTCCodes

        print("DTC freeze result:" + res)
        for dtc in _parse_dtc_response(res):
            DTCCodes.append(["Passive", dtc])

        return DTCCodes

    def clear_dtc(self):
        """Clears all DTCs and freeze frame data"""
        self.send_command(CLEAR_DTC_COMMAND)
        r = self.get_result()
        return r

    def log(self, sensor_index, filename):
        """Logs one sensor to filename until interrupted.

        Writes a header line, then one "elapsed seconds, value" line per
        reading, flushing after each.  The loop has no exit condition; it
        ends only when an exception propagates, and the file is closed on
        the way out.
        """
        start_time = time.time()
        with open(filename, "w") as logfile:
            data = self.sensor(sensor_index)
            logfile.write("%s \t%s(%s)\n" %
                          ("Time", data[0].strip(), data[2]))
            while 1:
                now = time.time()
                data = self.sensor(sensor_index)
                line = "%.6f,\t%s\n" % (now - start_time, data[1])
                logfile.write(line)
                logfile.flush()