mirror of
https://git.alemi.dev/fedimut.git
synced 2024-11-12 20:09:20 +01:00
feat: initial python proof-of-concept
This commit is contained in:
commit
e99e05617f
1 changed files with 67 additions and 0 deletions
67
fedi-list-tool.py
Normal file
67
fedi-list-tool.py
Normal file
|
@ -0,0 +1,67 @@
|
|||
import aiohttp
|
||||
|
||||
HOMESERVER = "https://social.alemi.dev"
|
||||
API_BASE = "/api/v1"
|
||||
|
||||
async def following(sess: aiohttp.ClientSession, id: str, token: str = "") -> set[str]:
|
||||
out : set[str] = set()
|
||||
limit = 40
|
||||
params = {"limit": limit}
|
||||
headers = {"Authorization": f"Bearer {token}"} if token else {}
|
||||
while True:
|
||||
print(f"fetching followed users ({len(out)}/{len(out)+limit})")
|
||||
async with sess.get(
|
||||
f"{HOMESERVER}{API_BASE}/accounts/{id}/following",
|
||||
params=params,
|
||||
headers=headers,
|
||||
) as res:
|
||||
res.raise_for_status()
|
||||
doc = await res.json()
|
||||
for user in doc:
|
||||
out.add(user["acct"])
|
||||
print(out)
|
||||
if len(doc) < limit:
|
||||
break
|
||||
params["max_id"] = doc[-1]["id"]
|
||||
return out
|
||||
|
||||
async def lists(sess: aiohttp.ClientSession, token: str = "") -> dict[str, set[str]]:
|
||||
headers = {"Authorization": f"Bearer {token}"} if token else {}
|
||||
print("fetching all lists")
|
||||
async with sess.get(f"{HOMESERVER}{API_BASE}/lists", headers=headers) as res:
|
||||
res.raise_for_status()
|
||||
doc = await res.json()
|
||||
list_ids = { l["title"]: l["id"] for l in doc }
|
||||
print(list_ids)
|
||||
list_accounts : dict[str, set[str]] = {}
|
||||
for l_name, l_id in list_ids.items():
|
||||
list_accounts[l_name] = set()
|
||||
async with sess.get(f"{HOMESERVER}{API_BASE}/lists/{l_id}/accounts", headers=headers) as res:
|
||||
res.raise_for_status()
|
||||
doc = await res.json()
|
||||
for u in doc:
|
||||
list_accounts[l_name].add(u["acct"])
|
||||
print(f"users of list {l_name} [{l_id}] : {list_accounts[l_name]}")
|
||||
return list_accounts
|
||||
|
||||
|
||||
async def main():
|
||||
TOKEN = "1rpaIxjg-dLEI4v22Nv-p8-Wx5wUtp9VgHGhNHrkWgI"
|
||||
MY_ID = "AZwC011g1IzjFkJXxA"
|
||||
async with aiohttp.ClientSession() as sess:
|
||||
followed_users = await following(sess, MY_ID, TOKEN)
|
||||
my_lists = await lists(sess, TOKEN)
|
||||
print("done, calculating...")
|
||||
all_users_in_a_list = set()
|
||||
for name, users in my_lists.items():
|
||||
all_users_in_a_list = all_users_in_a_list.union(users)
|
||||
print("users not in any list:")
|
||||
for u in followed_users:
|
||||
if u not in all_users_in_a_list:
|
||||
print(u)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import asyncio
|
||||
asyncio.run(main())
|
Loading…
Reference in a new issue