PDF to Knowledge Graph (Part 4): Automated PDF Pipeline with Watchdog

Part 4 of the PDF to Knowledge Graph series.

Manual processing does not scale. Hundreds of PDFs require processing, and more arrive regularly. This post presents a file-watching system that automatically processes new PDFs through the complete pipeline—conversion, extraction, and graph ingestion—without human intervention.

Motivation

Consider the workflow without automation:

  1. Download PDF
  2. Run MinerU conversion
  3. Wait for completion
  4. Run extraction script
  5. Verify ingestion
  6. Repeat for every document

With Watchdog, the workflow becomes:

  1. Drop PDF in folder
  2. (Remaining steps execute automatically)

Watchdog: Filesystem Monitoring

Watchdog is a Python library that monitors filesystem events—file creation, modification, deletion. When a new PDF appears, the handler processes it immediately.

Installation

pip install watchdog

File Watcher Implementation

Basic Structure

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
import os

WATCH_DIR = "./input_pdfs"

class NewPDFHandler(FileSystemEventHandler):
    def on_created(self, event):
        if event.is_directory:
            return

        if event.src_path.endswith(".pdf"):
            pdf_name = os.path.basename(event.src_path)
            print(f"\n{'='*60}")
            print(f"NEW PDF DETECTED: {pdf_name}")
            print(f"{'='*60}")

            # Process the PDF
            self.process_pdf(event.src_path)

    def process_pdf(self, pdf_path):
        # Implementation follows
        pass


if __name__ == "__main__":
    os.makedirs(WATCH_DIR, exist_ok=True)

    print(f"Watching {WATCH_DIR} for new PDFs...")
    print("Press Ctrl+C to stop")

    observer = Observer()
    observer.schedule(NewPDFHandler(), path=WATCH_DIR, recursive=False)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down...")
        observer.stop()

    observer.join()

Handling File Completion

A critical detail: the creation event fires as soon as the file appears in the directory, which can be well before a large copy finishes. Converting a partially written PDF fails, so the handler should wait until the file size stops changing:

import os
import time

def wait_for_file_complete(path: str, check_interval: float = 0.5, stable_time: float = 2.0):
    """Wait for a file to finish being written."""
    last_size = -1
    stable_count = 0

    while stable_count < (stable_time / check_interval):
        try:
            current_size = os.path.getsize(path)
            if current_size == last_size:
                stable_count += 1
            else:
                stable_count = 0
                last_size = current_size
        except OSError:
            stable_count = 0

        time.sleep(check_interval)

    return True
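A stable size only shows that writing has paused. As a complementary check, the sketch below looks for the `%PDF-` header at the start of the file and the `%%EOF` marker near the end, which a well-formed PDF normally has. The helper name `looks_like_complete_pdf` and the `tail_bytes` parameter are hypothetical and not part of the pipeline above; this is a heuristic, not a validator.

```python
import os


def looks_like_complete_pdf(path: str, tail_bytes: int = 1024) -> bool:
    """Heuristic: True if the file starts with a PDF header and ends with an EOF marker."""
    try:
        size = os.path.getsize(path)
        with open(path, "rb") as f:
            header = f.read(5)
            # Only inspect the last tail_bytes of the file for the trailer.
            f.seek(max(0, size - tail_bytes))
            tail = f.read()
    except OSError:
        # Missing or unreadable file: treat as not ready.
        return False
    return header == b"%PDF-" and b"%%EOF" in tail
```

A handler could call this after the size-polling wait and skip or retry the file when it returns False.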

Complete PDF Handler

class NewPDFHandler(FileSystemEventHandler):
    def on_created(self, event):
        if event.is_directory:
            return

        if event.src_path.endswith(".pdf"):
            pdf_name = os.path.basename(event.src_path)
            print(f"\n{'='*60}")
            print(f"NEW PDF DETECTED: {pdf_name}")
            print(f"{'='*60}")

            # Wait for file copy to complete
            print("[WAIT] Ensuring file is complete...")
            wait_for_file_complete(event.src_path)

            # Stage 1: Convert PDF to Markdown
            print("[STAGE 1/2] PDF -> Markdown")
            md_path = run_mineru(event.src_path)

            if md_path and os.path.exists(md_path):
                # Stage 2: Extract to Knowledge Graph
                with open(md_path, "r", encoding="utf-8") as f:
                    text = f.read()

                print("[STAGE 2/2] Markdown -> Knowledge Graph")
                extract_knowledge(text, source_name=pdf_name)

                print(f"[DONE] {pdf_name} processed successfully")
            else:
                print(f"[ERROR] Could not convert {pdf_name}")
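The handler above runs conversion and extraction inside on_created. Watchdog typically calls handler methods from its observer thread, so a long-running document delays the handling of later events. One possible variation, sketched here with only the standard library, is to have the handler enqueue the path and let a worker thread do the processing. The name `start_worker` and the `None` sentinel are assumptions made for this illustration.

```python
import queue
import threading


def start_worker(process, jobs: queue.Queue) -> threading.Thread:
    """Start a daemon thread that calls process(path) for each queued path."""
    def run():
        while True:
            path = jobs.get()
            try:
                if path is None:  # sentinel: stop the worker
                    return
                process(path)
            except Exception as e:
                # One failing document should not kill the worker.
                print(f"[ERROR] {path}: {e}")
            finally:
                jobs.task_done()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread
```

With this in place, on_created would only need to call jobs.put(event.src_path), and the function passed as process would contain the wait, convert, and extract steps.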

Batch Processing for Initial Corpus

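Watchdog only reports events that occur while the observer is running, so PDFs already sitting in the folder are never picked up. For an existing collection, a batch script is the better fit: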

#!/usr/bin/env python3
"""
Batch process all PDFs in input_pdfs directory.

Usage:
    python batch_process.py                    # Process all
    python batch_process.py --limit 5          # First 5 only
    python batch_process.py --skip-existing    # Skip already converted
    python batch_process.py --conversion-only  # Only convert, no LLM
"""
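import os
import argparse
import time

# Assumes the complete watcher script from this post is saved as graph_builder.py
# in the same directory; it defines run_mineru() and extract_knowledge().
from graph_builder import extract_knowledge, run_mineru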

WATCH_DIR = "./input_pdfs"
CONVERTER_OUTPUT_DIR = "./mineru_outputs"

def get_pending_pdfs(skip_existing: bool = False) -> list[str]:
    """Get list of PDFs to process."""
    pdfs = sorted([f for f in os.listdir(WATCH_DIR) if f.endswith('.pdf')])

    if skip_existing:
        pending = []
        for pdf in pdfs:
            pdf_name = pdf.replace('.pdf', '')
            md_path = os.path.join(
                CONVERTER_OUTPUT_DIR,
                pdf_name,
                'auto',
                f'{pdf_name}.md'
            )
            if not os.path.exists(md_path):
                pending.append(pdf)
            else:
                print(f"[SKIP] {pdf} (already converted)")
        return pending

    return pdfs


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--limit", type=int, default=0,
                        help="Maximum number of PDFs to process")
    parser.add_argument("--skip-existing", action="store_true",
                        help="Skip PDFs that have already been converted")
    parser.add_argument("--conversion-only", action="store_true",
                        help="Only run PDF conversion, skip LLM extraction")
    args = parser.parse_args()

    pdfs = get_pending_pdfs(skip_existing=args.skip_existing)

    if args.limit > 0:
        pdfs = pdfs[:args.limit]

    total = len(pdfs)
    print(f"Processing {total} PDFs...")

    results = {"success": [], "failed": []}
    start_time = time.time()

    for i, pdf in enumerate(pdfs):
        pdf_path = os.path.join(WATCH_DIR, pdf)

        print(f"\n[{i+1}/{total}] {pdf}")

        # Convert
        md_path = run_mineru(pdf_path)

        if not md_path:
            print(f"[FAIL] Conversion failed")
            results["failed"].append(pdf)
            continue

        if args.conversion_only:
            results["success"].append(pdf)
            continue

        # Extract
        try:
            with open(md_path, 'r', encoding='utf-8') as f:
                text = f.read()

            extract_knowledge(text, source_name=pdf)
            results["success"].append(pdf)
        except Exception as e:
            print(f"[FAIL] Extraction failed: {e}")
            results["failed"].append(pdf)

    elapsed = time.time() - start_time

    print(f"\n{'='*60}")
    print(f"BATCH COMPLETE")
    print(f"{'='*60}")
    print(f"Total time: {elapsed:.1f}s ({elapsed/60:.1f} minutes)")
    print(f"Success: {len(results['success'])}")
    print(f"Failed:  {len(results['failed'])}")

    if results["failed"]:
        print("\nFailed PDFs:")
        for name in results["failed"]:
            print(f"  - {name}")


if __name__ == "__main__":
    main()

Batch Processing Strategies

Resume on Failure

Use --skip-existing to resume after a crash:

# Initial run - processes 50 PDFs, crashes at #37
python batch_process.py

# Resume - skips the 36 already converted
python batch_process.py --skip-existing

Staged Processing

For large collections, separate conversion from extraction:

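# Stage 1: Convert all PDFs (faster, less resource-intensive)
python batch_process.py --conversion-only

# Stage 2: Run extraction. Do not pass --skip-existing here: it would skip
# every PDF converted in stage 1. The script as written has no
# extraction-only mode, so this run repeats the conversion step first.
python batch_process.py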
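The batch script has no extraction-only mode. One way to add a second stage that never touches the converter is to walk the output directory for Markdown files that already exist. The sketch below assumes the `<output_dir>/<name>/auto/<name>.md` layout that get_pending_pdfs checks for; the function name `find_converted_markdown` is hypothetical.

```python
import os


def find_converted_markdown(output_dir: str) -> list[str]:
    """List Markdown files laid out as <output_dir>/<name>/auto/<name>.md."""
    found = []
    if not os.path.isdir(output_dir):
        return found
    for name in sorted(os.listdir(output_dir)):
        md_path = os.path.join(output_dir, name, "auto", f"{name}.md")
        if os.path.isfile(md_path):
            found.append(md_path)
    return found
```

Each returned path could then be opened with encoding="utf-8" and passed to extract_knowledge, with the directory name used as the source name.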

Testing with Limits

Verify pipeline functionality before committing to full processing:

# Test with 3 PDFs first
python batch_process.py --limit 3

Error Handling and Recovery

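A single bad PDF should not stop the watcher. The handler below wraps each stage in its own try/except and appends every failure, with its stage name and traceback, to a log file: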

import traceback
from datetime import datetime

LOG_FILE = "./pipeline_errors.log"

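def log_error(pdf_name: str, stage: str, error: Exception):
    """Log error with context for debugging."""
    timestamp = datetime.now().isoformat()
    # Format the traceback from the exception object itself, so this also works
    # when log_error is called outside an except block.
    trace = "".join(traceback.format_exception(type(error), error, error.__traceback__))
    entry = f"""
{'='*60}
[{timestamp}] ERROR in {stage}
PDF: {pdf_name}
Exception: {type(error).__name__}: {str(error)}
{'='*60}
{trace}
"""
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(entry)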

    print(f"[ERROR] {stage} failed for {pdf_name}. See {LOG_FILE}")


class RobustPDFHandler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.src_path.endswith(".pdf"):
            return

        pdf_name = os.path.basename(event.src_path)

        try:
            wait_for_file_complete(event.src_path)
        except Exception as e:
            log_error(pdf_name, "file_wait", e)
            return

        try:
            md_path = run_mineru(event.src_path)
        except Exception as e:
            log_error(pdf_name, "conversion", e)
            return

        if not md_path:
            log_error(pdf_name, "conversion", Exception("No markdown output"))
            return

        try:
            with open(md_path, "r", encoding="utf-8") as f:
                text = f.read()
            extract_knowledge(text, source_name=pdf_name)
        except Exception as e:
            log_error(pdf_name, "extraction", e)
            return

        print(f"[SUCCESS] {pdf_name}")
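Some failures are transient, for example an LLM request that times out once and succeeds on the next attempt. Logging and moving on is one option; retrying with a growing delay is another. The following is a minimal sketch of the second approach. The helper `retry` and its parameters are illustrative assumptions and do not appear in the pipeline code.

```python
import time


def retry(func, attempts: int = 3, base_delay: float = 1.0):
    """Call func() until it succeeds or attempts run out, doubling the delay each time."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception as e:
            if attempt == attempts:
                raise  # out of attempts: let the caller log the error
            delay = base_delay * 2 ** (attempt - 1)
            print(f"[RETRY] attempt {attempt} failed ({e}); retrying in {delay:.1f}s")
            time.sleep(delay)
```

In the handler, a stage could be wrapped as retry(lambda: run_mineru(event.src_path)) inside the existing try/except, so that log_error only records failures that persist.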

Progress Tracking

For batch processing, track progress persistently:

import json
from pathlib import Path

PROGRESS_FILE = "./batch_progress.json"

def load_progress() -> dict:
    """Load processing progress."""
    if Path(PROGRESS_FILE).exists():
        with open(PROGRESS_FILE) as f:
            return json.load(f)
    return {"processed": [], "failed": [], "skipped": []}

def save_progress(progress: dict):
    """Save processing progress."""
    with open(PROGRESS_FILE, "w") as f:
        json.dump(progress, f, indent=2)

def batch_with_progress():
    """Batch process with persistent progress tracking."""
    progress = load_progress()
    already_done = set(progress["processed"] + progress["failed"])

    pdfs = [f for f in os.listdir(WATCH_DIR)
            if f.endswith('.pdf') and f not in already_done]

    print(f"Previously processed: {len(already_done)}")
    print(f"Remaining: {len(pdfs)}")

    for pdf in pdfs:
        pdf_path = os.path.join(WATCH_DIR, pdf)

        try:
            md_path = run_mineru(pdf_path)
            if md_path:
                with open(md_path, encoding="utf-8") as f:
                    extract_knowledge(f.read(), source_name=pdf)
                progress["processed"].append(pdf)
            else:
                progress["failed"].append(pdf)
        except Exception as e:
            print(f"[ERROR] {pdf}: {e}")
            progress["failed"].append(pdf)

        # Save after each file (enables resume)
        save_progress(progress)

    return progress
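Because the progress file is rewritten after every document, a crash in the middle of a write could leave truncated JSON that fails to load on the next run. A common safeguard, sketched here under that assumption, is to write to a temporary file in the same directory and rename it over the target. The function name `save_progress_atomic` is hypothetical.

```python
import json
import os
import tempfile


def save_progress_atomic(progress: dict, path: str) -> None:
    """Write progress to a temp file in the same directory, then rename it into place."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(progress, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes are on disk before renaming
        os.replace(tmp_path, path)  # the rename is atomic on POSIX filesystems
    except BaseException:
        # Clean up the temp file if anything went wrong before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

A reader of the file then sees either the previous complete version or the new one, never a partial write.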

Systemd Integration

For production deployment, run the watcher as a system service:

Service File

Create /etc/systemd/system/pdf-pipeline.service:

[Unit]
Description=PDF Knowledge Graph Pipeline
After=network.target

[Service]
Type=simple
User=derrekito
WorkingDirectory=/home/derrekito/Projects/knowledge_graph_db
Environment="PATH=/home/derrekito/.local/bin:/usr/bin"
ExecStart=/home/derrekito/.local/bin/python graph_builder.py
Restart=always
RestartSec=10

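# Logging (PYTHONUNBUFFERED makes print() output reach the log file immediately)
Environment="PYTHONUNBUFFERED=1"
StandardOutput=append:/var/log/pdf-pipeline.log
StandardError=append:/var/log/pdf-pipeline.log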

[Install]
WantedBy=multi-user.target
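By default, systemd stops a service by sending SIGTERM, while the watcher's main loop only catches KeyboardInterrupt, which Python raises for SIGINT. A possible way to shut down cleanly under systemd is to route both signals to a shared event. The names `stop_event`, `handle_stop_signal`, and `install_stop_handlers` are assumptions made for this sketch.

```python
import signal
import threading

# Set once a stop signal arrives; the main loop can wait on it.
stop_event = threading.Event()


def handle_stop_signal(signum, frame):
    """Signal handler: request shutdown instead of exiting abruptly."""
    stop_event.set()


def install_stop_handlers() -> None:
    """Route SIGTERM (systemd stop) and SIGINT (Ctrl+C) to the same handler.

    Must be called from the main thread.
    """
    signal.signal(signal.SIGTERM, handle_stop_signal)
    signal.signal(signal.SIGINT, handle_stop_signal)
```

In the watcher's main block, the sleep loop could then be written as `while not stop_event.wait(1): pass`, followed by observer.stop() and observer.join().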

Managing the Service

# Install and start
sudo systemctl daemon-reload
sudo systemctl enable pdf-pipeline
sudo systemctl start pdf-pipeline

# Check status
sudo systemctl status pdf-pipeline

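# View script output (the unit file redirects stdout/stderr to this file)
sudo tail -f /var/log/pdf-pipeline.log

# View service lifecycle messages (start, stop, restart) from the journal
sudo journalctl -u pdf-pipeline -f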

# Restart after changes
sudo systemctl restart pdf-pipeline

Log Rotation

Create /etc/logrotate.d/pdf-pipeline:

/var/log/pdf-pipeline.log {
    daily
    rotate 7
    compress
    delaycompress
    missingok
    notifempty
    # systemd keeps the log file open, so truncate it in place instead of renaming it
    copytruncate
}

Complete Watcher Script

#!/usr/bin/env python3
"""
Knowledge Graph Pipeline: Automatic PDF Processing

Watches input_pdfs/ for new files and automatically:
1. Converts PDF to Markdown (MinerU)
2. Extracts entities and relations (LLM)
3. Stores in graph database (Kuzu)

Usage:
    python graph_builder.py
"""
import os
import sys
import time
import subprocess
import shutil
import kuzu
from datetime import datetime
from typing import List, Literal
from pydantic import BaseModel, Field
import instructor
from openai import OpenAI
from rapidfuzz import process, fuzz
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# === CONFIGURATION ===
WATCH_DIR = "./input_pdfs"
CONVERTER_OUTPUT_DIR = "./mineru_outputs"
DB_PATH = "./kuzu_graph_db"
LLM_MODEL = "qwen2.5:72b"
LLM_TIMEOUT = 120


# === SCHEMA ===
class Entity(BaseModel):
    id: str = Field(..., description="Unique technical name")
    type: Literal["Paper", "Algorithm", "Metric", "Library", "Function"]
    summary: str = Field(..., description="One sentence definition")

class Relation(BaseModel):
    source: str
    target: str
    label: Literal["PROPOSES", "USES", "IMPROVES", "IMPLEMENTS", "CITES"]

class Extraction(BaseModel):
    entities: List[Entity]
    relations: List[Relation]


# === LLM CLIENT ===
client = instructor.patch(OpenAI(
    base_url="http://localhost:11434/v1",
    api_key="ollama",
))


# === DATABASE ===
class KnowledgeBase:
    def __init__(self):
        self.db = kuzu.Database(DB_PATH)
        self.conn = kuzu.Connection(self.db)
        self._init_schema()

    def _init_schema(self):
        try:
            self.conn.execute(
                "CREATE NODE TABLE Entity(id STRING, type STRING, summary STRING, PRIMARY KEY (id))"
            )
            self.conn.execute(
                "CREATE REL TABLE RELATED(FROM Entity TO Entity, label STRING)"
            )
        except RuntimeError:
            pass

    def get_all_entity_ids(self):
        try:
            results = self.conn.execute("MATCH (n:Entity) RETURN n.id").get_as_df()
            return results["n.id"].tolist() if not results.empty else []
        except Exception:
            return []

    def add_data(self, data: Extraction):
        existing_ids = self.get_all_entity_ids()
        id_map = {}

        for entity in data.entities:
            resolved_id = entity.id
            if existing_ids:
                match, score, _ = process.extractOne(
                    entity.id, existing_ids, scorer=fuzz.ratio
                )
                if score > 92:
                    resolved_id = match
                    print(f"   Merged: '{entity.id}' -> '{match}'")

            id_map[entity.id] = resolved_id
            self.conn.execute(
                "MERGE (n:Entity {id: $id}) ON CREATE SET n.type = $type, n.summary = $summary",
                {"id": resolved_id, "type": entity.type, "summary": entity.summary}
            )

        for rel in data.relations:
            if rel.source in id_map and rel.target in id_map:
                src, tgt = id_map[rel.source], id_map[rel.target]
                if src != tgt:
                    self.conn.execute(
                        "MATCH (a:Entity {id: $src}), (b:Entity {id: $tgt}) "
                        "MERGE (a)-[:RELATED {label: $label}]->(b)",
                        {"src": src, "tgt": tgt, "label": rel.label}
                    )


# === CONVERTER ===
def run_mineru(pdf_path: str) -> str | None:
    pdf_name = os.path.basename(pdf_path).replace(".pdf", "")
    expected_md = os.path.join(CONVERTER_OUTPUT_DIR, pdf_name, "auto", f"{pdf_name}.md")

    for cmd in ["mineru", "magic-pdf"]:
        if shutil.which(cmd):
            result = subprocess.run(
                [cmd, "-p", pdf_path, "-o", CONVERTER_OUTPUT_DIR, "-m", "auto"],
                capture_output=True, text=True
            )
            if result.returncode == 0 and os.path.exists(expected_md):
                return expected_md
    return None


# === EXTRACTION ===
def extract_knowledge(text: str, source_name: str = "unknown"):
    chunks = [c for c in text.split("\n## ") if len(c) >= 100]
    if not chunks:
        return

    kb = KnowledgeBase()

    for idx, chunk in enumerate(chunks):
        try:
            extraction = client.chat.completions.create(
                model=LLM_MODEL,
                response_model=Extraction,
                timeout=LLM_TIMEOUT,
                messages=[
                    {"role": "system", "content": "Extract technical entities and relations. Normalize names."},
                    {"role": "user", "content": f"Extract from:\n\n{chunk[:4000]}"}
                ]
            )
            kb.add_data(extraction)
            print(f"[{idx+1}/{len(chunks)}] +{len(extraction.entities)} entities")
        except Exception as e:
            print(f"[{idx+1}/{len(chunks)}] FAILED: {str(e)[:50]}")


# === WATCHDOG ===
def wait_for_file_complete(path: str, timeout: float = 60.0):
    """Wait for file to finish copying."""
    last_size = -1
    stable_time = 0
    start = time.time()

    while time.time() - start < timeout:
        try:
            size = os.path.getsize(path)
            if size == last_size:
                stable_time += 0.5
                if stable_time >= 2.0:
                    return True
            else:
                stable_time = 0
                last_size = size
        except OSError:
            pass
        time.sleep(0.5)

    return False


class PDFHandler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.src_path.endswith(".pdf"):
            return

        pdf_name = os.path.basename(event.src_path)
        timestamp = datetime.now().strftime("%H:%M:%S")

        print(f"\n[{timestamp}] New PDF: {pdf_name}")

        if not wait_for_file_complete(event.src_path):
            print(f"[ERROR] Timeout waiting for {pdf_name}")
            return

        md_path = run_mineru(event.src_path)
        if md_path:
            with open(md_path, encoding="utf-8") as f:
                extract_knowledge(f.read(), pdf_name)
            print(f"[DONE] {pdf_name}")
        else:
            print(f"[ERROR] Conversion failed for {pdf_name}")


if __name__ == "__main__":
    os.makedirs(WATCH_DIR, exist_ok=True)
    os.makedirs(CONVERTER_OUTPUT_DIR, exist_ok=True)

    print(f"{'='*60}")
    print(f"PDF Knowledge Graph Pipeline")
    print(f"{'='*60}")
    print(f"Watch directory: {WATCH_DIR}")
    print(f"Model: {LLM_MODEL}")
    print(f"Database: {DB_PATH}")
    print(f"Press Ctrl+C to stop")
    print(f"{'='*60}")

    observer = Observer()
    observer.schedule(PDFHandler(), WATCH_DIR)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down...")
        observer.stop()

    observer.join()
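The chunking step inside extract_knowledge splits on level-2 headings, drops sections shorter than 100 characters, and sends only the first 4000 characters of each section to the model. Pulling that logic into its own function makes it easy to test in isolation. The sketch below keeps the same split and minimum length but, as an assumed variation, windows long sections instead of truncating them. The name `split_into_chunks` is hypothetical, and the windows may cut mid-sentence.

```python
def split_into_chunks(text: str, min_len: int = 100, max_len: int = 4000) -> list[str]:
    """Split Markdown on level-2 headings, then window long sections instead of truncating."""
    chunks = []
    for section in text.split("\n## "):
        if len(section) < min_len:
            continue  # same filter as the script: skip very short sections
        # Emit consecutive windows of at most max_len characters.
        for start in range(0, len(section), max_len):
            chunks.append(section[start:start + max_len])
    return chunks
```

The final window of a long section can be shorter than min_len; whether such a fragment is worth an LLM call is a judgment left to the caller.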

Summary

Automation transforms the pipeline from a manual tool into infrastructure. Documents are dropped, knowledge is retrieved—the system handles all intermediate processing.

Key points:

  • Watchdog for real-time processing: Process new documents as they arrive
  • Batch for backlog processing: Handle existing collections efficiently
  • Progress tracking: Resume after failures without reprocessing
  • Systemd for production: Run as a reliable system service
  • Robust error handling: Log failures, continue processing

The next post covers visualizing the graph with vis.js.


Next: Part 5 - Interactive Visualization with vis.js

This post is licensed under CC BY 4.0 by the author.