384 lines
13 KiB
Python
384 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Notion CSV → Plane import script.

Usage:
    python import.py [--csv PATH] [--config PATH] [--dry-run]

Defaults:
    --csv     "Tasks 300144872f3480f19272e04d4cf0ee7e.csv"
    --config  config.yaml
|
||
"""
|
||
|
||
import argparse
|
||
import csv
|
||
import re
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
import requests
|
||
import yaml
|
||
from dateutil import parser as dateparser
|
||
|
||
# Fallback values for the --csv and --config command-line options.
DEFAULT_CSV = "Tasks 300144872f3480f19272e04d4cf0ee7e.csv"
DEFAULT_CONFIG = "config.yaml"

# Notion priority text (lowercased, leading emoji removed) → Plane priority enum value.
PRIORITY_MAP = dict(
    critical="urgent",
    high="high",
    medium="medium",
    low="low",
)
|
||
|
||
|
||
def strip_priority_emoji(raw: str) -> str:
    """Return *raw* lowercased, without surrounding whitespace or a leading emoji prefix.

    Everything before the first word character (letter, digit or underscore)
    is dropped, which removes leading emoji, spaces and punctuation.
    """
    trimmed = raw.strip()
    # ``[^\w]`` matches anything that is not a Unicode word character.
    return re.sub(r"^[^\w]+", "", trimmed, flags=re.UNICODE).lower()
|
||
|
||
|
||
def parse_priority(raw: str) -> str:
    """Translate a Notion priority cell into a Plane priority enum value.

    Returns "none" when the cell is empty/blank or when the cleaned text
    has no entry in PRIORITY_MAP.
    """
    if raw and raw.strip():
        return PRIORITY_MAP.get(strip_priority_emoji(raw), "none")
    return "none"
|
||
|
||
|
||
def parse_date(raw: str) -> tuple[str | None, str | None]:
    """
    Split a Notion date cell into (start_date, end_date).

    Both elements are 'YYYY-MM-DD' strings, or None when absent/unparseable.

        Empty cell       → (None, None)
        Single date      → (None, date)
        Range 'A → B'    → (date_a, date_b)
    """
    if not (raw and raw.strip()):
        return None, None

    # Notion writes ranges with a Unicode arrow surrounded by single spaces.
    first, arrow, second = raw.partition(" → ")
    if arrow:
        return _parse_single_date(first.strip()), _parse_single_date(second.strip())

    # A lone date is treated as the end (target) date.
    return None, _parse_single_date(raw.strip())
|
||
|
||
|
||
def _parse_single_date(text: str) -> str | None:
    """Parse one date string with dateutil and format it as 'YYYY-MM-DD'.

    Returns None for empty input or when parsing/formatting raises
    ValueError or OverflowError.
    """
    if text:
        try:
            # dayfirst=False: ambiguous numeric dates are read month-first.
            return dateparser.parse(text, dayfirst=False).strftime("%Y-%m-%d")
        except (ValueError, OverflowError):
            pass
    return None
|
||
|
||
|
||
def clamp_estimate(raw: str) -> int | None:
|
||
"""Cast weight to int and clamp to Plane's 0–7 range; None if empty."""
|
||
if not raw or not raw.strip():
|
||
return None
|
||
try:
|
||
value = int(float(raw.strip()))
|
||
return max(0, min(7, value))
|
||
except ValueError:
|
||
return None
|
||
|
||
|
||
def build_description_html(description: str, result: str) -> str:
    """Combine the description and result cells into an HTML fragment.

    Both values come from the CSV as plain text, so they are HTML-escaped
    before being wrapped in tags; otherwise ``<`` or ``&`` in a task
    description would corrupt the markup or inject tags.

    Args:
        description: Free-text description; blank/None adds nothing.
        result: Result cell; blank/None or "0" adds nothing.

    Returns:
        Zero, one or two concatenated ``<p>`` elements ("" when both
        inputs are empty).
    """
    import html  # local import: only this function needs it

    parts = []
    if description and description.strip():
        parts.append(f"<p>{html.escape(description.strip(), quote=False)}</p>")

    # Numeric results go through clamp_estimate, so they are rendered as an
    # integer in the 0–7 range.
    # NOTE(review): confirm that clamping the *result* value is intended.
    result_val = clamp_estimate(result) if result else None
    if result_val is not None and result_val != 0:
        parts.append(f"<p><em>Result: {result_val}</em></p>")
    elif result and result.strip() and result.strip() != "0":
        # Non-numeric result text, or numeric text that clamps to 0 without
        # being literally "0" (e.g. "0.5"): emit the raw text, escaped.
        parts.append(f"<p><em>Result: {html.escape(result.strip(), quote=False)}</em></p>")
    return "".join(parts)
|
||
|
||
|
||
def fetch_estimate_point_map(
    base_url: str,
    workspace_slug: str,
    project_id: str,
    session_id: str,
    timeout: float = 30.0,
) -> dict[str, str]:
    """
    Fetch estimate points from Plane's internal (non-v1) endpoint using a session cookie.

    Args:
        base_url: Plane base URL; a trailing slash is tolerated.
        workspace_slug: Workspace slug used in the URL path.
        project_id: Project identifier used in the URL path.
        session_id: Value sent as the ``session-id`` cookie.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests waits indefinitely on an unresponsive server.

    Returns:
        {value_string: uuid}, e.g. {"1": "e7ae...", "2": "157d...", ...},
        plus ordinal-key entries {"key:1": uuid, ...} used as a fallback.
        An empty dict when the response body is empty.

    Raises:
        requests.RequestException: on HTTP error status, connection failure
            or timeout.
    """
    url = f"{base_url.rstrip('/')}/api/workspaces/{workspace_slug}/projects/{project_id}/estimates/"
    resp = requests.get(
        url,
        headers={"accept": "application/json"},
        cookies={"session-id": session_id},
        timeout=timeout,
    )
    resp.raise_for_status()
    data = resp.json()
    if not data:
        return {}

    # Prefer the estimate flagged as last used; otherwise take the first one.
    estimate = next((e for e in data if e.get("last_used")), data[0])

    result: dict[str, str] = {}
    for pt in estimate.get("points", []):
        result[str(pt["value"])] = pt["id"]  # match by display value, e.g. "5"
        result[f"key:{pt['key']}"] = pt["id"]  # match by ordinal key, e.g. "key:3"
    return result
|
||
|
||
|
||
def resolve_estimate_point(raw_weight: str, point_map: dict[str, str]) -> str | None:
|
||
"""
|
||
Resolve a Notion вес value to a Plane estimate point UUID.
|
||
Tries exact value match first ("3" → UUID for value "3"),
|
||
then ordinal key match ("3" → UUID for key=3).
|
||
"""
|
||
if not raw_weight or not raw_weight.strip() or not point_map:
|
||
return None
|
||
raw = raw_weight.strip()
|
||
# 1. Exact value match (e.g. вес="5" → point with value="5")
|
||
if raw in point_map:
|
||
return point_map[raw]
|
||
# 2. Ordinal key match (e.g. вес="3" → point with key=3)
|
||
try:
|
||
key = int(float(raw))
|
||
fallback = point_map.get(f"key:{key}")
|
||
if fallback:
|
||
return fallback
|
||
except ValueError:
|
||
pass
|
||
return None
|
||
|
||
|
||
class PlaneClient:
    """Small client for the project-scoped ``api/v1`` endpoints of a Plane instance.

    All requests share one ``requests.Session`` carrying the ``X-API-Key``
    header and are sent with an explicit timeout so an unresponsive server
    cannot block the caller forever.
    """

    def __init__(
        self,
        base_url: str,
        workspace_slug: str,
        project_id: str,
        api_key: str,
        *,
        timeout: float = 30.0,
    ):
        """Store connection settings and prepare the authenticated session.

        Args:
            base_url: Plane base URL; a trailing slash is stripped.
            workspace_slug: Workspace slug used in every URL.
            project_id: Project identifier used in every URL.
            api_key: Value sent in the ``X-API-Key`` header.
            timeout: Seconds to wait on each request before giving up.
        """
        self.base_url = base_url.rstrip("/")
        self.workspace_slug = workspace_slug
        self.project_id = project_id
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": api_key})

    def _project_url(self, *path_parts: str) -> str:
        """Build ``<base>/api/v1/workspaces/<slug>/projects/<id>/<parts...>/``.

        Leading/trailing slashes on each segment are removed before joining,
        and the result always ends with exactly one slash.
        """
        parts = [
            self.base_url,
            "api/v1/workspaces",
            self.workspace_slug,
            "projects",
            self.project_id,
            *path_parts,
        ]
        return "/".join(p.strip("/") for p in parts) + "/"

    def _name_to_id(self, resource: str) -> dict[str, str]:
        """GET a project resource list and map each item's name to its id.

        Accepts either a bare JSON list or an object wrapping the list under
        "results". Only this single response is read.
        NOTE(review): if the endpoint paginates, later pages are not fetched —
        confirm against the Plane API.
        """
        resp = self.session.get(self._project_url(resource), timeout=self.timeout)
        resp.raise_for_status()
        data = resp.json()
        results = data.get("results", data) if isinstance(data, dict) else data
        return {item["name"]: item["id"] for item in results}

    def get_states(self) -> dict[str, str]:
        """Return {state_name: state_uuid} for all project states."""
        return self._name_to_id("states")

    def get_labels(self) -> dict[str, str]:
        """Return {label_name: label_uuid} for all project labels."""
        return self._name_to_id("labels")

    def create_label(self, name: str) -> str:
        """Create a label and return its UUID.

        Raises:
            requests.RequestException: on HTTP error status, connection
                failure or timeout.
        """
        url = self._project_url("labels")
        resp = self.session.post(url, json={"name": name}, timeout=self.timeout)
        resp.raise_for_status()
        return resp.json()["id"]

    def create_issue(self, payload: dict) -> dict:
        """POST a new work item. Returns the created issue dict.

        Raises:
            requests.RequestException: on HTTP error status, connection
                failure or timeout.
        """
        url = self._project_url("issues")
        resp = self.session.post(url, json=payload, timeout=self.timeout)
        resp.raise_for_status()
        return resp.json()
