# nxt.__init__ module -- LEGO Mindstorms NXT python package
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# nxt.bluesock module -- Bluetooth socket communication with LEGO Mindstorms NXT
# Copyright (C) 2006-2007 Douglas P Lau
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
import bluetooth
import os
from nxt.brick import Brick


class BlueSock(object):
    """Bluetooth RFCOMM connection to a single NXT brick.

    Every packet, in both directions, is framed with a two-byte
    little-endian length prefix; send() adds it and recv() strips it.
    """

    bsize = 118    # Bluetooth socket block size
    PORT = 1       # Standard NXT rfcomm port

    def __init__(self, host):
        'Remember the Bluetooth address; the socket is opened by connect()'
        self.host = host
        self.sock = None
        self.debug = False

    def __str__(self):
        return 'Bluetooth (%s)' % self.host

    def connect(self):
        'Open the RFCOMM socket and return a Brick bound to this object'
        sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        sock.connect((self.host, BlueSock.PORT))
        self.sock = sock
        return Brick(self)

    def close(self):
        'Close the socket; does nothing if connect() was never called'
        if self.sock is not None:
            self.sock.close()

    def send(self, data):
        'Send one packet, prefixed with its length (LSB first)'
        if self.debug:
            # A single print keeps the output identical to the old
            # two-statement form ("Send: xx:yy") and parses on any Python.
            print('Send: ' + ':'.join('%02x' % ord(c) for c in data))
        l0 = len(data) & 0xFF
        l1 = (len(data) >> 8) & 0xFF
        d = chr(l0) + chr(l1) + data
        self.sock.send(d)

    def _recv_exact(self, n_bytes):
        """Read exactly n_bytes from the socket.

        RFCOMM is a stream, so a single recv() may legally return fewer
        bytes than requested; keep reading until the count is reached.
        Raises IOError if the peer closes the connection first.
        """
        chunks = []
        remaining = n_bytes
        while remaining > 0:
            chunk = self.sock.recv(remaining)
            if not chunk:
                raise IOError('Bluetooth connection closed by peer')
            chunks.append(chunk)
            remaining -= len(chunk)
        return ''.join(chunks)

    def recv(self):
        'Receive one length-prefixed packet and return its payload'
        header = self._recv_exact(2)
        plen = ord(header[0]) + (ord(header[1]) << 8)
        data = self._recv_exact(plen)
        if self.debug:
            print('Recv: ' + ':'.join('%02x' % ord(c) for c in data))
        return data


def _check_brick(arg, value):
    'True when no filter was given (arg is None) or the filter matches'
    return arg is None or arg == value


def find_bricks(host=None, name=None):
    'Yield a BlueSock for each discovered device matching host and name'
    for h, n in bluetooth.discover_devices(lookup_names=True):
        if _check_brick(host, h) and _check_brick(name, n):
            yield BlueSock(h)

# nxt.brick module -- Classes to represent LEGO Mindstorms NXT bricks
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

from time import sleep
from nxt.error import FileNotFound, ModuleNotFound
from nxt.telegram import OPCODES, Telegram


def _make_poller(opcode, poll_func, parse_func):
    """Build a Brick method for one telegram opcode.

    The returned method builds the outgoing telegram with poll_func,
    sends it over self.sock, wraps the reply in a Telegram and returns
    whatever parse_func extracts from it.
    """
    def poll(self, *args, **kwargs):
        ogram = poll_func(opcode, *args, **kwargs)
        self.sock.send(str(ogram))
        igram = Telegram(opcode=opcode, pkt=self.sock.recv())
        return parse_func(igram)
    return poll


class _Meta(type):
    'Metaclass which adds one method for each telegram opcode'

    def __init__(cls, name, bases, dict):
        super(_Meta, cls).__init__(name, bases, dict)
        for opcode in OPCODES:
            poll_func, parse_func = OPCODES[opcode]
            m = _make_poller(opcode, poll_func, parse_func)
            # The method takes the name of the telegram builder function
            setattr(cls, poll_func.__name__, m)


class Brick(object):
    'An NXT brick; its command methods are generated by _Meta from OPCODES'
    __metaclass__ = _Meta

    def __init__(self, sock):
        self.sock = sock

    def play_tone_and_wait(self, frequency, duration):
        'Play a tone, then sleep for its duration (given in milliseconds)'
        self.play_tone(frequency, duration)
        sleep(duration / 1000.0)


class FileFinder(object):
    'Context manager to find files on a NXT brick'

    def __init__(self, brick, pattern):
        self.brick = brick
        self.pattern = pattern
        self.handle = None

    def __enter__(self):
        return self

    def __exit__(self, etp, value, tb):
        # Compare against None: a handle of 0 is falsy, so a plain truth
        # test would never close it.
        if self.handle is not None:
            self.brick.close(self.handle)

    def __iter__(self):
        'Yield (fname, size) for each match until FileNotFound is raised'
        self.handle, fname, size = self.brick.find_first(self.pattern)
        yield (fname, size)
        while True:
            try:
                handle, fname, size = self.brick.find_next(self.handle)
                yield (fname, size)
            except FileNotFound:
                break


class FileReader(object):
    'Context manager to read a file on a NXT brick'

    def __init__(self, brick, fname):
        self.brick = brick
        self.fname = fname

    def __enter__(self):
        self.handle, self.size = self.brick.open_read(self.fname)
        return self

    def __exit__(self, etp, value, tb):
        self.brick.close(self.handle)

    def __iter__(self):
        'Yield the file contents in chunks of at most sock.bsize bytes'
        rem = self.size
        bsize = self.brick.sock.bsize
        while rem > 0:
            # Keep the byte count returned by the brick in its own name so
            # it cannot overwrite the block size used for later requests.
            handle, n_read, data = self.brick.read(self.handle,
                                                   min(bsize, rem))
            yield data
            rem -= len(data)


class FileWriter(object):
    'Context manager to write a file to a NXT brick'

    def __init__(self, brick, fname, fil):
        self.brick = brick
        self.fname = fname
        self.fil = fil
        fil.seek(0, 2)    # seek to end of file
        self.size = fil.tell()
        fil.seek(0)       # seek to start of file

    def __enter__(self):
        self.handle = self.brick.open_write(self.fname, self.size)
        return self

    def __exit__(self, etp, value, tb):
        self.brick.close(self.handle)

    def __iter__(self):
        'Write the file in sock.bsize chunks, yielding each reported size'
        rem = self.size
        bsize = self.brick.sock.bsize
        while rem > 0:
            b = min(bsize, rem)
            handle, size = self.brick.write(self.handle, self.fil.read(b))
            yield size
            rem -= b


