#!/usr/bin/python3
"""Bulk exporter for a Misskey instance.

For every local user it exports notes, drive files, profile data, lists,
and antennas into ./backups/<username>/. It talks to the instance via the
admin HTTP API (requests) and reads list/antenna data straight from the
PostgreSQL database (psycopg2).
"""
import csv
import json
import os
import requests
import mimetypes
import psycopg2
from pathlib import Path

# --- CONFIGURATIONS ---
TOKEN = "changeme"
INTERNAL_INSTANCE = "http://changeme:3000"
EXTERNAL_URL = "https://changeme"

# Bare hostname of this instance, used when building handles for local
# users (whose `host` column is NULL in the DB).
# FIX: load_user_cache() referenced BASE_DOMAIN but it was never defined,
# causing a NameError on the first local user processed.
BASE_DOMAIN = EXTERNAL_URL.split("://", 1)[-1]

# Database Constants
DB_NAME = "changeme"
DB_USER = "changeme"
DB_PASS = "changeme"
DB_HOST = "changeme"
# ----------------------

# Global mapping cache for User ID -> Full Username
user_cache = {}


def get_db_connection():
    """Establishes a connection to the PostgreSQL database."""
    return psycopg2.connect(
        dbname=DB_NAME, user=DB_USER, password=DB_PASS, host=DB_HOST
    )


def load_user_cache(db_conn):
    """Pre-loads all user IDs and their full handles into memory.

    Populates the module-level `user_cache` so later lookups (notes,
    lists, antennas) never need per-row DB round trips.
    """
    print("Loading user handle cache...")
    with db_conn.cursor() as cur:
        cur.execute('SELECT id, username, host FROM "user"')
        for row in cur.fetchall():
            u_id, username, host = row
            # Local users have host = NULL; substitute our own domain.
            full_handle = f"@{username}@{host if host else BASE_DOMAIN}"
            user_cache[u_id] = full_handle
    print(f"Cache loaded with {len(user_cache)} handles.")


def get_handle(u_id):
    """Returns the full handle for a user ID, falling back to the ID if not found."""
    return user_cache.get(u_id, u_id)


def post(path, body=None):
    """Sends a POST request to the Misskey API, injecting the auth token.

    Args:
        path: API path starting with "/", e.g. "/users/notes".
        body: JSON-serializable dict of parameters (token added here).

    Returns:
        The raw `requests.Response`; callers handle status checks.
    """
    if body is None:
        body = {}
    body["i"] = TOKEN
    res = requests.post(
        f"{INTERNAL_INSTANCE}/api{path}",
        headers={"Content-Type": "application/json"},
        data=json.dumps(body),
        timeout=15,
    )
    return res


def fetch_all_local_users():
    """Fetches all local users using admin/show-users, paging by offset."""
    print("Fetching all local users...")
    all_users = []
    offset = 0
    limit = 100
    while True:
        res = post(
            "/admin/show-users",
            {"origin": "local", "limit": limit, "offset": offset},
        )
        res.raise_for_status()
        users = res.json()
        if not users:
            break
        all_users.extend(users)
        # A short page means we've reached the end.
        if len(users) < limit:
            break
        offset += limit
    return all_users


def fetch_notes(user_id, until_id=None):
    """Fetches one page (up to 100) of a user's notes via the API.

    Args:
        user_id: Misskey user ID.
        until_id: cursor for pagination — fetch notes older than this ID.
    """
    body = {
        "userId": user_id,
        "withReplies": True,
        "withRenotes": True,
        "withChannelNotes": True,
        "limit": 100,
    }
    if until_id:
        body["untilId"] = until_id
    res = post("/users/notes", body)
    res.raise_for_status()
    return res.json()


def export_notes(user_id, base_path):
    """Exports user notes to posts.csv, replacing renote/reply user IDs with handles."""
    print(f" Exporting notes for {get_handle(user_id)}...")
    with open(base_path / "posts.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(
            ["id", "createdAt", "text", "visibility", "renote", "reply", "cw"]
        )
        until_id = None
        count = 0
        while True:
            notes = fetch_notes(user_id, until_id)
            if not notes:
                break
            for note in notes:
                # Resolve renote/reply IDs to handles if they exist
                renote_handle = ""
                if note.get("renote"):
                    renote_handle = get_handle(note["renote"].get("userId"))
                elif note.get("renoteId"):
                    # Fallback if note object isn't fully packed
                    renote_handle = note["renoteId"]
                reply_handle = ""
                if note.get("reply"):
                    reply_handle = get_handle(note["reply"].get("userId"))
                elif note.get("replyId"):
                    reply_handle = note["replyId"]
                writer.writerow(
                    [
                        note["id"],
                        note["createdAt"],
                        note.get("text"),
                        note["visibility"],
                        renote_handle,
                        reply_handle,
                        note.get("cw"),
                    ]
                )
            count += len(notes)
            # Paginate backwards from the oldest note of this page.
            until_id = notes[-1]["id"]
            if len(notes) < 100:
                break
    print(f" Exported {count} notes.")


