Sample Program¶
A sample program that uses dronebuddylib to interact with a drone is given below. It uses the library to recognize speech, recognize intent, and execute drone functions based on the recognized intent. It also recognizes faces, objects, and text, and uses speech generation to read aloud the recognized intent and the recognition results.
import asyncio
import datetime
import threading
import time
from typing import List
import cv2
import requests
import speech_recognition
from djitellopy import Tello
from dronebuddylib import SpeechRecognitionEngine, IntentRecognitionEngine, FaceRecognitionEngine, \
ObjectDetectionEngine, TextRecognitionEngine, SpeechGenerationEngine
from dronebuddylib.models import EngineConfigurations
from dronebuddylib.models.enums import AtomicEngineConfigurations, IntentRecognitionAlgorithm, DroneCommands, \
FaceRecognitionAlgorithm, VisionAlgorithm, TextRecognitionAlgorithm, SpeechGenerationAlgorithm
from dronebuddylib.utils.enums import SpeechRecognitionAlgorithm, SpeechRecognitionMultiAlgoAlgorithmSupportedAlgorithms
from dronebuddylib.utils.utils import Logger
# Shared, mutable configuration object passed to every engine constructor below.
engine_configs = EngineConfigurations({})
# Project-wide logger used throughout this sample.
logger = Logger()
# Flight flag: set by take-off/land intents, read by keep_drone_in_air.
is_drone_in_air = False
def open_mic_operations(drone_instance, on_recognized_callback):
    """Listen on the microphone forever, recognize speech and intent, and run
    the matching drone command.

    :param drone_instance: connected Tello drone (may be None for a dry run).
    :param on_recognized_callback: callable(text, voice_engine) invoked with the
        spoken response produced by each executed command.
    """
    speech_microphone = speech_recognition.Microphone()
    engine_configs.add_configuration(AtomicEngineConfigurations.SPEECH_RECOGNITION_MULTI_ALGO_ALGORITHM_NAME,
                                     SpeechRecognitionMultiAlgoAlgorithmSupportedAlgorithms.GOOGLE.name)
    engine = SpeechRecognitionEngine(SpeechRecognitionAlgorithm.MULTI_ALGO_SPEECH_RECOGNITION, engine_configs)

    # Build every engine once, outside the listen loop.
    intent_engine = init_intent_rec_engine()
    face_recognition_engine = init_face_rec_engine()
    object_recognition_engine = init_object_rec_engine()
    text_recognition_engine = init_text_rec_engine()
    voice_engine = init_voice_generation_engine()

    banner = "*********************************************************************************************************"
    while True:
        with speech_microphone as source:
            logger.log_info("TEST", "Recognizing voice " + banner)
            print(banner)
            print("Say something...")
            print(banner)
            print(time.time())
            try:
                logger.log_info("TEST", "Recognizing voice " + banner)
                result = engine.recognize_speech(source)
                if result.recognized_speech is not None:
                    logger.log_info("TEST", "Recognized: " + result.recognized_speech)
                    intent = recognize_intent_gpt(intent_engine, result.recognized_speech)
                    read_aloud_text = execute_drone_functions(intent, drone_instance, face_recognition_engine,
                                                              object_recognition_engine,
                                                              text_recognition_engine, voice_engine)
                    on_recognized_callback(read_aloud_text, voice_engine)
                else:
                    logger.log_warning("TEST", "Not Recognized: voice ")
            except speech_recognition.WaitTimeoutError:
                # Fix: the original re-ran recognize_speech here and discarded
                # the result — a blocking call whose outcome was lost. Just let
                # the loop come around and listen again.
                pass
        time.sleep(1)  # Sleep to simulate work and prevent a tight loop
def generate_voice_response(voice_engine, text):
    """Read *text* aloud through *voice_engine*; failures are logged, never raised."""
    try:
        voice_engine.read_phrase(text)
    except Exception as error:
        logger.log_error("Error in voice generation:", str(error))