class ModuleFinder(object):
    'Context manager to lookup modules on a NXT brick'

    def __init__(self, brick, pattern):
        self.brick = brick
        self.pattern = pattern
        self.handle = None

    def __enter__(self):
        return self

    def __exit__(self, etp, value, tb):
        # Compare against None so that a handle of 0 is still closed.
        if self.handle is not None:
            self.brick.close(self.handle)

    def __iter__(self):
        'Yield (mname, mid, msize, miomap_size) until ModuleNotFound'
        self.handle, mname, mid, msize, miomap_size = \
            self.brick.request_first_module(self.pattern)
        yield (mname, mid, msize, miomap_size)
        while True:
            try:
                handle, mname, mid, msize, miomap_size = \
                    self.brick.request_next_module(self.handle)
                yield (mname, mid, msize, miomap_size)
            except ModuleNotFound:
                break

# nxt.compass module -- Classes to read Mindsensors Compass sensors
# Copyright (C) 2007 Douglas P Lau
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
import sensor
from time import sleep


class Command(object):
    # NOTE: just a namespace (enumeration)
    AUTO_TRIG_ON = (0x41, 0x02)
    AUTO_TRIG_OFF = (0x53, 0x01)
    MAP_HEADING_BYTE = 0x42       # map heading to 0-255 range
    MAP_HEADING_INTEGER = 0x49    # map heading to 0-36000 range
    SAMPLING_50_HZ = 0x45         # set sampling frequency to 50 Hz
    SAMPLING_60_HZ = 0x55         # set sampling frequency to 60 Hz
    SET_ADPA_MODE_ON = 0x4E       # set ADPA mode on
    SET_ADPA_MODE_OFF = 0x4F      # set ADPA mode off
    BEGIN_CALIBRATION = 0x43      # begin calibration
    DONE_CALIBRATION = 0x44       # done with calibration
    LOAD_USER_CALIBRATION = 0x4C  # load user calibration value


# I2C addresses for a Mindsensors CMPS-Nx compass sensor
# Each entry is address: (name, n_bytes, set_method); set_method selects
# whether a set_<name> accessor is generated next to get_<name>.
I2C_ADDRESS_CMPS_NX = {
    0x41: ('command', 1, True),
    0x42: ('heading_lsb', 1, False),
    0x43: ('heading_msb', 1, False),
    0x44: ('x_offset_lsb', 1, True),
    0x45: ('x_offset_msb', 1, True),
    0x46: ('y_offset_lsb', 1, True),
    0x47: ('y_offset_msb', 1, True),
    0x48: ('x_range_lsb', 1, True),
    0x49: ('x_range_msb', 1, True),
    0x4A: ('y_range_lsb', 1, True),
    0x4B: ('y_range_msb', 1, True),
    0x4C: ('x_raw_lsb', 1, True),
    0x4D: ('x_raw_msb', 1, True),
    0x4E: ('y_raw_lsb', 1, True),
    0x4F: ('y_raw_msb', 1, True),
}


class _MetaCMPS_Nx(sensor._Meta):
    'Metaclass which adds accessor methods for CMPS-Nx I2C addresses'

    def __init__(cls, name, bases, dict):
        super(_MetaCMPS_Nx, cls).__init__(name, bases, dict)
        for address in I2C_ADDRESS_CMPS_NX:
            name, n_bytes, set_method = I2C_ADDRESS_CMPS_NX[address]
            q = sensor._make_query(address, n_bytes)
            setattr(cls, 'get_' + name, q)
            if set_method:
                c = sensor._make_command(address)
                setattr(cls, 'set_' + name, c)


class CompassSensor(sensor.DigitalSensor):
    'Mindsensors CMPS-Nx compass; accessors are generated by _MetaCMPS_Nx'
    __metaclass__ = _MetaCMPS_Nx

    def __init__(self, brick, port):
        super(CompassSensor, self).__init__(brick, port)
        # Type and Mode live in the sensor module; only "import sensor" is
        # in scope here, so they must be referenced through it.
        self.sensor_type = sensor.Type.LOW_SPEED_9V
        self.mode = sensor.Mode.RAW
        self.set_input_mode()
        sleep(0.1)    # Give I2C time to initialize


CompassSensor.get_sample = CompassSensor.get_heading_lsb

# nxt.direct module -- LEGO Mindstorms NXT direct
# telegrams
# Copyright (C) 2006-2007 Douglas P Lau
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

def _create(opcode):
    'Create a simple direct telegram'
    from telegram import Telegram
    return Telegram(True, opcode)


def _create_with_port(opcode, port):
    'Create a direct telegram whose only payload is a port byte'
    packet = _create(opcode)
    packet.add_u8(port)
    return packet


def _parse_simple(tgram):
    'Parse a reply which carries nothing but a status byte'
    tgram.check_status()


def start_program(opcode, fname):
    'Build a telegram carrying the file name of the program'
    packet = _create(opcode)
    packet.add_filename(fname)
    return packet


def stop_program(opcode):
    'Build a telegram with no payload'
    return _create(opcode)


def play_sound_file(opcode, loop, fname):
    'Build a telegram carrying a loop flag byte and a file name'
    packet = _create(opcode)
    packet.add_u8(loop)
    packet.add_filename(fname)
    return packet


def play_tone(opcode, frequency, duration):
    'Play a tone at frequency (Hz) for duration (ms)'
    packet = _create(opcode)
    for value in (frequency, duration):
        packet.add_u16(value)
    return packet


def set_output_state(opcode, port, power, mode, regulation, turn_ratio,
                     run_state, tacho_limit):
    'Build a telegram carrying the complete output state of one port'
    packet = _create(opcode)
    # (writer, value) pairs in wire order
    fields = (
        (packet.add_u8, port),
        (packet.add_s8, power),
        (packet.add_u8, mode),
        (packet.add_u8, regulation),
        (packet.add_s8, turn_ratio),
        (packet.add_u8, run_state),
        (packet.add_u32, tacho_limit),
    )
    for add, value in fields:
        add(value)
    return packet


