]> glassweightruler.freedombox.rocks Git - waydroid.git/blob - tools/interfaces/IStatusBarService.py
arch: Separately identify arm64_only CPUs without AArch32 support
[waydroid.git] / tools / interfaces / IStatusBarService.py
1 import gbinder
2 import logging
3 import time
4 from tools import helpers
5 from gi.repository import GLib
6 import signal
7
8
# Binder interface descriptor passed to gbinder.Client by IStatusBarService.
INTERFACE = "com.android.internal.statusbar.IStatusBarService"
# Name looked up through the service manager in get_service().
SERVICE_NAME = "statusbar"

# Transaction codes sent by IStatusBarService.expand() and .collapse().
TRANSACTION_expand = 1
TRANSACTION_collapse = 2
14
class IStatusBarService:
    """Thin gbinder client for the binder interface named by ``INTERFACE``.

    Each public method sends one argument-less synchronous transaction and
    logs an error if the transaction fails or the reply carries a non-zero
    code. Nothing is returned or raised to signal failure.
    """

    def __init__(self, remote):
        """Bind a gbinder client to ``remote`` using ``INTERFACE``.

        Args:
            remote: Remote binder object, as returned by the service
                manager's ``get_service_sync()``.
        """
        self.client = gbinder.Client(remote, INTERFACE)

    def _transact(self, code):
        """Send an empty request with transaction ``code`` and log failures.

        Shared implementation of ``expand()`` and ``collapse()``, which
        differ only in the transaction code they send.

        Args:
            code: Transaction code to pass to ``transact_sync_reply``.
        """
        request = self.client.new_request()
        reply, status = self.client.transact_sync_reply(code, request)

        # A truthy status from the transaction is treated as a failure.
        if status:
            logging.error("Sending reply failed")
            return

        reader = reply.init_reader()
        # The first int32 of the reply is interpreted as an exception code.
        # NOTE(review): the first element returned by read_int32() is
        # ignored here, as it was before; confirm whether it reports a
        # failed read that should be handled.
        _, exception = reader.read_int32()
        if exception != 0:
            logging.error("Failed with code: {}".format(exception))

    def expand(self):
        """Send the ``TRANSACTION_expand`` transaction to the service."""
        self._transact(TRANSACTION_expand)

    def collapse(self):
        """Send the ``TRANSACTION_collapse`` transaction to the service."""
        self._transact(TRANSACTION_collapse)
44
def get_service(args):
    """Look up the ``SERVICE_NAME`` binder service and wrap it.

    Loads the binder nodes, opens the service manager on
    ``/dev/<args.BINDER_DRIVER>`` and, if the manager is not yet present,
    waits for it via ``wait_for_manager``. The service lookup is then
    attempted once and retried up to 1000 more times, one second apart.

    Args:
        args: Object providing ``BINDER_DRIVER`` and, for the three-argument
            ``gbinder.ServiceManager`` form, ``SERVICE_MANAGER_PROTOCOL``
            and ``BINDER_PROTOCOL``.

    Returns:
        An ``IStatusBarService`` for the remote, or ``None`` if the service
        manager never appeared or the service could not be obtained.
    """
    helpers.drivers.loadBinderNodes(args)

    try:
        manager = gbinder.ServiceManager(
            "/dev/" + args.BINDER_DRIVER,
            args.SERVICE_MANAGER_PROTOCOL,
            args.BINDER_PROTOCOL,
        )
    except TypeError:
        # The constructor rejected the protocol arguments; fall back to the
        # single-argument form.
        manager = gbinder.ServiceManager("/dev/" + args.BINDER_DRIVER)

    if not manager.is_present():
        logging.info("Waiting for binder Service Manager...")
        appeared = wait_for_manager(manager)
        if not appeared:
            logging.error("Service Manager never appeared")
            return None

    retries_left = 1000
    remote, _ = manager.get_service_sync(SERVICE_NAME)
    while not remote:
        # Give up silently once the retry budget is exhausted.
        if retries_left <= 0:
            return None
        logging.warning(
            "Failed to get service {}, trying again...".format(SERVICE_NAME))
        time.sleep(1)
        remote, _ = manager.get_service_sync(SERVICE_NAME)
        retries_left -= 1

    return IStatusBarService(remote)
72
# Like ServiceManager.wait() but can be interrupted
def wait_for_manager(sm):
    """Run a GLib main loop until the service manager is present.

    The loop stops when the presence handler sees ``sm.is_present()``, when
    the 60-second timeout fires, or when SIGINT is received.

    The timeout and SIGINT sources, and the presence handler, are removed
    before returning, even if ``mainloop.run()`` raises. Otherwise the
    sources would stay attached to the default main context after this
    function returns.

    Args:
        sm: Service manager object providing ``add_presence_handler``,
            ``remove_handler`` and ``is_present``.

    Returns:
        ``True`` if ``sm.is_present()`` is truthy once the loop has exited,
        ``False`` otherwise.
    """
    mainloop = GLib.MainLoop()

    def _quit(*_unused):
        # Accepts the optional user-data argument so the same callback
        # serves both the timeout and the signal source.
        mainloop.quit()
        # Returning True keeps the source installed, so its id is still
        # valid when it is removed explicitly below.
        return True

    hndl = sm.add_presence_handler(
        lambda: mainloop.quit() if sm.is_present() else None)
    timeout_id = GLib.timeout_add_seconds(60, _quit)
    sigint_id = GLib.unix_signal_add(
        GLib.PRIORITY_HIGH, signal.SIGINT, _quit, None)
    try:
        mainloop.run()
    finally:
        GLib.source_remove(timeout_id)
        GLib.source_remove(sigint_id)
        sm.remove_handler(hndl)

    return bool(sm.is_present())
83 return True