Compare commits
5 Commits
17652eb249
...
ea7dc75c10
| Author | SHA1 | Date | |
|---|---|---|---|
| ea7dc75c10 | |||
| ce10cf8e29 | |||
| 8d82f31516 | |||
| 08f343a361 | |||
| aff6f2870d |
65
README.md
Normal file
65
README.md
Normal file
@@ -0,0 +1,65 @@
|
||||
# Misskey Bulk Export & Backup Tool
|
||||
|
||||
This tool is designed to perform a comprehensive, server-side backup of all local users on a Misskey instance. It utilizes both the Misskey Admin API and direct PostgreSQL database access to export data that is typically difficult to retrieve through standard means.
|
||||
|
||||
## Features
|
||||
|
||||
- **Full User List Export**: Iterates through every local user on the instance.
|
||||
- **Notes & Posts**: Exports all notes (public, home, followers, and private), including replies and renotes, to `posts.csv`.
|
||||
- **Drive Backup**:
|
||||
- Reconstructs the original folder hierarchy from the user's Drive.
|
||||
- Preserves original file extensions by guessing from MIME types or using API metadata.
|
||||
- Automatically handles filename collisions within the same folder.
|
||||
- **Database-Direct Export**:
|
||||
- **User Lists**: Exports all private and public user lists to `lists.csv`.
|
||||
- **Antennas**: Exports all antenna configurations to `antennas.csv`.
|
||||
- **Handle Resolution**: Replaces internal Misskey IDs with full `@username@host` handles in all CSV exports (posts, lists, antennas) for better readability.
|
||||
- **Profile Data**: Exports user bio, display name, and custom profile fields to `user-data.txt`.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- **Python 3.13+**
|
||||
- **psycopg2-binary**: For PostgreSQL connection.
|
||||
- **Requests**: For API interaction.
|
||||
- **Direct Network Access**: The script talks to the Misskey service directly via the internal address configured in `INTERNAL_INSTANCE` (e.g. `http://misskey:3000`) to bypass Nginx restrictions.
|
||||
- **Database Access**: Access to the Misskey PostgreSQL container/service (configured via `DB_HOST`, e.g. `misskey-db`).
|
||||
|
||||
## Setup
|
||||
|
||||
1. Install dependencies:
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
2. Verify the constants in `misskey_export.py`:
|
||||
- `TOKEN`: A Misskey API token with Admin/Moderator permissions.
|
||||
- `INTERNAL_INSTANCE`: The internal address of your Misskey container (e.g. `http://misskey:3000`).
|
||||
- `DB_HOST`: The hostname of your database container (e.g. `misskey-db`).
|
||||
|
||||
## Usage
|
||||
|
||||
Run the script using the system Python:
|
||||
|
||||
```bash
|
||||
python3 misskey_export.py
|
||||
```
|
||||
|
||||
The script will create a `backups/` directory in the current working directory, organized by username:
|
||||
|
||||
```text
|
||||
backups/
|
||||
├── user1/
|
||||
│ ├── user-data.txt
|
||||
│ ├── posts.csv
|
||||
│ ├── lists.csv
|
||||
│ ├── antennas.csv
|
||||
│ └── files/
|
||||
│ ├── Subfolder_A/
|
||||
│ └── image.png
|
||||
└── user2/
|
||||
└── ...
|
||||
```
|
||||
|
||||
## Security Note
|
||||
|
||||
This script contains sensitive database credentials and an admin API token. Ensure that `misskey_export.py` is stored securely and is not accessible to unauthorized users.
|
||||
@@ -4,30 +4,64 @@ import csv
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
import mimetypes
|
||||
import psycopg2
|
||||
from pathlib import Path
|
||||
|
||||
# --- CONFIGURATIONS ---
# Misskey API token with Admin/Moderator permissions (see README).
TOKEN = "changeme"
# Internal address of the Misskey container; bypasses the public proxy.
INTERNAL_INSTANCE = "http://changeme:3000"
# Public URL of the instance; used to rewrite drive-file URLs to the
# internal address before downloading.
EXTERNAL_URL = "https://changeme"

