Select Git revision
server.py 6.82 KiB
#!/usr/bin/env python3
import json
import time
import ssl
import asyncio
import websockets
from typing import List, Dict, Awaitable
from enum import Enum, auto
from aioconsole import ainput
# When true, main() listens on 127.0.0.1; otherwise it passes None as the host.
LOCAL = False
# TCP port handed to websockets.serve() in main().
PORT = 8432
# Certificate chain and private key loaded into the server's SSL context.
PATH_CERTCHAIN = "/etc/letsencrypt/live/www.mpvsync.de/fullchain.pem"
PATH_PRIVATE_KEY = "/etc/letsencrypt/live/www.mpvsync.de/privkey.pem"
# Command payloads; they are serialised with json.dumps() before being sent
# to the clients of a group.
PLAY_REQUEST = {"command": "play"}
PAUSE_REQUEST = {"command": "pause"}
GET_PLAYBACK_TIME_REQUEST = {"command": "get-playback-time"}
# Template only: users copy it and fill in "data" with the target time.
SET_PLAYBACK_TIME_REQUEST = {"command": "set-playback-time", "data": None}
class PlayState(Enum):
    """Playback state recorded for a connected client."""

    # Same values that auto() assigns: consecutive integers starting at 1.
    UNKNOWN = 1
    PLAY = 2
    PAUSE = 3
class MPVClient:
    """Server-side record of one connected client.

    Holds the connection object, the last playback time the client
    reported and the wall-clock time (milliseconds since the epoch) at
    which that record was last changed.
    """

    # A client whose record is older than this is no longer "current".
    _MAX_AGE_MS = 10 * 1000

    def __init__(self, ws):
        """Wrap the connection *ws*; no state has been reported yet."""
        super().__init__()
        self.__ws = ws
        self.__play_state = PlayState.UNKNOWN
        # -1 is the initial value of both fields until the first update.
        self.__playback_time = -1
        self.__last_modification = -1

    @staticmethod
    def _now_ms():
        """Return the current wall-clock time in whole milliseconds."""
        return int(round(time.time() * 1000))

    async def send_command(self, msg):
        """Send the already-serialised message *msg* over the connection."""
        await self.__ws.send(msg)

    def update_state(self, parsed_message):
        """Store the playback time carried by *parsed_message*, if any.

        *parsed_message* is a decoded JSON value.  Anything that is not a
        JSON object, has no ``playback_time`` key, or carries a
        ``playback_time`` that is not a number is ignored, so a malformed
        report can neither raise here nor leave a value behind that
        cannot be compared with other clients' times.
        """
        if not isinstance(parsed_message, dict):
            return
        if "playback_time" not in parsed_message:
            return
        reported = parsed_message["playback_time"]
        # bool is a subclass of int but is not a meaningful timestamp.
        if isinstance(reported, bool) or not isinstance(reported, (int, float)):
            print(f"Ignoring non-numeric playback_time: <{reported!r}>")
            return
        # _set_playback_time() also refreshes the modification stamp.
        self._set_playback_time(reported)

    def get_playback_time(self):
        """Return the last stored playback time (-1 if none was stored)."""
        return self.__playback_time

    def _refresh_modification(self):
        """Stamp the record with the current time."""
        self.__last_modification = self._now_ms()

    def _set_playstate(self, new_play_state):
        """Store *new_play_state* and refresh the modification stamp."""
        self.__play_state = new_play_state
        self._refresh_modification()

    def _set_playback_time(self, new_time):
        """Store *new_time* and refresh the modification stamp."""
        self.__playback_time = new_time
        self._refresh_modification()

    def is_current(self):
        """Return True if the record changed within the last 10 seconds."""
        return self._now_ms() - self.__last_modification <= self._MAX_AGE_MS

    def __str__(self):
        return f"MPVClient: {self.__ws} {self.__play_state} {self.__playback_time} {self.__last_modification}"
