"""
Vision utilities for multimodal agent input.
This module provides:
1. VisionInput: Pydantic model for vision input
2. ImageStore: Disk-based image storage with deduplication
3. Utilities for converting between paths, base64, and OpenAI format
"""
import io
import re
import copy
import base64
import hashlib
from pathlib import Path
from typing import Optional
from PIL import Image
from pydantic import BaseModel
from .log import logger
# ============================================================================
# Constants
# ============================================================================
MAX_IMAGE_DIMENSION = 1568 # Claude/OpenAI recommended max (avoids internal resize)
# ============================================================================
# VisionInput Model
# ============================================================================
[docs]
def vision_to_openai(vision: VisionInput) -> list[dict]:
"""Convert VisionInput to OpenAI message format.
Args:
vision: VisionInput instance
Returns:
List of message dicts in OpenAI format
"""
messages = [{"role": "user", "content": [{"type": "text", "text": vision.prompt}]}]
for img in vision.images:
messages[0]["content"].append(
{
"type": "image_url",
"image_url": {"url": img},
}
)
return messages
# ============================================================================
# Image Base64 Utilities (Unified PIL-based)
# ============================================================================
[docs]
def get_image_base64(file_path: str, max_size: int = MAX_IMAGE_DIMENSION) -> str:
"""
Read a local image file and return its base64 data URI.
Automatically resizes large images to reduce transmission cost.
All images are processed through PIL for consistent output.
Args:
file_path: Path to image file (with or without file:// prefix)
max_size: Maximum dimension (width or height). Default: 1568px
Returns:
Data URI string (data:image/...;base64,...)
"""
# Strip file:// prefix if present
if file_path.startswith("file://"):
file_path = file_path[7:]
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"Image file not found: {file_path}")
# Read file bytes into memory first to avoid lazy loading issues
# (e.g., 'PngImageFile' object has no attribute '_im')
with open(path, "rb") as f:
file_bytes = f.read()
# Open from memory buffer - this forces complete loading
with Image.open(io.BytesIO(file_bytes)) as img:
# Resize if exceeds max dimension
if max(img.size) > max_size:
img.thumbnail((max_size, max_size), Image.LANCZOS)
# Encode to buffer. Anthropic rejects images whose base64 payload
# exceeds 5 MB; PNG of a 1568² RGBA image easily clears that even
# after thumbnailing. Fall back to a flattened RGB JPEG (with white
# background composite) when the lossless PNG comes out too large.
BASE64_LIMIT = 4 * 1024 * 1024 # leave headroom under 5 MB
# Approximate: base64 length ≈ raw_bytes * 4/3
RAW_LIMIT = (BASE64_LIMIT // 4) * 3
buffer = io.BytesIO()
used_jpeg_fallback = False
if img.mode in ("RGBA", "LA", "P"):
# Preserve transparency with PNG when feasible.
img.save(buffer, format="PNG", optimize=True)
mime = "png"
if buffer.tell() > RAW_LIMIT:
# PNG too big — composite onto white and re-encode as JPEG.
buffer = io.BytesIO()
if img.mode == "P":
img = img.convert("RGBA")
if img.mode in ("RGBA", "LA"):
bg = Image.new("RGB", img.size, (255, 255, 255))
alpha = img.getchannel("A") if img.mode == "RGBA" else img.split()[-1]
bg.paste(img.convert("RGB"), mask=alpha)
img = bg
else:
img = img.convert("RGB")
img.save(buffer, format="JPEG", quality=85, optimize=True)
mime = "jpeg"
used_jpeg_fallback = True
else:
# Use JPEG for RGB (smaller file size)
if img.mode != "RGB":
img = img.convert("RGB")
img.save(buffer, format="JPEG", quality=85, optimize=True)
mime = "jpeg"
# If JPEG is still too big (e.g. very high-detail photo), step quality
# down once. Aggressive quality drops here are rare in practice.
if buffer.tell() > RAW_LIMIT and (mime == "jpeg" or used_jpeg_fallback):
buffer = io.BytesIO()
img.save(buffer, format="JPEG", quality=70, optimize=True)
mime = "jpeg"
return f"data:image/{mime};base64,{base64.b64encode(buffer.getvalue()).decode()}"
# Backward compatibility alias
path_to_image_url = get_image_base64
[docs]
def path_to_vision(
prompt: str, image_paths: list[str] | str | Path | list[Path]
) -> VisionInput:
"""Create VisionInput from local image paths.
Args:
prompt: Text prompt
image_paths: One or more local image paths
Returns:
VisionInput with file:// paths (expanded to Base64 before LLM call)
"""
if isinstance(image_paths, (str, Path)):
image_paths = [image_paths]
images = [f"file://{Path(p).resolve()}" for p in image_paths]
return VisionInput(images=images, prompt=prompt)
[docs]
def parse_image_mentions(
message: str, workspace: Path | str | None = None
) -> list[dict]:
"""
Parse @image:path tokens from message and build OpenAI multimodal format.
Args:
message: User input string (may contain @image: tokens)
workspace: Workspace directory for resolving relative paths.
If None, uses workspace from settings
Returns:
List of message dicts in OpenAI format
Example:
>>> parse_image_mentions("@image:photo.png describe this")
[{"role": "user", "content": [
{"type": "text", "text": "describe this"},
{"type": "image_url", "image_url": {"url": "file:///abs/path/photo.png"}}
]}]
"""
import re
# Pattern to match @image:path
image_pattern = r"@image:([^\s]+)"
matches = re.findall(image_pattern, message)
if not matches:
# No images - plain text message
return [{"role": "user", "content": message}]
# Get workspace
if workspace is None:
from pantheon.settings import get_settings
workspace = get_settings().workspace
workspace = Path(workspace)
# Remove @image: tokens from text
clean_text = re.sub(image_pattern, "", message).strip()
# Build multimodal content list
content = []
# Add text if present
if clean_text:
content.append({"type": "text", "text": clean_text})
# Add images as file:// paths
for image_path in matches:
try:
# Resolve relative paths to workspace
if not image_path.startswith("/"):
image_path = str(workspace / image_path)
path = Path(image_path)
# Verify file exists
if not path.exists():
content.append(
{"type": "text", "text": f"[Error: Image not found: {image_path}]"}
)
continue
# Use file:// path (Agent will handle expansion to Base64)
content.append(
{"type": "image_url", "image_url": {"url": f"file://{path.resolve()}"}}
)
except Exception as e:
content.append(
{"type": "text", "text": f"[Error loading image {image_path}: {e}]"}
)
return [{"role": "user", "content": content}]
# ============================================================================
# ImageStore - Disk-based Image Storage
# ============================================================================
[docs]
class ImageStore:
"""
Manages storage of images for chat sessions.
Storage location: <pantheon_dir>/images/<chat_id>/<md5_hash>.<ext>
Handles:
1. Saving base64 images to disk (deduplicated by hash)
2. Validating and resolving local file paths
3. Processing message dicts to convert images to file:// references
"""
[docs]
def __init__(self, storage_root: str | Path | None = None):
if storage_root is None:
from pantheon.settings import get_settings
storage_root = get_settings().pantheon_dir / "images"
self.storage_root = Path(storage_root).resolve()
def _get_chat_dir(self, chat_id: str) -> Path:
"""Get or create the image directory for a specific chat."""
path = self.storage_root / chat_id
path.mkdir(parents=True, exist_ok=True)
return path
[docs]
def save_base64_image(self, chat_id: str, base64_data: str) -> str:
"""
Save a base64 string image to disk.
Args:
chat_id: The ID of the chat
base64_data: Full data URI (data:image/png;base64,...)
Returns:
Absolute local file path to the saved image
"""
try:
# Parse header
header = None
data_str = base64_data
if "," in base64_data:
parts = base64_data.split(",", 1)
header = parts[0]
data_str = parts[1]
# Determine extension from header
ext = "png"
if header:
match = re.search(r"data:image/(\w+);base64", header)
if match:
ext = match.group(1)
if ext == "jpeg":
ext = "jpg"
# Decode
image_bytes = base64.b64decode(data_str)
# Compute hash for deduplication
file_hash = hashlib.md5(image_bytes).hexdigest()
filename = f"{file_hash}.{ext}"
# Save
chat_dir = self._get_chat_dir(chat_id)
file_path = chat_dir / filename
if not file_path.exists():
with open(file_path, "wb") as f:
f.write(image_bytes)
logger.debug(f"Saved image to {file_path}")
return str(file_path.absolute())
except Exception as e:
logger.error(f"Failed to save base64 image: {e}")
raise
[docs]
def normalize_local_path(self, path_str: str) -> str:
"""
Normalize and verify a local file path.
Args:
path_str: Raw path (e.g. "file:///tmp/a.png", "/tmp/a.png")
Returns:
Absolute file path
Raises:
FileNotFoundError: If file does not exist
"""
if path_str.startswith("file://"):
path_str = path_str[7:]
path = Path(path_str).resolve()
if not path.exists():
# Try relative to cwd
rel_path = Path.cwd() / path_str
if rel_path.exists():
return str(rel_path.resolve())
raise FileNotFoundError(f"Image file not found: {path_str}")
return str(path)
[docs]
def process_message_images(self, message: dict, chat_id: str) -> None:
"""
Process a single message dict in-place.
- Extracts content from message
- Skips if content is not a list (plain text)
- For each image_url item:
- Base64 → save to disk → replace with file:// path
- Local path → verify → standardize to file:// path
- HTTP URL → pass through
"""
content = message.get("content")
if not isinstance(content, list):
return
for item in content:
if not isinstance(item, dict):
continue
if item.get("type") == "image_url" and "image_url" in item:
url = item["image_url"].get("url", "")
try:
if url.startswith("data:image/"):
# Base64 → save to disk
saved_path = self.save_base64_image(chat_id, url)
item["image_url"]["url"] = f"file://{saved_path}"
elif url.startswith("file://") or url.startswith("/"):
# Local path → normalize
norm_path = self.normalize_local_path(url)
item["image_url"]["url"] = f"file://{norm_path}"
# HTTP URLs pass through unchanged
except Exception as e:
logger.error(f"Error processing image: {e}")
# Global Singleton
_image_store: Optional[ImageStore] = None
[docs]
def get_image_store() -> ImageStore:
"""Get or create global ImageStore instance."""
global _image_store
if _image_store is None:
_image_store = ImageStore()
return _image_store
# ============================================================================
# LLM Message Expansion
# ============================================================================
[docs]
def expand_image_references_for_llm(messages: list[dict]) -> list[dict]:
"""
Expand file:// image references to Base64 data URIs for LLM consumption.
Called just before sending messages to the LLM API.
Args:
messages: List of message dicts (will be deep copied)
Returns:
New list with file:// paths converted to base64 data URIs
"""
result = copy.deepcopy(messages)
for msg in result:
content = msg.get("content")
if not isinstance(content, list):
continue
for item in content:
if not isinstance(item, dict):
continue
if item.get("type") == "image_url" and "image_url" in item:
url = item["image_url"].get("url", "")
if url.startswith("file://"):
try:
base64_uri = get_image_base64(url)
item["image_url"]["url"] = base64_uri
except Exception as e:
logger.error(f"Failed to expand image {url}: {e}")
return result