From e99e05617fc43ae7f2c086d21a4aadef789ac7a4 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 28 Feb 2024 17:08:58 +0100 Subject: [PATCH] feat: initial python proof-of-concept --- fedi-list-tool.py | 67 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 fedi-list-tool.py diff --git a/fedi-list-tool.py b/fedi-list-tool.py new file mode 100644 index 0000000..97e961d --- /dev/null +++ b/fedi-list-tool.py @@ -0,0 +1,67 @@ +import aiohttp + +HOMESERVER = "https://social.alemi.dev" +API_BASE = "/api/v1" + +async def following(sess: aiohttp.ClientSession, id: str, token: str = "") -> set[str]: + out : set[str] = set() + limit = 40 + params = {"limit": limit} + headers = {"Authorization": f"Bearer {token}"} if token else {} + while True: + print(f"fetching followed users ({len(out)}/{len(out)+limit})") + async with sess.get( + f"{HOMESERVER}{API_BASE}/accounts/{id}/following", + params=params, + headers=headers, + ) as res: + res.raise_for_status() + doc = await res.json() + for user in doc: + out.add(user["acct"]) + print(out) + if len(doc) < limit: + break + params["max_id"] = doc[-1]["id"] + return out + +async def lists(sess: aiohttp.ClientSession, token: str = "") -> dict[str, set[str]]: + headers = {"Authorization": f"Bearer {token}"} if token else {} + print("fetching all lists") + async with sess.get(f"{HOMESERVER}{API_BASE}/lists", headers=headers) as res: + res.raise_for_status() + doc = await res.json() + list_ids = { l["title"]: l["id"] for l in doc } + print(list_ids) + list_accounts : dict[str, set[str]] = {} + for l_name, l_id in list_ids.items(): + list_accounts[l_name] = set() + async with sess.get(f"{HOMESERVER}{API_BASE}/lists/{l_id}/accounts", headers=headers) as res: + res.raise_for_status() + doc = await res.json() + for u in doc: + list_accounts[l_name].add(u["acct"]) + print(f"users of list {l_name} [{l_id}] : {list_accounts[l_name]}") + return list_accounts + + +async def main(): + TOKEN = "1rpaIxjg-dLEI4v22Nv-p8-Wx5wUtp9VgHGhNHrkWgI" + MY_ID = "AZwC011g1IzjFkJXxA" + async with aiohttp.ClientSession() as sess: + followed_users = await following(sess, MY_ID, TOKEN) + my_lists = await lists(sess, TOKEN) + print("done, calculating...") + all_users_in_a_list = set() +for name, users in my_lists.items(): + all_users_in_a_list = all_users_in_a_list.union(users) + print("users not in any list:") + for u in followed_users: + if u not in all_users_in_a_list: + print(u) + + + +if __name__ == "__main__": + import asyncio + asyncio.run(main())