def set_input_mode(opcode, port, sensor_type, sensor_mode):
    'Build a telegram carrying port, sensor type and sensor mode bytes'
    packet = _create(opcode)
    for value in (port, sensor_type, sensor_mode):
        packet.add_u8(value)
    return packet


def get_output_state(opcode, port):
    'Build a telegram carrying only the port'
    return _create_with_port(opcode, port)


def _parse_get_output_state(tgram):
    'Return the ten output state fields as a tuple, in wire order'
    tgram.check_status()
    # Tuple elements are evaluated left to right, i.e. in wire order
    return (
        tgram.parse_u8(),     # port
        tgram.parse_s8(),     # power
        tgram.parse_u8(),     # mode
        tgram.parse_u8(),     # regulation
        tgram.parse_s8(),     # turn_ratio
        tgram.parse_u8(),     # run_state
        tgram.parse_u32(),    # tacho_limit
        tgram.parse_s32(),    # tacho_count
        tgram.parse_s32(),    # block_tacho_count
        tgram.parse_s32(),    # rotation_count
    )


def get_input_values(opcode, port):
    'Build a telegram carrying only the port'
    return _create_with_port(opcode, port)


def _parse_get_input_values(tgram):
    'Return the nine input value fields as a tuple, in wire order'
    tgram.check_status()
    return (
        tgram.parse_u8(),     # port
        tgram.parse_u8(),     # valid
        tgram.parse_u8(),     # calibrated
        tgram.parse_u8(),     # sensor_type
        tgram.parse_u8(),     # sensor_mode
        tgram.parse_u16(),    # raw_ad_value
        tgram.parse_u16(),    # normalized_ad_value
        tgram.parse_s16(),    # scaled_value
        tgram.parse_s16(),    # calibrated_value
    )


def reset_input_scaled_value(opcode, port):
    'Build a telegram carrying only the port'
    return _create_with_port(opcode, port)


def message_write(opcode, inbox, message):
    'Build a telegram carrying an inbox number and a NUL-terminated message'
    size = len(message)
    packet = _create(opcode)
    packet.add_u8(inbox)
    packet.add_u8(size + 1)    # length byte counts the trailing NUL
    packet.add_string(size, message)
    packet.add_u8(0)
    return packet


def reset_motor_position(opcode, port, relative):
    'Build a telegram carrying the port and a relative flag byte'
    packet = _create_with_port(opcode, port)
    packet.add_u8(relative)
    return packet


def get_battery_level(opcode):
    'Build a telegram with no payload'
    return _create(opcode)


def _parse_get_battery_level(tgram):
    'Return the battery level (millivolts) as an unsigned 16-bit value'
    tgram.check_status()
    return tgram.parse_u16()


def stop_sound_playback(opcode):
    'Build a telegram with no payload'
    return _create(opcode)


def keep_alive(opcode):
    'Build a telegram with no payload'
    return _create(opcode)


def _parse_keep_alive(tgram):
    'Return the sleep time as an unsigned 32-bit value'
    tgram.check_status()
    return tgram.parse_u32()


def ls_get_status(opcode, port):
    'Get status of low-speed sensor (ultrasonic)'
    return _create_with_port(opcode, port)


def _parse_ls_get_status(tgram):
    'Return the byte count reported in the reply'
    tgram.check_status()
    return tgram.parse_u8()


def ls_write(opcode, port, tx_data, rx_bytes):
    'Write a low-speed command to a sensor (ultrasonic)'
    tx_len = len(tx_data)
    packet = _create_with_port(opcode, port)
    packet.add_u8(tx_len)
    packet.add_u8(rx_bytes)
    packet.add_string(tx_len, tx_data)
    return packet


def ls_read(opcode, port):
    'Read a low-speed sensor value (ultrasonic)'
    return _create_with_port(opcode, port)


def _parse_ls_read(tgram):
    'Return the first n_bytes of the payload, n_bytes being its first byte'
    tgram.check_status()
    n_bytes = tgram.parse_u8()
    return tgram.parse_string()[:n_bytes]


def get_current_program_name(opcode):
    'Build a telegram with no payload'
    return _create(opcode)


def _parse_get_current_program_name(tgram):
    'Return the remainder of the reply as the program file name'
    tgram.check_status()
    return tgram.parse_string()


def message_read(opcode, remote_inbox, local_inbox, remove):
    'Build a telegram carrying both inbox numbers and a remove flag'
    packet = _create(opcode)
    for value in (remote_inbox, local_inbox, remove):
        packet.add_u8(value)
    return packet


def _parse_message_read(tgram):
    'Return (local_inbox, message) with the message cut to its length byte'
    tgram.check_status()
    local_inbox = tgram.parse_u8()
    n_bytes = tgram.parse_u8()
    return (local_inbox, tgram.parse_string()[:n_bytes])


# opcode: (telegram builder, reply parser)
OPCODES = {
    0x00: (start_program, _parse_simple),
    0x01: (stop_program, _parse_simple),
    0x02: (play_sound_file, _parse_simple),
    0x03: (play_tone, _parse_simple),
    0x04: (set_output_state, _parse_simple),
    0x05: (set_input_mode, _parse_simple),
    0x06: (get_output_state, _parse_get_output_state),
    0x07: (get_input_values, _parse_get_input_values),
    0x08: (reset_input_scaled_value, _parse_simple),
    0x09: (message_write, _parse_simple),
    0x0A: (reset_motor_position, _parse_simple),
    0x0B: (get_battery_level, _parse_get_battery_level),
    0x0C: (stop_sound_playback, _parse_simple),
    0x0D: (keep_alive, _parse_keep_alive),
    0x0E: (ls_get_status, _parse_ls_get_status),
    0x0F: (ls_write, _parse_simple),
    0x10: (ls_read, _parse_ls_read),
    0x11: (get_current_program_name, _parse_get_current_program_name),
    0x13: (message_read, _parse_message_read),
}

# nxt.error module -- LEGO Mindstorms NXT error handling
# Copyright (C) 2006,2007 Douglas P Lau
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

class ProtocolError(Exception):
    'Base class for every error status reported in a reply'
    pass


class SysProtError(ProtocolError):
    pass


class FileExistsError(SysProtError):
    pass


class FileNotFound(SysProtError):
    pass


