]>
glassweightruler.freedombox.rocks Git - waydroid.git/blob - tools/helpers/ipc.py
1 # Copyright 2022 Alessandro Astone
2 # SPDX-License-Identifier: GPL-3.0-or-later
4 # Currently implemented as FIFO
def pipe_for(channel):
    """Return the path of the pipe used for *channel*.

    The path is ``BASE_DIR`` followed by the literal prefix ``"waydroid-"``
    and the channel name, joined by plain string concatenation (no path
    separator is inserted between ``BASE_DIR`` and the prefix).
    """
    prefix = BASE_DIR + "waydroid-"
    return prefix + channel
13 def read_one(channel
):
14 with open_channel(channel
, "r", 1) as fifo
:
20 def create_channel(channel
):
21 pipe
= pipe_for(channel
)
22 if not os
.path
.exists(pipe
):
def open_channel(channel, mode, buffering=0):
    """Open the pipe belonging to *channel* and return the file object.

    ``mode`` and ``buffering`` are forwarded unchanged to the built-in
    ``open()``.

    NOTE: per the ``open()`` documentation, ``buffering=0`` is only
    permitted in binary mode, so text-mode callers need to pass an
    explicit buffering value (for example ``1`` for line buffering).
    """
    fifo_path = pipe_for(channel)
    return open(fifo_path, mode, buffering)
28 def notify(channel
, msg
):
30 fd
= os
.open(pipe_for(channel
), os
.O_WRONLY | os
.O_NONBLOCK
)
31 with os
.fdopen(fd
, "w") as fifo
:
36 def notify_blocking(channel
, msg
):
37 with open_channel(channel
, "w", 1) as channel
:
def DBusContainerService(object_path="/ContainerManager", intf="id.waydro.ContainerManager"):
    """Return a ``dbus.Interface`` for an object of ``id.waydro.Container``.

    The object at *object_path* is looked up on the system bus under the
    bus name ``"id.waydro.Container"`` and wrapped in the interface *intf*.
    """
    system_bus = dbus.SystemBus()
    remote_object = system_bus.get_object("id.waydro.Container", object_path)
    return dbus.Interface(remote_object, intf)
def DBusSessionService(object_path="/SessionManager", intf="id.waydro.SessionManager"):
    """Return a ``dbus.Interface`` for an object of ``id.waydro.Session``.

    The object at *object_path* is looked up on the session bus under the
    bus name ``"id.waydro.Session"`` and wrapped in the interface *intf*.
    """
    session_bus = dbus.SessionBus()
    remote_object = session_bus.get_object("id.waydro.Session", object_path)
    return dbus.Interface(remote_object, intf)