Commit f1e20c56 authored by CactusSoyeux's avatar CactusSoyeux
Browse files

Node rename, add of all joystick publishers, creation of services

parent 229376e6
#!/home/anafi/code/parrot-olympe/out/olympe-linux/pyenv_root/versions/3.9.5/bin/python3
#Olympe imports
import argparse
import queue
import threading
import cv2
import olympe
import os
import numpy as np
import sys
import time
from olympe.video.pdraw import Pdraw, PdrawState
#from olympe.video.renderer import PdrawRenderer
from olympe.messages.ardrone3.PilotingState import PositionChanged, GpsLocationChanged
from olympe.messages.ardrone3.GPSSettingsState import GPSFixStateChanged,HomeChanged
#from olympe.messages.ardrone3.GPSState import NumberOfSatelliteChanged
#ROS imports
import rospy
from cv_bridge import CvBridge
#Pub types
from sensor_msgs.msg import Image, NavSatFix
from sensor_msgs.msg import CompressedImage
from std_msgs.msg import String, Int8, Float32, Bool
from geometry_msgs.msg import Quaternion, Vector3
#Drone OS param
DRONE_IP = os.environ.get("DRONE_IP", "192.168.42.1")
DRONE_RTSP_PORT = os.environ.get("DRONE_RTSP_PORT", "554")
DRONE_STREAM = f"rtsp://{DRONE_IP}:{DRONE_RTSP_PORT}/live"
#Can be deleted
MEDIA_PATH = '/home/anafi/catkin_ws/src/anafi_base/script/media'
IMG_PATH = os.path.join(MEDIA_PATH , 'output.jpg')
olympe.log.update_config({"loggers": {"olympe": {"level": "WARNING"}}})
class StreamingMeta:
def __init__(self):
# Create the olympe.Drone object from its IP address
self.drone = olympe.Drone(DRONE_IP)
self.drone.connect()
self.frame_queue = queue.Queue()
self.flush_queue_lock = threading.Lock()
#Initialise Publishers
self.anafi_img = None #cv2.imread(IMG_PATH)
self.bat_perc = None
self.gnd_dist = None
self.air_speed = None
self.quat = Quaternion(0 , 1, 0, 0)
self.speed = Vector3(0,0,0)
self.state = None
self.sat = None
self.gps = NavSatFix()
#Pdraw
self.pdraw = Pdraw()
self.pdraw.set_callbacks(
raw_cb = self.yuv_frame_cb,
flush_raw_cb = self.flush_cb
)
#ROS Node init
rospy.init_node('anafi_stream', anonymous=True)
self.br = CvBridge()
self.loop_rate = rospy.Rate(10)
#ROS Publishers init
self.img_pub = rospy.Publisher('camera/image', Image, queue_size=2)
self.bat_pub = rospy.Publisher('meta/battery', Int8, queue_size=2)
self.gnd_dist_pub = rospy.Publisher('meta/gnd_dist', Float32, queue_size=2)
self.air_speed_pub = rospy.Publisher('meta/air_speed', Float32, queue_size=2)
self.quat_pub = rospy.Publisher('meta/quat', Quaternion, queue_size=2)
self.speed_pub = rospy.Publisher('meta/speed', Vector3, queue_size=2)
self.state_pub = rospy.Publisher('meta/state', String, queue_size=2)
self.sat_pub = rospy.Publisher('meta/sat', Bool, queue_size=2)
self.gps_pub = rospy.Publisher('meta/gps', NavSatFix, queue_size=2)
def yuv_frame_cb(self, yuv_frame):
#Stream MetaData
meta = yuv_frame.vmeta()
if meta[1] is not None:
self.bat_perc = meta[1]["battery_percentage"]
self.gnd_dist = meta[1]["ground_distance"]
self.air_speed = meta[1]["air_speed"]
self.quat = Quaternion(meta[1]["drone_quat"]["x"], meta[1]["drone_quat"]["y"], meta[1]["drone_quat"]["z"], meta[1]["drone_quat"]["w"])
self.speed = Vector3(meta[1]["speed"]["north"], meta[1]["speed"]["east"], meta[1]["speed"]["down"])
self.state = meta[1]["state"]
if "location" in meta[1]:
self.sat = True
self.gps.latitude = meta[1]["location"]["latitude"]
self.gps.longitude = meta[1]["location"]["longitude"]
self.gps.altitude = meta[1]["location"]["altitude"]
self.gps.position_covariance_type = 0
if "location" not in meta[1]:
self.sat = False
self.gps.latitude = 500
self.gps.longitude = 500
self.gps.altitude = 500
self.gps.position_covariance_type = 0
#print(meta[1])
#Convert image from YUV to CV2
cv2_cvt_color_flag = {
olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
}[yuv_frame.format()]
self.anafi_img = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag)
#Add frame to queue
self.frame_queue.put_nowait(yuv_frame)
def flush_cb(self, stream):
if stream["vdef_format"] != olympe.VDEF_I420:
return True
with self.flush_queue_lock:
while not self.frame_queue.empty():
self.frame_queue.get_nowait().unref()
return True
def start(self):
#Olympe
self.drone = olympe.Drone(DRONE_IP)
print("Drone created")
self.drone.connect()
print("Drone connected")
self.drone(GPSFixStateChanged(_policy = 'wait'))
print("Drone GPSFixStateChanged")
#Pdraw
self.pdraw.play(url=DRONE_STREAM)
self.timeout = 2
assert self.pdraw.wait(PdrawState.Playing, timeout=self.timeout)
#ROS
rospy.loginfo("ROS Start")
while not rospy.is_shutdown():
#ROS Publishers
if self.anafi_img is not None:
self.img_pub.publish(self.br.cv2_to_imgmsg(self.anafi_img, "rgb8"))
#Not None verified in the frame call back
self.bat_pub.publish(self.bat_perc)
self.gnd_dist_pub.publish(self.gnd_dist)
self.air_speed_pub.publish(self.air_speed)
self.quat_pub.publish(self.quat)
self.speed_pub.publish(self.speed)
self.state_pub.publish(self.state)
self.sat_pub.publish(self.sat)
self.gps_pub.publish(self.gps)
self.loop_rate.sleep()
#Olympe
#print("GPS Before Take Off: ", self.drone.get_state(HomeChanged))
#print("Number of GPS Satellite: ", self.drone.get_state(NumberOfSatelliteChanged)["numberOfSatellite"])
def stop(self):
assert self.pdraw.wait(PdrawState.Closed, timeout=self.timeout)
#self.renderer.stop()
self.pdraw.close()
self.pdraw.dispose()
#self.drone.disconnect()
def streaming():
streaming_anafi = StreamingMeta()
streaming_anafi.start()
streaming_anafi.stop()
if __name__ == "__main__":
streaming()
\ No newline at end of file
string str
---
string str
\ No newline at end of file
#!/home/anafi/code/parrot-olympe/out/olympe-linux/pyenv_root/versions/3.9.5/bin/python3
from __future__ import print_function
#Olympe and basics imports
import olympe
import os
import argparse
import queue
import threading
import cv2
import numpy as np
import sys
import time
#Olympe Flight imports
from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing
from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
#ROS imports
import rospy
from cv_bridge import CvBridge
#Pub types
from sensor_msgs.msg import Joy
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32
#Services types
from anafi_control.srv import *
#Drone OS param
#DRONE_IP = os.environ.get("DRONE_IP", "192.168.42.1")
olympe.log.update_config({"loggers": {"olympe": {"level": "WARNING"}}})
class JoyCommand:
def __init__(self):
# Create the olympe.Drone object from its IP address
#self.drone = olympe.Drone(DRONE_IP)
#self.drone.connect()
#Initialise Publishers
self.vel = Twist()
self.cam = 0
self.sensivity = 1
#ROS Node init
rospy.init_node('anafi_joy_control', anonymous=True)
self.br = CvBridge()
self.loop_rate = rospy.Rate(10)
#ROS Publishers init
self.vel_pub = rospy.Publisher('cmd_vel', Twist, queue_size=2)
self.cam_pub = rospy.Publisher('cmd_cam', Float32, queue_size=2)
rospy.Subscriber("/joy", Joy, self.joyCallback)
def takeOffLandService(self, str):
rospy.wait_for_service("take_off_land")
try:
take_off_land = rospy.ServiceProxy("take_off_land", TakeOffLand)
resp = take_off_land(str)
return resp.str
except rospy.ServiceException as e:
print("Serice call failed: %s" %e)
def joyCallback(self, joy):
#Update Twist values if one of the DMS (Dead Man Switch) is triggered
if joy.buttons[4] or joy.buttons[5]:
self.vel.linear.x = joy.axes[4] * self.sensivity
self.vel.linear.y = joy.axes[6] * self.sensivity
self.vel.linear.z = ((joy.axes[2]+1)/2 - (joy.axes[5]+1)/2) * self.sensivity
self.vel.angular.z = joy.axes[3] * self.sensivity
#Modify command sensivity
if joy.axes[0] == -1 and self.sensivity < 2:
self.sensivity += 0.1
if joy.axes[0] == 1 and self.sensivity > 1:
self.sensivity -= 0.1
#Modify camera angle
if joy.axes[1] == 1 and self.cam < 0:
self.cam += 15
if joy.axes[1] == -1 and self.cam > -90:
self.cam -= 15
#Take off and Land services
if joy.buttons[7] and not joy.buttons[6]:
#Send Take off service
print(self.takeOffLandService("take_off"))
elif joy.buttons[6] and not joy.buttons[7]:
#Send Land service
print(self.takeOffLandService("land"))
else:
self.vel = Twist()
#Publishers
if self.vel is not None:
self.vel_pub.publish(self.vel)
if self.cam is not None:
self.cam_pub.publish(self.cam)
def start(self):
#Olympe
#self.drone = olympe.Drone(DRONE_IP)
#print("Drone created")
#self.drone.connect()
#print("Drone connected")
#ROS
rospy.loginfo("ROS Start")
while not rospy.is_shutdown():
self.loop_rate.sleep()
def stop(self):
#self.drone.disconnect()
pass
if __name__ == "__main__":
joy_cmd = JoyCommand()
joy_cmd.start()
joy_cmd.stop()
\ No newline at end of file
string str
---
string str
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment