From 8d82f31516d35346045b74740b32593eaf44e2d7 Mon Sep 17 00:00:00 2001 From: shibao Date: Fri, 6 Mar 2026 02:03:47 +0000 Subject: [PATCH] add other user data back --- misskey_export.py | 236 ++++++++++++++++++++++++++++------------------ requirements.txt | 1 + 2 files changed, 147 insertions(+), 90 deletions(-) diff --git a/misskey_export.py b/misskey_export.py index a8105fa..2f3bce9 100644 --- a/misskey_export.py +++ b/misskey_export.py @@ -4,27 +4,62 @@ import csv import json import os import requests +import mimetypes +import psycopg2 from pathlib import Path -# Change these configs +# --- CONFIGURATIONS --- TOKEN = "changeme" -INSTANCE = "http://misskey:3000" -REMOTE_URL = "https://misskey.bubbletea.dev" +INTERNAL_INSTANCE = "http://changeme:3000" +EXTERNAL_URL = "https://changeme" +BASE_DOMAIN = "changeme" + +# Database Constants +DB_NAME = "changeme" +DB_USER = "changeme" +DB_PASS = "changeme" +DB_HOST = "changeme" +# ---------------------- + +# Global mapping cache for User ID -> Full Username +user_cache = {} + +def get_db_connection(): + """Establishes a connection to the PostgreSQL database.""" + return psycopg2.connect( + dbname=DB_NAME, + user=DB_USER, + password=DB_PASS, + host=DB_HOST + ) + +def load_user_cache(db_conn): + """Pre-loads all user IDs and their full handles into memory.""" + print("Loading user handle cache...") + with db_conn.cursor() as cur: + cur.execute('SELECT id, username, host FROM "user"') + for row in cur.fetchall(): + u_id, username, host = row + full_handle = f"@{username}@{host if host else BASE_DOMAIN}" + user_cache[u_id] = full_handle + print(f"Cache loaded with {len(user_cache)} handles.") + +def get_handle(u_id): + """Returns the full handle for a user ID, falling back to the ID if not found.""" + return user_cache.get(u_id, u_id) def post(path, body=None): """Sends a post request to the Misskey instance.""" if body is None: body = {} body["i"] = TOKEN res = requests.post( - f"{INSTANCE}/api{path}", - headers={ - "Content-Type": 
"application/json", - }, + f"{INTERNAL_INSTANCE}/api{path}", + headers={"Content-Type": "application/json"}, data=json.dumps(body), - timeout=10, + timeout=15, ) return res @@ -35,97 +67,98 @@ def fetch_all_local_users(): offset = 0 limit = 100 while True: - res = post("/admin/show-users", { - "origin": "local", - "limit": limit, - "offset": offset - }) + res = post("/admin/show-users", {"origin": "local", "limit": limit, "offset": offset}) res.raise_for_status() users = res.json() - if not users: - break + if not users: break all_users.extend(users) - if len(users) < limit: - break + if len(users) < limit: break offset += limit - print(f"Found {len(all_users)} local users.") return all_users def fetch_notes(user_id, until_id=None): - """Fetches all notes from a user.""" - body = { - "userId": user_id, - "withReplies": True, - "withRenotes": True, - "withChannelNotes": True, - "limit": 100, - } - if until_id: - body["untilId"] = until_id - + """Fetches notes via API.""" + body = {"userId": user_id, "withReplies": True, "withRenotes": True, "withChannelNotes": True, "limit": 100} + if until_id: body["untilId"] = until_id res = post("/users/notes", body) res.raise_for_status() return res.json() def export_notes(user_id, base_path): - """Exports all user notes to posts.csv.""" - print(f" Exporting notes for {user_id}...") + """Exports user notes, replacing IDs with handles.""" + print(f" Exporting notes for {get_handle(user_id)}...") with open(base_path / "posts.csv", "w", newline="", encoding="utf-8") as f: writer = csv.writer(f) - writer.writerow(["id", "createdAt", "text", "visibility", "renoteId", "replyId", "cw"]) + writer.writerow(["id", "createdAt", "text", "visibility", "renote", "reply", "cw"]) until_id = None count = 0 while True: notes = fetch_notes(user_id, until_id) - if not notes: - break + if not notes: break for note in notes: + # Resolve renote/reply IDs to handles if they exist + renote_handle = "" + if note.get("renote"): + renote_handle = 
get_handle(note["renote"].get("userId")) + elif note.get("renoteId"): + # Fallback if note object isn't fully packed + renote_handle = note["renoteId"] + + reply_handle = "" + if note.get("reply"): + reply_handle = get_handle(note["reply"].get("userId")) + elif note.get("replyId"): + reply_handle = note["replyId"] + writer.writerow([ note["id"], note["createdAt"], note.get("text"), note["visibility"], - note.get("renoteId"), - note.get("replyId"), + renote_handle, + reply_handle, note.get("cw") ]) count += len(notes) until_id = notes[-1]["id"] - if len(notes) < 100: - break + if len(notes) < 100: break print(f" Exported {count} notes.") def fetch_admin_drive_files(user_id, until_id=None): - """Fetches drive files for a specific user using admin/drive/files.""" body = {"limit": 100, "userId": user_id} - if until_id: - body["untilId"] = until_id + if until_id: body["untilId"] = until_id res = post("/admin/drive/files", body) res.raise_for_status() return res.json() def download_file(file_info, folder_path): - """Downloads a file from the drive.""" url = file_info["url"] - if "misskey.bubbletea.dev" in url: - url = url.replace(REMOTE_URL, INSTANCE) + if EXTERNAL_URL in url: + url = url.replace(EXTERNAL_URL, INTERNAL_INSTANCE) + + file_name = file_info.get("name", "") + extension = file_info.get("extension") + mime_type = file_info.get("type") + + if extension and not file_name.lower().endswith(f".{extension.lower()}"): + file_name = f"{file_name}.{extension}" + elif mime_type and not "." 
in file_name: + guessed_ext = mimetypes.guess_extension(mime_type) + if guessed_ext: file_name = f"{file_name}{guessed_ext}" - file_name = file_info["name"] safe_name = "".join(c for c in file_name if c.isalnum() or c in "._- ") if not safe_name: safe_name = file_info["id"] + if extension: safe_name = f"{safe_name}.{extension}" dest_path = folder_path / safe_name - counter = 1 original_dest_path = dest_path while dest_path.exists(): - stem = original_dest_path.stem - suffix = original_dest_path.suffix - dest_path = original_dest_path.with_name(f"{stem}_{counter}{suffix}") + dest_path = original_dest_path.with_name(f"{original_dest_path.stem}_{counter}{original_dest_path.suffix}") counter += 1 try: @@ -138,102 +171,122 @@ def download_file(file_info, folder_path): print(f" Failed to download {file_name}: {e}") def get_folder_path(folder_id, folder_map): - """Resolves the full path of a folder from the folder map.""" path_parts = [] curr_id = folder_id while curr_id and curr_id in folder_map: folder = folder_map[curr_id] folder_name = folder.get("name", curr_id) safe_name = "".join(c for c in folder_name if c.isalnum() or c in "._- ") - if not safe_name: - safe_name = curr_id + if not safe_name: safe_name = curr_id path_parts.insert(0, safe_name) curr_id = folder.get("parentId") return Path(*path_parts) if path_parts else Path("") def export_drive_admin(user_id, base_path): - """Exports all drive files for a user with folder hierarchy.""" - print(f" Exporting drive for {user_id}...") + print(f" Exporting drive for {get_handle(user_id)}...") files_base_path = base_path / "files" - if not files_base_path.exists(): - files_base_path.mkdir() - - all_files = [] - folder_map = {} - - until_id = None + if not files_base_path.exists(): files_base_path.mkdir() + all_files = []; folder_map = {}; until_id = None while True: files = fetch_admin_drive_files(user_id, until_id) - if not files: - break + if not files: break for file in files: all_files.append(file) - # Build folder 
map from the 'folder' property if available in the packed file curr = file.get("folder") while curr: folder_map[curr["id"]] = curr - # Some Misskey versions might pack the parent as well curr = curr.get("parent") - - if len(files) < 100: - break + if len(files) < 100: break until_id = files[-1]["id"] - for file in all_files: folder_id = file.get("folderId") rel_folder_path = get_folder_path(folder_id, folder_map) full_folder_path = files_base_path / rel_folder_path - - if not full_folder_path.exists(): - full_folder_path.mkdir(parents=True, exist_ok=True) - + if not full_folder_path.exists(): full_folder_path.mkdir(parents=True, exist_ok=True) download_file(file, full_folder_path) - print(f" Exported {len(all_files)} files.") def export_user_data(user, base_path): - """Exports user profile data to user-data.txt.""" print(f" Exporting profile data for {user['username']}...") with open(base_path / "user-data.txt", "w", encoding="utf-8") as f: f.write(f"Username: {user.get('username')}\n") + f.write(f"Full Handle: {get_handle(user['id'])}\n") f.write(f"Display Name: {user.get('name')}\n") f.write(f"Created At: {user.get('createdAt')}\n") f.write(f"Description:\n{user.get('description') or ''}\n\n") - f.write("Fields:\n") fields = user.get("fields", []) if fields: for field in fields: - name = field.get("name", "") - value = field.get("value", "") - f.write(f"- {name}: {value}\n") - else: - f.write("(None)\n") + f.write(f"- {field.get('name', '')}: {field.get('value', '')}\n") + else: f.write("(None)\n") + +def export_lists_db(user_id, base_path, db_conn): + print(" Exporting lists from DB...") + with db_conn.cursor() as cur: + cur.execute('SELECT id, name FROM user_list WHERE "userId" = %s', (user_id,)) + lists = cur.fetchall() + with open(base_path / "lists.csv", "w", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + writer.writerow(["id", "name", "members"]) + for l in lists: + list_id = l[0] + cur.execute('SELECT "userId" FROM user_list_membership 
WHERE "userListId" = %s', (list_id,)) + members = cur.fetchall() + # Replace Member IDs with Handles + m_handles = ",".join(get_handle(m[0]) for m in members) + writer.writerow([l[0], l[1], m_handles]) + +def export_antennas_db(user_id, base_path, db_conn): + print(" Exporting antennas from DB...") + with db_conn.cursor() as cur: + cur.execute('SELECT id, name, src, "userListId", keywords, "excludeKeywords", users, "caseSensitive", "withReplies", "withFile" FROM antenna WHERE "userId" = %s', (user_id,)) + antennas = cur.fetchall() + with open(base_path / "antennas.csv", "w", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + writer.writerow(["id", "name", "src", "userListId", "keywords", "excludeKeywords", "users", "caseSensitive", "withReplies", "withFile"]) + for a in antennas: + # Resolve IDs in the 'users' list + u_list = a[6] if isinstance(a[6], list) else [] + u_handles = ",".join(get_handle(u) for u in u_list) + + k = ",".join(str(i) for i in a[4]) if isinstance(a[4], list) else str(a[4] or "") + ek = ",".join(str(i) for i in a[5]) if isinstance(a[5], list) else str(a[5] or "") + writer.writerow([a[0], a[1], a[2], a[3], k, ek, u_handles, a[7], a[8], a[9]]) def main(): + db_conn = None try: - users = fetch_all_local_users() + db_conn = get_db_connection() + print("Connected to Misskey Database.") + load_user_cache(db_conn) + + users = fetch_all_local_users() backups_dir = Path("backups") - if not backups_dir.exists(): - backups_dir.mkdir() + if not backups_dir.exists(): backups_dir.mkdir() for user in users: - username = user["username"] + raw_username = str(user.get("username", "unknown")) + username = "".join(c for c in raw_username if c.isalnum() or c in "._-").strip() + if not username: username = user["id"] + user_id = user["id"] - print(f"Processing user: @{username} ({user_id})") + print(f"Processing user: {get_handle(user_id)}") user_dir = backups_dir / username - if not user_dir.exists(): - user_dir.mkdir(parents=True) + if not 
user_dir.exists(): user_dir.mkdir(parents=True) export_user_data(user, user_dir) export_notes(user_id, user_dir) - export_drive_admin(user_id, user_dir) # Admin cannot easily fetch other users' lists/antennas - # as there are no admin/lists or admin/antennas endpoints visible. + export_drive_admin(user_id, user_dir) + export_lists_db(user_id, user_dir, db_conn) + export_antennas_db(user_id, user_dir, db_conn) print("Bulk export complete!") except Exception as e: print(f"An error occurred: {e}") + finally: + if db_conn: db_conn.close() if __name__ == "__main__": main() diff --git a/requirements.txt b/requirements.txt index bccde58..7bb3867 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ Requests +psycopg2-binary