Pas op!

De documentatie, in het specifiek de Camera Streamer app, van Farm-ng over het gebruik van de camera is momenteel incorrect. Hier is contact over opgenomen. Gebruik voor alsnog deze pagina i.p.v. de Farm-ng code voorbeelden!

Voordat je begint met het uitlezen van de camera’s op de robot raad ik je aan om de volgende twee pagina’s te lezen:

Hierna heb jij genoeg kennis van de camera’s om er mee te kunnen werken in je eigen project.

Code

Je kan natuurlijk zelf bepalen hoe je met de data van de camera om wilt gaan. Ik heb zelf een implementatie geschreven die de 4 streams van 1 camera combineert. Voel je vrij om deze letterlijk over te nemen als dit past bij jouw doeleinde.

De Image klasse

Onze camera implementatie combineert alle 4 de streams in een klasse. Dat kan je opslaan in de vorm van een ndarray van NumPy. Zelf heb ik er Optionals van gemaakt omdat ik regelmatig afbeeldingen weg gooi. Voel je vrij om hier concrete ndarrays van te maken.

@dataclass  
class Image:
    rgb: Optional[np.ndarray] 
    left: Optional[np.ndarray] 
    right: Optional[np.ndarray]
    disparity: Optional[np.ndarray]

De Camera Handler

De ICameraHandler is de interface van de camera. Je werkt met interfaces zodat je ze kan vervangen met andere implementaties wanneer je lokaal werkt! Zo heb ik voor mezelf een Camera implementatie gemaakt die de opnames van de robot kan gebruiken als camera input.

class ICameraHandler(ABC):  
    """Manages continuous subscriptions to the four Oak‑D camera streams."""  
  
    @abstractmethod  
    def start_streaming(self) -> None:  
        """Spawn the background task that keeps the four subscriptions alive.""" 
        pass  
  
    @abstractmethod  
    async def get_camera_data(self) -> Image:  
        """Return a shallow copy of the latest image data."""  
        pass  
  
    @abstractmethod  
    def stop_streaming(self) -> None:  
        """Stops all subscriptions """  
        pass

De concrete implementatie voor de robot heb ik zo gemaakt dat je die met een methode op kan halen, overal in je applicatie. Voel je vrij om je eigen camera klasse te implementeren.

class CameraHandler(ICameraHandler):  
    """  
    Manages continuous subscriptions to the four Oak‑D camera streams.  
    Allows easy access to the latest snapshot of all fours streams. :)    """  
    STREAM_NAMES = ("rgb", "disparity", "left", "right")  
  
    def __init__(self, logger: ILogger, image_converter: IImageConverter):  
        self.logger = logger  
        self.image_converter = image_converter  
  
        self._image = Image(None, None, None, None) # The latest image. Only access under lock.  
        self._lock = asyncio.Lock()  
        self._oak_client: Optional[EventClient] = None # This is the oak0 client that will be created in the init client method :)  
        self._bg_task: Optional[asyncio.Task] = None # Keep a reference to the background task so we can cancel it later.  
  
    def start_streaming(self):  
        """Spawn the background task that keeps the four subscriptions alive."""  
        if self._bg_task is None or self._bg_task.done():  
            self._bg_task = asyncio.create_task(self._run())  
            self.logger.info("Camera streaming task started")  
  
    async def get_camera_data(self) -> Image:  
        """Return a shallow copy of the latest image data."""  
        async with self._lock:  
            return Image(  
                rgb=self._image.rgb,  
                disparity=self._image.disparity,  
                left=self._image.left,  
                right=self._image.right,  
                result=None,  
            )  
  
    def stop_streaming(self) -> None:  
        """Cancel the background task."""  
        if self._bg_task:  
            self._bg_task.cancel()  
        for t in getattr(self, "_reader_tasks", []):  
            t.cancel()  
        self.logger.info("Camera streaming task cancelled")  
  
    async def _run(self) -> None:  
        await self._init_client()  
        if not self._oak_client:  
            return  
  
        self.generators = {  
            name: self._oak_client.subscribe(  
                SubscribeRequest(  
                    uri={"path": f"{self._oak_client.config.name}/{name}"},  
                    every_n=self._oak_client.config.subscriptions[0].every_n,  
                ),  
                decode=False,  
            )  
            for name in self.STREAM_NAMES  
        }  
  
        # Start a persistent reader for each stream (rgb, disparity, left & right).  
        self._reader_tasks = [  
            asyncio.create_task(self._read_loop(name, gen))  
            for name, gen in self.generators.items()  
        ]  
  
        # Wait until the whole handler is cancelled.  
        await asyncio.gather(*self._reader_tasks)  
  
    async def _read_loop(self, stream_name: str, gen):  
        async for event, payload in gen:  
            try:  
                msg = payload_to_protobuf(event, payload)  
                cv_img = self.image_converter.oak_data_to_opencv(msg)  
  
                # Set the np.ndarray data to the correct stream_name. :)  
                async with self._lock:  
                    setattr(self._image, stream_name, cv_img)  
  
                self.logger.info(f"Received {stream_name} frame – shape {cv_img.shape}")  
            except Exception as exc:  
                self.logger.error(f"Failed to decode {stream_name}: {exc}")  
  
    async def _init_client(self) -> None:  
        """Load the service config."""  
  
        service_config_path = Path() / "src" / "res" / "service_config_oak0.json"  
        if not service_config_path.exists():  
            raise FileNotFoundError("Service config not found!")  
  
        cfg_list = proto_from_json_file(service_config_path, EventServiceConfigList())  
        for cfg in cfg_list.configs:  
  
            # If found:  
            if cfg.name == "oak0":  
                self._oak_client = EventClient(cfg)  
                self.logger.info("oak0 client created")  
                return  
  
        # If not found:  
        self.logger.error("oak0 client not found in service config")

