RGB Anomaly Detection¶
In this cookbook, we identify color (RGB) anomalies in segmented items. Capture a base image, use Roboflow to extract its ground-truth RGB colors, and compare them against newly collected data. In this scenario, we are assessing variations in logo color.
Click the Open in Colab button to run the cookbook on Google Colab.
Let's begin!
Install required packages¶
In [ ]:
Copied!
!pip install sklearn
!pip install sklearn
Imports¶
In [ ]:
Copied!
import cv2
import numpy as np
import time
import base64
import requests
import os, glob
from sklearn.cluster import KMeans
import cv2
import numpy as np
import time
import base64
import requests
import os, glob
from sklearn.cluster import KMeans
Extract the target RGB colors from the polygon and run K-means¶
In [ ]:
Copied!
def parse_polygon_annotation(annotation_data, image_shape):
    """Convert predicted polygon points into integer ``(x, y)`` tuples.

    Args:
        annotation_data: Iterable of mappings, each with an ``'x'`` and a
            ``'y'`` entry.
        image_shape: Shape of the image the polygon belongs to. Accepted for
            interface compatibility; the coordinates are used as given and
            are not rescaled, so this value is not read.

    Returns:
        A list of ``(int(x), int(y))`` tuples in input order. ``int()``
        truncates fractional coordinates toward zero.
    """
    return [(int(point['x']), int(point['y'])) for point in annotation_data]
