Creating Custom Plugins
This guide provides examples of how to create custom plugins for Arc Memory to ingest data from additional sources beyond the built-in Git, GitHub, and ADR plugins.
Related Documentation:
Plugin Architecture Overview
Arc Memory’s plugin architecture allows you to extend the system to ingest data from any source. Plugins are Python classes that implement the IngestorPlugin protocol, which defines methods for:
- Identifying the plugin
- Specifying the types of nodes and edges it creates
- Ingesting data from the source
Basic Plugin Template
Here’s a basic template for creating a custom plugin:
from typing import List, Optional, Tuple, Dict, Any
from pathlib import Path
from datetime import datetime
from arc_memory.plugins import IngestorPlugin
from arc_memory.schema.models import Node, Edge, NodeType, EdgeRel
class MyCustomPlugin(IngestorPlugin):
    """Custom ingestor plugin for Arc Memory.

    Minimal template: it identifies itself, declares the node and edge
    types it produces, and returns one sample node and one sample edge.
    """

    def get_name(self) -> str:
        """Return the name of this plugin."""
        return "my-custom-source"

    def get_node_types(self) -> List[str]:
        """Return the node types this plugin can create."""
        return ["custom_node"]

    def get_edge_types(self) -> List[str]:
        """Return the edge types this plugin can create."""
        return [EdgeRel.MENTIONS]

    def ingest(
        self,
        last_processed: Optional[Dict[str, Any]] = None,
    ) -> Tuple[List[Node], List[Edge], Dict[str, Any]]:
        """Ingest data from the custom source.

        Args:
            last_processed: Metadata for incremental builds (presumably the
                dict returned by this plugin's previous run). This template
                does not read it.

        Returns:
            A ``(nodes, edges, metadata)`` tuple.
        """
        nodes = []
        edges = []

        # Your implementation here
        # 1. Fetch data from your source
        # 2. Create nodes and edges
        # 3. Return them along with metadata

        # One clock reading so the node and the metadata agree.
        now = datetime.now()

        # Example: Create a sample node
        node = Node(
            id="custom_node:1",
            type="custom_node",
            title="My Custom Node",
            body="This is a custom node created by my plugin",
            ts=now,
            extra={
                "custom_field": "custom_value",
                "source": "my_plugin",
            },
        )
        nodes.append(node)

        # Example: Create a sample edge connecting to a Git commit
        edge = Edge(
            src=node.id,
            dst="commit:abc123",  # This would be a real commit ID in practice
            rel=EdgeRel.MENTIONS,
        )
        edges.append(edge)

        # Create metadata for incremental builds
        metadata = {
            "timestamp": now.isoformat(),
            "count": len(nodes),
        }
        return nodes, edges, metadata
Registering Your Plugin
To make your plugin available to Arc Memory, you need to register it using Python’s entry point system:
In setup.py
from setuptools import setup
# Package metadata for the plugin distribution.
setup(
name="arc-memory-myplugin",
version="0.1.0",
packages=["arc_memory_myplugin"],
# Registers the plugin class under the "arc_memory.plugins" entry point group.
entry_points={
"arc_memory.plugins": [
# Format: "<entry point name> = <module path>:<plugin class>"
"my-custom-source = arc_memory_myplugin.plugin:MyCustomPlugin",
],
},
)
In pyproject.toml
[project.entry-points."arc_memory.plugins"]
my-custom-source = "arc_memory_myplugin.plugin:MyCustomPlugin"
Example: Notion Plugin
Here’s an example of a plugin that ingests data from Notion:
import os
from typing import List, Optional, Tuple, Dict, Any
from datetime import datetime
from arc_memory.plugins import IngestorPlugin
from arc_memory.schema.models import Node, Edge, NodeType, EdgeRel
class NotionIngestor(IngestorPlugin):
    """Ingestor plugin for Notion pages and databases."""

    def get_name(self) -> str:
        """Return the name of this plugin."""
        return "notion"

    def get_node_types(self) -> List[str]:
        """Return the node types this plugin can create."""
        return ["notion_page", "notion_database"]

    def get_edge_types(self) -> List[str]:
        """Return the edge types this plugin can create."""
        return [EdgeRel.MENTIONS, "CONTAINS"]

    def ingest(
        self,
        last_processed: Optional[Dict[str, Any]] = None,
    ) -> Tuple[List[Node], List[Edge], Dict[str, Any]]:
        """Ingest pages from Notion.

        Args:
            last_processed: Metadata from an earlier run. Only its
                ``"last_sync"`` entry is read; ``None`` means a full sync.

        Returns:
            A ``(nodes, edges, metadata)`` tuple. ``metadata["last_sync"]``
            is the checkpoint for the next run. When no token is available
            or the sync fails, both lists are empty and the previous
            checkpoint is handed back unchanged.
        """
        # Local import: keeps this example independent of the import block.
        from datetime import timezone

        nodes = []
        edges = []

        # Get last sync time for incremental ingestion
        last_sync = (last_processed or {}).get("last_sync")
        cutoff = None
        if last_sync:
            try:
                cutoff = self._parse_timestamp(last_sync)
            except (AttributeError, TypeError, ValueError):
                # Unreadable checkpoint: fall back to a full sync.
                cutoff = None

        # Get Notion API token from environment or keyring
        notion_token = os.environ.get("NOTION_TOKEN")
        if not notion_token:
            try:
                import keyring
                notion_token = keyring.get_password("arc_memory", "notion_token")
            except Exception:
                # keyring is optional; a missing package or a failing
                # backend simply means there is no stored token.
                notion_token = None
        if not notion_token:
            print("Notion token not found. Skipping Notion ingestion.")
            # Keep the old checkpoint so a skipped run does not force a
            # full re-sync later.
            return [], [], {"last_sync": last_sync}

        try:
            # Initialize Notion client
            from notion_client import Client
            notion = Client(auth=notion_token)

            # Take the checkpoint before querying so pages edited while
            # this sync is running are picked up by the next one.
            sync_started = datetime.now(timezone.utc)

            start_cursor = None
            caught_up = False
            while not caught_up:
                query = {
                    "filter": {"property": "object", "value": "page"},
                    "sort": {"direction": "descending", "timestamp": "last_edited_time"},
                }
                if start_cursor:
                    query["start_cursor"] = start_cursor
                response = notion.search(**query)

                # Process pages
                for page in response["results"]:
                    edited_at = self._parse_timestamp(page["last_edited_time"])
                    # Compare as datetimes, not strings: the two sides are
                    # formatted differently and may carry different offsets.
                    if cutoff is not None and edited_at <= cutoff:
                        # Results are requested newest first, so later
                        # result pages only hold older edits.
                        caught_up = True
                        continue

                    # Create a node for this page
                    page_id = page["id"].replace("-", "")
                    node = Node(
                        id=f"notion_page:{page_id}",
                        type="notion_page",
                        title=self._get_page_title(page),
                        body=self._get_page_content(page),
                        ts=edited_at,
                        extra={
                            "url": page["url"],
                            "created_time": page["created_time"],
                            "last_edited_time": page["last_edited_time"],
                            "notion_id": page["id"],
                        },
                    )
                    nodes.append(node)

                    # Look for mentions of Git commits or PRs
                    self._extract_mentions(node, edges)

                start_cursor = response.get("next_cursor")
                if not response.get("has_more") or not start_cursor:
                    break

            # Create metadata for incremental builds
            metadata = {
                "last_sync": sync_started.isoformat()
            }
            return nodes, edges, metadata
        except Exception as e:
            # Plugin boundary: report and keep the previous checkpoint so
            # one failing source does not break the whole build.
            print(f"Error ingesting Notion data: {e}")
            return [], [], {"last_sync": last_sync}

    @staticmethod
    def _parse_timestamp(value):
        """Parse an ISO 8601 string into a timezone-aware datetime.

        A trailing ``Z`` is read as UTC. Values without an offset (such as
        checkpoints written with a naive ``datetime.now()``) are treated as
        local time.
        """
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.astimezone()
        return parsed

    def _get_page_title(self, page):
        """Extract the title from a Notion page.

        The title property is located by its ``type`` rather than its name,
        and all of its text segments are joined. Returns ``"Untitled"``
        when no non-empty title is found.
        """
        for prop in page.get("properties", {}).values():
            if isinstance(prop, dict) and prop.get("type") == "title":
                text = "".join(
                    part.get("plain_text", "") for part in prop.get("title") or []
                )
                if text:
                    return text
        return "Untitled"

    def _get_page_content(self, page):
        """Extract the content from a Notion page."""
        # In a real implementation, you would use the Notion API to get blocks
        # This is a simplified example
        return f"Notion page content for {page['id']}"

    def _extract_mentions(self, node, edges):
        """Append MENTIONS edges for commits and PRs named in the page body.

        Recognised patterns are ``commit:abc123`` / ``Commit: abc123`` and
        ``PR #42``. Issue references can be handled the same way once the
        target node ID scheme is known. Nothing is appended when the body
        contains no match, so no placeholder edges are created.
        """
        import re

        text = node.body or ""
        targets = [
            f"commit:{commit_hash}"
            for commit_hash in re.findall(r'commit:\s*([a-f0-9]+)', text, re.IGNORECASE)
        ]
        targets += [
            f"pr:{pr_number}"
            for pr_number in re.findall(r'PR #(\d+)', text, re.IGNORECASE)
        ]

        seen = set()
        for target in targets:
            if target in seen:
                continue  # one edge per mentioned target
            seen.add(target)
            edges.append(Edge(
                src=node.id,
                dst=target,
                rel=EdgeRel.MENTIONS
            ))
Example: Jira Plugin
Here’s an example of a plugin that ingests data from Jira:
import os
from typing import List, Optional, Tuple, Dict, Any
from datetime import datetime
import re
from arc_memory.plugins import IngestorPlugin
from arc_memory.schema.models import Node, Edge, NodeType, EdgeRel
class JiraIngestor(IngestorPlugin):
    """Ingestor plugin for Jira issues."""

    # Compiled once; both patterns are applied to every comment body.
    _COMMIT_PATTERN = re.compile(r'commit:([a-f0-9]+)', re.IGNORECASE)
    _PR_PATTERN = re.compile(r'PR #(\d+)', re.IGNORECASE)

    def get_name(self) -> str:
        """Return the name of this plugin."""
        return "jira"

    def get_node_types(self) -> List[str]:
        """Return the node types this plugin can create."""
        return ["jira_issue"]

    def get_edge_types(self) -> List[str]:
        """Return the edge types this plugin can create."""
        return [EdgeRel.MENTIONS, "IMPLEMENTS"]

    def ingest(
        self,
        last_processed: Optional[Dict[str, Any]] = None,
    ) -> Tuple[List[Node], List[Edge], Dict[str, Any]]:
        """Ingest issues from Jira.

        Args:
            last_processed: Metadata from an earlier run. Only its
                ``"last_updated"`` entry is read; ``None`` means a full sync.

        Returns:
            A ``(nodes, edges, metadata)`` tuple. ``metadata["last_updated"]``
            is the checkpoint for the next run. When credentials are missing
            or the sync fails, both lists are empty and the previous
            checkpoint is handed back unchanged.
        """
        nodes = []
        edges = []

        # Get last updated time for incremental ingestion
        last_updated = (last_processed or {}).get("last_updated")

        # Get Jira credentials from environment
        jira_url = os.environ.get("JIRA_URL")
        jira_user = os.environ.get("JIRA_USER")
        jira_token = os.environ.get("JIRA_TOKEN")
        if not all([jira_url, jira_user, jira_token]):
            print("Jira credentials not found. Skipping Jira ingestion.")
            # Keep the old checkpoint so a skipped run does not force a
            # full re-sync later.
            return [], [], {"last_updated": last_updated}

        # Get project key from config or environment
        project_key = os.environ.get("JIRA_PROJECT", "PROJ")

        # Build JQL query. The key is quoted (and escaped) so unusual
        # project keys cannot break the query.
        quoted_key = project_key.replace("\\", "\\\\").replace('"', '\\"')
        jql = f'project = "{quoted_key}"'
        cutoff = self._format_jql_cutoff(last_updated)
        if cutoff:
            jql += f' AND updated >= "{cutoff}"'

        try:
            # Initialize Jira client (inside the try so connection and
            # authentication errors are contained as well).
            from jira import JIRA
            jira = JIRA(server=jira_url, basic_auth=(jira_user, jira_token))

            # Take the checkpoint before querying so issues updated while
            # this sync is running are picked up by the next one.
            # NOTE(review): JQL evaluates dates in the Jira account's time
            # zone; this assumes it matches the local clock -- confirm.
            sync_started = datetime.now()

            # A false maxResults asks the client to fetch every match in
            # batches; a fixed cap would silently drop issues once the
            # checkpoint moves past them.
            issues = jira.search_issues(jql, maxResults=False)
            base_url = jira_url.rstrip("/")

            for issue in issues:
                fields = issue.fields
                # Create a node for this issue
                node = Node(
                    id=f"jira_issue:{issue.key}",
                    type="jira_issue",
                    title=fields.summary,
                    body=fields.description or "",
                    ts=self._parse_jira_timestamp(fields.updated),
                    extra={
                        "key": issue.key,
                        "status": fields.status.name,
                        "assignee": fields.assignee.displayName if fields.assignee else None,
                        "reporter": fields.reporter.displayName if fields.reporter else None,
                        "created": fields.created,
                        "updated": fields.updated,
                        "url": f"{base_url}/browse/{issue.key}",
                    },
                )
                nodes.append(node)

                # Extract commit and PR mentions from comments
                edges.extend(self._mention_edges(node.id, jira.comments(issue.key)))

            # Create metadata for incremental builds
            metadata = {
                "last_updated": sync_started.isoformat()
            }
            return nodes, edges, metadata
        except Exception as e:
            # Plugin boundary: report and keep the previous checkpoint so
            # one failing source does not break the whole build.
            print(f"Error ingesting Jira data: {e}")
            return [], [], {"last_updated": last_updated}

    @staticmethod
    def _format_jql_cutoff(last_updated):
        """Turn a stored ISO checkpoint into a JQL date literal.

        JQL accepts ``yyyy-MM-dd HH:mm``, not a full ISO 8601 timestamp.
        Dropping the seconds moves the cutoff slightly earlier, which can
        only widen the result set. Returns ``None`` when there is no usable
        checkpoint.
        """
        if not last_updated:
            return None
        try:
            moment = datetime.fromisoformat(last_updated)
        except (TypeError, ValueError):
            return None
        return moment.strftime("%Y-%m-%d %H:%M")

    @staticmethod
    def _parse_jira_timestamp(value):
        """Parse a Jira timestamp such as ``2024-05-01T10:15:30.000+0000``.

        ``strptime`` is tried first because ``datetime.fromisoformat``
        rejects offsets written without a colon on older Python versions.
        """
        try:
            return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z")
        except ValueError:
            return datetime.fromisoformat(value.replace("Z", "+00:00"))

    def _mention_edges(self, node_id, comments):
        """Build one MENTIONS edge per distinct commit or PR in *comments*."""
        found = []
        seen = set()
        for comment in comments:
            text = comment.body or ""
            # Look for commit mentions in the format "commit:abc123" or similar
            targets = [f"commit:{h}" for h in self._COMMIT_PATTERN.findall(text)]
            # Look for PR mentions in the format "PR #42" or similar
            targets += [f"pr:{n}" for n in self._PR_PATTERN.findall(text)]
            for target in targets:
                if target in seen:
                    continue  # the same target mentioned again
                seen.add(target)
                found.append(Edge(
                    src=node_id,
                    dst=target,
                    rel=EdgeRel.MENTIONS
                ))
        return found
Best Practices for Plugin Development
1. Use Unique IDs
Ensure your node IDs are unique by giving them a prefix that only your plugin uses (the examples in this guide use the node type, such as notion_page or jira_issue):
node_id = f"{plugin_name}:{original_id}" # e.g., "notion_page:123"
2. Handle Incremental Ingestion
Use the last_processed parameter to implement incremental ingestion:
def ingest(self, last_processed: Optional[Dict[str, Any]] = None):
    """Sketch of incremental ingestion driven by ``last_processed``.

    Args:
        last_processed: Metadata from an earlier run, or ``None``. Only its
            ``"last_sync"`` entry is read here.
    """
    # Get last sync time
    last_sync = None
    if last_processed and "last_sync" in last_processed:
        last_sync = last_processed["last_sync"]

    # Only process new data
    if last_sync:
        # Filter data based on last_sync
        pass
3. Error Handling
Implement robust error handling to prevent plugin failures from affecting the entire build process:
def ingest(self, last_processed=None):
    """Run the ingestion and contain any failure inside the plugin.

    Returns:
        A ``(nodes, edges, metadata)`` tuple. On failure both lists are
        empty and the metadata passed in is handed back.
    """
    try:
        # Your implementation
        nodes, edges, metadata = [], [], {}
        return nodes, edges, metadata
    except Exception as e:
        print(f"Error in plugin: {e}")
        # Return empty results, but keep the previous metadata so a failed
        # run does not reset incremental ingestion.
        return [], [], last_processed or {}
4. Documentation
Document your plugin thoroughly, including:
- Node types and their attributes
- Edge types and their meanings
- Configuration requirements
- Dependencies
5. Testing
Write tests for your plugin to ensure it works correctly:
def test_my_plugin():
    """Check that the template plugin returns a node, an edge and metadata."""
    plugin = MyCustomPlugin()
    nodes, edges, metadata = plugin.ingest()
    assert len(nodes) > 0
    assert len(edges) > 0
    assert "timestamp" in metadata
Plugin Development Workflow
Here’s a step-by-step workflow for developing and testing custom plugins:
1. Set Up Development Environment
# Create a new directory for your plugin
mkdir arc-memory-myplugin
cd arc-memory-myplugin
# Create a virtual environment
python -m venv venv
source venv/bin/activate # On Unix/macOS
venv\Scripts\activate # On Windows
# Install arc-memory in development mode
pip install -e /path/to/arc-memory
# Create basic project structure
mkdir -p src/arc_memory_myplugin
touch src/arc_memory_myplugin/__init__.py
touch src/arc_memory_myplugin/plugin.py
touch pyproject.toml
touch README.md
2. Implement Your Plugin
Create your plugin implementation in src/arc_memory_myplugin/plugin.py:
from typing import List, Optional, Tuple, Dict, Any
from datetime import datetime
from arc_memory.plugins import IngestorPlugin
from arc_memory.schema.models import Node, Edge, NodeType, EdgeRel
class MyPlugin(IngestorPlugin):
    """Custom ingestor plugin for Arc Memory.

    Starting point for development: it returns a single test node and no
    edges so that discovery and ingestion can be checked end to end.
    """

    def get_name(self) -> str:
        """Return the name of this plugin."""
        return "myplugin"

    def get_node_types(self) -> List[str]:
        """Return the node types this plugin can create."""
        return ["custom_node"]

    def get_edge_types(self) -> List[str]:
        """Return the edge types this plugin can create."""
        return [EdgeRel.MENTIONS]

    def ingest(
        self,
        last_processed: Optional[Dict[str, Any]] = None,
    ) -> Tuple[List[Node], List[Edge], Dict[str, Any]]:
        """Ingest data from the custom source.

        Args:
            last_processed: Metadata for incremental builds (presumably the
                dict returned by this plugin's previous run). Not read yet.

        Returns:
            A ``(nodes, edges, metadata)`` tuple.
        """
        nodes = []
        edges = []

        # Your implementation here

        # One clock reading so the node and the metadata agree.
        now = datetime.now()

        # For testing, create a simple node
        node = Node(
            id="custom_node:1",
            type="custom_node",
            title="Test Node",
            body="This is a test node",
            ts=now,
            extra={"source": "myplugin"},
        )
        nodes.append(node)

        # Create metadata for incremental builds
        metadata = {
            "timestamp": now.isoformat()
        }
        return nodes, edges, metadata
3. Configure Your Package
Set up your pyproject.toml:
[build-system]
# The [project] table below is only understood by setuptools 61 or newer.
requires = ["setuptools>=61", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "arc-memory-myplugin"
version = "0.1.0"
description = "Custom plugin for Arc Memory"
readme = "README.md"
requires-python = ">=3.8"
license = {text = "MIT"}
dependencies = [
    "arc-memory",
]

# Registers the plugin class under the "arc_memory.plugins" entry point group.
[project.entry-points."arc_memory.plugins"]
myplugin = "arc_memory_myplugin.plugin:MyPlugin"
4. Install Your Plugin in Development Mode
# Install your plugin in development mode
pip install -e .
5. Test Your Plugin
Manual Testing
# Enable debug logging to see detailed output
arc build --debug
# Check if your plugin was discovered
arc doctor
Look for output like:
Discovered plugins: ['git', 'github', 'adr', 'myplugin']
Unit Testing
Create a test file tests/test_plugin.py:
import unittest
from arc_memory_myplugin.plugin import MyPlugin
class TestMyPlugin(unittest.TestCase):
    """Unit tests for the MyPlugin example."""

    def test_plugin_basics(self):
        """The plugin reports its name and node types."""
        plugin = MyPlugin()
        self.assertEqual(plugin.get_name(), "myplugin")
        self.assertIn("custom_node", plugin.get_node_types())

    def test_ingest(self):
        """A first run yields at least one node and timestamped metadata."""
        plugin = MyPlugin()
        nodes, edges, metadata = plugin.ingest()
        self.assertGreaterEqual(len(nodes), 1)
        self.assertIn("timestamp", metadata)

    def test_ingest_accepts_previous_metadata(self):
        """Metadata from one run can be passed back in as last_processed."""
        plugin = MyPlugin()
        _, _, metadata = plugin.ingest()
        result = plugin.ingest(last_processed=metadata)
        self.assertEqual(len(result), 3)


if __name__ == "__main__":
    unittest.main()
Run the tests:
python -m unittest tests/test_plugin.py
6. Debugging Plugins
Debug Logging
To see detailed logs during plugin execution:
# Enable debug logging
arc build --debug
Look for log messages related to your plugin:
DEBUG:arc_memory.plugins:Registered plugin: myplugin
INFO:arc_memory.build:Ingesting myplugin data...
Common Issues and Solutions
- Plugin Not Discovered:
  - Check your entry point configuration in pyproject.toml
  - Verify your plugin class implements all required methods
  - Make sure your package is installed (pip list | grep myplugin)
- Plugin Fails During Ingestion:
  - Add try/except blocks with detailed logging in your ingest method
  - Check for API errors or rate limiting if your plugin uses external APIs
  - Verify your node and edge creation logic
- No Data in Graph:
  - Verify your plugin is returning non-empty lists of nodes and edges
  - Check node IDs for proper formatting (e.g., “custom_node:1”)
  - Ensure edge source and destination IDs refer to existing nodes
Interactive Debugging
For more complex issues, use Python’s debugger:
# Add this to your plugin code
import pdb
def ingest(self, last_processed=None):
    """Ingest with a breakpoint placed where execution should pause."""
    # ... your code ...
    pdb.set_trace()  # Debugger will stop here
    # ... more code ...
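Then run the build from a terminal; execution pauses at the breakpoint and drops into the debugger prompt:
arc build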
7. Packaging and Distribution
Once your plugin is working correctly:
# Build the package
python -m build
# Test the built package
pip install dist/arc_memory_myplugin-0.1.0-py3-none-any.whl
# Publish to PyPI (if desired)
twine upload dist/*
Packaging and Distribution
To package your plugin for distribution:
- Create a Python package with your plugin implementation
- Register it using entry points as shown above
- Publish it to PyPI:
python -m build
twine upload dist/*
Users can then install your plugin with:
pip install arc-memory-myplugin
And it will be automatically discovered and used by Arc Memory.