# Database Constants
DB_NAME = "changeme"
DB_USER = "changeme"
DB_PASS = "changeme"
DB_HOST = "changeme"
# ----------------------
|
||||
|
||||
# Global cache mapping a Misskey user ID to its full "@username@host" handle.
user_cache = {}
|
||||
|
||||
|
||||
def get_db_connection():
    """Open and return a new connection to the Misskey PostgreSQL database."""
    return psycopg2.connect(
        host=DB_HOST,
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASS,
    )
|
||||
|
||||
|
||||
def load_user_cache(db_conn):
    """Pre-load every user ID -> "@username@host" handle into user_cache.

    Local users have a NULL host column in the DB; their handles are
    qualified with this instance's own domain.

    Args:
        db_conn: An open psycopg2 connection to the Misskey database.
    """
    print("Loading user handle cache...")
    # BUGFIX: the original referenced an undefined name BASE_DOMAIN here,
    # which raised NameError for every local (NULL-host) user. Derive the
    # local domain from EXTERNAL_URL instead.
    local_domain = EXTERNAL_URL.split("://", 1)[-1].rstrip("/")
    with db_conn.cursor() as cur:
        cur.execute('SELECT id, username, host FROM "user"')
        for u_id, username, host in cur.fetchall():
            # Remote users carry their own host; local users fall back to ours.
            user_cache[u_id] = f"@{username}@{host if host else local_domain}"
    print(f"Cache loaded with {len(user_cache)} handles.")
|
||||
|
||||
|
||||
def get_handle(u_id):
    """Return the cached full handle for *u_id*, or the raw ID when unknown."""
    try:
        return user_cache[u_id]
    except KeyError:
        return u_id
|
||||
|
||||
|
||||
def post(path, body=None):
    """Send a POST request to the Misskey API at INTERNAL_INSTANCE.

    The admin token is injected into the JSON body as the "i" key, per the
    Misskey API convention.

    Args:
        path: API path starting with "/", e.g. "/admin/show-users".
        body: Optional dict of request parameters (mutated to add the token).

    Returns:
        The raw requests.Response; callers check status themselves.
    """
    if body is None:
        body = {}

    body["i"] = TOKEN

    res = requests.post(
        f"{INTERNAL_INSTANCE}/api{path}",
        headers={"Content-Type": "application/json"},
        data=json.dumps(body),
        timeout=15,
    )
    return res
|
||||
|
||||
|
||||
def fetch_all_local_users():
    """Fetch all local users via admin/show-users, paging by offset.

    Returns:
        A list of user objects (dicts) as returned by the API.
    """
    print("Fetching all local users...")
    # NOTE(review): the accumulator lines fell on diff-hunk boundaries in the
    # source view; reconstructed from the later uses of all_users — confirm.
    all_users = []
    offset = 0
    limit = 100
    while True:
        res = post(
            "/admin/show-users", {"origin": "local", "limit": limit, "offset": offset}
        )
        res.raise_for_status()
        users = res.json()
        if not users:
            break
        all_users.extend(users)
        # A short page means we have reached the end.
        if len(users) < limit:
            break
        offset += limit
    print(f"Found {len(all_users)} local users.")
    return all_users
|
||||
|
||||
|
||||
def fetch_notes(user_id, until_id=None):
    """Fetch one page of a user's notes via the API.

    Args:
        user_id: Misskey user ID whose notes to fetch.
        until_id: Optional note ID for keyset pagination (fetch older notes).

    Returns:
        A list of note objects (dicts).
    """
    body = {
        "userId": user_id,
        "withReplies": True,
        # NOTE(review): additional body keys (e.g. renote/file flags and the
        # page limit) were lost in the diff view — confirm against original.
        "limit": 100,
    }
    if until_id:
        body["untilId"] = until_id

    res = post("/users/notes", body)
    res.raise_for_status()
    return res.json()
