# NOTE(review): the layout of this file is damaged.  The original line breaks
# and indentation are gone (many definitions share one physical line), and the
# text between `struct.unpack("` in VMS._read_measure and the report-writing
# `f.write(...)` calls further down is missing.  Several of those f.write string
# literals are empty or hold only plain words, so they have presumably lost
# their content as well.  Nothing here can be re-indented or repaired without
# guessing at the missing code; recover the pristine file first.  The comments
# added below only describe what is still readable and flag defects to fix
# once the file has been recovered.
#
# Dialect: Python 2 (tuple parameters, print statements, the `thread` module).
#
# Readable definitions on the next physical line:
#   y_test(a, b)   cmp-style comparator for 3-tuples: compares the middle
#                  elements and returns -1, 0 or 1.
#   VMSRestarted   IOError subclass with an empty body.
#   VMSError       IOError subclass that stores `error_code`.
#   VMS            main class.  `cs` is a lock created once at class level,
#                  `state` is an index into `state_text`, and most public
#                  methods bracket their work with self._state(n) ... self._state(0).
#     get_image_data(types)   returns the list produced by _get_image_data(types).
#     get_image_files(types)  writes each returned image to a temporary file,
#                             stores the names in self.image_files, returns them.
#     measure(laser)          laser on, _request_measure, _read_measure, laser
#                             off; asserts the reply names the requested laser.
#     raw_measure(laser)      same shape using _request_object_list /
#                             _read_object_list / _read_stop_lists (their
#                             definitions are not in the readable text).
#     ready()                 compares a state value with 0 (see defect below).
#     set_calibration / get_calibration   placeholders marked TODO;
#                             get_calibration returns an 11-tuple of `laser`.
#     modes()                 returns [1, 2], the list laser_on() validates against.
#
# NOTE(review) defects visible on that line:
#   * get_image_files uses `tempfile` and `os`, which the visible imports do not
#     bring in, and calls os.open(filename, "wb").  os.open takes integer flags
#     and returns an integer descriptor, so the .write()/.close() calls cannot
#     work; presumably the builtin open() was intended -- confirm.
#   * ready() compares the bare name `state`, not self.state; unless a global
#     `state` exists this raises NameError when called.
#   * measure()/raw_measure() have no error path: if the assert or a read
#     raises, self.state stays at 1 and laser_off() is never reached.
# TODO: window positioning from thread import * import random import struct from crc8 import crc8 as checksum import time import string # used in debug only def y_test((_d1,y1,_d2), (_d3,y2,_d4)): if y1 < y2: return -1 elif y1 == y2: return 0 else: return 1 class VMSRestarted(IOError): pass class VMSError(IOError): error_code = 0 def __init__(self, error_code): self.error_code = error_code class VMS: cs = allocate_lock() state = 0 state_text = ('ready', 'measuring', 'transfer calibration', 'laser operations') serial_no = (2*10)*'00' test = {} # self.test results fd = 0 _image_id_tag = int(time.time()) image_files = ['Foggy1.jpg'] def get_image_data(self, types): self._state(1) image_list = self._get_image_data(types) self._state(0) return image_list def get_image_files(self, types): self._state(1) image_list = self._get_image_data(types) files = [] for (id, file_type, data) in image_list: filename = tempfile.mktemp("." + file_type) wfd = os.open(filename, "wb") wfd.write(data) wfd.close() files.append(filename) self.image_files = files self._state(0) return files def measure(self, laser): self._state(1) self.laser_on(laser) self._request_measure() (used_laser, measured) = self._read_measure() assert used_laser == laser, "VMS: Measure data arriving from wrong laser (%d, expected %d)" % (used_laser, laser) self.laser_off(laser) self._state(0) return measured def raw_measure(self, laser): self._state(1) self.laser_on(laser) self._request_object_list() (used_laser, measured) = self._read_object_list() assert used_laser == laser, "VMS: Measure data arriving from wrong laser (%d, expected %d)" % (used_laser, laser) self._read_stop_lists() # always follows a object list self.laser_off(laser) self._state(0) return measured def ready(self): return state == 0 def set_calibration(self, laser, x_calib_params, y_calib_params): self._state(2) # TODO self._state(0) return def get_calibration(self, laser): self._state(2) # TODO self._state(0) return 11*(laser,) def modes(self): 
# The next physical line starts with the body of modes() and then holds:
#   laser_on(laser) / laser_off(laser)   enter state 3, call _set_laser, restore
#                   the previous state and print a message.  laser_off always
#                   sends 0; its argument is only used in the printed text.
#   _formats, _types, _types_formats     lookup tables of the "1: Upload image"
#                   section; _types_formats[i] is the format index used for
#                   _types[i] when the caller gives no format.
#   _get_images(request_list, format_list, expose, count), first half: packs
#                   `expose` and `count`, then pads format_list with '' up to
#                   the length of request_list.
# NOTE(review):
#   * laser_on/laser_off pass one argument to self._set_laser, but VMS._set_laser
#     (further down) accepts only `self` and reads self._laser, which no readable
#     code assigns.  Only SimulatedVMS._set_laser(self, laser) matches the calls.
#   * laser_on asserts after entering state 3, so a rejected laser number leaves
#     the object in state 3.
#   * format_list=[] is a mutable default and `+=` extends it in place, so the
#     padding persists between calls and also lands in a caller-supplied list.
#   * the _get_images docstring names `_request_images`, which is not the
#     method's name.
return [1, 2] # Lasers def laser_on(self, laser): save_state = self._state(3) assert laser in self.modes(), "Error: trying to lit laser %d" % laser self._set_laser(laser) self._state(save_state) print "Laser ", laser, " on" def laser_off(self, laser): save_state = self._state(3) self._set_laser(0) self._state(save_state) print "Laser ", laser, " off" ################################################################ # Protocol functions # # # 1: Upload image # _formats = ['', 'raw', 'gif', 'jpg'] _types = ['', 'Binary laser image', 'Binary background image', 'Grayscale laser image', 'Grayscale background image'] _types_formats = (0, 1, 1, 3, 3) def _get_images(self, request_list, format_list=[], expose=1, count=1): """ Example: _request_images([(52, 'Grayscale laser image')]) returns: [(52, 'JPEG', ...)] """ data = struct.pack("BB", expose, count) # make format_list equal lengt of request_list if len(format_list) < len(request_list): format_list += (len(request_list) - len(format_list)) * [''] ### BUG in Python? 
# The remaining physical lines hold, in order:
#   rest of _get_images    pairs each (id, type) request with its format,
#                          converts the type and format names to table indexes,
#                          sends one message of kind 1 and then reads one image
#                          per request.  Each request is packed as
#                          (type, id, format).
#   _read_image(id)        concatenates packets numbered 0, 1, 2, ... until an
#                          empty packet arrives.
#   _read_packet(id, no)   reads a kind-1 message, checks its first two bytes
#                          against id and no, and returns the rest.
#   _set_laser()           writes and reads a message of kind 4.
#   _request_measure(no_of_lists=1), _read_measure()   message kind 10;
#                          _read_measure breaks off inside struct.unpack(.
#   f.write(...) fragments write the current state text, the serial number, the
#                          self-test results and one entry per image file to
#                          `f`; the enclosing `def` line is missing.
#   __init__(fd), _state(state), _next_id_tag(), _get_image_data(types)
#                          __init__ stores fd.  _state swaps self.state under
#                          the `cs` lock and returns the old value.
#                          _next_id_tag advances the tag modulo 256 under the
#                          same lock.  _get_image_data tags each requested type
#                          and forwards the list to _get_images.
#   SimulatedVMS(VMS)      stand-in returning fixed file names, random
#                          measurements (ten items each), a no-op _set_laser and
#                          canned _read_msg payloads behind a 3-byte header
#                          (255, type, payload length).  For an unlisted type
#                          _read_msg prints a warning and answers as type 16.
#                          raw_measure sorts its result with y_test, i.e. by the
#                          second tuple element.
# NOTE(review):
#   * _read_image never initialises `data` before `data += packet`, and calls
#     packet.len(); presumably data = '' and len(packet) were intended.
#   * _read_packet slices the result of _read_msg as a string, while
#     _read_measure unpacks it into (type, len, data).  The base _read_msg is
#     not readable here, so which form is right cannot be told from this text.
#   * _read_measure rebinds the builtin names type, len and object.
#   * `'' % file` formats an empty literal with one argument, which raises
#     TypeError as written; the literal presumably held a %s placeholder.
#   * 'Serian no' in the output text looks like a typo for 'Serial no'.
#for (id, t), f in request_list, format_list: request_list = map((lambda (id, t), f: (id, t, f)), request_list, format_list) binary_request_list = [] for (id, t, f) in request_list: # convert to ints t = self._types.index(t) if f == '': f = self._types_formats[t] else: f = self._formats.index(f) binary_request_list.append( (id, t, f) ) # pack data += struct.pack("BBB", t, id, f) self._write_msg(1, data) images = [] for (id, _, f) in binary_request_list: images.append( (id, self._formats[f], self._read_image(id)) ) return images def _read_image(self, id): no = 0 packet = self._read_packet(id, no) while packet.len() > 0: # each image ends with an empty packet data += packet no += 1 packet = self._read_packet(id, no) return data def _read_packet(self, id, no): packet = self._read_msg(1) (read_id, read_no) = struct.unpack("BB", packet[0:2]) assert read_id == id, "VMS: Identity mismatch, expected %d got %d" % (id, read_id) assert read_no == no, "VMS: Number mismatch, expected %d got %d" % (no, read_no) return packet[2:] # # 4: Set laser line on/off # def _set_laser(self): data = struct.pack("B", self._laser) self._write_msg(4, data) data = self._read_msg(4) # # 10: Lists (X,Y,width) # def _request_measure(self, no_of_lists=1): """ no_of_lists == 0: means continously """ data = struct.pack("B", no_of_lists) self._write_msg(10, data) def _read_measure(self): """ read object list """ (type, len, data) = self._read_msg(10) laser = ord(data[0]) measured = [] for ix in range(1, len, 5): object = struct.unpack("') f.write('