Een camera task

Nu hebben we dus een klasse om de data uit te lezen. Top! Nu nog die data gebruiken! Als je, zoals aangeraden (of gedwongen door mijn template) Lagom gebruikt voor Dependency Injection, kan je de ICameraHandler gewoon in de constructor van je nieuwe taak gooien.

Een minimaal voorbeeld uit mijn eigen applicatie, gebruikmakend van de template:

class CameraStreamTask(BaseTask):  
    def __init__(self, thread_handler: IThreadHandler, kivy_handler: IKivyHandler, camera_handler: ICameraHandler, image_converter: IImageConverter, image_handler: IImageHandler, logger: ILogger):  
        super().__init__(thread_handler)  
        self.kivy_handler = kivy_handler  
        self.camera_handler = camera_handler  
        self.image_converter = image_converter  
        self.image_handler = image_handler  
        self.logger = logger  
  
    def can_run(self) -> bool:  
        return True  
  
    async def act(self) -> None:  
        self.camera_handler.start_streaming()  
  
        await self.thread_handler.sleep(0.1)  # Without this errors will occur! Keep this!  
        image = await self.camera_handler.get_camera_data()  # This is the actual camera call.  
        image = self.image_handler.handle(image) # Then process it through opencv.  
  
        if image.rgb is not None:  
            self.update_texture(self.image_converter.opencv_to_kivy(image.rgb))  
            self.logger.info('Camera Feed Updated')  
        else:  
            self.logger.warning("Image RGB was None!")  
  
        await self.thread_handler.sleep(0.05)  
  
    def update_texture(self, texture):  
        """Puts the image on your screen."""  
        try:  
            self.kivy_handler.get_kivy_app().root.ids['your_image_id'].texture = texture  
            self.logger.info('Camera Feed Updated')  
        except Exception as e:  
            self.logger.error(f"Error updating texture: {e}")  
  
    @property  
    def name(self) -> str:  
        return "Camera Stream"

Service config

De service_config_oak0.json bevat het volgende:

{  
    "configs": [  
        {  
        "name": "oak0",  
        "port": 50010,  
        "host": "localhost",  
        "log_level": "INFO",  
        "subscriptions": [  
            {  
                "every_n": 1  
            }  
        ]  
    }  
    ]  
}

Mocht je de camera aan willen passen, kan je deze bij name veranderen naar oak1, oak2 of oak3. Verder is het misschien ook interessant om de every_n aan te passen. Dat is om hoeveel iteraties die de camera data stuurt naar onze applicatie.