|
||||
|
||||
|
||||
def export_notes(user_id, base_path):
    """Export all of a user's notes to posts.csv.

    Renote/reply target user IDs are resolved to "@user@host" handles when
    the API returns the packed note object; otherwise the raw ID is kept.

    Args:
        user_id: Misskey user ID to export.
        base_path: pathlib.Path of the user's backup directory.
    """
    print(f"  Exporting notes for {get_handle(user_id)}...")
    with open(base_path / "posts.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(
            ["id", "createdAt", "text", "visibility", "renote", "reply", "cw"]
        )

        until_id = None
        count = 0
        while True:
            notes = fetch_notes(user_id, until_id)
            if not notes:
                break

            for note in notes:
                # Resolve renote/reply IDs to handles if they exist
                renote_handle = ""
                if note.get("renote"):
                    renote_handle = get_handle(note["renote"].get("userId"))
                elif note.get("renoteId"):
                    # Fallback if note object isn't fully packed
                    renote_handle = note["renoteId"]

                reply_handle = ""
                if note.get("reply"):
                    reply_handle = get_handle(note["reply"].get("userId"))
                elif note.get("replyId"):
                    reply_handle = note["replyId"]

                writer.writerow(
                    [
                        note["id"],
                        note["createdAt"],
                        note.get("text"),
                        note["visibility"],
                        renote_handle,
                        reply_handle,
                        note.get("cw"),
                    ]
                )

            count += len(notes)
            until_id = notes[-1]["id"]
            # NOTE(review): page-size termination check was cut by a diff
            # hunk; reconstructed as "short page ends pagination" — confirm.
            if len(notes) < 100:
                break
    print(f"  Exported {count} notes.")
|
||||
|
||||
|
||||
def fetch_admin_drive_files(user_id, until_id=None):
    """Fetch one page (100) of a user's drive files using admin/drive/files.

    Args:
        user_id: Misskey user ID whose drive to list.
        until_id: Optional file ID for keyset pagination.

    Returns:
        A list of drive-file objects (dicts).
    """
    body = {"limit": 100, "userId": user_id}
    if until_id:
        body["untilId"] = until_id

    # NOTE(review): the call line fell on a diff-hunk boundary; endpoint
    # taken from the function's own docstring — confirm against original.
    res = post("/admin/drive/files", body)
    res.raise_for_status()
    return res.json()
|
||||
|
||||
def download_file(file_info, folder_path):
    """Download one drive file into *folder_path*.

    Rewrites the public EXTERNAL_URL to the INTERNAL_INSTANCE address,
    restores a file extension (API metadata first, then MIME-type guess),
    sanitizes the filename, and de-collides duplicates with _1, _2, ...

    Args:
        file_info: Drive-file object (dict) from the Misskey API.
        folder_path: pathlib.Path of the destination directory.
    """
    url = file_info["url"]
    if EXTERNAL_URL in url:
        url = url.replace(EXTERNAL_URL, INTERNAL_INSTANCE)

    file_name = file_info.get("name", "")
    extension = file_info.get("extension")
    mime_type = file_info.get("type")

    # Ensure the name carries an extension: trust API metadata first,
    # then guess one from the MIME type.
    if extension and not file_name.lower().endswith(f".{extension.lower()}"):
        file_name = f"{file_name}.{extension}"
    elif mime_type and "." not in file_name:
        guessed_ext = mimetypes.guess_extension(mime_type)
        if guessed_ext:
            file_name = f"{file_name}{guessed_ext}"

    # Keep only filesystem-safe characters; fall back to the file ID.
    safe_name = "".join(c for c in file_name if c.isalnum() or c in "._- ")
    if not safe_name:
        safe_name = file_info["id"]
        if extension:
            safe_name = f"{safe_name}.{extension}"

    dest_path = folder_path / safe_name

    # Collision handling within the same folder: name_1.ext, name_2.ext, ...
    counter = 1
    original_dest_path = dest_path
    while dest_path.exists():
        dest_path = original_dest_path.with_name(
            f"{original_dest_path.stem}_{counter}{original_dest_path.suffix}"
        )
        counter += 1

    try:
        # NOTE(review): the actual transfer lines were lost in the diff view;
        # reconstructed as a streamed GET written in chunks — confirm.
        res = requests.get(url, stream=True, timeout=30)
        res.raise_for_status()
        with open(dest_path, "wb") as out:
            for chunk in res.iter_content(chunk_size=65536):
                out.write(chunk)
    except Exception as e:
        print(f"    Failed to download {file_name}: {e}")
