import asyncio
HOST = '127.0.0.1'
PORT = 6667
clients = {}
channels = {}
class IRCProtocol(asyncio.Protocol):
def __init__(self):
self.transport = None
self.current_channel = None
self.peername = None
def connection_made(self, transport):
self.transport = transport
self.peername = transport.get_extra_info('peername')
clients[self] = self.peername
print(f"Connection from {self.peername}")
def data_received(self, data):
message = data.decode('utf-8').strip()
if message:
if message.startswith('/join '):
channel_name = message.split(' ', 1)[1].strip()
if channel_name not in channels:
channels[channel_name] = set()
channels[channel_name].add(self)
self.current_channel = channel_name
self.transport.write(f"Joined channel {channel_name}\n".encode('utf-8'))
elif message.startswith('/leave'):
if self.current_channel and self in channels.get(self.current_channel, set()):
channels[self.current_channel].remove(self)
self.transport.write(f"Left channel {self.current_channel}\n".encode('utf-8'))
self.current_channel = None
elif message.startswith('/list'):
channel_list = ', '.join(channels.keys())
self.transport.write(f"Channels: {channel_list}\n".encode('utf-8'))
elif message.startswith('/users '):
channel_name = message.split(' ', 1)[1].strip()
users = [str(c.transport.get_extra_info('peername')) for c in channels.get(channel_name, set())]
user_list = ', '.join(users)
self.transport.write(f"Users in {channel_name}: {user_list}\n".encode('utf-8'))
elif message.startswith('/shutdown'):
# For simplicity, shutdown server
self.transport.write("Server is shutting down...\n".encode('utf-8'))
asyncio.get_event_loop().stop()
elif message.startswith('/quit'):
self.transport.close()
else:
if self.current_channel and self in channels.get(self.current_channel, set()):
for client in channels[self.current_channel]:
if client != self:
client.transport.write(f"{self.peername}: {message}\n".encode('utf-8'))
else:
self.transport.write("You are not in a channel. Use /join <channel> to join one.\n".encode('utf-8'))
else:
self.transport.close()
def connection_lost(self, exc):
if self.current_channel and self in channels.get(self.current_channel, set()):
channels[self.current_channel].remove(self)
if self in clients:
del clients[self]
print(f"Connection lost from {self.peername}")
async def main():
loop = asyncio.get_running_loop()
server = await loop.create_server(lambda: IRCProtocol(), HOST, PORT)
print(f"Asyncio IRC server started on {HOST}:{PORT}")
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())