def extract_polygon_area(image_path, polygon_points):
    """Return the image at *image_path* with everything outside a polygon blacked out.

    Args:
        image_path: Path of the image file to load with ``cv2.imread``.
        polygon_points: Sequence of ``(x, y)`` pixel coordinates describing
            the polygon outline.

    Returns:
        An array with the same shape as the loaded image in which pixels
        outside the filled polygon are zero.

    Raises:
        OSError: If the image cannot be read.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise OSError(f"Could not read image: {image_path}")

    mask = np.zeros(image.shape[:2], dtype=np.uint8)
    # OpenCV contour functions expect 32-bit integer points; a plain
    # np.array() of Python ints is int64 on most platforms.
    contour = np.array(polygon_points, dtype=np.int32)
    # thickness=-1 fills the contour interior instead of drawing its outline.
    cv2.drawContours(mask, [contour], -1, (255, 255, 255), -1)
    return cv2.bitwise_and(image, image, mask=mask)
def compute_average_color(image):
    """Return the per-channel mean of all non-black pixels of *image*.

    A pixel counts as background only when every channel is zero, which is
    the same rule ``get_dominant_colors`` applies. (Requiring *all* channels
    to be non-zero would wrongly discard saturated colors such as pure red.)

    Args:
        image: Array of shape ``(height, width, channels)``.

    Returns:
        A 1-D float array with one mean per channel. If the image contains
        no non-black pixel, every entry is NaN.
    """
    foreground = np.any(image != 0, axis=2)
    if not foreground.any():
        # Same result np.mean would give for an empty selection, minus the
        # "Mean of empty slice" RuntimeWarning.
        return np.full(image.shape[2], np.nan)
    return np.mean(image[foreground], axis=0)
def color_difference(color1, color2):
    """Return the Euclidean distance between two colors.

    Args:
        color1: Sequence or array of channel values.
        color2: Sequence or array of channel values, same length as *color1*.

    Returns:
        The L2 norm of ``color1 - color2`` as a float.
    """
    # Convert to float first: subtracting unsigned 8-bit pixel values would
    # wrap around (e.g. 0 - 3 == 253) and inflate the distance.
    first = np.asarray(color1, dtype=float)
    second = np.asarray(color2, dtype=float)
    return np.linalg.norm(first - second)
def count_color_matches(dominant_colors, target_colors, threshold):
    """Tally which target colors are approximated by the dominant colors.

    Every dominant color is compared with every target color using
    ``color_difference``; a pair matches when the distance is strictly
    below *threshold*.

    Args:
        dominant_colors: Iterable of candidate colors.
        target_colors: Iterable of reference colors; each is converted to a
            tuple and used as a dictionary key.
        threshold: Exclusive upper bound on the distance for a match.

    Returns:
        A ``(matches_count, matched_colors)`` pair of dictionaries keyed by
        target-color tuple: the number of matching dominant colors, and the
        list of those colors in the order they were encountered.
    """
    target_keys = [tuple(target) for target in target_colors]
    matches_count = dict.fromkeys(target_keys, 0)
    matched_colors = {key: [] for key in target_keys}

    for candidate in dominant_colors:
        for key, target in zip(target_keys, target_colors):
            distance = color_difference(candidate, target)
            if not (distance < threshold):
                continue
            matches_count[key] += 1
            matched_colors[key].append(candidate)

    return matches_count, matched_colors
def get_dominant_colors(image, k=5, random_state=None):
    """Cluster the non-black pixels of *image* and return the cluster centers.

    Pixels whose channels are all zero are treated as masked-out background
    and are excluded before clustering.

    Args:
        image: Array of shape ``(height, width, 3)``.
        k: Number of K-means clusters (dominant colors) to compute.
        random_state: Optional seed forwarded to ``KMeans`` so that repeated
            runs yield the same centers. ``None`` keeps the default
            non-deterministic initialisation.

    Returns:
        The fitted ``cluster_centers_`` array, one row per dominant color.

    Raises:
        ValueError: If the image has no non-black pixel to cluster.
    """
    pixels = image.reshape((image.shape[0] * image.shape[1], 3))
    pixels = pixels[np.any(pixels != 0, axis=1)]
    if len(pixels) == 0:
        # Fail with a clear message instead of an opaque clustering error.
        raise ValueError("Image contains no non-black pixels to cluster")

    kmeans = KMeans(n_clusters=k, n_init='auto', random_state=random_state)
    kmeans.fit(pixels)
    return kmeans.cluster_centers_
def extract_target_colors(target_image_path, inference_server_address, project_id, version_number, api_key=None):
    """Compute the dominant colors of the segmented object in the reference image.

    The image is sent base64-encoded to the inference server, the polygon of
    the returned prediction is cut out of the image, and K-means dominant
    colors are computed on that cut-out.

    Args:
        target_image_path: Path of the reference ("ground truth") image.
        inference_server_address: Base URL of the inference server.
        project_id: Identifier of the model/project to query.
        version_number: Model version to query.
        api_key: API key sent as the ``api_key`` query parameter. When
            omitted, the ``ROBOFLOW_API_KEY`` environment variable is used.
            Credentials must never be hard-coded in source.

    Returns:
        Array of dominant colors as returned by ``get_dominant_colors``.
        When several predictions are returned, the last one is used.

    Raises:
        ValueError: If no API key is available or no prediction is returned.
        OSError: If the reference image cannot be read.
        requests.HTTPError: If the server answers with an error status.
    """
    if api_key is None:
        api_key = os.environ.get("ROBOFLOW_API_KEY", "")
    if not api_key:
        raise ValueError("No API key given: pass api_key or set ROBOFLOW_API_KEY")

    target_image = cv2.imread(target_image_path)
    if target_image is None:
        raise OSError(f"Could not read image: {target_image_path}")

    with open(target_image_path, "rb") as f:
        im_b64 = base64.b64encode(f.read()).decode("utf8")

    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    params = {
        'api_key': api_key,
    }
    # Join with explicit separators: plain concatenation would fuse the
    # project id and the version number into one path segment.
    url = f"{inference_server_address.rstrip('/')}/{str(project_id).strip('/')}/{version_number}"
    response = requests.post(url, params=params, headers=headers, data=im_b64)
    response.raise_for_status()

    predictions = response.json().get('predictions', [])
    if not predictions:
        raise ValueError(f"No predictions returned for {target_image_path}")

    # Only the last prediction determines the result, so cluster just that one.
    polygon_points = parse_polygon_annotation(predictions[-1]['points'], target_image.shape)
    polygon_image = extract_polygon_area(target_image_path, polygon_points)
    return get_dominant_colors(polygon_image)
def match_images_with_target_colors(target_dominant_colors, images_folder, inference_server_address, project_id, version_number, color_threshold, api_key=None):
    """Report which ``.jpg`` images contain an object matching all target colors.

    Each image is sent to the inference server; for every predicted polygon
    the dominant colors are computed and compared with
    *target_dominant_colors*. An image is reported (once) as soon as one of
    its predictions has at least one dominant color within
    *color_threshold* of every target color.

    The module-level counters ``prediction_counter`` and ``image_counter``
    are updated (and created at 0 if they do not exist yet): the former by
    the number of predictions returned, the latter once per readable image.

    Args:
        target_dominant_colors: Reference colors to look for.
        images_folder: Prefix that is concatenated with ``'*.jpg'`` to build
            the glob pattern, so a folder must end with a path separator.
        inference_server_address: Base URL of the inference server.
        project_id: Identifier of the model/project to query.
        version_number: Model version to query.
        color_threshold: Exclusive maximum color distance for a match.
        api_key: API key sent as the ``api_key`` query parameter. When
            omitted, the ``ROBOFLOW_API_KEY`` environment variable is used.

    Returns:
        The list of matched image paths (also printed with the total).

    Raises:
        ValueError: If no API key is available.
        requests.HTTPError: If the server answers with an error status.
    """
    global prediction_counter, image_counter
    # The counters may not have been defined by the caller; start them at 0
    # instead of failing with NameError on the first increment.
    prediction_counter = globals().get("prediction_counter", 0)
    image_counter = globals().get("image_counter", 0)

    if api_key is None:
        api_key = os.environ.get("ROBOFLOW_API_KEY", "")
    if not api_key:
        raise ValueError("No API key given: pass api_key or set ROBOFLOW_API_KEY")

    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    params = {
        'api_key': api_key,
    }
    # Join with explicit separators: plain concatenation would fuse the
    # project id and the version number into one path segment.
    url = f"{inference_server_address.rstrip('/')}/{str(project_id).strip('/')}/{version_number}"

    matched_filepaths = []
    extention_images = ".jpg"
    for image_path in sorted(glob.glob(images_folder + '*' + extention_images)):
        print("File path: " + image_path)

        # Read once per image; only the shape is needed here.
        image = cv2.imread(image_path)
        if image is None:
            print("Skipping unreadable image: " + image_path)
            continue
        image_counter += 1

        with open(image_path, "rb") as f:
            im_b64 = base64.b64encode(f.read()).decode("utf8")
        response = requests.post(url, params=params, headers=headers, data=im_b64)
        response.raise_for_status()
        predictions = response.json().get('predictions', [])
        prediction_counter += len(predictions)

        for prediction in predictions:
            polygon_points = parse_polygon_annotation(prediction['points'], image.shape)
            polygon_image = extract_polygon_area(image_path, polygon_points)
            dominant_colors = get_dominant_colors(polygon_image)
            matches, _ = count_color_matches(dominant_colors, target_dominant_colors, color_threshold)
            if all(value > 0 for value in matches.values()):
                # Record each image at most once, however many of its
                # predictions match.
                matched_filepaths.append(image_path)
                break

    total_matches = len(matched_filepaths)
    print(f"\nTotal images where all target colors matched: {total_matches}")
    print(f"\nMatched images where all target colors matched: {matched_filepaths}")
    return matched_filepaths
def parse_polygon_annotation(annotation_data, image_shape):
    """Convert predicted polygon points into integer ``(x, y)`` tuples.

    Args:
        annotation_data: Iterable of mappings, each with an ``'x'`` and a
            ``'y'`` entry.
        image_shape: Shape of the image the polygon belongs to. Accepted for
            interface compatibility; the coordinates are used as given and
            are not rescaled, so this value is not read.

    Returns:
        A list of ``(int(x), int(y))`` tuples in input order. ``int()``
        truncates fractional coordinates toward zero.
    """
    return [(int(point['x']), int(point['y'])) for point in annotation_data]
def extract_polygon_area(image_path, polygon_points):
    """Return the image at *image_path* with everything outside a polygon blacked out.

    Args:
        image_path: Path of the image file to load with ``cv2.imread``.
        polygon_points: Sequence of ``(x, y)`` pixel coordinates describing
            the polygon outline.

    Returns:
        An array with the same shape as the loaded image in which pixels
        outside the filled polygon are zero.

    Raises:
        OSError: If the image cannot be read.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise OSError(f"Could not read image: {image_path}")

    mask = np.zeros(image.shape[:2], dtype=np.uint8)
    # OpenCV contour functions expect 32-bit integer points; a plain
    # np.array() of Python ints is int64 on most platforms.
    contour = np.array(polygon_points, dtype=np.int32)
    # thickness=-1 fills the contour interior instead of drawing its outline.
    cv2.drawContours(mask, [contour], -1, (255, 255, 255), -1)
    return cv2.bitwise_and(image, image, mask=mask)
