import aiohttp HOMESERVER = "https://social.alemi.dev" API_BASE = "/api/v1" async def following(sess: aiohttp.ClientSession, id: str, token: str = "") -> set[str]: out : set[str] = set() limit = 40 params = {"limit": limit} headers = {"Authorization": f"Bearer {token}"} if token else {} while True: print(f"fetching followed users ({len(out)}/{len(out)+limit})") async with sess.get( f"{HOMESERVER}{API_BASE}/accounts/{id}/following", params=params, headers=headers, ) as res: res.raise_for_status() doc = await res.json() for user in doc: out.add(user["acct"]) print(out) if len(doc) < limit: break params["max_id"] = doc[-1]["id"] return out async def lists(sess: aiohttp.ClientSession, token: str = "") -> dict[str, set[str]]: headers = {"Authorization": f"Bearer {token}"} if token else {} print("fetching all lists") async with sess.get(f"{HOMESERVER}{API_BASE}/lists", headers=headers) as res: res.raise_for_status() doc = await res.json() list_ids = { l["title"]: l["id"] for l in doc } print(list_ids) list_accounts : dict[str, set[str]] = {} for l_name, l_id in list_ids.items(): list_accounts[l_name] = set() async with sess.get(f"{HOMESERVER}{API_BASE}/lists/{l_id}/accounts", headers=headers) as res: res.raise_for_status() doc = await res.json() for u in doc: list_accounts[l_name].add(u["acct"]) print(f"users of list {l_name} [{l_id}] : {list_accounts[l_name]}") return list_accounts async def main(): TOKEN = "1rpaIxjg-dLEI4v22Nv-p8-Wx5wUtp9VgHGhNHrkWgI" MY_ID = "AZwC011g1IzjFkJXxA" async with aiohttp.ClientSession() as sess: followed_users = await following(sess, MY_ID, TOKEN) my_lists = await lists(sess, TOKEN) print("done, calculating...") all_users_in_a_list = set() for name, users in my_lists.items(): all_users_in_a_list = all_users_in_a_list.union(users) print("users not in any list:") for u in followed_users: if u not in all_users_in_a_list: print(u) if __name__ == "__main__": import asyncio asyncio.run(main())