|
||
|
||
|
||
def ensure_labels(disciplines: list[str], client: PlaneClient, dry_run: bool) -> dict[str, str]:
    """
    Make sure every discipline has a matching Plane label.

    In a real run the existing labels are fetched through *client* and any
    missing ones are created. In dry-run mode no API call is made and
    placeholder ids are generated instead.

    Returns {discipline_name: label_uuid}; in a real run the mapping also
    contains every label that already existed in the project.
    """
    known: dict[str, str] = {} if dry_run else client.get_labels()
    for name in disciplines:
        # Skip blanks and anything already present (including duplicates).
        if name and name not in known:
            if not dry_run:
                created_id = client.create_label(name)
                known[name] = created_id
                print(f" Created label: {name!r} → {created_id}")
            else:
                print(f" [dry-run] Would create label: {name!r}")
                known[name] = f"<dry-run-uuid-{name}>"
    return known
|
||
|
||
|
||
def main() -> None:
    """Command-line entry point: import each CSV row as a Plane work item.

    Steps:
      1. Parse --csv / --config / --dry-run and check both files exist.
      2. Load the YAML config and validate the required keys.
      3. Unless in dry-run mode, fetch project states, labels and (when
         ``plane_session`` is configured) estimate points.
      4. Read the CSV, ensure a label exists for every discipline, then
         build and POST one payload per row with a non-empty Name.
      5. Print a created/failed summary.

    Exits with status 1 when an input file is missing or the config is
    unusable. A failed request for a single row is reported and counted,
    and the import continues with the next row.
    """
    parser = argparse.ArgumentParser(description="Import Notion CSV tasks into Plane.")
    parser.add_argument("--csv", default=DEFAULT_CSV, help="Path to Notion CSV export")
    parser.add_argument("--config", default=DEFAULT_CONFIG, help="Path to config.yaml")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse and validate rows without making API calls",
    )
    args = parser.parse_args()

    csv_path = Path(args.csv)
    config_path = Path(args.config)

    if not csv_path.exists():
        print(f"Error: CSV file not found: {csv_path}", file=sys.stderr)
        sys.exit(1)
    if not config_path.exists():
        print(f"Error: Config file not found: {config_path}", file=sys.stderr)
        sys.exit(1)

    # Explicit UTF-8 so non-ASCII status names in the config load the same
    # way on every platform instead of depending on the locale encoding.
    with config_path.open(encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # safe_load returns None for an empty file; fail with a clear message
    # rather than a traceback.
    if not isinstance(config, dict):
        print(f"Error: Config file is empty or not a mapping: {config_path}", file=sys.stderr)
        sys.exit(1)
    required_keys = ("plane_url", "workspace_slug", "project_id", "api_key")
    missing_keys = [key for key in required_keys if not config.get(key)]
    if missing_keys:
        print(f"Error: Config is missing required keys: {', '.join(missing_keys)}", file=sys.stderr)
        sys.exit(1)

    plane_url: str = config["plane_url"]
    workspace_slug: str = config["workspace_slug"]
    project_id: str = config["project_id"]
    api_key: str = config["api_key"]
    # ``or {}`` also covers a key that is present but left empty in the YAML.
    status_mapping: dict[str, str] = config.get("status_mapping") or {}
    plane_session: str | None = config.get("plane_session")

    client = PlaneClient(plane_url, workspace_slug, project_id, api_key)

    state_map: dict[str, str] = {}
    label_map: dict[str, str] = {}
    point_map: dict[str, str] = {}

    # Fetch runtime state and label maps (skipped in dry-run).
    if args.dry_run:
        print("[dry-run] Skipping API calls for states/labels lookup.\n")
    else:
        print("Fetching project states…")
        state_map = client.get_states()
        print(f" Found {len(state_map)} states: {list(state_map.keys())}")

        # Fetched here only to report what exists; ensure_labels() below
        # fetches the labels again before creating the missing ones.
        print("Fetching project labels…")
        label_map = client.get_labels()
        print(f" Found {len(label_map)} labels: {list(label_map.keys())}")

        # Estimate points come from an internal endpoint that needs the
        # plane_session cookie; failure here is non-fatal by design.
        if plane_session:
            print("Fetching estimate points…")
            try:
                point_map = fetch_estimate_point_map(plane_url, workspace_slug, project_id, plane_session)
                values = [k for k in point_map if not k.startswith("key:")]
                print(f" Found {len(values)} estimate points: {values}")
            except Exception as exc:
                print(f" Warning: could not fetch estimate points ({exc}). estimate_point will be skipped.")
                point_map = {}
        else:
            print("No plane_session in config — estimate_point will be skipped.")

    # newline="" lets the csv module handle line endings inside quoted
    # fields; restval="" makes short rows yield "" instead of None so the
    # .strip() calls below cannot fail.
    with csv_path.open(encoding="utf-8-sig", newline="") as f:
        rows = list(csv.DictReader(f, restval=""))

    # Unique non-blank disciplines, sorted so labels are created in a
    # deterministic order.
    disciplines = sorted({r.get("discipline", "").strip() for r in rows if r.get("discipline", "").strip()})

    if not args.dry_run:
        print(f"\nEnsuring labels for disciplines: {disciplines}")
    label_map = ensure_labels(disciplines, client, dry_run=args.dry_run)

    print(f"\nProcessing {len(rows)} rows…\n")

    created = 0
    failed = 0

    for i, row in enumerate(rows, start=1):
        name = row.get("Name", "").strip()
        if not name:
            print(f" Row {i}: skipped (empty Name)")
            continue

        # Status → state UUID
        notion_status = row.get("Status", "").strip()
        plane_state_name = status_mapping.get(notion_status)
        state_uuid = state_map.get(plane_state_name) if plane_state_name else None
        if notion_status and not state_uuid and not args.dry_run:
            print(f" Row {i} warning: no state mapping for status {notion_status!r}")

        # Priority
        priority = parse_priority(row.get("priority", ""))

        # Discipline → label UUID
        discipline = row.get("discipline", "").strip()
        label_uuids: list[str] = []
        if discipline:
            label_uuid = label_map.get(discipline)
            if label_uuid:
                label_uuids = [label_uuid]

        # Dates
        start_date, target_date = parse_date(row.get("Date", ""))

        # Description + result
        description_html = build_description_html(
            row.get("Description", ""),
            row.get("результат", ""),
        )

        # Weight → estimate_point UUID (resolved via internal endpoint)
        estimate_uuid = resolve_estimate_point(row.get("вес", ""), point_map)

        # Optional fields are only sent when they have a value.
        payload: dict = {"name": name}
        if state_uuid:
            payload["state"] = state_uuid
        payload["priority"] = priority
        if label_uuids:
            payload["label_ids"] = label_uuids
        if target_date:
            payload["target_date"] = target_date
        if start_date:
            payload["start_date"] = start_date
        if description_html:
            payload["description_html"] = description_html
        if estimate_uuid:
            payload["estimate_point"] = estimate_uuid

        if args.dry_run:
            # Parenthesised to make the intended grouping explicit.
            state_display = (
                (plane_state_name or f"(unmapped: {notion_status!r})") if notion_status else "(none)"
            )
            raw_weight = row.get("вес", "").strip()
            print(
                f" [{i:>3}] {name!r}\n"
                f" state={state_display}, priority={priority}, "
                f"labels={label_uuids}, dates={start_date}→{target_date}, "
                f"estimate={raw_weight!r}→{estimate_uuid or '(skipped)'}"
            )
            created += 1
            continue

        try:
            issue = client.create_issue(payload)
        except requests.HTTPError as exc:
            resp = exc.response
            status = resp.status_code if resp is not None else "?"
            body = resp.text[:300] if resp is not None else ""
            print(f" [{i:>3}] FAILED ({status}): {name!r} — {body}")
            failed += 1
        except requests.RequestException as exc:
            # Connection errors, timeouts and similar: count the row as
            # failed and keep going instead of aborting the whole import.
            print(f" [{i:>3}] FAILED (request error): {name!r} — {exc}")
            failed += 1
        else:
            identifier = issue.get("sequence_id") or issue.get("id", "?")
            print(f" [{i:>3}] Created: #{identifier} — {name!r}")
            created += 1

    print(f"\n{'[dry-run] ' if args.dry_run else ''}Summary: {created} created, {failed} failed.")
|
||
|
||
|
||
# Run the import only when this file is executed as a script.
if __name__ == "__main__":
    main()
|