diff --git a/arduino/mono_encoder/mono_encoder.ino b/arduino/mono_encoder/mono_encoder.ino new file mode 100644 index 000000000..c1ae5e642 --- /dev/null +++ b/arduino/mono_encoder/mono_encoder.ino @@ -0,0 +1,318 @@ +/* + * mono_encoder.ino + * + * Read one or two single-channel encoders, + * Ticks are counted for each transition + * from open-to-closed and closed-to-open. + * + * For example, a 20 slot optical rotary encoder + * will have be 40 ticks per revolution. + * + * The sketch implements the r/p/c command protocol + * for on-demand sending of encoder value + * and continuous sending with provided delay. + * See the comment in the loop below for details. + * commands are send one per line (ending in '\n') + * 'r' command resets position to zero + * 'p' command sends position immediately + * 'c' command starts/stops continuous mode + * - if it is followed by an integer, + * then use this as the delay in ms + * between readings. + * - if it is not followed by an integer + * then stop continuous mode + * + * Sends the encoder ticks and a timestamp + * as a comma delimited pair: + * "{ticks},{ticksMs}" + + * In a dual encoder setup the second encoder values + * as separated from the first by a semicolon: + * "{ticks},{ticksMs};{ticks},{ticksMs}" + * + * If USE_ENCODER_INTERRUPTS is defined then ticks + * will be maintained with interrupt service routines. + * In this case the pins must be interrupt capable. + * On the Arduino Uno/Nano pins 2 and 3 are interrupt + * capable, so the sketch uses those two pins by default. + * If your board uses different pins then modify the sketch + * before downloading to your microcontroller board. + * The advantage of this mode is that is can a high rate + * of ticks as one would see with a high resolution encoder. + * The disadvantage is that is does a poor job of debouncing, + * so it is not appropriate for mechanical encoders. + * + * If USE_ENCODER_INTERRUPTS is NOT defined, then polling + * will be used to maintain the tick count. The code + * implements a robust debouncing technique. + * The advantage of this mode is the debounce logic; + * it can robustly count ticks even from a noisy + * mechanical encoder like a KY-040 rotary encoder. + * The disadvantage is that this cannot handle high rates + * of ticks as would be delivered by a high resolution optical + * encoder. + * + * The method debounces noisy inputs, + * looking for stable transitions from + * open-to-closed and closed-to-open by polling + * the encoder pin and building a 16 bit value. + * We start by looking for a stable closed-to-open + * transition, which will be a 1 followed by 15 zeroes. + * Then we change to look for the open-to-closed transition, + * which is a zero reading followed by 15 ones. + * + * Given a polling delay of 100 microseconds, the + * minimum time for a stable reading is + * 100 microseconds * 16 bits = 1.6 milliseconds. + * That then would allow for a maximum read rate of + * 1000 / 1.6 = 625 ticks per second. + * For high resolution encoders, that may be too slow. + * In that case, reduce the POLL_DELAY_MICROS to + * suit your encoder and use case. + */ +#include + +#define ENCODER_PIN (2) // input pin for first encoder +#define ENCODER_2_PIN (3) // input pin for second encoder +#define POLL_DELAY_MICROS (100UL) // microseconds between polls +#define USE_ENCODER_INTERRUPTS (1) // if defined then use interrupts to read ticks + // otherwise use polling. + +// +// state for an encoder pin +// +struct EncoderState { + int pin; // gpio pin number + volatile long ticks; // current tick count; do NOT read this directly, use readEncoders() + uint16_t readings; // history of last 16 readings + uint16_t transition; // value of last 16 readings for next stable state transition + unsigned long pollAtMicros; // time of next poll +}; + +// list of encoders +static EncoderState encoders[] = { + {ENCODER_PIN, 0L, 0, 0x8000, 0UL}, + #ifdef ENCODER_2_PIN + {ENCODER_2_PIN, 0L, 0, 0x8000, 0UL}, + #endif +}; + +#define ENCODER_COUNT (sizeof(encoders) / sizeof(EncoderState)) + +boolean continuous = false; // true to send continuously, false to only send on demand +unsigned long sendAtMs = 0; // next send time in continuous mode +unsigned long delayMs = 0; // time between sends in continuous mode + +#ifdef USE_ENCODER_INTERRUPTS + typedef void (*gpio_isr_type)(); + + // + // count the interrupts + // + void encode_0() { + ++encoders[0].ticks; + } + + #ifdef ENCODER_2_PIN + void encode_1() { + ++encoders[1].ticks; + } + #endif + + gpio_isr_type _isr_routines[ENCODER_COUNT] = { + encode_0, + #ifdef ENCODER_2_PIN + encode_1 + #endif + }; +#endif + + + +// +// Poll the encoder and increment ticks +// on stable transitions. +// +void pollEncoder(EncoderState &encoder) { + #ifdef USE_ENCODER_INTERRUPTS + if (NOT_AN_INTERRUPT == digitalPinToInterrupt(encoder.pin)) { + Serial.print("Pin "); + Serial.print(encoder.pin); + Serial.println(" must be an interrupt capable pin when compiled with USE_ENCODER_INTERRUPTS"); + } + #else + unsigned long nowMicros = micros(); + if (nowMicros >= encoder.pollAtMicros) { + // + // shift state left and add new reading + // + encoder.readings = (encoder.readings << 1) | digitalRead(encoder.pin); + + // + // if readings match target transition + // then count the ticks and + // start looking for the next target transion + // + if (encoder.readings == encoder.transition) { + encoder.ticks += 1; + encoder.transition = ~encoder.transition; // invert transition + } + + encoder.pollAtMicros = nowMicros + POLL_DELAY_MICROS; + } + #endif +} + + +void pollEncoders() { + for(uint8_t i = 0; i < ENCODER_COUNT; i += 1) { + pollEncoder(encoders[i]); + } +} + +// +// reset tick counters to zero +// +void resetEncoders() { + // turn off interrupts so we can + // write mutable shared state atomically + #ifdef USE_ENCODER_INTERRUPTS + noInterrupts(); + #endif + + for(uint8_t i = 0; i < ENCODER_COUNT; i += 1) { + encoders[i].ticks = 0; + } + + // turn back on interrupts + #ifdef USE_ENCODER_INTERRUPTS + interrupts(); + #endif +} + + +void readEncoders(long (&ticks)[ENCODER_COUNT]) { + // turn off interrupts so we can + // read mutable shared state atomically + #ifdef USE_ENCODER_INTERRUPTS + noInterrupts(); + #endif + + for(uint8_t i = 0; i < ENCODER_COUNT; i += 1) { + ticks[i] = encoders[i].ticks; + } + + // turn back on interrupts + #ifdef USE_ENCODER_INTERRUPTS + interrupts(); + #endif +} + + +// +// write ticks to serial port +// as a string tuple with ticks and tickMs separate by a comma +// +// "{ticks},{ticksMs}" +// +void writeTicks(unsigned long ticksMs) { + long ticks[ENCODER_COUNT]; + readEncoders(ticks); + + Serial.print(ticks[0]); + Serial.print(','); + Serial.print(ticksMs); + for(uint8_t i = 1; i < ENCODER_COUNT; i += 1) { + Serial.print(";"); + Serial.print(ticks[i]); + Serial.print(','); + Serial.print(ticksMs); + } + Serial.println(""); +} + + +void setup() { + // set all encoder pins to inputs + for(uint8_t i = 0; i < ENCODER_COUNT; i += 1) { + pinMode(encoders[i].pin, INPUT); + #ifdef USE_ENCODER_INTERRUPTS + attachInterrupt(digitalPinToInterrupt(encoders[i].pin), _isr_routines[i], FALLING); + #endif + } + Serial.begin(115200); +} + + +void loop() { + // + // poll each encoder's ticks + // + pollEncoders(); + unsigned long ticksMs = millis(); + + // + // commands are send one per line (ending in '\n') + // 'r' command resets position to zero + // 'p' command sends position immediately + // 'c' command starts/stops continuous mode + // - if it is followed by an integer, + // then use this as the delay in ms + // between readings. + // - if it is not followed by an integer + // then stop continuous mode + // + if (Serial.available() > 0) { + String newcommand = Serial.readStringUntil('\n'); + newcommand.trim(); + + if (newcommand == "r") { + // + // reset tick counters to zero + // + resetEncoders(); + } + else if (newcommand == "p") { + // + // send current ticks immediately + // + writeTicks(ticksMs); + } + else if (newcommand.startsWith("c")) { + // + // continous mode start or stop + // + if (1 == newcommand.length()) { + // + // stop continuous mode + // + continuous = false; + } else { + // + // convert characters after 'c' to an integer + // and use this as the delay in milliseconds + // + String intString = newcommand.substring(1); + if (isDigit(intString.charAt(0))) { + delayMs = (unsigned long)intString.toInt(); + continuous = true; + } + } + } else { + // unknown command + } + } + + // + // we are in continuous mode, + // then send the ticks when + // the delay expires + // + if (continuous) { + if (ticksMs >= sendAtMs) { + sendAtMs = ticksMs + delayMs; + writeTicks(ticksMs); + } + } +} + diff --git a/arduino/quadrature_encoder/quadrature_encoder.ino b/arduino/quadrature_encoder/quadrature_encoder.ino new file mode 100644 index 000000000..7a3221c4a --- /dev/null +++ b/arduino/quadrature_encoder/quadrature_encoder.ino @@ -0,0 +1,363 @@ +/* + * quadrature_encoder.ino + * + * This does not use any libraries. + * + * Note that use of interrupt capable pins will increase + * performance (see `Hardware Requirements` section in + * the above link). The Arduino Uno/Nano and other + * microcontrollers based on the AtMega 328 chip + * have only two available pins that are interrupt + * capable; on the Uno/Nano they are pins D2 and D3. + * The defaults in the sketch use one of these for each + * of two encoders if `DUAL_ENCODERS` is defined, otherwise + * it uses both for a single encoder. + * If you have a different microcontroller + * then see your datasheet for which pins are interrupt + * capable. + * + * The sketch implements the r/p/c command protocol + * for on-demand sending of encoder value + * and continuous sending with provided delay. + * See the comment in the loop below for details. + * commands are send one per line (ending in '\n') + * 'r' command resets position to zero + * 'p' command sends position immediately + * 'c' command starts/stops continuous mode + * - if it is followed by an integer, + * then use this as the delay in ms + * between readings. + * - if it is not followed by an integer + * then stop continuous mode + * + * Sends the encoder ticks and a timestamp + * as a comma delimited pair: ticks,timeMs + * In a dual encoder setup the second encoder values + * as separated from the first by a semicolon: ticks,timeMs;ticks,timeMs + * + */ +#include + +#define USE_ENCODER_INTERRUPTS +#define DUAL_ENCODERS + +#ifdef DUAL_ENCODERS + // + // Note that if this is for a differential drive configuration + // then the encoders are generally oriented oppositely, so when + // one is turning clockwise the other is turning counter-clockwise. + // Therefore we want the data and clock pins to be hooked up oppositely + // for each encoder. This can be achieved by reversing the connections + // on the right encoder, so that the data pin of the encoder is + // assigned the ENCODER_2_PIN_A and the clock pin of the encoder + // is assigned to ENCODER_2_PIN_B. + // + #define ENCODER_PIN_A (2) // clock input pin for first encoder (left wheel) + #define ENCODER_PIN_B (4) // data input pin for first encoder + #define ENCODER_2_PIN_A (3) // input pin for second encoder (right wheel, see above) + #define ENCODER_2_PIN_B (5) // input pin for second encoder +#else + #define ENCODER_PIN_A (2) // clock input pin for first encoder + #define ENCODER_PIN_B (3) // data input pin for first encoder +#endif + +#define POLL_DELAY_MICROS (100UL) // microseconds between polls + +// +// state for an encoder pin +// +struct EncoderState { + uint8_t pin_clock; // CLOCK pin + uint8_t pin_data; // DATA pin + uint8_t transition; // last two (from and to) quadrature cycle steps as nibble bit field + int8_t transitions; // count of valid quadrature cycle transitions + volatile long ticks; // current tick count (count of full cycles) +}; + + +// list of encoders +static EncoderState encoders[] = { + {ENCODER_PIN_A, ENCODER_PIN_B, 0x03, 0, 0L}, + #ifdef ENCODER_2_PIN_A + {ENCODER_2_PIN_A, ENCODER_2_PIN_B, 0x03, 0, 0L}, + #endif +}; + +#define ENCODER_COUNT (sizeof(encoders) / sizeof(EncoderState)) + +boolean continuous = false; // true to send continuously, false to only send on demand +unsigned long sendAtMs = 0; // next send time in continuous mode +unsigned long delayMs = 0; // time between sends in continuous mode + + +#ifdef USE_ENCODER_INTERRUPTS + typedef void (*gpio_isr_type)(); + + // + // count the interrupts. + // When clock is low (we interrupt on high to low transition): + // - if data pin is high, this is aclockwise turn. + // - if data pin is low, this is a counter-clockwise turn. + // + void encode_0() { + if (digitalRead(encoders[0].pin_data)) { + encoders[0].ticks += 1; + } else { + encoders[0].ticks -= 1; + } + } + + #ifdef ENCODER_2_PIN_A + void encode_1() { + if (digitalRead(encoders[1].pin_data)) { + encoders[1].ticks += 1; + } else { + encoders[1].ticks -= 1; + } + } + #endif + + gpio_isr_type _isr_routines[] = { + encode_0, + #ifdef ENCODER_2_PIN_A + encode_1 + #endif + }; +#endif + + +// +// Poll the encoder and increment ticks +// on stable transitions. +// +// The method debounces noisy inputs, +// looking for valid quadrature transitions +// on the two encoder pins. A full quadrature +// clockwise cycle is: +// +// step 0: clk = 1, data = 1 +// step 1: clk = 0, data = 1 +// step 2: clk = 0, data = 0 +// step 3: clk = 1, data = 0 +// step 0: clk = 1, data = 1 +// +// We can count the transition between steps as +1. +// There are 4 transitions between steps, so a +// full quadrature cycle has +4 transitions. +// +// So when we are on step 0 we can check the +// transition count and if it is 4, then +// we have gone clockwise one full cycle (one tick), +// so we add one to the total ticks. +// +// If the transition count is not 4, then there +// has been one or more invalid transitions +// caused by switch bouncing. In this case +// we do not count the tick. +// +// In anycase we clear the transition count +// so we can track the next quadrature cycle. +// +// A counter clockwise cycle would go in reverse, +// and we would count each counter clockwise +// transition as -1. If we get to step 0 and +// the transition count is -4, then we have +// gone one full quadrature cycle counter-clockwise, +// so we subtract one tick form the total. +// +void pollEncoder(EncoderState &encoder) { + #ifdef USE_ENCODER_INTERRUPTS + if (NOT_AN_INTERRUPT == digitalPinToInterrupt(encoder.pin_clock)) { + Serial.print("Pin "); + Serial.print(encoder.pin_clock); + Serial.println(" must be an interrupt capable pin when compiled with USE_ENCODER_INTERRUPTS"); + } + #else + static int8_t TRANSITION_DIRECTION[] = {0,-1,1,0,1,0,0,-1,-1,0,0,1,0,1,-1,0}; + + uint8_t current_state = (digitalRead(encoder.pin_clock) << 1) | digitalRead(encoder.pin_data); + + // + // encoder from(clk | data) and to(clk | data) as a nibble + // and use it to look up the direction of that transition + // + encoder.transition = ((encoder.transition & 0x03) << 2) | current_state; + encoder.transitions += TRANSITION_DIRECTION[encoder.transition]; + + // + // When clock and data pins are high, + // then we are at the end of one quadrature + // cycle and at the beginning of the next. + // If we had 4 valid transitions, count the tick, + // otherwise there was an invalid transition (a bounce), + // so we do not count the tick. + // + if (0x03 == current_state) { + if (4 == encoder.transitions) { + encoder.ticks += 1; // we went 1 cycle clockwise + } else if (-4 == encoder.transitions) { + encoder.ticks -= 1; + } + encoder.transitions = 0; + } + #endif +} + + +void pollEncoders() { + for(uint8_t i = 0; i < ENCODER_COUNT; i += 1) { + pollEncoder(encoders[i]); + } +} + +// +// reset tick counters to zero +// +void resetEncoders() { + // turn off interrupts so we can + // write mutable shared state atomically + #ifdef USE_ENCODER_INTERRUPTS + noInterrupts(); + #endif + + for(uint8_t i = 0; i < ENCODER_COUNT; i += 1) { + encoders[i].ticks = 0; + } + + // turn back on interrupts + #ifdef USE_ENCODER_INTERRUPTS + interrupts(); + #endif +} + +void readEncoders(long ticks[ENCODER_COUNT]) { + // turn off interrupts so we can + // read mutable shared state atomically + #ifdef USE_ENCODER_INTERRUPTS + noInterrupts(); + #endif + + for(uint8_t i = 0; i < ENCODER_COUNT; i += 1) { + ticks[i] = encoders[i].ticks; + } + + // turn back on interrupts + #ifdef USE_ENCODER_INTERRUPTS + interrupts(); + #endif +} + + +// +// write ticks to serial port +// as a string tuple with ticks and tickMs separate by a comma +// +// "{ticks},{ticksMs}" +// +void writeTicks(unsigned long ticksMs) { + long ticks[ENCODER_COUNT]; + readEncoders(ticks); + + Serial.print(ticks[0]); + Serial.print(','); + Serial.print(ticksMs); + for(uint8_t i = 1; i < ENCODER_COUNT; i += 1) { + Serial.print(";"); + Serial.print(ticks[i]); + Serial.print(','); + Serial.print(ticksMs); + } + Serial.println(""); +} + + +void setup() { + // set all encoder pins to inputs + for(uint8_t i = 0; i < ENCODER_COUNT; i += 1) { + pinMode(encoders[i].pin_clock, INPUT); + pinMode(encoders[i].pin_data, INPUT); + #ifdef USE_ENCODER_INTERRUPTS + // + // if the pin is interrupt capable, then use interrupts + // + if (NOT_AN_INTERRUPT != digitalPinToInterrupt(encoders[i].pin_clock)) { + attachInterrupt(digitalPinToInterrupt(encoders[i].pin_clock), _isr_routines[i], FALLING); + } + #endif + } + + Serial.begin(115200); +} + + +void loop() { + // + // poll each encoder's ticks + // + pollEncoders(); + unsigned long ticksMs = millis(); + + // + // commands are send one per line (ending in '\n') + // 'r' command resets position to zero + // 'p' command sends position immediately + // 'c' command starts/stops continuous mode + // - if it is followed by an integer, + // then use this as the delay in ms + // between readings. + // - if it is not followed by an integer + // then stop continuous mode + // + if (Serial.available() > 0) { + String newcommand = Serial.readStringUntil('\n'); + newcommand.trim(); + + if (newcommand == "r") { + // + // reset tick counters to zero + // + resetEncoders(); + } + else if (newcommand == "p") { + // + // send current ticks immediately + // + writeTicks(ticksMs); + } + else if (newcommand.startsWith("c")) { + // + // continous mode start or stop + // + if (1 == newcommand.length()) { + // + // stop continuous mode + // + continuous = false; + } else { + // + // convert characters after 'c' to an integer + // and use this as the delay in milliseconds + // + String intString = newcommand.substring(1); + if (isDigit(intString.charAt(0))) { + delayMs = (unsigned long)intString.toInt(); + continuous = true; + } + } + } else { + // unknown command + } + } + + // + // we are in continuous mode, + // then send the ticks when + // the delay expires + // + if (continuous) { + if (ticksMs >= sendAtMs) { + sendAtMs = ticksMs + delayMs; + writeTicks(ticksMs); + } + } +} diff --git a/donkeycar/la.py b/donkeycar/la.py index fd782c511..25096bfa3 100755 --- a/donkeycar/la.py +++ b/donkeycar/la.py @@ -143,7 +143,8 @@ def normalize(self): def normalized(self): m = self.mag() v = Vec3(self.x, self.y, self.z) - v.scale(1.0 / m) + if m > 0: + v.scale(1.0 / m) return v def subtract(self, v): @@ -619,4 +620,4 @@ def __init__(self, a, b): def vector_to(self, p): delta = self.origin - p dot = delta.dot(self.dir) - return self.dir.scaled(dot) - delta \ No newline at end of file + return self.dir.scaled(dot) - delta diff --git a/donkeycar/parts/actuator.py b/donkeycar/parts/actuator.py index dd1ac3128..fdf761a74 100644 --- a/donkeycar/parts/actuator.py +++ b/donkeycar/parts/actuator.py @@ -293,6 +293,10 @@ def __init__(self, controller, left_pulse, right_pulse): set_pulse = getattr(controller, "set_pulse", None) if set_pulse is None or not callable(set_pulse): raise ValueError("controller must have a set_pulse method") + if not utils.is_number_type(left_pulse): + raise ValueError("left_pulse must be a number") + if not utils.is_number_type(right_pulse): + raise ValueError("right_pulse must be a number") self.controller = controller self.left_pulse = left_pulse @@ -950,11 +954,14 @@ class RPi_GPIO_Servo(object): ''' Servo controlled from the gpio pins on Rpi ''' - def __init__(self, pin, pin_scheme=None, freq = 50, min=5.0, max=7.8): + def __init__(self, pin, pin_scheme=None, freq=50, min=5.0, max=7.8): self.pin = pin - GPIO.setmode(GPIO.BCM if pin_scheme is None else pin_scheme) + if pin_scheme is None: + pin_scheme = GPIO.BCM + GPIO.setmode(pin_scheme) GPIO.setup(self.pin, GPIO.OUT) - + + self.throttle = 0 self.pwm = GPIO.PWM(self.pin, freq) self.pwm.start(0) self.min = min @@ -965,12 +972,11 @@ def run(self, pulse): Update the speed of the motor where 1 is full forward and -1 is full backwards. ''' - #I've read 90 is a good max + # I've read 90 is a good max self.throttle = dk.map_frange(pulse, -1.0, 1.0, self.min, self.max) - #logger.debug(pulse, self.throttle) + # logger.debug(pulse, self.throttle) self.pwm.ChangeDutyCycle(self.throttle) - def shutdown(self): self.pwm.stop() GPIO.cleanup() diff --git a/donkeycar/parts/encoder.py b/donkeycar/parts/encoder.py index c1c158750..7919f0e4a 100644 --- a/donkeycar/parts/encoder.py +++ b/donkeycar/parts/encoder.py @@ -1,8 +1,11 @@ """ +Deprecated in favor on donkeycar.parts.Tachometer and donkeycar.parts.Odometer. + Encoders and odometry """ from datetime import datetime +from donkeycar.utilities.deprecated import deprecated import re import time @@ -18,6 +21,7 @@ # This samples the odometer at 10HZ and does a moving average over the past ten readings to derive a velocity +@deprecated("Deprecated in favor donkeycar.parts.tachometer.Tachometer(SerialEncoder)") class ArduinoEncoder(object): def __init__(self, mm_per_tick=0.0000599, debug=False): import serial @@ -60,6 +64,8 @@ def shutdown(self): self.on = False time.sleep(.5) + +@deprecated("Deprecated as unused") class AStarSpeed: def __init__(self): from donkeycar.parts.teensy import TeensyRCin @@ -118,6 +124,7 @@ def shutdown(self): time.sleep(.5) +@deprecated("Deprecated in favor of donkeycar.parts.tachometer.Tachometer(GpioEncoder)") class RotaryEncoder(): def __init__(self, mm_per_tick=0.306096, pin=13, poll_delay=0.0166, debug=False): import pigpio diff --git a/donkeycar/parts/explode.py b/donkeycar/parts/explode.py index fb4926659..7a3d784fd 100644 --- a/donkeycar/parts/explode.py +++ b/donkeycar/parts/explode.py @@ -1,9 +1,13 @@ # -# part that executes a no-arg method when a button is clicked +# part that explodes a dictionary argument into individually named arguments # class ExplodeDict: + """ + part that expands a dictionary input argument + into individually named output arguments + """ def __init__(self, memory, output_prefix = ""): """ Break a map into key/value pairs and write diff --git a/donkeycar/parts/kinematics.py b/donkeycar/parts/kinematics.py index a6e13a4b8..360c740de 100644 --- a/donkeycar/parts/kinematics.py +++ b/donkeycar/parts/kinematics.py @@ -1,12 +1,616 @@ import logging +import math +import time from typing import Tuple -from donkeycar.utils import is_number_type, clamp +from donkeycar.utils import compare_to, sign, is_number_type, clamp logger = logging.getLogger(__name__) -def differential_steering(throttle: float, steering: float, steering_zero: float = 0.01) -> Tuple[float, float]: +def limit_angle(angle:float): + """ + limit angle to pi to -pi radians (one full circle) + """ + return math.atan2(math.sin(angle), math.cos(angle)) + # twopi = math.pi * 2 + # while(angle > math.pi): + # angle -= twopi + # while(angle < -math.pi): + # angle += twopi + # return angle + + +class Pose2D: + def __init__(self, x:float=0.0, y:float=0.0, angle:float=0.0) -> None: + self.x = x + self.y = y + self.angle = angle + + +class Bicycle: + """ + Bicycle forward kinematics for a car-like vehicle (Ackerman steering), + using the point midway between the front wheels as a reference point, + takes the steering angle in radians and output of the odometer + and turns those into: + - forward distance and velocity of the reference point between the front wheels, + - pose; angle aligned (x,y) position and orientation in radians + - pose velocity; change in angle aligned position and orientation per second + @param wheel_base: distance between the front and back wheels + + NOTE: this version uses the point midway between the front wheels + as the point of reference. + see https://thef1clan.com/2020/09/21/vehicle-dynamics-the-kinematic-bicycle-model/ + """ + def __init__(self, wheel_base:float, debug=False): + self.wheel_base:float = wheel_base + self.debug = debug + self.timestamp:float = 0 + self.forward_distance:float = 0 + self.forward_velocity:float = 0 + self.steering_angle = None + self.pose = Pose2D() + self.pose_velocity = Pose2D() + self.running:bool = True + + def run(self, forward_distance:float, steering_angle:float, timestamp:float=None) -> Tuple[float, float, float, float, float, float, float, float, float]: + """ + params + forward_distance: distance the reference point between the + front wheels has travelled + steering_angle: angle in radians of the front 'wheel' from forward. + In this case left is positive, right is negative, + and directly forward is zero. + timestamp: time of distance readings or None to use current time + returns + distance + velocity + x is horizontal position of reference point midway between front wheels + y is vertical position of reference point midway between front wheels + angle is orientation in radians of the vehicle along it's wheel base + (along the line between the reference points midway between + the front wheels and and midway between the back wheels) + x' is the horizontal velocity (rate of change of reference point along horizontal axis) + y' is the vertical velocity (rate of change of reference point along vertical axis) + angle' is the angular velocity (rate of change of orientation) + timestamp + + """ + if timestamp is None: + timestamp = time.time() + + steering_angle = limit_angle(steering_angle) + + if self.running: + if 0 == self.timestamp: + self.forward_distance = 0 + self.forward_velocity=0 + self.steering_angle = steering_angle + self.pose = Pose2D() + self.pose_velocity = Pose2D() + self.timestamp = timestamp + elif timestamp > self.timestamp: + # + # changes from last run + # + delta_time = timestamp - self.timestamp + delta_distance = forward_distance - self.forward_distance + delta_steering_angle = steering_angle - self.steering_angle + + # + # new position and orientation + # assumes delta_time is small and so assumes movement is linear + # + # x, y, angle = update_bicycle_front_wheel_pose( + # self.pose, + # self.wheel_base, + # self.steering_angle + delta_steering_angle / 2, + # delta_distance) + # + # # + # # update velocities + # # + # forward_velocity = delta_distance / delta_time + # self.pose_velocity.angle = (angle - self.pose.angle) / delta_time + # self.pose_velocity.x = (x - self.pose.x) / delta_time + # self.pose_velocity.y = (y - self.pose.y) / delta_time + # # + # # update pose + # # + # self.pose.x = x + # self.pose.y = y + # self.pose.angle = angle + + # + # new velocities + # + forward_velocity = delta_distance / delta_time + self.pose_velocity.angle = bicycle_angular_velocity(self.wheel_base, forward_velocity, steering_angle) + delta_angle = self.pose_velocity.angle * delta_time + estimated_angle = limit_angle(self.pose.angle + delta_angle / 2) + self.pose_velocity.x = forward_velocity * math.cos(estimated_angle) + self.pose_velocity.y = forward_velocity * math.sin(estimated_angle) + + # + # new pose + # + self.pose.x = self.pose.x + self.pose_velocity.x * delta_time + self.pose.y = self.pose.y + self.pose_velocity.y * delta_time + self.pose.angle = limit_angle(self.pose.angle + delta_angle) + + self.steering_angle = steering_angle + + # + # update odometry + # + self.forward_distance = forward_distance + self.forward_velocity = forward_velocity + + self.timestamp = timestamp + + result = ( + self.forward_distance, + self.forward_velocity, + self.pose.x, self.pose.y, self.pose.angle, + self.pose_velocity.x, self.pose_velocity.y, self.pose_velocity.angle, + self.timestamp + ) + if self.debug: + logger.info(result) + return result + + return 0, 0, 0, 0, 0, 0, 0, 0, self.timestamp + + def shutdown(self): + self.running = False + + +class InverseBicycle: + """ + Bicycle inverse kinematics for a car-like vehicle (Ackerman steering) + takes the forward velocity and the angular velocity in radians/second + and converts these to: + - forward velocity (pass through), + - steering angle in radians + @param wheel_base: distance between the front and back wheels + + NOTE: this version uses the point midway between the rear wheels + as the point of reference. + see https://thef1clan.com/2020/09/21/vehicle-dynamics-the-kinematic-bicycle-model/ + """ + def __init__(self, wheel_base:float, debug=False): + self.wheel_base:float = wheel_base + self.debug = debug + self.timestamp:float = 0 + + def run(self, forward_velocity:float, angular_velocity:float, timestamp:float=None) -> Tuple[float, float, float]: + """ + @param forward_velocity:float in meters per second + @param angular_velocity:float in radians per second + @return tuple + - forward_velocity:float in meters per second (basically a pass through) + - steering_angle:float in radians + - timestamp:float + """ + if timestamp is None: + timestamp = time.time() + + """ + derivation from bicycle model: + angular_velocity = forward_velocity * math.tan(steering_angle) / self.wheel_base + math.tan(steering_angle) = angular_velocity * self.wheel_base / forward_velocity + steering_angle = math.atan(angular_velocity * self.wheel_base / forward_velocity) + """ + steering_angle = bicycle_steering_angle(self.wheel_base, forward_velocity, angular_velocity) + self.timestamp = timestamp + + return forward_velocity, steering_angle, timestamp + + +def update_bicycle_front_wheel_pose(front_wheel, wheel_base, steering_angle, distance): + """ + Calculates the ending position of the front wheel of a bicycle kinematics model. + This is expected to be called at a high rate such that we can model the + the travel as a line rather than an arc. + + Arguments: + front_wheel -- starting pose at front wheel as tuple of (x, y, angle) where + x -- initial x-coordinate of the front wheel (float) + y -- initial y-coordinate of the front wheel (float) + angle -- initial orientation of the vehicle along it's wheel base (in radians) (float) + wheel_base -- length of the wheel base (float) + steering_angle -- steering angle (in radians) (float) + distance -- distance travelled by the vehicle (float) + + Returns: + A tuple (x_f, y_f, theta_f) representing the ending position and orientation of the front wheel. + x_f -- ending x-coordinate of the front wheel (float) + y_f -- ending y-coordinate of the front wheel (float) + theta_f -- ending orientation of the vehicle (in radians) (float) + """ + if distance == 0: + return front_wheel + + if steering_angle == 0: + x = front_wheel.x + distance * math.cos(front_wheel.angle) + y = front_wheel.y + distance * math.sin(front_wheel.angle) + theta = front_wheel.angle + else: + theta = limit_angle(front_wheel.angle + math.tan(steering_angle) * distance / wheel_base) + x = front_wheel.x + distance * math.cos(theta) + y = front_wheel.y + distance * math.sin(theta) + return x, y, theta + + +def bicycle_steering_angle(wheel_base:float, forward_velocity:float, angular_velocity:float) -> float: + """ + Calculate bicycle steering for the vehicle from the angular velocity. + For car-like vehicles, calculate the angular velocity using + the bicycle model and the measured max forward velocity and max steering angle. + """ + # + # derivation from bicycle model: + # angular_velocity = forward_velocity * math.tan(steering_angle) / self.wheel_base + # math.tan(steering_angle) = angular_velocity * self.wheel_base / forward_velocity + # steering_angle = math.atan(angular_velocity * self.wheel_base / forward_velocity) + # + # return math.atan(angular_velocity * wheel_base / forward_velocity) + return limit_angle(math.asin(angular_velocity * wheel_base / forward_velocity)) + + +def bicycle_angular_velocity(wheel_base:float, forward_velocity:float, steering_angle:float) -> float: + """ + Calculate angular velocity for the vehicle from the bicycle steering angle. + For car-like vehicles, calculate the angular velocity using + the bicycle model and the measured max forward velocity and max steering angle. + """ + # + # for car-like (bicycle model) vehicle, for the back axle: + # angular_velocity = forward_velocity * math.tan(steering_angle) / wheel_base if velocity is from rear wheels + # angular_velocity = forward_velocity * math.tan(steering_angle) / wheel_base if velocity is from front wheels + # + # return forward_velocity * math.tan(steering_angle) / wheel_base # velocity for rear wheel + return forward_velocity * math.sin(steering_angle) / wheel_base # velocity of front wheel + + +class BicycleNormalizeAngularVelocity: + """ + For a car-like vehicle, convert an angular velocity in radians per second + to a value between -1 and 1 inclusive. + """ + def __init__(self, wheel_base:float, max_forward_velocity:float, max_steering_angle:float) -> None: + self.max_angular_velocity = bicycle_angular_velocity(wheel_base, max_forward_velocity, max_steering_angle) + + def run(self, angular_velocity:float) -> float: + return angular_velocity / self.max_angular_velocity + + +class BicycleUnnormalizeAngularVelocity: + """ + For a car-like vehicle, convert a normalized angular velocity in range -1 to 1 + into a real angular velocity in radians per second. + """ + def __init__(self, wheel_base:float, max_forward_velocity:float, max_steering_angle:float) -> None: + self.max_angular_velocity = bicycle_angular_velocity(wheel_base, max_forward_velocity, max_steering_angle) + + def run(self, normalized_angular_velocity:float) -> float: + if abs(normalized_angular_velocity) > 1: + logger.error("Warning: normalized_angular_velocity must be between -1 and 1") + return normalized_angular_velocity * self.max_angular_velocity + + +class Unicycle: + """ + Unicycle forward kinematics takes the output of the + left and right odometers and + turns those into: + - forward distance and velocity, + - pose; angle aligned (x,y) position and orientation in radians + - pose velocity; change in angle aligned position and orientation per second + axle_length: distance between the two drive wheels + wheel_radius: radius of wheel; must be in same units as axle_length + It is assumed that both wheels have the same radius + see http://faculty.salina.k-state.edu/tim/robotics_sg/Control/kinematics/unicycle.html + """ + def __init__(self, axle_length:float, debug=False): + self.axle_length:float = axle_length + self.debug = debug + self.timestamp:float = 0 + self.left_distance:float = 0 + self.right_distance:float = 0 + self.velocity:float = 0 + self.pose = Pose2D() + self.pose_velocity = Pose2D() + self.running:bool = True + + def run(self, left_distance:float, right_distance:float, timestamp:float=None) -> Tuple[float, float, float, float, float, float, float, float, float]: + """ + params + left_distance: distance left wheel has travelled + right_distance: distance right wheel has travelled + timestamp: time of distance readings or None to use current time + returns + distance + velocity + x is horizontal position of point midway between wheels + y is vertical position of point midway between wheels + angle is orientation in radians around point midway between wheels + x' is the horizontal velocity + y' is the vertical velocity + angle' is the angular velocity + timestamp + + """ + if timestamp is None: + timestamp = time.time() + + if self.running: + if 0 == self.timestamp: + self.timestamp = timestamp + self.left_distance = left_distance + self.right_distance = right_distance + self.velocity=0 + self.pose = Pose2D() + self.pose_velocity = Pose2D() + self.timestamp = timestamp + elif timestamp > self.timestamp: + # + # changes from last run + # + delta_left_distance = left_distance - self.left_distance + delta_right_distance = right_distance - self.right_distance + delta_distance = (delta_left_distance + delta_right_distance) / 2 + delta_angle = (delta_right_distance - delta_left_distance) / self.axle_length + delta_time = timestamp - self.timestamp + + forward_velocity = delta_distance / delta_time + angle_velocity = delta_angle / delta_time + + # + # new position and orientation + # + estimated_angle = limit_angle(self.pose.angle + delta_angle / 2) + x = self.pose.x + delta_distance * math.cos(estimated_angle) + y = self.pose.y + delta_distance * math.sin(estimated_angle) + angle = limit_angle(self.pose.angle + delta_angle) + + # + # new velocities + # + self.pose_velocity.x = (x - self.pose.x) / delta_time + self.pose_velocity.y = (y - self.pose.y) / delta_time + self.pose_velocity.angle = angle_velocity + + # + # update pose + # + self.pose.x = x + self.pose.y = y + self.pose.angle = angle + + # + # update odometry + # + self.left_distance = left_distance + self.right_distance = right_distance + self.velocity = forward_velocity + + self.timestamp = timestamp + + return ( + (self.left_distance + self.right_distance) / 2, + self.velocity, + self.pose.x, self.pose.y, self.pose.angle, + self.pose_velocity.x, self.pose_velocity.y, self.pose_velocity.angle, + self.timestamp + ) + + return 0, 0, 0, 0, 0, 0, 0, 0, self.timestamp + + def shutdown(self): + self.running = False + + +class InverseUnicycle: + """ + Unicycle inverse kinematics that converts forward velocity and + angular orientation velocity into invidual linear wheel velocities + in a differential drive robot. + """ + def __init__(self, axle_length:float, wheel_radius:float, min_speed:float, max_speed:float, steering_zero:float=0.01, debug=False): + self.axle_length:float = axle_length + self.wheel_radius:float = wheel_radius + self.min_speed:float = min_speed + self.max_speed:float = max_speed + self.steering_zero:float = steering_zero + self.timestamp = 0 + self.debug = debug + + self.wheel_diameter = 2 * wheel_radius + self.wheel_circumference = math.pi * self.wheel_diameter + + def run(self, forward_velocity:float, angular_velocity:float, timestamp:float=None) -> Tuple[float, float, float]: + """ + Convert turning velocity in radians and forward velocity (like meters per second) + into left and right linear wheel speeds that result in that forward speed + at that turning angle + see http://faculty.salina.k-state.edu/tim/robotics_sg/Control/kinematics/unicycle.html#calculating-wheel-velocities + + @parma forward_velocity:float in meters per second + @param angular_velocity:float in radians per second + @param timestamp:float epoch seconds or None to use current time + @return tuple + - left_wheel_velocity: in meters per second + - right_wheel_velocity in meters per second + - timestamp + """ + if timestamp is None: + timestamp = time.time() + + left_linear_speed = forward_velocity - angular_velocity * self.axle_length / 2 + right_linear_speed = forward_velocity + angular_velocity * self.axle_length / 2 + + self.timestamp = timestamp + + # left/right linear speeds and timestamp + return left_linear_speed, right_linear_speed, timestamp + + def shutdown(self): + pass + + +def unicycle_angular_velocity(wheel_radius:float, axle_length:float, left_velocity:float, right_velocity:float) -> float: + """ + Calculate angular velocity for the unicycle vehicle. + For differential drive, calculate angular velocity + using the unicycle model and linear wheel velocities. + """ + # + # since angular_velocity = wheel_radius / axle_length * (right_rotational_velocity - left_rotational_velocity) + # where wheel rotational velocity is in radians per second. + # + right_rotational_velocity = wheel_rotational_velocity(wheel_radius, right_velocity) + left_rotational_velocity = wheel_rotational_velocity(wheel_radius, left_velocity) + return wheel_radius / axle_length * (right_rotational_velocity - left_rotational_velocity) + + +def unicycle_max_angular_velocity(wheel_radius:float, axle_length:float, max_forward_velocity:float) -> float: + """ + Calculate maximum angular velocity for the vehicle, so we can convert between + normalized and unnormalized forms of the angular velocity. + For differential drive, calculate maximum angular velocity + using the unicycle model and assuming one + one wheel is stopped and one wheel is at max velocity. + """ + # + # since angular_velocity = wheel_radius / axle_length * (right_rotational_velocity - left_rotational_velocity) + # where wheel rotational velocity is in radians per second. + # then if we drive the right wheel at maximum velocity and keep the left wheel stopped + # we get max_angular_velocity = wheel_radius / axle_length * max_forward_velocity + # + return unicycle_angular_velocity(wheel_radius, axle_length, 0, max_forward_velocity) + + +class UnicycleNormalizeAngularVelocity: + """ + For a differential drive vehicle, convert an angular velocity in radians per second + to a value between -1 and 1 inclusive. + """ + def __init__(self, wheel_radius:float, axle_length:float, max_forward_velocity:float) -> None: + self.max_angular_velocity = unicycle_max_angular_velocity(wheel_radius, axle_length, max_forward_velocity) + + def run(self, angular_velocity:float) -> float: + return angular_velocity / self.max_angular_velocity + + +class UnicycleUnnormalizeAngularVelocity: + """ + For a differential drive vehicle, convert a normalized angular velocity in range -1 to 1 + into a real angular velocity in radians per second. + """ + def __init__(self, wheel_radius:float, axle_length:float, max_forward_velocity:float) -> None: + self.max_angular_velocity = unicycle_max_angular_velocity(wheel_radius, axle_length, max_forward_velocity) + + def run(self, normalized_angular_velocity:float) -> float: + if abs(normalized_angular_velocity) > 1: + logger.error("Warning: normalized_angular_velocity must be between -1 and 1") + return normalized_angular_velocity * self.max_angular_velocity + + +class NormalizeSteeringAngle: + """ + Part to convert real steering angle in radians + to a to a normalize steering value in range -1 to 1 + """ + def __init__(self, max_steering_angle:float, steering_zero:float=0.0) -> None: + """ + @param max_steering_angle:float measured maximum steering angle in radians + @param steering_zero:float value at or below which normalized steering values + are considered to be zero. + """ + self.max_steering_angle = max_steering_angle + self.steering_zero = steering_zero + + def run(self, steering_angle) -> float: + """ + @param steering angle in radians where + positive radians is a left turn, + negative radians is a right turn + @return a normalized steering value in range -1 to 1, where + -1 is full left, corresponding to positive max_steering_angle + 1 is full right, corresponding to negative max_steering_angle + """ + if not is_number_type(steering_angle): + logger.error("steering angle must be a number.") + return 0 + + steering = steering_angle / self.max_steering_angle + if abs(steering) <= self.steering_zero: + return 0 + return -steering # positive steering angle is negative normalized + + def shutdown(self): + pass + + +class UnnormalizeSteeringAngle: + """ + Part to convert normalized steering in range -1 to 1 + to a to real steering angle in radians + """ + def __init__(self, max_steering_angle:float, steering_zero:float=0.0) -> None: + """ + @param max_steering_angle:float measured maximum steering angle in radians + @param steering_zero:float value at or below which normalized steering values + are considered to be zero. + """ + self.max_steering_angle = max_steering_angle + self.steering_zero = steering_zero + + def run(self, steering) -> float: + """ + @param a normalized steering value in range -1 to 1, where + -1 is full left, corresponding to positive max_steering_angle + 1 is full right, corresponding to negative max_steering_angle + @return steering angle in radians where + positive radians is a left turn, + negative radians is a right turn + """ + if not is_number_type(steering): + logger.error("steering must be a number") + return 0 + + if steering > 1 or steering < -1: + logger.warn(f"steering = {steering}, but must be between 1(right) and -1(left)") + + steering = clamp(steering, -1, 1) + + s = sign(steering) + steering = abs(steering) + if steering <= self.steering_zero: + return 0.0 + + return self.max_steering_angle * steering * -s + + def shutdown(self): + pass + + +def wheel_rotational_velocity(wheel_radius:float, speed:float) -> float: + """ + Convert a forward speed to wheel rotational speed in radians per second. + Units like wheel_radius in meters and speed in meters per second + results in radians per second rotational wheel speed. + + @wheel_radius:float radius of wheel in same distance units as speed + @speed:float speed in distance units compatible with radius + @return:float wheel's rotational speed in radians per second + """ + return speed / wheel_radius + + +def differential_steering(throttle: float, steering: float, steering_zero: float = 0) -> Tuple[float, float]: """ Turn steering angle and speed/throttle into left and right wheel speeds/throttle. diff --git a/donkeycar/parts/odometer.py b/donkeycar/parts/odometer.py new file mode 100644 index 000000000..e8065575a --- /dev/null +++ b/donkeycar/parts/odometer.py @@ -0,0 +1,63 @@ +import time +from typing import Tuple + +from donkeycar.utilities.circular_buffer import CircularBuffer + + +class Odometer: + """ + An Odometer takes the output of a Tachometer (revolutions) and + turns those into a distance and velocity. Velocity can be + optionally smoothed across a given number of readings. + """ + def __init__(self, distance_per_revolution:float, smoothing_count=1, debug=False): + self.distance_per_revolution:float = distance_per_revolution + self.timestamp:float = 0 + self.revolutions:float = 0 + self.running:bool = True + self.queue = CircularBuffer(smoothing_count if smoothing_count >= 1 else 1) + self.debug = debug + self.reading = (0, 0, None) # distance, velocity, timestamp + + def poll(self, revolutions:int, timestamp:float=None): + if self.running: + if timestamp is None: + timestamp = time.time() + distance = revolutions * self.distance_per_revolution + + # smooth velocity + velocity = 0 + if self.queue.count > 0: + lastDistance, lastVelocity, lastTimestamp = self.queue.tail() + if timestamp > lastTimestamp: + velocity = (distance - lastDistance) / (timestamp - lastTimestamp) + self.queue.enqueue((distance, velocity, timestamp)) + + # + # Assignment in Python is atomic and so it is threadsafe + # + self.reading = (distance, velocity, timestamp) + + def update(self): + while(self.running): + self.poll(self.revolutions, None) + time.sleep(0) # give other threads time + + def run_threaded(self, revolutions:float=0, timestamp:float=None) -> Tuple[float, float, float]: + if self.running: + self.revolutions = revolutions + self.timestamp = timestamp if timestamp is not None else time.time() + + return self.reading + return 0, 0, self.timestamp + + def run(self, revolutions:float=0, timestamp:float=None) -> Tuple[float, float, float]: + if self.running: + self.timestamp = timestamp if timestamp is not None else time.time() + self.poll(revolutions, self.timestamp) + + return self.reading + return 0, 0, self.timestamp + + def shutdown(self): + self.running = False diff --git a/donkeycar/parts/path.py b/donkeycar/parts/path.py index ef42a6884..c0bd4985c 100644 --- a/donkeycar/parts/path.py +++ b/donkeycar/parts/path.py @@ -244,10 +244,14 @@ def run(self, img, path): for iP in range(0, len(path) - 1): ax, ay = path[iP] bx, by = path[iP + 1] + + # + # y increases going north, so handle this with scale + # self.plot_line(ax * self.scale + self.offset[0], - ay * self.scale + self.offset[1], + ay * -self.scale + self.offset[1], bx * self.scale + self.offset[0], - by * self.scale + self.offset[1], + by * -self.scale + self.offset[1], draw, color) return img @@ -279,7 +283,7 @@ def plot_circle(self, x, y, rad, draw, color, width=1): def run(self, img, x, y): draw = ImageDraw.Draw(img) self.plot_circle(x * self.scale + self.offset[0], - y * self.scale + self.offset[1], + y * -self.scale + self.offset[1], # y increases going north self.radius, draw, self.color) diff --git a/donkeycar/parts/pigpio_enc.py b/donkeycar/parts/pigpio_enc.py index 298098c9e..58b8b4032 100644 --- a/donkeycar/parts/pigpio_enc.py +++ b/donkeycar/parts/pigpio_enc.py @@ -1,6 +1,13 @@ +from donkeycar.utilities.deprecated import deprecated import pigpio import time +# +# contents of this file are deprecated +# in favor of donkeycar.parts.tachometer and donkeycar.parts.odometer +# + +@deprecated("Deprecated in favor of donkeycar.parts.odometer.Odometer") class OdomDist(object): """ Take a tick input from odometry and compute the distance travelled @@ -51,6 +58,7 @@ def run(self, ticks, throttle): return self.meters, self.meters_per_second, distance +@deprecated("Deprecated in favor of donkeycar.parts.tachometer.GpioTachometer") class PiPGIOEncoder(): def __init__(self, pin, pi): self.pin = pin diff --git a/donkeycar/parts/pins.py b/donkeycar/parts/pins.py index 2a8631db8..f9c4091c5 100644 --- a/donkeycar/parts/pins.py +++ b/donkeycar/parts/pins.py @@ -76,7 +76,7 @@ def __init__(self) -> None: def start(self, on_input=None, edge: int = PinEdge.RISING) -> None: """ Start the pin in input mode. - :param on_input: function to call when an edge is detected, or None to ignore + :param on_input: no-arg function to call when an edge is detected, or None to ignore :param edge: type of edge(s) that trigger on_input; default is PinEdge.RISING This raises a RuntimeError if the pin is already started. You can check to see if the pin is started by calling @@ -422,7 +422,7 @@ def gpio_fn(pin_scheme:int, fn:Callable[[], Any]): GPIO.setmode(pin_scheme) elif prev_scheme != pin_scheme: raise RuntimeError(f"Attempt to change GPIO pin scheme from ({prev_scheme}) to ({pin_scheme})" - " after it has been set. All RPi.GPIO user must use the same pin scheme.") + " after it has been set. All RPi.GPIO pins must use the same pin scheme.") val = fn() return val @@ -436,6 +436,7 @@ def __init__(self, pin_number: int, pin_scheme: str, pull: int = PinPull.PULL_NO """ self.pin_number = pin_number self.pin_scheme = gpio_pin_scheme[pin_scheme] + self.pin_scheme_str = pin_scheme self.pull = pull self.on_input = None self._state = PinState.NOT_STARTED @@ -443,15 +444,18 @@ def __init__(self, pin_number: int, pin_scheme: str, pull: int = PinPull.PULL_NO def _callback(self, pin_number): if self.on_input is not None: - self.on_input(self.pin_number, self.input()) + self.on_input() def start(self, on_input=None, edge=PinEdge.RISING) -> None: """ - :param on_input: function to call when an edge is detected, or None to ignore + :param on_input: no-arg function to call when an edge is detected, or None to ignore :param edge: type of edge(s) that trigger on_input; default is """ if self.state() != PinState.NOT_STARTED: raise RuntimeError(f"Attempt to start InputPinGpio({self.pin_number}) that is already started.") + + # stop the pin so it is not in use, then start it + gpio_fn(self.pin_scheme, lambda: GPIO.cleanup(self.pin_number)) gpio_fn(self.pin_scheme, lambda: GPIO.setup(self.pin_number, GPIO.IN, pull_up_down=gpio_pin_pull[self.pull])) if on_input is not None: self.on_input = on_input @@ -459,11 +463,14 @@ def start(self, on_input=None, edge=PinEdge.RISING) -> None: self.pin_scheme, lambda: GPIO.add_event_detect(self.pin_number, gpio_pin_edge[edge], callback=self._callback)) self.input() # read first state + logger.info(f"InputPin 'RPI_GPIO.{self.pin_scheme_str}.{self.pin_number}' started.") def stop(self) -> None: if self.state() != PinState.NOT_STARTED: + self.on_input = None gpio_fn(self.pin_scheme, lambda: GPIO.cleanup(self.pin_number)) self._state = PinState.NOT_STARTED + logger.info(f"InputPin 'RPI_GPIO.{self.pin_scheme_str}.{self.pin_number}' stopped.") def state(self) -> int: return self._state @@ -479,19 +486,24 @@ class OutputPinGpio(OutputPin): """ def __init__(self, pin_number: int, pin_scheme: str) -> None: self.pin_number = pin_number + self.pin_scheme_str = pin_scheme self.pin_scheme = gpio_pin_scheme[pin_scheme] self._state = PinState.NOT_STARTED def start(self, state: int = PinState.LOW) -> None: if self.state() != PinState.NOT_STARTED: raise RuntimeError(f"Attempt to start OutputPinGpio({self.pin_number}) that is already started.") + # first stop the pin so we know it is not in use, then start it. + gpio_fn(self.pin_scheme, lambda: GPIO.cleanup(self.pin_number)) gpio_fn(self.pin_scheme, lambda: GPIO.setup(self.pin_number, GPIO.OUT)) self.output(state) + logger.info(f"OutputPin 'RPI_GPIO.{self.pin_scheme_str}.{self.pin_number}' started.") def stop(self) -> None: if self.state() != PinState.NOT_STARTED: gpio_fn(self.pin_scheme, lambda: GPIO.cleanup(self.pin_number)) self._state = PinState.NOT_STARTED + logger.info(f"OutputPin 'RPI_GPIO.{self.pin_scheme_str}.{self.pin_number}' stopped.") def state(self) -> int: return self._state @@ -507,6 +519,7 @@ class PwmPinGpio(PwmPin): """ def __init__(self, pin_number: int, pin_scheme: str, frequency_hz: float = 50) -> None: self.pin_number = pin_number + self.pin_scheme_str = pin_scheme self.pin_scheme = gpio_pin_scheme[pin_scheme] self.frequency = int(frequency_hz) self.pwm = None @@ -517,17 +530,23 @@ def start(self, duty: float = 0) -> None: raise RuntimeError("Attempt to start PwmPinGpio that is already started.") if duty < 0 or duty > 1: raise ValueError("duty_cycle must be in range 0 to 1") + + # stop the pin so we know it is not in use, then start it + gpio_fn(self.pin_scheme, lambda: GPIO.cleanup(self.pin_number)) gpio_fn(self.pin_scheme, lambda: GPIO.setup(self.pin_number, GPIO.OUT)) self.pwm = gpio_fn(self.pin_scheme, lambda: GPIO.PWM(self.pin_number, self.frequency)) self.pwm.start(duty * 100) # takes duty in range 0 to 100 self._state = duty + logger.info(f"PwmPin 'RPI_GPIO.{self.pin_scheme_str}.{self.pin_number}' started.") def stop(self) -> None: if self.pwm is not None: self.pwm.stop() gpio_fn(self.pin_scheme, lambda: GPIO.cleanup(self.pin_number)) + logger.info(f"PwmPin 'RPI_GPIO.{self.pin_scheme_str}.{self.pin_number}' stopped.") self._state = PinState.NOT_STARTED + def state(self) -> float: return self._state @@ -757,12 +776,12 @@ def __del__(self): def _callback(self, gpio, level, tock): if self.on_input is not None: - self.on_input(gpio, level) + self.on_input() def start(self, on_input=None, edge=PinEdge.RISING) -> None: """ Start the input pin and optionally set callback. - :param on_input: function to call when an edge is detected, or None to ignore + :param on_input: no-arg function to call when an edge is detected, or None to ignore :param edge: type of edge(s) that trigger on_input; default is PinEdge.RISING """ if self.state() != PinState.NOT_STARTED: @@ -984,11 +1003,12 @@ def duty_cycle(self, duty: float) -> None: 'both': PinEdge.BOTH } - def on_input(pin_number, state): + def on_input(): + state = ttl_in_pin.input() if state == PinState.HIGH: - print("+", pin_number, time.time()*1000) + print("+", ttl_in_pin.pin_number, time.time()*1000) elif state == PinState.LOW: - print("-", pin_number, time.time()*1000) + print("-", ttl_in_pin.pin_number, time.time()*1000) pwm_out_pin: PwmPin = None ttl_out_pin: OutputPin = None @@ -1041,3 +1061,5 @@ def on_input(pin_number, state): pwm_out_pin.stop() if ttl_out_pin is not None: ttl_out_pin.stop() + if ttl_in_pin is not None: + ttl_in_pin.stop() diff --git a/donkeycar/parts/pose.py b/donkeycar/parts/pose.py new file mode 100644 index 000000000..fa4e43b29 --- /dev/null +++ b/donkeycar/parts/pose.py @@ -0,0 +1,262 @@ +import logging +import time +from typing import Tuple + +from donkeycar.utils import is_number_type +from donkeycar.parts.kinematics import NormalizeSteeringAngle, UnnormalizeSteeringAngle, TwoWheelSteeringThrottle +from donkeycar.parts.kinematics import Unicycle, InverseUnicycle, UnicycleUnnormalizeAngularVelocity +from donkeycar.parts.kinematics import Bicycle, InverseBicycle, BicycleUnnormalizeAngularVelocity +from donkeycar.parts.kinematics import differential_steering +from donkeycar.parts.tachometer import Tachometer, MockEncoder + +logger = logging.getLogger(__name__) + +class UnicycleDistance: + def run(self, left, right): + if is_number_type(left) and is_number_type(right): + return (left + right) / 2.0 + else: + logger.error("left and right must be floats") + return 0.0 + + +class BicyclePose: + """ + Part that integrates encoder+tachometer+odometer+kinematics + for the purposes of estimating the pose of a car-like vehicle over time. + """ + def __init__(self, cfg, poll_delay_secs:float=0): + from donkeycar.parts.serial_port import SerialPort + from donkeycar.parts.tachometer import (Tachometer, SerialEncoder, + GpioEncoder, EncoderChannel, + MockEncoder) + from donkeycar.parts.odometer import Odometer + + distance_per_revolution = cfg.WHEEL_RADIUS * 2 * 3.141592653589793 + + self.poll_delay_secs = poll_delay_secs + self.encoder = None + self.tachometer = None + self.odometer = None + self.steerer = None + self.bicycle = None + self.Running = False + + self.inputs = (0, 0) # (throttle, steering) + self.reading = (0, 0, 0, 0, 0, 0, 0, 0, None) + + if cfg.ENCODER_TYPE == "GPIO": + from donkeycar.parts import pins + self.encoder = GpioEncoder(gpio_pin=pins.input_pin_by_id(cfg.ODOM_PIN), + debounce_ns=cfg.ENCODER_DEBOUNCE_NS, + debug=cfg.ODOM_DEBUG) + elif cfg.ENCODER_TYPE == "arduino": + self.encoder = SerialEncoder(serial_port=SerialPort(cfg.ODOM_SERIAL, cfg.ODOM_SERIAL_BAUDRATE), debug=cfg.ODOM_DEBUG) + elif cfg.ENCODER_TYPE.upper() == "MOCK": + self.encoder = MockEncoder(cfg.MOCK_TICKS_PER_SECOND) + else: + print("No supported encoder found") + + if self.encoder: + self.tachometer = Tachometer( + self.encoder, + ticks_per_revolution=cfg.ENCODER_PPR, + direction_mode=cfg.TACHOMETER_MODE, + poll_delay_secs=1.0 / (cfg.DRIVE_LOOP_HZ * 3), + debug=cfg.ODOM_DEBUG) + + if self.tachometer: + self.odometer = Odometer( + distance_per_revolution=distance_per_revolution, + smoothing_count=cfg.ODOM_SMOOTHING, + debug=cfg.ODOM_DEBUG) + + self.steerer = UnnormalizeSteeringAngle(cfg.MAX_STEERING_ANGLE) + self.bicycle = Bicycle(cfg.WHEEL_BASE, cfg.ODOM_DEBUG) + self.running = True + else: + logger.error("Unable to initialize BicyclePose part") + + def poll(self, timestamp=None): + if self.running: + if self.tachometer: + throttle, steering = self.inputs + if isinstance(self.encoder, MockEncoder): + self.encoder.run(throttle, timestamp) + revolutions, timestamp = self.tachometer.run(throttle, timestamp) + distance, velocity, timestamp = self.odometer.run(revolutions, timestamp) + steering_angle = self.steerer.run(steering) + self.reading = self.bicycle.run(distance, steering_angle, timestamp) + + def update(self): + """ + This will get called on it's own thread. + throttle: sign of throttle is use used to determine direction. + timestamp: timestamp for update or None to use current time. + This is useful for creating deterministic tests. + """ + while self.running: + self.poll() + time.sleep(self.poll_delay_secs) # give other threads time + + def run_threaded(self, throttle:float=0.0, steering:float=0.0, timestamp:float=None) -> Tuple[float, float, float, float, float, float, float, float, float]: + if self.running and self.tachometer: + if throttle is None: + throttle = 0 + if steering is None: + steering = 0 + + self.inputs = (throttle, steering) + + return self.reading + + return 0, 0, 0, 0, 0, 0, 0, 0, timestamp + + def run(self, throttle:float=0.0, steering:float=0.0, timestamp:float=None) -> Tuple[float, float, float, float, float, float, float, float, float]: + if self.running and self.tachometer: + if throttle is None: + throttle = 0 + if steering is None: + steering = 0 + + self.inputs = (throttle, steering) + self.poll(timestamp) + + return self.reading + + return 0, 0, 0, 0, 0, 0, 0, 0, timestamp + + +class UnicyclePose: + """ + Part that integrates encoders+tachometers+odometers+kinematics + for the purposes of estimating the pose of a + differential drive vehicle over time. + """ + def __init__(self, cfg, poll_delay_secs:float=0): + from donkeycar.parts.serial_port import SerialPort + from donkeycar.parts.tachometer import (SerialEncoder, + GpioEncoder, EncoderChannel) + from donkeycar.parts.odometer import Odometer + + # distance_per_revolution = cfg.ENCODER_PPR * cfg.MM_PER_TICK / 1000 + distance_per_revolution = cfg.WHEEL_RADIUS * 2 * 3.141592653589793 + + self.poll_delay_secs = poll_delay_secs + self.left_throttle = 0 + self.right_throttle = 0 + self.encoder = None + self.tachometer = None + self.odometer = None + self.unicycle = None + + self.inputs = (0, 0) # (left_throttle, right_throttle) + self.reading = (0, 0, 0, 0, 0, 0, 0, 0, None) + + if cfg.ENCODER_TYPE == "GPIO": + from donkeycar.parts import pins + self.encoder = [ + GpioEncoder(gpio_pin=pins.input_pin_by_id(cfg.ODOM_PIN), + debounce_ns=cfg.ENCODER_DEBOUNCE_NS, + debug=cfg.ODOM_DEBUG), + GpioEncoder(gpio_pin=pins.input_pin_by_id(cfg.ODOM_PIN_2), + debounce_ns=cfg.ENCODER_DEBOUNCE_NS, + debug=cfg.ODOM_DEBUG) + ] + elif cfg.ENCODER_TYPE == "arduino": + serial_encoder = SerialEncoder(serial_port=SerialPort(cfg.ODOM_SERIAL, cfg.ODOM_SERIAL_BAUDRATE), debug=cfg.ODOM_DEBUG) + self.encoder = [ + serial_encoder, + EncoderChannel(encoder=serial_encoder, channel=1) + ] + elif cfg.ENCODER_TYPE.upper() == "MOCK": + self.encoder = [ + MockEncoder(cfg.MOCK_TICKS_PER_SECOND), + MockEncoder(cfg.MOCK_TICKS_PER_SECOND), + ] + else: + print("No supported encoder found") + + if self.encoder: + self.tachometer = [ + Tachometer( + self.encoder[0], + ticks_per_revolution=cfg.ENCODER_PPR, + direction_mode=cfg.TACHOMETER_MODE, + poll_delay_secs=0, + debug=cfg.ODOM_DEBUG), + Tachometer( + self.encoder[1], + ticks_per_revolution=cfg.ENCODER_PPR, + direction_mode=cfg.TACHOMETER_MODE, + poll_delay_secs=0, + debug=cfg.ODOM_DEBUG) + ] + + if self.tachometer: + self.odometer = [ + Odometer( + distance_per_revolution=distance_per_revolution, + smoothing_count=cfg.ODOM_SMOOTHING, + debug=cfg.ODOM_DEBUG), + Odometer( + distance_per_revolution=distance_per_revolution, + smoothing_count=cfg.ODOM_SMOOTHING, + debug=cfg.ODOM_DEBUG) + ] + self.unicycle = Unicycle(cfg.AXLE_LENGTH, cfg.ODOM_DEBUG) + self.running = True + + def poll(self, timestamp=None): + if self.running: + if self.tachometer: + left_timestamp = timestamp + right_timestamp = timestamp + left_throttle, right_throttle = self.inputs + if isinstance(self.encoder[0], MockEncoder): + self.encoder[0].run(left_throttle, left_timestamp) + self.encoder[1].run(right_throttle, right_timestamp) + + left_revolutions, left_timestamp = self.tachometer[0].run(left_throttle, left_timestamp) + right_revolutions, right_timestamp = self.tachometer[1].run(right_throttle, right_timestamp) + + left_distance, left_velocity, left_timestamp = self.odometer[0].run(left_revolutions, left_timestamp) + right_distance, right_velocity, right_timestamp = self.odometer[1].run(right_revolutions, right_timestamp) + + self.reading = self.unicycle.run(left_distance, right_distance, right_timestamp) + + def update(self): + """ + This will get called on it's own thread. + throttle: sign of throttle is use used to determine direction. + timestamp: timestamp for update or None to use current time. + This is useful for creating deterministic tests. + """ + while self.running: + self.poll() + time.sleep(self.poll_delay_secs) # give other threads time + + def run_threaded(self, throttle:float=0.0, steering:float=0.0, timestamp:float=None) -> Tuple[float, float, float, float, float, float, float, float, float]: + if self.running and self.tachometer: + if throttle is None: + throttle = 0 + if steering is None: + steering = 0 + + self.inputs = differential_steering(throttle, steering) + return self.reading + + return 0, 0, 0, 0, 0, 0, 0, 0, timestamp + + def run(self, throttle:float=0.0, steering:float=0.0, timestamp:float=None) -> Tuple[float, float, float, float, float, float, float, float, float]: + if self.running and self.tachometer: + if throttle is None: + throttle = 0 + if steering is None: + steering = 0 + + self.inputs = differential_steering(throttle, steering) + self.poll(timestamp) + return self.reading + + return 0, 0, 0, 0, 0, 0, 0, 0, timestamp diff --git a/donkeycar/parts/tachometer.py b/donkeycar/parts/tachometer.py new file mode 100644 index 000000000..26d7b02fa --- /dev/null +++ b/donkeycar/parts/tachometer.py @@ -0,0 +1,732 @@ +from abc import (ABC, abstractmethod) +import logging +import threading +import time +from typing import Tuple + +from donkeycar.utils import is_number_type +from donkeycar.parts.serial_port import SerialPort +from donkeycar.parts.pins import InputPin, PinEdge + + +logger = logging.getLogger("donkeycar.parts.tachometer") + + +def sign(value) -> int: + if value > 0: + return 1 + if value < 0: + return -1 + return 0 + + +class EncoderMode: + FORWARD_ONLY = 1 # only sum ticks (ticks may be signed) + FORWARD_REVERSE = 2 # subtract ticks if throttle is negative + FORWARD_REVERSE_STOP = 3 # ignore ticks if throttle is zero + + +class AbstractEncoder(ABC): + """ + Interface for an encoder. + To create a new encoder class, subclass from + AbstractEncoder and implement + start_ticks(), stop_ticks() and poll_ticks(). + Tachometer() takes an encoder in it's contructor. + """ + @abstractmethod + def start_ticks(self): + """Initialize the encoder""" + pass + + @abstractmethod + def stop_ticks(self): + """release the encoder resources""" + pass + + @abstractmethod + def poll_ticks(self, direction:int): + """ + Update the encoder ticks + direction: 1 if forward, -1 if reverse, 0 if stopped. + + This will request a new value from the encoder. + """ + pass + + @abstractmethod + def get_ticks(self, encoder_index:int=0) -> int: + """ + Get last polled encoder ticks + encoder_index: zero based index of encoder. + return: Most recently polled encoder ticks + + This will return the value from the + most recent call to poll_ticks(). It + will not request new values from the encoder. + """ + return 0 + + +class SerialEncoder(AbstractEncoder): + """ + Encoder that requests tick count over serial port. + The other end is typically a microcontroller that + is reading an encoder, which may be a single-channel + encoder (so ticks only increase) or a quarature encoder + (so ticks may increment or decrement). + + Quadrature encoders can detect when the + encoder is going forward, backward or stopped. + For such encoders, use the default direction mode, + FORWARD_ONLY, and changes in tick count will correctly + be summed to the current tick count. + + Single channel encoders cannot encode direction; + their count will only ever increase. So this part + can take the signed throttle value as direction and + use it to decide if the ticks should be incremented + or decremented. + For vehicles that 'coast' at zero throttle, choose + FORWARD_BACKWARD direction mode so we continue to + integrate ticks while coasting. + For vehicles with brakes or that otherwise stop quickly, + choose FORWARD_BACKWARD_STOP direction mode + so encoder noise is not integrated while stopped. + + This part assumes a microcontroller connected via + serial port that implements the following + 'r/p/c' protocol: + + Commands are sent to the microcontroller + one per line (ending in '\n'): + 'r' command resets position to zero + 'p' command sends position immediately + 'c' command starts/stops continuous mode + - if it is followed by an integer, + then use this as the delay in ms + between readings. + - if it is not followed by an integer + then stop continuous mode + + The microcontroller sends one reading per line. + Each reading includes the tick count and the time + that the reading was taken, separated by a comma + and ending in a newline. Readings for multiple + encoders are separated by colons. + + {ticks},{milliseconds};{ticks},{milliseconds}\n + + There is an example arduino sketch that implements the + 'r/p/c' protocol using the teensy encoder library at + donkeycar/arduino/encoder/encoder.ino The sketch + presumes a quadrature encoder connect to pins 2 & 3 + of an arduino. If you have a different microcontroller + or want to use different pins or if you want to + use a single-channel encoder, then modify that sketch. + + """ + def __init__(self, serial_port:SerialPort=None, debug=False): + if serial_port is None: + raise ValueError("serial_port must be an instance of SerialPort") + self.ser = serial_port + self.ticks = [0,0] + self.lasttick = [0,0] + self.lock = threading.Lock() + self.buffered_ticks = [] + self.running = False + + def start_ticks(self): + self.ser.start() + self.ser.writeln('r') # restart the encoder to zero + # if self.poll_delay_secs > 0: + # # start continuous mode if given non-zero delay + # self.ser.writeln("c" + str(int(self.poll_delay_secs * 1000))) + self.ser.writeln('p') # ask for the first ticks + self.running = True + + def stop_ticks(self): + self.running = False + if self.ser is not None: + self.ser.stop() + + # + # TODO: serious bug; we need to have independant directions for each encoder; + # either poll_ticks takes encoder index or we keep track of direction for each channel. + # This is not an immediate problem because we never try to drive left and right wheels in the opposite direction. + # + def poll_ticks(self, direction:int): + """ + read the encoder ticks + direction: 1 if forward, -1 if reverse, 0 if stopped. + return: updated encoder ticks + """ + # + # If there are characters waiting, + # then read from the serial port. + # Read all lines and use the last one + # because it has the most recent reading + # + input = '' + while (self.running and (self.ser.buffered() > 0) and (input == "")): + _, input = self.ser.readln() + + # + # queue up another reading by sending the "p" command to the Arduino + # + self.ser.writeln('p') + + # + # if we got data, update the ticks + # if we got no data, then use last ticks we read + # + # data for encoders is separated by semicolon + # and ticks and time for single encoder + # is comma separated. + # + # "ticks,ticksMs;ticks,ticksMs" + # + if input != '': + try: + # + # split separate encoder outputs + # "ticks,time;ticks,time" -> ["ticks,time", "ticks,time"] + # + values = [s.strip() for s in input.split(';')] + + # + # split tick/time pairs for each encoder + # ["ticks,time", "ticks,time"] -> [["ticks", "time"], ["ticks", "time"]] + # + values = [v.split(',') for v in values] + for i in range(len(values)): + total_ticks = int((values[i][0]).strip()) + + delta_ticks = 0 + if i < len(self.lasttick): + delta_ticks = total_ticks - self.lasttick[i] + + # + # save in our threadsafe buffers. + # Grow buffers to handle the channels that + # the microcontroller is returning so we + # don't have to keep configuration sync'd. + # + if i < len(self.lasttick): + self.lasttick[i] = total_ticks + else: + self.lasttick.append(total_ticks) + + if i < len(self.buffered_ticks): + self.buffered_ticks[i] += delta_ticks * direction + else: + self.buffered_ticks.append(delta_ticks * direction) + except ValueError: + logger.error("failure parsing encoder values from serial") + + # + # if we can get the lock, + # - then update the shared global + # - clear the buffered values + # if we can't get the lock, then just skip until next cycle + # + if self.lock.acquire(blocking=False): + try: + for i in range(len(self.buffered_ticks)): + # + # grow array to handle the channels that + # the microcontroller is returning + # + if i < len(self.ticks): + self.ticks[i] += self.buffered_ticks[i] + else: + self.ticks.append(self.buffered_ticks[i]) + self.buffered_ticks[i] = 0 + finally: + self.lock.release() + + def get_ticks(self, encoder_index:int=0) -> int: + """ + Get last polled encoder ticks + encoder_index: zero based index of encoder. + return: Most recently polled encoder ticks + + This will return the same value as the + most recent call to poll_ticks(). It + will not request new values from the encoder. + It will not block. + """ + with self.lock: + return self.ticks[encoder_index] if encoder_index < len(self.ticks) else 0 + + +class EncoderChannel(AbstractEncoder): + """ + Wrapper around SerialEncoder to pull + out channel 2 as separate encoder. + + This MUST be added AFTER the parent SerialEncoder, + so that the parent encodere gets polled before + we attempt to call get_ticks() for this encoder channel. + """ + def __init__(self, encoder:SerialEncoder, channel:int) -> None: + self.encoder = encoder + self.channel = channel + super().__init__() + + def start_ticks(self): + if not self.encoder.running: + self.encoder.start_ticks() + + def stop_ticks(self): + if self.encoder.running: + self.encoder.stop_ticks() + + def poll_ticks(self, direction:int): + self.encoder.poll_ticks(direction) + + def get_ticks(self, encoder_index:int=0) -> int: + return self.encoder.get_ticks(encoder_index=self.channel) + + +class GpioEncoder(AbstractEncoder): + """ + An single-channel encoder read using an InputPin + :gpio_pin: InputPin that will get the signal + :debounce_ns: int number of nano seconds before accepting another + encoder signal. This is used to ignore bounces. + :debug: bool True to output extra logging + """ + def __init__(self, gpio_pin: InputPin, debounce_ns:int=0, debug=False): + # validate gpio_pin + if gpio_pin is None: + raise ValueError('The encoder input pin must be a valid InputPin.') + + self.debug = debug + self.counter = 0 + self._cb_counter = 0 + self.direction = 0 + self.pin = gpio_pin + self.debounce_ns:int = debounce_ns + self.debounce_time:int = 0 + if self.debounce_ns > 0: + logger.warn("GpioEncoder debounce_ns will be ignored.") + self.lock = threading.Lock() + + def _cb(self): + """ + Callback routine called by GPIO when a tick is detected + :pin_number: int the pin number that generated the interrupt. + :pin_state: int the state of the pin + """ + # + # we avoid blocking by updating an internal counter, + # then if we can get a lock, use the internal counter + # to update the public counter used by the application. + # + self._cb_counter += 1 + if self.lock.acquire(blocking=False): + try: + self.counter += self._cb_counter * self.direction + self._cb_counter = 0 + finally: + self.lock.release() + + def start_ticks(self): + # configure GPIO pin + self.pin.start(on_input=lambda: self._cb(), edge=PinEdge.RISING) + logger.info(f'GpioEncoder on InputPin "RPI_GPIO.{self.pin.pin_scheme_str}.{self.pin.pin_number}" started.') + + def poll_ticks(self, direction:int): + """ + read the encoder ticks + direction: 1 if forward, -1 if reverse, 0 if stopped. + return: updated encoder ticks + """ + with self.lock: + self.direction = direction + + def stop_ticks(self): + self.pin.stop() + logger.info(f'GpioEncoder on InputPin "RPI_GPIO.{self.pin.pin_scheme_str}.{self.pin.pin_number}" stopped.') + + def get_ticks(self, encoder_index:int=0) -> int: + """ + Get last polled encoder ticks + encoder_index: zero based index of encoder. + return: Most recently polled encoder ticks + + This will return the same value as the + most recent call to poll_ticks(). It + will not request new values from the encoder. + This will not block. + """ + with self.lock: + return self.counter if encoder_index == 0 else 0 + + +class MockEncoder(AbstractEncoder): + """ + A mock encoder that turns throttle values into ticks. + It generates ENCODER_PPR ticks per second at full throttle. + The run() method must be called at the same rate as the + tachometer calls the poll() method. + """ + def __init__(self, ticks_per_second: float): + self.ticks_per_second = ticks_per_second + self.throttle = 0 + self.ticks = 0 + self.remainder_ticks = 0 + self.timestamp = None + self.running = False + + def run(self, throttle:float, timestamp: int = None): + if timestamp is None: + timestamp = time.time() + + # + # poll() passed None for throttle and steering, + # so we use the last value passed to run() + # + if throttle is not None: + self.throttle = throttle + + def start_ticks(self): + self.running = True + + def stop_ticks(self): + self.running = False + + def poll_ticks(self, direction: int): + timestamp = time.time() + last_time = self.timestamp if self.timestamp is not None else timestamp + self.timestamp = timestamp + + if self.running: + delta_time = timestamp - last_time + delta_ticks = abs(self.throttle) * direction * self.ticks_per_second * delta_time + self.remainder_ticks + delta_int_ticks = int(delta_ticks) + self.ticks += delta_int_ticks + self.remainder_ticks = delta_ticks - delta_int_ticks + + def get_ticks(self, encoder_index: int = 0) -> int: + return self.ticks + + +class Tachometer: + """ + Tachometer converts encoder ticks to revolutions + and supports modifying direction based on + throttle input. + + Parameters + ---------- + encoder is an instance of an encoder class + derived from AbstactEncoder. + config is ticks per revolution, + input is throttle value (used to set direction), + output is current number of revolutions and timestamp + note: if you just want raw encoder output, use + ticks_per_revolution=1 + """ + + def __init__(self, + encoder:AbstractEncoder, + ticks_per_revolution:float=1, + direction_mode=EncoderMode.FORWARD_ONLY, + poll_delay_secs:float=0.01, + debug=False): + """ + Tachometer converts encoder ticks to revolutions + and supports modifying direction based on + throttle input. + + Parameters + ---------- + encoder: AbstractEncoder + an instance of an encoder class derived from AbstactEncoder. + ticks_per_revolution: int + The number of encoder ticks per wheel revolution. + If you just want raw encoder output, use ticks_per_revolution=1 + direction_mode: EncoderMode + Determines how revolutions count up or down based on throttle + poll_delay_secs: float + seconds between polling of encoder value; should be low or zero. + """ + + if encoder is None: + raise ValueError("encoder must be an instance of AbstractEncoder") + self.encoder = encoder + self.running:bool = False + self.ticks_per_revolution:float = ticks_per_revolution + self.direction_mode = direction_mode + self.ticks:int = 0 + self.direction:int = 1 # default to forward ticks + self.timestamp:float = 0 + self.throttle = 0.0 + self.debug = debug + self.poll_delay_secs = poll_delay_secs + self.encoder.start_ticks() + self.running = True + + # TODO: refactor tachometer so we don't pass throttle to poll() + def poll(self, throttle, timestamp): + """ + Parameters + ---------- + throttle : float + positive means forward + negative means backward + zero means stopped + timestamp: int, optional + the timestamp to apply to the tick reading + or None to use the current time + """ + + if self.running: + # if a timestamp if provided, use it + if timestamp is None: + timestamp = time.time() + + # set direction flag based on direction mode + if throttle is not None: + if EncoderMode.FORWARD_REVERSE == self.direction_mode: + # if throttle is zero, leave direction alone to model 'coasting' + if throttle != 0: + self.direction = sign(throttle) + elif EncoderMode.FORWARD_REVERSE_STOP == self.direction_mode: + self.direction = sign(throttle) + + lastTicks = self.ticks + self.timestamp = timestamp + self.encoder.poll_ticks(self.direction) + self.ticks = self.encoder.get_ticks() + if self.debug and self.ticks != lastTicks: + logger.info("tachometer: t = {}, r = {}, ts = {}".format(self.ticks, self.ticks / self.ticks_per_revolution, timestamp)) + + def update(self): + while(self.running): + self.poll(self.throttle, None) + time.sleep(self.poll_delay_secs) # give other threads time + + def run_threaded(self, throttle:float=0.0, timestamp:float=None) -> Tuple[float, float]: + """ + Parameters + ---------- + throttle : float + positive means forward + negative means backward + zero means stopped + timestamp: int, optional + the timestamp to apply to the tick reading + or None to use the current time + + Returns + ------- + Tuple + revolutions: float + cummulative revolutions of wheel attached to encoder + timestamp: float + the time the encoder ticks were read + """ + if self.running: + thisTimestamp = self.timestamp + thisRevolutions = self.ticks / self.ticks_per_revolution + + # update throttle for next poll() + if throttle is not None: + self.throttle = throttle + self.timestamp = timestamp if timestamp is not None else time.time() + + # return (revolutions, timestamp) + return thisRevolutions, thisTimestamp + return 0, self.timestamp + + def run(self, throttle:float=1.0, timestamp:float=None) -> Tuple[float, float]: + """ + throttle: sign of throttle is use used to determine direction. + timestamp: timestamp for update or None to use current time. + This is useful for creating deterministic tests. + """ + if self.running: + # update throttle for next poll() + self.throttle = throttle if throttle is not None else 0 + self.timestamp = timestamp if timestamp is not None else time.time() + self.poll(throttle, timestamp) + + # return (revolutions, timestamp) + return self.ticks / self.ticks_per_revolution, self.timestamp + return 0, self.timestamp + + def shutdown(self): + self.running = False + self.encoder.stop_ticks() + + +class InverseTachometer: + """ + Used by simulator: take distance and calculate revolutions + """ + def __init__(self, meters_per_revolution:float): + self.meters_per_revolution = meters_per_revolution + self.revolutions = 0.0 + self.timestamp = time.time() + + def run(self, distance:float, timestamp=None): + return self.run_threaded(distance, timestamp) + + def run_threaded(self, distance:float, timestamp=None): + # if a timestamp if provided, use it + if timestamp is None: + timestamp = time.time() + if is_number_type(distance): + self.timestamp = timestamp + self.revolutions = distance / self.meters_per_revolution + else: + logger.error("distance must be a float") + return self.revolutions, self.timestamp + +# TODO create MockThrottleEncoder that takes throttle and turns to ticks +# TODO create MockInverseEncoder that takes distance and turns to ticks +# TODO with those two we should be able to create mock pose estimation pipeline +# for the mock drivetrain and for the simulator respectively; +# The trick is feeding them the throttle or distance since they are +# not standard parts; perhaps we should make them parts. + + +if __name__ == "__main__": + import argparse + from threading import Thread + import sys + import time + from donkeycar.parts.pins import input_pin_by_id + + # parse arguments + parser = argparse.ArgumentParser() + parser.add_argument("-r", "--rate", type=float, default=20, + help = "Number of readings per second") + parser.add_argument("-n", "--number", type=int, default=40, + help = "Number of readings to collect") + parser.add_argument("-ppr", "--pulses-per-revolution", type=int, required=True, + help="Pulses per revolution of output shaft") + parser.add_argument("-d", "--direction-mode", type=int, default=1, + help="1=FORWARD_ONLY, 2=FORWARD_REVERSE, 3=FORWARD_REVERSE_STOP") + parser.add_argument("-s", "--serial-port", type=str, default=None, + help="serial-port to open, like '/dev/ttyACM0'") + parser.add_argument("-b", "--baud-rate", type=int, default=115200, + help="Serial port baud rate") + parser.add_argument("-e", "--encoder-index", type=int, default=0, + help="Serial encoder index (0 based) if more than one encoder") + parser.add_argument("-p", "--pin", type=str, default=None, + help="pin specifier for encoder InputPin, like 'RPI_GPIO.BCM.22'") # noqa + parser.add_argument("-dbc", "--debounce-ns", type=int, default=100, + help="debounce delay in nanoseconds for reading gpio pin") # noqa + parser.add_argument("-db", "--debug", action='store_true', help = "show debug output") + parser.add_argument("-t", "--threaded", action='store_true', help = "run in threaded mode") + + # Read arguments from command line + args = parser.parse_args() + + help = [] + if args.rate < 1: + help.append("-r/--rate: must be >= 1.") + + if args.number < 1: + help.append("-n/--number: must be >= 1.") + + if args.direction_mode < 1 and args.direction_mode > 3: + help.append("-d/--direction-mode must be 1, 2, or") + + if args.pulses_per_revolution <= 0: + help.append("-ppr/--pulses-per-revolution must be > 0") + + if args.serial_port is None and args.pin is None: + help.append("either -s/--serial_port or -p/--pin must be passed") + + if args.serial_port is not None and args.pin is not None: + help.append("only one of -s/--serial_port or -p/--pin must be passed") + + if args.serial_port is not None and len(args.serial_port) == 0: + help.append("-s/--serial-port not be empty if passed") + + if args.baud_rate <= 0: + help.append("-b/--baud-rate must be > 0") + + if args.pin is not None and args.pin == "": + help.append("-p/--pin must be non-empty if passed") + + if args.debounce_ns < 0: + help.append("-dbc/--debounce-ns must be greater than zero") + + if len(help) > 0: + parser.print_help() + for h in help: + print(" " + h) + sys.exit(1) + + update_thread = None + serial_port = None + tachometer = None + + try: + scan_count = 0 + seconds_per_scan = 1.0 / args.rate + scan_time = time.time() + seconds_per_scan + + # + # construct a tachometer part of the correct type + # + if args.serial_port is not None: + serial_port = SerialPort(args.serial_port, args.baud_rate) + tachometer = Tachometer( + encoder=EncoderChannel(SerialEncoder(serial_port=serial_port, debug=args.debug), args.encoder_index), + ticks_per_revolution=args.pulses_per_revolution, + direction_mode=args.direction_mode, + poll_delay_secs=1/(args.rate*2), + debug=args.debug) + if args.pin is not None: + tachometer = Tachometer( + encoder=GpioEncoder(gpio_pin=input_pin_by_id(args.pin), debounce_ns=args.debounce_ns, debug=args.debug), + ticks_per_revolution=args.pulses_per_revolution, + direction_mode=args.direction_mode, + debug=args.debug) + + # + # start the threaded part + # and a threaded window to show plot + # + if args.threaded: + update_thread = Thread(target=tachometer.update, args=()) + update_thread.start() + + while scan_count < args.number: + start_time = time.time() + + # emit the scan + scan_count += 1 + + # get most recent scan and plot it + if args.threaded: + measurements = tachometer.run_threaded() + else: + measurements = tachometer.run() + + print(measurements) + + # yield time to background threads + sleep_time = seconds_per_scan - (time.time() - start_time) + if sleep_time > 0.0: + time.sleep(sleep_time) + else: + time.sleep(0) # yield time to other threads + + except KeyboardInterrupt: + print('Stopping early.') + except Exception as e: + print(e) + exit(1) + finally: + if tachometer is not None: + tachometer.shutdown() + if update_thread is not None: + update_thread.join() # wait for thread to end diff --git a/donkeycar/parts/velocity.py b/donkeycar/parts/velocity.py new file mode 100644 index 000000000..7304aec59 --- /dev/null +++ b/donkeycar/parts/velocity.py @@ -0,0 +1,127 @@ +import math +from donkeycar.utils import map_frange, sign + +class VelocityNormalize: + """ + Normalize a velocity into to range 0..1.0 + given the measured minimum and maximum speeds. + @param min_speed: speed below which car stalls + @param max_speed: car's top speed (may be a target, not a limit) + @param min_normal_speed: the normalized throttle corresponding to min_speed + """ + def __init__(self, min_speed:float, max_speed:float, min_normal_speed:float=0.1) -> None: + self.min_speed = min_speed + self.max_speed = max_speed + self.min_normal_speed = min_normal_speed + + def run(self, speed:float) -> float: + s = sign(speed) + speed = abs(speed) + if speed < self.min_speed: + return 0.0 + if speed >= self.max_speed: + return s * 1.0 + return s * map_frange( + speed, + self.min_speed, self.max_speed, + self.min_normal_speed, 1.0) + + def shutdown(self): + pass + + +class VelocityUnnormalize: + """ + Map normalized speed (0 to 1) to actual speed + """ + def __init__(self, min_speed:float, max_speed:float, min_normal_speed:float=0.1) -> None: + self.min_speed = min_speed + self.max_speed = max_speed + self.min_normal_speed = min_normal_speed + + def run(self, speed:float) -> float: + s = sign(speed) + speed = abs(speed) + if speed < self.min_normal_speed: + return 0.0 + if speed >= 1.0: + return s * 1.0 + return s * map_frange( + speed, + self.min_normal_speed, 1.0, + self.min_speed, self.max_speed) + + def shutdown(self): + pass + + +class StepSpeedController: + """ + Simplistic constant step controller. + Just increases speed if we are too slow + or decreases speed if we are too fast. + Includes feed-forward when reversing direction + or starting from stopped. + """ + def __init__(self, min_speed:float, max_speed:float, throttle_step:float=1/255, min_throttle:float=0) -> None: + """ + @param min_speed is speed below which vehicle stalls (so slowest stable working speed) + @param max_speed is speed at maximum throttle + @param throttle_steps is number of steps in working range of throttle (min_throttle to 1.0) + @param min_throttle is throttle that corresponds to min_speed; the throttle below which the vehicle stalls. + """ + self.min_speed = min_speed + self.max_speed = max_speed + self.min_throttle = min_throttle + self.step_size = throttle_step + + def run(self, throttle:float, speed:float, target_speed:float) -> float: + """ + Given current throttle and speed and a target speed, + calculate a new throttle to attain target speed + @param throttle is current throttle (-1 to 1) + @param speed is current speed where reverse speeds are negative + @param throttle_steps number of steps between min_throttle and max_throttle=1.0 + @param min_throttle is throttle that corresponds to min_speed + max_throttle is assumed to be 1.0 + """ + if speed is None or target_speed is None: + # no speed to control, just return throttle + return throttle + + target_direction = sign(target_speed) + direction = sign(speed) + + target_speed = abs(target_speed) + speed = abs(speed) + + # + # treat speed below minimum as zero + # + if target_speed < self.min_speed: + return 0 + + # + # when changing direction or starting from stopped, + # calculate a feed-forward throttle estimate + # so we jump into a working range quickly + # + if direction != target_direction: + # if we are going too fast to just reverse, then slow down first + if speed > self.min_speed: + return 0 + + # calculate first estimate of throttle to achieve target speed + return target_direction * map_frange(target_speed, self.min_speed, self.max_speed, self.min_throttle, 1.0) + + # + # modify throttle + # + if speed > target_speed: + # too fast, slow down + throttle -= self.step_size + elif speed > target_speed: + # too slow, speed up + throttle += self.step_size + + return target_direction * throttle diff --git a/donkeycar/templates/complete.py b/donkeycar/templates/complete.py index 8001ab995..facffd3ff 100644 --- a/donkeycar/templates/complete.py +++ b/donkeycar/templates/complete.py @@ -20,6 +20,7 @@ # import cv2 early to avoid issue with importing after tensorflow # see https://github.com/opencv/opencv/issues/14884#issuecomment-599852128 # + try: import cv2 except: @@ -35,8 +36,12 @@ from donkeycar.parts.behavior import BehaviorPart from donkeycar.parts.file_watcher import FileWatcher from donkeycar.parts.launch import AiLaunch +from donkeycar.parts.kinematics import NormalizeSteeringAngle, UnnormalizeSteeringAngle, TwoWheelSteeringThrottle +from donkeycar.parts.kinematics import Unicycle, InverseUnicycle, UnicycleUnnormalizeAngularVelocity +from donkeycar.parts.kinematics import Bicycle, InverseBicycle, BicycleUnnormalizeAngularVelocity from donkeycar.parts.explode import ExplodeDict from donkeycar.parts.transform import Lambda +from donkeycar.parts.logger import LoggerPart from donkeycar.utils import * logger = logging.getLogger(__name__) @@ -68,10 +73,10 @@ def drive(cfg, model_path=None, use_joystick=False, model_type=None, else: model_type = cfg.DEFAULT_MODEL_TYPE - #Initialize car + # Initialize car V = dk.vehicle.Vehicle() - #Initialize logging before anything else to allow console logging + # Initialize logging before anything else to allow console logging if cfg.HAVE_CONSOLE_LOGGING: logger.setLevel(logging.getLevelName(cfg.LOGGING_LEVEL)) ch = logging.StreamHandler() @@ -117,7 +122,8 @@ def drive(cfg, model_path=None, use_joystick=False, model_type=None, if cfg.SHOW_FPS: from donkeycar.parts.fps import FrequencyLogger - V.add(FrequencyLogger(cfg.FPS_DEBUG_INTERVAL), outputs=["fps/current", "fps/fps_list"]) + V.add(FrequencyLogger(cfg.FPS_DEBUG_INTERVAL), + outputs=["fps/current", "fps/fps_list"]) # # add the user input controller(s) @@ -128,12 +134,12 @@ def drive(cfg, model_path=None, use_joystick=False, model_type=None, ctr = add_user_controller(V, cfg, use_joystick) # - # explode the buttons into their own key/values in memory + # explode the buttons input map into individual output key/values in memory # V.add(ExplodeDict(V.mem, "web/"), inputs=['web/buttons']) # - # adding a button handler is just adding a part with a run_condition + # For example: adding a button handler is just adding a part with a run_condition # set to the button's name, so it runs when button is pressed. # V.add(Lambda(lambda v: print(f"web/w1 clicked")), inputs=["web/w1"], run_condition="web/w1") @@ -308,7 +314,6 @@ def load_model_json(kl, json_fnm): # for the configured model format # model_reload_cb = None - if '.h5' in model_path or '.trt' in model_path or '.tflite' in \ model_path or '.savedmodel' in model_path or '.pth': # load the whole model with weigths, etc @@ -387,7 +392,7 @@ def run(self, *components): # # collect model inference outputs # - outputs = ['pilot/angle', 'pilot/throttle'] + outputs = ['pilot/steering', 'pilot/throttle'] if cfg.TRAIN_LOCALIZER: outputs.append("pilot/loc") @@ -435,25 +440,25 @@ def run(self, *components): # Choose what inputs should change the car. class DriveMode: def run(self, mode, - user_angle, user_throttle, - pilot_angle, pilot_throttle): + user_steering, user_throttle, + pilot_steering, pilot_throttle): if mode == 'user': - return user_angle, user_throttle + return user_steering, user_throttle elif mode == 'local_angle': - return pilot_angle if pilot_angle else 0.0, user_throttle + return pilot_steering if pilot_steering else 0.0, user_throttle else: - return pilot_angle if pilot_angle else 0.0, \ + return pilot_steering if pilot_steering else 0.0, \ pilot_throttle * cfg.AI_THROTTLE_MULT \ if pilot_throttle else 0.0 V.add(DriveMode(), - inputs=['user/mode', 'user/angle', 'user/throttle', - 'pilot/angle', 'pilot/throttle'], - outputs=['angle', 'throttle']) - + inputs=['user/mode', 'user/steering', 'user/throttle', + 'pilot/steering', 'pilot/throttle'], + outputs=['steering', 'throttle']) + V.add(LoggerPart(['steering', 'throttle']), inputs=['steering', 'throttle']) if (cfg.CONTROLLER_TYPE != "pigpio_rc") and (cfg.CONTROLLER_TYPE != "MM1"): if isinstance(ctr, JoystickController): @@ -488,7 +493,6 @@ def run(self, mode, recording): # add_drivetrain(V, cfg) - # OLED setup if cfg.USE_SSD1306_128_32: from donkeycar.parts.oled import OLEDPart @@ -499,10 +503,10 @@ def run(self, mode, recording): # add tub to save data if cfg.USE_LIDAR: - inputs = ['cam/image_array', 'lidar/dist_array', 'user/angle', 'user/throttle', 'user/mode'] + inputs = ['cam/image_array', 'lidar/dist_array', 'user/steering', 'user/throttle', 'user/mode'] types = ['image_array', 'nparray','float', 'float', 'str'] else: - inputs=['cam/image_array','user/angle', 'user/throttle', 'user/mode'] + inputs=['cam/image_array','user/steering', 'user/throttle', 'user/mode'] types=['image_array','float', 'float','str'] if cfg.HAVE_ODOM: @@ -540,7 +544,7 @@ def run(self, mode, recording): types += ['nparray'] if cfg.RECORD_DURING_AI: - inputs += ['pilot/angle', 'pilot/throttle'] + inputs += ['pilot/steering', 'pilot/throttle'] types += ['float', 'float'] if cfg.HAVE_PERFMON: @@ -604,7 +608,7 @@ def add_user_controller(V, cfg, use_joystick, input_image='cam/image_array'): ctr = LocalWebController(port=cfg.WEB_CONTROL_PORT, mode=cfg.WEB_INIT_MODE) V.add(ctr, inputs=[input_image, 'tub/num_records', 'user/mode', 'recording'], - outputs=['user/angle', 'user/throttle', 'user/mode', 'recording', 'web/buttons'], + outputs=['user/steering', 'user/throttle', 'user/mode', 'recording', 'web/buttons'], threaded=True) # @@ -620,7 +624,7 @@ def add_user_controller(V, cfg, use_joystick, input_image='cam/image_array'): V.add( ctr, inputs=['user/mode', 'recording'], - outputs=['user/angle', 'user/throttle', + outputs=['user/steering', 'user/throttle', 'user/mode', 'recording'], threaded=False) else: @@ -657,7 +661,7 @@ def add_user_controller(V, cfg, use_joystick, input_image='cam/image_array'): V.add( ctr, inputs=[input_image, 'user/mode', 'recording'], - outputs=['user/angle', 'user/throttle', + outputs=['user/steering', 'user/throttle', 'user/mode', 'recording'], threaded=True) return ctr @@ -675,7 +679,7 @@ def add_simulator(V, cfg): # record_distance=cfg.SIM_RECORD_DISTANCE, record_orientation=cfg.SIM_RECORD_ORIENTATION, delay=cfg.SIM_ARTIFICIAL_LATENCY) threaded = True - inputs = ['angle', 'throttle'] + inputs = ['steering', 'throttle'] outputs = ['cam/image_array'] if cfg.SIM_RECORD_LOCATION: @@ -785,7 +789,7 @@ def add_camera(V, cfg, camera_type): V.add(cam, inputs=inputs, outputs=outputs, threaded=threaded) -def add_odometry(V, cfg): +def add_odometry(V, cfg, threaded=True): """ If the configuration support odometry, then add encoders, odometry and kinematics to the vehicle pipeline @@ -793,17 +797,18 @@ def add_odometry(V, cfg): On output this may be modified. :param cfg: the configuration (from myconfig.py) """ + from donkeycar.parts.pose import BicyclePose, UnicyclePose + if cfg.HAVE_ODOM: - if cfg.ENCODER_TYPE == "GPIO": - from donkeycar.parts.encoder import RotaryEncoder - enc = RotaryEncoder(mm_per_tick=cfg.MM_PER_TICK, pin=cfg.ODOM_PIN, poll_delay=1.0/(cfg.DRIVE_LOOP_HZ*3), debug=cfg.ODOM_DEBUG) - V.add(enc, inputs=['throttle'], outputs=['enc/speed'], threaded=True) - elif cfg.ENCODER_TYPE == "arduino": - from donkeycar.parts.encoder import ArduinoEncoder - enc = ArduinoEncoder(mm_per_tick=cfg.MM_PER_TICK, debug=cfg.ODOM_DEBUG) - V.add(enc, outputs=['enc/speed'], threaded=True) - else: - print("No supported encoder found") + poll_delay_secs = 0.01 # pose estimation runs at 100hz + kinematics = UnicyclePose(cfg, poll_delay_secs) if cfg.HAVE_ODOM_2 else BicyclePose(cfg, poll_delay_secs) + V.add(kinematics, + inputs = ["throttle", "steering", None], + outputs = ['enc/distance', 'enc/speed', 'pos/x', 'pos/y', + 'pos/angle', 'vel/x', 'vel/y', 'vel/angle', + 'nul/timestamp'], + threaded = threaded) + # # IMU setup @@ -836,7 +841,7 @@ def add_drivetrain(V, cfg): is_differential_drive = cfg.DRIVE_TRAIN_TYPE.startswith("DC_TWO_WHEEL") if is_differential_drive: V.add(TwoWheelSteeringThrottle(), - inputs=['throttle', 'angle'], + inputs=['throttle', 'steering'], outputs=['left/throttle', 'right/throttle']) if cfg.DRIVE_TRAIN_TYPE == "PWM_STEERING_THROTTLE": @@ -864,7 +869,7 @@ def add_drivetrain(V, cfg): max_pulse=dt['THROTTLE_FORWARD_PWM'], zero_pulse=dt['THROTTLE_STOPPED_PWM'], min_pulse=dt['THROTTLE_REVERSE_PWM']) - V.add(steering, inputs=['angle'], threaded=True) + V.add(steering, inputs=['steering'], threaded=True) V.add(throttle, inputs=['throttle'], threaded=True) elif cfg.DRIVE_TRAIN_TYPE == "I2C_SERVO": @@ -885,7 +890,7 @@ def add_drivetrain(V, cfg): zero_pulse=cfg.THROTTLE_STOPPED_PWM, min_pulse=cfg.THROTTLE_REVERSE_PWM) - V.add(steering, inputs=['angle'], threaded=True) + V.add(steering, inputs=['steering'], threaded=True) V.add(throttle, inputs=['throttle'], threaded=True) elif cfg.DRIVE_TRAIN_TYPE == "DC_STEER_THROTTLE": @@ -897,7 +902,7 @@ def add_drivetrain(V, cfg): pins.pwm_pin_by_id(dt['FWD_DUTY_PIN']), pins.pwm_pin_by_id(dt['BWD_DUTY_PIN'])) - V.add(steering, inputs=['angle']) + V.add(steering, inputs=['steering']) V.add(throttle, inputs=['throttle']) elif cfg.DRIVE_TRAIN_TYPE == "DC_TWO_WHEEL": @@ -945,7 +950,7 @@ def add_drivetrain(V, cfg): pins.pwm_pin_by_id(dt['FWD_DUTY_PIN']), pins.pwm_pin_by_id(dt['BWD_DUTY_PIN'])) - V.add(steering, inputs=['angle'], threaded=True) + V.add(steering, inputs=['steering'], threaded=True) V.add(motor, inputs=["throttle"]) elif cfg.DRIVE_TRAIN_TYPE == "SERVO_HBRIDGE_3PIN": @@ -968,7 +973,7 @@ def add_drivetrain(V, cfg): pins.output_pin_by_id(dt['BWD_PIN']), pins.pwm_pin_by_id(dt['DUTY_PIN'])) - V.add(steering, inputs=['angle'], threaded=True) + V.add(steering, inputs=['steering'], threaded=True) V.add(motor, inputs=["throttle"]) elif cfg.DRIVE_TRAIN_TYPE == "SERVO_HBRIDGE_PWM": @@ -988,12 +993,12 @@ def add_drivetrain(V, cfg): from donkeycar.parts.actuator import Mini_HBridge_DC_Motor_PWM motor = Mini_HBridge_DC_Motor_PWM(cfg.HBRIDGE_PIN_FWD, cfg.HBRIDGE_PIN_BWD) - V.add(steering, inputs=['angle'], threaded=True) + V.add(steering, inputs=['steering'], threaded=True) V.add(motor, inputs=["throttle"]) elif cfg.DRIVE_TRAIN_TYPE == "MM1": from donkeycar.parts.robohat import RoboHATDriver - V.add(RoboHATDriver(cfg), inputs=['angle', 'throttle']) + V.add(RoboHATDriver(cfg), inputs=['steering', 'throttle']) elif cfg.DRIVE_TRAIN_TYPE == "PIGPIO_PWM": # @@ -1013,7 +1018,7 @@ def add_drivetrain(V, cfg): max_pulse=cfg.THROTTLE_FORWARD_PWM, zero_pulse=cfg.THROTTLE_STOPPED_PWM, min_pulse=cfg.THROTTLE_REVERSE_PWM) - V.add(steering, inputs=['angle'], threaded=True) + V.add(steering, inputs=['steering'], threaded=True) V.add(throttle, inputs=['throttle'], threaded=True) elif cfg.DRIVE_TRAIN_TYPE == "VESC": @@ -1028,7 +1033,7 @@ def add_drivetrain(V, cfg): cfg.VESC_STEERING_SCALE, cfg.VESC_STEERING_OFFSET ) - V.add(vesc, inputs=['angle', 'throttle']) + V.add(vesc, inputs=['steering', 'throttle']) if __name__ == '__main__': diff --git a/donkeycar/templates/path_follow.py b/donkeycar/templates/path_follow.py index dea0e4429..8066feec6 100644 --- a/donkeycar/templates/path_follow.py +++ b/donkeycar/templates/path_follow.py @@ -214,17 +214,20 @@ def run(self, mode): path = CsvThrottlePath(min_dist=cfg.PATH_MIN_DIST) V.add(path, inputs=['recording', 'pos/x', 'pos/y', 'user/throttle'], outputs=['path', 'throttles']) - if cfg.DONKEY_GYM: - lpos = LoggerPart(inputs=['dist/left', 'dist/right', 'dist', 'pos/pos_x', 'pos/pos_y', 'yaw'], level="INFO", logger="simulator") - V.add(lpos, inputs=lpos.inputs) - if cfg.HAVE_ODOM: - if cfg.HAVE_ODOM_2: - lpos = LoggerPart(inputs=['enc/left/distance', 'enc/right/distance', 'enc/left/timestamp', 'enc/right/timestamp'], level="INFO", logger="odometer") - V.add(lpos, inputs=lpos.inputs) - lpos = LoggerPart(inputs=['enc/distance', 'enc/timestamp'], level="INFO", logger="odometer") - V.add(lpos, inputs=lpos.inputs) - lpos = LoggerPart(inputs=['pos/x', 'pos/y', 'pos/angle'], level="INFO", logger="kinematics") - V.add(lpos, inputs=lpos.inputs) + # + # log pose + # + # if cfg.DONKEY_GYM: + # lpos = LoggerPart(inputs=['dist/left', 'dist/right', 'dist', 'pos/pos_x', 'pos/pos_y', 'yaw'], level="INFO", logger="simulator") + # V.add(lpos, inputs=lpos.inputs) + # if cfg.HAVE_ODOM: + # if cfg.HAVE_ODOM_2: + # lpos = LoggerPart(inputs=['enc/left/distance', 'enc/right/distance', 'enc/left/timestamp', 'enc/right/timestamp'], level="INFO", logger="odometer") + # # V.add(lpos, inputs=lpos.inputs) + # lpos = LoggerPart(inputs=['enc/distance', 'enc/timestamp'], level="INFO", logger="odometer") + # V.add(lpos, inputs=lpos.inputs) + # lpos = LoggerPart(inputs=['pos/x', 'pos/y', 'pos/steering'], level="INFO", logger="kinematics") + # V.add(lpos, inputs=lpos.inputs) def save_path(): if path.length() > 0: @@ -291,7 +294,7 @@ def reset_origin(): # This will use the cross track error and PID constants to try to steer back towards the path. pid = PIDController(p=cfg.PID_P, i=cfg.PID_I, d=cfg.PID_D) pilot = PID_Pilot(pid, cfg.PID_THROTTLE, cfg.USE_CONSTANT_THROTTLE, min_throttle=cfg.PID_THROTTLE) - V.add(pilot, inputs=['cte/error', 'throttles', 'cte/closest_pt'], outputs=['pilot/angle', 'pilot/throttle'], run_condition="run_pilot") + V.add(pilot, inputs=['cte/error', 'throttles', 'cte/closest_pt'], outputs=['pilot/steering', 'pilot/throttle'], run_condition="run_pilot") def dec_pid_d(): pid.Kd -= cfg.PID_D_DELTA @@ -440,19 +443,21 @@ def run(self, mode:str, recording:bool): #Choose what inputs should change the car. class DriveMode: def run(self, mode, - user_angle, user_throttle, - pilot_angle, pilot_throttle): + user_steering, user_throttle, + pilot_steering, pilot_throttle): if mode == 'user': - return user_angle, user_throttle + return user_steering, user_throttle elif mode == 'local_angle': - return pilot_angle, user_throttle + return pilot_steering, user_throttle else: - return pilot_angle, pilot_throttle + return pilot_steering, pilot_throttle V.add(DriveMode(), - inputs=['user/mode', 'user/angle', 'user/throttle', - 'pilot/angle', 'pilot/throttle'], - outputs=['angle', 'throttle']) + inputs=['user/mode', 'user/steering', 'user/throttle', + 'pilot/steering', 'pilot/throttle'], + outputs=['steering', 'throttle']) + + # V.add(LoggerPart(['user/mode', 'steering', 'throttle'], logger="drivemode"), inputs=['user/mode', 'steering', 'throttle']) # # To make differential drive steer, @@ -460,7 +465,7 @@ def run(self, mode, # if is_differential_drive: V.add(TwoWheelSteeringThrottle(), - inputs=['throttle', 'angle'], + inputs=['throttle', 'steering'], outputs=['left/throttle', 'right/throttle']) # diff --git a/donkeycar/tests/test_circular_buffer.py b/donkeycar/tests/test_circular_buffer.py new file mode 100644 index 000000000..b04705483 --- /dev/null +++ b/donkeycar/tests/test_circular_buffer.py @@ -0,0 +1,124 @@ +import unittest +import pytest + +from donkeycar.utilities.circular_buffer import CircularBuffer + +class TestCircularBuffer(unittest.TestCase): + + def test_circular_buffer_queue(self): + """ + enqueue items to head + dequeue items from tail + """ + queue:CircularBuffer = CircularBuffer(3, defaultValue="out-of-range") + self.assertEqual(3, queue.capacity) + self.assertEqual(0, queue.count) + + queue.enqueue(0) + self.assertEqual(1, queue.count) + self.assertEqual(0, queue.head()) + self.assertEqual(0, queue.tail()) + + queue.enqueue(1) + self.assertEqual(2, queue.count) + self.assertEqual(1, queue.head()) + self.assertEqual(0, queue.tail()) + + queue.enqueue(2) + self.assertEqual(3, queue.count) + self.assertEqual(2, queue.head()) + self.assertEqual(0, queue.tail()) + + queue.enqueue(3) + self.assertEqual(3, queue.count) + self.assertEqual(3, queue.head()) + self.assertEqual(1, queue.tail()) + + self.assertEqual(1, queue.dequeue()) + self.assertEqual(2, queue.count) + self.assertEqual(3, queue.head()) + self.assertEqual(2, queue.tail()) + + self.assertEqual(2, queue.dequeue()) + self.assertEqual(1, queue.count) + self.assertEqual(3, queue.head()) + self.assertEqual(3, queue.tail()) + + self.assertEqual(3, queue.dequeue()) + self.assertEqual(0, queue.count) + self.assertEqual("out-of-range", queue.head()) + self.assertEqual("out-of-range", queue.tail()) + + self.assertEqual("out-of-range", queue.dequeue()) + + def test_circular_buffer_stack(self): + """ + push items to head + pop items from head + """ + stack:CircularBuffer = CircularBuffer(2, defaultValue="out-of-range") + self.assertEqual(2, stack.capacity) + self.assertEqual(0, stack.count) + self.assertEqual("out-of-range", stack.pop()) + + stack.push(0) + self.assertEqual(1, stack.count) + self.assertEqual(0, stack.head()) + self.assertEqual(0, stack.tail()) + + stack.push(1) + self.assertEqual(2, stack.count) + self.assertEqual(1, stack.head()) + self.assertEqual(0, stack.tail()) + + # pushing onto a full stack raises exception + try: + stack.push(2) + self.fail("should have thrown exception") + except IndexError: + # nothing should have changed + self.assertEqual(2, stack.count) + self.assertEqual(1, stack.head()) + self.assertEqual(0, stack.tail()) + + self.assertEqual(1, stack.pop()) + self.assertEqual(1, stack.count) + self.assertEqual(0, stack.head()) + self.assertEqual(0, stack.tail()) + + self.assertEqual(0, stack.pop()) + self.assertEqual(0, stack.count) + self.assertEqual("out-of-range", stack.head()) + self.assertEqual("out-of-range", stack.tail()) + + # popping from empty stack returns default + self.assertEqual("out-of-range", stack.pop()) + + def test_circular_buffer_array(self): + """ + append items to tail + set/get items by index + """ + array:CircularBuffer = CircularBuffer(2, defaultValue="out-of-range") + self.assertEqual(2, array.capacity) + self.assertEqual(0, array.count) + self.assertEqual("out-of-range", array.get(0)) + + array.append(0) + self.assertEqual(1, array.count) + self.assertEqual(0, array.head()) + self.assertEqual(0, array.tail()) + self.assertEqual(0, array.get(0)) + self.assertEqual("out-of-range", array.get(1)) + + array.append(1) + self.assertEqual(2, array.count) + self.assertEqual(0, array.head()) + self.assertEqual(1, array.tail()) + self.assertEqual(0, array.get(0)) + self.assertEqual(1, array.get(1)) + self.assertEqual("out-of-range", array.get(2)) + + for i in range(array.count): + array.set(i, array.count-i) + self.assertEqual(array.count-i, array.get(i)) diff --git a/donkeycar/tests/test_kinematics.py b/donkeycar/tests/test_kinematics.py index 519c5b4c0..4875748c4 100644 --- a/donkeycar/tests/test_kinematics.py +++ b/donkeycar/tests/test_kinematics.py @@ -2,7 +2,465 @@ import time import unittest +from donkeycar.parts.kinematics import Bicycle, InverseBicycle, InverseUnicycle, bicycle_angular_velocity, limit_angle +from donkeycar.parts.kinematics import BicycleNormalizeAngularVelocity, BicycleUnnormalizeAngularVelocity +from donkeycar.parts.kinematics import Unicycle, unicycle_angular_velocity, unicycle_max_angular_velocity +from donkeycar.parts.kinematics import UnicycleNormalizeAngularVelocity, UnicycleUnnormalizeAngularVelocity +from donkeycar.parts.kinematics import NormalizeSteeringAngle, UnnormalizeSteeringAngle from donkeycar.parts.kinematics import differential_steering +from donkeycar.utils import sign + +# +# python -m unittest donkeycar/tests/test_kinematics.py +# +class TestBicycle(unittest.TestCase): + + def test_forward(self): + bicycle = Bicycle(1) + + bicycle.run(0, 0, 1) + self.assertEqual(0, bicycle.forward_distance) + self.assertEqual(0, bicycle.forward_velocity) + self.assertEqual(1, bicycle.timestamp) + self.assertEqual(0, bicycle.pose.x) + self.assertEqual(0, bicycle.pose.y) + self.assertEqual(0, bicycle.pose.angle) + self.assertEqual(0, bicycle.pose_velocity.x) + self.assertEqual(0, bicycle.pose_velocity.y) + self.assertEqual(0, bicycle.pose_velocity.angle) + + bicycle.run(1, 0, 2) + self.assertEqual(1, bicycle.forward_distance) + self.assertEqual(1, bicycle.forward_velocity) + self.assertEqual(2, bicycle.timestamp) + self.assertEqual(1, bicycle.pose.x) + self.assertEqual(0, bicycle.pose.y) + self.assertEqual(0, bicycle.pose.angle) + self.assertEqual(1, bicycle.pose_velocity.x) + self.assertEqual(0, bicycle.pose_velocity.y) + self.assertEqual(0, bicycle.pose_velocity.angle) + + bicycle.run(10, 0, 3) + self.assertEqual(10, bicycle.forward_distance) + self.assertEqual(9, bicycle.forward_velocity) + self.assertEqual(3, bicycle.timestamp) + self.assertEqual(10, bicycle.pose.x) + self.assertEqual(0, bicycle.pose.y) + self.assertEqual(0, bicycle.pose.angle) + self.assertEqual(9, bicycle.pose_velocity.x) + self.assertEqual(0, bicycle.pose_velocity.y) + self.assertEqual(0, bicycle.pose_velocity.angle) + + def test_reverse(self): + bicycle = Bicycle(1) + + bicycle.run(0, 0, 1) + self.assertEqual(0, bicycle.forward_distance) + self.assertEqual(0, bicycle.forward_velocity) + self.assertEqual(1, bicycle.timestamp) + self.assertEqual(0, bicycle.pose.x) + self.assertEqual(0, bicycle.pose.y) + self.assertEqual(0, bicycle.pose.angle) + self.assertEqual(0, bicycle.pose_velocity.x) + self.assertEqual(0, bicycle.pose_velocity.y) + self.assertEqual(0, bicycle.pose_velocity.angle) + + bicycle.run(-1, 0, 2) + self.assertEqual(-1, bicycle.forward_distance) + self.assertEqual(-1, bicycle.forward_velocity) + self.assertEqual(2, bicycle.timestamp) + self.assertEqual(-1, bicycle.pose.x) + self.assertEqual(0, bicycle.pose.y) + self.assertEqual(0, bicycle.pose.angle) + self.assertEqual(-1, bicycle.pose_velocity.x) + self.assertEqual(0, bicycle.pose_velocity.y) + self.assertEqual(0, bicycle.pose_velocity.angle) + + bicycle.run(-10, 0, 3) + self.assertEqual(-10, bicycle.forward_distance) + self.assertEqual(-9, bicycle.forward_velocity) + self.assertEqual(3, bicycle.timestamp) + self.assertEqual(-10, bicycle.pose.x) + self.assertEqual(0, bicycle.pose.y) + self.assertEqual(0, bicycle.pose.angle) + self.assertEqual(-9, bicycle.pose_velocity.x) + self.assertEqual(0, bicycle.pose_velocity.y) + self.assertEqual(0, bicycle.pose_velocity.angle) + + def test_forward_reverse(self): + bicycle = Bicycle(1) + + bicycle.run(0, 0, 1) + self.assertEqual(0, bicycle.forward_distance) + self.assertEqual(0, bicycle.forward_velocity) + self.assertEqual(1, bicycle.timestamp) + self.assertEqual(0, bicycle.pose.x) + self.assertEqual(0, bicycle.pose.y) + self.assertEqual(0, bicycle.pose.angle) + self.assertEqual(0, bicycle.pose_velocity.x) + self.assertEqual(0, bicycle.pose_velocity.y) + self.assertEqual(0, bicycle.pose_velocity.angle) + + bicycle.run(1, 0, 2) + self.assertEqual(1, bicycle.forward_distance) + self.assertEqual(1, bicycle.forward_velocity) + self.assertEqual(2, bicycle.timestamp) + self.assertEqual(1, bicycle.pose.x) + self.assertEqual(0, bicycle.pose.y) + self.assertEqual(0, bicycle.pose.angle) + self.assertEqual(1, bicycle.pose_velocity.x) + self.assertEqual(0, bicycle.pose_velocity.y) + self.assertEqual(0, bicycle.pose_velocity.angle) + + bicycle.run(-1, 0, 3) + self.assertEqual(-1, bicycle.forward_distance) + self.assertEqual(-2, bicycle.forward_velocity) + self.assertEqual(3, bicycle.timestamp) + self.assertEqual(-1, bicycle.pose.x) + self.assertEqual(0, bicycle.pose.y) + self.assertEqual(0, bicycle.pose.angle) + self.assertEqual(-2, bicycle.pose_velocity.x) + self.assertEqual(0, bicycle.pose_velocity.y) + self.assertEqual(0, bicycle.pose_velocity.angle) + + def do_90degree_turn(self, steering_angle): + turn_direction = sign(steering_angle) + steering_angle = limit_angle(steering_angle) + + wheel_base = 0.5 + bicycle = Bicycle(wheel_base) # wheel base is 1/2 meter + + + # + # turn an 1/4 of circle while wheel is turned 22.5 degrees. + # do this at 20 hertz, like the vehicle loop + # + # calculate length of circle's perimeter + # see [Rear-axle reference point](https://thef1clan.com/2020/09/21/vehicle-dynamics-the-kinematic-bicycle-model/) + # + R = abs(wheel_base / math.sin(steering_angle)) # Radius of turn (radius to Instantaneous Center of Rotation) + C = 2 * math.pi * R # Circumference of turn + + turn_distance = C / 4 # drive the 90 degrees of perimeter + turn_steps = math.floor(turn_distance * 200) # 200 odometry readings per meter + steps_per_second = 20 + time_per_step = 1 / steps_per_second # 20 odometry readings per second + print() + print("X,Y,A") + for i in range(turn_steps): + distance, velocity, pose_x, pose_y, pose_angle, pose_velocity_x, pose_velocity_y, pose_velocity_angle, timestamp = bicycle.run(turn_distance * (i+1) / turn_steps, steering_angle, 1 + time_per_step * (i + 1)) + print(pose_x, pose_y, pose_angle, sep=",") + self.assertAlmostEqual(turn_distance, bicycle.forward_distance, 3) + self.assertAlmostEqual(1 + turn_steps * time_per_step, bicycle.timestamp, 3) + self.assertLessEqual((R - pose_x) / R, 0.005) # final error less than 0.5% + self.assertLessEqual((turn_direction * R - pose_y) / R, 0.005) # final error less than 0.5% + self.assertLessEqual(((turn_direction * math.pi / 2) - pose_angle) / (math.pi / 2), 0.005) + + + def test_turn_left(self): + # 22.5 degrees steering angle + self.do_90degree_turn(steering_angle = math.pi / 8) + + def test_turn_right(self): + # 22.5 degrees steering angle + self.do_90degree_turn(steering_angle = -math.pi / 8) + + +class TestUnicycle(unittest.TestCase): + + def test_forward(self): + unicycle = Unicycle(1) + + unicycle.run(0, 0, 1) + self.assertEqual(0, unicycle.left_distance) + self.assertEqual(0, unicycle.right_distance) + self.assertEqual(1, unicycle.timestamp) + self.assertEqual(0, unicycle.pose.x) + self.assertEqual(0, unicycle.pose.y) + self.assertEqual(0, unicycle.pose.angle) + self.assertEqual(0, unicycle.pose_velocity.x) + self.assertEqual(0, unicycle.pose_velocity.y) + self.assertEqual(0, unicycle.pose_velocity.angle) + + unicycle.run(1, 1, 2) + self.assertEqual(1, unicycle.left_distance) + self.assertEqual(1, unicycle.right_distance) + self.assertEqual(2, unicycle.timestamp) + self.assertEqual(1, unicycle.pose.x) + self.assertEqual(0, unicycle.pose.y) + self.assertEqual(0, unicycle.pose.angle) + self.assertEqual(1, unicycle.pose_velocity.x) + self.assertEqual(0, unicycle.pose_velocity.y) + self.assertEqual(0, unicycle.pose_velocity.angle) + + unicycle.run(10, 10, 3) + self.assertEqual(10, unicycle.left_distance) + self.assertEqual(10, unicycle.right_distance) + self.assertEqual(3, unicycle.timestamp) + self.assertEqual(10, unicycle.pose.x) + self.assertEqual(0, unicycle.pose.y) + self.assertEqual(0, unicycle.pose.angle) + self.assertEqual(9, unicycle.pose_velocity.x) + self.assertEqual(0, unicycle.pose_velocity.y) + self.assertEqual(0, unicycle.pose_velocity.angle) + + def test_reverse(self): + unicycle = Unicycle(1) + + unicycle.run(0, 0, 1) + self.assertEqual(0, unicycle.left_distance) + self.assertEqual(0, unicycle.right_distance) + self.assertEqual(1, unicycle.timestamp) + self.assertEqual(0, unicycle.pose.x) + self.assertEqual(0, unicycle.pose.y) + self.assertEqual(0, unicycle.pose.angle) + self.assertEqual(0, unicycle.pose_velocity.x) + self.assertEqual(0, unicycle.pose_velocity.y) + self.assertEqual(0, unicycle.pose_velocity.angle) + + unicycle.run(-1, -1, 2) + self.assertEqual(-1, unicycle.left_distance) + self.assertEqual(-1, unicycle.right_distance) + self.assertEqual(2, unicycle.timestamp) + self.assertEqual(-1, unicycle.pose.x) + self.assertEqual(0, unicycle.pose.y) + self.assertEqual(0, unicycle.pose.angle) + self.assertEqual(-1, unicycle.pose_velocity.x) + self.assertEqual(0, unicycle.pose_velocity.y) + self.assertEqual(0, unicycle.pose_velocity.angle) + + unicycle.run(-10, -10, 3) + self.assertEqual(-10, unicycle.left_distance) + self.assertEqual(-10, unicycle.right_distance) + self.assertEqual(3, unicycle.timestamp) + self.assertEqual(-10, unicycle.pose.x) + self.assertEqual(0, unicycle.pose.y) + self.assertEqual(0, unicycle.pose.angle) + self.assertEqual(-9, unicycle.pose_velocity.x) + self.assertEqual(0, unicycle.pose_velocity.y) + self.assertEqual(0, unicycle.pose_velocity.angle) + + + def test_forward_reverse(self): + unicycle = Unicycle(1) + + unicycle.run(0, 0, 1) + self.assertEqual(0, unicycle.left_distance) + self.assertEqual(0, unicycle.right_distance) + self.assertEqual(1, unicycle.timestamp) + self.assertEqual(0, unicycle.pose.x) + self.assertEqual(0, unicycle.pose.y) + self.assertEqual(0, unicycle.pose.angle) + self.assertEqual(0, unicycle.pose_velocity.x) + self.assertEqual(0, unicycle.pose_velocity.y) + self.assertEqual(0, unicycle.pose_velocity.angle) + + unicycle.run(1, 1, 2) + self.assertEqual(1, unicycle.left_distance) + self.assertEqual(1, unicycle.right_distance) + self.assertEqual(2, unicycle.timestamp) + self.assertEqual(1, unicycle.pose.x) + self.assertEqual(0, unicycle.pose.y) + self.assertEqual(0, unicycle.pose.angle) + self.assertEqual(1, unicycle.pose_velocity.x) + self.assertEqual(0, unicycle.pose_velocity.y) + self.assertEqual(0, unicycle.pose_velocity.angle) + + unicycle.run(-1, -1, 3) + self.assertEqual(-1, unicycle.left_distance) + self.assertEqual(-1, unicycle.right_distance) + self.assertEqual(3, unicycle.timestamp) + self.assertEqual(-1, unicycle.pose.x) + self.assertEqual(0, unicycle.pose.y) + self.assertEqual(0, unicycle.pose.angle) + self.assertEqual(-2, unicycle.pose_velocity.x) + self.assertEqual(0, unicycle.pose_velocity.y) + self.assertEqual(0, unicycle.pose_velocity.angle) + + def test_turn_left(self): + unicycle = Unicycle(1) + + unicycle.run(0, 0, 1) + self.assertEqual(0, unicycle.left_distance) + self.assertEqual(0, unicycle.right_distance) + self.assertEqual(1, unicycle.timestamp) + self.assertEqual(0, unicycle.pose.x) + self.assertEqual(0, unicycle.pose.y) + self.assertEqual(0, unicycle.pose.angle) + self.assertEqual(0, unicycle.pose_velocity.x) + self.assertEqual(0, unicycle.pose_velocity.y) + self.assertEqual(0, unicycle.pose_velocity.angle) + + # + # turn left 90 degrees, pivoting on left wheel. + # pivot on a non-moving wheel creates a circle + # with a radius equal to the axle-length. + # do this at 20 hertz, like the vehicle loop + # + turn_distance = math.pi / 2 + turn_steps = 20 + steps_per_second = 20 + time_per_step = 1 / steps_per_second # 20 hertz + for i in range(turn_steps): + distance, velocity, pose_x, pose_y, pose_angle, pose_velocity_x, pose_velocity_y, pose_velocity_angle, timestamp = unicycle.run(0, turn_distance * (i+1) / turn_steps, 1 + time_per_step * (i + 1)) + self.assertEqual(0, unicycle.left_distance) + self.assertEqual(turn_distance, unicycle.right_distance) + self.assertEqual(1 + turn_steps * time_per_step, unicycle.timestamp) + self.assertAlmostEqual(0.5, pose_x, 3) + self.assertAlmostEqual(0.5, pose_y, 3) + self.assertAlmostEqual(math.pi / 2, pose_angle, 3) + + def test_turn_right(self): + unicycle = Unicycle(1) + + unicycle.run(0, 0, 1) + self.assertEqual(0, unicycle.left_distance) + self.assertEqual(0, unicycle.right_distance) + self.assertEqual(1, unicycle.timestamp) + self.assertEqual(0, unicycle.pose.x) + self.assertEqual(0, unicycle.pose.y) + self.assertEqual(0, unicycle.pose.angle) + self.assertEqual(0, unicycle.pose_velocity.x) + self.assertEqual(0, unicycle.pose_velocity.y) + self.assertEqual(0, unicycle.pose_velocity.angle) + + # + # turn right 180 degrees, pivoting on right wheel. + # pivot on a non-moving wheel creates a circle + # with a radius equal to the axle-length. + # do this at 20 hertz, like the vehicle loop + # + turn_distance = math.pi / 2 + turn_steps = 20 + steps_per_second = 20 + time_per_step = 1 / steps_per_second # 20 hertz + for i in range(turn_steps): + distance, velocity, pose_x, pose_y, pose_angle, pose_velocity_x, pose_velocity_y, pose_velocity_angle, timestamp = unicycle.run(turn_distance * (i+1) / turn_steps, 0, 1 + time_per_step * (i + 1)) + self.assertEqual(0, unicycle.right_distance) + self.assertEqual(turn_distance, unicycle.left_distance) + self.assertEqual(1 + turn_steps * time_per_step, unicycle.timestamp) + self.assertAlmostEqual(0.5, pose_x, 3) + self.assertAlmostEqual(-0.5, pose_y, 3) + self.assertAlmostEqual(-math.pi / 2, pose_angle, 3) + + +class TestInverseBicycle(unittest.TestCase): + def test_inverse_bicycle(self): + wheel_base = 0.5 + steering_angle = math.pi / 8 + angular_velocity = bicycle_angular_velocity(wheel_base, 1, steering_angle) + inverse = InverseBicycle(wheel_base) + forward_velocity, inverse_steering_angle, _ = inverse.run(1, angular_velocity) + self.assertAlmostEqual(forward_velocity, 1, 3) + self.assertAlmostEqual(inverse_steering_angle, steering_angle, 3) + + +class TestInverseUnicycle(unittest.TestCase): + def test_inverse_unicycle(self): + axle_length = 0.3 + wheel_radius = 0.01 + angular_velocity = unicycle_angular_velocity(wheel_radius, axle_length, 0.5, 1.5) + inverse = InverseUnicycle(axle_length, wheel_radius, 0.01, 10, 0) + left_velocity, right_velocity, _ = inverse.run(1, angular_velocity) + self.assertAlmostEqual(left_velocity, 0.5, 3) + self.assertAlmostEqual(right_velocity, 1.5, 3) + + +class TestBicycleNormalizeAngularVelocity(unittest.TestCase): + def test_normalize(self): + wheel_base = 0.5 + max_forward_velocity = 10 + max_steering_angle = math.pi / 6 + max_angular_velocity = bicycle_angular_velocity(wheel_base, max_forward_velocity, max_steering_angle) + normalize = BicycleNormalizeAngularVelocity(wheel_base, max_forward_velocity, max_steering_angle) + unnormalize = BicycleUnnormalizeAngularVelocity(wheel_base, max_forward_velocity, max_steering_angle) + + angular_velocity = normalize.run(max_angular_velocity) + self.assertAlmostEqual(angular_velocity, 1.0, 3) + angular_velocity = unnormalize.run(angular_velocity) + self.assertAlmostEqual(angular_velocity, max_angular_velocity, 3) + + angular_velocity = normalize.run(max_angular_velocity / 2) + self.assertAlmostEqual(angular_velocity, 0.5, 3) + angular_velocity = unnormalize.run(angular_velocity) + self.assertAlmostEqual(angular_velocity, max_angular_velocity / 2, 3) + + angular_velocity = normalize.run(0) + self.assertAlmostEqual(angular_velocity, 0, 3) + angular_velocity = unnormalize.run(angular_velocity) + self.assertAlmostEqual(angular_velocity, 0, 3) + + angular_velocity = normalize.run(-max_angular_velocity / 2) + self.assertAlmostEqual(angular_velocity, -0.5, 3) + angular_velocity = unnormalize.run(angular_velocity) + self.assertAlmostEqual(angular_velocity, -max_angular_velocity / 2, 3) + + angular_velocity = normalize.run(-max_angular_velocity) + self.assertAlmostEqual(angular_velocity, -1.0, 3) + angular_velocity = unnormalize.run(angular_velocity) + self.assertAlmostEqual(angular_velocity, -max_angular_velocity, 3) + + +class TestUnicycleNormalizeAngularVelocity(unittest.TestCase): + def test_normalize(self): + wheel_radius = 0.01 + axle_length = 0.1 + max_forward_velocity = 10 + max_angular_velocity = unicycle_max_angular_velocity(wheel_radius, axle_length, max_forward_velocity) + normalize = UnicycleNormalizeAngularVelocity(wheel_radius, axle_length, max_forward_velocity) + unnormalize = UnicycleUnnormalizeAngularVelocity(wheel_radius, axle_length, max_forward_velocity) + + angular_velocity = normalize.run(max_angular_velocity / 2) + self.assertAlmostEqual(angular_velocity, 0.5, 3) + angular_velocity = unnormalize.run(angular_velocity) + self.assertAlmostEqual(angular_velocity, max_angular_velocity / 2, 3) + + +class TestNormalizeSteeringAngle(unittest.TestCase): + def test_normalize(self): + max_steering_angle = math.pi / 6 + normalize = NormalizeSteeringAngle(max_steering_angle) + unnormalize = UnnormalizeSteeringAngle(max_steering_angle) + + # + # steering(-1) == steering_angle(max_steering_angle) + # + steering = normalize.run(max_steering_angle) + self.assertAlmostEqual(steering, -1.0, 3) + steering_angle = unnormalize.run(steering) + self.assertAlmostEqual(steering_angle, max_steering_angle, 3) + + # + # steering(-0.5) == steering_angle(max_steering_angle/2) + # + steering = normalize.run(max_steering_angle / 2) + self.assertAlmostEqual(steering, -0.5, 3) + steering_angle = unnormalize.run(steering) + self.assertAlmostEqual(steering_angle, max_steering_angle / 2, 3) + + # + # steering(0) == steering_angle(0) + # + steering = normalize.run(0) + self.assertAlmostEqual(steering, 0, 3) + steering_angle = unnormalize.run(steering) + self.assertAlmostEqual(steering_angle, 0, 3) + + # + # steering(0.5) == steering_angle(-max_steering_angle/2) + # + steering = normalize.run(-max_steering_angle / 2) + self.assertAlmostEqual(steering, 0.5, 3) + steering_angle = unnormalize.run(steering) + self.assertAlmostEqual(steering_angle, -max_steering_angle / 2, 3) + + # + # steering(1) == steering_angle(-max_steering_angle) + # + steering = normalize.run(-max_steering_angle) + self.assertAlmostEqual(steering, 1.0, 3) + steering_angle = unnormalize.run(steering) + self.assertAlmostEqual(steering_angle, -max_steering_angle, 3) class TestTwoWheelSteeringThrottle(unittest.TestCase): @@ -25,3 +483,261 @@ def test_differential_steering(self): self.assertEqual((1.0, 0.8), differential_steering(1.0, 0.20, 0.0)) self.assertEqual((0.5, 0.45), differential_steering(0.5, 0.10, 0.0)) self.assertEqual((0.5, 0.40), differential_steering(0.5, 0.20, 0.0)) + +from donkeycar.templates.complete import add_odometry +from donkeycar.vehicle import Vehicle +from donkeycar.parts.kinematics import NormalizeSteeringAngle, limit_angle +import math + + +def calc_center_of_rotation(wheelbase, front_wheel, orientation, steering_angle): + """ + Calculate the center of rotation for a bicycle turn. + + Args: + wheelbase (float): The length of the bicycle wheelbase. + front_wheel (tuple): The x and y coordinates of the front wheel (x, y). + orientation (float): The orientation of the bicycle in radians. + steering_angle (float): The steering angle of the bicycle in radians. + + Returns: + tuple: The x and y coordinates of the center of rotation + and the radius of the rotation (x, y, radius) . + if the steering_angle is zero, the function returns the position + of the front wheel as the center of rotation and a radius of zero, + since there is no turn in this case. + """ + x, y = front_wheel + if steering_angle == 0: + return (x, y, 0) + else: + R = wheelbase / math.sin(steering_angle) + cx = x - R * math.sin(orientation) + cy = y + R * math.cos(orientation) + return (cx, cy, abs(R)) # radius is always positive + + +def calc_arc_end_point(center, radius, point, radians): + """ + Calculates the ending point on a circle given the center of the circle, the radius of the circle, + a point on the circle and a distance along the circle in radians from the given point. + + Arguments: + center -- the center of the circle represented as a tuple (x, y) (tuple) + radius -- the radius of the circle (float) + point -- a point on the circle represented as a tuple (x, y) (tuple) + radians -- the distance along the circle in radians from the given point (float) + + Returns: + end_point -- the ending point on the circle represented as a tuple (x, y) (tuple) + """ + center_x, center_y = center + point_x, point_y = point + + theta = math.atan2(point_y - center_y, point_x - center_x) + theta_end = theta + radians + + x = center_x + radius * math.cos(theta_end) + y = center_y + radius * math.sin(theta_end) + end_point = (x, y) + return end_point + + +def calc_bicycle_pose(wheelbase, front_wheel, orientation, steering_angle, distance): + """ + Calculate the ending pose for a bicycle with the given wheel base, + starting front wheel position, orientation, and steering angle that has + driven the given distance. + :param wheelbase: + :param front_wheel: + :param orientation: + :param steering_angle: + :return: ending pose as tuple of (x, y, orientation) + """ + + if distance == 0: + return front_wheel.x, front_wheel.y, orientation + + if steering_angle == 0: + ending_x = front_wheel[0] + distance * math.cos(orientation) + ending_y = front_wheel[1] + distance * math.sin(orientation) + ending_angle = orientation + else: + # - calculate the center and radius of the turn, + center_x, center_y, turn_radius = calc_center_of_rotation(wheelbase, + front_wheel, + orientation, + steering_angle) + print(f"Center of rotation = ({center_x}, {center_y})") + print(f"Radius of turn = {turn_radius}") + + # - calculate the circumference of a full turn + turn_circumference = 2 * math.pi * turn_radius + + # - calculate the angular distance travelled in radians + turn_radians = (distance / turn_circumference) * 2 * math.pi + turn_radians *= sign(steering_angle) # right turn is negative angle + + # - calculate the end position on the arc + ending_x, ending_y = calc_arc_end_point((center_x, center_y), + turn_radius, front_wheel, turn_radians) + + # - calculate the end orientation + ending_angle = limit_angle(orientation + math.tan(steering_angle) * distance / wheelbase) + + return ending_x, ending_y, ending_angle + + +def calc_line_length(p1, p2): + """ + Calculate the length of a line segment given its endpoints. + + Args: + p1 (tuple): The x and y coordinates of the first endpoint (x1, y1). + p2 (tuple): The x and y coordinates of the second endpoint (x2, y2). + + Returns: + float: The length of the line segment. + """ + x1, y1 = p1 + x2, y2 = p2 + return math.sqrt((x2 - x1)**2 + (y2 - y1)**2) + + +class TestVehicle(unittest.TestCase): + + def initialize_config(self): + # initialize configurations + class Config: + pass + + cfg = Config() + # cfg.DONKEY_GYM = True + # cfg.__setattr__('DONKEY_GYM', True) + # cfg.SIM_RECORD_DISTANCE = True + + cfg.DRIVE_LOOP_HZ = 50 + + cfg.DRIVE_TRAIN_TYPE = "mock" + cfg.AXLE_LENGTH = 0.175 # length of axle; distance between left and right wheels in meters + cfg.WHEEL_BASE = 0.20 # distance between front and back wheels in meters + cfg.WHEEL_RADIUS = 0.04 # radius of wheel in meters + cfg.MAX_STEERING_ANGLE = 25.0 / 360.0 * 3.141592653589793 * 2 # for car-like robot; maximum steering angle in radians (corresponding to tire angle at steering == -1) + cfg.MIN_SPEED = 0.1 # minimum speed in meters per second; speed below which car stalls + cfg.MAX_SPEED = 3.0 # maximum speed in meters per second; speed at maximum throttle (1.0) + cfg.MIN_THROTTLE = 0.1 # throttle (0 to 1.0) that corresponds to MIN_SPEED, throttle below which car stalls + cfg.HAVE_ODOM = True + cfg.HAVE_ODOM_2 = False + cfg.ENCODER_PPR = 600 + cfg.MM_PER_TICK = ( + cfg.WHEEL_RADIUS * 2 * 3.141592653589793) / cfg.ENCODER_PPR * 1000 # How much travel with a single encoder tick, in mm. Roll you car a meter and divide total ticks measured by 1,000 + cfg.ODOM_SMOOTHING = 1 # number of odometer readings to use when calculating velocity + cfg.ODOM_DEBUG = False # Write out values on vel and distance as it runs + cfg.ENCODER_TYPE = "MOCK" + cfg.TACHOMETER_MODE = 1 # FORWARD_ONLY + + cfg.ENCODER_TYPE = "MOCK" + cfg.MOCK_TICKS_PER_SECOND = cfg.ENCODER_PPR * 10 + + return cfg + + def initialize_vehicle(self, cfg): + # Initialize vehicle pipeline + v = Vehicle() + add_odometry(v, cfg, threaded=False) + return v + + def do_drive(self, cfg, forward_velocity, steering_angle): + steering_angle = limit_angle(steering_angle) + turn_direction = sign(steering_angle) + + wheel_base = cfg.WHEEL_BASE + + + # + # turn an 1/4 of circle while wheel is turned 12.5 degrees. + # do this at 20 hertz, like the vehicle loop + # + # calculate length of circle's perimeter + # see [Rear-axle reference point](https://thef1clan.com/2020/09/21/vehicle-dynamics-the-kinematic-bicycle-model/) + # + turn_distance = None + if steering_angle != 0.0: + R = wheel_base / math.sin(steering_angle) # Radius of turn (radius to Instantaneous Center of Rotation) + C = 2 * math.pi * R # Circumference of turn + turn_distance = abs(C / 4) # drive the 90 degrees of perimeter + else: + # going straight + turn_distance = forward_velocity * 2 # just use 2 seconds of driving + + # + # calculate a ticks_per_second value for the mock encoder + # + turn_seconds = turn_distance / forward_velocity + steps_per_second = cfg.DRIVE_LOOP_HZ + turn_steps = steps_per_second * turn_seconds + + meters_per_revolution = cfg.WHEEL_RADIUS * 2 * math.pi + turn_revolutions = turn_distance / meters_per_revolution + turn_ticks = turn_revolutions * cfg.ENCODER_PPR + ticks_per_second = turn_ticks / turn_seconds + + # + # set turns-per-tick for mock encoder into configuration + # + cfg.MOCK_TICKS_PER_SECOND = ticks_per_second + + # set throttle and steering angle + throttle = 1.0 # full throttle, so we can use the cfg.MOCK_TICKS_PER_SECOND as the encoder rate + normalize_steering = NormalizeSteeringAngle(max_steering_angle=cfg.MAX_STEERING_ANGLE) + steering = normalize_steering.run(steering_angle) + + # run the vehicle and then stop it + vehicle = self.initialize_vehicle(cfg) + vehicle.mem.put(['throttle', 'steering'], [throttle, steering]) + + # run the vehicle loop and get how long it took + loop_count, loop_seconds = vehicle.start(rate_hz=steps_per_second, max_loop_count=int(turn_steps), verbose=True) + + # read the final values from vehicle memory + distance, velocity, pose_x, pose_y, pose_angle = vehicle.mem.get(['enc/distance', 'enc/velocity', 'pos/x', 'pos/y', 'pos/angle', ]) + + # + # use the actual loop time to determine the expected distance + # + loop_ticks = ticks_per_second * loop_seconds + loop_distance = loop_ticks / cfg.ENCODER_PPR * meters_per_revolution + self.assertAlmostEqual(loop_distance, distance, 2) + self.assertLessEqual(abs(loop_distance - distance) / distance, 0.005) # final error less than 0.5% + + # + # calculate the expected ending position using the center of the turn, + # the radius of the turn, the circumference of the turn + # and the distance travelled to get the point on the turn where + # the vehicle ends up + # + ending_x, ending_y, ending_angle = calc_bicycle_pose(cfg.WHEEL_BASE, (0, 0), 0, steering_angle, distance) + + print(f"Expected Distance = {loop_distance}") + print(f"Calculated Pose = ({pose_x}, {pose_y}, {pose_angle})") + print(f"Expected ending Pose = ({ending_x}, {ending_y}, {ending_angle})") + + if steering_angle != 0.0: + self.assertLessEqual(abs((ending_x - pose_x) / loop_distance), 0.005) # final error less than 0.5% + self.assertLessEqual(abs((ending_y - pose_y) / loop_distance), 0.005) # final error less than 0.5% + self.assertLessEqual(abs((ending_angle - pose_angle) / (2 * math.pi)), 0.03) # final error less than 3% + + def test_drive_straight(self): + # 12.5 degrees steering angle + cfg = self.initialize_config() + self.do_drive(cfg, forward_velocity=cfg.MAX_SPEED / 2, steering_angle = 0.0) + + def test_turn_left(self): + # 12.5 degrees steering angle + cfg = self.initialize_config() + self.do_drive(cfg, forward_velocity=cfg.MAX_SPEED / 2, steering_angle =cfg.MAX_STEERING_ANGLE / 2) + + def test_turn_right(self): + # -12.5 degrees steering angle + cfg = self.initialize_config() + self.do_drive(cfg, forward_velocity=cfg.MAX_SPEED / 2, steering_angle =-cfg.MAX_STEERING_ANGLE / 2) diff --git a/donkeycar/tests/test_odometer.py b/donkeycar/tests/test_odometer.py new file mode 100644 index 000000000..3e629bfed --- /dev/null +++ b/donkeycar/tests/test_odometer.py @@ -0,0 +1,29 @@ + +import unittest +import time + +from donkeycar.parts.odometer import Odometer + +class TestOdometer(unittest.TestCase): + + def test_odometer(self): + odometer = Odometer(0.2) # 0.2 meters per revolution + + ts = time.time() # initial time + distance, velocity, timestamp = odometer.run(1, ts) # first reading is one revolution + self.assertEqual(ts, timestamp) + self.assertEqual(0.2, distance) # distance travelled is 0.2 meters + self.assertEqual(0, velocity) # zero velocity until we get two distances + + ts += 1 # add one second + distance, velocity, timestamp = odometer.run(2, ts) # total of 2 revolutions + self.assertEqual(ts, timestamp) + self.assertEqual(0.4, distance) # 0.4 meters travelled + self.assertEqual(0.2, velocity) # 0.2 meters per second since last update + + ts += 1 # add one second + distance, velocity, timestamp = odometer.run(2, ts) # don't move + self.assertEqual(ts, timestamp) + self.assertEqual(0.4, distance) # still at 0.4 meters travelled + self.assertEqual(0, velocity) # 0 meter per second in last interval + diff --git a/donkeycar/tests/test_tachometer.py b/donkeycar/tests/test_tachometer.py new file mode 100644 index 000000000..44a72ab3d --- /dev/null +++ b/donkeycar/tests/test_tachometer.py @@ -0,0 +1,152 @@ +import time +import unittest + +from donkeycar.parts.tachometer import Tachometer +from donkeycar.parts.tachometer import AbstractEncoder +from donkeycar.parts.tachometer import EncoderMode +from donkeycar.parts.tachometer import InverseTachometer + + +class MockEncoder(AbstractEncoder): + def __init__(self) -> None: + self.ticks = 0 + super().__init__() + def start_ticks(self): + pass + def stop_ticks(self): + pass + def poll_ticks(self, direction: int) -> int: + """ + add a tick based on direction + """ + assert((1==direction) or (0==direction) or (-1==direction)) + self.ticks += direction + return self.ticks + def get_ticks(self, encoder_index:int=0) -> int: + return self.ticks + +class MockTachometer(Tachometer): + def __init__(self, ticks_per_revolution: float, direction_mode, debug=False): + super().__init__(MockEncoder(), ticks_per_revolution=ticks_per_revolution, direction_mode=direction_mode, poll_delay_secs=0, debug=debug) + + +class TestTachometer(unittest.TestCase): + + def test_tachometer_forward_only(self): + # + # FORWARD_ONLY mode will ignore throttle + # and just increase revolutions + # + tachometer = MockTachometer( + ticks_per_revolution=10, + direction_mode=EncoderMode.FORWARD_ONLY) + + ts = time.time() + revolutions, timestamp = tachometer.run(throttle=1, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.1, revolutions, 4) + + ts += 1 # add one second and one revolution + revolutions, timestamp = tachometer.run(throttle=1, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.2, revolutions, 4) + + ts += 1 # add one second, but stopped + revolutions, timestamp = tachometer.run(throttle=0, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.3, revolutions, 4) + + ts += 1 # add one second and reverse one revolution + revolutions, timestamp = tachometer.run(throttle=-1, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.4, revolutions, 4) + + ts += 1 # add one second, but stopped and coasting + revolutions, timestamp = tachometer.run(throttle=0, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.5, revolutions, 4) + + + def test_tachometer_forward_reverse(self): + # + # FORWARD_REVERSE mode will ignore zero throttle + # and continue changing revolutions in the + # current direction. So if direction was + # forward, then throttle=0 will continue forward, + # if direction was reverse then throttle=0 + # will continue in reverse. + # + tachometer = MockTachometer( + ticks_per_revolution=10, + direction_mode=EncoderMode.FORWARD_REVERSE) + + ts = time.time() + revolutions, timestamp = tachometer.run(throttle=1, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.1, revolutions, 4) + + ts += 1 # add one second and one revolution + revolutions, timestamp = tachometer.run(throttle=1, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.2, revolutions, 4) + + ts += 1 # add one second, but stopped + revolutions, timestamp = tachometer.run(throttle=0, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.3, revolutions, 4) + + ts += 1 # add one second and reverse one revolution + revolutions, timestamp = tachometer.run(throttle=-1, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.2, revolutions, 4) + + ts += 1 # add one second, but stopped and coasting + revolutions, timestamp = tachometer.run(throttle=0, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.1, revolutions, 4) + + + def test_tachometer_forward_reverse_stop(self): + # + # FORWARD_REVERSE_STOP mode will honor the + # throttle completely; + # positive throttle being forward, increasing revolutions + # negative throttle being reverse, decreasing revolutions + # zero throttle being stopped; no change in revolutions + # + tachometer = MockTachometer( + ticks_per_revolution=10, + direction_mode=EncoderMode.FORWARD_REVERSE_STOP) + + ts = time.time() + revolutions, timestamp = tachometer.run(throttle=1, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.1, revolutions, 4) + + ts += 1 # add one second and one revolution + revolutions, timestamp = tachometer.run(throttle=1, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.2, revolutions, 4) + + ts += 1 # add one second, but stopped + revolutions, timestamp = tachometer.run(throttle=0, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.2, revolutions, 4) + + ts += 1 # add one second and reverse one revolution + revolutions, timestamp = tachometer.run(throttle=-1, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.1, revolutions, 4) + + ts += 1 # add one second, but stopped + revolutions, timestamp = tachometer.run(throttle=0, timestamp=ts) + self.assertEqual(ts, timestamp) + self.assertAlmostEqual(0.1, revolutions, 4) + + +class TestInverseTachometer(unittest.TestCase): + + def test_inverse_tachometer_forward(self): + tachometer = InverseTachometer(meters_per_revolution=0.25) + revolutions, _ = tachometer.run(distance=100) + self.assertEqual(400, revolutions) diff --git a/donkeycar/utilities/circular_buffer.py b/donkeycar/utilities/circular_buffer.py new file mode 100644 index 000000000..d3285ef8a --- /dev/null +++ b/donkeycar/utilities/circular_buffer.py @@ -0,0 +1,143 @@ + +class CircularBuffer: + """ + Fixed capacity circular buffer that can be used as + a queue, stack or array + """ + def __init__(self, capacity:int, defaultValue=None) -> None: + if capacity <= 0: + raise ValueError("capacity must be greater than zero") + self.defaultValue = defaultValue + self.buffer:list = [None] * capacity + self.capacity:int = capacity + self.count:int = 0 + self.tailIndex:int = 0 + + def head(self): + """ + Non-destructive get of the entry at the head (index count-1) + return: if list is not empty, the head (most recently pushed/enqueued) entry. + if list is empty, the default value provided in the constructor + """ + if self.count > 0: + return self.buffer[(self.tailIndex + self.count - 1) % self.capacity] + return self.defaultValue + + def tail(self): + """ + Non-destructive get of the entry at the tail (index 0) + return: if list is not empty, the tail (least recently pushed/enqueued) entry. + if list is empty, the default value provided in the constructor + """ + if self.count > 0: + return self.buffer[self.tailIndex] + return self.defaultValue + + def enqueue(self, value): + """ + Push a value onto the head of the buffer. + If the buffer is full, then the value + at the tail is dropped to make room. + """ + if self.count < self.capacity: + self.count += 1 + else: + # drop entry at the tail + self.tailIndex = (self.tailIndex + 1) % self.capacity + + # write value at head + self.buffer[(self.tailIndex + self.count - 1) % self.capacity] = value + + def dequeue(self): + """ + Remove value at tail of list and return it + return: if list not empty, value at tail + if list empty, the default value + """ + theValue = self.tail() + if self.count > 0: + self.count -= 1 + self.tailIndex = (self.tailIndex + 1) % self.capacity + return theValue + + def push(self, value): + """ + Push a value onto the head of the buffer. + If the buffer is full, then a IndexError + is raised. + """ + if self.count >= self.capacity: + raise IndexError("Attempt to push to a full buffer") + + self.enqueue(value) + + def pop(self): + """ + Remove value at head of list and return it + return: if list not empty, the value at head + if list empty, the default value + """ + theValue = self.head() + if self.count > 0: + self.count -= 1 + return theValue + + def append(self, value): + """ + append a value to the tail (index count-1) of the buffer. + If the buffer is at capacity, then this + will raise an IndexError. + """ + if self.count >= self.capacity: + raise IndexError("Attempt to append to a full buffer") + + # make space a tail for the value + self.count += 1 + self.tailIndex = (self.tailIndex - 1) % self.capacity + self.buffer[self.tailIndex] = value + + + def get(self, i:int): + """ + Get value at given index where + head is index 0 and tail is is index count-1; + i: index from 0 to count-1 (head is zero) + return: value at index + or the default value if index is out of range + """ + if (i >= 0) and (i < self.count): + return self.buffer[(self.tailIndex + (self.count + i - 1)) % self.capacity] + + return self.defaultValue + + def set(self, i:int, value): + """ + Set value at given index where + head is index 0 and tail is is index count-1; + """ + if (i >= 0) and (i < self.count): + self.buffer[(self.tailIndex + (self.count + i - 1)) % self.capacity] = value + return + raise IndexError("buffer index is out of range") + + def truncateTo(self, count): + """ + Truncate the list to the given number of values. + If the given number is greater than or equal to + the current count(), then nothing is changed. + If the given number is less than the + current count(), then elements are dropped + from the tail to resize to the given number. + The capactity of the queue is not changed. + + So to drop all entries except the head: + truncateTo(1) + + count: the desired number of elements to + leave in the queue (maximum) + """ + if count < 0 or count > self.capacity: + raise ValueError("count is out of range") + self.count = count + self.tailIndex = (self.tailIndex + count) % self.capacity + diff --git a/donkeycar/vehicle.py b/donkeycar/vehicle.py index b54cc4104..614b9d78d 100644 --- a/donkeycar/vehicle.py +++ b/donkeycar/vehicle.py @@ -146,6 +146,7 @@ def start(self, rate_hz=10, max_loop_count=None, verbose=False): # wait until the parts warm up. logger.info('Starting vehicle at {} Hz'.format(rate_hz)) + loop_start_time = time.time() loop_count = 0 while self.on: start_time = time.time() @@ -154,20 +155,26 @@ def start(self, rate_hz=10, max_loop_count=None, verbose=False): self.update_parts() # stop drive loop if loop_count exceeds max_loopcount - if max_loop_count and loop_count > max_loop_count: + if max_loop_count and loop_count >= max_loop_count: self.on = False - - sleep_time = 1.0 / rate_hz - (time.time() - start_time) - if sleep_time > 0.0: - time.sleep(sleep_time) else: - # print a message when could not maintain loop rate. - if verbose: - logger.info('WARN::Vehicle: jitter violation in vehicle loop ' - 'with {0:4.0f}ms'.format(abs(1000 * sleep_time))) + sleep_time = 1.0 / rate_hz - (time.time() - start_time) + if sleep_time > 0.0: + time.sleep(sleep_time) + else: + # print a message when could not maintain loop rate. + if verbose: + logger.info('WARN::Vehicle: jitter violation in vehicle loop ' + 'with {0:4.0f}ms'.format(abs(1000 * sleep_time))) + + if verbose and loop_count % 200 == 0: + self.profiler.report() + + + loop_total_time = time.time() - loop_start_time + logger.info(f"Vehicle executed {loop_count} steps in {loop_total_time} seconds.") - if verbose and loop_count % 200 == 0: - self.profiler.report() + return loop_count, loop_total_time except KeyboardInterrupt: pass diff --git a/setup.py b/setup.py index 0a9b86b2f..ae3c7b9ca 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ def package_files(directory, strip_leading): long_description = fh.read() setup(name='donkeycar', - version="4.4.dev4", + version="4.4.dev5", long_description=long_description, description='Self driving library for python.', url='https://github.com/autorope/donkeycar',