def compute_average_color(image):
    """Return the per-channel mean of all non-black pixels of *image*.

    A pixel counts as background only when every channel is zero, which is
    the same rule ``get_dominant_colors`` applies. (Requiring *all* channels
    to be non-zero would wrongly discard saturated colors such as pure red.)

    Args:
        image: Array of shape ``(height, width, channels)``.

    Returns:
        A 1-D float array with one mean per channel. If the image contains
        no non-black pixel, every entry is NaN.
    """
    foreground = np.any(image != 0, axis=2)
    if not foreground.any():
        # Same result np.mean would give for an empty selection, minus the
        # "Mean of empty slice" RuntimeWarning.
        return np.full(image.shape[2], np.nan)
    return np.mean(image[foreground], axis=0)
def color_difference(color1, color2):
    """Return the Euclidean distance between two colors.

    Args:
        color1: Sequence or array of channel values.
        color2: Sequence or array of channel values, same length as *color1*.

    Returns:
        The L2 norm of ``color1 - color2`` as a float.
    """
    # Convert to float first: subtracting unsigned 8-bit pixel values would
    # wrap around (e.g. 0 - 3 == 253) and inflate the distance.
    first = np.asarray(color1, dtype=float)
    second = np.asarray(color2, dtype=float)
    return np.linalg.norm(first - second)
