initial commit
This commit is contained in:
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
env/
|
||||
backups/
|
||||
endpoints/
|
||||
2
.tool-versions
Normal file
2
.tool-versions
Normal file
@@ -0,0 +1,2 @@
|
||||
python 3.13.0
|
||||
nodejs 25.8.0
|
||||
189
misskey_export.py
Normal file
189
misskey_export.py
Normal file
@@ -0,0 +1,189 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import csv
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
from pathlib import Path
|
||||
|
||||
# Change these configs
|
||||
TOKEN = "changeme"
|
||||
INSTANCE = "http://misskey:3000"
|
||||
REMOTE_URL = "https://misskey.bubbletea.dev"
|
||||
|
||||
def post(path, body=None):
|
||||
"""Sends a post request to the Misskey instance."""
|
||||
if body is None:
|
||||
body = {}
|
||||
|
||||
body["i"] = TOKEN
|
||||
|
||||
res = requests.post(
|
||||
f"{INSTANCE}/api{path}",
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
data=json.dumps(body),
|
||||
timeout=10,
|
||||
)
|
||||
return res
|
||||
|
||||
def fetch_all_local_users():
|
||||
"""Fetches all local users using admin/show-users."""
|
||||
print("Fetching all local users...")
|
||||
all_users = []
|
||||
offset = 0
|
||||
limit = 100
|
||||
while True:
|
||||
res = post("/admin/show-users", {
|
||||
"origin": "local",
|
||||
"limit": limit,
|
||||
"offset": offset
|
||||
})
|
||||
res.raise_for_status()
|
||||
users = res.json()
|
||||
if not users:
|
||||
break
|
||||
all_users.extend(users)
|
||||
if len(users) < limit:
|
||||
break
|
||||
offset += limit
|
||||
print(f"Found {len(all_users)} local users.")
|
||||
return all_users
|
||||
|
||||
def fetch_notes(user_id, until_id=None):
|
||||
"""Fetches all notes from a user."""
|
||||
body = {
|
||||
"userId": user_id,
|
||||
"withReplies": True,
|
||||
"withRenotes": True,
|
||||
"withChannelNotes": True,
|
||||
"limit": 100,
|
||||
}
|
||||
if until_id:
|
||||
body["untilId"] = until_id
|
||||
|
||||
res = post("/users/notes", body)
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
def export_notes(user_id, base_path):
|
||||
"""Exports all user notes to posts.csv."""
|
||||
print(f" Exporting notes for {user_id}...")
|
||||
with open(base_path / "posts.csv", "w", newline="", encoding="utf-8") as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(["id", "createdAt", "text", "visibility", "renoteId", "replyId", "cw"])
|
||||
|
||||
until_id = None
|
||||
count = 0
|
||||
while True:
|
||||
notes = fetch_notes(user_id, until_id)
|
||||
if not notes:
|
||||
break
|
||||
|
||||
for note in notes:
|
||||
writer.writerow([
|
||||
note["id"],
|
||||
note["createdAt"],
|
||||
note.get("text"),
|
||||
note["visibility"],
|
||||
note.get("renoteId"),
|
||||
note.get("replyId"),
|
||||
note.get("cw")
|
||||
])
|
||||
|
||||
count += len(notes)
|
||||
until_id = notes[-1]["id"]
|
||||
if len(notes) < 100:
|
||||
break
|
||||
print(f" Exported {count} notes.")
|
||||
|
||||
def fetch_admin_drive_files(user_id, until_id=None):
|
||||
"""Fetches drive files for a specific user using admin/drive/files."""
|
||||
body = {"limit": 100, "userId": user_id}
|
||||
if until_id:
|
||||
body["untilId"] = until_id
|
||||
res = post("/admin/drive/files", body)
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
def download_file(file_info, folder_path):
|
||||
"""Downloads a file from the drive."""
|
||||
url = file_info["url"]
|
||||
if "misskey.bubbletea.dev" in url:
|
||||
url = url.replace(REMOTE_URL, INSTANCE)
|
||||
|
||||
file_name = file_info["name"]
|
||||
safe_name = "".join(c for c in file_name if c.isalnum() or c in "._- ")
|
||||
if not safe_name:
|
||||
safe_name = file_info["id"]
|
||||
|
||||
dest_path = folder_path / safe_name
|
||||
|
||||
counter = 1
|
||||
original_dest_path = dest_path
|
||||
while dest_path.exists():
|
||||
stem = original_dest_path.stem
|
||||
suffix = original_dest_path.suffix
|
||||
dest_path = original_dest_path.with_name(f"{stem}_{counter}{suffix}")
|
||||
counter += 1
|
||||
|
||||
try:
|
||||
res = requests.get(url, stream=True, timeout=30)
|
||||
res.raise_for_status()
|
||||
with open(dest_path, "wb") as f:
|
||||
for chunk in res.iter_content(chunk_size=8192):
|
||||
f.write(chunk)
|
||||
except Exception as e:
|
||||
print(f" Failed to download {file_name}: {e}")
|
||||
|
||||
def export_drive_admin(user_id, base_path):
|
||||
"""Exports all drive files for a user using admin endpoint."""
|
||||
print(f" Exporting drive for {user_id}...")
|
||||
files_path = base_path / "files"
|
||||
if not files_path.exists():
|
||||
files_path.mkdir()
|
||||
|
||||
until_id = None
|
||||
count = 0
|
||||
while True:
|
||||
files = fetch_admin_drive_files(user_id, until_id)
|
||||
if not files:
|
||||
break
|
||||
for file in files:
|
||||
download_file(file, files_path)
|
||||
count += 1
|
||||
|
||||
if len(files) < 100:
|
||||
break
|
||||
until_id = files[-1]["id"]
|
||||
print(f" Exported {count} files.")
|
||||
|
||||
def main():
|
||||
try:
|
||||
users = fetch_all_local_users()
|
||||
|
||||
backups_dir = Path("backups")
|
||||
if not backups_dir.exists():
|
||||
backups_dir.mkdir()
|
||||
|
||||
for user in users:
|
||||
username = user["username"]
|
||||
user_id = user["id"]
|
||||
print(f"Processing user: @{username} ({user_id})")
|
||||
|
||||
user_dir = backups_dir / username
|
||||
if not user_dir.exists():
|
||||
user_dir.mkdir(parents=True)
|
||||
|
||||
export_notes(user_id, user_dir)
|
||||
export_drive_admin(user_id, user_dir)
|
||||
# Admin cannot easily fetch other users' lists/antennas
|
||||
# as there are no admin/lists or admin/antennas endpoints visible.
|
||||
|
||||
print("Bulk export complete!")
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
1
requirements.txt
Normal file
1
requirements.txt
Normal file
@@ -0,0 +1 @@
|
||||
Requests
|
||||
Reference in New Issue
Block a user