# retoor <retoor@molodetz.nl>
from __future__ import annotations
import os
import sys
import threading
import time
from client import DevPlace
# Runtime configuration, read once at import time from the environment.
# Base URL handed to the DevPlace client.
URL = os.getenv("DEVPLACE_URL", "https://devplace.net")
# API key handed to the DevPlace client; the default is only a placeholder.
API_KEY = os.getenv("DEVPLACE_API_KEY", "YOUR_API_KEY")
# Pause between two inbox polls, in seconds.
POLL_SECONDS = int(os.getenv("DEVPLACE_POLL_SECONDS", "3"))
# Username to chat with: the environment wins, then the first CLI argument,
# otherwise an empty string.
PEER = os.getenv("DEVPLACE_PEER") or next(iter(sys.argv[1:]), "")
def resolve_uid(dp: DevPlace, username: str) -> str:
    """Return the uid stored under ``profile_user`` in the profile of *username*.

    Args:
        dp: Client used to issue the ``profile.detail`` call.
        username: Username whose profile is looked up.

    Returns:
        The ``uid`` value found in the ``profile_user`` entry of the response.

    Raises:
        KeyError: If the response lacks ``profile_user`` or its ``uid``.
    """
    profile_user = dp.call("profile.detail", username=username)["profile_user"]
    return profile_user["uid"]
def receive_loop(dp: DevPlace, peer_uid: str, stop: threading.Event) -> None:
    """Poll the conversation with *peer_uid* and print newly seen incoming messages.

    Runs until *stop* is set. The first poll that completes without error only
    records the message uids already present, so existing history is not
    echoed. After that, every unseen message whose ``is_mine`` flag is falsy is
    written to stdout, followed by a fresh ``you> `` prompt.

    Args:
        dp: Client used to issue the ``messages.inbox`` call.
        peer_uid: Uid passed as ``with_uid`` to select the conversation.
        stop: Event that ends the loop once it is set.
    """
    seen: set[str] = set()
    # Stays False until one poll has completed, so the backlog is not printed.
    primed = False
    while not stop.is_set():
        try:
            thread = dp.call("messages.inbox", with_uid=peer_uid)
            for item in thread.get("messages", []):
                message_uid = item["message"]["uid"]
                if message_uid in seen:
                    continue
                seen.add(message_uid)
                if primed and not item.get("is_mine"):
                    sender = item.get("sender") or {}
                    # Leading newline moves off the pending input prompt; the
                    # trailing "you> " redraws it after the message.
                    sys.stdout.write(
                        f"\n{sender.get('username', 'them')}> {item['message']['content']}\nyou> "
                    )
                    sys.stdout.flush()
            primed = True
        except Exception as error:
            # Best effort: report the failure and retry on the next tick
            # instead of letting the worker thread die.
            sys.stderr.write(f"\n[receive error: {error}]\n")
        # Wait on the event rather than sleeping so that a stop request ends
        # the pause immediately instead of after a full poll interval.
        stop.wait(POLL_SECONDS)
def run() -> None:
    """Run the interactive chat session with the configured peer.

    Prints a usage hint and returns when no peer is configured. Otherwise it
    resolves the peer's uid, starts ``receive_loop`` in a daemon thread, and
    sends every non-blank line read from stdin via ``messages.send`` until the
    user presses Ctrl-C or stdin reaches end of file.
    """
    if not PEER:
        print("Usage: DEVPLACE_PEER=<username> python dm_client.py (or pass it as an argument)")
        return

    dp = DevPlace(URL, api_key=API_KEY)
    peer_uid = resolve_uid(dp, PEER)

    stop = threading.Event()
    worker = threading.Thread(target=receive_loop, args=(dp, peer_uid, stop), daemon=True)
    worker.start()

    print(f"Chatting with @{PEER}. Type a message and press enter; Ctrl-C to quit.")
    try:
        while True:
            text = input("you> ")
            if not text.strip():
                continue
            try:
                dp.call("messages.send", content=text, receiver_uid=peer_uid)
            except Exception as error:
                # Mirror receive_loop: a failed send is reported and the
                # session keeps going instead of ending with a traceback.
                sys.stderr.write(f"[send error: {error}]\n")
    except (KeyboardInterrupt, EOFError):
        print("\nbye")
    finally:
        # Signal the receiver on every exit path, not only on a clean quit.
        stop.set()
# Start the chat client only when this file is executed as a script.
if __name__ == "__main__":
    run()