def count_color_matches(dominant_colors, target_colors, threshold):
    """Tally which target colors are approximated by the dominant colors.

    Every dominant color is compared with every target color using
    ``color_difference``; a pair matches when the distance is strictly
    below *threshold*.

    Args:
        dominant_colors: Iterable of candidate colors.
        target_colors: Iterable of reference colors; each is converted to a
            tuple and used as a dictionary key.
        threshold: Exclusive upper bound on the distance for a match.

    Returns:
        A ``(matches_count, matched_colors)`` pair of dictionaries keyed by
        target-color tuple: the number of matching dominant colors, and the
        list of those colors in the order they were encountered.
    """
    target_keys = [tuple(target) for target in target_colors]
    matches_count = dict.fromkeys(target_keys, 0)
    matched_colors = {key: [] for key in target_keys}

    for candidate in dominant_colors:
        for key, target in zip(target_keys, target_colors):
            distance = color_difference(candidate, target)
            if not (distance < threshold):
                continue
            matches_count[key] += 1
            matched_colors[key].append(candidate)

    return matches_count, matched_colors
def get_dominant_colors(image, k=5, random_state=None):
    """Cluster the non-black pixels of *image* and return the cluster centers.

    Pixels whose channels are all zero are treated as masked-out background
    and are excluded before clustering.

    Args:
        image: Array of shape ``(height, width, 3)``.
        k: Number of K-means clusters (dominant colors) to compute.
        random_state: Optional seed forwarded to ``KMeans`` so that repeated
            runs yield the same centers. ``None`` keeps the default
            non-deterministic initialisation.

    Returns:
        The fitted ``cluster_centers_`` array, one row per dominant color.

    Raises:
        ValueError: If the image has no non-black pixel to cluster.
    """
    pixels = image.reshape((image.shape[0] * image.shape[1], 3))
    pixels = pixels[np.any(pixels != 0, axis=1)]
    if len(pixels) == 0:
        # Fail with a clear message instead of an opaque clustering error.
        raise ValueError("Image contains no non-black pixels to cluster")

    kmeans = KMeans(n_clusters=k, n_init='auto', random_state=random_state)
    kmeans.fit(pixels)
    return kmeans.cluster_centers_
