Skip to content
Snippets Groups Projects
Select Git revision
  • 9e7346b8bb56ff0577d9a27f6283ad29e4ab632f
  • main default protected
  • docker_compose_development
  • refactor_input_box
  • reconnect_on_startup
5 results

server.py

Blame
  • server.py 6.82 KiB
    #!/usr/bin/env python3
    
    import json
    import time
    import ssl
    import asyncio
    import websockets
    
    from typing import List, Dict, Awaitable
    from enum import Enum, auto
    
    from aioconsole import ainput
    
    # When True the server binds to 127.0.0.1 only; otherwise no explicit host
    # is passed to the websocket server.
    LOCAL = False
    # TCP port the websocket server listens on.
    PORT = 8432

    # TLS material (Let's Encrypt layout: certificate chain and private key
    # live side by side in one directory).
    _CERT_DIR = "/etc/letsencrypt/live/www.mpvsync.de"
    PATH_CERTCHAIN = _CERT_DIR + "/fullchain.pem"
    PATH_PRIVATE_KEY = _CERT_DIR + "/privkey.pem"

    # Command templates that are JSON-encoded and sent to the clients.
    PLAY_REQUEST = dict(command="play")
    PAUSE_REQUEST = dict(command="pause")
    GET_PLAYBACK_TIME_REQUEST = dict(command="get-playback-time")
    # "data" is filled in on a copy of this template before sending.
    SET_PLAYBACK_TIME_REQUEST = dict(command="set-playback-time", data=None)
    
    
    class PlayState(Enum):
        """Play/pause state recorded for a client."""

        # Initial state, used before anything else has been recorded.
        UNKNOWN = 1
        PLAY = 2
        PAUSE = 3
    
    
    class MPVClient:
        """State the server keeps for one connected client.

        Wraps the client's websocket and remembers the last playback time the
        client reported, plus the moment (epoch milliseconds) of the most
        recent update.
        """

        # A client counts as "current" if it was updated within this window.
        _CURRENT_WINDOW_MS = 10 * 1000

        def __init__(self, ws):
            """Create the record for the connection *ws*."""
            super().__init__()
            self.__ws = ws
            self.__play_state = PlayState.UNKNOWN
            # Placeholders until the client sends its first status message.
            self.__playback_time = -1
            self.__last_modification = -1

        @staticmethod
        def _now_ms():
            """Return the current wall-clock time in whole milliseconds."""
            return int(round(time.time() * 1000))

        @staticmethod
        def _is_valid_time(value):
            """Return True if *value* is a finite int/float (and not a bool)."""
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                return False
            # The chained comparison is False for NaN and for +/- infinity.
            return float("-inf") < value < float("inf")

        async def send_command(self, msg):
            """Send the already-serialised message *msg* over the websocket."""
            await self.__ws.send(msg)

        def update_state(self, parsed_message):
            """Apply a decoded status message received from the client.

            A ``"playback_time"`` entry is stored only when it is a finite
            number. The value comes straight from the network, and a string or
            ``null`` stored here would later break numeric comparisons on the
            playback time. Any message, valid or not, refreshes the
            last-modification timestamp.
            """
            if isinstance(parsed_message, dict) and "playback_time" in parsed_message:
                reported = parsed_message["playback_time"]
                if self._is_valid_time(reported):
                    self._set_playback_time(reported)
                else:
                    print(f"Ignoring invalid playback_time: <{reported!r}>")

            self._refresh_modification()

        def get_playback_time(self):
            """Return the last stored playback time (-1 if none was stored)."""
            return self.__playback_time

        def _refresh_modification(self):
            """Record "now" as the time of the latest update."""
            self.__last_modification = self._now_ms()

        def _set_playstate(self, new_play_state):
            """Store *new_play_state* and refresh the modification timestamp."""
            self.__play_state = new_play_state
            self._refresh_modification()

        def _set_playback_time(self, new_time):
            """Store *new_time* and refresh the modification timestamp."""
            self.__playback_time = new_time
            self._refresh_modification()

        def is_current(self):
            """Return True if the client was updated within the last 10 seconds."""
            age_ms = self._now_ms() - self.__last_modification
            return age_ms <= self._CURRENT_WINDOW_MS

        def __str__(self):
            return f"MPVClient: {self.__ws} {self.__play_state} {self.__playback_time} {self.__last_modification}"
    
    
    class ClientGroup:
        """A named set of clients that all receive the same playback commands."""

        def __init__(self, groupname):
            """Create an empty group called *groupname*."""
            self.__groupname = groupname
            self.__clients: List["MPVClient"] = []
            # Guards every read and write of the client list.
            self.__lock = asyncio.Lock()

        async def add_client(self, client):
            """Add *client* to the group."""
            async with self.__lock:
                self.__clients.append(client)

        async def remove_client(self, client):
            """Remove *client* from the group.

            Raises:
                ValueError: if *client* is not a member of the group.
            """
            async with self.__lock:
                self.__clients.remove(client)

        async def _snapshot_clients(self):
            """Return a copy of the current member list."""
            async with self.__lock:
                return list(self.__clients)

        @staticmethod
        async def _send_to_client(client, msg):
            """Send *msg* to one client, logging (not raising) on failure."""
            try:
                await client.send_command(msg)
            except asyncio.CancelledError:
                # Never swallow cancellation (it is an Exception on Python 3.7).
                raise
            except Exception as exc:
                print(f"failed to send command to <{client}>: <{exc!r}>")

        async def _send_command(self, command: Dict[str, object]):
            """JSON-encode *command* and send it to every member.

            The member list is copied under the lock and the lock is released
            before any network I/O, so a slow peer cannot block clients from
            joining or leaving. Each send is isolated: a client whose
            connection fails is logged and skipped, and the remaining clients
            still receive the command.
            """
            print(f"send command to clients: <{command}>")
            msg = json.dumps(command)

            recipients = await self._snapshot_clients()
            await asyncio.gather(*(self._send_to_client(c, msg) for c in recipients))

        async def _send_set_playback_time(self, playback_time):
            """Send a set-playback-time request carrying *playback_time*."""
            request = SET_PLAYBACK_TIME_REQUEST.copy()
            request["data"] = playback_time
            await self._send_command(request)

        async def show_connected_clients(self):
            """Print one line per member of the group."""
            for c in await self._snapshot_clients():
                print("Connected Client: " + str(c))

        async def _filter_valid_clients(self):
            """Return the members whose ``is_current()`` is true."""
            async with self.__lock:
                return [c for c in self.__clients if c.is_current()]

        async def _get_current_times_of_clients(self):
            """Return the playback times (>= 1) reported by current clients."""
            current_clients = await self._filter_valid_clients()
            return [
                c.get_playback_time() for c in current_clients if c.get_playback_time() >= 1
            ]

        async def play(self):
            """Tell every member to start playback."""
            await self._send_command(PLAY_REQUEST)

        async def pause(self):
            """Tell every member to pause playback."""
            await self._send_command(PAUSE_REQUEST)

        async def sync(self):
            """Seek every member to the smallest playback time in the group.

            Requests the playback time from all members, waits one second for
            the replies to arrive, then broadcasts the minimum of the usable
            times.
            """
            await self._send_command(GET_PLAYBACK_TIME_REQUEST)

            # Give the clients time to answer the request above.
            await asyncio.sleep(1)

            times = await self._get_current_times_of_clients()
            # NOTE(review): with no usable reply this seeks everyone to 0;
            # confirm that this is the intended fallback.
            await self._send_set_playback_time(min(times, default=0))

        async def sync_to_me(self, playback_time):
            """Seek every member to *playback_time*."""
            await self._send_set_playback_time(playback_time)

        async def sendCommandToClients(self, parsed):
            """Dispatch the decoded client message *parsed* by its "command".

            Unknown commands, and ``sync_to_me`` without a ``playback_time``,
            are logged and ignored.

            Raises:
                KeyError: if *parsed* has no ``"command"`` entry.
            """
            command = parsed["command"]
            if command == "play":
                await self.play()
            elif command == "pause":
                await self.pause()
            elif command == "sync":
                await self.sync()
            elif command == "sync_to_me":
                if "playback_time" not in parsed:
                    print(
                        f"Ignoring sync_to_me command, because playback_time is missing: <{parsed}>"
                    )
                    return
                await self.sync_to_me(parsed["playback_time"])
            elif command == "show":
                await self.show_connected_clients()
            else:
                print(f"unknown command {command}")
    
    
    class WatchGroups:
        """Registry mapping a group name to its ``ClientGroup``.

        Groups are created the first time their name is requested.
        """

        def __init__(self):
            self.__groups: Dict[str, ClientGroup] = {}
            # Serialises lookups and creation of groups.
            self.__lock = asyncio.Lock()

        async def get_clientgroup_by_name(self, name: str) -> ClientGroup:
            """Return the group registered under *name*, creating it if missing."""
            async with self.__lock:
                try:
                    return self.__groups[name]
                except KeyError:
                    created = ClientGroup(name)
                    self.__groups[name] = created
                    return created
    
    
    # Module-wide registry of watch groups; `handle` looks a connection's group
    # up here using the request path as the group name.
    groups = WatchGroups()
    
    
    async def console_input():
        """Prompt on the console forever; entered lines are read but not acted on."""
        while True:
            # The entered text is intentionally discarded.
            await ainput("> ")
            print("cool, but currently we dont listen on this...")
    
    
    # Strong references to in-flight command tasks. The event loop only keeps
    # weak references to tasks, so a task nobody references may be
    # garbage-collected before it has finished.
    _background_tasks = set()


    def _command_task_done(task):
        """Forget a finished command task and report an exception it raised."""
        _background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            print(f"command task failed: <{error!r}>")


    async def handle(ws: websockets.WebSocketServerProtocol, path: str) -> None:
        """Serve one websocket connection until it closes.

        The request path (without surrounding slashes) names the watch group
        the connection joins. Each incoming message must be a JSON object.
        Messages whose ``"target"`` is ``"client"`` (the default) update this
        client's own state. All other messages must carry a ``"command"``,
        which is dispatched to the whole group in a background task. The
        client is removed from its group when the connection ends, however it
        ends.
        """
        client = MPVClient(ws)
        print("new connection: " + str(ws))
        path = path.strip("/")
        print(path)

        clients = await groups.get_clientgroup_by_name(path)
        await clients.add_client(client)

        try:
            async for message in ws:
                print(f"Get message from <{client}>: <{message}>")
                try:
                    parsed = json.loads(message)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    # UnicodeDecodeError: binary frame that is not valid UTF-8.
                    print(f"Ignoring non-JSON message: <{message}>")
                    continue

                # Valid JSON that is not an object (e.g. `1` or `[]`) has no
                # fields to inspect; skip it instead of failing on the
                # membership tests below and dropping the connection.
                if not isinstance(parsed, dict):
                    print(f"Ignoring message that is not a JSON object: <{message}>")
                    continue

                target = parsed.get("target", "client")

                if target == "client":
                    client.update_state(parsed)
                elif "command" not in parsed:
                    print(
                        f"Ignoring message that does not contain a command <{message}>"
                    )
                else:
                    # Run in the background so this loop keeps reading: a
                    # group command may wait for replies from this very client.
                    task = asyncio.create_task(clients.sendCommandToClients(parsed))
                    _background_tasks.add(task)
                    task.add_done_callback(_command_task_done)

        finally:
            await clients.remove_client(client)
            print("connection closed: " + str(ws))
    
    
    def main():
        """Start the TLS websocket server and the console prompt.

        Runs until the console loop ends; Ctrl-C or end-of-input on the
        console prints a farewell and returns.
        """
        tls = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls.load_cert_chain(PATH_CERTCHAIN, PATH_PRIVATE_KEY)

        # Loopback only in LOCAL mode; otherwise no explicit host is given.
        bind_host = None
        if LOCAL:
            bind_host = "127.0.0.1"

        async def serve_and_prompt():
            await websockets.serve(handle, bind_host, PORT, ssl=tls)
            await console_input()

        try:
            asyncio.run(serve_and_prompt())
        except (KeyboardInterrupt, EOFError):
            print("Bye bye :)")
    
    
    # Start the server only when this file is executed as a script.
    if __name__ == "__main__":
        main()