||||
|
||||
def export_drive_admin(user_id, base_path):
|
||||
"""Exports all drive files for a user using admin endpoint."""
|
||||
print(f" Exporting drive for {user_id}...")
|
||||
files_path = base_path / "files"
|
||||
if not files_path.exists():
|
||||
files_path.mkdir()
|
||||
|
||||
def get_folder_path(folder_id, folder_map):
    """Rebuild the relative Drive folder path for *folder_id*.

    Walks parent links in *folder_map* from the leaf folder up to the root,
    sanitizing each folder name for the filesystem (the raw folder ID is
    used when sanitization strips every character).

    Args:
        folder_id: Leaf folder ID, or None for the Drive root.
        folder_map: Mapping of folder ID -> folder object (dict).

    Returns:
        A relative pathlib.Path; Path("") when folder_id is unknown/None.
    """
    parts = []
    current = folder_id
    while current and current in folder_map:
        node = folder_map[current]
        raw_name = node.get("name", current)
        cleaned = "".join(ch for ch in raw_name if ch.isalnum() or ch in "._- ")
        parts.append(cleaned if cleaned else current)
        current = node.get("parentId")
    parts.reverse()
    return Path(*parts) if parts else Path("")
|
||||
|
||||
|
||||
def export_drive_admin(user_id, base_path):
    """Export every drive file for a user, preserving the folder hierarchy.

    Pass 1 pages through admin/drive/files, collecting file objects and the
    folder chain attached to each file. Pass 2 downloads each file into its
    reconstructed folder under <base_path>/files/.

    Args:
        user_id: Misskey user ID to export.
        base_path: pathlib.Path of the user's backup directory.
    """
    print(f"  Exporting drive for {get_handle(user_id)}...")
    files_base_path = base_path / "files"
    if not files_base_path.exists():
        files_base_path.mkdir()

    all_files = []
    folder_map = {}  # folder ID -> folder object, including parent links
    until_id = None
    while True:
        files = fetch_admin_drive_files(user_id, until_id)
        if not files:
            break
        for file in files:
            all_files.append(file)
            # Record the whole folder chain so paths can be rebuilt later.
            curr = file.get("folder")
            while curr:
                folder_map[curr["id"]] = curr
                curr = curr.get("parent")
        if len(files) < 100:
            break
        until_id = files[-1]["id"]

    count = len(all_files)
    print(f"  Exported {count} files.")
    for file in all_files:
        folder_id = file.get("folderId")
        rel_folder_path = get_folder_path(folder_id, folder_map)
        full_folder_path = files_base_path / rel_folder_path
        if not full_folder_path.exists():
            full_folder_path.mkdir(parents=True, exist_ok=True)
        download_file(file, full_folder_path)
|
||||
|
||||
|
||||
def export_user_data(user, base_path):
    """Write a user's profile (handle, name, bio, custom fields) to user-data.txt.

    Args:
        user: User object (dict) from the Misskey API.
        base_path: pathlib.Path of the user's backup directory.
    """
    print(f"  Exporting profile data for {user['username']}...")
    out_path = base_path / "user-data.txt"
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(f"Username: {user.get('username')}\n")
        f.write(f"Full Handle: {get_handle(user['id'])}\n")
        f.write(f"Display Name: {user.get('name')}\n")
        f.write(f"Created At: {user.get('createdAt')}\n")
        f.write(f"Description:\n{user.get('description') or ''}\n\n")
        f.write("Fields:\n")
        fields = user.get("fields", [])
        if not fields:
            f.write("(None)\n")
        else:
            for field in fields:
                f.write(f"- {field.get('name', '')}: {field.get('value', '')}\n")
