]> glassweightruler.freedombox.rocks Git - waydroid.git/blob - tools/helpers/ipc.py
debian/control: add Depends on pipewire-pulse | pulseaudio
[waydroid.git] / tools / helpers / ipc.py
1 # Copyright 2022 Alessandro Astone
2 # SPDX-License-Identifier: GPL-3.0-or-later
3
4 # Currently implemented as FIFO
5 import os
6 import dbus
7
8 BASE_DIR = "/var/run/"
9
def pipe_for(channel):
    """Return the filesystem path of the FIFO that backs *channel*.

    The path is ``BASE_DIR`` followed by ``"waydroid-"`` and the channel
    name; nothing is created or checked on disk here.
    """
    # str.join keeps the original's str-only contract: a non-str channel
    # still raises TypeError instead of being silently formatted.
    return "".join((BASE_DIR, "waydroid-", channel))
12
def read_one(channel):
    """Read from *channel* until something non-empty arrives and return it.

    The channel is opened for reading in text mode with ``buffering=1`` and
    is closed again before returning.
    """
    with open_channel(channel, "r", 1) as reader:
        payload = reader.read()
        # An empty read is retried immediately; there is no sleep or
        # back-off between attempts.
        while len(payload) == 0:
            payload = reader.read()
        return payload
19
def create_channel(channel):
    """Create the FIFO backing *channel* unless something already exists there.

    The creation is attempted directly rather than guarded by an
    ``os.path.exists`` check: with a separate check, two processes creating
    the same channel at the same time could both see "missing" and the
    slower one would crash with ``FileExistsError``.

    Raises:
        OSError: if the FIFO cannot be created for any reason other than
            the path already existing (e.g. insufficient permissions).
    """
    try:
        os.mkfifo(pipe_for(channel))
    except FileExistsError:
        # Already present (possibly created concurrently by another
        # process); leave the existing path untouched.
        pass
24
def open_channel(channel, mode, buffering=0):
    """Open the FIFO backing *channel* and return the resulting file object.

    Args:
        channel: channel name, resolved to a path via ``pipe_for``.
        mode: mode string handed to the built-in ``open``.
        buffering: buffering policy handed to the built-in ``open``.

    NOTE(review): the built-in ``open`` only permits ``buffering=0`` for
    binary modes -- confirm that callers using text modes always pass an
    explicit value (the callers in this module pass 1).
    """
    fifo_path = pipe_for(channel)
    return open(fifo_path, mode=mode, buffering=buffering)
27
def notify(channel, msg):
    """Best-effort, non-blocking write of *msg* to *channel*.

    The FIFO is opened write-only with ``O_NONBLOCK``. Any exception raised
    while opening or writing is swallowed, so this function never raises
    and always returns ``None``.
    """
    try:
        descriptor = os.open(pipe_for(channel), os.O_WRONLY | os.O_NONBLOCK)
        writer = os.fdopen(descriptor, "w")
        # The file object owns the descriptor and closes it on exit.
        with writer:
            writer.write(msg)
    except Exception:
        # Deliberately ignored: delivery is optional for this helper.
        return
35
def notify_blocking(channel, msg):
    """Write *msg* to *channel* through a regular (blocking) open.

    The channel is opened in text write mode with ``buffering=1`` and closed
    once the message has been written. Unlike ``notify``, errors propagate
    to the caller.
    """
    writer = open_channel(channel, "w", 1)
    with writer:
        writer.write(msg)
39
def DBusContainerService(object_path="/ContainerManager", intf="id.waydro.ContainerManager"):
    """Return a ``dbus.Interface`` for the ``id.waydro.Container`` service.

    Args:
        object_path: object path to look up on the system bus.
        intf: D-Bus interface name to bind the proxy object to.
    """
    system_bus = dbus.SystemBus()
    proxy = system_bus.get_object("id.waydro.Container", object_path)
    return dbus.Interface(proxy, intf)
42
def DBusSessionService(object_path="/SessionManager", intf="id.waydro.SessionManager"):
    """Return a ``dbus.Interface`` for the ``id.waydro.Session`` service.

    Args:
        object_path: object path to look up on the session bus.
        intf: D-Bus interface name to bind the proxy object to.
    """
    session_bus = dbus.SessionBus()
    proxy = session_bus.get_object("id.waydro.Session", object_path)
    return dbus.Interface(proxy, intf)