class ModuleNotFound(SysProtError):
    pass


class DirProtError(ProtocolError):
    pass


class I2CError(DirProtError):
    pass


class I2CPendingError(I2CError):
    pass


# Status byte -> exception instance raised for it (None means success)
CODES = {
    0x00: None,
    0x20: I2CPendingError('Pending communication transaction in progress'),
    0x40: DirProtError('Specified mailbox queue is empty'),
    0x81: SysProtError('No more handles'),
    0x82: SysProtError('No space'),
    0x83: SysProtError('No more files'),
    0x84: SysProtError('End of file expected'),
    0x85: SysProtError('End of file'),
    0x86: SysProtError('Not a linear file'),
    0x87: FileNotFound('File not found'),
    0x88: SysProtError('Handle already closed'),
    0x89: SysProtError('No linear space'),
    0x8A: SysProtError('Undefined error'),
    0x8B: SysProtError('File is busy'),
    0x8C: SysProtError('No write buffers'),
    0x8D: SysProtError('Append not possible'),
    0x8E: SysProtError('File is full'),
    0x8F: FileExistsError('File exists'),
    0x90: ModuleNotFound('Module not found'),
    0x91: SysProtError('Out of bounds'),
    0x92: SysProtError('Illegal file name'),
    0x93: SysProtError('Illegal handle'),
    0xBD: DirProtError('Request failed (i.e. specified file not found)'),
    0xBE: DirProtError('Unknown command opcode'),
    0xBF: DirProtError('Insane packet'),
    0xC0: DirProtError('Data contains out-of-range values'),
    0xDD: DirProtError('Communication bus error'),
    0xDE: DirProtError('No free memory in communication buffer'),
    0xDF: DirProtError('Specified channel/connection is not valid'),
    0xE0: I2CError('Specified channel/connection not configured or busy'),
    0xEC: DirProtError('No active program'),
    0xED: DirProtError('Illegal size specified'),
    0xEE: DirProtError('Illegal mailbox queue ID specified'),
    0xEF: DirProtError('Attempted to access invalid field of a structure'),
    0xF0: DirProtError('Bad input or output specified'),
    0xFB: DirProtError('Insufficient memory available'),
    0xFF: DirProtError('Bad arguments'),
}


def check_status(status):
    """Raise the exception registered for a non-zero status byte.

    A status of 0 returns None.  A status listed in CODES raises the
    exception stored there; any other non-zero status raises
    ProtocolError with the status as its argument.
    """
    if status:
        ex = CODES.get(status)
        if ex:
            raise ex
        else:
            # Call form is equivalent to "raise ProtocolError, status"
            raise ProtocolError(status)

# nxt.locator module -- Locate LEGO Mindstorms NXT bricks via USB or Bluetooth
# Copyright (C) 2006-2007 Douglas P Lau
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
class BrickNotFoundError(Exception):
    'Raised by find_one_brick when no brick matches'
    pass


def find_bricks(host=None, name=None):
    """Yield a socket object for each matching brick, USB first.

    A back end whose module cannot be imported is skipped silently, and
    a BluetoothError raised while searching ends the Bluetooth search.
    """
    try:
        import usbsock
        socks = usbsock.find_bricks(host, name)
        for s in socks:
            yield s
    except ImportError:
        # Best effort: the USB back end is not available
        pass
    try:
        import bluesock
        from bluetooth import BluetoothError
        try:
            socks = bluesock.find_bricks(host, name)
            for s in socks:
                yield s
        except BluetoothError:
            pass
    except ImportError:
        # Best effort: the Bluetooth back end is not available
        pass


def find_one_brick(host=None, name=None):
    'Return the first socket from find_bricks or raise BrickNotFoundError'
    for s in find_bricks(host, name):
        return s
    raise BrickNotFoundError

# nxt.motor module -- Class to control LEGO Mindstorms NXT motors
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
PORT_A = 0x00
PORT_B = 0x01
PORT_C = 0x02
PORT_ALL = 0xFF

MODE_IDLE = 0x00
MODE_MOTOR_ON = 0x01
MODE_BRAKE = 0x02
MODE_REGULATED = 0x04

REGULATION_IDLE = 0x00
REGULATION_MOTOR_SPEED = 0x01
REGULATION_MOTOR_SYNC = 0x02

RUN_STATE_IDLE = 0x00
RUN_STATE_RAMP_UP = 0x10
RUN_STATE_RUNNING = 0x20
RUN_STATE_RAMP_DOWN = 0x40

LIMIT_RUN_FOREVER = 0


class Motor(object):
    'Local copy of the output state of one motor port on a brick'

    def __init__(self, brick, port):
        self.brick = brick
        self.port = port
        self.power = 0
        self.mode = MODE_IDLE
        self.regulation = REGULATION_IDLE
        self.turn_ratio = 0
        self.run_state = RUN_STATE_IDLE
        self.tacho_limit = LIMIT_RUN_FOREVER
        self.tacho_count = 0
        self.block_tacho_count = 0
        self.rotation_count = 0

    def set_output_state(self):
        'Send the locally stored output state to the brick'
        self.brick.set_output_state(self.port, self.power, self.mode,
                                    self.regulation, self.turn_ratio,
                                    self.run_state, self.tacho_limit)

    def get_output_state(self):
        """Read the output state from the brick into this object.

        Returns the ten-element tuple received from the brick.
        """
        values = self.brick.get_output_state(self.port)
        # Store the three counters on the instance as well; unpacking them
        # into locals would leave the attributes at their initial 0.
        (self.port, self.power, self.mode, self.regulation,
         self.turn_ratio, self.run_state, self.tacho_limit,
         self.tacho_count, self.block_tacho_count,
         self.rotation_count) = values
        return values

    def reset_position(self, relative):
        'Ask the brick to reset the position counter of this port'
        self.brick.reset_motor_position(self.port, relative)

# nxt.sensor module -- Classes to read LEGO Mindstorms NXT sensors
# Copyright (C) 2006,2007 Douglas P Lau
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

from time import sleep
from nxt.error import I2CError, I2CPendingError
PORT_1 = 0x00
PORT_2 = 0x01
PORT_3 = 0x02
PORT_4 = 0x03


