diff --git a/src/devranta/api_plain.py b/src/devranta/api_plain.py new file mode 100644 index 0000000..44eb465 --- /dev/null +++ b/src/devranta/api_plain.py @@ -0,0 +1,249 @@ +import json +import urllib.parse +import http.client +import functools + +class Api: + + base_url = "www.devrant.io" + + def __init__(self, username=None, password=None): + self.username = username + self.password = password + self.auth = None + self.app_id = 3 + self.user_id = None + self.token_id = None + self.token_key = None + + def patch_auth(self, request_dict=None): + auth_dict = {"app": self.app_id} + if self.auth: + auth_dict.update( + user_id=self.user_id, token_id=self.token_id, token_key=self.token_key + ) + if not request_dict: + return auth_dict + request_dict.update(auth_dict) + return request_dict + + def login(self): + if not self.username or not self.password: + raise Exception("No authentication details supplied.") + conn = http.client.HTTPSConnection(self.base_url) + payload = json.dumps({ + "username": self.username, + "password": self.password, + "app": self.app_id, + }) + headers = {'Content-Type': 'application/json'} + conn.request("POST", "/api/users/auth-token", payload, headers) + response = conn.getresponse() + data = response.read() + obj = json.loads(data) + if not obj.get("success"): + return False + self.auth = obj.get("auth_token") + if not self.auth: + return False + self.user_id = self.auth.get("user_id") + self.token_id = self.auth.get("id") + self.token_key = self.auth.get("key") + return True + + def ensure_login(self): + if not self.auth: + return self.login() + return True + + @functools.lru_cache() + def register_user(self, email, username, password): + conn = http.client.HTTPSConnection(self.base_url) + payload = json.dumps(self.patch_auth({ + "email": email, + "username": username, + "password": password, + "plat": 3 + })) + headers = {'Content-Type': 'application/json'} + conn.request("POST", "/api/users", payload, headers) + response = conn.getresponse() + data = response.read() + obj = json.loads(data) + return obj.get('success', False) + + @functools.lru_cache() + def get_comments_from_user(self, username): + user_id = self.get_user_id(username) + profile = self.get_profile(user_id) + return profile.get("content", {}).get("content", {}).get("comments", []) + + @functools.lru_cache() + def post_comment(self, rant_id, comment): + if not self.ensure_login(): + return False + conn = http.client.HTTPSConnection(self.base_url) + payload = json.dumps(self.patch_auth({"comment": comment, "plat": 2})) + headers = {'Content-Type': 'application/json'} + conn.request("POST", f"/api/devrant/rants/{rant_id}/comments", payload, headers) + response = conn.getresponse() + data = response.read() + obj = json.loads(data) + return obj.get("success", False) + + @functools.lru_cache() + def get_comment(self, id_): + conn = http.client.HTTPSConnection(self.base_url) + conn.request("GET", f"/api/comments/{id_}?" + urllib.parse.urlencode(self.patch_auth())) + response = conn.getresponse() + data = response.read() + obj = json.loads(data) + if not obj.get("success"): + return None + return obj.get("comment") + + @functools.lru_cache() + def delete_comment(self, id_): + if not self.ensure_login(): + return False + conn = http.client.HTTPSConnection(self.base_url) + conn.request("DELETE", f"/api/comments/{id_}?" + urllib.parse.urlencode(self.patch_auth())) + response = conn.getresponse() + data = response.read() + obj = json.loads(data) + return obj.get("success", False) + + @functools.lru_cache() + def get_profile(self, id_): + conn = http.client.HTTPSConnection(self.base_url) + conn.request("GET", f"/api/users/{id_}?" + urllib.parse.urlencode(self.patch_auth())) + response = conn.getresponse() + data = response.read() + obj = json.loads(data) + if not obj.get("success"): + return None + return obj.get("profile") + + @functools.lru_cache() + def search(self, term): + conn = http.client.HTTPSConnection(self.base_url) + params = urllib.parse.urlencode(self.patch_auth({"term": term})) + conn.request("GET", f"/api/devrant/search?{params}") + response = conn.getresponse() + data = response.read() + obj = json.loads(data) + if not obj.get("success"): + return + return obj.get("results", []) + + @functools.lru_cache() + def get_rant(self, id): + conn = http.client.HTTPSConnection(self.base_url) + conn.request("GET", f"/api/devrant/rants/{id}?"+urllib.parse.urlencode(self.patch_auth())) + response = conn.getresponse() + data = response.read() + return json.loads(data) + + @functools.lru_cache() + def get_rants(self, sort="recent", limit=20, skip=0): + conn = http.client.HTTPSConnection(self.base_url) + params = urllib.parse.urlencode(self.patch_auth({"sort": sort, "limit": limit, "skip": skip})) + conn.request("GET", f"/api/devrant/rants?{params}") + response = conn.getresponse() + data = response.read() + obj = json.loads(data) + if not obj.get("success"): + return + return obj.get("rants", []) + + @functools.lru_cache() + def get_user_id(self, username): + conn = http.client.HTTPSConnection(self.base_url) + params = urllib.parse.urlencode(self.patch_auth({"username": username})) + conn.request("GET", f"/api/get-user-id?{params}") + response = conn.getresponse() + data = response.read() + obj = json.loads(data) + if not obj.get("success"): + return None + return obj.get("user_id") + + @functools.lru_cache() + def update_comment(self, comment_id, comment): + if not self.ensure_login(): + return None + conn = http.client.HTTPSConnection(self.base_url) + payload = json.dumps(self.patch_auth({"comment": comment})) + headers = {'Content-Type': 'application/json'} + conn.request("POST", f"/api/comments/{comment_id}", payload, headers) + response = conn.getresponse() + data = response.read() + obj = json.loads(data) + return obj.get("success", False) + + @functools.lru_cache() + def vote_rant(self, rant_id, vote, reason=None): + if not self.ensure_login(): + return None + conn = http.client.HTTPSConnection(self.base_url) + payload = json.dumps(self.patch_auth({"vote": vote, "reason": reason})) + headers = {'Content-Type': 'application/json'} + conn.request("POST", f"/api/devrant/rants/{rant_id}/vote", payload, headers) + response = conn.getresponse() + data = response.read() + obj = json.loads(data) + return obj.get("success", False) + + @functools.lru_cache() + def vote_comment(self, comment_id, vote, reason=None): + if not self.ensure_login(): + return None + conn = http.client.HTTPSConnection(self.base_url) + payload = json.dumps(self.patch_auth({"vote": vote, "reason": reason})) + headers = {'Content-Type': 'application/json'} + conn.request("POST", f"/api/comments/{comment_id}/vote", payload, headers) + response = conn.getresponse() + data = response.read() + obj = json.loads(data) + return obj.get("success", False) + + @property + def notifs(self): + if not self.ensure_login(): + return + conn = http.client.HTTPSConnection(self.base_url) + conn.request("GET", "/api/users/me/notif-feed?" + urllib.parse.urlencode(self.patch_auth())) + response = conn.getresponse() + data = response.read() + return json.loads(data).get("data", {}).get("items", []) + +def filter_field(name, obj): + results = [] + if type(obj) in (list,tuple): + for value in obj: + results += filter_field(name, value) + elif type(obj) == dict: + for key, value in obj.items(): + if key == name: + results.append(value) + if type(value) in (list,dict,tuple): + results += filter_field(name, value) + return results + + +def fetch_all(rants, rant_ids): + usernames = filter_field("user_username",rants) + user_ids = [api.get_user_id(username) for username in usernames] + profiles = [api.get_profile(user_id) for user_id in user_ids] + new_rant_ids = [rant_id for rant_id in filter_field("rant_id", profiles) if not rant_id in rant_ids] + new_rants = [] + for rant_id in set(new_rant_ids): + rant_ids.append(rant_id) + new_rants.append(api.get_rant(rant_id)) + print(rant_id) + + if new_rants: + return fetch_all(new_rants,rant_ids) + + return rant_ids +