Currently ' + self.state_text[self.state] + '

') f.write('') f.write('
') f.write('
') f.write('

Actions

') f.write('') f.write('') f.write('
') f.write('

') f.write('

Constants

') f.write('Serian no: 0x' + self.serial_no) f.write('
Self test: ' + str(self.test)) f.write('
') image_files = self.image_files if image_files: f.write('
') for file in image_files: f.write('' % file) # # Other helper functions # def __init__(self, fd): self.fd = fd def _state(self, state): """ Set new state, return old (internal use) """ self.cs.acquire() save_state = self.state self.state = state self.cs.release() return save_state def _next_id_tag(self): self.cs.acquire() tag = self._image_id_tag = (self._image_id_tag + 1) % 256 self.cs.release() return tag def _get_image_data(self, types): request_list = [] for t in types: tag = self._next_id_tag() request_list.append( (tag, t) ) image_list = self._get_images(request_list) return image_list class SimulatedVMS(VMS): def get_image_files(self, types): self.image_files = ['Foggy1.jpg', 'Marble01.jpg'] return self.image_files def measure(self, laser): self.laser_on(laser) measured = [] for item in range(10): x_mm = 20 * item y_mm = 52 + random.randrange(-5, 5) if item == 4: w_mm = random.randrange(9,11) else: w_mm = random.randrange(6,8) measured.append((x_mm,y_mm,w_mm)) self.laser_off(laser) return measured def raw_measure(self, laser): self.laser_on(laser) measured = [] for item in range(10): xpix = 10 * item + random.randrange(-1,1) ypix = 52 + random.randrange(-5, 5) if item == 4: wpix = random.randrange(30, 50) else: wpix = random.randrange(15, 30) measured.append((xpix,ypix,wpix)) self.laser_off(laser) measured.sort(y_test) return measured def _set_laser(self, laser): pass def _read_msg(self, expected_type=0): if expected_type == 1: data = struct.pack("BB", 0, 0) elif expected_type == 4: data = "" elif expected_type == 10: data = struct.pack("B", 1) # list elif expected_type == 16: data = 18*'\0' # why not... :-) elif expected_type == 64: data = "" # TODO: implement else: print 'Warning: unexpected type', expected_type return self._read_msg(16) data = struct.pack("BBB", 255, expected_type, len(data)) + data return data