class Type(object):
    # NOTE: just a namespace (enumeration)
    NO_SENSOR = 0x00
    SWITCH = 0x01          # Touch sensor
    TEMPERATURE = 0x02
    REFLECTION = 0x03
    ANGLE = 0x04
    LIGHT_ACTIVE = 0x05    # Light sensor (illuminated)
    LIGHT_INACTIVE = 0x06  # Light sensor (ambient)
    SOUND_DB = 0x07        # Sound sensor (unadjusted)
    SOUND_DBA = 0x08       # Sound sensor (adjusted)
    CUSTOM = 0x09
    LOW_SPEED = 0x0A
    LOW_SPEED_9V = 0x0B    # Low-speed I2C (Ultrasonic sensor)


class Mode(object):
    # NOTE: just a namespace (enumeration)
    RAW = 0x00
    BOOLEAN = 0x20
    TRANSITION_CNT = 0x40
    PERIOD_COUNTER = 0x60
    PCT_FULL_SCALE = 0x80
    CELSIUS = 0xA0
    FAHRENHEIT = 0xC0
    ANGLE_STEPS = 0xE0
    MASK = 0xE0
    MASK_SLOPE = 0x1F    # Why isn't this slope thing documented?


class Sensor(object):
    'Base class holding the brick, port, sensor type and mode'

    def __init__(self, brick, port):
        self.brick = brick
        self.port = port
        self.sensor_type = Type.NO_SENSOR
        self.mode = Mode.RAW

    def set_input_mode(self):
        'Send the stored sensor type and mode for this port to the brick'
        self.brick.set_input_mode(self.port, self.sensor_type, self.mode)


class AnalogSensor(Sensor):
    'Sensor read through the get_input_values telegram'

    def __init__(self, brick, port):
        super(AnalogSensor, self).__init__(brick, port)
        self.valid = False
        self.calibrated = False
        self.raw_ad_value = 0
        self.normalized_ad_value = 0
        self.scaled_value = 0
        self.calibrated_value = 0

    def get_input_values(self):
        'Read all input values into this object and return them as a tuple'
        values = self.brick.get_input_values(self.port)
        (self.port, self.valid, self.calibrated, self.sensor_type,
         self.mode, self.raw_ad_value, self.normalized_ad_value,
         self.scaled_value, self.calibrated_value) = values
        return values

    def reset_input_scaled_value(self):
        'Ask the brick to reset the scaled value of this port'
        # The telegram builder takes the port as a required argument.
        self.brick.reset_input_scaled_value(self.port)

    def get_sample(self):
        'Refresh the input values and return the scaled value'
        self.get_input_values()
        return self.scaled_value


class TouchSensor(AnalogSensor):

    def __init__(self, brick, port):
        super(TouchSensor, self).__init__(brick, port)
        self.sensor_type = Type.SWITCH
        self.mode = Mode.BOOLEAN
        self.set_input_mode()

    def is_pressed(self):
        'True when the last scaled value read was non-zero'
        return bool(self.scaled_value)

    def get_sample(self):
        'Refresh the input values and return is_pressed()'
        self.get_input_values()
        return self.is_pressed()


class LightSensor(AnalogSensor):

    def __init__(self, brick, port):
        super(LightSensor, self).__init__(brick, port)
        self.set_illuminated(True)

    def set_illuminated(self, active):
        'Select LIGHT_ACTIVE or LIGHT_INACTIVE and send it to the brick'
        if active:
            self.sensor_type = Type.LIGHT_ACTIVE
        else:
            self.sensor_type = Type.LIGHT_INACTIVE
        self.set_input_mode()


class SoundSensor(AnalogSensor):

    def __init__(self, brick, port):
        super(SoundSensor, self).__init__(brick, port)
        self.set_adjusted(True)

    def set_adjusted(self, active):
        'Select SOUND_DBA or SOUND_DB and send it to the brick'
        if active:
            self.sensor_type = Type.SOUND_DBA
        else:
            self.sensor_type = Type.SOUND_DB
        self.set_input_mode()


# address: (name, n_bytes) -- one get_<name> accessor is generated each
I2C_ADDRESS = {
    0x00: ('version', 8),
    0x08: ('product_id', 8),
    0x10: ('sensor_type', 8),
    0x11: ('factory_zero', 1),    # is this really correct?
    0x12: ('factory_scale_factor', 1),
    0x13: ('factory_scale_divisor', 1),
    0x14: ('measurement_units', 1),
}


def _make_query(address, n_bytes):
    'Build a getter; one-byte reads return an int, longer reads the data'
    def query(self):
        data = self.i2c_query(address, n_bytes)
        if n_bytes == 1:
            return ord(data)
        else:
            return data
    return query


class _Meta(type):
    'Metaclass which adds accessor methods for I2C addresses'

    def __init__(cls, name, bases, dict):
        super(_Meta, cls).__init__(name, bases, dict)
        for address in I2C_ADDRESS:
            name, n_bytes = I2C_ADDRESS[address]
            q = _make_query(address, n_bytes)
            setattr(cls, 'get_' + name, q)


class DigitalSensor(Sensor):
    'Sensor accessed with ls_write / ls_get_status / ls_read telegrams'
    __metaclass__ = _Meta
    I2C_DEV = 0x02

    def __init__(self, brick, port):
        super(DigitalSensor, self).__init__(brick, port)

    def _ls_get_status(self, n_bytes):
        """Poll up to three times until at least n_bytes are reported.

        Sleeps 10 ms after each I2CPendingError.  Raises I2CError when
        all three attempts fail.
        """
        for n in range(3):
            try:
                b = self.brick.ls_get_status(self.port)
                if b >= n_bytes:
                    return b
            except I2CPendingError:
                sleep(0.01)
        raise I2CError('ls_get_status timeout')

    def i2c_command(self, address, value):
        'Write one value byte to the given I2C address'
        msg = chr(DigitalSensor.I2C_DEV) + chr(address) + chr(value)
        self.brick.ls_write(self.port, msg, 0)

    def i2c_query(self, address, n_bytes):
        """Read n_bytes from the given I2C address.

        Raises I2CError if fewer than n_bytes come back.
        """
        msg = chr(DigitalSensor.I2C_DEV) + chr(address)
        self.brick.ls_write(self.port, msg, n_bytes)
        self._ls_get_status(n_bytes)
        data = self.brick.ls_read(self.port)
        if len(data) < n_bytes:
            raise I2CError('Read failure')
        return data[-n_bytes:]


