linux関連/nxtpython/nxtfile
をテンプレートにして作成
[
トップ
] [
新規
|
一覧
|
単語検索
|
最終更新
|
ヘルプ
]
開始行:
#navi(linux関連);
#hr
# nxt.__init__ module -- LEGO Mindstorms NXT python package
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
# nxt.bluesock module -- Bluetooth socket communication w...
# Copyright (C) 2006-2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
import bluetooth
import os
from nxt.brick import Brick
class BlueSock(object):
bsize = 118 # Bluetooth socket block size
PORT = 1 # Standard NXT rfcomm port
def __init__(self, host):
self.host = host
self.sock = None
self.debug = False
def __str__(self):
return 'Bluetooth (%s)' % self.host
def connect(self):
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
sock.connect((self.host, BlueSock.PORT))
self.sock = sock
return Brick(self)
def close(self):
self.sock.close()
def send(self, data):
if self.debug:
print 'Send:',
print ':'.join('%02x' % ord(c) for c in data)
l0 = len(data) & 0xFF
l1 = (len(data) >> 8) & 0xFF
d = chr(l0) + chr(l1) + data
self.sock.send(d)
def recv(self):
data = self.sock.recv(2)
l0 = ord(data[0])
l1 = ord(data[1])
plen = l0 + (l1 << 8)
data = self.sock.recv(plen)
if self.debug:
print 'Recv:',
print ':'.join('%02x' % ord(c) for c in data)
return data
def _check_brick(arg, value):
return arg is None or arg == value
def find_bricks(host=None, name=None):
for h, n in bluetooth.discover_devices(lookup_names=True):
if _check_brick(host, h) and _check_brick(name, n):
yield BlueSock(h)
# nxt.brick module -- Classes to represent LEGO Mindstorm...
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
from time import sleep
from nxt.error import FileNotFound, ModuleNotFound
from nxt.telegram import OPCODES, Telegram
def _make_poller(opcode, poll_func, parse_func):
def poll(self, *args, **kwargs):
ogram = poll_func(opcode, *args, **kwargs)
self.sock.send(str(ogram))
igram = Telegram(opcode=opcode, pkt=self.sock.recv())
return parse_func(igram)
return poll
class _Meta(type):
'Metaclass which adds one method for each telegram opcode'
def __init__(cls, name, bases, dict):
super(_Meta, cls).__init__(name, bases, dict)
for opcode in OPCODES:
poll_func, parse_func = OPCODES[opcode]
m = _make_poller(opcode, poll_func, parse_func)
setattr(cls, poll_func.__name__, m)
class Brick(object):
__metaclass__ = _Meta
def __init__(self, sock):
self.sock = sock
def play_tone_and_wait(self, frequency, duration):
self.play_tone(frequency, duration)
sleep(duration / 1000.0)
class FileFinder(object):
'Context manager to find files on a NXT brick'
def __init__(self, brick, pattern):
self.brick = brick
self.pattern = pattern
self.handle = None
def __enter__(self):
return self
def __exit__(self, etp, value, tb):
if self.handle:
self.brick.close(self.handle)
def __iter__(self):
self.handle, fname, size = self.brick.find_first(self.p...
yield (fname, size)
while True:
try:
handle, fname, size = self.brick.find_next(
self.handle)
yield (fname, size)
except FileNotFound:
break
class FileReader(object):
'Context manager to read a file on a NXT brick'
def __init__(self, brick, fname):
self.brick = brick
self.fname = fname
def __enter__(self):
self.handle, self.size = self.brick.open_read(self.fname)
return self
def __exit__(self, etp, value, tb):
self.brick.close(self.handle)
def __iter__(self):
rem = self.size
bsize = self.brick.sock.bsize
while rem > 0:
handle, bsize, data = self.brick.read(self.handle,
min(bsize, rem))
yield data
rem -= len(data)
class FileWriter(object):
'Context manager to write a file to a NXT brick'
def __init__(self, brick, fname, fil):
self.brick = brick
self.fname = fname
self.fil = fil
fil.seek(0, 2) # seek to end of file
self.size = fil.tell()
fil.seek(0) # seek to start of file
def __enter__(self):
self.handle = self.brick.open_write(self.fname, self.si...
return self
def __exit__(self, etp, value, tb):
self.brick.close(self.handle)
def __iter__(self):
rem = self.size
bsize = self.brick.sock.bsize
while rem > 0:
b = min(bsize, rem)
handle, size = self.brick.write(self.handle,
self.fil.read(b))
yield size
rem -= b
class ModuleFinder(object):
'Context manager to lookup modules on a NXT brick'
def __init__(self, brick, pattern):
self.brick = brick
self.pattern = pattern
self.handle = None
def __enter__(self):
return self
def __exit__(self, etp, value, tb):
if self.handle:
self.brick.close(self.handle)
def __iter__(self):
self.handle, mname, mid, msize, miomap_size = \
self.brick.request_first_module(self.pattern)
yield (mname, mid, msize, miomap_size)
while True:
try:
handle, mname, mid, msize, miomap_size = \
self.brick.request_next_module(
self.handle)
yield (mname, mid, msize, miomap_size)
except ModuleNotFound:
break
# nxt.compass module -- Classes to read Mindsensors Compa...
# Copyright (C) 2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
import sensor
from time import sleep
class Command(object):
# NOTE: just a namespace (enumeration)
AUTO_TRIG_ON = (0x41, 0x02)
AUTO_TRIG_OFF = (0x53, 0x01)
MAP_HEADING_BYTE = 0x42 # map heading to 0-255 range
MAP_HEADING_INTEGER = 0x49 # map heading to 0-36000 range
SAMPLING_50_HZ = 0x45 # set sampling frequency to 50 Hz
SAMPLING_60_HZ = 0x55 # set sampling frequency to 60 Hz
SET_ADPA_MODE_ON = 0x4E # set ADPA mode on
SET_ADPA_MODE_OFF = 0x4F # set ADPA mode off
BEGIN_CALIBRATION = 0x43 # begin calibration
DONE_CALIBRATION = 0x44 # done with calibration
LOAD_USER_CALIBRATION = 0x4C # load user calibration value
# I2C addresses for a Mindsensors CMPS-Nx compass sensor
I2C_ADDRESS_CMPS_NX = {
0x41: ('command', 1, True),
0x42: ('heading_lsb', 1, False),
0x43: ('heading_msb', 1, False),
0x44: ('x_offset_lsb', 1, True),
0x45: ('x_offset_msb', 1, True),
0x46: ('y_offset_lsb', 1, True),
0x47: ('y_offset_msb', 1, True),
0x48: ('x_range_lsb', 1, True),
0x49: ('x_range_msb', 1, True),
0x4A: ('y_range_lsb', 1, True),
0x4B: ('y_range_msb', 1, True),
0x4C: ('x_raw_lsb', 1, True),
0x4D: ('x_raw_msb', 1, True),
0x4E: ('y_raw_lsb', 1, True),
0x4F: ('y_raw_msb', 1, True),
}
class _MetaCMPS_Nx(sensor._Meta):
'Metaclass which adds accessor methods for CMPS-Nx I2C a...
def __init__(cls, name, bases, dict):
super(_MetaCMPS_Nx, cls).__init__(name, bases, dict)
for address in I2C_ADDRESS_CMPS_NX:
name, n_bytes, set_method = I2C_ADDRESS_CMPS_NX[address]
q = sensor._make_query(address, n_bytes)
setattr(cls, 'get_' + name, q)
if set_method:
c = sensor._make_command(address)
setattr(cls, 'set_' + name, c)
class CompassSensor(sensor.DigitalSensor):
__metaclass__ = _MetaCMPS_Nx
def __init__(self, brick, port):
super(CompassSensor, self).__init__(brick, port)
self.sensor_type = Type.LOW_SPEED_9V
self.mode = Mode.RAW
self.set_input_mode()
sleep(0.1) # Give I2C time to initialize
CompassSensor.get_sample = CompassSensor.get_heading_lsb
# nxt.direct module -- LEGO Mindstorms NXT direct telegrams
# Copyright (C) 2006-2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
def _create(opcode):
'Create a simple direct telegram'
from telegram import Telegram
return Telegram(True, opcode)
def start_program(opcode, fname):
tgram = _create(opcode)
tgram.add_filename(fname)
return tgram
def _parse_simple(tgram):
tgram.check_status()
def stop_program(opcode):
return _create(opcode)
def play_sound_file(opcode, loop, fname):
tgram = _create(opcode)
tgram.add_u8(loop)
tgram.add_filename(fname)
return tgram
def play_tone(opcode, frequency, duration):
'Play a tone at frequency (Hz) for duration (ms)'
tgram = _create(opcode)
tgram.add_u16(frequency)
tgram.add_u16(duration)
return tgram
def set_output_state(opcode, port, power, mode, regulatio...
run_state, tacho_limit):
tgram = _create(opcode)
tgram.add_u8(port)
tgram.add_s8(power)
tgram.add_u8(mode)
tgram.add_u8(regulation)
tgram.add_s8(turn_ratio)
tgram.add_u8(run_state)
tgram.add_u32(tacho_limit)
return tgram
def set_input_mode(opcode, port, sensor_type, sensor_mode):
tgram = _create(opcode)
tgram.add_u8(port)
tgram.add_u8(sensor_type)
tgram.add_u8(sensor_mode)
return tgram
def get_output_state(opcode, port):
tgram = _create(opcode)
tgram.add_u8(port)
return tgram
def _parse_get_output_state(tgram):
tgram.check_status()
port = tgram.parse_u8()
power = tgram.parse_s8()
mode = tgram.parse_u8()
regulation = tgram.parse_u8()
turn_ratio = tgram.parse_s8()
run_state = tgram.parse_u8()
tacho_limit = tgram.parse_u32()
tacho_count = tgram.parse_s32()
block_tacho_count = tgram.parse_s32()
rotation_count = tgram.parse_s32()
return (port, power, mode, regulation, turn_ratio, run_s...
tacho_limit, tacho_count, block_tacho_count, rotation_c...
def get_input_values(opcode, port):
tgram = _create(opcode)
tgram.add_u8(port)
return tgram
def _parse_get_input_values(tgram):
tgram.check_status()
port = tgram.parse_u8()
valid = tgram.parse_u8()
calibrated = tgram.parse_u8()
sensor_type = tgram.parse_u8()
sensor_mode = tgram.parse_u8()
raw_ad_value = tgram.parse_u16()
normalized_ad_value = tgram.parse_u16()
scaled_value = tgram.parse_s16()
calibrated_value = tgram.parse_s16()
return (port, valid, calibrated, sensor_type, sensor_mod...
normalized_ad_value, scaled_value, calibrated_value)
def reset_input_scaled_value(opcode, port):
tgram = _create(opcode)
tgram.add_u8(port)
return tgram
def message_write(opcode, inbox, message):
tgram = _create(opcode)
tgram.add_u8(inbox)
tgram.add_u8(len(message) + 1)
tgram.add_string(len(message), message)
tgram.add_u8(0)
return tgram
def reset_motor_position(opcode, port, relative):
tgram = _create(opcode)
tgram.add_u8(port)
tgram.add_u8(relative)
return tgram
def get_battery_level(opcode):
return _create(opcode)
def _parse_get_battery_level(tgram):
tgram.check_status()
millivolts = tgram.parse_u16()
return millivolts
def stop_sound_playback(opcode):
return _create(opcode)
def keep_alive(opcode):
return _create(opcode)
def _parse_keep_alive(tgram):
tgram.check_status()
sleep_time = tgram.parse_u32()
return sleep_time
def ls_get_status(opcode, port):
'Get status of low-speed sensor (ultrasonic)'
tgram = _create(opcode)
tgram.add_u8(port)
return tgram
def _parse_ls_get_status(tgram):
tgram.check_status()
n_bytes = tgram.parse_u8()
return n_bytes
def ls_write(opcode, port, tx_data, rx_bytes):
'Write a low-speed command to a sensor (ultrasonic)'
tgram = _create(opcode)
tgram.add_u8(port)
tgram.add_u8(len(tx_data))
tgram.add_u8(rx_bytes)
tgram.add_string(len(tx_data), tx_data)
return tgram
def ls_read(opcode, port):
'Read a low-speed sensor value (ultrasonic)'
tgram = _create(opcode)
tgram.add_u8(port)
return tgram
def _parse_ls_read(tgram):
tgram.check_status()
n_bytes = tgram.parse_u8()
contents = tgram.parse_string()
return contents[:n_bytes]
def get_current_program_name(opcode):
return _create(opcode)
def _parse_get_current_program_name(tgram):
tgram.check_status()
fname = tgram.parse_string()
return fname
def message_read(opcode, remote_inbox, local_inbox, remov...
tgram = _create(opcode)
tgram.add_u8(remote_inbox)
tgram.add_u8(local_inbox)
tgram.add_u8(remove)
return tgram
def _parse_message_read(tgram):
tgram.check_status()
local_inbox = tgram.parse_u8()
n_bytes = tgram.parse_u8()
message = tgram.parse_string()
return (local_inbox, message[:n_bytes])
OPCODES = {
0x00: (start_program, _parse_simple),
0x01: (stop_program, _parse_simple),
0x02: (play_sound_file, _parse_simple),
0x03: (play_tone, _parse_simple),
0x04: (set_output_state, _parse_simple),
0x05: (set_input_mode, _parse_simple),
0x06: (get_output_state, _parse_get_output_state),
0x07: (get_input_values, _parse_get_input_values),
0x08: (reset_input_scaled_value, _parse_simple),
0x09: (message_write, _parse_simple),
0x0A: (reset_motor_position, _parse_simple),
0x0B: (get_battery_level, _parse_get_battery_level),
0x0C: (stop_sound_playback, _parse_simple),
0x0D: (keep_alive, _parse_keep_alive),
0x0E: (ls_get_status, _parse_ls_get_status),
0x0F: (ls_write, _parse_simple),
0x10: (ls_read, _parse_ls_read),
0x11: (get_current_program_name, _parse_get_current_prog...
0x13: (message_read, _parse_message_read),
}
# nxt.error module -- LEGO Mindstorms NXT error handling
# Copyright (C) 2006,2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
class ProtocolError(Exception):
pass
class SysProtError(ProtocolError):
pass
class FileExistsError(SysProtError):
pass
class FileNotFound(SysProtError):
pass
class ModuleNotFound(SysProtError):
pass
class DirProtError(ProtocolError):
pass
class I2CError(DirProtError):
pass
class I2CPendingError(I2CError):
pass
CODES = {
0x00: None,
0x20: I2CPendingError('Pending communication transaction...
0x40: DirProtError('Specified mailbox queue is empty'),
0x81: SysProtError('No more handles'),
0x82: SysProtError('No space'),
0x83: SysProtError('No more files'),
0x84: SysProtError('End of file expected'),
0x85: SysProtError('End of file'),
0x86: SysProtError('Not a linear file'),
0x87: FileNotFound('File not found'),
0x88: SysProtError('Handle already closed'),
0x89: SysProtError('No linear space'),
0x8A: SysProtError('Undefined error'),
0x8B: SysProtError('File is busy'),
0x8C: SysProtError('No write buffers'),
0x8D: SysProtError('Append not possible'),
0x8E: SysProtError('File is full'),
0x8F: FileExistsError('File exists'),
0x90: ModuleNotFound('Module not found'),
0x91: SysProtError('Out of bounds'),
0x92: SysProtError('Illegal file name'),
0x93: SysProtError('Illegal handle'),
0xBD: DirProtError('Request failed (i.e. specified file ...
0xBE: DirProtError('Unknown command opcode'),
0xBF: DirProtError('Insane packet'),
0xC0: DirProtError('Data contains out-of-range values'),
0xDD: DirProtError('Communication bus error'),
0xDE: DirProtError('No free memory in communication buff...
0xDF: DirProtError('Specified channel/connection is not ...
0xE0: I2CError('Specified channel/connection not configu...
0xEC: DirProtError('No active program'),
0xED: DirProtError('Illegal size specified'),
0xEE: DirProtError('Illegal mailbox queue ID specified'),
0xEF: DirProtError('Attempted to access invalid field of...
0xF0: DirProtError('Bad input or output specified'),
0xFB: DirProtError('Insufficient memory available'),
0xFF: DirProtError('Bad arguments'),
}
def check_status(status):
if status:
ex = CODES.get(status)
if ex:
raise ex
else:
raise ProtocolError, status
# nxt.locator module -- Locate LEGO Minstorms NXT bricks ...
# Copyright (C) 2006-2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
class BrickNotFoundError(Exception):
pass
def find_bricks(host=None, name=None):
try:
import usbsock
socks = usbsock.find_bricks(host, name)
for s in socks:
yield s
except ImportError:
pass
try:
import bluesock
from bluetooth import BluetoothError
try:
socks = bluesock.find_bricks(host, name)
for s in socks:
yield s
except BluetoothError:
pass
except ImportError:
pass
def find_one_brick(host=None, name=None):
for s in find_bricks(host, name):
return s
raise BrickNotFoundError
# nxt.motor module -- Class to control LEGO Mindstorms NX...
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
PORT_A = 0x00
PORT_B = 0x01
PORT_C = 0x02
PORT_ALL = 0xFF
MODE_IDLE = 0x00
MODE_MOTOR_ON = 0x01
MODE_BRAKE = 0x02
MODE_REGULATED = 0x04
REGULATION_IDLE = 0x00
REGULATION_MOTOR_SPEED = 0x01
REGULATION_MOTOR_SYNC = 0x02
RUN_STATE_IDLE = 0x00
RUN_STATE_RAMP_UP = 0x10
RUN_STATE_RUNNING = 0x20
RUN_STATE_RAMP_DOWN = 0x40
LIMIT_RUN_FOREVER = 0
class Motor(object):
def __init__(self, brick, port):
self.brick = brick
self.port = port
self.power = 0
self.mode = MODE_IDLE
self.regulation = REGULATION_IDLE
self.turn_ratio = 0
self.run_state = RUN_STATE_IDLE
self.tacho_limit = LIMIT_RUN_FOREVER
self.tacho_count = 0
self.block_tacho_count = 0
self.rotation_count = 0
def set_output_state(self):
self.brick.set_output_state(self.port, self.power, self...
self.regulation, self.turn_ratio, self.run_state,
self.tacho_limit)
def get_output_state(self):
values = self.brick.get_output_state(self.port)
(self.port, self.power, self.mode, self.regulation,
self.turn_ratio, self.run_state, self.tacho_limit,
tacho_count, block_tacho_count, rotation_count) = values
return values
def reset_position(self, relative):
self.brick.reset_motor_position(self.port, relative)
# nxt.sensor module -- Classes to read LEGO Mindstorms NX...
# Copyright (C) 2006,2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
from time import sleep
from nxt.error import I2CError, I2CPendingError
PORT_1 = 0x00
PORT_2 = 0x01
PORT_3 = 0x02
PORT_4 = 0x03
class Type(object):
# NOTE: just a namespace (enumeration)
NO_SENSOR = 0x00
SWITCH = 0x01 # Touch sensor
TEMPERATURE = 0x02
REFLECTION = 0x03
ANGLE = 0x04
LIGHT_ACTIVE = 0x05 # Light sensor (illuminated)
LIGHT_INACTIVE = 0x06 # Light sensor (ambient)
SOUND_DB = 0x07 # Sound sensor (unadjusted)
SOUND_DBA = 0x08 # Sound sensor (adjusted)
CUSTOM = 0x09
LOW_SPEED = 0x0A
LOW_SPEED_9V = 0x0B # Low-speed I2C (Ultrasonic sensor)
class Mode(object):
# NOTE: just a namespace (enumeration)
RAW = 0x00
BOOLEAN = 0x20
TRANSITION_CNT = 0x40
PERIOD_COUNTER = 0x60
PCT_FULL_SCALE = 0x80
CELSIUS = 0xA0
FAHRENHEIT = 0xC0
ANGLE_STEPS = 0xE0
MASK = 0xE0
MASK_SLOPE = 0x1F # Why isn't this slope thing documented?
class Sensor(object):
def __init__(self, brick, port):
self.brick = brick
self.port = port
self.sensor_type = Type.NO_SENSOR
self.mode = Mode.RAW
def set_input_mode(self):
self.brick.set_input_mode(self.port, self.sensor_type,
self.mode)
class AnalogSensor(Sensor):
def __init__(self, brick, port):
super(AnalogSensor, self).__init__(brick, port)
self.valid = False
self.calibrated = False
self.raw_ad_value = 0
self.normalized_ad_value = 0
self.scaled_value = 0
self.calibrated_value = 0
def get_input_values(self):
values = self.brick.get_input_values(self.port)
(self.port, self.valid, self.calibrated, self.sensor_ty...
self.mode, self.raw_ad_value, self.normalized_ad_value,
self.scaled_value, self.calibrated_value) = values
return values
def reset_input_scaled_value(self):
self.brick.reset_input_scaled_value()
def get_sample(self):
self.get_input_values()
return self.scaled_value
class TouchSensor(AnalogSensor):
def __init__(self, brick, port):
super(TouchSensor, self).__init__(brick, port)
self.sensor_type = Type.SWITCH
self.mode = Mode.BOOLEAN
self.set_input_mode()
def is_pressed(self):
return bool(self.scaled_value)
def get_sample(self):
self.get_input_values()
return self.is_pressed()
class LightSensor(AnalogSensor):
def __init__(self, brick, port):
super(LightSensor, self).__init__(brick, port)
self.set_illuminated(True)
def set_illuminated(self, active):
if active:
self.sensor_type = Type.LIGHT_ACTIVE
else:
self.sensor_type = Type.LIGHT_INACTIVE
self.set_input_mode()
class SoundSensor(AnalogSensor):
def __init__(self, brick, port):
super(SoundSensor, self).__init__(brick, port)
self.set_adjusted(True)
def set_adjusted(self, active):
if active:
self.sensor_type = Type.SOUND_DBA
else:
self.sensor_type = Type.SOUND_DB
self.set_input_mode()
I2C_ADDRESS = {
0x00: ('version', 8),
0x08: ('product_id', 8),
0x10: ('sensor_type', 8),
0x11: ('factory_zero', 1), # is this really correct?
0x12: ('factory_scale_factor', 1),
0x13: ('factory_scale_divisor', 1),
0x14: ('measurement_units', 1),
}
def _make_query(address, n_bytes):
def query(self):
data = self.i2c_query(address, n_bytes)
if n_bytes == 1:
return ord(data)
else:
return data
return query
class _Meta(type):
'Metaclass which adds accessor methods for I2C addresses'
def __init__(cls, name, bases, dict):
super(_Meta, cls).__init__(name, bases, dict)
for address in I2C_ADDRESS:
name, n_bytes = I2C_ADDRESS[address]
q = _make_query(address, n_bytes)
setattr(cls, 'get_' + name, q)
class DigitalSensor(Sensor):
__metaclass__ = _Meta
I2C_DEV = 0x02
def __init__(self, brick, port):
super(DigitalSensor, self).__init__(brick, port)
def _ls_get_status(self, n_bytes):
for n in range(3):
try:
b = self.brick.ls_get_status(self.port)
if b >= n_bytes:
return b
except I2CPendingError:
sleep(0.01)
raise I2CError, 'ls_get_status timeout'
def i2c_command(self, address, value):
msg = chr(DigitalSensor.I2C_DEV) + chr(address) + chr(v...
self.brick.ls_write(self.port, msg, 0)
def i2c_query(self, address, n_bytes):
msg = chr(DigitalSensor.I2C_DEV) + chr(address)
self.brick.ls_write(self.port, msg, n_bytes)
self._ls_get_status(n_bytes)
data = self.brick.ls_read(self.port)
if len(data) < n_bytes:
raise I2CError, 'Read failure'
return data[-n_bytes:]
class CommandState(object):
# NOTE: just a namespace (enumeration)
OFF = 0x00
SINGLE_SHOT = 0x01
CONTINUOUS_MEASUREMENT = 0x02
EVENT_CAPTURE = 0x03 # Check for ultrasonic interference
REQUEST_WARM_RESET = 0x04
# I2C addresses for an Ultrasonic sensor
I2C_ADDRESS_US = {
0x40: ('continuous_measurement_interval', 1, True),
0x41: ('command_state', 1, True),
0x42: ('measurement_byte_0', 1, False),
0x43: ('measurement_byte_1', 1, False),
0x44: ('measurement_byte_2', 1, False),
0x45: ('measurement_byte_3', 1, False),
0x46: ('measurement_byte_4', 1, False),
0x47: ('measurement_byte_5', 1, False),
0x48: ('measurement_byte_6', 1, False),
0x49: ('measurement_byte_7', 1, False),
0x50: ('actual_zero', 1, True),
0x51: ('actual_scale_factor', 1, True),
0x52: ('actual_scale_divisor', 1, True),
}
def _make_command(address):
def command(self, value):
self.i2c_command(address, value)
return command
class _MetaUS(_Meta):
'Metaclass which adds accessor methods for US I2C addres...
def __init__(cls, name, bases, dict):
super(_MetaUS, cls).__init__(name, bases, dict)
for address in I2C_ADDRESS_US:
name, n_bytes, set_method = I2C_ADDRESS_US[address]
q = _make_query(address, n_bytes)
setattr(cls, 'get_' + name, q)
if set_method:
c = _make_command(address)
setattr(cls, 'set_' + name, c)
class UltrasonicSensor(DigitalSensor):
__metaclass__ = _MetaUS
def __init__(self, brick, port):
super(UltrasonicSensor, self).__init__(brick, port)
self.sensor_type = Type.LOW_SPEED_9V
self.mode = Mode.RAW
self.set_input_mode()
sleep(0.1) # Give I2C time to initialize
def get_single_shot_measurement(self):
self.set_command_state(CommandState.SINGLE_SHOT)
return self.get_measurement_byte_0()
UltrasonicSensor.get_sample = UltrasonicSensor.get_measur...
# nxt.system module -- LEGO Mindstorms NXT system telegrams
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
def _create(opcode):
'Create a simple system telegram'
from telegram import Telegram
return Telegram(False, opcode)
def _create_with_file(opcode, fname):
tgram = _create(opcode)
tgram.add_filename(fname)
return tgram
def _create_with_handle(opcode, handle):
tgram = _create(opcode)
tgram.add_u8(handle)
return tgram
def open_read(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_open_read(tgram):
tgram.check_status()
handle = tgram.parse_u8()
n_bytes = tgram.parse_u32()
return (handle, n_bytes)
def open_write(opcode, fname, n_bytes):
tgram = _create_with_file(opcode, fname)
tgram.add_u32(n_bytes)
return tgram
def _parse_open_write(tgram):
tgram.check_status()
handle = tgram.parse_u8()
return handle
def read(opcode, handle, n_bytes):
tgram = _create_with_handle(opcode, handle)
tgram.add_u16(n_bytes)
return tgram
def _parse_read(tgram):
tgram.check_status()
handle = tgram.parse_u8()
n_bytes = tgram.parse_u16()
data = tgram.parse_string()
return (handle, n_bytes, data)
def write(opcode, handle, data):
tgram = _create_with_handle(opcode, handle)
tgram.add_string(len(data), data)
return tgram
def _parse_write(tgram):
tgram.check_status()
handle = tgram.parse_u8()
n_bytes = tgram.parse_u16()
return (handle, n_bytes)
def close(opcode, handle):
return _create_with_handle(opcode, handle)
def _parse_close(tgram):
tgram.check_status()
handle = tgram.parse_u8()
return handle
def delete(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_delete(tgram):
tgram.check_status()
handle = tgram.parse_u8()
fname = tgram.parse_string()
return (handle, fname)
def find_first(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_find(tgram):
tgram.check_status()
handle = tgram.parse_u8()
fname = tgram.parse_string(20)
n_bytes = tgram.parse_u32()
return (handle, fname, n_bytes)
def find_next(opcode, handle):
return _create_with_handle(opcode, handle)
def get_firmware_version(opcode):
return _create(opcode)
def _parse_get_firmware_version(tgram):
tgram.check_status()
prot_minor = tgram.parse_u8()
prot_major = tgram.parse_u8()
prot_version = (prot_major, prot_minor)
fw_minor = tgram.parse_u8()
fw_major = tgram.parse_u8()
fw_version = (fw_major, fw_minor)
return (prot_version, fw_version)
def open_write_linear(opcode, fname, n_bytes):
tgram = _create_with_file(opcode, fname)
tgram.add_u32(n_bytes)
return tgram
def open_read_linear(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_open_read_linear(tgram):
tgram.check_status()
n_bytes = tgram.parse_u32()
return n_bytes
def open_write_data(opcode, fname, n_bytes):
tgram = _create_with_file(opcode, fname)
tgram.add_u32(n_bytes)
return tgram
def open_append_data(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_open_append_data(tgram):
tgram.check_status()
handle = tgram.parse_u8()
n_bytes = tgram.parse_u32()
return (handle, n_bytes)
def request_first_module(opcode, mname):
return _create_with_file(opcode, mname)
def _parse_request_module(tgram):
tgram.check_status()
handle = tgram.parse_u8()
mname = tgram.parse_string(20)
mod_id = tgram.parse_u32()
mod_size = tgram.parse_u32()
mod_iomap_size = tgram.parse_u16()
return (handle, mname, mod_id, mod_size, mod_iomap_size)
def request_next_module(opcode, handle):
return _create_with_handle(opcode, handle)
def close_module_handle(opcode, handle):
return _create_with_handle(opcode, handle)
def read_io_map(opcode, mod_id, offset, n_bytes):
tgram = _create(opcode)
tgram.add_u32(mod_id)
tgram.add_u16(offset)
tgram.add_u16(n_bytes)
return tgram
def _parse_read_io_map(tgram):
tgram.check_status()
mod_id = tgram.parse_u32()
n_bytes = tgram.parse_u16()
contents = tgram.parse_string()
return (mod_id, n_bytes, contents)
def write_io_map(opcode, mod_id, offset, content):
tgram = _create(opcode)
tgram.add_u32(mod_id)
tgram.add_u16(offset)
tgram.add_u16(len(content))
tgram.add_string(len(content), content)
return tgram
def _parse_write_io_map(tgram):
tgram.check_status()
mod_id = tgram.parse_u32()
n_bytes = tgram.parse_u16()
return (mod_id, n_bytes)
def boot(opcode):
# Note: this command is USB only (no Bluetooth)
tgram = _create(opcode)
tgram.add_string(19, "Let's dance: SAMBA\0")
return tgram
def _parse_boot(tgram):
tgram.check_status()
resp = tgram.parse_string()
# Resp should be 'Yes\0'
return resp
def set_brick_name(opcode, bname):
tgram = _create(opcode)
# FIXME: validate brick name
tgram.add_string(len(bname), bname)
return tgram
def _parse_set_brick_name(tgram):
tgram.check_status()
def get_device_info(opcode):
return _create(opcode)
def _parse_get_device_info(tgram):
tgram.check_status()
name = tgram.parse_string(15)
a0 = tgram.parse_u8()
a1 = tgram.parse_u8()
a2 = tgram.parse_u8()
a3 = tgram.parse_u8()
a4 = tgram.parse_u8()
a5 = tgram.parse_u8()
a6 = tgram.parse_u8()
# FIXME: what is a6 for?
address = '%02X:%02X:%02X:%02X:%02X:%02X' % (a0, a1, a2,...
signal_strength = tgram.parse_u32()
user_flash = tgram.parse_u32()
return (name, address, signal_strength, user_flash)
def delete_user_flash(opcode):
return _create(opcode)
def _parse_delete_user_flash(tgram):
tgram.check_status()
def poll_command_length(opcode, buf_num):
tgram = _create(opcode)
tgram.add_u8(buf_num)
return tgram
def _parse_poll_command_length(tgram):
buf_num = tgram.parse_u8()
tgram.check_status()
n_bytes = tgram.parse_u8()
return (buf_num, n_bytes)
def poll_command(opcode, buf_num, n_bytes):
tgram = _create(opcode)
tgram.add_u8(buf_num)
tgram.add_u8(n_bytes)
return tgram
def _parse_poll_command(tgram):
buf_num = tgram.parse_u8()
tgram.check_status()
n_bytes = tgram.parse_u8()
command = tgram.parse_string()
return (buf_num, n_bytes, command)
def bluetooth_factory_reset(opcode):
# Note: this command is USB only (no Bluetooth)
return _create(opcode)
def _parse_bluetooth_factory_reset(tgram):
tgram.check_status()
OPCODES = {
0x80: (open_read, _parse_open_read),
0x81: (open_write, _parse_open_write),
0x82: (read, _parse_read),
0x83: (write, _parse_write),
0x84: (close, _parse_close),
0x85: (delete, _parse_delete),
0x86: (find_first, _parse_find),
0x87: (find_next, _parse_find),
0x88: (get_firmware_version, _parse_get_firmware_version),
0x89: (open_write_linear, _parse_open_write),
0x8A: (open_read_linear, _parse_open_read_linear),
0x8B: (open_write_data, _parse_open_write),
0x8C: (open_append_data, _parse_open_append_data),
0x90: (request_first_module, _parse_request_module),
0x91: (request_next_module, _parse_request_module),
0x92: (close_module_handle, _parse_close),
0x94: (read_io_map, _parse_read_io_map),
0x95: (write_io_map, _parse_write_io_map),
0x97: (boot, _parse_boot),
0x98: (set_brick_name, _parse_set_brick_name),
0x9B: (get_device_info, _parse_get_device_info),
0xA0: (delete_user_flash, _parse_delete_user_flash),
0xA1: (poll_command_length, _parse_poll_command_length),
0xA2: (poll_command, _parse_poll_command),
0xA4: (bluetooth_factory_reset, _parse_bluetooth_factory...
}
# nxt.telegram module -- LEGO Mindstorms NXT telegram for...
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
from cStringIO import StringIO
from struct import pack, unpack
import nxt.error
class InvalidReplyError(Exception):
pass
class InvalidOpcodeError(Exception):
pass
class Telegram(object):
TYPE = 0 # type byte offset
CODE = 1 # code byte offset
DATA = 2 # data byte offset
TYPE_NOT_DIRECT = 0x01 # system vs. direct type
TYPE_REPLY = 0x02 # reply type (from NXT brick)
TYPE_REPLY_NOT_REQUIRED = 0x80 # reply not required flag
def __init__(self, direct=False, opcode=0, reply_req=Tru...
if pkt:
self.pkt = StringIO(pkt)
self.typ = self.parse_u8()
self.opcode = self.parse_u8()
if not self.is_reply():
raise InvalidReplyError
if self.opcode != opcode:
raise InvalidOpcodeError, self.opcode
else:
self.pkt = StringIO()
typ = 0
if not direct:
typ |= Telegram.TYPE_NOT_DIRECT
if not reply_req:
typ |= Telegram.TYPE_REPLY_NOT_REQUIRED
self.add_u8(typ)
self.add_u8(opcode)
def __str__(self):
return self.pkt.getvalue()
def is_reply(self):
return self.typ == Telegram.TYPE_REPLY
def add_string(self, n_bytes, v):
self.pkt.write(pack('%ds' % n_bytes, v))
def add_filename(self, fname):
self.pkt.write(pack('20s', fname))
def add_s8(self, v):
self.pkt.write(pack('<b', v))
def add_u8(self, v):
self.pkt.write(pack('<B', v))
def add_s16(self, v):
self.pkt.write(pack('<h', v))
def add_u16(self, v):
self.pkt.write(pack('<H', v))
def add_s32(self, v):
self.pkt.write(pack('<i', v))
def add_u32(self, v):
self.pkt.write(pack('<I', v))
def parse_string(self, n_bytes=0):
if n_bytes:
return unpack('%ss' % n_bytes,
self.pkt.read(n_bytes))[0]
else:
return self.pkt.read()
def parse_s8(self):
return unpack('<b', self.pkt.read(1))[0]
def parse_u8(self):
return unpack('<B', self.pkt.read(1))[0]
def parse_s16(self):
return unpack('<h', self.pkt.read(2))[0]
def parse_u16(self):
return unpack('<H', self.pkt.read(2))[0]
def parse_s32(self):
return unpack('<i', self.pkt.read(4))[0]
def parse_u32(self):
return unpack('<I', self.pkt.read(4))[0]
def check_status(self):
nxt.error.check_status(self.parse_u8())
import nxt.direct
import nxt.system
OPCODES = dict(nxt.system.OPCODES)
OPCODES.update(nxt.direct.OPCODES)
# nxt.usbsock module -- USB socket communication with LEG...
# Copyright (C) 2006-2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
import usb
from nxt.brick import Brick
ID_VENDOR_LEGO = 0x0694
ID_PRODUCT_NXT = 0x0002
class USBSock(object):
bsize = 60 # USB socket block size
def __init__(self, device):
self.device = device
self.handle = None
self.debug = False
def __str__(self):
return 'USB (%s)' % (self.device.filename)
def connect(self):
config = self.device.configurations[0]
iface = config.interfaces[0][0]
self.blk_out, self.blk_in = iface.endpoints
self.handle = self.device.open()
self.handle.setConfiguration(1)
self.handle.claimInterface(0)
self.handle.reset()
return Brick(self)
def close(self):
self.device = None
self.handle = None
self.blk_out = None
self.blk_in = None
def send(self, data):
if self.debug:
print 'Send:',
print ':'.join('%02x' % ord(c) for c in data)
self.handle.bulkWrite(self.blk_out.address, data)
def recv(self):
data = self.handle.bulkRead(self.blk_in.address, 64)
if self.debug:
print 'Recv:',
print ':'.join('%02x' % (c & 0xFF) for c in data)
# NOTE: bulkRead returns a tuple of ints ... make it sane
return ''.join(chr(d & 0xFF) for d in data)
def find_bricks(host=None, name=None):
# FIXME: probably should check host and name
for bus in usb.busses():
for device in bus.devices:
if device.idVendor == ID_VENDOR_LEGO and \
device.idProduct == ID_PRODUCT_NXT:
yield USBSock(device)
終了行:
#navi(linux関連);
#hr
# nxt.__init__ module -- LEGO Mindstorms NXT python package
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
# nxt.bluesock module -- Bluetooth socket communication w...
# Copyright (C) 2006-2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
import bluetooth
import os
from nxt.brick import Brick
class BlueSock(object):
bsize = 118 # Bluetooth socket block size
PORT = 1 # Standard NXT rfcomm port
def __init__(self, host):
self.host = host
self.sock = None
self.debug = False
def __str__(self):
return 'Bluetooth (%s)' % self.host
def connect(self):
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
sock.connect((self.host, BlueSock.PORT))
self.sock = sock
return Brick(self)
def close(self):
self.sock.close()
def send(self, data):
if self.debug:
print 'Send:',
print ':'.join('%02x' % ord(c) for c in data)
l0 = len(data) & 0xFF
l1 = (len(data) >> 8) & 0xFF
d = chr(l0) + chr(l1) + data
self.sock.send(d)
def recv(self):
data = self.sock.recv(2)
l0 = ord(data[0])
l1 = ord(data[1])
plen = l0 + (l1 << 8)
data = self.sock.recv(plen)
if self.debug:
print 'Recv:',
print ':'.join('%02x' % ord(c) for c in data)
return data
def _check_brick(arg, value):
return arg is None or arg == value
def find_bricks(host=None, name=None):
for h, n in bluetooth.discover_devices(lookup_names=True):
if _check_brick(host, h) and _check_brick(name, n):
yield BlueSock(h)
# nxt.brick module -- Classes to represent LEGO Mindstorm...
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
from time import sleep
from nxt.error import FileNotFound, ModuleNotFound
from nxt.telegram import OPCODES, Telegram
def _make_poller(opcode, poll_func, parse_func):
def poll(self, *args, **kwargs):
ogram = poll_func(opcode, *args, **kwargs)
self.sock.send(str(ogram))
igram = Telegram(opcode=opcode, pkt=self.sock.recv())
return parse_func(igram)
return poll
class _Meta(type):
'Metaclass which adds one method for each telegram opcode'
def __init__(cls, name, bases, dict):
super(_Meta, cls).__init__(name, bases, dict)
for opcode in OPCODES:
poll_func, parse_func = OPCODES[opcode]
m = _make_poller(opcode, poll_func, parse_func)
setattr(cls, poll_func.__name__, m)
class Brick(object):
__metaclass__ = _Meta
def __init__(self, sock):
self.sock = sock
def play_tone_and_wait(self, frequency, duration):
self.play_tone(frequency, duration)
sleep(duration / 1000.0)
class FileFinder(object):
'Context manager to find files on a NXT brick'
def __init__(self, brick, pattern):
self.brick = brick
self.pattern = pattern
self.handle = None
def __enter__(self):
return self
def __exit__(self, etp, value, tb):
if self.handle:
self.brick.close(self.handle)
def __iter__(self):
self.handle, fname, size = self.brick.find_first(self.p...
yield (fname, size)
while True:
try:
handle, fname, size = self.brick.find_next(
self.handle)
yield (fname, size)
except FileNotFound:
break
class FileReader(object):
'Context manager to read a file on a NXT brick'
def __init__(self, brick, fname):
self.brick = brick
self.fname = fname
def __enter__(self):
self.handle, self.size = self.brick.open_read(self.fname)
return self
def __exit__(self, etp, value, tb):
self.brick.close(self.handle)
def __iter__(self):
rem = self.size
bsize = self.brick.sock.bsize
while rem > 0:
handle, bsize, data = self.brick.read(self.handle,
min(bsize, rem))
yield data
rem -= len(data)
class FileWriter(object):
'Context manager to write a file to a NXT brick'
def __init__(self, brick, fname, fil):
self.brick = brick
self.fname = fname
self.fil = fil
fil.seek(0, 2) # seek to end of file
self.size = fil.tell()
fil.seek(0) # seek to start of file
def __enter__(self):
self.handle = self.brick.open_write(self.fname, self.si...
return self
def __exit__(self, etp, value, tb):
self.brick.close(self.handle)
def __iter__(self):
rem = self.size
bsize = self.brick.sock.bsize
while rem > 0:
b = min(bsize, rem)
handle, size = self.brick.write(self.handle,
self.fil.read(b))
yield size
rem -= b
class ModuleFinder(object):
'Context manager to lookup modules on a NXT brick'
def __init__(self, brick, pattern):
self.brick = brick
self.pattern = pattern
self.handle = None
def __enter__(self):
return self
def __exit__(self, etp, value, tb):
if self.handle:
self.brick.close(self.handle)
def __iter__(self):
self.handle, mname, mid, msize, miomap_size = \
self.brick.request_first_module(self.pattern)
yield (mname, mid, msize, miomap_size)
while True:
try:
handle, mname, mid, msize, miomap_size = \
self.brick.request_next_module(
self.handle)
yield (mname, mid, msize, miomap_size)
except ModuleNotFound:
break
# nxt.compass module -- Classes to read Mindsensors Compa...
# Copyright (C) 2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
import sensor
from time import sleep
class Command(object):
# NOTE: just a namespace (enumeration)
AUTO_TRIG_ON = (0x41, 0x02)
AUTO_TRIG_OFF = (0x53, 0x01)
MAP_HEADING_BYTE = 0x42 # map heading to 0-255 range
MAP_HEADING_INTEGER = 0x49 # map heading to 0-36000 range
SAMPLING_50_HZ = 0x45 # set sampling frequency to 50 Hz
SAMPLING_60_HZ = 0x55 # set sampling frequency to 60 Hz
SET_ADPA_MODE_ON = 0x4E # set ADPA mode on
SET_ADPA_MODE_OFF = 0x4F # set ADPA mode off
BEGIN_CALIBRATION = 0x43 # begin calibration
DONE_CALIBRATION = 0x44 # done with calibration
LOAD_USER_CALIBRATION = 0x4C # load user calibration value
# I2C addresses for a Mindsensors CMPS-Nx compass sensor
I2C_ADDRESS_CMPS_NX = {
0x41: ('command', 1, True),
0x42: ('heading_lsb', 1, False),
0x43: ('heading_msb', 1, False),
0x44: ('x_offset_lsb', 1, True),
0x45: ('x_offset_msb', 1, True),
0x46: ('y_offset_lsb', 1, True),
0x47: ('y_offset_msb', 1, True),
0x48: ('x_range_lsb', 1, True),
0x49: ('x_range_msb', 1, True),
0x4A: ('y_range_lsb', 1, True),
0x4B: ('y_range_msb', 1, True),
0x4C: ('x_raw_lsb', 1, True),
0x4D: ('x_raw_msb', 1, True),
0x4E: ('y_raw_lsb', 1, True),
0x4F: ('y_raw_msb', 1, True),
}
class _MetaCMPS_Nx(sensor._Meta):
'Metaclass which adds accessor methods for CMPS-Nx I2C a...
def __init__(cls, name, bases, dict):
super(_MetaCMPS_Nx, cls).__init__(name, bases, dict)
for address in I2C_ADDRESS_CMPS_NX:
name, n_bytes, set_method = I2C_ADDRESS_CMPS_NX[address]
q = sensor._make_query(address, n_bytes)
setattr(cls, 'get_' + name, q)
if set_method:
c = sensor._make_command(address)
setattr(cls, 'set_' + name, c)
class CompassSensor(sensor.DigitalSensor):
__metaclass__ = _MetaCMPS_Nx
def __init__(self, brick, port):
super(CompassSensor, self).__init__(brick, port)
self.sensor_type = Type.LOW_SPEED_9V
self.mode = Mode.RAW
self.set_input_mode()
sleep(0.1) # Give I2C time to initialize
CompassSensor.get_sample = CompassSensor.get_heading_lsb
# nxt.direct module -- LEGO Mindstorms NXT direct telegrams
# Copyright (C) 2006-2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
def _create(opcode):
'Create a simple direct telegram'
from telegram import Telegram
return Telegram(True, opcode)
def start_program(opcode, fname):
tgram = _create(opcode)
tgram.add_filename(fname)
return tgram
def _parse_simple(tgram):
tgram.check_status()
def stop_program(opcode):
return _create(opcode)
def play_sound_file(opcode, loop, fname):
tgram = _create(opcode)
tgram.add_u8(loop)
tgram.add_filename(fname)
return tgram
def play_tone(opcode, frequency, duration):
'Play a tone at frequency (Hz) for duration (ms)'
tgram = _create(opcode)
tgram.add_u16(frequency)
tgram.add_u16(duration)
return tgram
def set_output_state(opcode, port, power, mode, regulatio...
run_state, tacho_limit):
tgram = _create(opcode)
tgram.add_u8(port)
tgram.add_s8(power)
tgram.add_u8(mode)
tgram.add_u8(regulation)
tgram.add_s8(turn_ratio)
tgram.add_u8(run_state)
tgram.add_u32(tacho_limit)
return tgram
def set_input_mode(opcode, port, sensor_type, sensor_mode):
tgram = _create(opcode)
tgram.add_u8(port)
tgram.add_u8(sensor_type)
tgram.add_u8(sensor_mode)
return tgram
def get_output_state(opcode, port):
tgram = _create(opcode)
tgram.add_u8(port)
return tgram
def _parse_get_output_state(tgram):
tgram.check_status()
port = tgram.parse_u8()
power = tgram.parse_s8()
mode = tgram.parse_u8()
regulation = tgram.parse_u8()
turn_ratio = tgram.parse_s8()
run_state = tgram.parse_u8()
tacho_limit = tgram.parse_u32()
tacho_count = tgram.parse_s32()
block_tacho_count = tgram.parse_s32()
rotation_count = tgram.parse_s32()
return (port, power, mode, regulation, turn_ratio, run_s...
tacho_limit, tacho_count, block_tacho_count, rotation_c...
def get_input_values(opcode, port):
tgram = _create(opcode)
tgram.add_u8(port)
return tgram
def _parse_get_input_values(tgram):
tgram.check_status()
port = tgram.parse_u8()
valid = tgram.parse_u8()
calibrated = tgram.parse_u8()
sensor_type = tgram.parse_u8()
sensor_mode = tgram.parse_u8()
raw_ad_value = tgram.parse_u16()
normalized_ad_value = tgram.parse_u16()
scaled_value = tgram.parse_s16()
calibrated_value = tgram.parse_s16()
return (port, valid, calibrated, sensor_type, sensor_mod...
normalized_ad_value, scaled_value, calibrated_value)
def reset_input_scaled_value(opcode, port):
tgram = _create(opcode)
tgram.add_u8(port)
return tgram
def message_write(opcode, inbox, message):
tgram = _create(opcode)
tgram.add_u8(inbox)
tgram.add_u8(len(message) + 1)
tgram.add_string(len(message), message)
tgram.add_u8(0)
return tgram
def reset_motor_position(opcode, port, relative):
tgram = _create(opcode)
tgram.add_u8(port)
tgram.add_u8(relative)
return tgram
def get_battery_level(opcode):
return _create(opcode)
def _parse_get_battery_level(tgram):
tgram.check_status()
millivolts = tgram.parse_u16()
return millivolts
def stop_sound_playback(opcode):
return _create(opcode)
def keep_alive(opcode):
return _create(opcode)
def _parse_keep_alive(tgram):
tgram.check_status()
sleep_time = tgram.parse_u32()
return sleep_time
def ls_get_status(opcode, port):
'Get status of low-speed sensor (ultrasonic)'
tgram = _create(opcode)
tgram.add_u8(port)
return tgram
def _parse_ls_get_status(tgram):
tgram.check_status()
n_bytes = tgram.parse_u8()
return n_bytes
def ls_write(opcode, port, tx_data, rx_bytes):
'Write a low-speed command to a sensor (ultrasonic)'
tgram = _create(opcode)
tgram.add_u8(port)
tgram.add_u8(len(tx_data))
tgram.add_u8(rx_bytes)
tgram.add_string(len(tx_data), tx_data)
return tgram
def ls_read(opcode, port):
'Read a low-speed sensor value (ultrasonic)'
tgram = _create(opcode)
tgram.add_u8(port)
return tgram
def _parse_ls_read(tgram):
tgram.check_status()
n_bytes = tgram.parse_u8()
contents = tgram.parse_string()
return contents[:n_bytes]
def get_current_program_name(opcode):
return _create(opcode)
def _parse_get_current_program_name(tgram):
tgram.check_status()
fname = tgram.parse_string()
return fname
def message_read(opcode, remote_inbox, local_inbox, remov...
tgram = _create(opcode)
tgram.add_u8(remote_inbox)
tgram.add_u8(local_inbox)
tgram.add_u8(remove)
return tgram
def _parse_message_read(tgram):
tgram.check_status()
local_inbox = tgram.parse_u8()
n_bytes = tgram.parse_u8()
message = tgram.parse_string()
return (local_inbox, message[:n_bytes])
OPCODES = {
0x00: (start_program, _parse_simple),
0x01: (stop_program, _parse_simple),
0x02: (play_sound_file, _parse_simple),
0x03: (play_tone, _parse_simple),
0x04: (set_output_state, _parse_simple),
0x05: (set_input_mode, _parse_simple),
0x06: (get_output_state, _parse_get_output_state),
0x07: (get_input_values, _parse_get_input_values),
0x08: (reset_input_scaled_value, _parse_simple),
0x09: (message_write, _parse_simple),
0x0A: (reset_motor_position, _parse_simple),
0x0B: (get_battery_level, _parse_get_battery_level),
0x0C: (stop_sound_playback, _parse_simple),
0x0D: (keep_alive, _parse_keep_alive),
0x0E: (ls_get_status, _parse_ls_get_status),
0x0F: (ls_write, _parse_simple),
0x10: (ls_read, _parse_ls_read),
0x11: (get_current_program_name, _parse_get_current_prog...
0x13: (message_read, _parse_message_read),
}
# nxt.error module -- LEGO Mindstorms NXT error handling
# Copyright (C) 2006,2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
class ProtocolError(Exception):
pass
class SysProtError(ProtocolError):
pass
class FileExistsError(SysProtError):
pass
class FileNotFound(SysProtError):
pass
class ModuleNotFound(SysProtError):
pass
class DirProtError(ProtocolError):
pass
class I2CError(DirProtError):
pass
class I2CPendingError(I2CError):
pass
CODES = {
0x00: None,
0x20: I2CPendingError('Pending communication transaction...
0x40: DirProtError('Specified mailbox queue is empty'),
0x81: SysProtError('No more handles'),
0x82: SysProtError('No space'),
0x83: SysProtError('No more files'),
0x84: SysProtError('End of file expected'),
0x85: SysProtError('End of file'),
0x86: SysProtError('Not a linear file'),
0x87: FileNotFound('File not found'),
0x88: SysProtError('Handle already closed'),
0x89: SysProtError('No linear space'),
0x8A: SysProtError('Undefined error'),
0x8B: SysProtError('File is busy'),
0x8C: SysProtError('No write buffers'),
0x8D: SysProtError('Append not possible'),
0x8E: SysProtError('File is full'),
0x8F: FileExistsError('File exists'),
0x90: ModuleNotFound('Module not found'),
0x91: SysProtError('Out of bounds'),
0x92: SysProtError('Illegal file name'),
0x93: SysProtError('Illegal handle'),
0xBD: DirProtError('Request failed (i.e. specified file ...
0xBE: DirProtError('Unknown command opcode'),
0xBF: DirProtError('Insane packet'),
0xC0: DirProtError('Data contains out-of-range values'),
0xDD: DirProtError('Communication bus error'),
0xDE: DirProtError('No free memory in communication buff...
0xDF: DirProtError('Specified channel/connection is not ...
0xE0: I2CError('Specified channel/connection not configu...
0xEC: DirProtError('No active program'),
0xED: DirProtError('Illegal size specified'),
0xEE: DirProtError('Illegal mailbox queue ID specified'),
0xEF: DirProtError('Attempted to access invalid field of...
0xF0: DirProtError('Bad input or output specified'),
0xFB: DirProtError('Insufficient memory available'),
0xFF: DirProtError('Bad arguments'),
}
def check_status(status):
if status:
ex = CODES.get(status)
if ex:
raise ex
else:
raise ProtocolError, status
# nxt.locator module -- Locate LEGO Minstorms NXT bricks ...
# Copyright (C) 2006-2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
class BrickNotFoundError(Exception):
pass
def find_bricks(host=None, name=None):
try:
import usbsock
socks = usbsock.find_bricks(host, name)
for s in socks:
yield s
except ImportError:
pass
try:
import bluesock
from bluetooth import BluetoothError
try:
socks = bluesock.find_bricks(host, name)
for s in socks:
yield s
except BluetoothError:
pass
except ImportError:
pass
def find_one_brick(host=None, name=None):
for s in find_bricks(host, name):
return s
raise BrickNotFoundError
# nxt.motor module -- Class to control LEGO Mindstorms NX...
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
PORT_A = 0x00
PORT_B = 0x01
PORT_C = 0x02
PORT_ALL = 0xFF
MODE_IDLE = 0x00
MODE_MOTOR_ON = 0x01
MODE_BRAKE = 0x02
MODE_REGULATED = 0x04
REGULATION_IDLE = 0x00
REGULATION_MOTOR_SPEED = 0x01
REGULATION_MOTOR_SYNC = 0x02
RUN_STATE_IDLE = 0x00
RUN_STATE_RAMP_UP = 0x10
RUN_STATE_RUNNING = 0x20
RUN_STATE_RAMP_DOWN = 0x40
LIMIT_RUN_FOREVER = 0
class Motor(object):
def __init__(self, brick, port):
self.brick = brick
self.port = port
self.power = 0
self.mode = MODE_IDLE
self.regulation = REGULATION_IDLE
self.turn_ratio = 0
self.run_state = RUN_STATE_IDLE
self.tacho_limit = LIMIT_RUN_FOREVER
self.tacho_count = 0
self.block_tacho_count = 0
self.rotation_count = 0
def set_output_state(self):
self.brick.set_output_state(self.port, self.power, self...
self.regulation, self.turn_ratio, self.run_state,
self.tacho_limit)
def get_output_state(self):
values = self.brick.get_output_state(self.port)
(self.port, self.power, self.mode, self.regulation,
self.turn_ratio, self.run_state, self.tacho_limit,
tacho_count, block_tacho_count, rotation_count) = values
return values
def reset_position(self, relative):
self.brick.reset_motor_position(self.port, relative)
# nxt.sensor module -- Classes to read LEGO Mindstorms NX...
# Copyright (C) 2006,2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
from time import sleep
from nxt.error import I2CError, I2CPendingError
PORT_1 = 0x00
PORT_2 = 0x01
PORT_3 = 0x02
PORT_4 = 0x03
class Type(object):
# NOTE: just a namespace (enumeration)
NO_SENSOR = 0x00
SWITCH = 0x01 # Touch sensor
TEMPERATURE = 0x02
REFLECTION = 0x03
ANGLE = 0x04
LIGHT_ACTIVE = 0x05 # Light sensor (illuminated)
LIGHT_INACTIVE = 0x06 # Light sensor (ambient)
SOUND_DB = 0x07 # Sound sensor (unadjusted)
SOUND_DBA = 0x08 # Sound sensor (adjusted)
CUSTOM = 0x09
LOW_SPEED = 0x0A
LOW_SPEED_9V = 0x0B # Low-speed I2C (Ultrasonic sensor)
class Mode(object):
# NOTE: just a namespace (enumeration)
RAW = 0x00
BOOLEAN = 0x20
TRANSITION_CNT = 0x40
PERIOD_COUNTER = 0x60
PCT_FULL_SCALE = 0x80
CELSIUS = 0xA0
FAHRENHEIT = 0xC0
ANGLE_STEPS = 0xE0
MASK = 0xE0
MASK_SLOPE = 0x1F # Why isn't this slope thing documented?
class Sensor(object):
def __init__(self, brick, port):
self.brick = brick
self.port = port
self.sensor_type = Type.NO_SENSOR
self.mode = Mode.RAW
def set_input_mode(self):
self.brick.set_input_mode(self.port, self.sensor_type,
self.mode)
class AnalogSensor(Sensor):
def __init__(self, brick, port):
super(AnalogSensor, self).__init__(brick, port)
self.valid = False
self.calibrated = False
self.raw_ad_value = 0
self.normalized_ad_value = 0
self.scaled_value = 0
self.calibrated_value = 0
def get_input_values(self):
values = self.brick.get_input_values(self.port)
(self.port, self.valid, self.calibrated, self.sensor_ty...
self.mode, self.raw_ad_value, self.normalized_ad_value,
self.scaled_value, self.calibrated_value) = values
return values
def reset_input_scaled_value(self):
self.brick.reset_input_scaled_value()
def get_sample(self):
self.get_input_values()
return self.scaled_value
class TouchSensor(AnalogSensor):
def __init__(self, brick, port):
super(TouchSensor, self).__init__(brick, port)
self.sensor_type = Type.SWITCH
self.mode = Mode.BOOLEAN
self.set_input_mode()
def is_pressed(self):
return bool(self.scaled_value)
def get_sample(self):
self.get_input_values()
return self.is_pressed()
class LightSensor(AnalogSensor):
def __init__(self, brick, port):
super(LightSensor, self).__init__(brick, port)
self.set_illuminated(True)
def set_illuminated(self, active):
if active:
self.sensor_type = Type.LIGHT_ACTIVE
else:
self.sensor_type = Type.LIGHT_INACTIVE
self.set_input_mode()
class SoundSensor(AnalogSensor):
def __init__(self, brick, port):
super(SoundSensor, self).__init__(brick, port)
self.set_adjusted(True)
def set_adjusted(self, active):
if active:
self.sensor_type = Type.SOUND_DBA
else:
self.sensor_type = Type.SOUND_DB
self.set_input_mode()
I2C_ADDRESS = {
0x00: ('version', 8),
0x08: ('product_id', 8),
0x10: ('sensor_type', 8),
0x11: ('factory_zero', 1), # is this really correct?
0x12: ('factory_scale_factor', 1),
0x13: ('factory_scale_divisor', 1),
0x14: ('measurement_units', 1),
}
def _make_query(address, n_bytes):
def query(self):
data = self.i2c_query(address, n_bytes)
if n_bytes == 1:
return ord(data)
else:
return data
return query
class _Meta(type):
'Metaclass which adds accessor methods for I2C addresses'
def __init__(cls, name, bases, dict):
super(_Meta, cls).__init__(name, bases, dict)
for address in I2C_ADDRESS:
name, n_bytes = I2C_ADDRESS[address]
q = _make_query(address, n_bytes)
setattr(cls, 'get_' + name, q)
class DigitalSensor(Sensor):
__metaclass__ = _Meta
I2C_DEV = 0x02
def __init__(self, brick, port):
super(DigitalSensor, self).__init__(brick, port)
def _ls_get_status(self, n_bytes):
for n in range(3):
try:
b = self.brick.ls_get_status(self.port)
if b >= n_bytes:
return b
except I2CPendingError:
sleep(0.01)
raise I2CError, 'ls_get_status timeout'
def i2c_command(self, address, value):
msg = chr(DigitalSensor.I2C_DEV) + chr(address) + chr(v...
self.brick.ls_write(self.port, msg, 0)
def i2c_query(self, address, n_bytes):
msg = chr(DigitalSensor.I2C_DEV) + chr(address)
self.brick.ls_write(self.port, msg, n_bytes)
self._ls_get_status(n_bytes)
data = self.brick.ls_read(self.port)
if len(data) < n_bytes:
raise I2CError, 'Read failure'
return data[-n_bytes:]
class CommandState(object):
# NOTE: just a namespace (enumeration)
OFF = 0x00
SINGLE_SHOT = 0x01
CONTINUOUS_MEASUREMENT = 0x02
EVENT_CAPTURE = 0x03 # Check for ultrasonic interference
REQUEST_WARM_RESET = 0x04
# I2C addresses for an Ultrasonic sensor
I2C_ADDRESS_US = {
0x40: ('continuous_measurement_interval', 1, True),
0x41: ('command_state', 1, True),
0x42: ('measurement_byte_0', 1, False),
0x43: ('measurement_byte_1', 1, False),
0x44: ('measurement_byte_2', 1, False),
0x45: ('measurement_byte_3', 1, False),
0x46: ('measurement_byte_4', 1, False),
0x47: ('measurement_byte_5', 1, False),
0x48: ('measurement_byte_6', 1, False),
0x49: ('measurement_byte_7', 1, False),
0x50: ('actual_zero', 1, True),
0x51: ('actual_scale_factor', 1, True),
0x52: ('actual_scale_divisor', 1, True),
}
def _make_command(address):
def command(self, value):
self.i2c_command(address, value)
return command
class _MetaUS(_Meta):
'Metaclass which adds accessor methods for US I2C addres...
def __init__(cls, name, bases, dict):
super(_MetaUS, cls).__init__(name, bases, dict)
for address in I2C_ADDRESS_US:
name, n_bytes, set_method = I2C_ADDRESS_US[address]
q = _make_query(address, n_bytes)
setattr(cls, 'get_' + name, q)
if set_method:
c = _make_command(address)
setattr(cls, 'set_' + name, c)
class UltrasonicSensor(DigitalSensor):
__metaclass__ = _MetaUS
def __init__(self, brick, port):
super(UltrasonicSensor, self).__init__(brick, port)
self.sensor_type = Type.LOW_SPEED_9V
self.mode = Mode.RAW
self.set_input_mode()
sleep(0.1) # Give I2C time to initialize
def get_single_shot_measurement(self):
self.set_command_state(CommandState.SINGLE_SHOT)
return self.get_measurement_byte_0()
UltrasonicSensor.get_sample = UltrasonicSensor.get_measur...
# nxt.system module -- LEGO Mindstorms NXT system telegrams
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
def _create(opcode):
'Create a simple system telegram'
from telegram import Telegram
return Telegram(False, opcode)
def _create_with_file(opcode, fname):
tgram = _create(opcode)
tgram.add_filename(fname)
return tgram
def _create_with_handle(opcode, handle):
tgram = _create(opcode)
tgram.add_u8(handle)
return tgram
def open_read(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_open_read(tgram):
tgram.check_status()
handle = tgram.parse_u8()
n_bytes = tgram.parse_u32()
return (handle, n_bytes)
def open_write(opcode, fname, n_bytes):
tgram = _create_with_file(opcode, fname)
tgram.add_u32(n_bytes)
return tgram
def _parse_open_write(tgram):
tgram.check_status()
handle = tgram.parse_u8()
return handle
def read(opcode, handle, n_bytes):
tgram = _create_with_handle(opcode, handle)
tgram.add_u16(n_bytes)
return tgram
def _parse_read(tgram):
tgram.check_status()
handle = tgram.parse_u8()
n_bytes = tgram.parse_u16()
data = tgram.parse_string()
return (handle, n_bytes, data)
def write(opcode, handle, data):
tgram = _create_with_handle(opcode, handle)
tgram.add_string(len(data), data)
return tgram
def _parse_write(tgram):
tgram.check_status()
handle = tgram.parse_u8()
n_bytes = tgram.parse_u16()
return (handle, n_bytes)
def close(opcode, handle):
return _create_with_handle(opcode, handle)
def _parse_close(tgram):
tgram.check_status()
handle = tgram.parse_u8()
return handle
def delete(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_delete(tgram):
tgram.check_status()
handle = tgram.parse_u8()
fname = tgram.parse_string()
return (handle, fname)
def find_first(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_find(tgram):
tgram.check_status()
handle = tgram.parse_u8()
fname = tgram.parse_string(20)
n_bytes = tgram.parse_u32()
return (handle, fname, n_bytes)
def find_next(opcode, handle):
return _create_with_handle(opcode, handle)
def get_firmware_version(opcode):
return _create(opcode)
def _parse_get_firmware_version(tgram):
tgram.check_status()
prot_minor = tgram.parse_u8()
prot_major = tgram.parse_u8()
prot_version = (prot_major, prot_minor)
fw_minor = tgram.parse_u8()
fw_major = tgram.parse_u8()
fw_version = (fw_major, fw_minor)
return (prot_version, fw_version)
def open_write_linear(opcode, fname, n_bytes):
tgram = _create_with_file(opcode, fname)
tgram.add_u32(n_bytes)
return tgram
def open_read_linear(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_open_read_linear(tgram):
tgram.check_status()
n_bytes = tgram.parse_u32()
return n_bytes
def open_write_data(opcode, fname, n_bytes):
tgram = _create_with_file(opcode, fname)
tgram.add_u32(n_bytes)
return tgram
def open_append_data(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_open_append_data(tgram):
tgram.check_status()
handle = tgram.parse_u8()
n_bytes = tgram.parse_u32()
return (handle, n_bytes)
def request_first_module(opcode, mname):
return _create_with_file(opcode, mname)
def _parse_request_module(tgram):
tgram.check_status()
handle = tgram.parse_u8()
mname = tgram.parse_string(20)
mod_id = tgram.parse_u32()
mod_size = tgram.parse_u32()
mod_iomap_size = tgram.parse_u16()
return (handle, mname, mod_id, mod_size, mod_iomap_size)
def request_next_module(opcode, handle):
return _create_with_handle(opcode, handle)
def close_module_handle(opcode, handle):
return _create_with_handle(opcode, handle)
def read_io_map(opcode, mod_id, offset, n_bytes):
tgram = _create(opcode)
tgram.add_u32(mod_id)
tgram.add_u16(offset)
tgram.add_u16(n_bytes)
return tgram
def _parse_read_io_map(tgram):
tgram.check_status()
mod_id = tgram.parse_u32()
n_bytes = tgram.parse_u16()
contents = tgram.parse_string()
return (mod_id, n_bytes, contents)
def write_io_map(opcode, mod_id, offset, content):
tgram = _create(opcode)
tgram.add_u32(mod_id)
tgram.add_u16(offset)
tgram.add_u16(len(content))
tgram.add_string(len(content), content)
return tgram
def _parse_write_io_map(tgram):
tgram.check_status()
mod_id = tgram.parse_u32()
n_bytes = tgram.parse_u16()
return (mod_id, n_bytes)
def boot(opcode):
# Note: this command is USB only (no Bluetooth)
tgram = _create(opcode)
tgram.add_string(19, "Let's dance: SAMBA\0")
return tgram
def _parse_boot(tgram):
tgram.check_status()
resp = tgram.parse_string()
# Resp should be 'Yes\0'
return resp
def set_brick_name(opcode, bname):
tgram = _create(opcode)
# FIXME: validate brick name
tgram.add_string(len(bname), bname)
return tgram
def _parse_set_brick_name(tgram):
tgram.check_status()
def get_device_info(opcode):
return _create(opcode)
def _parse_get_device_info(tgram):
tgram.check_status()
name = tgram.parse_string(15)
a0 = tgram.parse_u8()
a1 = tgram.parse_u8()
a2 = tgram.parse_u8()
a3 = tgram.parse_u8()
a4 = tgram.parse_u8()
a5 = tgram.parse_u8()
a6 = tgram.parse_u8()
# FIXME: what is a6 for?
address = '%02X:%02X:%02X:%02X:%02X:%02X' % (a0, a1, a2,...
signal_strength = tgram.parse_u32()
user_flash = tgram.parse_u32()
return (name, address, signal_strength, user_flash)
def delete_user_flash(opcode):
return _create(opcode)
def _parse_delete_user_flash(tgram):
tgram.check_status()
def poll_command_length(opcode, buf_num):
tgram = _create(opcode)
tgram.add_u8(buf_num)
return tgram
def _parse_poll_command_length(tgram):
buf_num = tgram.parse_u8()
tgram.check_status()
n_bytes = tgram.parse_u8()
return (buf_num, n_bytes)
def poll_command(opcode, buf_num, n_bytes):
tgram = _create(opcode)
tgram.add_u8(buf_num)
tgram.add_u8(n_bytes)
return tgram
def _parse_poll_command(tgram):
buf_num = tgram.parse_u8()
tgram.check_status()
n_bytes = tgram.parse_u8()
command = tgram.parse_string()
return (buf_num, n_bytes, command)
def bluetooth_factory_reset(opcode):
# Note: this command is USB only (no Bluetooth)
return _create(opcode)
def _parse_bluetooth_factory_reset(tgram):
tgram.check_status()
OPCODES = {
0x80: (open_read, _parse_open_read),
0x81: (open_write, _parse_open_write),
0x82: (read, _parse_read),
0x83: (write, _parse_write),
0x84: (close, _parse_close),
0x85: (delete, _parse_delete),
0x86: (find_first, _parse_find),
0x87: (find_next, _parse_find),
0x88: (get_firmware_version, _parse_get_firmware_version),
0x89: (open_write_linear, _parse_open_write),
0x8A: (open_read_linear, _parse_open_read_linear),
0x8B: (open_write_data, _parse_open_write),
0x8C: (open_append_data, _parse_open_append_data),
0x90: (request_first_module, _parse_request_module),
0x91: (request_next_module, _parse_request_module),
0x92: (close_module_handle, _parse_close),
0x94: (read_io_map, _parse_read_io_map),
0x95: (write_io_map, _parse_write_io_map),
0x97: (boot, _parse_boot),
0x98: (set_brick_name, _parse_set_brick_name),
0x9B: (get_device_info, _parse_get_device_info),
0xA0: (delete_user_flash, _parse_delete_user_flash),
0xA1: (poll_command_length, _parse_poll_command_length),
0xA2: (poll_command, _parse_poll_command),
0xA4: (bluetooth_factory_reset, _parse_bluetooth_factory...
}
# nxt.telegram module -- LEGO Mindstorms NXT telegram for...
# Copyright (C) 2006 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
from cStringIO import StringIO
from struct import pack, unpack
import nxt.error
class InvalidReplyError(Exception):
pass
class InvalidOpcodeError(Exception):
pass
class Telegram(object):
TYPE = 0 # type byte offset
CODE = 1 # code byte offset
DATA = 2 # data byte offset
TYPE_NOT_DIRECT = 0x01 # system vs. direct type
TYPE_REPLY = 0x02 # reply type (from NXT brick)
TYPE_REPLY_NOT_REQUIRED = 0x80 # reply not required flag
def __init__(self, direct=False, opcode=0, reply_req=Tru...
if pkt:
self.pkt = StringIO(pkt)
self.typ = self.parse_u8()
self.opcode = self.parse_u8()
if not self.is_reply():
raise InvalidReplyError
if self.opcode != opcode:
raise InvalidOpcodeError, self.opcode
else:
self.pkt = StringIO()
typ = 0
if not direct:
typ |= Telegram.TYPE_NOT_DIRECT
if not reply_req:
typ |= Telegram.TYPE_REPLY_NOT_REQUIRED
self.add_u8(typ)
self.add_u8(opcode)
def __str__(self):
return self.pkt.getvalue()
def is_reply(self):
return self.typ == Telegram.TYPE_REPLY
def add_string(self, n_bytes, v):
self.pkt.write(pack('%ds' % n_bytes, v))
def add_filename(self, fname):
self.pkt.write(pack('20s', fname))
def add_s8(self, v):
self.pkt.write(pack('<b', v))
def add_u8(self, v):
self.pkt.write(pack('<B', v))
def add_s16(self, v):
self.pkt.write(pack('<h', v))
def add_u16(self, v):
self.pkt.write(pack('<H', v))
def add_s32(self, v):
self.pkt.write(pack('<i', v))
def add_u32(self, v):
self.pkt.write(pack('<I', v))
def parse_string(self, n_bytes=0):
if n_bytes:
return unpack('%ss' % n_bytes,
self.pkt.read(n_bytes))[0]
else:
return self.pkt.read()
def parse_s8(self):
return unpack('<b', self.pkt.read(1))[0]
def parse_u8(self):
return unpack('<B', self.pkt.read(1))[0]
def parse_s16(self):
return unpack('<h', self.pkt.read(2))[0]
def parse_u16(self):
return unpack('<H', self.pkt.read(2))[0]
def parse_s32(self):
return unpack('<i', self.pkt.read(4))[0]
def parse_u32(self):
return unpack('<I', self.pkt.read(4))[0]
def check_status(self):
nxt.error.check_status(self.parse_u8())
import nxt.direct
import nxt.system
OPCODES = dict(nxt.system.OPCODES)
OPCODES.update(nxt.direct.OPCODES)
# nxt.usbsock module -- USB socket communication with LEG...
# Copyright (C) 2006-2007 Douglas P Lau
#
# This program is free software; you can redistribute it ...
# it under the terms of the GNU General Public License as...
# the Free Software Foundation; either version 2 of the L...
# (at your option) any later version.
#
# This program is distributed in the hope that it will be...
# but WITHOUT ANY WARRANTY; without even the implied warr...
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. S...
# GNU General Public License for more details.
import usb
from nxt.brick import Brick
ID_VENDOR_LEGO = 0x0694
ID_PRODUCT_NXT = 0x0002
class USBSock(object):
bsize = 60 # USB socket block size
def __init__(self, device):
self.device = device
self.handle = None
self.debug = False
def __str__(self):
return 'USB (%s)' % (self.device.filename)
def connect(self):
config = self.device.configurations[0]
iface = config.interfaces[0][0]
self.blk_out, self.blk_in = iface.endpoints
self.handle = self.device.open()
self.handle.setConfiguration(1)
self.handle.claimInterface(0)
self.handle.reset()
return Brick(self)
def close(self):
self.device = None
self.handle = None
self.blk_out = None
self.blk_in = None
def send(self, data):
if self.debug:
print 'Send:',
print ':'.join('%02x' % ord(c) for c in data)
self.handle.bulkWrite(self.blk_out.address, data)
def recv(self):
data = self.handle.bulkRead(self.blk_in.address, 64)
if self.debug:
print 'Recv:',
print ':'.join('%02x' % (c & 0xFF) for c in data)
# NOTE: bulkRead returns a tuple of ints ... make it sane
return ''.join(chr(d & 0xFF) for d in data)
def find_bricks(host=None, name=None):
# FIXME: probably should check host and name
for bus in usb.busses():
for device in bus.devices:
if device.idVendor == ID_VENDOR_LEGO and \
device.idProduct == ID_PRODUCT_NXT:
yield USBSock(device)
ページ名: