# nxt.__init__ module -- LEGO Mindstorms NXT python package

# Copyright (C) 2006 Douglas P Lau

#

# This program is free software; you can redistribute it and/or modify

# it under the terms of the GNU General Public License as published by

# the Free Software Foundation; either version 2 of the License, or

# (at your option) any later version.

#

# This program is distributed in the hope that it will be useful,

# but WITHOUT ANY WARRANTY; without even the implied warranty of

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

# GNU General Public License for more details.

# nxt.bluesock module -- Bluetooth socket communication with LEGO Minstorms NXT

# Copyright (C) 2006-2007 Douglas P Lau

#

# This program is free software; you can redistribute it and/or modify

# it under the terms of the GNU General Public License as published by

# the Free Software Foundation; either version 2 of the License, or

# (at your option) any later version.

#

# This program is distributed in the hope that it will be useful,

# but WITHOUT ANY WARRANTY; without even the implied warranty of

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

# GNU General Public License for more details.

import bluetooth import os from nxt.brick import Brick

class BlueSock?(object):

	bsize = 118	# Bluetooth socket block size
	PORT = 1	# Standard NXT rfcomm port
	def __init__(self, host):
		self.host = host
		self.sock = None
		self.debug = False
	def __str__(self):
		return 'Bluetooth (%s)' % self.host
	def connect(self):
		sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
		sock.connect((self.host, BlueSock.PORT))
		self.sock = sock
		return Brick(self)
	def close(self):
		self.sock.close()
	def send(self, data):
		if self.debug:
			print 'Send:',
			print ':'.join('%02x' % ord(c) for c in data)
		l0 = len(data) & 0xFF
		l1 = (len(data) >> 8) & 0xFF
		d = chr(l0) + chr(l1) + data
		self.sock.send(d)
	def recv(self):
		data = self.sock.recv(2)
		l0 = ord(data[0])
		l1 = ord(data[1])
		plen = l0 + (l1 << 8)
		data = self.sock.recv(plen)
		if self.debug:
			print 'Recv:',
			print ':'.join('%02x' % ord(c) for c in data)
		return data

def _check_brick(arg, value):

	return arg is None or arg == value

def find_bricks(host=None, name=None):

	for h, n in bluetooth.discover_devices(lookup_names=True):
		if _check_brick(host, h) and _check_brick(name, n):
			yield BlueSock(h)

# nxt.brick module -- Classes to represent LEGO Mindstorms NXT bricks

# Copyright (C) 2006 Douglas P Lau

#

# This program is free software; you can redistribute it and/or modify

# it under the terms of the GNU General Public License as published by

# the Free Software Foundation; either version 2 of the License, or

# (at your option) any later version.

#

# This program is distributed in the hope that it will be useful,

# but WITHOUT ANY WARRANTY; without even the implied warranty of

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

# GNU General Public License for more details.

from time import sleep from nxt.error import FileNotFound?, ModuleNotFound? from nxt.telegram import OPCODES, Telegram

def _make_poller(opcode, poll_func, parse_func):

	def poll(self, *args, **kwargs):
		ogram = poll_func(opcode, *args, **kwargs)
		self.sock.send(str(ogram))
		igram = Telegram(opcode=opcode, pkt=self.sock.recv())
		return parse_func(igram)
	return poll

class _Meta(type):

	'Metaclass which adds one method for each telegram opcode'
	def __init__(cls, name, bases, dict):
		super(_Meta, cls).__init__(name, bases, dict)
		for opcode in OPCODES:
			poll_func, parse_func = OPCODES[opcode]
			m = _make_poller(opcode, poll_func, parse_func)
			setattr(cls, poll_func.__name__, m)

class Brick(object):

	__metaclass__ = _Meta
	def __init__(self, sock):
		self.sock = sock
	def play_tone_and_wait(self, frequency, duration):
		self.play_tone(frequency, duration)
		sleep(duration / 1000.0)

class FileFinder?(object):

	'Context manager to find files on a NXT brick'
	def __init__(self, brick, pattern):
		self.brick = brick
		self.pattern = pattern
		self.handle = None
	def __enter__(self):
		return self
	def __exit__(self, etp, value, tb):
		if self.handle:
			self.brick.close(self.handle)
	def __iter__(self):
		self.handle, fname, size = self.brick.find_first(self.pattern)
		yield (fname, size)
		while True:
			try:
				handle, fname, size = self.brick.find_next(
					self.handle)
				yield (fname, size)
			except FileNotFound:
				break

class FileReader?(object):

	'Context manager to read a file on a NXT brick'
	def __init__(self, brick, fname):
		self.brick = brick
		self.fname = fname
	def __enter__(self):
		self.handle, self.size = self.brick.open_read(self.fname)
		return self
	def __exit__(self, etp, value, tb):
		self.brick.close(self.handle)
	def __iter__(self):
		rem = self.size
		bsize = self.brick.sock.bsize
		while rem > 0:
			handle, bsize, data = self.brick.read(self.handle,
				min(bsize, rem))
			yield data
			rem -= len(data)

class FileWriter?(object):

	'Context manager to write a file to a NXT brick'
	def __init__(self, brick, fname, fil):
		self.brick = brick
		self.fname = fname
		self.fil = fil
		fil.seek(0, 2)	# seek to end of file
		self.size = fil.tell()
		fil.seek(0)	# seek to start of file
	def __enter__(self):
		self.handle = self.brick.open_write(self.fname, self.size)
		return self
	def __exit__(self, etp, value, tb):
		self.brick.close(self.handle)
	def __iter__(self):
		rem = self.size
		bsize = self.brick.sock.bsize
		while rem > 0:
			b = min(bsize, rem)
			handle, size = self.brick.write(self.handle,
				self.fil.read(b))
			yield size
			rem -= b

class ModuleFinder?(object):

	'Context manager to lookup modules on a NXT brick'
	def __init__(self, brick, pattern):
		self.brick = brick
		self.pattern = pattern
		self.handle = None
	def __enter__(self):
		return self
	def __exit__(self, etp, value, tb):
		if self.handle:
			self.brick.close(self.handle)
	def __iter__(self):
		self.handle, mname, mid, msize, miomap_size = \
			self.brick.request_first_module(self.pattern)
		yield (mname, mid, msize, miomap_size)
		while True:
			try:
				handle, mname, mid, msize, miomap_size = \
					self.brick.request_next_module(
					self.handle)
				yield (mname, mid, msize, miomap_size)
			except ModuleNotFound:
				break

# nxt.compass module -- Classes to read Mindsensors Compass sensors

# Copyright (C) 2007 Douglas P Lau

#

# This program is free software; you can redistribute it and/or modify

# it under the terms of the GNU General Public License as published by

# the Free Software Foundation; either version 2 of the License, or

# (at your option) any later version.

#

# This program is distributed in the hope that it will be useful,

# but WITHOUT ANY WARRANTY; without even the implied warranty of

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

# GNU General Public License for more details.

import sensor from time import sleep

class Command(object):

	# NOTE: just a namespace (enumeration)
	AUTO_TRIG_ON = (0x41, 0x02)
	AUTO_TRIG_OFF = (0x53, 0x01)
	MAP_HEADING_BYTE = 0x42		# map heading to 0-255 range
	MAP_HEADING_INTEGER = 0x49	# map heading to 0-36000 range
	SAMPLING_50_HZ = 0x45		# set sampling frequency to 50 Hz
	SAMPLING_60_HZ = 0x55		# set sampling frequency to 60 Hz
	SET_ADPA_MODE_ON = 0x4E		# set ADPA mode on
	SET_ADPA_MODE_OFF = 0x4F	# set ADPA mode off
	BEGIN_CALIBRATION = 0x43	# begin calibration
	DONE_CALIBRATION = 0x44		# done with calibration
	LOAD_USER_CALIBRATION = 0x4C	# load user calibration value

# I2C addresses for a Mindsensors CMPS-Nx compass sensor I2C_ADDRESS_CMPS_NX = {

	0x41: ('command', 1, True),
	0x42: ('heading_lsb', 1, False),
	0x43: ('heading_msb', 1, False),
	0x44: ('x_offset_lsb', 1, True),
	0x45: ('x_offset_msb', 1, True),
	0x46: ('y_offset_lsb', 1, True),
	0x47: ('y_offset_msb', 1, True),
	0x48: ('x_range_lsb', 1, True),
	0x49: ('x_range_msb', 1, True),
	0x4A: ('y_range_lsb', 1, True),
	0x4B: ('y_range_msb', 1, True),
	0x4C: ('x_raw_lsb', 1, True),
	0x4D: ('x_raw_msb', 1, True),
	0x4E: ('y_raw_lsb', 1, True),
	0x4F: ('y_raw_msb', 1, True),

}

class _MetaCMPS_Nx(sensor._Meta):

	'Metaclass which adds accessor methods for CMPS-Nx I2C addresses'
	def __init__(cls, name, bases, dict):
		super(_MetaCMPS_Nx, cls).__init__(name, bases, dict)
		for address in I2C_ADDRESS_CMPS_NX:
			name, n_bytes, set_method = I2C_ADDRESS_CMPS_NX[address]
			q = sensor._make_query(address, n_bytes)
			setattr(cls, 'get_' + name, q)
			if set_method:
				c = sensor._make_command(address)
				setattr(cls, 'set_' + name, c)

class CompassSensor?(sensor.DigitalSensor?):

	__metaclass__ = _MetaCMPS_Nx
	def __init__(self, brick, port):
		super(CompassSensor, self).__init__(brick, port)
		self.sensor_type = Type.LOW_SPEED_9V
		self.mode = Mode.RAW
		self.set_input_mode()
		sleep(0.1)	# Give I2C time to initialize

CompassSensor?.get_sample = CompassSensor?.get_heading_lsb

# nxt.direct module -- LEGO Mindstorms NXT direct telegrams

# Copyright (C) 2006-2007 Douglas P Lau

#

# This program is free software; you can redistribute it and/or modify

# it under the terms of the GNU General Public License as published by

# the Free Software Foundation; either version 2 of the License, or

# (at your option) any later version.

#

# This program is distributed in the hope that it will be useful,

# but WITHOUT ANY WARRANTY; without even the implied warranty of

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

# GNU General Public License for more details.

def _create(opcode):

	'Create a simple direct telegram'
	from telegram import Telegram
	return Telegram(True, opcode)

def start_program(opcode, fname):

	tgram = _create(opcode)
	tgram.add_filename(fname)
	return tgram

def _parse_simple(tgram):

	tgram.check_status()

def stop_program(opcode):

	return _create(opcode)

def play_sound_file(opcode, loop, fname):

	tgram = _create(opcode)
	tgram.add_u8(loop)
	tgram.add_filename(fname)
	return tgram

def play_tone(opcode, frequency, duration):

	'Play a tone at frequency (Hz) for duration (ms)'
	tgram = _create(opcode)
	tgram.add_u16(frequency)
	tgram.add_u16(duration)
	return tgram

def set_output_state(opcode, port, power, mode, regulation, turn_ratio,

   run_state, tacho_limit):
	tgram = _create(opcode)
	tgram.add_u8(port)
	tgram.add_s8(power)
	tgram.add_u8(mode)
	tgram.add_u8(regulation)
	tgram.add_s8(turn_ratio)
	tgram.add_u8(run_state)
	tgram.add_u32(tacho_limit)
	return tgram