def recognize_intent_snips(recognized_text):
    """Recognize the intent of *recognized_text* with the Snips NLU engine.

    Fix: the original also constructed a SpeechRecognitionEngine and
    immediately overwrote it — dead work, removed.

    :param recognized_text: transcript to classify.
    :return: the recognized intent name.
    """
    engine = IntentRecognitionEngine(IntentRecognitionAlgorithm.SNIPS_NLU, engine_configs)
    recognized_intent = engine.recognize_intent(recognized_text)
    logger.log_info("Recognized intent: ", recognized_intent.intent)
    return recognized_intent.intent
def init_intent_rec_engine():
    """Configure and build the ChatGPT-based intent recognition engine."""
    settings = {
        AtomicEngineConfigurations.INTENT_RECOGNITION_OPEN_AI_TEMPERATURE: "0.7",
        AtomicEngineConfigurations.INTENT_RECOGNITION_OPEN_AI_MODEL: "gpt-3.5-turbo-0613",
        AtomicEngineConfigurations.INTENT_RECOGNITION_OPEN_AI_LOGGER_LOCATION:
            "C:\\Users\\Public\\projects\\drone-buddy-library\\dronebuddylib\\atoms\\intentrecognition\\resources\\stats\\",
        AtomicEngineConfigurations.INTENT_RECOGNITION_OPEN_AI_API_KEY:
            "sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
        AtomicEngineConfigurations.INTENT_RECOGNITION_OPEN_AI_API_URL:
            "https://api.openai.com/v1/chat/completions",
    }
    for key, value in settings.items():
        engine_configs.add_configuration(key, value)
    return IntentRecognitionEngine(IntentRecognitionAlgorithm.CHAT_GPT, engine_configs)
def init_object_rec_engine():
    """Configure and build the YOLO-based object detection engine."""
    engine_configs.add_configuration(AtomicEngineConfigurations.OBJECT_DETECTION_YOLO_VERSION, "yolov8n.pt")
    return ObjectDetectionEngine(VisionAlgorithm.YOLO, engine_configs)
def init_voice_generation_engine():
    """Build the offline TTS speech generation engine.

    NOTE(review): this passes the enum's ``.name`` (a string) where the other
    engines pass the enum member itself — presumably accepted by the library,
    but worth confirming against SpeechGenerationEngine's signature.
    """
    return SpeechGenerationEngine(SpeechGenerationAlgorithm.GOOGLE_TTS_OFFLINE.name, engine_configs)
def init_face_rec_engine():
    """Build the face recognition engine using the FACE_RECC algorithm."""
    return FaceRecognitionEngine(FaceRecognitionAlgorithm.FACE_RECC, engine_configs)
def recognize_intent_gpt(engine, recognized_text):
    """Recognize the intent of *recognized_text* with the given GPT engine.

    :param engine: an IntentRecognitionEngine instance.
    :param recognized_text: transcript to classify.
    :return: the recognized intent name, or the string "NONE" on any failure.
    """
    try:
        recognized_intent = engine.recognize_intent(recognized_text)
        logger.log_info("Recognized intent: ", recognized_intent.intent)
        return recognized_intent.intent
    except Exception:
        # Fix: narrowed from a bare `except:`, which would also swallow
        # SystemExit and KeyboardInterrupt.
        logger.log_error("Recognized intent: ", 'NONE')
        return "NONE"
