From 4b8cf3fafb49abb4c07b217253419092042df49f Mon Sep 17 00:00:00 2001 From: shibao Date: Fri, 6 Mar 2026 03:04:16 +0000 Subject: [PATCH] add attachment paths to posts csv --- misskey_export.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/misskey_export.py b/misskey_export.py index a855123..6385ec8 100644 --- a/misskey_export.py +++ b/misskey_export.py @@ -99,13 +99,13 @@ def fetch_notes(user_id, until_id=None): return res.json() -def export_notes(user_id, base_path): +def export_notes(user_id, base_path, file_id_to_path): """Exports user notes, replacing IDs with handles.""" print(f" Exporting notes for {get_handle(user_id)}...") with open(base_path / "posts.csv", "w", newline="", encoding="utf-8") as f: writer = csv.writer(f) writer.writerow( - ["id", "createdAt", "text", "visibility", "renote", "reply", "cw"] + ["id", "createdAt", "text", "visibility", "renote", "reply", "cw", "files"] ) until_id = None @@ -130,6 +130,13 @@ def export_notes(user_id, base_path): elif note.get("replyId"): reply_handle = note["replyId"] + # Resolve file paths + file_paths = [] + for file in note.get("files", []): + f_id = file.get("id") + if f_id in file_id_to_path: + file_paths.append(f"files/{file_id_to_path[f_id]}") + writer.writerow( [ note["id"], @@ -139,6 +146,7 @@ def export_notes(user_id, base_path): renote_handle, reply_handle, note.get("cw"), + ", ".join(file_paths), ] ) @@ -184,6 +192,10 @@ def download_file(file_info, folder_path): counter = 1 original_dest_path = dest_path while dest_path.exists(): + # Heuristic: If it exists and matches size, assume it's already downloaded + if dest_path.stat().st_size == file_info.get("size"): + return dest_path.name + dest_path = original_dest_path.with_name( f"{original_dest_path.stem}_{counter}{original_dest_path.suffix}" ) @@ -195,8 +207,10 @@ def download_file(file_info, folder_path): with open(dest_path, "wb") as f: for chunk in res.iter_content(chunk_size=8192): f.write(chunk) + return dest_path.name except Exception as e: print(f" Failed to download {file_name}: {e}") + return None def get_folder_path(folder_id, folder_map): @@ -234,13 +248,17 @@ def export_drive_admin(user_id, base_path): if len(files) < 100: break until_id = files[-1]["id"] + file_id_to_path = {} for file in all_files: folder_id = file.get("folderId") rel_folder_path = get_folder_path(folder_id, folder_map) full_folder_path = files_base_path / rel_folder_path if not full_folder_path.exists(): full_folder_path.mkdir(parents=True, exist_ok=True) - download_file(file, full_folder_path) + final_name = download_file(file, full_folder_path) + if final_name: + file_id_to_path[file["id"]] = str(rel_folder_path / final_name) + return file_id_to_path def export_user_data(user, base_path): @@ -353,8 +371,8 @@ def main(): user_dir.mkdir(parents=True) export_user_data(user, user_dir) - export_notes(user_id, user_dir) - export_drive_admin(user_id, user_dir) + file_id_to_path = export_drive_admin(user_id, user_dir) + export_notes(user_id, user_dir, file_id_to_path) export_lists_db(user_id, user_dir, db_conn) export_antennas_db(user_id, user_dir, db_conn)