def set_input_mode(opcode, port, sensor_type, sensor_mode):

	tgram = _create(opcode)
	tgram.add_u8(port)
	tgram.add_u8(sensor_type)
	tgram.add_u8(sensor_mode)
	return tgram

def get_output_state(opcode, port):

	tgram = _create(opcode)
	tgram.add_u8(port)
	return tgram

def _parse_get_output_state(tgram):

	tgram.check_status()
	port = tgram.parse_u8()
	power = tgram.parse_s8()
	mode = tgram.parse_u8()
	regulation = tgram.parse_u8()
	turn_ratio = tgram.parse_s8()
	run_state = tgram.parse_u8()
	tacho_limit = tgram.parse_u32()
	tacho_count = tgram.parse_s32()
	block_tacho_count = tgram.parse_s32()
	rotation_count = tgram.parse_s32()
	return (port, power, mode, regulation, turn_ratio, run_state,
		tacho_limit, tacho_count, block_tacho_count, rotation_count)

def get_input_values(opcode, port):

	tgram = _create(opcode)
	tgram.add_u8(port)
	return tgram

def _parse_get_input_values(tgram):

	tgram.check_status()
	port = tgram.parse_u8()
	valid = tgram.parse_u8()
	calibrated = tgram.parse_u8()
	sensor_type = tgram.parse_u8()
	sensor_mode = tgram.parse_u8()
	raw_ad_value = tgram.parse_u16()
	normalized_ad_value = tgram.parse_u16()
	scaled_value = tgram.parse_s16()
	calibrated_value = tgram.parse_s16()
	return (port, valid, calibrated, sensor_type, sensor_mode, raw_ad_value,
		normalized_ad_value, scaled_value, calibrated_value)

def reset_input_scaled_value(opcode, port):

	tgram = _create(opcode)
	tgram.add_u8(port)
	return tgram

def message_write(opcode, inbox, message):

	tgram = _create(opcode)
	tgram.add_u8(inbox)
	tgram.add_u8(len(message) + 1)
	tgram.add_string(len(message), message)
	tgram.add_u8(0)
	return tgram

def reset_motor_position(opcode, port, relative):

	tgram = _create(opcode)
	tgram.add_u8(port)
	tgram.add_u8(relative)
	return tgram

def get_battery_level(opcode):

	return _create(opcode)

def _parse_get_battery_level(tgram):

	tgram.check_status()
	millivolts = tgram.parse_u16()
	return millivolts

def stop_sound_playback(opcode):

	return _create(opcode)

def keep_alive(opcode):

	return _create(opcode)

def _parse_keep_alive(tgram):

	tgram.check_status()
	sleep_time = tgram.parse_u32()
	return sleep_time

def ls_get_status(opcode, port):

	'Get status of low-speed sensor (ultrasonic)'
	tgram = _create(opcode)
	tgram.add_u8(port)
	return tgram

def _parse_ls_get_status(tgram):

	tgram.check_status()
	n_bytes = tgram.parse_u8()
	return n_bytes

def ls_write(opcode, port, tx_data, rx_bytes):

	'Write a low-speed command to a sensor (ultrasonic)'
	tgram = _create(opcode)
	tgram.add_u8(port)
	tgram.add_u8(len(tx_data))
	tgram.add_u8(rx_bytes)
	tgram.add_string(len(tx_data), tx_data)
	return tgram

def ls_read(opcode, port):

	'Read a low-speed sensor value (ultrasonic)'
	tgram = _create(opcode)
	tgram.add_u8(port)
	return tgram

def _parse_ls_read(tgram):

	tgram.check_status()
	n_bytes = tgram.parse_u8()
	contents = tgram.parse_string()
	return contents[:n_bytes]

def get_current_program_name(opcode):

	return _create(opcode)

def _parse_get_current_program_name(tgram):

	tgram.check_status()
	fname = tgram.parse_string()
	return fname

def message_read(opcode, remote_inbox, local_inbox, remove):

	tgram = _create(opcode)
	tgram.add_u8(remote_inbox)
	tgram.add_u8(local_inbox)
	tgram.add_u8(remove)
	return tgram

def _parse_message_read(tgram):

	tgram.check_status()
	local_inbox = tgram.parse_u8()
	n_bytes = tgram.parse_u8()
	message = tgram.parse_string()
	return (local_inbox, message[:n_bytes])

OPCODES = {

	0x00: (start_program, _parse_simple),
	0x01: (stop_program, _parse_simple),
	0x02: (play_sound_file, _parse_simple),
	0x03: (play_tone, _parse_simple),
	0x04: (set_output_state, _parse_simple),
	0x05: (set_input_mode, _parse_simple),
	0x06: (get_output_state, _parse_get_output_state),
	0x07: (get_input_values, _parse_get_input_values),
	0x08: (reset_input_scaled_value, _parse_simple),
	0x09: (message_write, _parse_simple),
	0x0A: (reset_motor_position, _parse_simple),
	0x0B: (get_battery_level, _parse_get_battery_level),
	0x0C: (stop_sound_playback, _parse_simple),
	0x0D: (keep_alive, _parse_keep_alive),
	0x0E: (ls_get_status, _parse_ls_get_status),
	0x0F: (ls_write, _parse_simple),
	0x10: (ls_read, _parse_ls_read),
	0x11: (get_current_program_name, _parse_get_current_program_name),
	0x13: (message_read, _parse_message_read),

}

# nxt.error module -- LEGO Mindstorms NXT error handling

# Copyright (C) 2006,2007 Douglas P Lau

#

# This program is free software; you can redistribute it and/or modify

# it under the terms of the GNU General Public License as published by

# the Free Software Foundation; either version 2 of the License, or

# (at your option) any later version.

#

# This program is distributed in the hope that it will be useful,

# but WITHOUT ANY WARRANTY; without even the implied warranty of

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

