# 微软智能体治理工具包实现：通过策略、审批、审计日志与风险控制实现安全AI智能体工具调用

- 来源：MarkTechPost（RSS）
- 作者：Sana Hassan
- 发布时间：2026-06-01 04:07
- AIHOT 分数：63
- AIHOT 链接：https://aihot.virxact.com/items/cmpu8bvuc01iuslag9twi4rvx
- 原文链接：https://www.marktechpost.com/2026/05/31/an-implementation-of-the-microsoft-agent-governance-toolkit-for-safe-ai-agent-tool-use-with-policies-approvals-audit-logs-and-risk-controls

## AI 摘要

该实现方案创建了一个可执行的智能体治理工作流。智能体不直接执行工具，其每个操作首先经过一个治理层，该层会检查智能体的身份、信任分数、风险等级、请求的工具、动作类型和敏感性等级等，以确保安全。实现以Colab-ready形式提供，参考了微软的Agent Governance Toolkit。

## 正文

In this tutorial, we build a governed AI-agent workflow using Microsoft’s Agent Governance Toolkit as the reference point. We create a Colab-ready implementation where agents do not directly execute tools; instead, every action first passes through a governance layer that checks the agent’s identity, trust score, risk tier, requested tool, action type, sensitivity level, and policy rules. We define a YAML-based policy that controls destructive database operations, external email sending, shell execution, access to sensitive data, and financial transfers. We then wrap each tool with governance logic so that actions can be allowed, denied, sandboxed, or routed through an approval step before execution. We also generate tamper-evident audit records, run policy tests, activate a kill switch, summarize governance decisions, and visualize the relationships between agents, tools, rules, and outcomes as a graph.

Copy CodeCopiedUse a different Browser

import os import sys import json import time import uuid import hmac import yaml import hashlib import random import shutil import subprocess from dataclasses import dataclass, asdict from datetime import datetime, timezone from typing import Any, Dict, List, Callable, Optional def pip_install(*packages): subprocess.run( [sys.executable, "-m", "pip", "install", "-q", *packages], check=False ) pip_install("pyyaml", "pandas", "networkx", "matplotlib", "rich") pip_install("agent-governance-toolkit[full]") from rich.console import Console from rich.table import Table from rich.panel import Panel from rich import box import pandas as pd import networkx as nx import matplotlib.pyplot as plt console = Console() REPO_URL = "https://github.com/microsoft/agent-governance-toolkit" REPO_DIR = "/content/agent-governance-toolkit" if not os.path.exists(REPO_DIR): subprocess.run(["git", "clone", "--depth", "1", REPO_URL, REPO_DIR], check=False) official_govern = None official_import_error = None try: from agentmesh.governance import govern as official_govern except Exception as e: official_import_error = repr(e)

We set up the Colab environment by installing the required libraries and importing everything needed for policy handling, auditing, visualization, and data analysis. We also clone the Microsoft Agent Governance Toolkit repository to keep the notebook connected to the original project. We then try to import the official governance function, while keeping the tutorial runnable even if the preview package changes.

Copy CodeCopiedUse a different Browser

POLICY_PATH = "/content/advanced_agent_policy.yaml" policy_yaml = """ apiVersion: governance.toolkit/v1 name: advanced-colab-governance-policy default_action: allow metadata: owner: ai-platform-team environment: tutorial description: > Demonstrates deterministic governance controls for AI agent tool calls. rules: - name: block-destructive-database-actions description: "Agents must not perform destructive database operations." condition: "action.type in ['drop_table', 'delete_table', 'truncate_table']" action: deny severity: critical owasp_risk: "Tool misuse / Excessive agency" - name: require-human-approval-for-email description: "External email requires approval before execution." condition: "action.type == 'send_email' and action.recipient_domain != 'internal.local'" action: require_approval approvers: ["security-team", "business-owner"] severity: high owasp_risk: "Goal hijacking / Unauthorized action" - name: sandbox-shell-execution description: "Shell commands must run in a sandbox with blocked dangerous commands." condition: "action.type == 'shell_exec'" action: sandbox sandbox: blocked_terms: ["rm -rf", "curl http", "wget http", "chmod 777", "sudo"] max_runtime_seconds: 2 severity: high owasp_risk: "Tool misuse / Unsafe execution" - name: deny-low-trust-agent-sensitive-data description: "Low-trust agents cannot access sensitive data." condition: "identity.trust_score 1000" action: require_approval approvers: ["finance-controller"] severity: critical owasp_risk: "Excessive agency / Business process compromise" - name: rate-limit-high-risk-agent description: "High-risk agents are blocked from repeated autonomous actions." condition: "identity.risk_tier == 'high' and action.autonomous == True" action: deny severity: medium owasp_risk: "Rogue agent / Cascading failure" """ with open(POLICY_PATH, "w") as f: f.write(policy_yaml) with open(POLICY_PATH, "r") as f: policy = yaml.safe_load(f)

