import os
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

from arc_memory.plugins import IngestorPlugin
from arc_memory.schema.models import Edge, EdgeRel, Node, NodeType
class NotionIngestor(IngestorPlugin):
    """Ingestor plugin for Notion pages and databases.

    Pages are fetched through the Notion search API, converted into
    ``notion_page`` nodes, and scanned for commit mentions which become
    ``MENTIONS`` edges. Incremental runs are driven by the ``last_sync``
    timestamp carried in the metadata returned from :meth:`ingest`.
    """

    # Matches e.g. "commit 1a2b3c4" or "Commit: 1a2b3c4d5e6f...". At least
    # seven hex digits are required so ordinary words following "commit"
    # are unlikely to be mistaken for a SHA.
    _COMMIT_MENTION = re.compile(
        r"\bcommit\b[\s:]+([0-9a-f]{7,40})\b", re.IGNORECASE
    )

    def get_name(self) -> str:
        """Return the name of this plugin."""
        return "notion"

    def get_node_types(self) -> List[str]:
        """Return the node types this plugin can create."""
        return ["notion_page", "notion_database"]

    def get_edge_types(self) -> List[str]:
        """Return the edge types this plugin can create."""
        return [EdgeRel.MENTIONS, "CONTAINS"]

    def ingest(
        self,
        last_processed: Optional[Dict[str, Any]] = None,
    ) -> Tuple[List[Node], List[Edge], Dict[str, Any]]:
        """Ingest pages from Notion.

        Args:
            last_processed: Metadata returned by a previous run. When it
                carries a ``"last_sync"`` ISO-8601 timestamp, only pages
                edited after that instant are ingested.

        Returns:
            A ``(nodes, edges, metadata)`` tuple. On success
            ``metadata["last_sync"]`` is the UTC time at which this run
            started. When ingestion is skipped or fails, the lists are empty
            and the previous ``last_sync`` value is handed back unchanged so
            the next run retries from the same point.
        """
        last_sync = last_processed.get("last_sync") if last_processed else None

        # Get Notion API token from environment or keyring
        notion_token = os.environ.get("NOTION_TOKEN")
        if not notion_token:
            try:
                import keyring

                notion_token = keyring.get_password("arc_memory", "notion_token")
            except Exception:
                # keyring is an optional, best-effort fallback: it may not be
                # installed or may have no usable backend. Treat any failure
                # as "no token stored" rather than aborting ingestion.
                notion_token = None
        if not notion_token:
            print("Notion token not found. Skipping Notion ingestion.")
            return [], [], {"last_sync": last_sync}

        # Initialize Notion client
        try:
            from notion_client import Client
        except ImportError:
            print("notion_client is not installed. Skipping Notion ingestion.")
            return [], [], {"last_sync": last_sync}
        notion = Client(auth=notion_token)

        # Turn the stored marker into an aware datetime so it can be compared
        # with Notion's UTC timestamps regardless of how it was serialized.
        cutoff = None
        if last_sync:
            try:
                cutoff = self._parse_timestamp(last_sync)
            except (AttributeError, TypeError, ValueError):
                print(
                    f"Ignoring invalid last_sync value {last_sync!r}; "
                    "performing a full Notion sync."
                )

        # Captured before querying so that pages edited while this run is in
        # progress are picked up by the next incremental run.
        sync_started = datetime.now(timezone.utc)

        nodes: List[Node] = []
        edges: List[Edge] = []
        try:
            cursor = None
            while True:
                query: Dict[str, Any] = {
                    "filter": {"property": "object", "value": "page"},
                    "sort": {
                        "direction": "descending",
                        "timestamp": "last_edited_time",
                    },
                }
                if cursor:
                    query["start_cursor"] = cursor
                response = notion.search(**query)

                reached_synced_pages = False
                for page in response["results"]:
                    edited_at = self._parse_timestamp(page["last_edited_time"])
                    # Skip if this page was processed in a previous run
                    if cutoff is not None and edited_at <= cutoff:
                        reached_synced_pages = True
                        continue
                    node = self._build_page_node(page, edited_at)
                    nodes.append(node)
                    # Look for mentions of Git commits
                    self._extract_mentions(node, edges)

                # Results arrive newest-first, so once a batch contains pages
                # at or before the cutoff, later batches hold nothing new.
                if reached_synced_pages or not response.get("has_more"):
                    break
                cursor = response.get("next_cursor")
                if not cursor:
                    break

            # Create metadata for incremental builds
            return nodes, edges, {"last_sync": sync_started.isoformat()}
        except Exception as e:
            print(f"Error ingesting Notion data: {e}")
            return [], [], {"last_sync": last_sync}

    @staticmethod
    def _parse_timestamp(value: str) -> datetime:
        """Parse an ISO-8601 timestamp into a timezone-aware UTC datetime.

        A trailing ``Z`` is accepted. A timestamp without an offset is
        interpreted as system-local time, which is how ``last_sync`` values
        produced with a naive ``datetime.now()`` were written.

        Raises:
            ValueError: If ``value`` is not a valid ISO-8601 timestamp.
        """
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        # astimezone() assumes local time for naive values.
        return parsed.astimezone(timezone.utc)

    def _build_page_node(self, page: Dict[str, Any], edited_at: datetime) -> Node:
        """Create the ``notion_page`` node for one page object.

        Args:
            page: A page object from the Notion search response.
            edited_at: The page's ``last_edited_time`` parsed as a datetime.
        """
        page_id = page["id"].replace("-", "")
        return Node(
            id=f"notion_page:{page_id}",
            type="notion_page",
            title=self._get_page_title(page),
            body=self._get_page_content(page),
            ts=edited_at,
            extra={
                "url": page["url"],
                "created_time": page["created_time"],
                "last_edited_time": page["last_edited_time"],
                "notion_id": page["id"],
            },
        )

    def _get_page_title(self, page: Dict[str, Any]) -> str:
        """Extract the title from a Notion page.

        The property named ``"title"`` is tried first, then any other
        property whose ``type`` is ``"title"``. All rich-text fragments of
        the title are concatenated.

        Returns:
            The title text, or ``"Untitled"`` when no non-empty title exists.
        """
        properties = page.get("properties") or {}
        candidates = []
        if "title" in properties:
            candidates.append(properties["title"])
        candidates.extend(
            prop
            for name, prop in properties.items()
            if name != "title"
            and isinstance(prop, dict)
            and prop.get("type") == "title"
        )
        for prop in candidates:
            fragments = prop.get("title") if isinstance(prop, dict) else None
            if not fragments:
                continue
            text = "".join(
                fragment.get("plain_text", "")
                for fragment in fragments
                if isinstance(fragment, dict)
            )
            if text:
                return text
        return "Untitled"

    def _get_page_content(self, page: Dict[str, Any]) -> str:
        """Extract the content from a Notion page.

        This is a placeholder: it returns a fixed string containing the page
        id and does not fetch the page's blocks.
        """
        # In a real implementation, you would use the Notion API to get blocks
        return f"Notion page content for {page['id']}"

    def _extract_mentions(self, node: Node, edges: List[Edge]) -> None:
        """Append a ``MENTIONS`` edge for every commit referenced in the body.

        A mention is the word "commit", optionally followed by a colon, then
        a 7-40 character hexadecimal SHA (e.g. ``Commit: 1a2b3c4``). Each
        distinct SHA yields one edge from ``node`` to ``commit:<sha>``.

        Args:
            node: The node whose ``body`` is scanned.
            edges: List that new edges are appended to, in place.
        """
        # NOTE(review): abbreviated SHAs are used verbatim in the edge target;
        # confirm whether commit node ids use the full SHA.
        # NOTE(review): PR ("PR #42") and issue ("Issue #123") mentions are not
        # extracted because the id format of those nodes is not visible here.
        text = getattr(node, "body", None) or ""
        found = (sha.lower() for sha in self._COMMIT_MENTION.findall(text))
        # dict.fromkeys de-duplicates while preserving first-seen order.
        for sha in dict.fromkeys(found):
            edges.append(Edge(
                src=node.id,
                dst=f"commit:{sha}",
                rel=EdgeRel.MENTIONS,
            ))