# GNU General Public License for more details.

class ProtocolError?(Exception):

	pass

class SysProtError?(ProtocolError?):

	pass

class FileExistsError?(SysProtError?):

	pass

class FileNotFound?(SysProtError?):

	pass

class ModuleNotFound?(SysProtError?):

	pass

class DirProtError?(ProtocolError?):

	pass

class I2CError(DirProtError?):

	pass

class I2CPendingError?(I2CError):

	pass

CODES = {

	0x00: None,
	0x20: I2CPendingError('Pending communication transaction in progress'),
	0x40: DirProtError('Specified mailbox queue is empty'),
	0x81: SysProtError('No more handles'),
	0x82: SysProtError('No space'),
	0x83: SysProtError('No more files'),
	0x84: SysProtError('End of file expected'),
	0x85: SysProtError('End of file'),
	0x86: SysProtError('Not a linear file'),
	0x87: FileNotFound('File not found'),
	0x88: SysProtError('Handle already closed'),
	0x89: SysProtError('No linear space'),
	0x8A: SysProtError('Undefined error'),
	0x8B: SysProtError('File is busy'),
	0x8C: SysProtError('No write buffers'),
	0x8D: SysProtError('Append not possible'),
	0x8E: SysProtError('File is full'),
	0x8F: FileExistsError('File exists'),
	0x90: ModuleNotFound('Module not found'),
	0x91: SysProtError('Out of bounds'),
	0x92: SysProtError('Illegal file name'),
	0x93: SysProtError('Illegal handle'),
	0xBD: DirProtError('Request failed (i.e. specified file not found)'),
	0xBE: DirProtError('Unknown command opcode'),
	0xBF: DirProtError('Insane packet'),
	0xC0: DirProtError('Data contains out-of-range values'),
	0xDD: DirProtError('Communication bus error'),
	0xDE: DirProtError('No free memory in communication buffer'),
	0xDF: DirProtError('Specified channel/connection is not valid'),
	0xE0: I2CError('Specified channel/connection not configured or busy'),
	0xEC: DirProtError('No active program'),
	0xED: DirProtError('Illegal size specified'),
	0xEE: DirProtError('Illegal mailbox queue ID specified'),
	0xEF: DirProtError('Attempted to access invalid field of a structure'),
	0xF0: DirProtError('Bad input or output specified'),
	0xFB: DirProtError('Insufficient memory available'),
	0xFF: DirProtError('Bad arguments'),

}

def check_status(status):

	if status:
		ex = CODES.get(status)
		if ex:
			raise ex
		else:
			raise ProtocolError, status

# nxt.locator module -- Locate LEGO Minstorms NXT bricks via USB or Bluetooth

# Copyright (C) 2006-2007 Douglas P Lau

#

# This program is free software; you can redistribute it and/or modify

# it under the terms of the GNU General Public License as published by

# the Free Software Foundation; either version 2 of the License, or

# (at your option) any later version.

#

# This program is distributed in the hope that it will be useful,

# but WITHOUT ANY WARRANTY; without even the implied warranty of

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

# GNU General Public License for more details.

class BrickNotFoundError?(Exception):

	pass

def find_bricks(host=None, name=None):

	try:
		import usbsock
		socks = usbsock.find_bricks(host, name)
		for s in socks:
			yield s
	except ImportError:
		pass
	try:
		import bluesock
		from bluetooth import BluetoothError
		try:
			socks = bluesock.find_bricks(host, name)
			for s in socks:
				yield s
		except BluetoothError:
			pass
	except ImportError:
		pass

def find_one_brick(host=None, name=None):

	for s in find_bricks(host, name):
		return s
	raise BrickNotFoundError

# nxt.motor module -- Class to control LEGO Mindstorms NXT motors

# Copyright (C) 2006 Douglas P Lau

#

# This program is free software; you can redistribute it and/or modify

# it under the terms of the GNU General Public License as published by

# the Free Software Foundation; either version 2 of the License, or

# (at your option) any later version.

#

# This program is distributed in the hope that it will be useful,

# but WITHOUT ANY WARRANTY; without even the implied warranty of

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

# GNU General Public License for more details.

PORT_A = 0x00 PORT_B = 0x01 PORT_C = 0x02 PORT_ALL = 0xFF

MODE_IDLE = 0x00 MODE_MOTOR_ON = 0x01 MODE_BRAKE = 0x02 MODE_REGULATED = 0x04

REGULATION_IDLE = 0x00 REGULATION_MOTOR_SPEED = 0x01 REGULATION_MOTOR_SYNC = 0x02

RUN_STATE_IDLE = 0x00 RUN_STATE_RAMP_UP = 0x10 RUN_STATE_RUNNING = 0x20 RUN_STATE_RAMP_DOWN = 0x40

LIMIT_RUN_FOREVER = 0

class Motor(object):

	def __init__(self, brick, port):
		self.brick = brick
		self.port = port
		self.power = 0
		self.mode = MODE_IDLE
		self.regulation = REGULATION_IDLE
		self.turn_ratio = 0
		self.run_state = RUN_STATE_IDLE
		self.tacho_limit = LIMIT_RUN_FOREVER
		self.tacho_count = 0
		self.block_tacho_count = 0
		self.rotation_count = 0
	def set_output_state(self):
		self.brick.set_output_state(self.port, self.power, self.mode,
			self.regulation, self.turn_ratio, self.run_state,
			self.tacho_limit)
	def get_output_state(self):
		values = self.brick.get_output_state(self.port)
		(self.port, self.power, self.mode, self.regulation,
			self.turn_ratio, self.run_state, self.tacho_limit,
			tacho_count, block_tacho_count, rotation_count) = values
		return values
	def reset_position(self, relative):
		self.brick.reset_motor_position(self.port, relative)