We create a YAML governance policy that defines how agent actions should be handled before execution. We add rules to block destructive database actions, require approval for external emails and financial transfers, sandbox shell commands, and restrict low-trust agents from sensitive data. We then save and reload this policy so the rest of the tutorial can use it as the main governance configuration.

Copy CodeCopiedUse a different Browser

@dataclass class AgentIdentity: agent_id: str name: str role: str owner: str trust_score: float risk_tier: str scopes: List[str] @dataclass class GovernanceDecision: decision_id: str timestamp: str policy_name: str agent_id: str agent_name: str tool_name: str action: Dict[str, Any] decision: str matched_rule: Optional[str] severity: Optional[str] reason: str approved_by: Optional[str] previous_hash: str record_hash: str class GovernanceDenied(Exception): pass class ApprovalRequired(Exception): pass class SandboxViolation(Exception): pass class DotDict(dict): def __getattr__(self, item): value = self.get(item) if isinstance(value, dict): return DotDict(value) return value def safe_eval_condition(condition: str, action: Dict[str, Any], identity: AgentIdentity) -> bool: safe_globals = { "__builtins__": {}, "True": True, "False": False, "None": None, } safe_locals = { "action": DotDict(action), "identity": DotDict(asdict(identity)), } try: return bool(eval(condition, safe_globals, safe_locals)) except Exception as e: return False

We define the core data structures for representing agent identities, governance decisions, and governance-related exceptions. We also create a small dot-access dictionary helper so that policy conditions can read values such as action.type and identity.trust_score. We then build a safe condition evaluator that checks whether each policy rule matches the current agent action.

Copy CodeCopiedUse a different Browser

class TamperEvidentAuditLog: def __init__(self, secret: bytes = b"tutorial-secret-key"): self.records: List[GovernanceDecision] = [] self.secret = secret self.last_hash = "GENESIS" def _hash_record(self, payload: Dict[str, Any], previous_hash: str) -> str: canonical = json.dumps( {"payload": payload, "previous_hash": previous_hash}, sort_keys=True, default=str ).encode() return hmac.new(self.secret, canonical, hashlib.sha256).hexdigest() def append( self, policy_name: str, identity: AgentIdentity, tool_name: str, action: Dict[str, Any], decision: str, matched_rule: Optional[str], severity: Optional[str], reason: str, approved_by: Optional[str] = None ) -> GovernanceDecision: base_payload = { "decision_id": str(uuid.uuid4()), "timestamp": datetime.now(timezone.utc).isoformat(), "policy_name": policy_name, "agent_id": identity.agent_id, "agent_name": identity.name, "tool_name": tool_name, "action": action, "decision": decision, "matched_rule": matched_rule, "severity": severity, "reason": reason, "approved_by": approved_by, } record_hash = self._hash_record(base_payload, self.last_hash) record = GovernanceDecision( **base_payload, previous_hash=self.last_hash, record_hash=record_hash ) self.records.append(record) self.last_hash = record_hash return record def verify(self) -> bool: previous = "GENESIS" for r in self.records: payload = asdict(r) record_hash = payload.pop("record_hash") previous_hash = payload.pop("previous_hash") if previous_hash != previous: return False expected = self._hash_record(payload, previous_hash) if expected != record_hash: return False previous = record_hash return True def to_dataframe(self) -> pd.DataFrame: return pd.DataFrame([asdict(r) for r in self.records]) audit_log = TamperEvidentAuditLog()

We implement a tamper-evident audit log that records every governance decision made by the system. We use chained hashes, so each new record depends on the previous record, making changes easier to detect. We also add methods to verify the audit chain and convert the records into a dataframe for later analysis.