def fetch_admin_drive_files(user_id, until_id=None):
    """Fetches one page (up to 100) of a user's drive files via the admin API."""
    body = {"limit": 100, "userId": user_id}
    if until_id:
        body["untilId"] = until_id
    res = post("/admin/drive/files", body)
    res.raise_for_status()
    return res.json()


def download_file(file_info, folder_path):
    """Downloads one drive file into folder_path, sanitizing the filename.

    Rewrites the public URL to the internal instance address, ensures the
    name carries a sensible extension, and de-duplicates name collisions
    with a numeric suffix. Download failures are logged, not raised, so a
    single bad file does not abort the whole export.
    """
    url = file_info["url"]
    # Fetch via the internal address instead of going through the proxy.
    if EXTERNAL_URL in url:
        url = url.replace(EXTERNAL_URL, INTERNAL_INSTANCE)
    file_name = file_info.get("name", "")
    extension = file_info.get("extension")
    mime_type = file_info.get("type")
    if extension and not file_name.lower().endswith(f".{extension.lower()}"):
        file_name = f"{file_name}.{extension}"
    elif mime_type and "." not in file_name:
        # No stored extension: guess one from the MIME type.
        guessed_ext = mimetypes.guess_extension(mime_type)
        if guessed_ext:
            file_name = f"{file_name}{guessed_ext}"
    safe_name = "".join(c for c in file_name if c.isalnum() or c in "._- ")
    if not safe_name:
        # Name was entirely unsafe characters; fall back to the file ID.
        safe_name = file_info["id"]
        if extension:
            safe_name = f"{safe_name}.{extension}"
    dest_path = folder_path / safe_name
    counter = 1
    original_dest_path = dest_path
    # Avoid clobbering existing files: name_1.ext, name_2.ext, ...
    while dest_path.exists():
        dest_path = original_dest_path.with_name(
            f"{original_dest_path.stem}_{counter}{original_dest_path.suffix}"
        )
        counter += 1
    try:
        res = requests.get(url, stream=True, timeout=30)
        res.raise_for_status()
        with open(dest_path, "wb") as f:
            for chunk in res.iter_content(chunk_size=8192):
                f.write(chunk)
    except Exception as e:
        print(f" Failed to download {file_name}: {e}")


def get_folder_path(folder_id, folder_map):
    """Builds the relative Path for a drive folder by walking parent links.

    Args:
        folder_id: starting folder ID (may be None for the drive root).
        folder_map: dict of folder ID -> folder object (with "parentId").

    Returns:
        A relative `Path`; empty Path for root-level files.
    """
    path_parts = []
    curr_id = folder_id
    while curr_id and curr_id in folder_map:
        folder = folder_map[curr_id]
        folder_name = folder.get("name", curr_id)
        safe_name = "".join(c for c in folder_name if c.isalnum() or c in "._- ")
        if not safe_name:
            safe_name = curr_id
        # Prepend so the deepest folder ends up last in the path.
        path_parts.insert(0, safe_name)
        curr_id = folder.get("parentId")
    return Path(*path_parts) if path_parts else Path("")


def export_drive_admin(user_id, base_path):
    """Exports all drive files for a user into base_path/files/, keeping folder structure."""
    print(f" Exporting drive for {get_handle(user_id)}...")
    files_base_path = base_path / "files"
    if not files_base_path.exists():
        files_base_path.mkdir()
    all_files = []
    folder_map = {}
    until_id = None
    while True:
        files = fetch_admin_drive_files(user_id, until_id)
        if not files:
            break
        for file in files:
            all_files.append(file)
            # Record every ancestor folder so paths can be rebuilt later.
            curr = file.get("folder")
            while curr:
                folder_map[curr["id"]] = curr
                curr = curr.get("parent")
        if len(files) < 100:
            break
        until_id = files[-1]["id"]
    for file in all_files:
        folder_id = file.get("folderId")
        rel_folder_path = get_folder_path(folder_id, folder_map)
        full_folder_path = files_base_path / rel_folder_path
        if not full_folder_path.exists():
            full_folder_path.mkdir(parents=True, exist_ok=True)
        download_file(file, full_folder_path)