def extract_target_colors(target_image_path, inference_server_address, project_id, version_number, api_key=None):
    """Compute the dominant colors of the segmented object in the reference image.

    The image is sent base64-encoded to the inference server, the polygon of
    the returned prediction is cut out of the image, and K-means dominant
    colors are computed on that cut-out.

    Args:
        target_image_path: Path of the reference ("ground truth") image.
        inference_server_address: Base URL of the inference server.
        project_id: Identifier of the model/project to query.
        version_number: Model version to query.
        api_key: API key sent as the ``api_key`` query parameter. When
            omitted, the ``ROBOFLOW_API_KEY`` environment variable is used.
            Credentials must never be hard-coded in source.

    Returns:
        Array of dominant colors as returned by ``get_dominant_colors``.
        When several predictions are returned, the last one is used.

    Raises:
        ValueError: If no API key is available or no prediction is returned.
        OSError: If the reference image cannot be read.
        requests.HTTPError: If the server answers with an error status.
    """
    if api_key is None:
        api_key = os.environ.get("ROBOFLOW_API_KEY", "")
    if not api_key:
        raise ValueError("No API key given: pass api_key or set ROBOFLOW_API_KEY")

    target_image = cv2.imread(target_image_path)
    if target_image is None:
        raise OSError(f"Could not read image: {target_image_path}")

    with open(target_image_path, "rb") as f:
        im_b64 = base64.b64encode(f.read()).decode("utf8")

    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    params = {
        'api_key': api_key,
    }
    # Join with explicit separators: plain concatenation would fuse the
    # project id and the version number into one path segment.
    url = f"{inference_server_address.rstrip('/')}/{str(project_id).strip('/')}/{version_number}"
    response = requests.post(url, params=params, headers=headers, data=im_b64)
    response.raise_for_status()

    predictions = response.json().get('predictions', [])
    if not predictions:
        raise ValueError(f"No predictions returned for {target_image_path}")

    # Only the last prediction determines the result, so cluster just that one.
    polygon_points = parse_polygon_annotation(predictions[-1]['points'], target_image.shape)
    polygon_image = extract_polygon_area(target_image_path, polygon_points)
    return get_dominant_colors(polygon_image)
def match_images_with_target_colors(target_dominant_colors, images_folder, inference_server_address, project_id, version_number, color_threshold, api_key=None):
    """Report which ``.jpg`` images contain an object matching all target colors.

    Each image is sent to the inference server; for every predicted polygon
    the dominant colors are computed and compared with
    *target_dominant_colors*. An image is reported (once) as soon as one of
    its predictions has at least one dominant color within
    *color_threshold* of every target color.

    The module-level counters ``prediction_counter`` and ``image_counter``
    are updated (and created at 0 if they do not exist yet): the former by
    the number of predictions returned, the latter once per readable image.

    Args:
        target_dominant_colors: Reference colors to look for.
        images_folder: Prefix that is concatenated with ``'*.jpg'`` to build
            the glob pattern, so a folder must end with a path separator.
        inference_server_address: Base URL of the inference server.
        project_id: Identifier of the model/project to query.
        version_number: Model version to query.
        color_threshold: Exclusive maximum color distance for a match.
        api_key: API key sent as the ``api_key`` query parameter. When
            omitted, the ``ROBOFLOW_API_KEY`` environment variable is used.

    Returns:
        The list of matched image paths (also printed with the total).

    Raises:
        ValueError: If no API key is available.
        requests.HTTPError: If the server answers with an error status.
    """
    global prediction_counter, image_counter
    # The counters may not have been defined by the caller; start them at 0
    # instead of failing with NameError on the first increment.
    prediction_counter = globals().get("prediction_counter", 0)
    image_counter = globals().get("image_counter", 0)

    if api_key is None:
        api_key = os.environ.get("ROBOFLOW_API_KEY", "")
    if not api_key:
        raise ValueError("No API key given: pass api_key or set ROBOFLOW_API_KEY")

    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    params = {
        'api_key': api_key,
    }
    # Join with explicit separators: plain concatenation would fuse the
    # project id and the version number into one path segment.
    url = f"{inference_server_address.rstrip('/')}/{str(project_id).strip('/')}/{version_number}"

    matched_filepaths = []
    extention_images = ".jpg"
    for image_path in sorted(glob.glob(images_folder + '*' + extention_images)):
        print("File path: " + image_path)

        # Read once per image; only the shape is needed here.
        image = cv2.imread(image_path)
        if image is None:
            print("Skipping unreadable image: " + image_path)
            continue
        image_counter += 1

        with open(image_path, "rb") as f:
            im_b64 = base64.b64encode(f.read()).decode("utf8")
        response = requests.post(url, params=params, headers=headers, data=im_b64)
        response.raise_for_status()
        predictions = response.json().get('predictions', [])
        prediction_counter += len(predictions)

        for prediction in predictions:
            polygon_points = parse_polygon_annotation(prediction['points'], image.shape)
            polygon_image = extract_polygon_area(image_path, polygon_points)
            dominant_colors = get_dominant_colors(polygon_image)
            matches, _ = count_color_matches(dominant_colors, target_dominant_colors, color_threshold)
            if all(value > 0 for value in matches.values()):
                # Record each image at most once, however many of its
                # predictions match.
                matched_filepaths.append(image_path)
                break

    total_matches = len(matched_filepaths)
    print(f"\nTotal images where all target colors matched: {total_matches}")
    print(f"\nMatched images where all target colors matched: {matched_filepaths}")
    return matched_filepaths