def execute_drone_functions(intent: str, drone_instance, face_engine, object_engine, text_engine, voice_engine):
    """Dispatch a recognized intent to the matching drone action.

    :param intent: a DroneCommands member name (e.g. "TAKE_OFF") or "NONE".
    :param drone_instance: connected Tello drone (may be None).
    :param face_engine: face recognition engine for RECOGNIZE_PEOPLE.
    :param object_engine: object detection engine for RECOGNIZE_OBJECTS.
    :param text_engine: text recognition engine for RECOGNIZE_TEXT.
    :param voice_engine: unused here; kept for interface compatibility.
    :return: a phrase describing what was done, suitable for reading aloud.
    """
    global is_drone_in_air
    if intent == DroneCommands.TAKE_OFF.name:
        is_drone_in_air = True
        take_off(drone_instance)
        return "I'm taking off"
    elif intent == DroneCommands.LAND.name:
        is_drone_in_air = False
        land(drone_instance)
        return "I'm landing"
    elif intent == DroneCommands.ROTATE_CLOCKWISE.name:
        rotate_clockwise(drone_instance)
        return "I'm rotating clockwise"
    elif intent == DroneCommands.ROTATE_COUNTER_CLOCKWISE.name:
        rotate_counter_clockwise(drone_instance)
        return "I'm rotating counter clockwise"
    elif intent == DroneCommands.FORWARD.name:
        move_forward(drone_instance)
        return "I'm moving forward"
    elif intent == DroneCommands.BACKWARD.name:
        move_backward(drone_instance)
        return "I'm moving backward"
    elif intent == DroneCommands.LEFT.name:
        move_left(drone_instance)
        return "I'm moving to the left"
    elif intent == DroneCommands.RIGHT.name:
        move_right(drone_instance)
        return "I'm moving to the right"
    elif intent == DroneCommands.UP.name:
        move_up(drone_instance)
        return "I'm moving up"
    elif intent == DroneCommands.DOWN.name:
        move_down(drone_instance)
        return "I'm moving down"
    elif intent == DroneCommands.FLIP.name:
        flip_forward(drone_instance)  # Assuming flip_forward is the desired flip command
        return "I'm flipping"
    elif intent == DroneCommands.RECOGNIZE_TEXT.name:
        text = recognize_text(text_engine, drone_instance)
        return "I read the text as " + text
    elif intent == DroneCommands.RECOGNIZE_PEOPLE.name:
        return recognize_people(face_engine, drone_instance)
    elif intent == DroneCommands.RECOGNIZE_OBJECTS.name:
        return recognize_objects(object_engine, drone_instance)
    elif intent == DroneCommands.STOP.name:
        land(drone_instance)
        is_drone_in_air = False
        return "I'm stopping"
    # Fix: the original fell through and returned None for an unrecognized
    # intent (e.g. "NONE" from recognize_intent_gpt), which was then passed
    # to voice_engine.read_phrase by the caller.
    return "Sorry, I didn't understand that command"
def init_drone():
    """Connect to a Tello drone, start its video stream and return it."""
    tello = Tello()
    tello.connect()
    tello.streamon()
    return tello
def take_off(drone_instance):
    """Command the drone to take off; no-op when no drone is connected."""
    logger.log_info("Executing functions: ", "Drone is taking off")
    if drone_instance is None:
        return
    drone_instance.takeoff()
def land(drone_instance):
    """Command the drone to land; no-op when no drone is connected."""
    logger.log_info("Executing functions: ", "Drone is landing")
    if drone_instance is None:
        return
    drone_instance.land()
def rotate_clockwise(drone_instance):
    """Rotate the drone 90 degrees clockwise; no-op without a drone."""
    logger.log_info("Executing functions: ", "Drone is rotating clockwise")
    if drone_instance is None:
        return
    drone_instance.rotate_clockwise(90)
def rotate_counter_clockwise(drone_instance):
    """Rotate the drone 90 degrees counter-clockwise; no-op without a drone."""
    logger.log_info("Executing functions: ", "Drone is rotating counter clockwise")
    if drone_instance is None:
        return
    drone_instance.rotate_counter_clockwise(90)
def move_forward(drone_instance):
    """Move the drone 30 cm forward; no-op without a drone."""
    logger.log_info("Executing functions: ", "Drone is moving forward")
    if drone_instance is None:
        return
    drone_instance.move_forward(30)
def move_backward(drone_instance):
    """Move the drone 30 cm backward; no-op without a drone."""
    logger.log_info("Executing functions: ", "Drone is moving backward")
    if drone_instance is None:
        return
    drone_instance.move_backward(30)
def move_left(drone_instance):
    """Move the drone 30 cm to the left; no-op without a drone."""
    logger.log_info("Executing functions: ", "Drone is moving left")
    if drone_instance is None:
        return
    drone_instance.move_left(30)
def move_right(drone_instance):
    """Move the drone 30 cm to the right; no-op without a drone."""
    logger.log_info("Executing functions: ", "Drone is moving right")
    if drone_instance is None:
        return
    drone_instance.move_right(30)
def move_up(drone_instance):
    """Move the drone 30 cm up; no-op without a drone."""
    logger.log_info("Executing functions: ", "Drone is moving up")
    if drone_instance is None:
        return
    drone_instance.move_up(30)
