# planedrop/import.py — Notion CSV → Plane import script.
# NOTE(review): this listing was exported from a code-host web view whose
# "ambiguous Unicode" filter stripped some characters (notably the "→"
# range/log separators); they are restored where the code depends on them.
#!/usr/bin/env python3
"""
Notion CSV → Plane import script.
Usage:
python import.py [--csv PATH] [--config PATH] [--dry-run]
Defaults:
--csv "Tasks 300144872f3480f19272e04d4cf0ee7e.csv"
--config config.yaml
"""
import argparse
import csv
import html
import re
import sys
from pathlib import Path

import requests
import yaml
from dateutil import parser as dateparser
# Default input files; both can be overridden via the --csv / --config CLI flags.
DEFAULT_CSV = "Tasks 300144872f3480f19272e04d4cf0ee7e.csv"
DEFAULT_CONFIG = "config.yaml"

# Maps lowercased, emoji-stripped priority text to Plane's enum values
# (values not listed here fall back to "none" in parse_priority).
PRIORITY_MAP = {
    "critical": "urgent",
    "high": "high",
    "medium": "medium",
    "low": "low",
}
def strip_priority_emoji(raw: str) -> str:
    """Drop any leading non-word characters (emoji, punctuation, spaces) and lowercase."""
    # \W is the complement of \w; for str patterns it is Unicode-aware by
    # default, so a leading emoji such as "🔥" is removed along with spaces.
    return re.sub(r"^\W+", "", raw.strip(), flags=re.UNICODE).lower()
def parse_priority(raw: str) -> str:
    """Translate a Notion priority cell into Plane's priority enum value.

    Blank input and unrecognized values both map to "none".
    """
    if raw and raw.strip():
        return PRIORITY_MAP.get(strip_priority_emoji(raw), "none")
    return "none"
def parse_date(raw: str) -> tuple[str | None, str | None]:
    """
    Parse a Notion date field.

    Returns (start_date, end_date) as 'YYYY-MM-DD' strings or None.
        Single date   -> (None, date)
        Range 'A → B' -> (date_a, date_b)
        Empty         -> (None, None)
    """
    if not raw or not raw.strip():
        return None, None
    # "→" (U+2192) is the arrow Notion uses between the two ends of a date
    # range. (Restored here: the code-host listing had stripped the character,
    # leaving an empty separator — `"" in raw` is always true and
    # raw.split("") raises ValueError.)
    if "→" in raw:
        start_raw, end_raw = raw.split("→", 1)
        return _parse_single_date(start_raw.strip()), _parse_single_date(end_raw.strip())
    return None, _parse_single_date(raw.strip())
def _parse_single_date(text: str) -> str | None:
    """Parse one date string to 'YYYY-MM-DD'; None for empty or unparseable input."""
    if not text:
        return None
    try:
        # dayfirst=False: ambiguous dates like 03/04 are read month-first.
        return dateparser.parse(text, dayfirst=False).strftime("%Y-%m-%d")
    except (ValueError, OverflowError):
        return None
def clamp_estimate(raw: str) -> int | None:
"""Cast weight to int and clamp to Plane's 07 range; None if empty."""
if not raw or not raw.strip():
return None
try:
value = int(float(raw.strip()))
return max(0, min(7, value))
except ValueError:
return None
def build_description_html(description: str, result: str) -> str:
    """Combine a description and a result value into a small HTML fragment.

    Fixes vs. the previous version:
    - The result is shown verbatim instead of being run through
      clamp_estimate, which silently corrupted the display ("10" became
      "Result: 7", "3.7" became "Result: 3"); clamping belongs to the
      estimate field, not to the description text.
    - Both texts are HTML-escaped, since they come straight from the CSV
      and could otherwise break (or inject into) the rendered description.

    A result of exactly "0" (or blank) is suppressed, as before.
    """
    parts: list[str] = []
    if description and description.strip():
        parts.append(f"<p>{html.escape(description.strip())}</p>")
    result_text = result.strip() if result else ""
    if result_text and result_text != "0":
        parts.append(f"<p><em>Result: {html.escape(result_text)}</em></p>")
    return "".join(parts)
def fetch_estimate_point_map(base_url: str, workspace_slug: str, project_id: str, session_id: str) -> dict[str, str]:
    """
    Fetch estimate points from Plane's internal (non-v1) endpoint using a
    session cookie.

    Returns {value_string: uuid}, e.g. {"1": "e7ae...", "2": "157d...", ...},
    plus key-based fallback entries {"key:1": uuid, ...}.
    Returns {} when the project has no estimates.
    Raises requests.HTTPError on a non-2xx response.
    """
    url = f"{base_url.rstrip('/')}/api/workspaces/{workspace_slug}/projects/{project_id}/estimates/"
    resp = requests.get(
        url,
        headers={"accept": "application/json"},
        cookies={"session-id": session_id},
        # Fix: without a timeout, requests can block forever on an
        # unresponsive server and hang the whole import.
        timeout=30,
    )
    resp.raise_for_status()
    data = resp.json()
    if not data:
        return {}
    # Prefer the last-used estimate; fall back to the first one returned.
    estimate = next((e for e in data if e.get("last_used")), data[0])
    points: dict[str, str] = {}
    for pt in estimate.get("points", []):
        points[str(pt["value"])] = pt["id"]    # match by display value, e.g. "5"
        points[f"key:{pt['key']}"] = pt["id"]  # match by ordinal key, e.g. "key:3"
    return points
