Pas op!
De documentatie, in het specifiek de Camera Streamer app, van Farm-ng over het gebruik van de camera is momenteel incorrect. Hier is contact over opgenomen. Gebruik voor alsnog deze pagina i.p.v. de Farm-ng code voorbeelden!
Voordat je begint met het uitlezen van de camera’s op de robot raad ik je aan om de volgende twee pagina’s te lezen:
Hierna heb jij genoeg kennis van de camera’s om er mee te kunnen werken in je eigen project.
Code
Je kan natuurlijk zelf bepalen hoe je met de data van de camera om wilt gaan. Ik heb zelf een implementatie geschreven die de 4 streams van 1 camera combineert. Voel je vrij om deze letterlijk over te nemen als dit past bij jouw doeleinde.
De Image klasse
Onze camera implementatie combineert alle 4 de streams in een klasse. Dat kan je opslaan in de vorm van een ndarray van NumPy. Zelf heb ik er Optionals van gemaakt omdat ik regelmatig afbeeldingen weg gooi. Voel je vrij om hier concrete ndarrays van te maken.
@dataclass
class Image:
rgb: Optional[np.ndarray]
left: Optional[np.ndarray]
right: Optional[np.ndarray]
disparity: Optional[np.ndarray]De Camera Handler
De ICameraHandler is de interface van de camera. Je werkt met interfaces zodat je ze kan vervangen met andere implementaties wanneer je lokaal werkt! Zo heb ik voor mezelf een Camera implementatie gemaakt die de opnames van de robot kan gebruiken als camera input.
class ICameraHandler(ABC):
"""Manages continuous subscriptions to the four Oak‑D camera streams."""
@abstractmethod
def start_streaming(self) -> None:
"""Spawn the background task that keeps the four subscriptions alive."""
pass
@abstractmethod
async def get_camera_data(self) -> Image:
"""Return a shallow copy of the latest image data."""
pass
@abstractmethod
def stop_streaming(self) -> None:
"""Stops all subscriptions """
passDe concrete implementatie voor de robot heb ik zo gemaakt dat je die met een methode op kan halen, overal in je applicatie. Voel je vrij om je eigen camera klasse te implementeren.
class CameraHandler(ICameraHandler):
"""
Manages continuous subscriptions to the four Oak‑D camera streams.
Allows easy access to the latest snapshot of all fours streams. :) """
STREAM_NAMES = ("rgb", "disparity", "left", "right")
def __init__(self, logger: ILogger, image_converter: IImageConverter):
self.logger = logger
self.image_converter = image_converter
self._image = Image(None, None, None, None) # The latest image. Only access under lock.
self._lock = asyncio.Lock()
self._oak_client: Optional[EventClient] = None # This is the oak0 client that will be created in the init client method :)
self._bg_task: Optional[asyncio.Task] = None # Keep a reference to the background task so we can cancel it later.
def start_streaming(self):
"""Spawn the background task that keeps the four subscriptions alive."""
if self._bg_task is None or self._bg_task.done():
self._bg_task = asyncio.create_task(self._run())
self.logger.info("Camera streaming task started")
async def get_camera_data(self) -> Image:
"""Return a shallow copy of the latest image data."""
async with self._lock:
return Image(
rgb=self._image.rgb,
disparity=self._image.disparity,
left=self._image.left,
right=self._image.right,
result=None,
)
def stop_streaming(self) -> None:
"""Cancel the background task."""
if self._bg_task:
self._bg_task.cancel()
for t in getattr(self, "_reader_tasks", []):
t.cancel()
self.logger.info("Camera streaming task cancelled")
async def _run(self) -> None:
await self._init_client()
if not self._oak_client:
return
self.generators = {
name: self._oak_client.subscribe(
SubscribeRequest(
uri={"path": f"{self._oak_client.config.name}/{name}"},
every_n=self._oak_client.config.subscriptions[0].every_n,
),
decode=False,
)
for name in self.STREAM_NAMES
}
# Start a persistent reader for each stream (rgb, disparity, left & right).
self._reader_tasks = [
asyncio.create_task(self._read_loop(name, gen))
for name, gen in self.generators.items()
]
# Wait until the whole handler is cancelled.
await asyncio.gather(*self._reader_tasks)
async def _read_loop(self, stream_name: str, gen):
async for event, payload in gen:
try:
msg = payload_to_protobuf(event, payload)
cv_img = self.image_converter.oak_data_to_opencv(msg)
# Set the np.ndarray data to the correct stream_name. :)
async with self._lock:
setattr(self._image, stream_name, cv_img)
self.logger.info(f"Received {stream_name} frame – shape {cv_img.shape}")
except Exception as exc:
self.logger.error(f"Failed to decode {stream_name}: {exc}")
async def _init_client(self) -> None:
"""Load the service config."""
service_config_path = Path() / "src" / "res" / "service_config_oak0.json"
if not service_config_path.exists():
raise FileNotFoundError("Service config not found!")
cfg_list = proto_from_json_file(service_config_path, EventServiceConfigList())
for cfg in cfg_list.configs:
# If found:
if cfg.name == "oak0":
self._oak_client = EventClient(cfg)
self.logger.info("oak0 client created")
return
# If not found:
self.logger.error("oak0 client not found in service config")Een camera task
Nu hebben we dus een klasse om de data uit te lezen. Top! Nu nog die data gebruiken!
Als je, zoals aangeraden (of gedwongen door mijn template) Lagom gebruikt voor Dependency Injection, kan je de ICameraHandler gewoon in de constructor van je nieuwe taak gooien.
Een minimaal voorbeeld uit mijn eigen applicatie, gebruikmakend van de template:
class CameraStreamTask(BaseTask):
def __init__(self, thread_handler: IThreadHandler, kivy_handler: IKivyHandler, camera_handler: ICameraHandler, image_converter: IImageConverter, image_handler: IImageHandler, logger: ILogger):
super().__init__(thread_handler)
self.kivy_handler = kivy_handler
self.camera_handler = camera_handler
self.image_converter = image_converter
self.image_handler = image_handler
self.logger = logger
def can_run(self) -> bool:
return True
async def act(self) -> None:
self.camera_handler.start_streaming()
await self.thread_handler.sleep(0.1) # Without this errors will occur! Keep this!
image = await self.camera_handler.get_camera_data() # This is the actual camera call.
image = self.image_handler.handle(image) # Then process it through opencv.
if image.rgb is not None:
self.update_texture(self.image_converter.opencv_to_kivy(image.rgb))
self.logger.info('Camera Feed Updated')
else:
self.logger.warning("Image RGB was None!")
await self.thread_handler.sleep(0.05)
def update_texture(self, texture):
"""Puts the image on your screen."""
try:
self.kivy_handler.get_kivy_app().root.ids['your_image_id'].texture = texture
self.logger.info('Camera Feed Updated')
except Exception as e:
self.logger.error(f"Error updating texture: {e}")
@property
def name(self) -> str:
return "Camera Stream"Service config
De service_config_oak0.json bevat het volgende:
{
"configs": [
{
"name": "oak0",
"port": 50010,
"host": "localhost",
"log_level": "INFO",
"subscriptions": [
{
"every_n": 1
}
]
}
]
}Mocht je de camera aan willen passen, kan je deze bij name veranderen naar oak1, oak2 of oak3. Verder is het misschien ook interessant om de every_n aan te passen. Dat is om hoeveel iteraties die de camera data stuurt naar onze applicatie.