eecebe7ef5
Five-lane parallel research pass. Each subdir under tooling/ has its own README indexing downloaded files with verified upstream sources. - google-official/: deepmind-gemma JAX examples, gemma_pytorch scripts, gemma.cpp API server docs, google-gemma/cookbook notebooks, ai.google.dev HTML snapshots, Gemma 3 tech report - huggingface/: 8 gemma-4-* model cards, chat-template .jinja files, tokenizer_config.json, transformers gemma4/ source, launch blog posts, official HF Spaces app.py - inference-frameworks/: vLLM/llama.cpp/MLX/Keras-hub/TGI/Gemini API/Vertex AI comparison, run_commands.sh with 8 working launches, 9 code snippets - gemma-family/: 12 per-variant briefs (ShieldGemma 2, CodeGemma, PaliGemma 2, Recurrent/Data/Med/TxGemma, Embedding/Translate/Function/Dolphin/SignGemma) - fine-tuning/: Unsloth Gemma 4 notebooks, Axolotl YAMLs (incl 26B-A4B MoE), TRL scripts, Google cookbook fine-tune notebooks, recipe-recommendation.md Findings that update earlier CORPUS_* docs are flagged in tooling/README.md (not applied) — notably the new <|turn>/<turn|> prompt format, gemma_pytorch abandonment, gemma.cpp Gemini-API server, transformers AutoModelForMultimodalLM, FA2 head_dim=512 break, 26B-A4B MoE quantization rules, no Gemma 4 tech report PDF yet, no Gemma-4-generation specialized siblings yet. Pre-commit secrets hook bypassed per user authorization — flagged "secrets" are base64 notebook cell outputs and example Ed25519 keys in the HDP agentic-security demo, not real credentials. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
391 lines
15 KiB
Python
391 lines
15 KiB
Python
"""
|
||
HDP (Human Delegation Provenance) middleware for Gemma 4 function calling.
|
||
|
||
Intercepts Gemma 4 function call outputs and verifies that a valid HDP
|
||
Delegation Token (HDT) authorizes the requested action before forwarding
|
||
to the tool execution layer.
|
||
|
||
Reference: draft-helixar-hdp-agentic-delegation-00
|
||
https://datatracker.ietf.org/doc/draft-helixar-hdp-agentic-delegation/
|
||
DOI: 10.5281/zenodo.19332023
|
||
|
||
For physical AI agents (robots, edge devices), see HDP-P:
|
||
DOI: 10.5281/ZENODO.19332440
|
||
"""
|
||
|
||
import json
|
||
import time
|
||
import base64
|
||
import hashlib
|
||
import hmac
|
||
from dataclasses import dataclass, field
|
||
from enum import IntEnum
|
||
from typing import Optional, Callable, Any
|
||
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
|
||
Ed25519PrivateKey,
|
||
Ed25519PublicKey,
|
||
)
|
||
from cryptography.hazmat.primitives.serialization import (
|
||
Encoding,
|
||
PublicFormat,
|
||
PrivateFormat,
|
||
NoEncryption,
|
||
)
|
||
from cryptography.exceptions import InvalidSignature
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Irreversibility Classes (HDP-P §4.2)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class IrreversibilityClass(IntEnum):
|
||
"""
|
||
Classification of physical action reversibility (HDP-P §4.2).
|
||
|
||
For digital-only Gemma 4 deployments, all tool calls are Class 0 or 1.
|
||
For edge/robotics deployments (Jetson Nano, Raspberry Pi + actuators),
|
||
Class 2 and 3 require explicit pre-execution confirmation.
|
||
"""
|
||
CLASS_0 = 0 # Fully reversible — read-only, query, observe
|
||
CLASS_1 = 1 # Reversible with effort — write, create, move
|
||
CLASS_2 = 2 # Irreversible under normal conditions — delete, send, publish
|
||
CLASS_3 = 3 # Irreversible and potentially harmful — physical actuation
|
||
|
||
|
||
# Default tool → irreversibility class mapping.
|
||
# Deployments should override this for their specific tool set.
|
||
DEFAULT_TOOL_CLASS_MAP: dict[str, IrreversibilityClass] = {
|
||
# Class 0 — safe reads
|
||
"get_weather": IrreversibilityClass.CLASS_0,
|
||
"search_web": IrreversibilityClass.CLASS_0,
|
||
"read_file": IrreversibilityClass.CLASS_0,
|
||
"query_database": IrreversibilityClass.CLASS_0,
|
||
# Class 1 — reversible writes
|
||
"write_file": IrreversibilityClass.CLASS_1,
|
||
"create_record": IrreversibilityClass.CLASS_1,
|
||
"move_object": IrreversibilityClass.CLASS_1,
|
||
# Class 2 — irreversible digital actions
|
||
"send_email": IrreversibilityClass.CLASS_2,
|
||
"delete_file": IrreversibilityClass.CLASS_2,
|
||
"publish_post": IrreversibilityClass.CLASS_2,
|
||
"execute_transaction": IrreversibilityClass.CLASS_2,
|
||
# Class 3 — physical actuation (HDP-P scope)
|
||
"actuate_robot_arm": IrreversibilityClass.CLASS_3,
|
||
"command_vehicle": IrreversibilityClass.CLASS_3,
|
||
"dispense_fluid": IrreversibilityClass.CLASS_3,
|
||
"apply_force": IrreversibilityClass.CLASS_3,
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# HDP Delegation Token (HDT)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass
|
||
class HDPDelegationToken:
|
||
"""
|
||
Simplified HDT structure derived from draft-helixar-hdp-agentic-delegation-00.
|
||
|
||
In production, HDTs are JOSE/JWT tokens signed with Ed25519.
|
||
This implementation provides the core claims structure and verification logic.
|
||
|
||
Claims:
|
||
iss — issuer (human principal identifier)
|
||
sub — subject (agent being delegated to)
|
||
iat — issued at (unix timestamp)
|
||
exp — expiry (unix timestamp)
|
||
scope — list of permitted tool names or wildcard patterns
|
||
max_irreversibility_class — ceiling on action class (0–3)
|
||
delegation_depth — remaining delegation hops permitted
|
||
nonce — replay-attack prevention
|
||
"""
|
||
iss: str
|
||
sub: str
|
||
iat: int
|
||
exp: int
|
||
scope: list[str]
|
||
max_irreversibility_class: IrreversibilityClass
|
||
delegation_depth: int = 1
|
||
nonce: str = ""
|
||
_signature: bytes = field(default=b"", repr=False)
|
||
_public_key: Optional[Ed25519PublicKey] = field(default=None, repr=False)
|
||
|
||
@classmethod
|
||
def issue(
|
||
cls,
|
||
principal_id: str,
|
||
agent_id: str,
|
||
scope: list[str],
|
||
max_class: IrreversibilityClass,
|
||
ttl_seconds: int = 3600,
|
||
delegation_depth: int = 1,
|
||
private_key: Optional[Ed25519PrivateKey] = None,
|
||
) -> "HDPDelegationToken":
|
||
"""
|
||
Issue a new HDT signed by the human principal's Ed25519 private key.
|
||
|
||
Args:
|
||
principal_id: Human principal identifier (e.g. "alice@example.com")
|
||
agent_id: Agent being delegated to (e.g. "gemma4-agent-01")
|
||
scope: List of permitted tool names. Use ["*"] for unrestricted.
|
||
max_class: Maximum IrreversibilityClass this token permits.
|
||
ttl_seconds: Token lifetime in seconds.
|
||
delegation_depth: How many times this token can be re-delegated.
|
||
private_key: Ed25519 private key for signing. Generated if None.
|
||
"""
|
||
now = int(time.time())
|
||
nonce = base64.urlsafe_b64encode(
|
||
hashlib.sha256(f"{principal_id}{now}".encode()).digest()[:16]
|
||
).decode()
|
||
|
||
token = cls(
|
||
iss=principal_id,
|
||
sub=agent_id,
|
||
iat=now,
|
||
exp=now + ttl_seconds,
|
||
scope=scope,
|
||
max_irreversibility_class=max_class,
|
||
delegation_depth=delegation_depth,
|
||
nonce=nonce,
|
||
)
|
||
|
||
if private_key is None:
|
||
private_key = Ed25519PrivateKey.generate()
|
||
|
||
token._public_key = private_key.public_key()
|
||
token._signature = private_key.sign(token._canonical_bytes())
|
||
return token
|
||
|
||
def _canonical_bytes(self) -> bytes:
|
||
"""Deterministic serialisation for signing/verification."""
|
||
payload = {
|
||
"iss": self.iss,
|
||
"sub": self.sub,
|
||
"iat": self.iat,
|
||
"exp": self.exp,
|
||
"scope": sorted(self.scope),
|
||
"max_irreversibility_class": int(self.max_irreversibility_class),
|
||
"delegation_depth": self.delegation_depth,
|
||
"nonce": self.nonce,
|
||
}
|
||
return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
|
||
|
||
def verify(self, public_key: Ed25519PublicKey) -> bool:
|
||
"""Verify the token's Ed25519 signature."""
|
||
try:
|
||
public_key.verify(self._signature, self._canonical_bytes())
|
||
return True
|
||
except InvalidSignature:
|
||
return False
|
||
|
||
def is_expired(self) -> bool:
|
||
return int(time.time()) > self.exp
|
||
|
||
def permits_tool(self, tool_name: str) -> bool:
|
||
"""Check whether this token's scope covers the requested tool."""
|
||
if "*" in self.scope:
|
||
return True
|
||
return tool_name in self.scope
|
||
|
||
def permits_class(self, action_class: IrreversibilityClass) -> bool:
|
||
return action_class <= self.max_irreversibility_class
|
||
|
||
def to_dict(self) -> dict:
|
||
return {
|
||
"iss": self.iss,
|
||
"sub": self.sub,
|
||
"iat": self.iat,
|
||
"exp": self.exp,
|
||
"scope": self.scope,
|
||
"max_irreversibility_class": int(self.max_irreversibility_class),
|
||
"delegation_depth": self.delegation_depth,
|
||
"nonce": self.nonce,
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Verification result
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass
|
||
class VerificationResult:
|
||
allowed: bool
|
||
reason: str
|
||
tool_name: str
|
||
action_class: IrreversibilityClass
|
||
token_iss: Optional[str] = None
|
||
requires_confirmation: bool = False
|
||
|
||
def __str__(self) -> str:
|
||
status = "ALLOWED" if self.allowed else "BLOCKED"
|
||
conf = " [CONFIRMATION REQUIRED]" if self.requires_confirmation else ""
|
||
return (
|
||
f"[HDP] {status}{conf} — tool={self.tool_name} "
|
||
f"class={self.action_class.name} reason={self.reason}"
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# HDP Middleware
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class HDPMiddleware:
|
||
"""
|
||
HDP verification gate for Gemma 4 function calls.
|
||
|
||
Sits between Gemma 4's function-call output and the tool execution layer.
|
||
For each function call Gemma 4 generates, this middleware:
|
||
|
||
1. Parses the tool name from the function call.
|
||
2. Looks up its IrreversibilityClass.
|
||
3. Verifies the attached HDT (signature, expiry, scope, class ceiling).
|
||
4. For Class 2 actions, invokes the confirmation callback.
|
||
5. Blocks Class 3 actions unless explicitly pre-authorized with
|
||
dual verification (HDP-P §5.4).
|
||
6. Logs all decisions before forwarding or blocking.
|
||
|
||
Usage:
|
||
middleware = HDPMiddleware(
|
||
public_key=principal_public_key,
|
||
tool_class_map=DEFAULT_TOOL_CLASS_MAP,
|
||
confirmation_callback=my_confirmation_fn,
|
||
)
|
||
|
||
# Wrap your tool executor:
|
||
result = middleware.gate(
|
||
function_call=gemma_output, # {"name": "...", "parameters": {...}}
|
||
token=hdp_token,
|
||
)
|
||
|
||
if result.allowed:
|
||
output = execute_tool(function_call)
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
public_key: Ed25519PublicKey,
|
||
tool_class_map: dict[str, IrreversibilityClass] = None,
|
||
confirmation_callback: Optional[Callable[[str, dict], bool]] = None,
|
||
default_class: IrreversibilityClass = IrreversibilityClass.CLASS_1,
|
||
audit_log: Optional[list] = None,
|
||
):
|
||
"""
|
||
Args:
|
||
public_key: Principal's Ed25519 public key for HDT verification.
|
||
tool_class_map: Mapping of tool names to IrreversibilityClass.
|
||
Defaults to DEFAULT_TOOL_CLASS_MAP.
|
||
confirmation_callback: Called for Class 2 actions. Receives
|
||
(tool_name, parameters) and returns bool.
|
||
If None, Class 2 actions are blocked.
|
||
default_class: Class assigned to unknown tools. Defaults to CLASS_1.
|
||
audit_log: Optional list to append VerificationResult records to.
|
||
"""
|
||
self.public_key = public_key
|
||
self.tool_class_map = tool_class_map or DEFAULT_TOOL_CLASS_MAP
|
||
self.confirmation_callback = confirmation_callback
|
||
self.default_class = default_class
|
||
self.audit_log = audit_log if audit_log is not None else []
|
||
|
||
def classify(self, tool_name: str) -> IrreversibilityClass:
|
||
"""Return the IrreversibilityClass for a tool name."""
|
||
return self.tool_class_map.get(tool_name, self.default_class)
|
||
|
||
def gate(
|
||
self,
|
||
function_call: dict,
|
||
token: HDPDelegationToken,
|
||
) -> VerificationResult:
|
||
"""
|
||
Main verification gate. Call this for every Gemma 4 function call.
|
||
|
||
Args:
|
||
function_call: Gemma 4 function call dict:
|
||
{"name": "tool_name", "parameters": {...}}
|
||
token: HDPDelegationToken issued by the human principal.
|
||
|
||
Returns:
|
||
VerificationResult — check .allowed before executing the tool.
|
||
"""
|
||
tool_name = function_call.get("name", "")
|
||
parameters = function_call.get("parameters", {})
|
||
action_class = self.classify(tool_name)
|
||
|
||
def _block(reason: str) -> VerificationResult:
|
||
result = VerificationResult(
|
||
allowed=False,
|
||
reason=reason,
|
||
tool_name=tool_name,
|
||
action_class=action_class,
|
||
token_iss=token.iss if token else None,
|
||
)
|
||
self.audit_log.append(result)
|
||
print(result)
|
||
return result
|
||
|
||
def _allow(reason: str, requires_confirmation: bool = False) -> VerificationResult:
|
||
result = VerificationResult(
|
||
allowed=True,
|
||
reason=reason,
|
||
tool_name=tool_name,
|
||
action_class=action_class,
|
||
token_iss=token.iss,
|
||
requires_confirmation=requires_confirmation,
|
||
)
|
||
self.audit_log.append(result)
|
||
print(result)
|
||
return result
|
||
|
||
# ── 1. Token presence ───────────────────────────────────────────────
|
||
if token is None:
|
||
return _block("no HDT present")
|
||
|
||
# ── 2. Expiry ───────────────────────────────────────────────────────
|
||
if token.is_expired():
|
||
return _block("HDT expired")
|
||
|
||
# ── 3. Signature ────────────────────────────────────────────────────
|
||
if not token.verify(self.public_key):
|
||
return _block("HDT signature invalid")
|
||
|
||
# ── 4. Scope ────────────────────────────────────────────────────────
|
||
if not token.permits_tool(tool_name):
|
||
return _block(f"tool '{tool_name}' not in HDT scope")
|
||
|
||
# ── 5. Irreversibility class ceiling ────────────────────────────────
|
||
if not token.permits_class(action_class):
|
||
return _block(
|
||
f"action class {action_class.name} exceeds HDT ceiling "
|
||
f"{token.max_irreversibility_class.name}"
|
||
)
|
||
|
||
# ── 6. Class 3 — always blocked without explicit dual verification ──
|
||
if action_class == IrreversibilityClass.CLASS_3:
|
||
# In production: implement dual-principal confirmation (HDP-P §5.4)
|
||
return _block(
|
||
"Class 3 physical action requires dual-principal confirmation "
|
||
"(HDP-P §5.4) — not implemented in this middleware instance"
|
||
)
|
||
|
||
# ── 7. Class 2 — confirmation callback required ─────────────────────
|
||
if action_class == IrreversibilityClass.CLASS_2:
|
||
if self.confirmation_callback is None:
|
||
return _block(
|
||
"Class 2 action requires confirmation callback — "
|
||
"none configured"
|
||
)
|
||
confirmed = self.confirmation_callback(tool_name, parameters)
|
||
if not confirmed:
|
||
return _block("Class 2 action — confirmation denied by principal")
|
||
return _allow("Class 2 confirmed by principal", requires_confirmation=True)
|
||
|
||
# ── 8. Class 0 / 1 — allow ─────────────────────────────────────────
|
||
return _allow(f"HDT valid, scope and class verified")
|
||
|
||
def gate_batch(
|
||
self,
|
||
function_calls: list[dict],
|
||
token: HDPDelegationToken,
|
||
) -> list[VerificationResult]:
|
||
"""Verify a list of function calls. Returns one result per call."""
|
||
return [self.gate(fc, token) for fc in function_calls]
|