class CommandState(object):
    # NOTE: just a namespace (enumeration)
    OFF = 0x00
    SINGLE_SHOT = 0x01
    CONTINUOUS_MEASUREMENT = 0x02
    EVENT_CAPTURE = 0x03    # Check for ultrasonic interference
    REQUEST_WARM_RESET = 0x04


# I2C addresses for an Ultrasonic sensor
# address: (name, n_bytes, set_method)
I2C_ADDRESS_US = {
    0x40: ('continuous_measurement_interval', 1, True),
    0x41: ('command_state', 1, True),
    0x42: ('measurement_byte_0', 1, False),
    0x43: ('measurement_byte_1', 1, False),
    0x44: ('measurement_byte_2', 1, False),
    0x45: ('measurement_byte_3', 1, False),
    0x46: ('measurement_byte_4', 1, False),
    0x47: ('measurement_byte_5', 1, False),
    0x48: ('measurement_byte_6', 1, False),
    0x49: ('measurement_byte_7', 1, False),
    0x50: ('actual_zero', 1, True),
    0x51: ('actual_scale_factor', 1, True),
    0x52: ('actual_scale_divisor', 1, True),
}


def _make_command(address):
    'Build a setter which writes one value to the given I2C address'
    def command(self, value):
        self.i2c_command(address, value)
    return command


class _MetaUS(_Meta):
    'Metaclass which adds accessor methods for US I2C addresses'

    def __init__(cls, name, bases, dict):
        super(_MetaUS, cls).__init__(name, bases, dict)
        for address in I2C_ADDRESS_US:
            name, n_bytes, set_method = I2C_ADDRESS_US[address]
            q = _make_query(address, n_bytes)
            setattr(cls, 'get_' + name, q)
            if set_method:
                c = _make_command(address)
                setattr(cls, 'set_' + name, c)


class UltrasonicSensor(DigitalSensor):
    __metaclass__ = _MetaUS

    def __init__(self, brick, port):
        super(UltrasonicSensor, self).__init__(brick, port)
        self.sensor_type = Type.LOW_SPEED_9V
        self.mode = Mode.RAW
        self.set_input_mode()
        sleep(0.1)    # Give I2C time to initialize

    def get_single_shot_measurement(self):
        'Set the SINGLE_SHOT command state and return measurement byte 0'
        self.set_command_state(CommandState.SINGLE_SHOT)
        return self.get_measurement_byte_0()


UltrasonicSensor.get_sample = UltrasonicSensor.get_measurement_byte_0

# nxt.system module -- LEGO Mindstorms NXT system telegrams
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free
# Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

def _create(opcode):
    'Create a simple system telegram'
    from telegram import Telegram
    return Telegram(False, opcode)


def _create_with_file(opcode, fname):
    'Create a system telegram whose payload starts with a file name'
    tgram = _create(opcode)
    tgram.add_filename(fname)
    return tgram


def _create_with_handle(opcode, handle):
    'Create a system telegram whose payload starts with a handle byte'
    tgram = _create(opcode)
    tgram.add_u8(handle)
    return tgram


def open_read(opcode, fname):
    return _create_with_file(opcode, fname)


def _parse_open_read(tgram):
    'Return (handle, n_bytes)'
    tgram.check_status()
    handle = tgram.parse_u8()
    n_bytes = tgram.parse_u32()
    return (handle, n_bytes)


def open_write(opcode, fname, n_bytes):
    tgram = _create_with_file(opcode, fname)
    tgram.add_u32(n_bytes)
    return tgram


def _parse_open_write(tgram):
    'Return the handle'
    tgram.check_status()
    handle = tgram.parse_u8()
    return handle


def read(opcode, handle, n_bytes):
    tgram = _create_with_handle(opcode, handle)
    tgram.add_u16(n_bytes)
    return tgram


def _parse_read(tgram):
    'Return (handle, n_bytes, data); data is the rest of the reply'
    tgram.check_status()
    handle = tgram.parse_u8()
    n_bytes = tgram.parse_u16()
    data = tgram.parse_string()
    return (handle, n_bytes, data)


def write(opcode, handle, data):
    tgram = _create_with_handle(opcode, handle)
    tgram.add_string(len(data), data)
    return tgram


def _parse_write(tgram):
    'Return (handle, n_bytes)'
    tgram.check_status()
    handle = tgram.parse_u8()
    n_bytes = tgram.parse_u16()
    return (handle, n_bytes)


def close(opcode, handle):
    return _create_with_handle(opcode, handle)


def _parse_close(tgram):
    'Return the handle'
    tgram.check_status()
    handle = tgram.parse_u8()
    return handle


def delete(opcode, fname):
    return _create_with_file(opcode, fname)


def _parse_delete(tgram):
    'Return (handle, fname)'
    tgram.check_status()
    handle = tgram.parse_u8()
    fname = tgram.parse_string()
    return (handle, fname)


def find_first(opcode, fname):
    return _create_with_file(opcode, fname)


def _parse_find(tgram):
    'Return (handle, fname, n_bytes); fname is a 20-byte field'
    tgram.check_status()
    handle = tgram.parse_u8()
    fname = tgram.parse_string(20)
    n_bytes = tgram.parse_u32()
    return (handle, fname, n_bytes)


def find_next(opcode, handle):
    return _create_with_handle(opcode, handle)


def get_firmware_version(opcode):
    return _create(opcode)


def _parse_get_firmware_version(tgram):
    'Return ((prot_major, prot_minor), (fw_major, fw_minor))'
    tgram.check_status()
    # Minor bytes precede major bytes on the wire
    prot_minor = tgram.parse_u8()
    prot_major = tgram.parse_u8()
    prot_version = (prot_major, prot_minor)
    fw_minor = tgram.parse_u8()
    fw_major = tgram.parse_u8()
    fw_version = (fw_major, fw_minor)
    return (prot_version, fw_version)


def open_write_linear(opcode, fname, n_bytes):
    tgram = _create_with_file(opcode, fname)
    tgram.add_u32(n_bytes)
    return tgram


def open_read_linear(opcode, fname):
    return _create_with_file(opcode, fname)


def _parse_open_read_linear(tgram):
    'Return the unsigned 32-bit value of the reply'
    tgram.check_status()
    n_bytes = tgram.parse_u32()
    return n_bytes