class ClientGroup:
    """A named collection of clients that receive the same commands."""

    def __init__(self, groupname):
        """Create an empty group called *groupname*."""
        self.__groupname = groupname
        self.__clients: List[MPVClient] = []
        # Guards every read and write of the client list.
        self.__lock = asyncio.Lock()

    async def add_client(self, client):
        """Append *client* to the group."""
        async with self.__lock:
            self.__clients.append(client)

    async def remove_client(self, client):
        """Remove *client* from the group.

        Raises:
            ValueError: if *client* is not a member (from ``list.remove``).
        """
        async with self.__lock:
            self.__clients.remove(client)

    async def _send_command(self, command: Dict[str, object]):
        """Serialise *command* as JSON and send it to every client.

        The lock is held for the whole broadcast, so two broadcasts never
        interleave.  A client whose send fails is reported and skipped so
        that the remaining clients still receive the command.
        """
        print(f"send command to clients: <{command}>")
        msg = json.dumps(command)
        async with self.__lock:
            for c in self.__clients:
                try:
                    await c.send_command(msg)
                except Exception as exc:
                    # One broken connection must not abort the broadcast.
                    print(f"Failed to send command to <{c}>: {exc!r}")

    async def _send_playback_time(self, playback_time):
        """Broadcast a set-playback-time request carrying *playback_time*."""
        # Copy the template so the module-level dict is never mutated.
        request = SET_PLAYBACK_TIME_REQUEST.copy()
        request["data"] = playback_time
        await self._send_command(request)

    async def show_connected_clients(self):
        """Print one line per client in the group."""
        async with self.__lock:
            for c in self.__clients:
                print("Connected Client: " + str(c))

    async def _filter_valid_clients(self):
        """Return the clients whose ``is_current()`` is true."""
        async with self.__lock:
            return [c for c in self.__clients if c.is_current()]

    async def _get_current_times_of_clients(self):
        """Return the playback times of current clients that are >= 1."""
        current_clients = await self._filter_valid_clients()
        reported = (c.get_playback_time() for c in current_clients)
        # NOTE(review): the threshold also drops times in [0, 1), not only
        # the -1 initial value -- confirm that this is intended.
        return [t for t in reported if t >= 1]

    async def play(self):
        """Broadcast the play request."""
        await self._send_command(PLAY_REQUEST)

    async def pause(self):
        """Broadcast the pause request."""
        await self._send_command(PAUSE_REQUEST)

    async def sync(self):
        """Set every client to the smallest playback time in the group.

        Broadcasts a get-playback-time request, waits one second, then
        broadcasts the minimum of the collected times (0 if there are
        none).
        """
        await self._send_command(GET_PLAYBACK_TIME_REQUEST)
        await asyncio.sleep(1)
        times = await self._get_current_times_of_clients()
        await self._send_playback_time(min(times, default=0))

    async def sync_to_me(self, playback_time):
        """Set every client to *playback_time*."""
        await self._send_playback_time(playback_time)

    async def sendCommandToClients(self, parsed):
        """Run the group action named by ``parsed["command"]``.

        Recognised commands are ``play``, ``pause``, ``sync``,
        ``sync_to_me`` (needs a ``playback_time`` key) and ``show``;
        anything else is reported and ignored.
        """
        command = parsed["command"]
        if command == "play":
            await self.play()
        elif command == "pause":
            await self.pause()
        elif command == "sync":
            await self.sync()
        elif command == "sync_to_me":
            if "playback_time" not in parsed:
                print(
                    f"Ignoring sync_to_me command, because playback_time is missing: <{parsed}>"
                )
                return
            await self.sync_to_me(parsed["playback_time"])
        elif command == "show":
            await self.show_connected_clients()
        else:
            print(f"unknown command {command}")
class WatchGroups:
    """Registry of ClientGroup objects keyed by group name."""

    def __init__(self):
        self.__lock = asyncio.Lock()
        self.__groups: Dict[str, ClientGroup] = {}

    async def get_clientgroup_by_name(self, name: str) -> ClientGroup:
        """Return the group registered under *name*, creating it if absent."""
        async with self.__lock:
            found = self.__groups.get(name)
            if found is None:
                # First request for this name: register a fresh group.
                found = ClientGroup(name)
                self.__groups[name] = found
            return found
# Module-wide registry; handle() looks up each connection's group in it by path.
groups = WatchGroups()
async def console_input():
    """Prompt on the console forever; every entered line is discarded."""
    while True:
        # The input itself is not used; only the acknowledgement is printed.
        await ainput("> ")
        print("cool, but currently we dont listen on this...")
# Strong references to in-flight group-command tasks.  The event loop only
# keeps weak references to tasks, so a task nobody else refers to could be
# garbage-collected before it finishes.
_BACKGROUND_TASKS = set()


async def handle(ws: websockets.WebSocketServerProtocol, path: str) -> None:
    """Serve one websocket connection until it closes.

    The connection joins the group named by *path* (surrounding slashes
    stripped).  Each incoming message must be a JSON object:

    * without a ``target`` key, or with ``target == "client"``, it
      updates this client's own state;
    * with any other ``target`` it must contain a ``command`` key, and
      that command is run against the whole group in a background task.

    Messages that are not valid JSON, or are not JSON objects, are
    reported and skipped.  The client is removed from its group when the
    connection ends, however it ends.
    """
    client = MPVClient(ws)
    print("new connection: " + str(ws))
    path = path.strip("/")
    print(path)
    clients = await groups.get_clientgroup_by_name(path)
    await clients.add_client(client)
    try:
        async for message in ws:
            print(f"Get message from <{client}>: <{message}>")
            try:
                parsed = json.loads(message)
            except (json.JSONDecodeError, UnicodeDecodeError):
                # UnicodeDecodeError: a binary frame that is not valid UTF-8.
                print(f"Ignoring non-JSON message: <{message}>")
                continue
            if not isinstance(parsed, dict):
                # Valid JSON such as 5 or null has no keys to inspect.
                print(f"Ignoring message that is not a JSON object: <{message}>")
                continue
            target = parsed.get("target", "client")
            if target == "client":
                client.update_state(parsed)
            elif "command" not in parsed:
                print(
                    f"Ignoring message that does not contain a command <{message}>"
                )
            else:
                # Run in the background so this connection keeps reading
                # while the group command is carried out.
                task = asyncio.create_task(clients.sendCommandToClients(parsed))
                _BACKGROUND_TASKS.add(task)
                task.add_done_callback(_BACKGROUND_TASKS.discard)
    finally:
        await clients.remove_client(client)
        print("connection closed: " + str(ws))
def main():
    """Start the TLS websocket server and the console prompt.

    Loads the certificate chain and private key, then serves `handle` on
    PORT (host 127.0.0.1 if LOCAL is true, otherwise None) and runs the
    console loop.  KeyboardInterrupt and EOFError end the program with a
    farewell message.
    """
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    tls_context.load_cert_chain(PATH_CERTCHAIN, PATH_PRIVATE_KEY)
    bind_host = None if not LOCAL else "127.0.0.1"
    listen_port = PORT

    async def serve_and_prompt():
        await websockets.serve(handle, bind_host, listen_port, ssl=tls_context)
        await console_input()

    try:
        asyncio.run(serve_and_prompt())
    except (KeyboardInterrupt, EOFError):
        print("Bye bye :)")
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()