SDK Usage Examples

This page provides examples of how to use the Arc Memory SDK in your applications. The SDK allows you to programmatically interact with the knowledge graph, build custom tools, and integrate Arc Memory into your workflows.

Installation

First, install the Arc Memory SDK:

pip install arc-memory
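
To confirm the installation, you can check the installed version using only the Python standard library (this assumes the pip distribution name is arc-memory, as above):

# Verify the installation (standard library only)
from importlib.metadata import version

print(version("arc-memory"))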

Basic Usage

Initializing Arc Memory

from arc_memory import ArcMemory

# Basic initialization with default options
arc = ArcMemory()

# Initialization with custom options
arc = ArcMemory(
    repo_path="path/to/repo",
    output_path="path/to/output.db",
    github_token="your_github_token",
    max_commits=1000,
    days=30
)

Building the Knowledge Graph

# Build a complete knowledge graph
build_result = arc.build()
print(f"Built graph with {build_result.node_count} nodes and {build_result.edge_count} edges")

# Build incrementally
incremental_result = arc.build_incremental()
print(f"Added {incremental_result.new_node_count} new nodes and {incremental_result.new_edge_count} new edges")

Tracing File History

# Trace the history of a specific line in a file
history = arc.trace_file(
    file_path="src/main.py",
    line_number=42,
    max_depth=3
)

# Process the results
for item in history:
    print(f"Commit: {item.commit_hash}")
    print(f"Author: {item.author}")
    print(f"Date: {item.date}")
    print(f"Message: {item.message}")
    if item.pr:
        print(f"PR: #{item.pr.number} - {item.pr.title}")
    print("---")

Finding Related Nodes

# Find nodes related to a specific commit
related = arc.relate_node(
    node_type="commit",
    node_id="abc123",
    max_depth=2,
    edge_types=["MODIFIES", "PART_OF", "REFERENCES"]
)

# Process the results
for node in related.nodes:
    print(f"Node: {node.id} ({node.type})")
    for edge in node.edges:
        print(f"  → {edge.target_node.id} ({edge.type})")

Advanced Usage

Custom Graph Queries

from arc_memory.models import NodeType, EdgeType

# Custom graph query
results = arc.query(
    start_node_type=NodeType.FILE,
    start_node_id="src/main.py",
    edge_types=[EdgeType.MODIFIES, EdgeType.REFERENCES],
    max_depth=3,
    filters={
        "author": "username",
        "date_after": "2023-01-01"
    }
)

# Process the results
for node in results.nodes:
    print(f"Node: {node.id} ({node.type})")
    for edge in node.edges:
        print(f"  → {edge.target_node.id} ({edge.type})")

Working with the Database Directly

from arc_memory.db import MemoryDB

# Open the database
db = MemoryDB("path/to/memory.db")

# Execute a custom query
results = db.execute_query("""
    MATCH (n:Commit)-[:MODIFIES]->(f:File)
    WHERE f.path = 'src/main.py'
    RETURN n.hash, n.author, n.message
""")

# Process the results
for row in results:
    print(f"Commit: {row['n.hash']}")
    print(f"Author: {row['n.author']}")
    print(f"Message: {row['n.message']}")

Running Simulations

# Run a simulation on current changes
simulation = arc.simulate(
    scenario="network_latency",
    branch="main",
    use_memory=True
)

# Process the results
print(f"Simulation ID: {simulation.id}")
print(f"Risk Score: {simulation.risk_score}/100")
print(f"Affected Services: {', '.join(simulation.affected_services)}")
print(f"Analysis: {simulation.analysis}")

Integration Examples

CI/CD Pipeline Integration

import os
from arc_memory import ArcMemory

# Initialize Arc Memory with CI/CD environment variables
arc = ArcMemory(
    repo_path=os.environ.get("CI_REPO_PATH", "."),
    github_token=os.environ.get("GITHUB_TOKEN"),
    output_path=os.environ.get("ARC_DB_PATH", ".arc/memory.db")
)

# Build the knowledge graph incrementally
arc.build_incremental()

# Get the current PR number from environment variables
pr_number = os.environ.get("CI_PR_NUMBER")

# Analyze the PR
if pr_number:
    pr_analysis = arc.analyze_pr(pr_number)
    
    # Post results as a comment on the PR
    if pr_analysis.risk_score > 50:
        arc.post_pr_comment(
            pr_number=pr_number,
            comment=f"⚠️ **High Risk Changes Detected**\n\n"
                   f"Risk Score: {pr_analysis.risk_score}/100\n\n"
                   f"Affected Services: {', '.join(pr_analysis.affected_services)}\n\n"
                   f"Analysis: {pr_analysis.analysis}"
        )

IDE Extension Integration

from arc_memory import ArcMemory

class ArcIDEExtension:
    def __init__(self):
        self.arc = ArcMemory()
        
    def on_file_open(self, file_path):
        """Called when a file is opened in the IDE."""
        # Get file history
        history = self.arc.trace_file(file_path)
        return self._format_history_for_ide(history)
        
    def on_line_hover(self, file_path, line_number):
        """Called when hovering over a line in the IDE."""
        # Get decision trail for the line
        trail = self.arc.why_file(file_path, line_number)
        return self._format_trail_for_ide(trail)
        
    def on_pre_commit(self, changed_files):
        """Called before committing changes."""
        # Run simulation on changed files
        simulation = self.arc.simulate(files=changed_files)
        return self._format_simulation_for_ide(simulation)
        
    def _format_history_for_ide(self, history):
        # Format history data for IDE display
        pass
        
    def _format_trail_for_ide(self, trail):
        # Format decision trail for IDE display
        pass
        
    def _format_simulation_for_ide(self, simulation):
        # Format simulation results for IDE display
        pass

Error Handling

from arc_memory import ArcMemory
from arc_memory.exceptions import GraphBuildError, QueryError, SimulationError

# Create the client outside the try block so `arc` stays defined
# for the later examples even if the build fails
arc = ArcMemory()

try:
    arc.build()
except GraphBuildError as e:
    print(f"Failed to build graph: {e}")
    # Handle build error (e.g., retry or fall back to an existing graph)

try:
    history = arc.trace_file("src/main.py", 42)
except QueryError as e:
    print(f"Query failed: {e}")
    # Handle query error

try:
    simulation = arc.simulate()
except SimulationError as e:
    print(f"Simulation failed: {e}")
    # Handle simulation error

Best Practices

  1. Use incremental builds for better performance in frequently updated environments
  2. Limit query depth to avoid performance issues with large repositories
  3. Cache results when making repeated queries for the same data (see the sketch below)
  4. Handle errors gracefully to provide a better user experience
  5. Use specific filters to narrow down query results
  6. Close database connections when done to free up resources (see the sketch below)
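
The following sketch illustrates items 3 and 6. It assumes trace_file results are safe to cache per (file, line) pair and that MemoryDB exposes a close() method; check the API of your installed version:

from contextlib import closing
from functools import lru_cache

from arc_memory import ArcMemory
from arc_memory.db import MemoryDB

arc = ArcMemory()

# Item 3: cache repeated queries for the same file and line (per-process cache)
@lru_cache(maxsize=256)
def cached_trace(file_path, line_number):
    # Return a tuple so the cached value cannot be mutated by callers
    return tuple(arc.trace_file(file_path=file_path, line_number=line_number))

# Item 6: release the database connection even if a query raises
with closing(MemoryDB("path/to/memory.db")) as db:
    results = db.execute_query("...")  # your query here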

See Also