diff --git a/misskey_export.py b/misskey_export.py index 2f3bce9..a855123 100644 --- a/misskey_export.py +++ b/misskey_export.py @@ -23,15 +23,14 @@ DB_HOST = "changeme" # Global mapping cache for User ID -> Full Username user_cache = {} + def get_db_connection(): """Establishes a connection to the PostgreSQL database.""" return psycopg2.connect( - dbname=DB_NAME, - user=DB_USER, - password=DB_PASS, - host=DB_HOST + dbname=DB_NAME, user=DB_USER, password=DB_PASS, host=DB_HOST ) + def load_user_cache(db_conn): """Pre-loads all user IDs and their full handles into memory.""" print("Loading user handle cache...") @@ -43,10 +42,12 @@ def load_user_cache(db_conn): user_cache[u_id] = full_handle print(f"Cache loaded with {len(user_cache)} handles.") + def get_handle(u_id): """Returns the full handle for a user ID, falling back to the ID if not found.""" return user_cache.get(u_id, u_id) + def post(path, body=None): """Sends a post request to the Misskey instance.""" if body is None: @@ -60,6 +61,7 @@ def post(path, body=None): ) return res + def fetch_all_local_users(): """Fetches all local users using admin/show-users.""" print("Fetching all local users...") @@ -67,35 +69,51 @@ def fetch_all_local_users(): offset = 0 limit = 100 while True: - res = post("/admin/show-users", {"origin": "local", "limit": limit, "offset": offset}) + res = post( + "/admin/show-users", {"origin": "local", "limit": limit, "offset": offset} + ) res.raise_for_status() users = res.json() - if not users: break + if not users: + break all_users.extend(users) - if len(users) < limit: break + if len(users) < limit: + break offset += limit return all_users + def fetch_notes(user_id, until_id=None): """Fetches notes via API.""" - body = {"userId": user_id, "withReplies": True, "withRenotes": True, "withChannelNotes": True, "limit": 100} - if until_id: body["untilId"] = until_id + body = { + "userId": user_id, + "withReplies": True, + "withRenotes": True, + "withChannelNotes": True, + "limit": 100, + } + if until_id: + body["untilId"] = until_id res = post("/users/notes", body) res.raise_for_status() return res.json() + def export_notes(user_id, base_path): """Exports user notes, replacing IDs with handles.""" print(f" Exporting notes for {get_handle(user_id)}...") with open(base_path / "posts.csv", "w", newline="", encoding="utf-8") as f: writer = csv.writer(f) - writer.writerow(["id", "createdAt", "text", "visibility", "renote", "reply", "cw"]) + writer.writerow( + ["id", "createdAt", "text", "visibility", "renote", "reply", "cw"] + ) until_id = None count = 0 while True: notes = fetch_notes(user_id, until_id) - if not notes: break + if not notes: + break for note in notes: # Resolve renote/reply IDs to handles if they exist @@ -112,28 +130,34 @@ def export_notes(user_id, base_path): elif note.get("replyId"): reply_handle = note["replyId"] - writer.writerow([ - note["id"], - note["createdAt"], - note.get("text"), - note["visibility"], - renote_handle, - reply_handle, - note.get("cw") - ]) + writer.writerow( + [ + note["id"], + note["createdAt"], + note.get("text"), + note["visibility"], + renote_handle, + reply_handle, + note.get("cw"), + ] + ) count += len(notes) until_id = notes[-1]["id"] - if len(notes) < 100: break + if len(notes) < 100: + break print(f" Exported {count} notes.") + def fetch_admin_drive_files(user_id, until_id=None): body = {"limit": 100, "userId": user_id} - if until_id: body["untilId"] = until_id + if until_id: + body["untilId"] = until_id res = post("/admin/drive/files", body) res.raise_for_status() return res.json() + def download_file(file_info, folder_path): url = file_info["url"] if EXTERNAL_URL in url: @@ -147,18 +171,22 @@ def download_file(file_info, folder_path): file_name = f"{file_name}.{extension}" elif mime_type and not "." in file_name: guessed_ext = mimetypes.guess_extension(mime_type) - if guessed_ext: file_name = f"{file_name}{guessed_ext}" + if guessed_ext: + file_name = f"{file_name}{guessed_ext}" safe_name = "".join(c for c in file_name if c.isalnum() or c in "._- ") if not safe_name: safe_name = file_info["id"] - if extension: safe_name = f"{safe_name}.{extension}" + if extension: + safe_name = f"{safe_name}.{extension}" dest_path = folder_path / safe_name counter = 1 original_dest_path = dest_path while dest_path.exists(): - dest_path = original_dest_path.with_name(f"{original_dest_path.stem}_{counter}{original_dest_path.suffix}") + dest_path = original_dest_path.with_name( + f"{original_dest_path.stem}_{counter}{original_dest_path.suffix}" + ) counter += 1 try: @@ -170,6 +198,7 @@ def download_file(file_info, folder_path): except Exception as e: print(f" Failed to download {file_name}: {e}") + def get_folder_path(folder_id, folder_map): path_parts = [] curr_id = folder_id @@ -177,34 +206,43 @@ def get_folder_path(folder_id, folder_map): folder = folder_map[curr_id] folder_name = folder.get("name", curr_id) safe_name = "".join(c for c in folder_name if c.isalnum() or c in "._- ") - if not safe_name: safe_name = curr_id + if not safe_name: + safe_name = curr_id path_parts.insert(0, safe_name) curr_id = folder.get("parentId") return Path(*path_parts) if path_parts else Path("") + def export_drive_admin(user_id, base_path): print(f" Exporting drive for {get_handle(user_id)}...") files_base_path = base_path / "files" - if not files_base_path.exists(): files_base_path.mkdir() - all_files = []; folder_map = {}; until_id = None + if not files_base_path.exists(): + files_base_path.mkdir() + all_files = [] + folder_map = {} + until_id = None while True: files = fetch_admin_drive_files(user_id, until_id) - if not files: break + if not files: + break for file in files: all_files.append(file) curr = file.get("folder") while curr: folder_map[curr["id"]] = curr curr = curr.get("parent") - if len(files) < 100: break + if len(files) < 100: + break until_id = files[-1]["id"] for file in all_files: folder_id = file.get("folderId") rel_folder_path = get_folder_path(folder_id, folder_map) full_folder_path = files_base_path / rel_folder_path - if not full_folder_path.exists(): full_folder_path.mkdir(parents=True, exist_ok=True) + if not full_folder_path.exists(): + full_folder_path.mkdir(parents=True, exist_ok=True) download_file(file, full_folder_path) + def export_user_data(user, base_path): print(f" Exporting profile data for {user['username']}...") with open(base_path / "user-data.txt", "w", encoding="utf-8") as f: @@ -218,7 +256,9 @@ def export_user_data(user, base_path): if fields: for field in fields: f.write(f"- {field.get('name', '')}: {field.get('value', '')}\n") - else: f.write("(None)\n") + else: + f.write("(None)\n") + def export_lists_db(user_id, base_path, db_conn): print(" Exporting lists from DB...") @@ -230,28 +270,59 @@ def export_lists_db(user_id, base_path, db_conn): writer.writerow(["id", "name", "members"]) for l in lists: list_id = l[0] - cur.execute('SELECT "userId" FROM user_list_membership WHERE "userListId" = %s', (list_id,)) + cur.execute( + 'SELECT "userId" FROM user_list_membership WHERE "userListId" = %s', + (list_id,), + ) members = cur.fetchall() # Replace Member IDs with Handles m_handles = ",".join(get_handle(m[0]) for m in members) writer.writerow([l[0], l[1], m_handles]) + def export_antennas_db(user_id, base_path, db_conn): print(" Exporting antennas from DB...") with db_conn.cursor() as cur: - cur.execute('SELECT id, name, src, "userListId", keywords, "excludeKeywords", users, "caseSensitive", "withReplies", "withFile" FROM antenna WHERE "userId" = %s', (user_id,)) + cur.execute( + 'SELECT id, name, src, "userListId", keywords, "excludeKeywords", users, "caseSensitive", "withReplies", "withFile" FROM antenna WHERE "userId" = %s', + (user_id,), + ) antennas = cur.fetchall() with open(base_path / "antennas.csv", "w", newline="", encoding="utf-8") as f: writer = csv.writer(f) - writer.writerow(["id", "name", "src", "userListId", "keywords", "excludeKeywords", "users", "caseSensitive", "withReplies", "withFile"]) + writer.writerow( + [ + "id", + "name", + "src", + "userListId", + "keywords", + "excludeKeywords", + "users", + "caseSensitive", + "withReplies", + "withFile", + ] + ) for a in antennas: # Resolve IDs in the 'users' list u_list = a[6] if isinstance(a[6], list) else [] u_handles = ",".join(get_handle(u) for u in u_list) - k = ",".join(str(i) for i in a[4]) if isinstance(a[4], list) else str(a[4] or "") - ek = ",".join(str(i) for i in a[5]) if isinstance(a[5], list) else str(a[5] or "") - writer.writerow([a[0], a[1], a[2], a[3], k, ek, u_handles, a[7], a[8], a[9]]) + k = ( + ",".join(str(i) for i in a[4]) + if isinstance(a[4], list) + else str(a[4] or "") + ) + ek = ( + ",".join(str(i) for i in a[5]) + if isinstance(a[5], list) + else str(a[5] or "") + ) + writer.writerow( + [a[0], a[1], a[2], a[3], k, ek, u_handles, a[7], a[8], a[9]] + ) + def main(): db_conn = None @@ -263,18 +334,23 @@ def main(): users = fetch_all_local_users() backups_dir = Path("backups") - if not backups_dir.exists(): backups_dir.mkdir() + if not backups_dir.exists(): + backups_dir.mkdir() for user in users: raw_username = str(user.get("username", "unknown")) - username = "".join(c for c in raw_username if c.isalnum() or c in "._-").strip() - if not username: username = user["id"] + username = "".join( + c for c in raw_username if c.isalnum() or c in "._-" + ).strip() + if not username: + username = user["id"] user_id = user["id"] print(f"Processing user: {get_handle(user_id)}") user_dir = backups_dir / username - if not user_dir.exists(): user_dir.mkdir(parents=True) + if not user_dir.exists(): + user_dir.mkdir(parents=True) export_user_data(user, user_dir) export_notes(user_id, user_dir) @@ -286,7 +362,9 @@ def main(): except Exception as e: print(f"An error occurred: {e}") finally: - if db_conn: db_conn.close() + if db_conn: + db_conn.close() + if __name__ == "__main__": main()