Files
misskey-export/misskey_export.py
2026-03-06 02:02:49 +00:00

240 lines
7.3 KiB
Python

#!/usr/bin/python3
import csv
import json
import os
import requests
from pathlib import Path
# Change these configs
TOKEN = "changeme"  # Misskey API token; must have admin rights (admin/* endpoints are called)
INSTANCE = "http://misskey:3000"  # base URL all API calls are sent to; downloads are rewritten to it
REMOTE_URL = "https://misskey.bubbletea.dev"  # public URL that packed drive-file URLs point at
def post(path, body=None):
    """Send a POST request to the Misskey API.

    Args:
        path: API path beginning with "/", e.g. "/users/notes".
        body: Optional dict of request parameters. It is NOT mutated;
            the auth token is injected into a copy.

    Returns:
        The ``requests.Response``. Status is not checked here —
        callers invoke ``raise_for_status()`` themselves.
    """
    # Copy before injecting the token so the caller's dict stays untouched
    # (the original mutated `body` in place, a shared-argument bug).
    payload = dict(body) if body else {}
    payload["i"] = TOKEN
    # json= serializes the body and sets the Content-Type header for us.
    return requests.post(
        f"{INSTANCE}/api{path}",
        json=payload,
        timeout=10,
    )
def fetch_all_local_users():
    """Fetches all local users using admin/show-users."""
    print("Fetching all local users...")
    page_size = 100
    collected = []
    skip = 0
    while True:
        response = post("/admin/show-users", {
            "origin": "local",
            "limit": page_size,
            "offset": skip,
        })
        response.raise_for_status()
        batch = response.json()
        if not batch:
            break
        collected += batch
        # A short page means we've reached the end of the listing.
        if len(batch) < page_size:
            break
        skip += page_size
    print(f"Found {len(collected)} local users.")
    return collected
def fetch_notes(user_id, until_id=None):
    """Fetches all notes from a user."""
    # One page (up to 100 notes), including replies, renotes and channel notes.
    params = dict(
        userId=user_id,
        withReplies=True,
        withRenotes=True,
        withChannelNotes=True,
        limit=100,
    )
    if until_id:
        # Cursor for pagination: fetch notes older than this id.
        params["untilId"] = until_id
    response = post("/users/notes", params)
    response.raise_for_status()
    return response.json()
def export_notes(user_id, base_path):
    """Exports all user notes to posts.csv."""
    print(f" Exporting notes for {user_id}...")
    total = 0
    with open(base_path / "posts.csv", "w", newline="", encoding="utf-8") as out:
        writer = csv.writer(out)
        writer.writerow(["id", "createdAt", "text", "visibility", "renoteId", "replyId", "cw"])
        cursor = None
        while True:
            page = fetch_notes(user_id, cursor)
            if not page:
                break
            # Optional fields use .get() so missing keys become empty cells.
            writer.writerows(
                [
                    note["id"],
                    note["createdAt"],
                    note.get("text"),
                    note["visibility"],
                    note.get("renoteId"),
                    note.get("replyId"),
                    note.get("cw"),
                ]
                for note in page
            )
            total += len(page)
            cursor = page[-1]["id"]
            # Short page => last page.
            if len(page) < 100:
                break
    print(f" Exported {total} notes.")
def fetch_admin_drive_files(user_id, until_id=None):
    """Fetches drive files for a specific user using admin/drive/files."""
    request_body = {"userId": user_id, "limit": 100}
    if until_id:
        # Paginate: only files older than this id.
        request_body["untilId"] = until_id
    response = post("/admin/drive/files", request_body)
    response.raise_for_status()
    return response.json()
def download_file(file_info, folder_path):
    """Download one drive file into *folder_path* (best-effort).

    The file's public URL is rewritten to the local INSTANCE URL so the
    transfer stays local. The file name is sanitized to alphanumerics
    plus "._- " (falling back to the file id), and name collisions get a
    ``_N`` suffix. Failures are printed and skipped, never raised.

    Args:
        file_info: Packed drive-file dict with "url", "name" and "id".
        folder_path: pathlib.Path of an existing destination directory.
    """
    url = file_info["url"]
    # Gate the rewrite on the configured REMOTE_URL rather than the
    # hardcoded hostname the original used — keeps this consistent with
    # the config block at the top of the file.
    if url.startswith(REMOTE_URL):
        url = url.replace(REMOTE_URL, INSTANCE, 1)
    file_name = file_info["name"]
    safe_name = "".join(c for c in file_name if c.isalnum() or c in "._- ")
    if not safe_name:
        safe_name = file_info["id"]
    dest_path = folder_path / safe_name
    counter = 1
    original_dest_path = dest_path
    while dest_path.exists():
        stem = original_dest_path.stem
        suffix = original_dest_path.suffix
        dest_path = original_dest_path.with_name(f"{stem}_{counter}{suffix}")
        counter += 1
    try:
        res = requests.get(url, stream=True, timeout=30)
        res.raise_for_status()
        with open(dest_path, "wb") as f:
            for chunk in res.iter_content(chunk_size=8192):
                f.write(chunk)
    except (OSError, requests.RequestException) as e:
        # Narrowed from bare `except Exception` to network + filesystem
        # errors; export of one file failing must not abort the run.
        print(f" Failed to download {file_name}: {e}")
        # Drop a partially written file so reruns don't keep junk.
        dest_path.unlink(missing_ok=True)
def get_folder_path(folder_id, folder_map):
    """Resolve a folder's full relative path from the folder map.

    Walks ``parentId`` links from *folder_id* up to the root, sanitizing
    each folder name to alphanumerics plus "._- " (falling back to the
    folder id when sanitization leaves nothing). Ids missing from the
    map terminate the walk.

    Args:
        folder_id: Starting folder id, or None for the drive root.
        folder_map: Mapping of folder id -> packed folder dict
            (optional "name" and "parentId" keys).

    Returns:
        A relative ``pathlib.Path`` ("." for root or unknown ids).
    """
    path_parts = []
    seen = set()  # guards against parentId cycles, which would loop forever
    curr_id = folder_id
    while curr_id and curr_id in folder_map and curr_id not in seen:
        seen.add(curr_id)
        folder = folder_map[curr_id]
        folder_name = folder.get("name", curr_id)
        safe_name = "".join(c for c in folder_name if c.isalnum() or c in "._- ")
        if not safe_name:
            safe_name = curr_id
        path_parts.insert(0, safe_name)
        curr_id = folder.get("parentId")
    return Path(*path_parts) if path_parts else Path("")
def export_drive_admin(user_id, base_path):
    """Exports all drive files for a user with folder hierarchy."""
    print(f" Exporting drive for {user_id}...")
    files_root = base_path / "files"
    if not files_root.exists():
        files_root.mkdir()
    collected = []
    folders_by_id = {}
    cursor = None
    while True:
        batch = fetch_admin_drive_files(user_id, cursor)
        if not batch:
            break
        for packed in batch:
            collected.append(packed)
            # Learn the folder hierarchy from the packed 'folder' property,
            # following 'parent' links where the server includes them.
            node = packed.get("folder")
            while node:
                folders_by_id[node["id"]] = node
                node = node.get("parent")
        if len(batch) < 100:
            break
        cursor = batch[-1]["id"]
    for packed in collected:
        rel_dir = get_folder_path(packed.get("folderId"), folders_by_id)
        target_dir = files_root / rel_dir
        if not target_dir.exists():
            target_dir.mkdir(parents=True, exist_ok=True)
        download_file(packed, target_dir)
    print(f" Exported {len(collected)} files.")
def export_user_data(user, base_path):
    """Exports user profile data to user-data.txt."""
    print(f" Exporting profile data for {user['username']}...")
    # Assemble the whole report first, then write it in one pass.
    lines = [
        f"Username: {user.get('username')}\n",
        f"Display Name: {user.get('name')}\n",
        f"Created At: {user.get('createdAt')}\n",
        f"Description:\n{user.get('description') or ''}\n\n",
        "Fields:\n",
    ]
    fields = user.get("fields", [])
    if fields:
        lines.extend(
            f"- {entry.get('name', '')}: {entry.get('value', '')}\n"
            for entry in fields
        )
    else:
        lines.append("(None)\n")
    with open(base_path / "user-data.txt", "w", encoding="utf-8") as out:
        out.writelines(lines)
def main():
    """Exports profile, notes and drive files for every local user."""
    try:
        local_users = fetch_all_local_users()
        backups_dir = Path("backups")
        backups_dir.mkdir(exist_ok=True)
        for account in local_users:
            handle = account["username"]
            account_id = account["id"]
            print(f"Processing user: @{handle} ({account_id})")
            user_dir = backups_dir / handle
            user_dir.mkdir(parents=True, exist_ok=True)
            export_user_data(account, user_dir)
            export_notes(account_id, user_dir)
            # Admin cannot easily fetch other users' lists/antennas as
            # there are no admin/lists or admin/antennas endpoints visible.
            export_drive_admin(account_id, user_dir)
        print("Bulk export complete!")
    except Exception as e:
        print(f"An error occurred: {e}")


if __name__ == "__main__":
    main()