def move_down(drone_instance):
    """Move the drone 30 cm down; no-op without a drone."""
    logger.log_info("Executing functions: ", "Drone is moving down")
    if drone_instance is None:
        return
    drone_instance.move_down(30)
def flip_forward(drone_instance):
    """Flip the drone forward; no-op without a drone."""
    logger.log_info("Executing functions: ", "Drone is flipping forward")
    if drone_instance is None:
        return
    drone_instance.flip_forward()
def flip_backward(drone_instance):
    """Flip the drone backward; no-op without a drone."""
    logger.log_info("Executing functions: ", "Drone is flipping backward")
    if drone_instance is None:
        return
    drone_instance.flip_backward()
def flip_left(drone_instance):
    """Flip the drone to the left; no-op without a drone."""
    logger.log_info("Executing functions: ", "Drone is flipping left")
    if drone_instance is None:
        return
    drone_instance.flip_left()
def recognize_people(engine, drone_instance):
    """Grab a frame from the drone, run face recognition, and describe the result."""
    logger.log_info("Executing functions: ", "Drone is recognizing people")
    frame = get_image_with_cv2(drone_instance)
    faces = engine.recognize_face(frame)
    return describe_face_rec_results(faces)
def get_image_with_cv2(drone_instance):
    """Return the drone's current camera frame resized to 500x500 pixels."""
    frame = drone_instance.get_frame_read().frame
    return cv2.resize(frame, (500, 500))
def recognize_objects(engine, drone_instance):
    """Grab a frame from the drone, run object detection, and describe the result."""
    logger.log_info("Executing functions: ", "Drone is recognizing objects")
    frame = get_image_with_cv2(drone_instance)
    detections = engine.get_detected_objects(frame)
    return describe_object_rec_results(detections.object_names)
def format_list(object_list: list) -> str:
    """Format a list of object names as a natural-language phrase.

    Duplicates are collapsed into a count with a plural "s" (e.g. two "chair"
    entries become "2 chairs"), items are comma-separated, and "and" precedes
    the last item.

    Fixes over the original: (1) duplicates were emitted once per occurrence
    ("2 chair, 2 chair"); (2) the reverse-string trick inserted the literal
    "and " into the reversed text, producing "dna" in the final output.

    :param object_list: object names, possibly with duplicates.
    :return: formatted phrase, or "" for an empty list.
    """
    from collections import Counter  # local import: keeps this edit self-contained
    counts = Counter(object_list)  # preserves first-occurrence order
    parts = [f"{n} {name}s" if n > 1 else name for name, n in counts.items()]
    if not parts:
        return ""
    if len(parts) == 1:
        return parts[0]
    return ", ".join(parts[:-1]) + " and " + parts[-1]
def describe_face_rec_results(labels):
    """Turn face recognition labels into a spoken description.

    Fixes over the original: (1) the multi-label loop ran to ``len - 2`` and
    skipped the second-to-last label (with exactly two labels the first was
    dropped entirely); (2) removed the dead ``global init_position``, which was
    never assigned or read.

    :param labels: recognized names, possibly with duplicates and 'unknown'.
    :return: a phrase such as " I see Alice , Bob and Carol".
    """
    labels = list(dict.fromkeys(labels))  # de-duplicate, preserving order
    if not labels:
        return "I don't see anyone I recognize"
    phrases = [get_describing_phrase(label) for label in labels]
    if len(phrases) == 1:
        return " I see " + phrases[0]
    return " I see " + " , ".join(phrases[:-1]) + " and " + phrases[-1]
def get_describing_phrase(name):
    """Map the recognizer's 'unknown' label (any casing) to a spoken phrase;
    any other name is returned unchanged."""
    return "someone I don't recognize" if name.lower() == 'unknown' else name