def export_user_data(user, base_path):
    """Writes a plain-text profile summary (user-data.txt) for one user."""
    print(f" Exporting profile data for {user['username']}...")
    with open(base_path / "user-data.txt", "w", encoding="utf-8") as f:
        f.write(f"Username: {user.get('username')}\n")
        f.write(f"Full Handle: {get_handle(user['id'])}\n")
        f.write(f"Display Name: {user.get('name')}\n")
        f.write(f"Created At: {user.get('createdAt')}\n")
        f.write(f"Description:\n{user.get('description') or ''}\n\n")
        f.write("Fields:\n")
        fields = user.get("fields", [])
        if fields:
            for field in fields:
                f.write(f"- {field.get('name', '')}: {field.get('value', '')}\n")
        else:
            f.write("(None)\n")


def export_lists_db(user_id, base_path, db_conn):
    """Exports the user's lists to lists.csv, resolving member IDs to handles."""
    print(" Exporting lists from DB...")
    with db_conn.cursor() as cur:
        cur.execute('SELECT id, name FROM user_list WHERE "userId" = %s', (user_id,))
        lists = cur.fetchall()
        with open(base_path / "lists.csv", "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["id", "name", "members"])
            for l in lists:
                list_id = l[0]
                cur.execute(
                    'SELECT "userId" FROM user_list_membership WHERE "userListId" = %s',
                    (list_id,),
                )
                members = cur.fetchall()
                # Replace Member IDs with Handles
                m_handles = ",".join(get_handle(m[0]) for m in members)
                writer.writerow([l[0], l[1], m_handles])


def export_antennas_db(user_id, base_path, db_conn):
    """Exports the user's antennas to antennas.csv, resolving user IDs to handles."""
    print(" Exporting antennas from DB...")
    with db_conn.cursor() as cur:
        cur.execute(
            'SELECT id, name, src, "userListId", keywords, "excludeKeywords", users, "caseSensitive", "withReplies", "withFile" FROM antenna WHERE "userId" = %s',
            (user_id,),
        )
        antennas = cur.fetchall()
    with open(base_path / "antennas.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(
            [
                "id",
                "name",
                "src",
                "userListId",
                "keywords",
                "excludeKeywords",
                "users",
                "caseSensitive",
                "withReplies",
                "withFile",
            ]
        )
        for a in antennas:
            # Resolve IDs in the 'users' list
            u_list = a[6] if isinstance(a[6], list) else []
            u_handles = ",".join(get_handle(u) for u in u_list)
            # keywords/excludeKeywords are JSON arrays in the DB; flatten
            # to comma-joined strings (empty string when NULL).
            k = (
                ",".join(str(i) for i in a[4])
                if isinstance(a[4], list)
                else str(a[4] or "")
            )
            ek = (
                ",".join(str(i) for i in a[5])
                if isinstance(a[5], list)
                else str(a[5] or "")
            )
            writer.writerow(
                [a[0], a[1], a[2], a[3], k, ek, u_handles, a[7], a[8], a[9]]
            )


def main():
    """Runs the full export: connect to DB, cache handles, export every local user."""
    db_conn = None
    try:
        db_conn = get_db_connection()
        print("Connected to Misskey Database.")
        load_user_cache(db_conn)
        users = fetch_all_local_users()
        backups_dir = Path("backups")
        if not backups_dir.exists():
            backups_dir.mkdir()
        for user in users:
            raw_username = str(user.get("username", "unknown"))
            # Sanitize the username for use as a directory name.
            username = "".join(
                c for c in raw_username if c.isalnum() or c in "._-"
            ).strip()
            if not username:
                username = user["id"]
            user_id = user["id"]
            print(f"Processing user: {get_handle(user_id)}")
            user_dir = backups_dir / username
            if not user_dir.exists():
                user_dir.mkdir(parents=True)
            export_user_data(user, user_dir)
            export_notes(user_id, user_dir)
            export_drive_admin(user_id, user_dir)
            export_lists_db(user_id, user_dir, db_conn)
            export_antennas_db(user_id, user_dir, db_conn)
        print("Bulk export complete!")
    except Exception as e:
        # Top-level boundary: report and fall through to cleanup.
        print(f"An error occurred: {e}")
    finally:
        if db_conn:
            db_conn.close()


if __name__ == "__main__":
    main()