Copy CodeCopiedUse a different Browser

class TutorialGovernanceEngine: def __init__(self, policy: Dict[str, Any], audit_log: TamperEvidentAuditLog): self.policy = policy self.audit_log = audit_log self.kill_switch_enabled = False self.error_budget = 5 self.recent_denials = 0 def activate_kill_switch(self): self.kill_switch_enabled = True def deactivate_kill_switch(self): self.kill_switch_enabled = False def evaluate( self, identity: AgentIdentity, tool_name: str, action: Dict[str, Any] ) -> GovernanceDecision: if self.kill_switch_enabled: return self.audit_log.append( policy_name=self.policy["name"], identity=identity, tool_name=tool_name, action=action, decision="deny", matched_rule="global-kill-switch", severity="critical", reason="Global governance kill switch is active." ) for rule in self.policy.get("rules", []): condition = rule.get("condition", "") if safe_eval_condition(condition, action, identity): rule_action = rule.get("action", "deny") matched_rule = rule.get("name") severity = rule.get("severity") description = rule.get("description", "Policy rule matched.") if rule_action == "deny": self.recent_denials += 1 return self.audit_log.append( policy_name=self.policy["name"], identity=identity, tool_name=tool_name, action=action, decision="deny", matched_rule=matched_rule, severity=severity, reason=description ) if rule_action == "require_approval": return self.audit_log.append( policy_name=self.policy["name"], identity=identity, tool_name=tool_name, action=action, decision="require_approval", matched_rule=matched_rule, severity=severity, reason=description ) if rule_action == "sandbox": blocked_terms = rule.get("sandbox", {}).get("blocked_terms", []) command = str(action.get("command", "")) for term in blocked_terms: if term in command: self.recent_denials += 1 return self.audit_log.append( policy_name=self.policy["name"], identity=identity, tool_name=tool_name, action=action, decision="deny", matched_rule=matched_rule, severity=severity, reason=f"Sandbox blocked command term: {term}" ) return self.audit_log.append( policy_name=self.policy["name"], identity=identity, tool_name=tool_name, action=action, decision="sandbox", matched_rule=matched_rule, severity=severity, reason=description ) return self.audit_log.append( policy_name=self.policy["name"], identity=identity, tool_name=tool_name, action=action, decision=self.policy.get("default_action", "allow"), matched_rule=None, severity=None, reason="No policy rule matched. Default action applied." ) engine = TutorialGovernanceEngine(policy, audit_log)

We build the main governance engine that compares each agent action against the YAML policy rules. We handle different outcomes such as deny, approval required, sandbox mode, and default allow. We also include a kill switch that immediately blocks all actions when needed.

Copy CodeCopiedUse a different Browser

def query_database(table: str, operation: str = "select") -> Dict[str, Any]: return { "status": "success", "operation": operation, "table": table, "rows_returned": random.randint(10, 100) } def send_email(to: str, subject: str, body: str) -> Dict[str, Any]: return { "status": "sent", "to": to, "subject": subject, "body_preview": body[:80] } def shell_exec(command: str) -> Dict[str, Any]: allowed_commands = ["echo", "date", "pwd", "ls"] first = command.strip().split()[0] if command.strip() else "" if first not in allowed_commands: return { "status": "blocked_by_tutorial_shell", "command": command, "reason": "Only harmless demo shell commands are executed." } result = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=2 ) return { "status": "executed", "command": command, "stdout": result.stdout.strip(), "stderr": result.stderr.strip() } def transfer_money(amount: float, destination: str) -> Dict[str, Any]: return { "status": "transferred", "amount": amount, "destination": destination } class GovernedTool: def __init__( self, name: str, fn: Callable, engine: TutorialGovernanceEngine, identity: AgentIdentity, approval_simulator: Optional[Callable[[GovernanceDecision], bool]] = None ): self.name = name self.fn = fn self.engine = engine self.identity = identity self.approval_simulator = approval_simulator or (lambda decision: False) def __call__(self, **kwargs): action = dict(kwargs) action.setdefault("autonomous", True) decision = self.engine.evaluate( identity=self.identity, tool_name=self.name, action=action ) if decision.decision == "deny": raise GovernanceDenied( f"Action denied by rule '{decision.matched_rule}': {decision.reason}" ) if decision.decision == "require_approval": approved = self.approval_simulator(decision) if not approved: raise ApprovalRequired( f"Approval required by rule '{decision.matched_rule}': {decision.reason}" ) self.engine.audit_log.append( policy_name=self.engine.policy["name"], identity=self.identity, tool_name=self.name, action=action, decision="approved", matched_rule=decision.matched_rule, severity=decision.severity, reason="Human approval simulated for tutorial.", approved_by="tutorial-approver" ) return self.fn(**kwargs)