Run main function to compare base color logo with target colors and run anomaly detection¶
In [ ]:
Copied!
def main():
    """Extract the reference colors, then scan a folder of images for matches.

    Edit the placeholder values below before running.
    """
    target_image_path = "TARGET_IMAGE_PATH"
    # Use HTTPS: the API key travels in the query string of every request.
    # NOTE(review): the predictions must contain polygon 'points' -- confirm
    # that this endpoint serves a segmentation model.
    inference_server_address = "https://detect.roboflow.com/"
    version_number = 1
    project_id = "PROJECT_ID"
    # All .jpg files are picked up via images_folder + '*.jpg', so the
    # folder path must end with a path separator.
    images_folder = "IMAGE_FOLDER_PATH"

    # NOTE(review): colors are compared by Euclidean distance, whose maximum
    # for 8-bit RGB is about 441.7 rather than 768 -- confirm this scale.
    MAX_COLOR_DIFFERENCE = 3 * 256  # DO NOT EDIT
    TARGET_COLOR_PERCENT_THRESHOLD = 0.08  # Value must be between 0 - 1 - DO EDIT
    color_threshold = int(MAX_COLOR_DIFFERENCE * TARGET_COLOR_PERCENT_THRESHOLD)

    target_dominant_colors = extract_target_colors(target_image_path, inference_server_address, project_id, version_number)
    match_images_with_target_colors(target_dominant_colors, images_folder, inference_server_address, project_id, version_number, color_threshold)


if __name__ == "__main__":
    main()
def main():
    """Extract the reference colors, then scan a folder of images for matches.

    Edit the placeholder values below before running.
    """
    target_image_path = "TARGET_IMAGE_PATH"
    # Use HTTPS: the API key travels in the query string of every request.
    # NOTE(review): the predictions must contain polygon 'points' -- confirm
    # that this endpoint serves a segmentation model.
    inference_server_address = "https://detect.roboflow.com/"
    version_number = 1
    project_id = "PROJECT_ID"
    # All .jpg files are picked up via images_folder + '*.jpg', so the
    # folder path must end with a path separator.
    images_folder = "IMAGE_FOLDER_PATH"

    # NOTE(review): colors are compared by Euclidean distance, whose maximum
    # for 8-bit RGB is about 441.7 rather than 768 -- confirm this scale.
    MAX_COLOR_DIFFERENCE = 3 * 256  # DO NOT EDIT
    TARGET_COLOR_PERCENT_THRESHOLD = 0.08  # Value must be between 0 - 1 - DO EDIT
    color_threshold = int(MAX_COLOR_DIFFERENCE * TARGET_COLOR_PERCENT_THRESHOLD)

    target_dominant_colors = extract_target_colors(target_image_path, inference_server_address, project_id, version_number)
    match_images_with_target_colors(target_dominant_colors, images_folder, inference_server_address, project_id, version_number, color_threshold)


if __name__ == "__main__":
    main()