Creating Custom Plugins
This guide provides examples of how to create custom plugins for Arc Memory to ingest data from additional sources beyond the built-in Git, GitHub, and ADR plugins.
Related Documentation:
Plugin Architecture Overview
Arc Memory’s plugin architecture allows you to extend the system to ingest data from any source. Plugins are Python classes that implement the IngestorPlugin protocol, which defines methods for:
- Identifying the plugin
- Specifying the types of nodes and edges it creates
- Ingesting data from the source
Basic Plugin Template
Here’s a basic template for creating a custom plugin:
Registering Your Plugin
To make your plugin available to Arc Memory, you need to register it using Python’s entry point system:
In setup.py
In pyproject.toml
Example: Notion Plugin
Here’s an example of a plugin that ingests data from Notion:
import os
from typing import List, Optional, Tuple, Dict, Any
from datetime import datetime
from arc_memory.plugins import IngestorPlugin
from arc_memory.schema.models import Node, Edge, NodeType, EdgeRel
class NotionIngestor(IngestorPlugin):
    """Ingestor plugin for Notion pages and databases.

    Pages are fetched through the Notion search API and turned into
    ``notion_page`` nodes. Incremental runs are driven by the ``last_sync``
    checkpoint returned in the metadata of the previous run.
    """

    def get_name(self) -> str:
        """Return the name of this plugin."""
        return "notion"

    def get_node_types(self) -> List[str]:
        """Return the node types this plugin can create."""
        return ["notion_page", "notion_database"]

    def get_edge_types(self) -> List[str]:
        """Return the edge types this plugin can create."""
        return [EdgeRel.MENTIONS, "CONTAINS"]

    def ingest(
        self,
        last_processed: Optional[Dict[str, Any]] = None,
    ) -> Tuple[List[Node], List[Edge], Dict[str, Any]]:
        """Ingest pages from Notion.

        Args:
            last_processed: Metadata returned by the previous run. When it
                contains ``"last_sync"`` (an ISO-8601 string), only pages
                edited after that instant are ingested.

        Returns:
            A ``(nodes, edges, metadata)`` tuple. ``metadata["last_sync"]``
            is the checkpoint for the next run. On any failure the result is
            ``([], [], {"last_sync": <previous checkpoint>})`` so that a
            broken plugin never aborts the build or loses its checkpoint.
        """
        # Local import: the surrounding module only imports ``datetime``.
        from datetime import timezone

        nodes: List[Node] = []
        edges: List[Edge] = []

        last_sync = None
        if last_processed and "last_sync" in last_processed:
            last_sync = last_processed["last_sync"]

        notion_token = self._get_token()
        if not notion_token:
            print("Notion token not found. Skipping Notion ingestion.")
            # Keep the previous checkpoint instead of resetting it to None.
            return [], [], {"last_sync": last_sync}

        # Compare instants, not strings: Notion reports UTC ("...Z") while
        # older checkpoints were written as naive local time.
        cutoff = None
        if last_sync:
            try:
                cutoff = self._parse_timestamp(last_sync)
            except (AttributeError, TypeError, ValueError):
                print(
                    f"Ignoring unreadable Notion checkpoint {last_sync!r}; "
                    "running a full sync."
                )

        # Take the checkpoint *before* querying so that pages edited while
        # ingestion is running are picked up by the next run.
        sync_started = datetime.now(timezone.utc)

        try:
            # Imported lazily (and inside the try) so a missing optional
            # dependency is reported like any other ingestion error.
            from notion_client import Client

            notion = Client(auth=notion_token)

            cursor = None
            caught_up = False
            while not caught_up:
                query: Dict[str, Any] = {
                    "filter": {"property": "object", "value": "page"},
                    "sort": {
                        "direction": "descending",
                        "timestamp": "last_edited_time",
                    },
                }
                if cursor:
                    query["start_cursor"] = cursor
                response = notion.search(**query)

                for page in response["results"]:
                    edited_at = self._parse_timestamp(page["last_edited_time"])
                    if cutoff is not None and edited_at <= cutoff:
                        # Results are requested newest-first, so everything
                        # after this page is older than the checkpoint too.
                        caught_up = True
                        break

                    page_id = page["id"].replace("-", "")
                    node = Node(
                        id=f"notion_page:{page_id}",
                        type="notion_page",
                        title=self._get_page_title(page),
                        body=self._get_page_content(page),
                        ts=edited_at,
                        extra={
                            "url": page["url"],
                            "created_time": page["created_time"],
                            "last_edited_time": page["last_edited_time"],
                            "notion_id": page["id"],
                        },
                    )
                    nodes.append(node)
                    self._extract_mentions(node, edges)

                # Follow the pagination cursor; stop if the API gives none so
                # the loop can never restart from the first page.
                cursor = response.get("next_cursor")
                if not response.get("has_more") or not cursor:
                    break

            return nodes, edges, {"last_sync": sync_started.isoformat()}
        except Exception as e:
            # Best-effort boundary: report and keep the old checkpoint.
            print(f"Error ingesting Notion data: {e}")
            return [], [], {"last_sync": last_sync}

    @staticmethod
    def _get_token() -> Optional[str]:
        """Return the Notion token from ``NOTION_TOKEN`` or the keyring."""
        token = os.environ.get("NOTION_TOKEN")
        if token:
            return token
        try:
            import keyring

            return keyring.get_password("arc_memory", "notion_token")
        except Exception:
            # keyring is optional: it may be missing or have no usable
            # backend. Either way we simply have no token.
            return None

    @staticmethod
    def _parse_timestamp(value: str) -> datetime:
        """Parse an ISO-8601 string into a timezone-aware UTC datetime.

        A trailing ``Z`` is accepted. A naive value is interpreted as local
        time, which is how earlier versions of this plugin wrote checkpoints.
        """
        from datetime import timezone

        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        return parsed.astimezone(timezone.utc)

    def _get_page_title(self, page):
        """Extract the title from a Notion page, or ``"Untitled"``.

        The title is taken from the property whose type is ``"title"`` (or,
        failing that, the property keyed ``"title"``); all of its rich-text
        fragments are concatenated.
        """
        properties = page.get("properties") or {}
        for name, prop in properties.items():
            if not isinstance(prop, dict):
                continue
            if prop.get("type") != "title" and name != "title":
                continue
            fragments = prop.get("title") or []
            text = "".join(
                fragment.get("plain_text", "")
                for fragment in fragments
                if isinstance(fragment, dict)
            )
            if text:
                return text
        return "Untitled"

    def _get_page_content(self, page):
        """Return the body text for a Notion page.

        NOTE: this is a placeholder. It does not fetch the page's blocks; it
        only returns a fixed string containing the page id.
        """
        return f"Notion page content for {page['id']}"

    def _extract_mentions(self, node, edges):
        """Append MENTIONS edges for commits and PRs referenced in the page.

        Scans the node's title and body for ``commit:<hex>`` and
        ``PR #<number>`` references and appends one edge per distinct
        target to ``edges``. Nothing is appended when no reference is found.
        """
        # Local import: this module does not import ``re`` at the top.
        import re

        text = "\n".join(
            part
            for part in (getattr(node, "title", None), getattr(node, "body", None))
            if part
        )
        targets = [
            f"commit:{commit_hash}"
            for commit_hash in re.findall(r'commit:([a-f0-9]+)', text, re.IGNORECASE)
        ]
        targets += [
            f"pr:{pr_number}"
            for pr_number in re.findall(r'PR #(\d+)', text, re.IGNORECASE)
        ]
        # dict.fromkeys de-duplicates while keeping first-seen order.
        for target in dict.fromkeys(targets):
            edges.append(Edge(
                src=node.id,
                dst=target,
                rel=EdgeRel.MENTIONS,
            ))
Example: Jira Plugin
Here’s an example of a plugin that ingests data from Jira:
import os
from typing import List, Optional, Tuple, Dict, Any
from datetime import datetime
import re
from arc_memory.plugins import IngestorPlugin
from arc_memory.schema.models import Node, Edge, NodeType, EdgeRel
class JiraIngestor(IngestorPlugin):
    """Ingestor plugin for Jira issues.

    Issues of one project (``JIRA_PROJECT``, default ``PROJ``) become
    ``jira_issue`` nodes; commit and PR references found in issue comments
    become MENTIONS edges. Incremental runs are driven by the
    ``last_updated`` checkpoint returned in the previous run's metadata.
    """

    # Compiled once; both are applied to every comment of every issue.
    _COMMIT_PATTERN = re.compile(r'commit:([a-f0-9]+)', re.IGNORECASE)
    _PR_PATTERN = re.compile(r'PR #(\d+)', re.IGNORECASE)

    # Date literal format used in the JQL ``updated >= '...'`` clause.
    _JQL_DATE_FORMAT = "%Y-%m-%d %H:%M"

    def get_name(self) -> str:
        """Return the name of this plugin."""
        return "jira"

    def get_node_types(self) -> List[str]:
        """Return the node types this plugin can create."""
        return ["jira_issue"]

    def get_edge_types(self) -> List[str]:
        """Return the edge types this plugin can create."""
        return [EdgeRel.MENTIONS, "IMPLEMENTS"]

    def ingest(
        self,
        last_processed: Optional[Dict[str, Any]] = None,
    ) -> Tuple[List[Node], List[Edge], Dict[str, Any]]:
        """Ingest issues from Jira.

        Credentials are read from ``JIRA_URL``, ``JIRA_USER`` and
        ``JIRA_TOKEN``.

        Args:
            last_processed: Metadata returned by the previous run. When it
                contains ``"last_updated"`` (an ISO-8601 string), only issues
                updated since then are requested.

        Returns:
            A ``(nodes, edges, metadata)`` tuple. ``metadata["last_updated"]``
            is the checkpoint for the next run. On any failure the result is
            ``([], [], {"last_updated": <previous checkpoint>})``.
        """
        nodes: List[Node] = []
        edges: List[Edge] = []

        last_updated = None
        if last_processed and "last_updated" in last_processed:
            last_updated = last_processed["last_updated"]

        jira_url = os.environ.get("JIRA_URL")
        jira_user = os.environ.get("JIRA_USER")
        jira_token = os.environ.get("JIRA_TOKEN")
        if not all([jira_url, jira_user, jira_token]):
            print("Jira credentials not found. Skipping Jira ingestion.")
            # Keep the previous checkpoint instead of resetting it to None.
            return [], [], {"last_updated": last_updated}

        project_key = os.environ.get("JIRA_PROJECT", "PROJ")
        # Avoid "//browse/" when JIRA_URL is configured with a trailing slash.
        browse_base = jira_url.rstrip("/")

        jql = f"project = {project_key}"
        since = self._format_jql_timestamp(last_updated)
        if since:
            jql += f" AND updated >= '{since}'"

        # Take the checkpoint *before* querying so that issues updated while
        # ingestion is running are picked up by the next run.
        # NOTE(review): the date literal is evaluated by Jira in the
        # authenticated user's time zone; this assumes it matches the local
        # zone of the machine running the build - confirm for your setup.
        sync_started = datetime.now()

        try:
            # Imported and connected inside the try so that a missing
            # dependency or an unreachable server is reported like any other
            # ingestion error instead of aborting the build.
            from jira import JIRA

            jira = JIRA(server=jira_url, basic_auth=(jira_user, jira_token))

            # maxResults=False asks the client to fetch every match in
            # batches; a fixed limit would silently drop the remainder while
            # the checkpoint still advanced past them.
            issues = jira.search_issues(jql, maxResults=False)

            for issue in issues:
                fields = issue.fields
                node = Node(
                    id=f"jira_issue:{issue.key}",
                    type="jira_issue",
                    title=fields.summary,
                    body=fields.description or "",
                    ts=self._parse_jira_timestamp(fields.updated),
                    extra={
                        "key": issue.key,
                        "status": fields.status.name,
                        "assignee": fields.assignee.displayName if fields.assignee else None,
                        "reporter": fields.reporter.displayName if fields.reporter else None,
                        "created": fields.created,
                        "updated": fields.updated,
                        "url": f"{browse_base}/browse/{issue.key}",
                    },
                )
                nodes.append(node)

                # One edge per distinct target, even if several comments
                # mention the same commit or PR.
                seen_targets = set()
                for comment in jira.comments(issue.key):
                    text = comment.body or ""  # body can be missing/None
                    targets = [
                        f"commit:{commit_hash}"
                        for commit_hash in self._COMMIT_PATTERN.findall(text)
                    ]
                    targets += [
                        f"pr:{pr_number}"
                        for pr_number in self._PR_PATTERN.findall(text)
                    ]
                    for target in targets:
                        if target in seen_targets:
                            continue
                        seen_targets.add(target)
                        edges.append(Edge(
                            src=node.id,
                            dst=target,
                            rel=EdgeRel.MENTIONS,
                        ))

            return nodes, edges, {"last_updated": sync_started.isoformat()}
        except Exception as e:
            # Best-effort boundary: report and keep the old checkpoint.
            print(f"Error ingesting Jira data: {e}")
            return [], [], {"last_updated": last_updated}

    @classmethod
    def _format_jql_timestamp(cls, value: Optional[str]) -> Optional[str]:
        """Convert a stored ISO-8601 checkpoint into a JQL date literal.

        JQL does not accept ``datetime.isoformat()`` output (``T`` separator,
        seconds, microseconds), so the value is re-rendered as
        ``YYYY-MM-DD HH:MM``. Truncating to the minute only widens the
        ``updated >=`` window. Returns ``None`` when there is no usable
        checkpoint, which results in a full sync.
        """
        if not value:
            return None
        try:
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            print(
                f"Ignoring unreadable Jira checkpoint {value!r}; "
                "running a full sync."
            )
            return None
        return parsed.strftime(cls._JQL_DATE_FORMAT)

    @staticmethod
    def _parse_jira_timestamp(value: str) -> datetime:
        """Parse a Jira timestamp such as ``2024-01-15T10:30:00.000+0000``.

        ``strptime`` with ``%z`` accepts the colon-less offset that
        ``datetime.fromisoformat`` rejects before Python 3.11; plain ISO-8601
        values are still handled by the fallback.
        """
        try:
            return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z")
        except ValueError:
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
Best Practices for Plugin Development
1. Use Unique IDs
Ensure your node IDs are unique by prefixing them with your plugin name:
2. Handle Incremental Ingestion
Use the last_processed parameter to implement incremental ingestion:
3. Error Handling
Implement robust error handling to prevent plugin failures from affecting the entire build process:
4. Documentation
Document your plugin thoroughly, including:
- Node types and their attributes
- Edge types and their meanings
- Configuration requirements
- Dependencies
5. Testing
Write tests for your plugin to ensure it works correctly:
Plugin Development Workflow
Here’s a step-by-step workflow for developing and testing custom plugins:
1. Set Up Development Environment
2. Implement Your Plugin
Create your plugin implementation in src/arc_memory_myplugin/plugin.py:
3. Configure Your Package
Set up your pyproject.toml:
4. Install Your Plugin in Development Mode
5. Test Your Plugin
Manual Testing
Look for output like:
Unit Testing
Create a test file tests/test_plugin.py:
Run the tests:
6. Debugging Plugins
Debug Logging
To see detailed logs during plugin execution:
Look for log messages related to your plugin:
Common Issues and Solutions
- Plugin Not Discovered:
  - Check your entry point configuration in pyproject.toml
  - Verify your plugin class implements all required methods
  - Make sure your package is installed (pip list | grep myplugin)
- Plugin Fails During Ingestion:
  - Add try/except blocks with detailed logging in your ingest method
  - Check for API errors or rate limiting if your plugin uses external APIs
  - Verify your node and edge creation logic
- No Data in Graph:
  - Verify your plugin is returning non-empty lists of nodes and edges
  - Check node IDs for proper formatting (e.g., “custom_node:1”)
  - Ensure edge source and destination IDs refer to existing nodes
Interactive Debugging
For more complex issues, use Python’s debugger:
Then run:
7. Packaging and Distribution
Once your plugin is working correctly:
Packaging and Distribution
To package your plugin for distribution:
- Create a Python package with your plugin implementation
- Register it using entry points as shown above
- Publish it to PyPI:
Users can then install your plugin with:
And it will be automatically discovered and used by Arc Memory.