def open_write_data(opcode, fname, n_bytes):
    tgram = _create_with_file(opcode, fname)
    tgram.add_u32(n_bytes)
    return tgram


def open_append_data(opcode, fname):
    return _create_with_file(opcode, fname)


def _parse_open_append_data(tgram):
    'Return (handle, n_bytes)'
    tgram.check_status()
    handle = tgram.parse_u8()
    n_bytes = tgram.parse_u32()
    return (handle, n_bytes)


def request_first_module(opcode, mname):
    return _create_with_file(opcode, mname)


def _parse_request_module(tgram):
    'Return (handle, mname, mod_id, mod_size, mod_iomap_size)'
    tgram.check_status()
    handle = tgram.parse_u8()
    mname = tgram.parse_string(20)
    mod_id = tgram.parse_u32()
    mod_size = tgram.parse_u32()
    mod_iomap_size = tgram.parse_u16()
    return (handle, mname, mod_id, mod_size, mod_iomap_size)


def request_next_module(opcode, handle):
    return _create_with_handle(opcode, handle)


def close_module_handle(opcode, handle):
    return _create_with_handle(opcode, handle)


def read_io_map(opcode, mod_id, offset, n_bytes):
    tgram = _create(opcode)
    tgram.add_u32(mod_id)
    tgram.add_u16(offset)
    tgram.add_u16(n_bytes)
    return tgram


def _parse_read_io_map(tgram):
    'Return (mod_id, n_bytes, contents)'
    tgram.check_status()
    mod_id = tgram.parse_u32()
    n_bytes = tgram.parse_u16()
    contents = tgram.parse_string()
    return (mod_id, n_bytes, contents)


def write_io_map(opcode, mod_id, offset, content):
    tgram = _create(opcode)
    tgram.add_u32(mod_id)
    tgram.add_u16(offset)
    tgram.add_u16(len(content))
    tgram.add_string(len(content), content)
    return tgram


def _parse_write_io_map(tgram):
    'Return (mod_id, n_bytes)'
    tgram.check_status()
    mod_id = tgram.parse_u32()
    n_bytes = tgram.parse_u16()
    return (mod_id, n_bytes)


def boot(opcode):
    # Note: this command is USB only (no Bluetooth)
    tgram = _create(opcode)
    tgram.add_string(19, "Let's dance: SAMBA\0")
    return tgram


def _parse_boot(tgram):
    tgram.check_status()
    resp = tgram.parse_string()
    # Resp should be 'Yes\0'
    return resp


def set_brick_name(opcode, bname):
    tgram = _create(opcode)
    # FIXME: validate brick name
    tgram.add_string(len(bname), bname)
    return tgram


def _parse_set_brick_name(tgram):
    tgram.check_status()


def get_device_info(opcode):
    return _create(opcode)


def _parse_get_device_info(tgram):
    'Return (name, address, signal_strength, user_flash)'
    tgram.check_status()
    name = tgram.parse_string(15)
    a0 = tgram.parse_u8()
    a1 = tgram.parse_u8()
    a2 = tgram.parse_u8()
    a3 = tgram.parse_u8()
    a4 = tgram.parse_u8()
    a5 = tgram.parse_u8()
    a6 = tgram.parse_u8()
    # FIXME: what is a6 for?
    # NOTE(review): presumably a terminator/padding byte of the address
    # field -- confirm against the LEGO communication protocol document.
    address = '%02X:%02X:%02X:%02X:%02X:%02X' % (a0, a1, a2, a3, a4, a5)
    signal_strength = tgram.parse_u32()
    user_flash = tgram.parse_u32()
    return (name, address, signal_strength, user_flash)


def delete_user_flash(opcode):
    return _create(opcode)


def _parse_delete_user_flash(tgram):
    tgram.check_status()


def poll_command_length(opcode, buf_num):
    tgram = _create(opcode)
    tgram.add_u8(buf_num)
    return tgram


def _parse_poll_command_length(tgram):
    'Return (buf_num, n_bytes); here buf_num precedes the status byte'
    buf_num = tgram.parse_u8()
    tgram.check_status()
    n_bytes = tgram.parse_u8()
    return (buf_num, n_bytes)


def poll_command(opcode, buf_num, n_bytes):
    tgram = _create(opcode)
    tgram.add_u8(buf_num)
    tgram.add_u8(n_bytes)
    return tgram


def _parse_poll_command(tgram):
    'Return (buf_num, n_bytes, command); buf_num precedes the status byte'
    buf_num = tgram.parse_u8()
    tgram.check_status()
    n_bytes = tgram.parse_u8()
    command = tgram.parse_string()
    return (buf_num, n_bytes, command)


def bluetooth_factory_reset(opcode):
    # Note: this command is USB only (no Bluetooth)
    return _create(opcode)


def _parse_bluetooth_factory_reset(tgram):
    tgram.check_status()


# opcode: (telegram builder, reply parser)
OPCODES = {
    0x80: (open_read, _parse_open_read),
    0x81: (open_write, _parse_open_write),
    0x82: (read, _parse_read),
    0x83: (write, _parse_write),
    0x84: (close, _parse_close),
    0x85: (delete, _parse_delete),
    0x86: (find_first, _parse_find),
    0x87: (find_next, _parse_find),
    0x88: (get_firmware_version, _parse_get_firmware_version),
    0x89: (open_write_linear, _parse_open_write),
    0x8A: (open_read_linear, _parse_open_read_linear),
    0x8B: (open_write_data, _parse_open_write),
    0x8C: (open_append_data, _parse_open_append_data),
    0x90: (request_first_module, _parse_request_module),
    0x91: (request_next_module, _parse_request_module),
    0x92: (close_module_handle, _parse_close),
    0x94: (read_io_map, _parse_read_io_map),
    0x95: (write_io_map, _parse_write_io_map),
    0x97: (boot, _parse_boot),
    0x98: (set_brick_name, _parse_set_brick_name),
    0x9B: (get_device_info, _parse_get_device_info),
    0xA0: (delete_user_flash, _parse_delete_user_flash),
    0xA1: (poll_command_length, _parse_poll_command_length),
    0xA2: (poll_command, _parse_poll_command),
    0xA4: (bluetooth_factory_reset, _parse_bluetooth_factory_reset),
}