|
||||
|
||||
|
||||
def export_lists_db(user_id, base_path, db_conn):
    """Export the user's lists (with member handles) to lists.csv via the DB.

    Args:
        user_id: Misskey user ID whose lists to export.
        base_path: pathlib.Path of the user's backup directory.
        db_conn: Open psycopg2 connection to the Misskey database.
    """
    print("  Exporting lists from DB...")
    with db_conn.cursor() as cur:
        cur.execute('SELECT id, name FROM user_list WHERE "userId" = %s', (user_id,))
        lists = cur.fetchall()
        with open(base_path / "lists.csv", "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["id", "name", "members"])
            for list_id, list_name in ((row[0], row[1]) for row in lists):
                cur.execute(
                    'SELECT "userId" FROM user_list_membership WHERE "userListId" = %s',
                    (list_id,),
                )
                # Replace Member IDs with Handles
                m_handles = ",".join(get_handle(row[0]) for row in cur.fetchall())
                writer.writerow([list_id, list_name, m_handles])
|
||||
|
||||
|
||||
def export_antennas_db(user_id, base_path, db_conn):
    """Export the user's antenna configurations to antennas.csv via the DB.

    Args:
        user_id: Misskey user ID whose antennas to export.
        base_path: pathlib.Path of the user's backup directory.
        db_conn: Open psycopg2 connection to the Misskey database.
    """

    def _join(value):
        # Keyword columns may be JSON arrays; flatten to a comma-joined string.
        if isinstance(value, list):
            return ",".join(str(item) for item in value)
        return str(value or "")

    print("  Exporting antennas from DB...")
    with db_conn.cursor() as cur:
        cur.execute(
            'SELECT id, name, src, "userListId", keywords, "excludeKeywords", users, "caseSensitive", "withReplies", "withFile" FROM antenna WHERE "userId" = %s',
            (user_id,),
        )
        antennas = cur.fetchall()
    with open(base_path / "antennas.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(
            [
                "id",
                "name",
                "src",
                "userListId",
                "keywords",
                "excludeKeywords",
                "users",
                "caseSensitive",
                "withReplies",
                "withFile",
            ]
        )
        for a in antennas:
            # Resolve IDs in the 'users' list
            watched_ids = a[6] if isinstance(a[6], list) else []
            u_handles = ",".join(get_handle(u) for u in watched_ids)
            writer.writerow(
                [a[0], a[1], a[2], a[3], _join(a[4]), _join(a[5]), u_handles, a[7], a[8], a[9]]
            )
|
||||
|
||||
|
||||
def main():
    """Back up every local user: profile, notes, drive, lists, antennas.

    Connects to the DB, pre-loads the handle cache, then writes one
    backups/<username>/ directory per local user. The DB connection is
    closed in all cases.
    """
    db_conn = None
    try:
        db_conn = get_db_connection()
        print("Connected to Misskey Database.")

        load_user_cache(db_conn)

        # NOTE(review): diff residue showed users fetched twice; one call
        # suffices and avoids hitting the API twice.
        users = fetch_all_local_users()
        backups_dir = Path("backups")
        if not backups_dir.exists():
            backups_dir.mkdir()

        for user in users:
            # Sanitize the username for safe use as a directory name;
            # fall back to the opaque user ID if nothing survives.
            raw_username = str(user.get("username", "unknown"))
            username = "".join(
                c for c in raw_username if c.isalnum() or c in "._-"
            ).strip()
            if not username:
                username = user["id"]

            user_id = user["id"]
            print(f"Processing user: {get_handle(user_id)}")

            user_dir = backups_dir / username
            if not user_dir.exists():
                user_dir.mkdir(parents=True)

            export_user_data(user, user_dir)
            export_notes(user_id, user_dir)
            export_drive_admin(user_id, user_dir)
            # Admin cannot easily fetch other users' lists/antennas
            # as there are no admin/lists or admin/antennas endpoints visible.
            export_lists_db(user_id, user_dir, db_conn)
            export_antennas_db(user_id, user_dir, db_conn)

        print("Bulk export complete!")
    except Exception as e:
        # Top-level boundary: report and fall through to cleanup.
        print(f"An error occurred: {e}")
    finally:
        if db_conn:
            db_conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
Requests
|
||||
psycopg2-binary
|
||||
|
||||
Reference in New Issue
Block a user