def resolve_estimate_point(raw_weight: str, point_map: dict[str, str]) -> str | None:
"""
Resolve a Notion вес value to a Plane estimate point UUID.
Tries exact value match first ("3" → UUID for value "3"),
then ordinal key match ("3" → UUID for key=3).
"""
if not raw_weight or not raw_weight.strip() or not point_map:
return None
raw = raw_weight.strip()
# 1. Exact value match (e.g. вес="5" → point with value="5")
if raw in point_map:
return point_map[raw]
# 2. Ordinal key match (e.g. вес="3" → point with key=3)
try:
key = int(float(raw))
fallback = point_map.get(f"key:{key}")
if fallback:
return fallback
except ValueError:
pass
return None
class PlaneClient:
    """Thin wrapper around Plane's v1 REST API, scoped to one workspace/project."""

    def __init__(self, base_url: str, workspace_slug: str, project_id: str, api_key: str):
        self.base_url = base_url.rstrip("/")
        self.workspace_slug = workspace_slug
        self.project_id = project_id
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": api_key})

    def _project_url(self, *path_parts: str) -> str:
        """Build a project-scoped API URL, always ending in a trailing slash."""
        segments = (
            self.base_url,
            "api/v1/workspaces",
            self.workspace_slug,
            "projects",
            self.project_id,
        ) + path_parts
        return "/".join(seg.strip("/") for seg in segments) + "/"

    def _name_to_id(self, resource: str) -> dict[str, str]:
        """GET a project resource list and map each item's name to its UUID."""
        resp = self.session.get(self._project_url(resource))
        resp.raise_for_status()
        payload = resp.json()
        # The API may return a bare list or a paginated {"results": [...]} dict.
        items = payload.get("results", payload) if isinstance(payload, dict) else payload
        return {item["name"]: item["id"] for item in items}

    def get_states(self) -> dict[str, str]:
        """Return {state_name: state_uuid} for all project states."""
        return self._name_to_id("states")

    def get_labels(self) -> dict[str, str]:
        """Return {label_name: label_uuid} for all project labels."""
        return self._name_to_id("labels")

    def create_label(self, name: str) -> str:
        """Create a label and return its UUID."""
        resp = self.session.post(self._project_url("labels"), json={"name": name})
        resp.raise_for_status()
        return resp.json()["id"]

    def create_issue(self, payload: dict) -> dict:
        """POST a new work item; return the created issue as a dict."""
        resp = self.session.post(self._project_url("issues"), json=payload)
        resp.raise_for_status()
        return resp.json()
def ensure_labels(disciplines: list[str], client: "PlaneClient", dry_run: bool) -> dict[str, str]:
    """
    Ensure every discipline value has a corresponding Plane label.

    In dry-run mode no API calls are made at all; placeholder UUIDs of the
    form "<dry-run-uuid-NAME>" are returned instead.
    Returns {discipline_name: label_uuid}.
    """
    label_map = client.get_labels() if not dry_run else {}
    for discipline in disciplines:
        if not discipline or discipline in label_map:
            continue
        if dry_run:
            print(f" [dry-run] Would create label: {discipline!r}")
            label_map[discipline] = f"<dry-run-uuid-{discipline}>"
        else:
            new_id = client.create_label(discipline)
            label_map[discipline] = new_id
            # "→" separator restored (it was stripped by the code host's
            # Unicode filter, fusing the name and UUID together).
            print(f" Created label: {discipline!r} → {new_id}")
    return label_map
def main() -> None:
    """CLI entry point: load config, read the Notion CSV, create one Plane
    issue per row.

    With --dry-run, rows are parsed and displayed but no API calls are made.

    Fixes vs. the previous version: the "→" separators in four log f-strings
    are restored (the code-host listing had stripped the character, fusing
    adjacent values together), the state_display expression is explicitly
    parenthesized to document its precedence, and the duplicated
    ensure_labels call is collapsed into one.
    """
    parser = argparse.ArgumentParser(description="Import Notion CSV tasks into Plane.")
    parser.add_argument("--csv", default=DEFAULT_CSV, help="Path to Notion CSV export")
    parser.add_argument("--config", default=DEFAULT_CONFIG, help="Path to config.yaml")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse and validate rows without making API calls",
    )
    args = parser.parse_args()

    csv_path = Path(args.csv)
    config_path = Path(args.config)
    if not csv_path.exists():
        print(f"Error: CSV file not found: {csv_path}", file=sys.stderr)
        sys.exit(1)
    if not config_path.exists():
        print(f"Error: Config file not found: {config_path}", file=sys.stderr)
        sys.exit(1)

    with config_path.open() as f:
        config = yaml.safe_load(f)
    plane_url: str = config["plane_url"]
    workspace_slug: str = config["workspace_slug"]
    project_id: str = config["project_id"]
    api_key: str = config["api_key"]
    status_mapping: dict[str, str] = config.get("status_mapping", {})
    plane_session: str | None = config.get("plane_session")

    client = PlaneClient(plane_url, workspace_slug, project_id, api_key)

    # Fetch runtime state and label maps (skipped entirely in dry-run).
    if args.dry_run:
        print("[dry-run] Skipping API calls for states/labels lookup.\n")
        state_map: dict[str, str] = {}
        label_map: dict[str, str] = {}
        point_map: dict[str, str] = {}
    else:
        print("Fetching project states…")
        state_map = client.get_states()
        print(f" Found {len(state_map)} states: {list(state_map.keys())}")
        print("Fetching project labels…")
        label_map = client.get_labels()
        print(f" Found {len(label_map)} labels: {list(label_map.keys())}")
        # Estimate points live behind an internal endpoint that needs a
        # session cookie rather than the API key.
        if plane_session:
            print("Fetching estimate points…")
            try:
                point_map = fetch_estimate_point_map(plane_url, workspace_slug, project_id, plane_session)
                values = [k for k in point_map if not k.startswith("key:")]
                print(f" Found {len(values)} estimate points: {values}")
            except Exception as exc:
                # Best effort: estimates are optional, so degrade instead of aborting.
                print(f" Warning: could not fetch estimate points ({exc}). estimate_point will be skipped.")
                point_map = {}
        else:
            print("No plane_session in config — estimate_point will be skipped.")
            point_map = {}

    # Read CSV (utf-8-sig strips Notion's BOM) and collect unique disciplines
    # so their labels can be created up front.
    with csv_path.open(encoding="utf-8-sig") as f:
        rows = list(csv.DictReader(f))
    disciplines = list({r.get("discipline", "").strip() for r in rows if r.get("discipline", "").strip()})
    if not args.dry_run:
        print(f"\nEnsuring labels for disciplines: {disciplines}")
    label_map = ensure_labels(disciplines, client, dry_run=args.dry_run)

    print(f"\nProcessing {len(rows)} rows…\n")
    created = 0
    failed = 0
    for i, row in enumerate(rows, start=1):
        name = row.get("Name", "").strip()
        if not name:
            print(f" Row {i}: skipped (empty Name)")
            continue
        # Status → state UUID
        notion_status = row.get("Status", "").strip()
        plane_state_name = status_mapping.get(notion_status)
        state_uuid = state_map.get(plane_state_name) if plane_state_name else None
        if notion_status and not state_uuid and not args.dry_run:
            print(f" Row {i} warning: no state mapping for status {notion_status!r}")
        # Priority
        priority = parse_priority(row.get("priority", ""))
        # Discipline → label UUID
        discipline = row.get("discipline", "").strip()
        label_uuids: list[str] = []
        if discipline:
            label_uuid = label_map.get(discipline)
            if label_uuid:
                label_uuids = [label_uuid]
        # Dates
        start_date, target_date = parse_date(row.get("Date", ""))
        # Description + result ("результат" is the Notion column name)
        description_html = build_description_html(
            row.get("Description", ""),
            row.get("результат", ""),
        )
        # Weight ("вес") → estimate_point UUID (resolved via internal endpoint)
        estimate_uuid = resolve_estimate_point(row.get("вес", ""), point_map)

        payload: dict = {"name": name}
        if state_uuid:
            payload["state"] = state_uuid
        payload["priority"] = priority
        if label_uuids:
            payload["label_ids"] = label_uuids
        if target_date:
            payload["target_date"] = target_date
        if start_date:
            payload["start_date"] = start_date
        if description_html:
            payload["description_html"] = description_html
        if estimate_uuid:
            payload["estimate_point"] = estimate_uuid

        if args.dry_run:
            # Parentheses make the precedence explicit: `or` binds before the
            # conditional, so an unmapped-but-present status shows "(unmapped: ...)".
            state_display = (plane_state_name or f"(unmapped: {notion_status!r})") if notion_status else "(none)"
            raw_weight = row.get("вес", "").strip()
            print(
                f" [{i:>3}] {name!r}\n"
                f" state={state_display}, priority={priority}, "
                f"labels={label_uuids}, dates={start_date} → {target_date}, "
                f"estimate={raw_weight!r} → {estimate_uuid or '(skipped)'}"
            )
            created += 1
            continue
        try:
            issue = client.create_issue(payload)
            identifier = issue.get("sequence_id") or issue.get("id", "?")
            print(f" [{i:>3}] Created: #{identifier} → {name!r}")
            created += 1
        except requests.HTTPError as exc:
            # Surface the first part of the error body; keep importing other rows.
            body = exc.response.text[:300] if exc.response is not None else ""
            status = exc.response.status_code if exc.response is not None else "?"
            print(f" [{i:>3}] FAILED ({status}): {name!r} → {body}")
            failed += 1

    print(f"\n{'[dry-run] ' if args.dry_run else ''}Summary: {created} created, {failed} failed.")
# Entry-point guard: allows importing this module without side effects.
if __name__ == "__main__":
    main()