import sys, getopt, json, logging
import os
import PIL
import requests
import re
import io
from typing import List, Dict
import json
from datetime import datetime as dt_util
from PIL import Image, ImageDraw, UnidentifiedImageError
from pathlib import Path
from paho.mqtt.client import Client
# Keys of the per-vehicle dicts that PlateRecognizerEntity.process_image builds
# from each API result. ATTR_PLATE is also what the entity returns as its
# unit of measurement.
ATTR_PLATE = "plate"
ATTR_CONFIDENCE = "confidence"
ATTR_REGION_CODE = "region_code"
ATTR_VEHICLE_TYPE = "vehicle_type"
# Key under which the entity's attributes expose the detected orientations.
ATTR_ORIENTATION = "orientation"
# strftime pattern for the last-detection timestamp; that timestamp is also
# embedded in the timestamped image file name.
DATETIME_FORMAT = "%Y-%m-%d_%H-%M-%S"
# RGB triple for red; presumably the colour for bounding-box annotations on
# saved images -- confirm against the drawing code.
RED = (255, 0, 0)
# When an entity's regions equal this value, process_image sends regions=None
# to the API, and the regions are left out of the entity attributes.
DEFAULT_REGIONS = ['it']
def get_plates(results: List[Dict]) -> List[str]:
    """
    Return the unique candidate plates found in ``results``.

    Every entry of ``results`` must carry a ``candidates`` list whose items
    have a ``plate`` key; a missing key raises ``KeyError``.

    Plates are de-duplicated and returned in first-seen order, so the same
    input always produces the same list (a plain ``set`` round-trip would give
    an order that changes between interpreter runs).

    If there are no plates an empty list is returned.
    """
    seen = set()
    plates = []
    for result in results:
        for cand in result['candidates']:
            plate = cand['plate']
            if plate not in seen:
                seen.add(plate)
                plates.append(plate)
    return plates
def get_orientations(results: List[Dict], LOGGER, min_score: float = 0.7) -> List[str]:
    """
    Return the unique orientations whose score reaches ``min_score``.

    Every entry of ``results`` is expected to carry an ``orientation`` list
    whose items have ``orientation`` and ``score`` keys.

    Args:
        results: list of result dicts to inspect.
        LOGGER: logger used for per-candidate debug output and error reports.
        min_score: minimum score (inclusive) a candidate needs to be kept.

    Returns:
        The accepted orientations, de-duplicated in first-seen order. An empty
        list is returned when nothing qualifies, and also when ``results`` is
        malformed: the error is logged, never raised, and the caller always
        receives a list.
    """
    orientations = []
    try:
        # Collect every per-result list first so a malformed result is
        # reported before any candidate is examined.
        per_result = [result['orientation'] for result in results]
        for candidates in per_result:
            for cand in candidates:
                LOGGER.debug("get_orientations cand: %s", cand)
                if cand["score"] >= min_score:
                    orientation = cand["orientation"]
                    if orientation not in orientations:
                        orientations.append(orientation)
    except Exception as exc:
        LOGGER.error("get_orientations error: %s", exc)
        # Discard partial output rather than hand back half-parsed data.
        return []
    return orientations