# nxt.sensor module -- Classes to read LEGO Mindstorms NXT sensors

# Copyright (C) 2006,2007 Douglas P Lau

#

# This program is free software; you can redistribute it and/or modify

# it under the terms of the GNU General Public License as published by

# the Free Software Foundation; either version 2 of the License, or

# (at your option) any later version.

#

# This program is distributed in the hope that it will be useful,

# but WITHOUT ANY WARRANTY; without even the implied warranty of

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

# GNU General Public License for more details.

from time import sleep from nxt.error import I2CError, I2CPendingError?

PORT_1 = 0x00 PORT_2 = 0x01 PORT_3 = 0x02 PORT_4 = 0x03

class Type(object):

	# NOTE: just a namespace (enumeration)
	NO_SENSOR = 0x00
	SWITCH = 0x01		# Touch sensor
	TEMPERATURE = 0x02
	REFLECTION = 0x03
	ANGLE = 0x04
	LIGHT_ACTIVE = 0x05	# Light sensor (illuminated)
	LIGHT_INACTIVE = 0x06	# Light sensor (ambient)
	SOUND_DB = 0x07		# Sound sensor (unadjusted)
	SOUND_DBA = 0x08	# Sound sensor (adjusted)
	CUSTOM = 0x09
	LOW_SPEED = 0x0A
	LOW_SPEED_9V = 0x0B	# Low-speed I2C (Ultrasonic sensor)

class Mode(object):

	# NOTE: just a namespace (enumeration)
	RAW = 0x00
	BOOLEAN = 0x20
	TRANSITION_CNT = 0x40
	PERIOD_COUNTER = 0x60
	PCT_FULL_SCALE = 0x80
	CELSIUS = 0xA0
	FAHRENHEIT = 0xC0
	ANGLE_STEPS = 0xE0
	MASK = 0xE0
	MASK_SLOPE = 0x1F	# Why isn't this slope thing documented?

class Sensor(object):

	def __init__(self, brick, port):
		self.brick = brick
		self.port = port
		self.sensor_type = Type.NO_SENSOR
		self.mode = Mode.RAW
	def set_input_mode(self):
		self.brick.set_input_mode(self.port, self.sensor_type,
			self.mode)

class AnalogSensor?(Sensor):

	def __init__(self, brick, port):
		super(AnalogSensor, self).__init__(brick, port)
		self.valid = False
		self.calibrated = False
		self.raw_ad_value = 0
		self.normalized_ad_value = 0
		self.scaled_value = 0
		self.calibrated_value = 0
	def get_input_values(self):
		values = self.brick.get_input_values(self.port)
		(self.port, self.valid, self.calibrated, self.sensor_type,
			self.mode, self.raw_ad_value, self.normalized_ad_value,
			self.scaled_value, self.calibrated_value) = values
		return values
	def reset_input_scaled_value(self):
		self.brick.reset_input_scaled_value()
	def get_sample(self):
		self.get_input_values()
		return self.scaled_value

class TouchSensor?(AnalogSensor?):

	def __init__(self, brick, port):
		super(TouchSensor, self).__init__(brick, port)
		self.sensor_type = Type.SWITCH
		self.mode = Mode.BOOLEAN
		self.set_input_mode()
	def is_pressed(self):
		return bool(self.scaled_value)
	def get_sample(self):
		self.get_input_values()
		return self.is_pressed()

class LightSensor?(AnalogSensor?):

	def __init__(self, brick, port):
		super(LightSensor, self).__init__(brick, port)
		self.set_illuminated(True)
	def set_illuminated(self, active):
		if active:
			self.sensor_type = Type.LIGHT_ACTIVE
		else:
			self.sensor_type = Type.LIGHT_INACTIVE
		self.set_input_mode()

class SoundSensor?(AnalogSensor?):

	def __init__(self, brick, port):
		super(SoundSensor, self).__init__(brick, port)
		self.set_adjusted(True)
	def set_adjusted(self, active):
		if active:
			self.sensor_type = Type.SOUND_DBA
		else:
			self.sensor_type = Type.SOUND_DB
		self.set_input_mode()

I2C_ADDRESS = {

	0x00: ('version', 8),
	0x08: ('product_id', 8),
	0x10: ('sensor_type', 8),
	0x11: ('factory_zero', 1),		# is this really correct?
	0x12: ('factory_scale_factor', 1),
	0x13: ('factory_scale_divisor', 1),
	0x14: ('measurement_units', 1),

}

def _make_query(address, n_bytes):

	def query(self):
		data = self.i2c_query(address, n_bytes)
		if n_bytes == 1:
			return ord(data)
		else:
			return data
	return query

class _Meta(type):

	'Metaclass which adds accessor methods for I2C addresses'
	def __init__(cls, name, bases, dict):
		super(_Meta, cls).__init__(name, bases, dict)
		for address in I2C_ADDRESS:
			name, n_bytes = I2C_ADDRESS[address]
			q = _make_query(address, n_bytes)
			setattr(cls, 'get_' + name, q)

class DigitalSensor?(Sensor):

	__metaclass__ = _Meta
	I2C_DEV = 0x02
	def __init__(self, brick, port):
		super(DigitalSensor, self).__init__(brick, port)
	def _ls_get_status(self, n_bytes):
		for n in range(3):
			try:
				b = self.brick.ls_get_status(self.port)
				if b >= n_bytes:
					return b
			except I2CPendingError:
				sleep(0.01)
		raise I2CError, 'ls_get_status timeout'
	def i2c_command(self, address, value):
		msg = chr(DigitalSensor.I2C_DEV) + chr(address) + chr(value)
		self.brick.ls_write(self.port, msg, 0)
	def i2c_query(self, address, n_bytes):
		msg = chr(DigitalSensor.I2C_DEV) + chr(address)
		self.brick.ls_write(self.port, msg, n_bytes)
		self._ls_get_status(n_bytes)
		data = self.brick.ls_read(self.port)
		if len(data) < n_bytes:
			raise I2CError, 'Read failure'
		return data[-n_bytes:]

