Categories
Mastering Development

Update a Global Boolean to Start or Stop a Function

I am writing an app in python that controls some LEDs based on GPIO state. On the hardware, I listen for the GPIOs to change state and publish to an MQTT server running on a raspberry pi to control the LEDs. One of the GPIOs should cause an LED to blink indefinitely until state changes again to turn it off, exactly how a car turn signal works.

So when the GPIO goes high I publish to a topic with the payload ‘on’. I then set a global variable

blinker_status = True

and then basically say (pseudo code)

while (blinker_status) { blink }

When I publish to that same topic with the payload ‘off’, I set the value of the global variable

blinker_status = False

I’m expecting the LED to stop blinking, but it doesn’t. I’m not sure if the issue is that MQTT is blocking?

Once I start the LED blinking, perhaps it’s blocking the on_message() call back and it cannot process additional messages? Should I isolate all of my subscriber clients into their own threads so that publishing to each topic is handled by its own on_message() call back?

I have code I can provide if necessary but at this point I’m stuck on trying to understand why my logic is flawed.

Below is my code

import time
from neopixel import *
from threading import Thread
import paho.mqtt.client as mqtt

mqtt_client = None

# LED strip configuration:
LED_COUNT_LEFT = 150  # Number of LED pixels on the left strip.
LED_COUNT_RIGHT = 1  # Number of LED pixels on the right strip.
LED_FREQ_HZ = 800000  # LED signal frequency in hertz (usually 800khz)
LED_DMA = 10  # DMA channel to use for generating signal (try 10)
LED_BRIGHTNESS = 255  # Set to 0 for darkest and 255 for brightest
LED_INVERT = False  # True to invert the signal (when using NPN transistor level shift)
LED_CHANNEL_LEFT = 0  # set to '1' for GPIOs 13, 19, 41, 45 or 53
LED_CHANNEL_RIGHT = 1

# NOTE:  The WS2812B strip has red and green inverted, so red is actually (0, 255, 0) and green is (255, 0, 0)

left_blinker_status = False
right_blinker_status = False
blinker_status = False

# Create NeoPixel object with appropriate configuration.
# GPIO 18 is pin 12
left_strip = Adafruit_NeoPixel(LED_COUNT_LEFT, 18, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS, LED_CHANNEL_LEFT)
# GPIO 13 is pin 33
right_strip = Adafruit_NeoPixel(LED_COUNT_RIGHT, 13, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS,
                                LED_CHANNEL_RIGHT)
# Intialize the library (must be called once before other functions).
left_strip.begin()
right_strip.begin()


# MQTT functions
def start_mqtt():
    global mqtt_client
    try:
        system_id = 'Kawasaki Ninja'

        mqtt_client = mqtt.Client(client_id=system_id)
        mqtt_client.disable_logger()

        # Assign callback functions
        mqtt_client.on_connect = on_connect
        mqtt_client.on_message = on_message

        mqtt_client.connect(host='192.168.1.23')
        # Blocking call that processes network traffic, dispatches callbacks and
        # handles reconnecting.
        mqtt_client.loop_forever()

    except Exception as ex:
        print('Exception in start_mqtt()! exception: {}'.format(ex))
        raise


# THe callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print('Connected with result code ' + str(rc))

    # Subscribing in on_connect() - if we lose connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe('ninja/#')  # The hash sign is wild-card


# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    global blinker_status
    global left_blinker_status
    global right_blinker_status

    print('{} {}'.format(msg.topic, str(msg.payload)))

    # Handle the brake topic
    if msg.topic == 'ninja/brake':
        if msg.payload == 'on':
            # BRAKE ON
            if not left_blinker_status:
                solidColor(left_strip, Color(0, 255, 0))
            if not right_blinker_status:
                solidColor(right_strip, Color(0, 255, 0))
        elif msg.payload == 'off':
            # BRAKE OFF
            if not left_blinker_status:
                solidColor(left_strip, Color(255, 255, 255))
            if not right_blinker_status:
                solidColor(right_strip, Color(255, 255, 255))

    # Handle the left turn signal topic
    elif msg.topic == 'ninja/left_turn_signal':
        # Left turn signal on
        if msg.payload == 'on':
            blinker_status = True
            left_blinker_status = True
            while left_blinker_status:
                blinker(blinker_status, left_strip)
        # Left turn signal off
        elif msg.payload == 'off':
            blinker_status = False
            left_blinker_status = False
            solidColor(left_strip, Color(255, 255, 255))

    # Handle the right turn signal topic
    elif msg.topic == 'ninja/right_turn_signal':
        # Right turn signal on
        if msg.payload == 'on':
            blinker_status = True
            right_blinker_status = True
            while right_blinker_status:
                blinker(blinker_status, right_strip)
        # Right turn signal off
        elif msg.payload == "off":
            blinker_status = False
            right_blinker_status = False
            solidColor(right_strip, Color(255, 255, 255))

    # Handle the party time topic
    elif msg.topic == 'ninja/party_time':
        # Party on
        if msg.payload == 'on':
            colorWipe(left_strip, Color(0, 255, 0))  # Red

        elif msg.payload == 'white':
            solidColor(left_strip, Color(255, 255, 255))

        elif msg.payload == 'wipe':
            colorWipe(left_strip, Color(0, 255, 0))  # Red

        elif msg.payload == 'off':
            solidColor(left_strip, Color(0, 0, 0))


# Neopixel functions
# Define functions which animate LEDs in various ways.
def colorWipe(strip, color, wait_ms=50):
    """Wipe color across display a pixel at a time."""
    for i in range(strip.numPixels()):
        strip.setPixelColor(i, color)
        strip.show()
        time.sleep(wait_ms / 1000.0)


def solidColor(strip, color):
    for i in range(strip.numPixels()):
        strip.setPixelColor(i, color)
    strip.show()


def blinker(blinker_status, strip):
    while blinker_status:
        solidColor(strip, Color(65, 100, 0))
        time.sleep(0.5)
        solidColor(strip, Color(0, 0, 0))
        time.sleep(0.5)


try:
    while True:
        mqtt_thread = Thread(target=start_mqtt, args=())
        mqtt_thread.isDaemon()
        mqtt_thread.start()

except KeyboardInterrupt:
    exit()

Leave a Reply

Your email address will not be published. Required fields are marked *