# nxt.telegram module -- LEGO Mindstorms NXT telegram formatting and parsing
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

from cStringIO import StringIO
from struct import pack, unpack
import nxt.error


class InvalidReplyError(Exception):
    'Raised when a received packet does not have the reply type byte'
    pass


class InvalidOpcodeError(Exception):
    'Raised when a reply carries a different opcode than expected'
    pass


class Telegram(object):
    """One NXT packet, either being built or being parsed.

    Without pkt the telegram is outgoing: the type and opcode bytes are
    written at once and the add_* methods append the payload.  With pkt
    the telegram is a reply: type and opcode are read and validated and
    the parse_* methods consume the payload in order.
    """

    TYPE = 0    # type byte offset
    CODE = 1    # code byte offset
    DATA = 2    # data byte offset

    TYPE_NOT_DIRECT = 0x01            # system vs. direct type
    TYPE_REPLY = 0x02                 # reply type (from NXT brick)
    TYPE_REPLY_NOT_REQUIRED = 0x80    # reply not required flag

    def __init__(self, direct=False, opcode=0, reply_req=True, pkt=None):
        """Start an outgoing telegram, or wrap the reply bytes in pkt.

        Raises InvalidReplyError if pkt is not a reply, and
        InvalidOpcodeError if its opcode differs from opcode.
        """
        if pkt:
            self.pkt = StringIO(pkt)
            self.typ = self.parse_u8()
            self.opcode = self.parse_u8()
            if not self.is_reply():
                raise InvalidReplyError
            if self.opcode != opcode:
                raise InvalidOpcodeError(self.opcode)
        else:
            self.pkt = StringIO()
            typ = 0
            if not direct:
                typ |= Telegram.TYPE_NOT_DIRECT
            if not reply_req:
                typ |= Telegram.TYPE_REPLY_NOT_REQUIRED
            self.add_u8(typ)
            self.add_u8(opcode)

    def __str__(self):
        'Return the raw packet bytes'
        return self.pkt.getvalue()

    def is_reply(self):
        return self.typ == Telegram.TYPE_REPLY

    def add_string(self, n_bytes, v):
        'Append v as a fixed-width field of n_bytes'
        self.pkt.write(pack('%ds' % n_bytes, v))

    def add_filename(self, fname):
        'Append fname as a fixed-width 20-byte field'
        self.pkt.write(pack('20s', fname))

    def add_s8(self, v):
        self.pkt.write(pack('<b', v))

    def add_u8(self, v):
        self.pkt.write(pack('<B', v))

    def add_s16(self, v):
        self.pkt.write(pack('<h', v))

    def add_u16(self, v):
        self.pkt.write(pack('<H', v))

    def add_s32(self, v):
        self.pkt.write(pack('<i', v))

    def add_u32(self, v):
        self.pkt.write(pack('<I', v))

    def parse_string(self, n_bytes=0):
        'Read n_bytes, or everything that is left when n_bytes is 0'
        if n_bytes:
            return unpack('%ss' % n_bytes, self.pkt.read(n_bytes))[0]
        else:
            return self.pkt.read()

    def parse_s8(self):
        return unpack('<b', self.pkt.read(1))[0]

    def parse_u8(self):
        return unpack('<B', self.pkt.read(1))[0]

    def parse_s16(self):
        return unpack('<h', self.pkt.read(2))[0]

    def parse_u16(self):
        return unpack('<H', self.pkt.read(2))[0]

    def parse_s32(self):
        return unpack('<i', self.pkt.read(4))[0]

    def parse_u32(self):
        return unpack('<I', self.pkt.read(4))[0]

    def check_status(self):
        'Read the next byte as a status and raise if it reports an error'
        nxt.error.check_status(self.parse_u8())


import nxt.direct
import nxt.system

# Combined opcode table; direct entries are applied after system entries
OPCODES = dict(nxt.system.OPCODES)
OPCODES.update(nxt.direct.OPCODES)

# nxt.usbsock module -- USB socket communication with LEGO Mindstorms NXT
# Copyright (C) 2006-2007 Douglas P Lau
#
# This program is free software; you can redistribute it and/or modify
# it under the
# terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

import usb
from nxt.brick import Brick

ID_VENDOR_LEGO = 0x0694
ID_PRODUCT_NXT = 0x0002


class USBSock(object):
    """Transport that exchanges raw packets with one USB device."""

    bsize = 60  # USB socket block size

    def __init__(self, device):
        """Remember `device`; nothing is opened until connect() is called."""
        self.device = device
        self.handle = None
        self.debug = False

    def __str__(self):
        return 'USB (%s)' % (self.device.filename)

    def connect(self):
        """Open the device and return a Brick bound to this socket.

        Uses the first interface of the first configuration, whose two
        endpoints are taken as (bulk out, bulk in) in that order.
        """
        config = self.device.configurations[0]
        iface = config.interfaces[0][0]
        self.blk_out, self.blk_in = iface.endpoints
        self.handle = self.device.open()
        self.handle.setConfiguration(1)
        self.handle.claimInterface(0)
        self.handle.reset()
        return Brick(self)

    def close(self):
        """Drop the references to the device, its handle and its endpoints."""
        self.device = None
        self.handle = None
        self.blk_out = None
        self.blk_in = None

    def send(self, data):
        """Write `data` to the bulk out endpoint (hex-dumped when debugging)."""
        if self.debug:
            print('Send: ' + ':'.join('%02x' % ord(c) for c in data))
        self.handle.bulkWrite(self.blk_out.address, data)

    def recv(self):
        """Read one packet from the bulk in endpoint and return it as a str."""
        data = self.handle.bulkRead(self.blk_in.address, 64)
        if self.debug:
            print('Recv: ' + ':'.join('%02x' % (c & 0xFF) for c in data))
        # NOTE: bulkRead returns a tuple of ints ... make it sane
        return ''.join(chr(d & 0xFF) for d in data)


def find_bricks(host=None, name=None):
    """Yield a USBSock for every attached device with the LEGO NXT ids.

    `host` and `name` are accepted but currently ignored.
    """
    # FIXME: probably should check host and name
    for bus in usb.busses():
        for device in bus.devices:
            if (device.idVendor == ID_VENDOR_LEGO and
                    device.idProduct == ID_PRODUCT_NXT):
                yield USBSock(device)