class CommandState?(object):

	# NOTE: just a namespace (enumeration)
	OFF = 0x00
	SINGLE_SHOT = 0x01
	CONTINUOUS_MEASUREMENT = 0x02
	EVENT_CAPTURE = 0x03		# Check for ultrasonic interference
	REQUEST_WARM_RESET = 0x04

# I2C addresses for an Ultrasonic sensor I2C_ADDRESS_US = {

	0x40: ('continuous_measurement_interval', 1, True),
	0x41: ('command_state', 1, True),
	0x42: ('measurement_byte_0', 1, False),
	0x43: ('measurement_byte_1', 1, False),
	0x44: ('measurement_byte_2', 1, False),
	0x45: ('measurement_byte_3', 1, False),
	0x46: ('measurement_byte_4', 1, False),
	0x47: ('measurement_byte_5', 1, False),
	0x48: ('measurement_byte_6', 1, False),
	0x49: ('measurement_byte_7', 1, False),
	0x50: ('actual_zero', 1, True),
	0x51: ('actual_scale_factor', 1, True),
	0x52: ('actual_scale_divisor', 1, True),

}

def _make_command(address):

	def command(self, value):
		self.i2c_command(address, value)
	return command

class _MetaUS(_Meta):

	'Metaclass which adds accessor methods for US I2C addresses'
	def __init__(cls, name, bases, dict):
		super(_MetaUS, cls).__init__(name, bases, dict)
		for address in I2C_ADDRESS_US:
			name, n_bytes, set_method = I2C_ADDRESS_US[address]
			q = _make_query(address, n_bytes)
			setattr(cls, 'get_' + name, q)
			if set_method:
				c = _make_command(address)
				setattr(cls, 'set_' + name, c)

class UltrasonicSensor?(DigitalSensor?):

	__metaclass__ = _MetaUS
	def __init__(self, brick, port):
		super(UltrasonicSensor, self).__init__(brick, port)
		self.sensor_type = Type.LOW_SPEED_9V
		self.mode = Mode.RAW
		self.set_input_mode()
		sleep(0.1)	# Give I2C time to initialize
	def get_single_shot_measurement(self):
		self.set_command_state(CommandState.SINGLE_SHOT)
		return self.get_measurement_byte_0()

UltrasonicSensor?.get_sample = UltrasonicSensor?.get_measurement_byte_0

# nxt.system module -- LEGO Mindstorms NXT system telegrams

# Copyright (C) 2006 Douglas P Lau

#

# This program is free software; you can redistribute it and/or modify

# it under the terms of the GNU General Public License as published by

# the Free Software Foundation; either version 2 of the License, or

# (at your option) any later version.

#

# This program is distributed in the hope that it will be useful,

# but WITHOUT ANY WARRANTY; without even the implied warranty of

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

# GNU General Public License for more details.

def _create(opcode):

	'Create a simple system telegram'
	from telegram import Telegram
	return Telegram(False, opcode)

def _create_with_file(opcode, fname):

	tgram = _create(opcode)
	tgram.add_filename(fname)
	return tgram

def _create_with_handle(opcode, handle):

	tgram = _create(opcode)
	tgram.add_u8(handle)
	return tgram

def open_read(opcode, fname):

	return _create_with_file(opcode, fname)

def _parse_open_read(tgram):

	tgram.check_status()
	handle = tgram.parse_u8()
	n_bytes = tgram.parse_u32()
	return (handle, n_bytes)

def open_write(opcode, fname, n_bytes):

	tgram = _create_with_file(opcode, fname)
	tgram.add_u32(n_bytes)
	return tgram

def _parse_open_write(tgram):

	tgram.check_status()
	handle = tgram.parse_u8()
	return handle

def read(opcode, handle, n_bytes):

	tgram = _create_with_handle(opcode, handle)
	tgram.add_u16(n_bytes)
	return tgram

def _parse_read(tgram):

	tgram.check_status()
	handle = tgram.parse_u8()
	n_bytes = tgram.parse_u16()
	data = tgram.parse_string()
	return (handle, n_bytes, data)

def write(opcode, handle, data):

	tgram = _create_with_handle(opcode, handle)
	tgram.add_string(len(data), data)
	return tgram

def _parse_write(tgram):

	tgram.check_status()
	handle = tgram.parse_u8()
	n_bytes = tgram.parse_u16()
	return (handle, n_bytes)

def close(opcode, handle):

	return _create_with_handle(opcode, handle)

def _parse_close(tgram):

	tgram.check_status()
	handle = tgram.parse_u8()
	return handle

def delete(opcode, fname):

	return _create_with_file(opcode, fname)

def _parse_delete(tgram):

	tgram.check_status()
	handle = tgram.parse_u8()
	fname = tgram.parse_string()
	return (handle, fname)

def find_first(opcode, fname):

	return _create_with_file(opcode, fname)

def _parse_find(tgram):

	tgram.check_status()
	handle = tgram.parse_u8()
	fname = tgram.parse_string(20)
	n_bytes = tgram.parse_u32()
	return (handle, fname, n_bytes)

def find_next(opcode, handle):

	return _create_with_handle(opcode, handle)

def get_firmware_version(opcode):

	return _create(opcode)

def _parse_get_firmware_version(tgram):

	tgram.check_status()
	prot_minor = tgram.parse_u8()
	prot_major = tgram.parse_u8()
	prot_version = (prot_major, prot_minor)
	fw_minor = tgram.parse_u8()
	fw_major = tgram.parse_u8()
	fw_version = (fw_major, fw_minor)
	return (prot_version, fw_version)

