Commit 229376e6 authored by CactusSoyeux's avatar CactusSoyeux
Browse files

ajout GPS anafi_base / ajout code test anafi-control

parent a6e72ecc
#!/home/anafi/code/parrot-olympe/out/olympe-linux/pyenv_root/versions/3.9.5/bin/python3
import olympe
import os
import time
#Random
from olympe.messages.ardrone3.PilotingSettingsState import MaxTiltChanged
#Piloting
from olympe.messages.ardrone3.Piloting import TakeOff, Landing, moveBy
#GPS
from olympe.messages.ardrone3.PilotingState import PositionChanged
from olympe.messages.ardrone3.GPSSettingsState import HomeChanged, GPSFixStateChanged
from olympe.messages.controller_info import gps_v2
DRONE_IP = os.environ.get("ANAFI_IP", "192.168.42.1")
olympe.log.update_config({"loggers": {"olympe": {"level": "WARNING"}}})
"""
def get_maxtiltget(drone_anafi):
print("Drone MaxTilt = ", drone_anafi.get_state(MaxTiltChanged)["current"])
"""
def get_gps(drone_anafi):
drone_anafi(GPSFixStateChanged(_policy = 'wait'))
#Print GPS Home location
#print("Home GPS: ", drone_anafi.get_state(HomeChanged))
assert drone_anafi(gps_v2()).wait().success()
time.sleep(1)
#Print GPS Current location
"""
for i in range(4):
print("Current GPS: ", drone_anafi.get_state(PositionChanged))
time.sleep(1)
print("GPS found at: ", drone_anafi.get_state(PositionChanged))
"""
if __name__ == "__main__":
#Create a new drone object
drone = olympe.Drone(DRONE_IP)
#Connect the control station with the drone
drone.connect()
#Call functions
get_gps(drone)
#Disonnect the control station with the drone
drone.disconnect()
\ No newline at end of file
#!/home/anafi/code/parrot-olympe/out/olympe-linux/pyenv_root/versions/3.9.5/bin/python3
#Olympe imports
import argparse
import queue
import threading
import cv2
import olympe
import os
import numpy as np
import sys
import time
from olympe.video.pdraw import Pdraw, PdrawState
from olympe.video.renderer import PdrawRenderer
#ROS imports
import rospy
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
from sensor_msgs.msg import CompressedImage
DRONE_IP = os.environ.get("DRONE_IP", "192.168.42.1")
DRONE_RTSP_PORT = os.environ.get("DRONE_RTSP_PORT", "554")
DRONE_STREAM = f"rtsp://{DRONE_IP}:{DRONE_RTSP_PORT}/live"
MEDIA_PATH = '/home/cyver/catkin_ws/src/anafi_base/script/media'
IMG_PATH = os.path.join(MEDIA_PATH , 'output.jpg')
anafi_img = cv2.imread(IMG_PATH)
olympe.log.update_config({"loggers": {"olympe": {"level": "WARNING"}}})
class StreamingExample:
def __init__(self):
# Create the olympe.Drone object from its IP address
self.drone = olympe.Drone(DRONE_IP)
self.frame_queue = queue.Queue()
self.flush_queue_lock = threading.Lock()
self.renderer = None
#Pdraw
self.pdraw = Pdraw()
self.pdraw.set_callbacks(
raw_cb = self.yuv_frame_cb,
flush_raw_cb = self.flush_cb
)
def yuv_frame_cb(self, yuv_frame):
"""
This function will be called by Olympe for each decoded YUV frame.
:type yuv_frame: olympe.VideoFrame
"""
# the VideoFrame.info() dictionary contains some useful information
# such as the video resolution
info = yuv_frame.info()
height, width = ( # noqa
info["raw"]["frame"]["info"]["height"],
info["raw"]["frame"]["info"]["width"],
)
# yuv_frame.vmeta() returns a dictionary that contains additional
# metadata from the drone (GPS coordinates, battery percentage, ...)
# convert pdraw YUV flag to OpenCV YUV flag
cv2_cvt_color_flag = {
olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
}[yuv_frame.format()]
# yuv_frame.as_ndarray() is a 2D numpy array with the proper "shape"
# i.e (3 * height / 2, width) because it's a YUV I420 or NV12 frame
# Use OpenCV to convert the yuv frame to RGB
anafi_img = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag) # noqa
#cv2.imwrite(os.path.join(MEDIA_PATH , 'output.jpg'), anafi_img)
#print("Writing Image")
#print("Image received")
#yuv_frame.ref()
self.frame_queue.put_nowait(yuv_frame)
#print('Image received')
def flush_cb(self, stream):
if stream["vdef_format"] != olympe.VDEF_I420:
return True
with self.flush_queue_lock:
while not self.frame_queue.empty():
self.frame_queue.get_nowait().unref()
return True
def start(self):
#ROS
#pub_img = rospy.Publisher("/output/image_raw/compressed", CompressedImage)
#rospy.init_node('anafi_vid', anonymous=True)
#rate = rospy.Rate(10) # 10hz
#bridge = CvBridge()
#Olympe
#drone = olympe.Drone(DRONE_IP)
#print("Drone created")
#drone.connect()
#print("Drone connected")
self.pdraw.play(url=DRONE_STREAM)
self.renderer = PdrawRenderer(pdraw=self.pdraw)
assert self.pdraw.wait(PdrawState.Playing, timeout=5)
i = 0
while i < 100:
time.sleep(1)
print(i, " seconde")
i = i+1
self.timeout = 5
#Do things while ROS is UP
'''
while not rospy.is_shutdown():
#rospy.loginfo('Publishing image')
#anafi_img = cv2.imread(anafi_img,mode='RGB')
#msg = CompressedImage()
#msg.header.stamp = rospy.Time.now()
#msg.format = "jpeg"
#msg.data = np.array(cv2.imencode('.jpg', anafi_img)[1]).tostring()
#pub_img.publish(msg)
rate.sleep()
'''
def stop(self):
assert self.pdraw.wait(PdrawState.Closed, timeout=self.timeout)
self.renderer.stop()
self.pdraw.close()
self.pdraw.dispose()
def streaming():
streaming_anafi = StreamingExample()
streaming_anafi.start()
streaming_anafi.stop()
if __name__ == "__main__":
streaming()
\ No newline at end of file
#!/home/anafi/code/parrot-olympe/out/olympe-linux/pyenv_root/versions/3.9.5/bin/python3
#Olympe imports
import cv2
import os
#ROS imports
import rospy
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
path = '/home/cyver/catkin_ws/src/anafi_base/script/media'
img_path = os.path.join(path ,'output.jpg')
anafi_img = cv2.imread(img_path)
class Anafi_Data(object):
def __init__(self):
# Params
#self.image = None
self.br = CvBridge()
# Node cycle rate (in Hz).
self.loop_rate = rospy.Rate(1)
# Publishers
self.pub = rospy.Publisher('camera/image', Image,queue_size=10)
self.image = anafi_img
def start(self):
rospy.loginfo("Timing images")
#rospy.spin()
while not rospy.is_shutdown():
rospy.loginfo('publishing image')
#br = CvBridge()
if self.image is not None:
self.pub.publish(self.br.cv2_to_imgmsg(self.image, "rgb8"))
self.loop_rate.sleep()
if __name__ == '__main__':
rospy.init_node('anafi_vid', anonymous=True)
my_node = Anafi_Data()
my_node.start()
\ No newline at end of file
#!/home/anafi/code/parrot-olympe/out/olympe-linux/pyenv_root/versions/3.9.5/bin/python3
import argparse
import cv2
import olympe
import os
import sys
import time
from olympe.video.pdraw import Pdraw, PdrawState
from olympe.video.renderer import PdrawRenderer
DRONE_IP = os.environ.get("DRONE_IP", "192.168.42.1")
#DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
DRONE_RTSP_PORT = os.environ.get("DRONE_RTSP_PORT", "554")
def yuv_frame_cb(yuv_frame):
"""
This function will be called by Olympe for each decoded YUV frame.
:type yuv_frame: olympe.VideoFrame
"""
# the VideoFrame.info() dictionary contains some useful information
# such as the video resolution
info = yuv_frame.info()
height, width = ( # noqa
info["raw"]["frame"]["info"]["height"],
info["raw"]["frame"]["info"]["width"],
)
# yuv_frame.vmeta() returns a dictionary that contains additional
# metadata from the drone (GPS coordinates, battery percentage, ...)
# convert pdraw YUV flag to OpenCV YUV flag
cv2_cvt_color_flag = {
olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
}[yuv_frame.format()]
# yuv_frame.as_ndarray() is a 2D numpy array with the proper "shape"
# i.e (3 * height / 2, width) because it's a YUV I420 or NV12 frame
# Use OpenCV to convert the yuv frame to RGB
cv2frame = cv2.cvtColor(yuv_frame.as_ndarray(), cv2_cvt_color_flag) # noqa
def main():
pdraw = Pdraw()
pdraw.set_callbacks(raw_cb=yuv_frame_cb)
pdraw.play(url=f"rtsp://{DRONE_IP}:{DRONE_RTSP_PORT}/live")
renderer = PdrawRenderer(pdraw=pdraw)
assert pdraw.wait(PdrawState.Playing, timeout=5)
i = 0
while i < 100:
time.sleep(1)
print(i, " seconde")
i = i+1
pdraw.close()
timeout = 5
assert pdraw.wait(PdrawState.Closed, timeout=timeout)
renderer.stop()
pdraw.dispose()
'''
def flush_cb(stream):
if stream["vdef_format"] != olympe.VDEF_I420:
return True
with flush_queue_lock:
while not frame_queue.empty():
frame_queue.get_nowait().unref()
return True
'''
def test_pdraw():
main([])
if __name__ == "__main__":
#main(sys.argv[1:])
main()
\ No newline at end of file
#!/home/anafi/code/parrot-olympe/out/olympe-linux/pyenv_root/versions/3.9.5/bin/python3
import csv
import cv2
import math
import os
import queue
import shlex
import subprocess
import tempfile
import threading
import time
import olympe
from olympe.messages.ardrone3.Piloting import TakeOff, Landing
from olympe.messages.ardrone3.Piloting import moveBy
from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
from olympe.messages.ardrone3.PilotingSettings import MaxTilt
from olympe.messages.ardrone3.GPSSettingsState import GPSFixStateChanged
from olympe.video.pdraw import Pdraw, PdrawState
from olympe.video.renderer import PdrawRenderer
olympe.log.update_config({"loggers": {"olympe": {"level": "WARNING"}}})
DRONE_IP = os.environ.get("DRONE_IP", "192.168.42.1")
#DRONE_IP = os.environ.get("DRONE_IP", "10.202.0.1")
DRONE_RTSP_PORT = os.environ.get("DRONE_RTSP_PORT", "554")
class Streaming:
def __init__(self):
# Create the olympe.Drone object from its IP address
self.drone = olympe.Drone(DRONE_IP)
self.frame_queue = queue.Queue()
self.flush_queue_lock = threading.Lock()
self.renderer = None
def start(self):
# Connect the the drone
self.drone.connect()
if DRONE_RTSP_PORT is not None:
self.drone.streaming.server_addr = f"{DRONE_IP}:{DRONE_RTSP_PORT}"
# You can record the video stream from the drone if you plan to do some
# post processing.
'''
self.drone.streaming.set_output_files(
video=os.path.join(self.tempd, "streaming.mp4"),
metadata=os.path.join(self.tempd, "streaming_metadata.json"),
)
'''
# Setup your callback functions to do some live video processing
self.drone.streaming.set_callbacks(
raw_cb=self.yuv_frame_cb,
#h264_cb=self.h264_frame_cb,
start_cb=self.start_cb,
end_cb=self.end_cb,
flush_raw_cb=self.flush_cb,
)
# Start video streaming
self.drone.streaming.start()
self.renderer = PdrawRenderer(pdraw=self.drone.streaming)
def stop(self):
if self.renderer is not None:
self.renderer.stop()
# Properly stop the video stream and disconnect
self.drone.streaming.stop()
self.drone.disconnect()
#self.h264_stats_file.close()
def yuv_frame_cb(self, yuv_frame):
"""
This function will be called by Olympe for each decoded YUV frame.
:type yuv_frame: olympe.VideoFrame
"""
print('Image reveived')
yuv_frame.ref()
self.frame_queue.put_nowait(yuv_frame)
def flush_cb(self, stream):
if stream["vdef_format"] != olympe.VDEF_I420:
return True
with self.flush_queue_lock:
while not self.frame_queue.empty():
self.frame_queue.get_nowait().unref()
return True
def start_cb(self):
pass
def end_cb(self):
pass
def show_yuv_frame(self, window_name, yuv_frame):
# the VideoFrame.info() dictionary contains some useful information
# such as the video resolution
info = yuv_frame.info()
height, width = ( # noqa
info["raw"]["frame"]["info"]["height"],
info["raw"]["frame"]["info"]["width"],
)
# yuv_frame.vmeta() returns a dictionary that contains additional
# metadata from the drone (GPS coordinates, battery percentage, ...)
# convert pdraw YUV flag to OpenCV YUV flag
cv2_cvt_color_flag = {
olympe.VDEF_I420: cv2.COLOR_YUV2BGR_I420,
olympe.VDEF_NV12: cv2.COLOR_YUV2BGR_NV12,
}[yuv_frame.format()]
def streaming():
streaming_anafi = Streaming()
streaming_anafi.start()
while True:
time.sleep(0.1)
if __name__ == "__main__":
streaming()
<launch>
<node name="anafi_control" pkg="anafi_control" type="anafi_ctrl.py" output="screen"/>
</launch>
\ No newline at end of file
#!/home/anafi/code/parrot-olympe/out/olympe-linux/pyenv_root/versions/3.9.5/bin/python3
import olympe
import os
from olympe.messages.ardrone3.Piloting import TakeOff, moveBy, Landing
from olympe.messages.ardrone3.PilotingState import FlyingStateChanged
DRONE_IP = os.environ.get("DRONE_IP", "192.168.42.1")
def test_moveby():
drone = olympe.Drone(DRONE_IP)
drone.connect()
assert drone(
TakeOff()
>> FlyingStateChanged(state="hovering", _timeout=5)
).wait().success()
assert drone(
moveBy(1, 0, 0, 0)
>> FlyingStateChanged(state="hovering", _timeout=5)
).wait().success()
assert drone(Landing()).wait().success()
drone.disconnect()
if __name__ == "__main__":
test_moveby()
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment