add other user data back

This commit is contained in:
2026-03-06 02:03:47 +00:00
parent 08f343a361
commit 8d82f31516
2 changed files with 146 additions and 92 deletions

View File

@@ -4,27 +4,59 @@ import csv
import json import json
import os import os
import requests import requests
import mimetypes
import psycopg2
from pathlib import Path from pathlib import Path
# Change these configs # --- CONFIGURATIONS ---
TOKEN = "changeme" TOKEN = "changeme"
INSTANCE = "http://misskey:3000" INTERNAL_INSTANCE = "http://changeme:3000"
REMOTE_URL = "https://misskey.bubbletea.dev" EXTERNAL_URL = "https://changeme"
# Database Constants
# NOTE(review): replace the "changeme" placeholders before running; prefer
# loading these from environment variables so credentials are not committed.
DB_NAME = "changeme"
DB_USER = "changeme"
DB_PASS = "changeme"
DB_HOST = "changeme"
# ----------------------
# Global mapping cache for User ID -> Full Username
# Filled once by load_user_cache(); read by get_handle().
user_cache = {}
def get_db_connection():
    """Open and return a psycopg2 connection to the Misskey PostgreSQL database.

    Connection parameters come from the module-level DB_* constants.
    """
    return psycopg2.connect(
        host=DB_HOST,
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASS,
    )
def load_user_cache(db_conn):
    """Pre-load every user's full @username@host handle into user_cache.

    Local users are stored with a NULL host in the "user" table, so their
    handle is qualified with this instance's own domain instead.

    Args:
        db_conn: an open psycopg2 connection to the Misskey database.
    """
    print("Loading user handle cache...")
    # BUG FIX: the original referenced an undefined name BASE_DOMAIN, which
    # raised NameError for every local user (host IS NULL). Derive the local
    # domain from EXTERNAL_URL instead (strip scheme and trailing slash).
    local_domain = EXTERNAL_URL.split("://", 1)[-1].rstrip("/")
    with db_conn.cursor() as cur:
        cur.execute('SELECT id, username, host FROM "user"')
        for u_id, username, host in cur.fetchall():
            # Remote users keep their own host; local users get local_domain.
            user_cache[u_id] = f"@{username}@{host or local_domain}"
    print(f"Cache loaded with {len(user_cache)} handles.")
def get_handle(u_id):
    """Resolve a user ID to its cached full handle; fall back to the raw ID."""
    try:
        return user_cache[u_id]
    except KeyError:
        # Unknown user (e.g. deleted account) — the bare ID is still useful.
        return u_id
def post(path, body=None): def post(path, body=None):
"""Sends a post request to the Misskey instance.""" """Sends a post request to the Misskey instance."""
if body is None: if body is None:
body = {} body = {}
body["i"] = TOKEN body["i"] = TOKEN
res = requests.post( res = requests.post(
f"{INSTANCE}/api{path}", f"{INTERNAL_INSTANCE}/api{path}",
headers={ headers={"Content-Type": "application/json"},
"Content-Type": "application/json",
},
data=json.dumps(body), data=json.dumps(body),
timeout=10, timeout=15,
) )
return res return res
@@ -35,97 +67,98 @@ def fetch_all_local_users():
offset = 0 offset = 0
limit = 100 limit = 100
while True: while True:
res = post("/admin/show-users", { res = post("/admin/show-users", {"origin": "local", "limit": limit, "offset": offset})
"origin": "local",
"limit": limit,
"offset": offset
})
res.raise_for_status() res.raise_for_status()
users = res.json() users = res.json()
if not users: if not users: break
break
all_users.extend(users) all_users.extend(users)
if len(users) < limit: if len(users) < limit: break
break
offset += limit offset += limit
print(f"Found {len(all_users)} local users.")
return all_users return all_users
def fetch_notes(user_id, until_id=None): def fetch_notes(user_id, until_id=None):
"""Fetches all notes from a user.""" """Fetches notes via API."""
body = { body = {"userId": user_id, "withReplies": True, "withRenotes": True, "withChannelNotes": True, "limit": 100}
"userId": user_id, if until_id: body["untilId"] = until_id
"withReplies": True,
"withRenotes": True,
"withChannelNotes": True,
"limit": 100,
}
if until_id:
body["untilId"] = until_id
res = post("/users/notes", body) res = post("/users/notes", body)
res.raise_for_status() res.raise_for_status()
return res.json() return res.json()
def export_notes(user_id, base_path): def export_notes(user_id, base_path):
"""Exports all user notes to posts.csv.""" """Exports user notes, replacing IDs with handles."""
print(f" Exporting notes for {user_id}...") print(f" Exporting notes for {get_handle(user_id)}...")
with open(base_path / "posts.csv", "w", newline="", encoding="utf-8") as f: with open(base_path / "posts.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f) writer = csv.writer(f)
writer.writerow(["id", "createdAt", "text", "visibility", "renoteId", "replyId", "cw"]) writer.writerow(["id", "createdAt", "text", "visibility", "renote", "reply", "cw"])
until_id = None until_id = None
count = 0 count = 0
while True: while True:
notes = fetch_notes(user_id, until_id) notes = fetch_notes(user_id, until_id)
if not notes: if not notes: break
break
for note in notes: for note in notes:
# Resolve renote/reply IDs to handles if they exist
renote_handle = ""
if note.get("renote"):
renote_handle = get_handle(note["renote"].get("userId"))
elif note.get("renoteId"):
# Fallback if note object isn't fully packed
renote_handle = note["renoteId"]
reply_handle = ""
if note.get("reply"):
reply_handle = get_handle(note["reply"].get("userId"))
elif note.get("replyId"):
reply_handle = note["replyId"]
writer.writerow([ writer.writerow([
note["id"], note["id"],
note["createdAt"], note["createdAt"],
note.get("text"), note.get("text"),
note["visibility"], note["visibility"],
note.get("renoteId"), renote_handle,
note.get("replyId"), reply_handle,
note.get("cw") note.get("cw")
]) ])
count += len(notes) count += len(notes)
until_id = notes[-1]["id"] until_id = notes[-1]["id"]
if len(notes) < 100: if len(notes) < 100: break
break
print(f" Exported {count} notes.") print(f" Exported {count} notes.")
def fetch_admin_drive_files(user_id, until_id=None): def fetch_admin_drive_files(user_id, until_id=None):
"""Fetches drive files for a specific user using admin/drive/files."""
body = {"limit": 100, "userId": user_id} body = {"limit": 100, "userId": user_id}
if until_id: if until_id: body["untilId"] = until_id
body["untilId"] = until_id
res = post("/admin/drive/files", body) res = post("/admin/drive/files", body)
res.raise_for_status() res.raise_for_status()
return res.json() return res.json()
def download_file(file_info, folder_path): def download_file(file_info, folder_path):
"""Downloads a file from the drive."""
url = file_info["url"] url = file_info["url"]
if "misskey.bubbletea.dev" in url: if EXTERNAL_URL in url:
url = url.replace(REMOTE_URL, INSTANCE) url = url.replace(EXTERNAL_URL, INTERNAL_INSTANCE)
file_name = file_info.get("name", "")
extension = file_info.get("extension")
mime_type = file_info.get("type")
if extension and not file_name.lower().endswith(f".{extension.lower()}"):
file_name = f"{file_name}.{extension}"
elif mime_type and not "." in file_name:
guessed_ext = mimetypes.guess_extension(mime_type)
if guessed_ext: file_name = f"{file_name}{guessed_ext}"
file_name = file_info["name"]
safe_name = "".join(c for c in file_name if c.isalnum() or c in "._- ") safe_name = "".join(c for c in file_name if c.isalnum() or c in "._- ")
if not safe_name: if not safe_name:
safe_name = file_info["id"] safe_name = file_info["id"]
if extension: safe_name = f"{safe_name}.{extension}"
dest_path = folder_path / safe_name dest_path = folder_path / safe_name
counter = 1 counter = 1
original_dest_path = dest_path original_dest_path = dest_path
while dest_path.exists(): while dest_path.exists():
stem = original_dest_path.stem dest_path = original_dest_path.with_name(f"{original_dest_path.stem}_{counter}{original_dest_path.suffix}")
suffix = original_dest_path.suffix
dest_path = original_dest_path.with_name(f"{stem}_{counter}{suffix}")
counter += 1 counter += 1
try: try:
@@ -138,102 +171,122 @@ def download_file(file_info, folder_path):
print(f" Failed to download {file_name}: {e}") print(f" Failed to download {file_name}: {e}")
def get_folder_path(folder_id, folder_map): def get_folder_path(folder_id, folder_map):
"""Resolves the full path of a folder from the folder map."""
path_parts = [] path_parts = []
curr_id = folder_id curr_id = folder_id
while curr_id and curr_id in folder_map: while curr_id and curr_id in folder_map:
folder = folder_map[curr_id] folder = folder_map[curr_id]
folder_name = folder.get("name", curr_id) folder_name = folder.get("name", curr_id)
safe_name = "".join(c for c in folder_name if c.isalnum() or c in "._- ") safe_name = "".join(c for c in folder_name if c.isalnum() or c in "._- ")
if not safe_name: if not safe_name: safe_name = curr_id
safe_name = curr_id
path_parts.insert(0, safe_name) path_parts.insert(0, safe_name)
curr_id = folder.get("parentId") curr_id = folder.get("parentId")
return Path(*path_parts) if path_parts else Path("") return Path(*path_parts) if path_parts else Path("")
def export_drive_admin(user_id, base_path): def export_drive_admin(user_id, base_path):
"""Exports all drive files for a user with folder hierarchy.""" print(f" Exporting drive for {get_handle(user_id)}...")
print(f" Exporting drive for {user_id}...")
files_base_path = base_path / "files" files_base_path = base_path / "files"
if not files_base_path.exists(): if not files_base_path.exists(): files_base_path.mkdir()
files_base_path.mkdir() all_files = []; folder_map = {}; until_id = None
all_files = []
folder_map = {}
until_id = None
while True: while True:
files = fetch_admin_drive_files(user_id, until_id) files = fetch_admin_drive_files(user_id, until_id)
if not files: if not files: break
break
for file in files: for file in files:
all_files.append(file) all_files.append(file)
# Build folder map from the 'folder' property if available in the packed file
curr = file.get("folder") curr = file.get("folder")
while curr: while curr:
folder_map[curr["id"]] = curr folder_map[curr["id"]] = curr
# Some Misskey versions might pack the parent as well
curr = curr.get("parent") curr = curr.get("parent")
if len(files) < 100: break
if len(files) < 100:
break
until_id = files[-1]["id"] until_id = files[-1]["id"]
for file in all_files: for file in all_files:
folder_id = file.get("folderId") folder_id = file.get("folderId")
rel_folder_path = get_folder_path(folder_id, folder_map) rel_folder_path = get_folder_path(folder_id, folder_map)
full_folder_path = files_base_path / rel_folder_path full_folder_path = files_base_path / rel_folder_path
if not full_folder_path.exists(): full_folder_path.mkdir(parents=True, exist_ok=True)
if not full_folder_path.exists():
full_folder_path.mkdir(parents=True, exist_ok=True)
download_file(file, full_folder_path) download_file(file, full_folder_path)
print(f" Exported {len(all_files)} files.")
def export_user_data(user, base_path): def export_user_data(user, base_path):
"""Exports user profile data to user-data.txt."""
print(f" Exporting profile data for {user['username']}...") print(f" Exporting profile data for {user['username']}...")
with open(base_path / "user-data.txt", "w", encoding="utf-8") as f: with open(base_path / "user-data.txt", "w", encoding="utf-8") as f:
f.write(f"Username: {user.get('username')}\n") f.write(f"Username: {user.get('username')}\n")
f.write(f"Full Handle: {get_handle(user['id'])}\n")
f.write(f"Display Name: {user.get('name')}\n") f.write(f"Display Name: {user.get('name')}\n")
f.write(f"Created At: {user.get('createdAt')}\n") f.write(f"Created At: {user.get('createdAt')}\n")
f.write(f"Description:\n{user.get('description') or ''}\n\n") f.write(f"Description:\n{user.get('description') or ''}\n\n")
f.write("Fields:\n") f.write("Fields:\n")
fields = user.get("fields", []) fields = user.get("fields", [])
if fields: if fields:
for field in fields: for field in fields:
name = field.get("name", "") f.write(f"- {field.get('name', '')}: {field.get('value', '')}\n")
value = field.get("value", "") else: f.write("(None)\n")
f.write(f"- {name}: {value}\n")
else: def export_lists_db(user_id, base_path, db_conn):
f.write("(None)\n") print(" Exporting lists from DB...")
with db_conn.cursor() as cur:
cur.execute('SELECT id, name FROM user_list WHERE "userId" = %s', (user_id,))
lists = cur.fetchall()
with open(base_path / "lists.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(["id", "name", "members"])
for l in lists:
list_id = l[0]
cur.execute('SELECT "userId" FROM user_list_membership WHERE "userListId" = %s', (list_id,))
members = cur.fetchall()
# Replace Member IDs with Handles
m_handles = ",".join(get_handle(m[0]) for m in members)
writer.writerow([l[0], l[1], m_handles])
def export_antennas_db(user_id, base_path, db_conn):
    """Export the user's antennas to antennas.csv, resolving watched-user IDs
    to full handles via the pre-loaded cache.

    Args:
        user_id: Misskey user ID whose antennas are exported.
        base_path: Path of the user's backup directory.
        db_conn: open psycopg2 connection.
    """
    print("  Exporting antennas from DB...")

    def _join_array(value):
        # Flatten a Postgres array column into a comma-joined string;
        # non-list / NULL values degrade to a plain (possibly empty) string.
        if isinstance(value, list):
            return ",".join(str(item) for item in value)
        return str(value or "")

    with db_conn.cursor() as cur:
        cur.execute('SELECT id, name, src, "userListId", keywords, "excludeKeywords", users, "caseSensitive", "withReplies", "withFile" FROM antenna WHERE "userId" = %s', (user_id,))
        rows = cur.fetchall()
    with open(base_path / "antennas.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "name", "src", "userListId", "keywords", "excludeKeywords", "users", "caseSensitive", "withReplies", "withFile"])
        for ant_id, name, src, list_id, keywords, excluded, users, case_sensitive, with_replies, with_file in rows:
            watched = users if isinstance(users, list) else []
            handles = ",".join(get_handle(u) for u in watched)
            writer.writerow([
                ant_id,
                name,
                src,
                list_id,
                _join_array(keywords),
                _join_array(excluded),
                handles,
                case_sensitive,
                with_replies,
                with_file,
            ])
def main(): def main():
db_conn = None
try: try:
users = fetch_all_local_users() db_conn = get_db_connection()
print("Connected to Misskey Database.")
load_user_cache(db_conn)
users = fetch_all_local_users()
backups_dir = Path("backups") backups_dir = Path("backups")
if not backups_dir.exists(): if not backups_dir.exists(): backups_dir.mkdir()
backups_dir.mkdir()
for user in users: for user in users:
username = user["username"] raw_username = str(user.get("username", "unknown"))
username = "".join(c for c in raw_username if c.isalnum() or c in "._-").strip()
if not username: username = user["id"]
user_id = user["id"] user_id = user["id"]
print(f"Processing user: @{username} ({user_id})") print(f"Processing user: {get_handle(user_id)}")
user_dir = backups_dir / username user_dir = backups_dir / username
if not user_dir.exists(): if not user_dir.exists(): user_dir.mkdir(parents=True)
user_dir.mkdir(parents=True)
export_user_data(user, user_dir) export_user_data(user, user_dir)
export_notes(user_id, user_dir) export_notes(user_id, user_dir)
export_drive_admin(user_id, user_dir) # Admin cannot easily fetch other users' lists/antennas export_drive_admin(user_id, user_dir)
# as there are no admin/lists or admin/antennas endpoints visible. export_lists_db(user_id, user_dir, db_conn)
export_antennas_db(user_id, user_dir, db_conn)
print("Bulk export complete!") print("Bulk export complete!")
except Exception as e: except Exception as e:
print(f"An error occurred: {e}") print(f"An error occurred: {e}")
finally:
if db_conn: db_conn.close()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -1 +1,2 @@
Requests Requests
psycopg2-binary