def open_write_linear(opcode, fname, n_bytes):

	tgram = _create_with_file(opcode, fname)
	tgram.add_u32(n_bytes)
	return tgram

def open_read_linear(opcode, fname):

	return _create_with_file(opcode, fname)

def _parse_open_read_linear(tgram):

	tgram.check_status()
	n_bytes = tgram.parse_u32()
	return n_bytes

def open_write_data(opcode, fname, n_bytes):

	tgram = _create_with_file(opcode, fname)
	tgram.add_u32(n_bytes)
	return tgram

def open_append_data(opcode, fname):

	return _create_with_file(opcode, fname)

def _parse_open_append_data(tgram):

	tgram.check_status()
	handle = tgram.parse_u8()
	n_bytes = tgram.parse_u32()
	return (handle, n_bytes)

def request_first_module(opcode, mname):

	return _create_with_file(opcode, mname)

def _parse_request_module(tgram):

	tgram.check_status()
	handle = tgram.parse_u8()
	mname = tgram.parse_string(20)
	mod_id = tgram.parse_u32()
	mod_size = tgram.parse_u32()
	mod_iomap_size = tgram.parse_u16()
	return (handle, mname, mod_id, mod_size, mod_iomap_size)

def request_next_module(opcode, handle):

	return _create_with_handle(opcode, handle)

def close_module_handle(opcode, handle):

	return _create_with_handle(opcode, handle)

def read_io_map(opcode, mod_id, offset, n_bytes):

	tgram = _create(opcode)
	tgram.add_u32(mod_id)
	tgram.add_u16(offset)
	tgram.add_u16(n_bytes)
	return tgram

def _parse_read_io_map(tgram):

	tgram.check_status()
	mod_id = tgram.parse_u32()
	n_bytes = tgram.parse_u16()
	contents = tgram.parse_string()
	return (mod_id, n_bytes, contents)

def write_io_map(opcode, mod_id, offset, content):

	tgram = _create(opcode)
	tgram.add_u32(mod_id)
	tgram.add_u16(offset)
	tgram.add_u16(len(content))
	tgram.add_string(len(content), content)
	return tgram

def _parse_write_io_map(tgram):

	tgram.check_status()
	mod_id = tgram.parse_u32()
	n_bytes = tgram.parse_u16()
	return (mod_id, n_bytes)

def boot(opcode):

	# Note: this command is USB only (no Bluetooth)
	tgram = _create(opcode)
	tgram.add_string(19, "Let's dance: SAMBA\0")
	return tgram

def _parse_boot(tgram):

	tgram.check_status()
	resp = tgram.parse_string()
	# Resp should be 'Yes\0'
	return resp

def set_brick_name(opcode, bname):

	tgram = _create(opcode)
	# FIXME: validate brick name
	tgram.add_string(len(bname), bname)
	return tgram

def _parse_set_brick_name(tgram):

	tgram.check_status()

def get_device_info(opcode):

	return _create(opcode)

def _parse_get_device_info(tgram):

	tgram.check_status()
	name = tgram.parse_string(15)
	a0 = tgram.parse_u8()
	a1 = tgram.parse_u8()
	a2 = tgram.parse_u8()
	a3 = tgram.parse_u8()
	a4 = tgram.parse_u8()
	a5 = tgram.parse_u8()
	a6 = tgram.parse_u8()
	# FIXME: what is a6 for?
	address = '%02X:%02X:%02X:%02X:%02X:%02X' % (a0, a1, a2, a3, a4, a5)
	signal_strength = tgram.parse_u32()
	user_flash = tgram.parse_u32()
	return (name, address, signal_strength, user_flash)

def delete_user_flash(opcode):

	return _create(opcode)

def _parse_delete_user_flash(tgram):

	tgram.check_status()

def poll_command_length(opcode, buf_num):

	tgram = _create(opcode)
	tgram.add_u8(buf_num)
	return tgram

def _parse_poll_command_length(tgram):

	buf_num = tgram.parse_u8()
	tgram.check_status()
	n_bytes = tgram.parse_u8()
	return (buf_num, n_bytes)

def poll_command(opcode, buf_num, n_bytes):

	tgram = _create(opcode)
	tgram.add_u8(buf_num)
	tgram.add_u8(n_bytes)
	return tgram

def _parse_poll_command(tgram):

	buf_num = tgram.parse_u8()
	tgram.check_status()
	n_bytes = tgram.parse_u8()
	command = tgram.parse_string()
	return (buf_num, n_bytes, command)

def bluetooth_factory_reset(opcode):

	# Note: this command is USB only (no Bluetooth)
	return _create(opcode)

def _parse_bluetooth_factory_reset(tgram):

	tgram.check_status()

OPCODES = {

	0x80: (open_read, _parse_open_read),
	0x81: (open_write, _parse_open_write),
	0x82: (read, _parse_read),
	0x83: (write, _parse_write),
	0x84: (close, _parse_close),
	0x85: (delete, _parse_delete),
	0x86: (find_first, _parse_find),
	0x87: (find_next, _parse_find),
	0x88: (get_firmware_version, _parse_get_firmware_version),
	0x89: (open_write_linear, _parse_open_write),
	0x8A: (open_read_linear, _parse_open_read_linear),
	0x8B: (open_write_data, _parse_open_write),
	0x8C: (open_append_data, _parse_open_append_data),
	0x90: (request_first_module, _parse_request_module),
	0x91: (request_next_module, _parse_request_module),
	0x92: (close_module_handle, _parse_close),
	0x94: (read_io_map, _parse_read_io_map),
	0x95: (write_io_map, _parse_write_io_map),
	0x97: (boot, _parse_boot),
	0x98: (set_brick_name, _parse_set_brick_name),
	0x9B: (get_device_info, _parse_get_device_info),
	0xA0: (delete_user_flash, _parse_delete_user_flash),
	0xA1: (poll_command_length, _parse_poll_command_length),
	0xA2: (poll_command, _parse_poll_command),
	0xA4: (bluetooth_factory_reset, _parse_bluetooth_factory_reset),

}