class PlateRecognizerEntity:
    """Send images to a plate-recognition server and keep the latest results.

    The entity posts an image to ``server``, stores the returned results,
    plates, orientations and per-vehicle summaries, and exposes them through
    read-only properties.
    """

    def __init__(
        self,
        api_token,
        regions,
        save_file_folder,
        save_timestamped_file,
        always_save_latest_file,
        watched_plates,
        camera_entity,
        name,
        mmc,
        server,
        detection_rule,
        region_strict,
        stats_server,
        logger
    ):
        """Store the configuration and fetch the initial usage statistics.

        Args:
            api_token: token sent in the ``Authorization`` header.
            regions: region list forwarded to the server (see process_image).
            save_file_folder: prefix that image file names are appended to.
            save_timestamped_file: whether save_image also writes a
                timestamped copy.
            always_save_latest_file: stored and reported in the attributes.
            watched_plates: plates reported as seen / not seen in the
                attributes.
            camera_entity: identifier returned by ``camera_entity``.
            name: entity name; a falsy value selects
                ``"platerecognizer_Targa"``.
            mmc: forwarded to the server; when truthy, orientations are
                extracted from the response.
            server: URL the image is posted to.
            detection_rule: optional value sent as ``config["detection_rule"]``.
            region_strict: optional value sent as ``config["region"]``.
            stats_server: URL queried by get_statistics.
            logger: object providing ``debug``, ``info`` and ``error``.
        """
        # The logger has to exist before get_statistics() runs below, because
        # that method reports its failures through it.
        self._LOGGER = logger
        self._headers = {"Authorization": f"Token {api_token}"}
        self._regions = regions
        self._camera = camera_entity
        self._name = name or "platerecognizer_Targa"
        self._save_file_folder = save_file_folder
        self._save_timestamped_file = save_timestamped_file
        self._always_save_latest_file = always_save_latest_file
        self._watched_plates = watched_plates
        self._mmc = mmc
        self._server = server
        self._detection_rule = detection_rule
        self._region_strict = region_strict
        self._stats_server = stats_server

        self._state = None
        self._results = {}
        # Empty until a request succeeds, so that len() is the vehicle count.
        self._vehicles = []
        self._orientations = []
        self._plates = []
        self._statistics = {}
        self._last_detection = None
        self._image_width = None
        self._image_height = None
        self._image = None
        self._imageIO = None
        self._config = {}

        self.get_statistics()

    def process_image(self, savePath, im):
        """Send ``im`` to the server and update the entity from the response.

        Args:
            savePath: not used by this method.
            im: image object providing ``save(fp, format=...)`` and ``size``
                (presumably a PIL image -- confirm against the caller).

        Request or parsing failures are logged and leave the entity with zero
        vehicles; they are not raised. For every detected vehicle
        ``fire_vehicle_detected_event`` is called once.
        """
        self._state = None
        self._results = {}
        # Start from "no vehicles": a failed request must not look like a
        # detection, bump the last-detection time or fire an event.
        self._vehicles = []
        self._plates = []
        self._orientations = []

        img_byte_arr = io.BytesIO()
        im.save(img_byte_arr, format="JPEG")
        self._image = im
        self._imageIO = img_byte_arr.getvalue()
        self._image_width, self._image_height = im.size

        # The default region list is sent as "no regions".
        regions = None if self._regions == DEFAULT_REGIONS else self._regions
        if self._detection_rule:
            self._config.update({"detection_rule": self._detection_rule})
        if self._region_strict:
            self._config.update({"region": self._region_strict})

        # Bound before the try block so the error handler can always print it,
        # even when the HTTP call itself raised.
        response = None
        try:
            self._LOGGER.debug("Config: " + str(json.dumps(self._config)))
            response = requests.post(
                self._server,
                data=dict(
                    regions=regions,
                    camera_id=self.name,
                    mmc=self._mmc,
                    config=json.dumps(self._config),
                ),
                files={"upload": self._imageIO},
                headers=self._headers
            ).json()
            self._results = response["results"]
            self._plates = get_plates(response['results'])
            if self._mmc:
                # Guard against a helper that reports failure with None.
                self._orientations = (
                    get_orientations(response['results'], self._LOGGER) or []
                )
            self._vehicles = [
                {
                    ATTR_PLATE: r["plate"],
                    ATTR_CONFIDENCE: r["score"],
                    ATTR_REGION_CODE: r["region"]["code"],
                    ATTR_VEHICLE_TYPE: r["vehicle"]["type"],
                }
                for r in self._results
            ]
        except Exception as exc:
            self._LOGGER.error("platerecognizer error: %s", exc)
            self._LOGGER.error(f"platerecognizer api response: {response}")

        self._state = len(self._vehicles)
        if self._state > 0:
            self._last_detection = dt_util.now().strftime(DATETIME_FORMAT)
            for vehicle in self._vehicles:
                self.fire_vehicle_detected_event(vehicle)

    def get_statistics(self):
        """Fetch usage statistics and store them with ``calls_remaining`` added.

        ``calls_remaining`` is ``total_calls - usage.calls`` from the response.
        Failures are logged and leave the previous statistics untouched.
        """
        try:
            response = requests.get(self._stats_server, headers=self._headers).json()
            calls_remaining = response["total_calls"] - response["usage"]["calls"]
            response.update({"calls_remaining": calls_remaining})
            self._statistics = response.copy()
        except Exception as exc:
            self._LOGGER.error("platerecognizer error getting statistics: %s", exc)

    def fire_vehicle_detected_event(self, vehicle):
        """Send event."""

    def save_image(self):
        """Save the last processed image with bounding boxes around plates.

        Each result's ``box`` is outlined in red and labelled with its plate.
        The image is always written to ``<folder><name>_latest.png``; a copy
        named with the last detection time is written as well when timestamped
        saving is enabled and a detection time exists.

        File names are appended directly to the configured folder string, so
        that string must already end with a path separator.
        """
        draw = ImageDraw.Draw(self._image)
        for vehicle in self._results:
            box = vehicle['box']
            # NOTE(review): box values are treated as pixel coordinates of the
            # processed image -- confirm against the server's response format.
            draw.rectangle(
                [(box["xmin"], box["ymin"]), (box["xmax"], box["ymax"])],
                outline=RED,
                width=3,
            )
            # Label just below the box so the plate itself stays readable.
            draw.text((box["xmin"], box["ymax"] + 2), vehicle['plate'], fill=RED)

        latest_save_path = self._save_file_folder + self._name + "_latest.png"
        self._image.save(latest_save_path)

        # Without a detection time there is nothing to put in the file name.
        if self._save_timestamped_file and self._last_detection:
            timestamp_save_path = (
                self._save_file_folder + self._name + "_" + self._last_detection + ".png"
            )
            self._image.save(timestamp_save_path)
            self._LOGGER.info("platerecognizer saved file %s", timestamp_save_path)

    @property
    def camera_entity(self):
        """Return camera entity id from process pictures."""
        return self._camera

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def should_poll(self):
        """Return the polling state."""
        return False

    @property
    def state(self):
        """Return the state of the entity (vehicle count of the last run)."""
        return self._state

    @property
    def unit_of_measurement(self):
        """Return the unit of measurement."""
        return ATTR_PLATE

    @property
    def device_state_attributes(self):
        """Return the attributes.

        Always contains the last detection time, the vehicles, the
        orientations and the statistics. Watched-plate flags, regions, server
        and file-saving settings are added only when configured / non-default.
        """
        # NOTE(review): the CONF_* names and PLATE_READER_URL used below are
        # not defined in the part of the file visible here; they must exist at
        # module level for this property to work -- confirm.
        attr = {}
        attr.update({"last_detection": self._last_detection})
        attr.update({"vehicles": self._vehicles})
        attr.update({ATTR_ORIENTATION: self._orientations})
        if self._watched_plates:
            # Map every watched plate to whether the last run saw it.
            attr[CONF_WATCHED_PLATES] = {
                plate: plate in self._plates for plate in self._watched_plates
            }
        attr.update({"statistics": self._statistics})
        if self._regions != DEFAULT_REGIONS:
            attr[CONF_REGIONS] = self._regions
        if self._server != PLATE_READER_URL:
            attr[CONF_SERVER] = str(self._server)
        if self._save_file_folder:
            attr[CONF_SAVE_FILE_FOLDER] = str(self._save_file_folder)
            attr[CONF_SAVE_TIMESTAMPTED_FILE] = self._save_timestamped_file
            attr[CONF_ALWAYS_SAVE_LATEST_FILE] = self._always_save_latest_file
        return attr