Reduce Size of PlateRecognizer image

Il_Pres

Getting the hang of it
Joined
Nov 20, 2020
Messages
51
Reaction score
25
Location
Italy
I have a slow internet connection and it can take up to 15 seconds for BI to upload an image to the PlateRecognizer cloud. Is there any way to reduce the size of the sent image? I find it difficult to understand which image BI is sending and how to control it.
Thank you
 

cjowers

Getting the hang of it
Joined
Jan 28, 2020
Messages
107
Reaction score
36
Location
AUS
If your camera is high megapixel (>2MP), the easiest solution might be to reduce your camera stream size to 2MP, or have a separate camera stream or clone running at a lower res just for PlateRecognizer usage. But anything below 2MP and you might be losing quality and information in the plates.
I haven't used PlateRecognizer, but I did create a python script that identifies license plates using a custom DeepStack model and crops any plates and saves them (which is a much, much smaller sized image). If you could modify the script to send to PlateRecognizer you'd be in business, but you'd be outside of the normal BI integration using that method.
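The crop-and-save idea can be sketched as a small helper that takes one detection and clamps a margin to the image bounds. This is a sketch, not the actual script; the field names (`x_min`, `y_min`, `x_max`, `y_max`) follow DeepStack's detection JSON, and the 20 px margin is an assumption:

```python
# Sketch: compute a clamped crop box from one DeepStack prediction.
# Field names follow DeepStack's detection JSON; the margin is arbitrary.

def clamped_box(detection, img_w, img_h, margin=20):
    """Return (left, top, right, bottom) expanded by `margin`
    but never extending beyond the image."""
    left   = max(detection["x_min"] - margin, 0)
    top    = max(detection["y_min"] - margin, 0)
    right  = min(detection["x_max"] + margin, img_w)
    bottom = min(detection["y_max"] + margin, img_h)
    return (left, top, right, bottom)

# With Pillow the crop itself is then just:
#   im.crop(clamped_box(det, *im.size)).save("cropped.jpg")
```

The cropped jpeg is typically a small fraction of the full frame, which is what makes the upload fast.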
 

Il_Pres

Getting the hang of it
Joined
Nov 20, 2020
Messages
51
Reaction score
25
Location
Italy
Many thanks for your suggestion, which also led me to try re-encoding the stream... will post results.
 

Il_Pres

Getting the hang of it
Joined
Nov 20, 2020
Messages
51
Reaction score
25
Location
Italy
At the end of the day, I was able to get super fast LPR recognition using a Python script to crop the image down to the CAR box. I had to implement logic to save the DeepStack JSON to a file and read it from the Python script. A mix of BI actions, Python scripting and NodeRed automations (the last just to open the gate when the plate is recognized). File size reduced to less than 100kB.
 

cjowers

Getting the hang of it
Joined
Jan 28, 2020
Messages
107
Reaction score
36
Location
AUS
How did you connect this to blueiris?
Not sure about Il_Pres, but in my case, I didn't. BI just captures the jpegs on motion and dumps them to a folder as usual. Python+DeepStack analyses the images and does any image processing needed (cropping), then saves to another folder. Then another Python+DeepStack combo analyses the files in the folder for alphanumeric characters and outputs to a .txt log + creates a YOLO labeling file for future training. Then another Python script lets me quickly adjust any incorrect values, so I can feed it back into the custom machine learning model for better OCR results.

You should be able to trigger BI events and things from Python tho. What are you wanting to do? Send images off to PlateRecognizer?
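For reference, Blue Iris exposes an HTTP admin interface that can trigger a camera from an external script. A minimal sketch below; the host, port, camera short name and credentials are placeholders, and you should verify the exact admin URL syntax against your Blue Iris version's help file:

```python
# Sketch: build a Blue Iris /admin trigger URL for a camera short name.
# Host, port, short name and credentials are placeholders.
from urllib.parse import urlencode

def bi_trigger_url(host, camera, user, pw, port=81):
    """Return the Blue Iris admin URL that triggers the given camera."""
    qs = urlencode({"camera": camera, "user": user, "pw": pw})
    return f"http://{host}:{port}/admin?{qs}&trigger"

# Firing it is then one call, e.g.:
#   urllib.request.urlopen(bi_trigger_url("10.0.0.5", "LPR1", "me", "secret"))
```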
 

Il_Pres

Getting the hang of it
Joined
Nov 20, 2020
Messages
51
Reaction score
25
Location
Italy
I've gone further, as I had big delays from when motion is triggered to the moment the jpg gets saved and thus processed by the DeepStack integration in BlueIris. I've created a script to grab a jpg snapshot directly from the camera and then let AITool process it through DeepStack. If a car is found, the other script will call ALPR. This is needed to open a gate, so it has to be fast...
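Grabbing the snapshot directly from the camera can be sketched as below. The snapshot URL is camera-specific (check your camera's CGI/API docs), and the folder and filename pattern here are assumptions chosen to match a "latest file per camera" lookup:

```python
# Sketch: fetch a JPEG snapshot straight from the camera, bypassing BI.
# The snapshot URL, folder and filename pattern are assumptions.
import urllib.request
from datetime import datetime

def snapshot_name(cam, when=None, folder="D:\\Alerts\\"):
    """Timestamped filename so the newest file per camera is easy to find."""
    when = when or datetime.now()
    return folder + cam + "_" + when.strftime("%Y-%m-%d_%H-%M-%S") + ".jpg"

def grab_snapshot(url, cam):
    """Download one frame from the camera's snapshot endpoint to disk."""
    dest = snapshot_name(cam)
    with urllib.request.urlopen(url, timeout=5) as r, open(dest, "wb") as f:
        f.write(r.read())
    return dest
```

Pulling the frame yourself skips BI's motion-capture/save round trip, which is where the delay was coming from.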
 

cjowers

Getting the hang of it
Joined
Jan 28, 2020
Messages
107
Reaction score
36
Location
AUS
I've gone further, as I had big delays from when motion is triggered to the moment the jpg gets saved and thus processed by the DeepStack integration in BlueIris. I've created a script to grab a jpg snapshot directly from the camera and then let AITool process it through DeepStack. If a car is found, the other script will call ALPR. This is needed to open a gate, so it has to be fast...
Can you share your script to call ALPR if you dont mind?
 

Il_Pres

Getting the hang of it
Joined
Nov 20, 2020
Messages
51
Reaction score
25
Location
Italy
Find the scripts below. Please note that the first script requires two inputs from BlueIris: &ALERT_PATH and &CAM, so call it like: c:\pythonscript.py &ALERT_PATH &CAM. Before calling this script, you will need a json file containing the DeepStack reply (&JSON macro) and of course the jpg analyzed by DeepStack. The script will look for the last saved file with the camera name and the jpg and json extensions to do its work.

crop.py
Python:
#!/usr/bin/python

import sys, json, logging
import os
import glob
from datetime import datetime as dt_util
from dataclasses import dataclass
from PIL import Image
import paho.mqtt.publish as publish
from ALPR import PlateRecognizerEntity

DATETIME_FORMAT = "%Y-%m-%d_%H-%M-%S"

input_directory = 'D:\\Alerts\\'
output_directory = "d:\\AI\\"


@dataclass
class InputValues():
   inputfile: str = ''
   filepath: str = ''
   jsonfile: str = ''
   cam: str = ''

def main(argv):
   inputfile = ''
   jsonfile = ''
   cam = ""
   inputfile = argv[0]
   cam = argv[1]
   import glob

   # pick the most recent json / jpg saved for this camera
   list_of_files_json = glob.glob(output_directory + cam + '*.json')
   latest_file_json = max(list_of_files_json, key=os.path.getctime)

   list_of_files_jpg = glob.glob(input_directory + cam + '*.jpg')
   latest_file_jpg = max(list_of_files_jpg, key=os.path.getctime)

   filepath = latest_file_jpg
   jsonfile = latest_file_json

   logger.debug('Input file is ' + inputfile)
   logger.debug('JSON file is ' + jsonfile)
   logger.debug('Camera Name is ' + cam)

   return InputValues(inputfile, filepath, jsonfile, cam)
  


formatter = logging.Formatter('%(asctime)s %(levelname)s: %(funcName)s:%(lineno)d %(message)s')

# create the 'LPR' logger
logger = logging.getLogger('LPR')
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler('AI/LPR.log')
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)

# just so we can see things as they happen in stdout/stderr
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)

# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)

logger.debug("Inizio Procedura")

PLATE_READER_URL = "https://api.platerecognizer.com/v1/plate-reader/"
STATS_URL = "https://api.platerecognizer.com/v1/statistics/"

EVENT_VEHICLE_DETECTED = "platerecognizer.vehicle_detected"


def setup_platform(add_entities, discovery_info=None):
   """Set up the platform (note: 'cam' is the global set in the main block below)."""
   # Validate credentials by processing image.
   save_file_folder = "d:\\ai\\"
   # if save_file_folder:
   #    save_file_folder = os.path(save_file_folder)

  
  
   platerecognizer = PlateRecognizerEntity(
      api_token="TOKEN",
      regions = "it",
      save_file_folder="d:\\AI\\",
      save_timestamped_file=True,
      always_save_latest_file=True,
      watched_plates="",
      camera_entity="Targa",
      name=cam,
      mmc="",
      server=PLATE_READER_URL,
      detection_rule = "strict",
      region_strict = "false",
      stats_server = STATS_URL,
      logger= logger
   )
  
   add_entities.append(platerecognizer)

if __name__ == "__main__":
   t = main(sys.argv[1:])

inputfile = t.inputfile
filepath = t.filepath
jsonfile = t.jsonfile
cam = t.cam

###python file - Custom_LPR_detection_v20201229###
try:
   with open(jsonfile, "r") as f:
      outputfile = json.loads(f.read())
except Exception as exc:
   logger.error("JSON File error: %s", exc)

logger.debug("DS parameter: %s", jsonfile)

#cleanup variables before run, just in case
left = 0
top = 0
right = 0
bottom = 0
label = 0
confidence = 0.0

#load the DeepStack reply
response = outputfile[0]['found']

logger.debug("Number of objects found = " + str(len(response["predictions"])))
#log result for debugging

for detection in response["predictions"]:

   label = detection["label"]

   confidence = detection["confidence"]

   labelList = ['car','truck','boat']
 
#when an object is found, perform an action (crop and save image to output_directory)

   if confidence > 0.2 and label.lower() in labelList:

      #log details of the match

      logger.debug("yes, it is a positive match.  A plate was found in " + inputfile + "  with " + str(confidence) + " confidence.")


      # Opens a image in RGB mode
      try:
         im = Image.open(filepath)
      except Exception as exc:
         logger.error("JPEG File error: %s", exc)

      # Set the crop box, including an extra margin of ~20 pixels
      left   = detection["x_min"] - 20
      top    = detection["y_min"] - 20
      right  = detection["x_max"] + 20
      bottom = detection["y_max"] + 20

      # ensure margin does not extend beyond image limits - turn this on if it helps
      #if left < 0:        left    = 0
      #if top < 0:         top     = 0
      #if right > 1080:    right   = 1080
      #if bottom > 1080:   bottom  = 1080

      # avoid false positives due to timestamp: filter out image corner - commented out
      # since it was usually actual plates under the timestamp
      ##if object["x_min"] > 100 or object["y_min"] > 40 :

      # Cropped image of the above dimensions (does not change the original image)
      im1 = im.crop((left, top, right, bottom))

      # save the cropped image
      savePath = output_directory + "cropped_" + inputfile
      im1.save(savePath)

      # log action for debugging
      logger.debug("image saved as " + savePath)

      pr = []
      setup_platform(pr)
      prE = pr[0]

      logger.debug("Sending file to PR " + inputfile)

      prE.process_image(savePath, im1)

      logger.debug("Got PR response for " + inputfile)

      if prE._state > 0:
         prE._last_detection = dt_util.now().strftime(DATETIME_FORMAT)
         for vehicle in prE._vehicles:
            ##prE.fire_vehicle_detected_event(vehicle)
            vehicle_copy = vehicle.copy()
            try:
               mqtt_auth = { 'username': "", 'password': "" }
               targa = vehicle["plate"].upper()
               payload = json.dumps({"targa": targa, "camera": prE._name})

               logger.debug("Publishing MQTT " + targa)

               publish.single("BlueIris/LPR", payload, qos=0,
                  retain=False, hostname="10.0.0.51", port=1883, client_id="Plate_Script", keepalive=60, will=None,
                  auth=mqtt_auth, tls=None)

               logger.debug("Done " + targa)
            except Exception as exc:
               logger.error("Error publishing message: %s", exc)


   else:

      logger.debug("No Plate was found in image " + inputfile + " for detected " + label)
ALPR.py
Python:
import json, logging
import io
import requests
from typing import List, Dict
from datetime import datetime as dt_util
from PIL import Image, ImageDraw

ATTR_PLATE = "plate"
ATTR_CONFIDENCE = "confidence"
ATTR_REGION_CODE = "region_code"
ATTR_VEHICLE_TYPE = "vehicle_type"
ATTR_ORIENTATION = "orientation"
DATETIME_FORMAT = "%Y-%m-%d_%H-%M-%S"

RED = (255, 0, 0)  # For objects within the ROI
DEFAULT_REGIONS = ['it']

# Attribute/config keys referenced by device_state_attributes
CONF_WATCHED_PLATES = "watched_plates"
CONF_REGIONS = "regions"
CONF_SERVER = "server"
CONF_SAVE_FILE_FOLDER = "save_file_folder"
CONF_SAVE_TIMESTAMPTED_FILE = "save_timestamped_file"
CONF_ALWAYS_SAVE_LATEST_FILE = "always_save_latest_file"
PLATE_READER_URL = "https://api.platerecognizer.com/v1/plate-reader/"

def get_plates(results : List[Dict]) -> List[str]:
    """
    Return the list of candidate plates.
    If no plates empty list returned.
    """
    plates = []
    candidates = [result['candidates'] for result in results]
    for candidate in candidates:
        cand_plates = [cand['plate'] for cand in candidate]
        for plate in cand_plates:
            plates.append(plate)
    return list(set(plates))

def get_orientations(results : List[Dict],LOGGER) -> List[str]:
    """
    Return the list of candidate orientations.
    If no orientations empty list returned.
    """
    try:
        orientations = []
        candidates = [result['orientation'] for result in results]
        for candidate in candidates:
            for cand in candidate:
                LOGGER.debug("get_orientations cand: %s", cand)
                if cand["score"] >= 0.7:
                    orientations.append(cand["orientation"])
        return list(set(orientations))
    except Exception as exc:
        LOGGER.error("get_orientations error: %s", exc)
        return []


class PlateRecognizerEntity:
    """Create entity."""

    def __init__(
        self,
        api_token,
        regions,
        save_file_folder,
        save_timestamped_file,
        always_save_latest_file,
        watched_plates,
        camera_entity,
        name,
        mmc,
        server,
        detection_rule,
        region_strict,
        stats_server,
        logger
    ):
        """Init."""
        self._headers = {"Authorization": f"Token {api_token}"}
        self._regions = regions
        self._camera = camera_entity
        if name:
            self._name = name
        else:
            camera_name = "Targa"
            self._name = f"platerecognizer_{camera_name}"
        self._save_file_folder = save_file_folder
        self._save_timestamped_file = save_timestamped_file
        self._always_save_latest_file = always_save_latest_file
        self._watched_plates = watched_plates
        self._mmc = mmc
        self._server = server
        self._detection_rule = detection_rule
        self._region_strict = region_strict
        self._state = None
        self._results = {}
        self._vehicles = [{}]
        self._orientations = []
        self._plates = []
        self._statistics = {}
        self._last_detection = None
        self._image_width = None
        self._image_height = None
        self._image = None
        self._config = {}
        self._stats_server = stats_server
        self.get_statistics()
        self._LOGGER= logger

    def process_image(self, savePath,im):
      """Process an image."""
      self._state = None
      self._results = {}
      self._vehicles = [{}]
      self._plates = []
      self._orientations = []
  
      img_byte_arr = io.BytesIO()
      im.save(img_byte_arr, format="JPEG")
      self._image = im
      self._imageIO = img_byte_arr.getvalue()
      
      self._image_width, self._image_height = im.size
      
      if self._regions == DEFAULT_REGIONS:
         regions = None
      else:
         regions = self._regions
      if self._detection_rule:
         self._config.update({"detection_rule" : self._detection_rule})
      if self._region_strict:
         self._config.update({"region": self._region_strict})
      response = None
      try:
         self._LOGGER.debug("Config: " + str(json.dumps(self._config)))
         response = requests.post(
               self._server,
               data=dict(regions=regions, camera_id=self.name, mmc=self._mmc, config=json.dumps(self._config)), 
               files={"upload": self._imageIO},
               headers=self._headers
         ).json()
         self._results = response["results"]
         self._plates = get_plates(response['results'])
         if self._mmc:
               self._orientations = get_orientations(response['results'],self._LOGGER)
         self._vehicles = [
               {
                  ATTR_PLATE: r["plate"],
                  ATTR_CONFIDENCE: r["score"],
                  ATTR_REGION_CODE: r["region"]["code"],
                  ATTR_VEHICLE_TYPE: r["vehicle"]["type"],
               }
               for r in self._results
         ]
      except Exception as exc:
         self._LOGGER.error("platerecognizer error: %s", exc)
         self._LOGGER.error(f"platerecognizer api response: {response}")

      self._state = len(self._vehicles)
      if self._state > 0:
         self._last_detection = dt_util.now().strftime(DATETIME_FORMAT)
         for vehicle in self._vehicles:
               self.fire_vehicle_detected_event(vehicle)
    #   if self._save_file_folder:
    #      if self._state > 0 or self._always_save_latest_file:
    #            self.save_image()
    #   if self._server == "":
    #      self.get_statistics()
    #   else:
    #      stats = response["usage"]
    #      calls_remaining = stats["max_calls"] - stats["calls"]
    #      stats.update({"calls_remaining": calls_remaining})
    #      self._statistics = stats

    def get_statistics(self):
        try:
            response = requests.get(self._stats_server, headers=self._headers).json()
            calls_remaining = response["total_calls"] - response["usage"]["calls"]
            response.update({"calls_remaining": calls_remaining})
            self._statistics = response.copy()
        except Exception as exc:
            self._LOGGER.error("platerecognizer error getting statistics: %s", exc)

    def fire_vehicle_detected_event(self, vehicle):
        """Send event (Home Assistant event firing is stubbed out in this standalone port)."""
        # vehicle_copy.update({ATTR_ENTITY_ID: self.entity_id})
        # self.hass.bus.fire(EVENT_VEHICLE_DETECTED, vehicle_copy)

    def save_image(self):
        """Save a timestamped image with bounding boxes around plates."""
        draw = ImageDraw.Draw(self._image)

        decimal_places = 3
        for vehicle in self._results:
            box = (
                    round(vehicle['box']["ymin"] / self._image_height, decimal_places),
                    round(vehicle['box']["xmin"] / self._image_width, decimal_places),
                    round(vehicle['box']["ymax"] / self._image_height, decimal_places),
                    round(vehicle['box']["xmax"] / self._image_width, decimal_places),
            )
            text = vehicle['plate']
            # draw_box(
            #     draw,
            #     box,
            #     self._image_width,
            #     self._image_height,
            #     text=text,
            #     color=RED,
            #     )

        latest_save_path = self._save_file_folder + self._name + "_latest.png"
        self._image.save(latest_save_path)

        if self._save_timestamped_file:
            timestamp_save_path = self._save_file_folder + self._name +"_" + self._last_detection + ".png"
            #self._image.save(timestamp_save_path)
            self._LOGGER.info("platerecognizer saved file %s", timestamp_save_path)

    @property
    def camera_entity(self):
        """Return camera entity id from process pictures."""
        return self._camera

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def should_poll(self):
        """Return the polling state."""
        return False

    @property
    def state(self):
        """Return the state of the entity."""
        return self._state

    @property
    def unit_of_measurement(self):
        """Return the unit of measurement."""
        return ATTR_PLATE

    @property
    def device_state_attributes(self):
        """Return the attributes."""
        attr = {}
        attr.update({"last_detection": self._last_detection})
        attr.update({"vehicles": self._vehicles})
        attr.update({ATTR_ORIENTATION: self._orientations})
        if self._watched_plates:
            watched_plates_results = {plate : False for plate in self._watched_plates}
            for plate in self._watched_plates:
                if plate in self._plates:
                    watched_plates_results.update({plate: True})
            attr[CONF_WATCHED_PLATES] = watched_plates_results
        attr.update({"statistics": self._statistics})
        if self._regions != DEFAULT_REGIONS:
            attr[CONF_REGIONS] = self._regions
        if self._server != PLATE_READER_URL:
            attr[CONF_SERVER] = str(self._server)
        if self._save_file_folder:
            attr[CONF_SAVE_FILE_FOLDER] = str(self._save_file_folder)
            attr[CONF_SAVE_TIMESTAMPTED_FILE] = self._save_timestamped_file
            attr[CONF_ALWAYS_SAVE_LATEST_FILE] = self._always_save_latest_file
        return attr
So the logic is: BlueIris saves the JPG and analyzes it through DeepStack. Save the JSON reply to a file using alert actions, then call the crop.py script, which will call ALPR and also post an MQTT message to NodeRed to open the gate in case a plate is found.
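The receiving side could also be a small Python MQTT subscriber instead of Node-RED. A sketch of the plate check below; the whitelist and the one-character error tolerance (to absorb OCR noise) are assumptions, while the payload format matches the `{"targa": ..., "camera": ...}` message published by crop.py:

```python
# Sketch: decide whether to open the gate from the MQTT payload.
# Whitelist and error tolerance are assumptions; payload matches crop.py.
import json

ALLOWED = {"AB123CD", "EF456GH"}  # hypothetical plates

def should_open(payload, allowed=ALLOWED, max_errors=1):
    """True if the received plate is within max_errors characters
    of any whitelisted plate of the same length."""
    plate = json.loads(payload)["targa"].upper()
    for known in allowed:
        if len(known) == len(plate):
            if sum(a != b for a, b in zip(known, plate)) <= max_errors:
                return True
    return False
```

In Node-RED the equivalent would be an MQTT-in node on `BlueIris/LPR` feeding a function node with the same comparison.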

Credit should be given to the work done by Robin here: GitHub - robmarkcole/HASS-plate-recognizer: Read number plates with https://platerecognizer.com/
 