# nxt.telegram module -- LEGO Mindstorms NXT telegram formatting and parsing

# Copyright (C) 2006 Douglas P Lau

#

# This program is free software; you can redistribute it and/or modify

# it under the terms of the GNU General Public License as published by

# the Free Software Foundation; either version 2 of the License, or

# (at your option) any later version.

#

# This program is distributed in the hope that it will be useful,

# but WITHOUT ANY WARRANTY; without even the implied warranty of

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

# GNU General Public License for more details.

from cStringIO import StringIO from struct import pack, unpack import nxt.error

class InvalidReplyError?(Exception):

	pass

class InvalidOpcodeError?(Exception):

	pass

class Telegram(object):

	TYPE = 0	# type byte offset
	CODE = 1	# code byte offset
	DATA = 2	# data byte offset
	TYPE_NOT_DIRECT = 0x01		# system vs. direct type
	TYPE_REPLY = 0x02		# reply type (from NXT brick)
	TYPE_REPLY_NOT_REQUIRED = 0x80	# reply not required flag
	def __init__(self, direct=False, opcode=0, reply_req=True, pkt=None):
		if pkt:
			self.pkt = StringIO(pkt)
			self.typ = self.parse_u8()
			self.opcode = self.parse_u8()
			if not self.is_reply():
				raise InvalidReplyError
			if self.opcode != opcode:
				raise InvalidOpcodeError, self.opcode
		else:
			self.pkt = StringIO()
			typ = 0
			if not direct:
				typ |= Telegram.TYPE_NOT_DIRECT
			if not reply_req:
				typ |= Telegram.TYPE_REPLY_NOT_REQUIRED
			self.add_u8(typ)
			self.add_u8(opcode)
	def __str__(self):
		return self.pkt.getvalue()
	def is_reply(self):
		return self.typ == Telegram.TYPE_REPLY
	def add_string(self, n_bytes, v):
		self.pkt.write(pack('%ds' % n_bytes, v))
	def add_filename(self, fname):
		self.pkt.write(pack('20s', fname))
	def add_s8(self, v):
		self.pkt.write(pack('<b', v))
	def add_u8(self, v):
		self.pkt.write(pack('<B', v))
	def add_s16(self, v):
		self.pkt.write(pack('<h', v))
	def add_u16(self, v):
		self.pkt.write(pack('<H', v))
	def add_s32(self, v):
		self.pkt.write(pack('<i', v))
	def add_u32(self, v):
		self.pkt.write(pack('<I', v))
	def parse_string(self, n_bytes=0):
		if n_bytes:
			return unpack('%ss' % n_bytes,
				self.pkt.read(n_bytes))[0]
		else:
			return self.pkt.read()
	def parse_s8(self):
		return unpack('<b', self.pkt.read(1))[0]
	def parse_u8(self):
		return unpack('<B', self.pkt.read(1))[0]
	def parse_s16(self):
		return unpack('<h', self.pkt.read(2))[0]
	def parse_u16(self):
		return unpack('<H', self.pkt.read(2))[0]
	def parse_s32(self):
		return unpack('<i', self.pkt.read(4))[0]
	def parse_u32(self):
		return unpack('<I', self.pkt.read(4))[0]
	def check_status(self):
		nxt.error.check_status(self.parse_u8())

import nxt.direct import nxt.system

OPCODES = dict(nxt.system.OPCODES) OPCODES.update(nxt.direct.OPCODES)

# nxt.usbsock module -- USB socket communication with LEGO Minstorms NXT

# Copyright (C) 2006-2007 Douglas P Lau

#

# This program is free software; you can redistribute it and/or modify

# it under the terms of the GNU General Public License as published by

# the Free Software Foundation; either version 2 of the License, or

# (at your option) any later version.

#

# This program is distributed in the hope that it will be useful,

# but WITHOUT ANY WARRANTY; without even the implied warranty of

# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

# GNU General Public License for more details.

import usb from nxt.brick import Brick

ID_VENDOR_LEGO = 0x0694 ID_PRODUCT_NXT = 0x0002

class USBSock(object):

	bsize = 60	# USB socket block size
	def __init__(self, device):
		self.device = device
		self.handle = None
		self.debug = False
	def __str__(self):
		return 'USB (%s)' % (self.device.filename)
	def connect(self):
		config = self.device.configurations[0]
		iface = config.interfaces[0][0]
		self.blk_out, self.blk_in = iface.endpoints
		self.handle = self.device.open()
		self.handle.setConfiguration(1)
		self.handle.claimInterface(0)
		self.handle.reset()
		return Brick(self)
	def close(self):
		self.device = None
		self.handle = None
		self.blk_out = None
		self.blk_in = None
	def send(self, data):
		if self.debug:
			print 'Send:',
			print ':'.join('%02x' % ord(c) for c in data)
		self.handle.bulkWrite(self.blk_out.address, data)
	def recv(self):
		data = self.handle.bulkRead(self.blk_in.address, 64)
		if self.debug:
			print 'Recv:',
			print ':'.join('%02x' % (c & 0xFF) for c in data)
		# NOTE: bulkRead returns a tuple of ints ... make it sane
		return ''.join(chr(d & 0xFF) for d in data)

def find_bricks(host=None, name=None):

	# FIXME: probably should check host and name
	for bus in usb.busses():
		for device in bus.devices:
			if device.idVendor == ID_VENDOR_LEGO and \
			   device.idProduct == ID_PRODUCT_NXT:
				yield USBSock(device)

トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2016-11-01 (火) 16:12:08 (2747d)