We define sample tools that represent real agent capabilities, including database access, email sending, shell execution, and money transfer. We then create a governed tool wrapper that ensures every tool call passes through the governance engine first. We ensure denied actions stop immediately, that approval-based actions require a simulated approval, and that only approved or allowed actions reach the actual tool.

Copy CodeCopiedUse a different Browser

research_agent = AgentIdentity( agent_id="agent-research-001", name="ResearchAgent", role="market_research", owner="strategy-team", trust_score=0.91, risk_tier="low", scopes=["read_database", "web_search", "internal_email"] ) ops_agent = AgentIdentity( agent_id="agent-ops-002", name="OpsAgent", role="automation", owner="platform-team", trust_score=0.72, risk_tier="medium", scopes=["shell_exec", "read_database"] ) unknown_agent = AgentIdentity( agent_id="agent-shadow-999", name="ShadowAgent", role="unknown", owner="unknown", trust_score=0.42, risk_tier="high", scopes=["unknown"] ) finance_agent = AgentIdentity( agent_id="agent-finance-003", name="FinanceAgent", role="finance_ops", owner="finance-team", trust_score=0.88, risk_tier="low", scopes=["transfer_money", "read_database"] ) def tutorial_approval_simulator(decision: GovernanceDecision) -> bool: action = decision.action if decision.matched_rule == "require-approval-for-financial-transaction": return action.get("amount", 0) ", arrowsize=15) nx.draw_networkx_labels(G, pos, font_size=8) plt.title("Agent Governance Graph: Agents, Tools, Decisions, and Policy Rules") plt.axis("off") plt.tight_layout() plt.show() EXPORT_DIR = "/content/agt_tutorial_outputs" os.makedirs(EXPORT_DIR, exist_ok=True) audit_json_path = os.path.join(EXPORT_DIR, "tamper_evident_audit_log.json") audit_csv_path = os.path.join(EXPORT_DIR, "governance_audit_log.csv") policy_copy_path = os.path.join(EXPORT_DIR, "advanced_agent_policy.yaml") test_results_path = os.path.join(EXPORT_DIR, "policy_test_results.csv") with open(audit_json_path, "w") as f: json.dump([asdict(r) for r in audit_log.records], f, indent=2, default=str) audit_df.to_csv(audit_csv_path, index=False) test_df.to_csv(test_results_path, index=False) shutil.copy(POLICY_PATH, policy_copy_path)

We run a set of test scenarios that show how the governed system handles safe actions, risky actions, approval flows, and blocked operations. We display the audit log, run policy tests, activate and deactivate the kill switch, and summarize governance decisions with tables and charts. We also create a governance graph and export the audit logs, policy file, and test results as reusable artifacts.

In conclusion, we have a fully governed-agent workflow that covers both policy enforcement and observability. We simulated multiple agents with varying trust levels. We showed how the same system responds differently depending on the agent’s identity, the action’s sensitivity, and the rules defined in the policy file. Safe actions, such as simple database reads, are executed. In contrast, risky actions, such as destructive database changes, unsafe shell commands, low-trust sensitive access, and large financial transfers, are blocked or sent for approval. We also recorded every decision in an audit log, verified the audit chain, ran policy tests, exported governance artifacts, and created visual summaries that make the system’s behavior easier to review.

Check out the Full Codes here. Also, feel free to follow us on Twitter and don’t forget to join our 150k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.

Need to partner with us for promoting your GitHub Repo OR Hugging Face Page OR Product Release OR Webinar etc.? Connect with us

The post An Implementation of the Microsoft Agent Governance Toolkit for Safe AI Agent Tool Use with Policies, Approvals, Audit Logs, and Risk Controls appeared first on MarkTechPost.