def describe_object_rec_results(labels):
    """Turn object detection labels into a spoken description.

    Fixes over the original: (1) removed the unreachable final ``else`` (the
    three preceding branches cover every possible length); (2) the multi-label
    join left a stray " , " immediately before " and ".

    :param labels: detected object names, possibly with duplicates.
    :return: a phrase such as " I see a chair , table and a dog".
    """
    labels = list(dict.fromkeys(labels))  # de-duplicate, preserving order
    if not labels:
        read_aloud_text = "I don't see anything in the front, you are safe to move forward"
    elif len(labels) == 1:
        read_aloud_text = " I see a " + labels[0]
    else:
        read_aloud_text = " I see a " + " , ".join(labels[:-1]) + " and a " + labels[-1]
    logger.log_success("Recognized objects: ", read_aloud_text)
    return read_aloud_text
def init_text_rec_engine():
    """Build the Google Vision based text recognition engine."""
    return TextRecognitionEngine(TextRecognitionAlgorithm.GOOGLE_VISION, engine_configs)
def recognize_text(engine, drone_instance):
    """Save a frame from the drone and run text recognition on the saved image."""
    logger.log_info("Executing functions: ", "Drone is recognizing text")
    saved_path = save_frame(drone_instance, "text_rec_images")
    recognition = engine.recognize_text(saved_path)
    return recognition.text
def save_frame(drone_instance, type):
    """Save the drone's current camera frame as a timestamped JPEG.

    :param type: sub-folder name under the resources directory (e.g.
        "text_rec_images"); the name shadows the builtin ``type`` but is kept
        for backward compatibility with existing callers.
    :return: path of the written image file.
    """
    import os  # local import: keeps this edit self-contained
    frame = drone_instance.get_frame_read().frame
    # Fix: build the path with os.path.join instead of manual concatenation
    # (the original raw string left a doubled backslash after "resources").
    output_dir = r"C:\Users\Public\projects\drone-buddy-launcher\resources"
    file_name = str(datetime.datetime.now().timestamp()) + ".jpg"
    output_path = os.path.join(output_dir, type, file_name)
    cv2.imwrite(output_path, frame)
    return output_path
def keep_drone_in_air(drone_instance):
    """Background loop that keeps the hovering drone active.

    Every 4 seconds, while the drone is airborne, sends an alternating small
    up/down RC command; lands when battery drops below 20% or temperature
    exceeds 90.

    Fixes over the original: (1) the battery level was polled twice per cycle;
    (2) after an emergency landing, ``is_drone_in_air`` stayed True and
    ``send_rc_control`` was still issued to the landed drone.
    """
    moving_dir = -1
    voice = init_voice_generation_engine()
    global is_drone_in_air
    while True:
        logger.log_info("Executing functions: ", "Drone is in the air")
        if drone_instance is not None and is_drone_in_air:
            battery = drone_instance.get_battery()  # poll once per cycle
            print("battery: ", battery)
            if battery < 20:
                land(drone_instance)
                is_drone_in_air = False
                voice.read_phrase("I'm running out of battery, I'm landing")
            elif drone_instance.get_temperature() > 90:
                land(drone_instance)
                is_drone_in_air = False
                voice.read_phrase("I'm getting overheated, I'm landing")
            else:
                # Alternate small vertical thrust so the drone stays "busy".
                drone_instance.send_rc_control(0, 0, moving_dir, 0)
                moving_dir = -moving_dir
        time.sleep(4)  # Sleep to simulate work and prevent a tight loop
def find_person(face_engine, drone_instance):
    """Grab a frame from the drone and return the raw face recognition result."""
    logger.log_info("Executing functions: ", "Drone is recognizing people")
    frame = get_image_with_cv2(drone_instance)
    return face_engine.recognize_face(frame)
def on_voice_recognized(recognized_text, voice_engine):
    """Callback fired after each command: log the response and speak it aloud."""
    logger.log_info("Recognized text:", recognized_text)
    generate_voice_response(voice_engine, recognized_text)
if __name__ == '__main__':
    # Connect to the drone, then run the microphone listener and the
    # keep-alive hover loop on parallel threads.
    drone_instance = init_drone()
    listen_thread = threading.Thread(target=open_mic_operations, args=(drone_instance, on_voice_recognized,))
    hover_thread = threading.Thread(target=keep_drone_in_air, args=(drone_instance,))
    for worker in (listen_thread, hover_thread):
        worker.start()
    for worker in (listen_thread, hover_thread):
        worker.join()