File: //usr/lib/python3.9/site-packages/cockpit/channels/dbus.py
# This file is part of Cockpit.
#
# Copyright (C) 2022 Red Hat, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# Missing stuff compared to the C bridge that we should probably add:
#
# - removing matches
# - removing watches
# - emitting of signals
# - publishing of objects
# - failing more gracefully in some cases (during open, etc)
#
# Stuff we might or might not do:
#
# - using non-default service names
#
# Stuff we should probably not do:
#
# - emulation of ObjectManager via recursive introspection
# - automatic detection of ObjectManager below the given path_namespace
# - recursive scraping of properties for new object paths
# (for path_namespace watches that don't hit an ObjectManager)
import asyncio
import errno
import json
import logging
import traceback
import xml.etree.ElementTree as ET
from cockpit._vendor import systemd_ctypes
from cockpit._vendor.systemd_ctypes import Bus, BusError, introspection
from ..channel import Channel, ChannelError
logger = logging.getLogger(__name__)
# The dbusjson3 payload
#
# This channel payload type translates JSON encoded messages on a
# Cockpit channel to D-Bus messages, in a mostly straightforward way.
# See doc/protocol.md for a description of the basics.
#
# However, dbusjson3 offers some advanced features as well that are
# meant to support the "magic" DBusProxy objects implemented by
# cockpit.js. Those proxy objects "magically" expose all the methods
# and properties of a D-Bus interface without requiring any explicit
# binding code to be generated for a JavaScript client. A dbusjson3
# channel does this by doing automatic introspection and property
# retrieval without much direction from the JavaScript client.
#
# The details of what exactly is done are not specified very strictly,
# and the Python bridge will likely differ from the C bridge
# significantly. This will be informed by what existing code actually
# needs, and we might end up with a more concrete description of what
# a client can actually expect.
#
# Here is an example of a more complex scenario:
#
# - The client adds a "watch" for a path namespace. There is an
# ObjectManager at the given path and the bridge emits "meta" and
# "notify" messages to describe all interfaces and objects reported
# by that ObjectManager.
#
# - The client makes a method call that causes a new object with a new
# interface to appear at the ObjectManager. The bridge will send a
# "meta" and "notify" message to describe this new object.
#
# - Since the InterfacesAdded signal was emitted before the method
# reply, the bridge must send the "meta" and "notify" messages
# before the method reply message.
#
# - However, in order to construct the "meta" message, the bridge must
# perform an Introspect call, and consequently must delay sending the
# method reply until that call has finished.
#
# The Python bridge implements this delaying of messages with
# coroutines and a fair mutex. Every message coming from D-Bus will
# wait on the mutex for its turn to send its message on the Cockpit
# channel, and will keep that mutex locked until it is done with
# sending. Since the mutex is fair, everyone will nicely wait in line
# without messages getting re-ordered.
#
# The scenario above will play out like this:
#
# - While adding the initial "watch", the lock is held until the
# "meta" and "notify" messages have been sent.
#
# - Later, when the InterfacesAdded signal comes in that has been
# triggered by the method call, the mutex will be locked while the
# necessary introspection is going on.
#
# - The method reply will likely come while the mutex is locked, and
# the task for sending that reply on the Cockpit channel will enter
# the wait queue of the mutex.
#
# - Once the introspection is done and the new "meta" and "notify"
# messages have been sent, the mutex is unlocked, the method reply
# task acquires it, and sends its message.
class InterfaceCache:
    """Remember parsed D-Bus interface descriptions, keyed by interface name.

    ``cache`` maps an interface name to its description.  ``old`` holds the
    names that get_interface_if_new() has already been asked about.
    """

    def __init__(self):
        self.cache = {}
        self.old = set()  # Interfaces already returned by get_interface_if_new

    def inject(self, interfaces):
        """Merge a ``{name: description}`` mapping into the cache."""
        self.cache.update(interfaces)

    async def introspect_path(self, bus, destination, object_path):
        """Introspect one object and cache every interface it reports.

        Calls org.freedesktop.DBus.Introspectable.Introspect on
        ``destination``/``object_path`` via ``bus`` and returns the
        ``{name: description}`` mapping parsed from the reply.
        """
        reply = await bus.call_method_async(destination, object_path,
                                            'org.freedesktop.DBus.Introspectable',
                                            'Introspect')
        document, = reply
        root = ET.fromstring(document)

        found = {}
        for node in root.findall('interface'):
            name = node.attrib['name']
            found[name] = introspection.parse_interface(node)

        # Keep everything that was reported, not only what the caller was
        # after: the other interfaces might be needed later.
        self.inject(found)
        return found

    async def get_interface(self, interface_name, bus=None, destination=None, object_path=None):
        """Return the description of ``interface_name``, or None if unknown.

        On a cache miss, and only when both ``bus`` and ``object_path`` are
        given, the object is introspected first.  A BusError raised by that
        introspection is ignored.
        """
        if interface_name in self.cache:
            return self.cache[interface_name]

        can_introspect = bus and object_path
        if can_introspect:
            try:
                await self.introspect_path(bus, destination, object_path)
            except BusError:
                # Best effort: fall through to the (possibly empty) lookup
                pass

        return self.cache.get(interface_name)

    async def get_interface_if_new(self, interface_name, bus, destination, object_path):
        """Like get_interface(), but only for the first request per name.

        Every later request for the same ``interface_name`` returns None.
        """
        already_requested = interface_name in self.old
        if not already_requested:
            self.old.add(interface_name)
            return await self.get_interface(interface_name, bus, destination, object_path)
        return None

    async def get_signature(self, interface_name, method, bus=None, destination=None, object_path=None):
        """Return the input signature of ``method`` as a single string.

        Raises KeyError when the interface cannot be found, or when the
        description has no entry for ``method``.
        """
        description = await self.get_interface(interface_name, bus, destination, object_path)
        if description is None:
            raise KeyError(f'Interface {interface_name} is not found')
        in_types = description['methods'][method]['in']
        return ''.join(in_types)
def notify_update(notify, path, interface_name, props):
    """Record the properties of one interface in a "notify" payload.

    ``props`` maps property names to objects carrying the actual value in a
    ``value`` attribute; the unwrapped values are stored (replacing any
    earlier entry) at ``notify[path][interface_name]``.
    """
    unwrapped = {}
    for key, variant in props.items():
        unwrapped[key] = variant.value
    per_path = notify.setdefault(path, {})
    per_path[interface_name] = unwrapped
class DBusChannel(Channel):
    """Cockpit channel implementing the 'dbus-json3' payload.

    JSON requests arriving on the channel ("call", "add-match", "watch",
    "meta") are dispatched by do_data() to coroutines that talk to the bus.
    Outgoing messages whose relative order matters are sent while holding
    ``watch_processing_lock`` (created in do_open()).
    """
    json_encoder = systemd_ctypes.JSONEncoder(indent=2)
    payload = 'dbus-json3'

    matches = None  # slots returned by bus.add_match(); a list once do_open() ran
    name = None  # the "name" channel option: the bus name we talk to, if any
    bus = None  # the bus connection selected in do_open()
    owner = None  # the owner of `name` most recently reported via send_owner()

    async def setup_name_owner_tracking(self):
        """Start tracking the owner of ``self.name`` and report it.

        Installs a NameOwnerChanged handler for the name, then asks the bus
        for the current owner.  If the name has no owner, the bus is asked to
        start the service.  Owner changes are sent as ``owner`` messages.
        """
        def send_owner(owner):
            # We must be careful not to send duplicate owner
            # notifications. cockpit.js relies on that.
            if self.owner != owner:
                self.owner = owner
                self.send_json(owner=owner)

        def handler(message):
            # An empty "new owner" string means the name lost its owner
            _name, _old, new = message.get_body()
            send_owner(owner=new if new != "" else None)

        self.add_signal_handler(handler,
                                sender='org.freedesktop.DBus',
                                path='/org/freedesktop/DBus',
                                interface='org.freedesktop.DBus',
                                member='NameOwnerChanged',
                                arg0=self.name)

        try:
            unique_name, = await self.bus.call_method_async("org.freedesktop.DBus",
                                                            "/org/freedesktop/DBus",
                                                            "org.freedesktop.DBus",
                                                            "GetNameOwner", "s", self.name)
        except BusError as error:
            if error.name == "org.freedesktop.DBus.Error.NameHasNoOwner":
                # Try to start it. If it starts successfully, we will
                # get a NameOwnerChanged signal (which will set
                # self.owner) before StartServiceByName returns.
                try:
                    await self.bus.call_method_async("org.freedesktop.DBus",
                                                     "/org/freedesktop/DBus",
                                                     "org.freedesktop.DBus",
                                                     "StartServiceByName", "su", self.name, 0)
                except BusError as start_error:
                    logger.debug("Failed to start service '%s': %s", self.name, start_error.message)
                    # Sent directly (not via send_owner) so that it goes out
                    # even though self.owner is still None.
                    self.send_json(owner=None)
            else:
                logger.debug("Failed to get owner of service '%s': %s", self.name, error.message)
        else:
            send_owner(unique_name)

    def do_open(self, options):
        """Connect to the requested bus and get the channel ready.

        Options read: ``name`` (bus name to talk to), ``bus`` ('internal',
        'session', 'system'; a missing value selects the system bus) and
        ``address`` (connect to an explicit address instead).

        Raises ChannelError('protocol-error') for conflicting or invalid
        options and when the bus connection fails with an OSError.
        """
        self.cache = InterfaceCache()
        self.name = options.get('name')
        self.matches = []

        bus = options.get('bus')
        address = options.get('address')

        try:
            if address is not None:
                if bus is not None and bus != 'none':
                    raise ChannelError('protocol-error', message='only one of "bus" and "address" can be specified')
                logger.debug('get bus with address %s for %s', address, self.name)
                self.bus = Bus.new(address=address, bus_client=self.name is not None)
            elif bus == 'internal':
                logger.debug('get internal bus for %s', self.name)
                self.bus = self.router.internal_bus.client
            else:
                if bus == 'session':
                    logger.debug('get session bus for %s', self.name)
                    self.bus = Bus.default_user()
                elif bus == 'system' or bus is None:
                    logger.debug('get system bus for %s', self.name)
                    self.bus = Bus.default_system()
                else:
                    raise ChannelError('protocol-error', message=f'invalid bus "{bus}"')
        except OSError as exc:
            # Name what we actually tried to reach: 'bus' is None both for
            # address connections and for the default (system) bus, which
            # would otherwise be reported as "None bus".
            if address is not None:
                target = f'bus at {address}'
            else:
                target = f'{bus or "system"} bus'
            raise ChannelError('protocol-error', message=f'failed to connect to {target}: {exc}') from exc

        try:
            self.bus.attach_event(None, 0)
        except OSError as err:
            # EBUSY is tolerated here; any other failure is fatal
            if err.errno != errno.EBUSY:
                raise

        # This needs to be a fair mutex so that outgoing messages don't
        # get re-ordered. asyncio.Lock is fair.
        self.watch_processing_lock = asyncio.Lock()

        if self.name is not None:
            async def get_ready():
                # Only become ready once we know the name has an owner
                async with self.watch_processing_lock:
                    await self.setup_name_owner_tracking()
                    if self.owner:
                        self.ready(unique_name=self.owner)
                    else:
                        self.close({'problem': 'not-found'})

            self.create_task(get_ready())
        else:
            self.ready()

    def add_signal_handler(self, handler, **kwargs):
        """Install a signal match rule and remember its slot.

        ``kwargs`` are match-rule keys and values.  ``type`` is forced to
        'signal' and ``sender`` defaults to ``self.name`` when that is set.
        When the rule's sender is ``self.name``, ``handler`` is only invoked
        for messages whose sender equals the currently known ``self.owner``.
        Nothing is installed if the channel is already closing.
        """
        r = dict(**kwargs)
        r['type'] = 'signal'
        if 'sender' not in r and self.name is not None:
            r['sender'] = self.name
        # HACK - https://github.com/bus1/dbus-broker/issues/309
        # path_namespace='/' in a rule does not work.
        if r.get('path_namespace') == "/":
            del r['path_namespace']

        def filter_owner(message):
            if self.owner is not None and self.owner == message.get_sender():
                handler(message)

        if self.name is not None and 'sender' in r and r['sender'] == self.name:
            func = filter_owner
        else:
            func = handler
        r_string = ','.join(f"{key}='{value}'" for key, value in r.items())
        if not self.is_closing():
            # this gets an EINTR very often especially on RHEL 8
            while True:
                try:
                    match = self.bus.add_match(r_string, func)
                    break
                except InterruptedError:
                    pass

            self.matches.append(match)

    def add_async_signal_handler(self, handler, **kwargs):
        """Like add_signal_handler(), but ``handler`` is a coroutine function.

        Each matching message starts a new task running ``handler(message)``.
        """
        def sync_handler(message):
            self.create_task(handler(message))
        self.add_signal_handler(sync_handler, **kwargs)

    async def do_call(self, message):
        """Perform the method call in ``message['call']`` and send the outcome.

        ``message['call']`` is ``[path, interface, method, args]``.  Optional
        keys: ``id`` (echoed back), ``flags``, ``timeout`` (milliseconds) and
        ``type`` (the argument signature).  Without ``type`` and with
        arguments present, the signature is looked up by introspection.
        """
        path, iface, method, args = message['call']
        cookie = message.get('id')
        flags = message.get('flags')

        timeout = message.get('timeout')
        if timeout is not None:
            # sd_bus timeout is μs, cockpit API timeout is ms
            timeout *= 1000
        else:
            # sd_bus has no "indefinite" timeout, so use MAX_UINT64
            timeout = 2 ** 64 - 1

        # We have to figure out the signature of the call. Either we got told it:
        signature = message.get('type')

        # ... or there aren't any arguments
        if signature is None and len(args) == 0:
            signature = ''

        # ... or we need to introspect
        if signature is None:
            try:
                logger.debug('Doing introspection request for %s %s', iface, method)
                signature = await self.cache.get_signature(iface, method, self.bus, self.name, path)
            except BusError as error:
                self.send_json(error=[error.name, [f'Introspection: {error.message}']], id=cookie)
                return
            except KeyError:
                self.send_json(
                    error=[
                        "org.freedesktop.DBus.Error.UnknownMethod",
                        [f"Introspection data for method {iface} {method} not available"]],
                    id=cookie)
                return
            except Exception as exc:
                self.send_json(error=['python.error', [f'Introspection: {exc!s}']], id=cookie)
                return

        try:
            method_call = self.bus.message_new_method_call(self.name, path, iface, method, signature, *args)
            reply = await self.bus.call_async(method_call, timeout=timeout)
            # If the method call has kicked off any signals related to
            # watch processing, wait for that to be done.
            async with self.watch_processing_lock:
                # TODO: stop hard-coding the endian flag here.
                self.send_json(
                    reply=[reply.get_body()], id=cookie,
                    flags="<" if flags is not None else None,
                    type=reply.get_signature(True))  # noqa: FBT003
        except BusError as error:
            # actually, should send the fields from the message body
            self.send_json(error=[error.name, [error.message]], id=cookie)
        except Exception:
            logger.exception("do_call(%s): generic exception", message)
            self.send_json(error=['python.error', [traceback.format_exc()]], id=cookie)

    async def do_add_match(self, message):
        """Install the match rule in ``message['add-match']``.

        Every matching signal is forwarded as a ``signal`` message of the
        form ``[path, interface, member, body]``.
        """
        add_match = message['add-match']
        logger.debug('adding match %s', add_match)

        async def match_hit(message):
            logger.debug('got match')
            async with self.watch_processing_lock:
                self.send_json(signal=[
                    message.get_path(),
                    message.get_interface(),
                    message.get_member(),
                    list(message.get_body())
                ])

        self.add_async_signal_handler(match_hit, **add_match)

    async def setup_objectmanager_watch(self, path, interface_name, meta, notify):
        """Watch the objects managed by the ObjectManager at ``path``.

        Installs a handler for ObjectManager signals, then fills ``meta`` and
        ``notify`` (both updated in place) from GetManagedObjects.  When
        ``interface_name`` is not None, only that interface is reported.
        """
        # Watch the objects managed by the ObjectManager at "path".
        # Properties are not watched, that is done by setup_path_watch
        # below via recursive_props == True.

        async def handler(message):
            member = message.get_member()
            if member == "InterfacesAdded":
                (path, interface_props) = message.get_body()
                logger.debug('interfaces added %s %s', path, interface_props)
                meta = {}
                notify = {}
                # Hold the lock across the introspection so that nothing
                # overtakes the "meta"/"notify" pair being assembled here.
                async with self.watch_processing_lock:
                    for name, props in interface_props.items():
                        if interface_name is None or name == interface_name:
                            mm = await self.cache.get_interface_if_new(name, self.bus, self.name, path)
                            if mm:
                                meta.update({name: mm})
                            notify_update(notify, path, name, props)
                    self.send_json(meta=meta)
                    self.send_json(notify=notify)
            elif member == "InterfacesRemoved":
                (path, interfaces) = message.get_body()
                logger.debug('interfaces removed %s %s', path, interfaces)
                async with self.watch_processing_lock:
                    # Removed interfaces are reported with a None value
                    notify = {path: dict.fromkeys(interfaces)}
                    self.send_json(notify=notify)

        self.add_async_signal_handler(handler,
                                      path=path,
                                      interface="org.freedesktop.DBus.ObjectManager")
        objects, = await self.bus.call_method_async(self.name, path,
                                                    'org.freedesktop.DBus.ObjectManager',
                                                    'GetManagedObjects')
        for p, ifaces in objects.items():
            for iface, props in ifaces.items():
                if interface_name is None or iface == interface_name:
                    mm = await self.cache.get_interface_if_new(iface, self.bus, self.name, p)
                    if mm:
                        meta.update({iface: mm})
                    notify_update(notify, p, iface, props)

    async def setup_path_watch(self, path, interface_name, recursive_props, meta, notify):
        """Watch the object at ``path`` for property changes.

        Introspects ``path``, adds the result to ``meta``, installs a
        PropertiesChanged handler (for the whole namespace below ``path``
        when ``recursive_props`` is true), and fills ``notify`` with the
        current properties of each interface in ``meta``.  ``meta`` and
        ``notify`` are updated in place.
        """
        # Watch a single object at "path", but maybe also watch for
        # property changes for all objects below "path".

        async def handler(message):
            async with self.watch_processing_lock:
                path = message.get_path()
                name, props, invalids = message.get_body()
                logger.debug('NOTIFY: %s %s %s %s', path, name, props, invalids)
                # Invalidated properties come without a value: fetch each one
                for inv in invalids:
                    try:
                        reply, = await self.bus.call_method_async(self.name, path,
                                                                  'org.freedesktop.DBus.Properties', 'Get',
                                                                  'ss', name, inv)
                    except BusError as exc:
                        logger.debug('failed to fetch property %s.%s on %s %s: %s',
                                     name, inv, self.name, path, str(exc))
                        continue
                    props[inv] = reply
                notify = {}
                notify_update(notify, path, name, props)
                self.send_json(notify=notify)

        this_meta = await self.cache.introspect_path(self.bus, self.name, path)
        if interface_name is not None:
            # Restrict to the requested interface; the entry is None when the
            # object does not report that interface.
            interface = this_meta.get(interface_name)
            this_meta = {interface_name: interface}
        meta.update(this_meta)
        if recursive_props:
            self.add_async_signal_handler(handler,
                                          interface="org.freedesktop.DBus.Properties",
                                          path_namespace=path)
        else:
            self.add_async_signal_handler(handler,
                                          interface="org.freedesktop.DBus.Properties",
                                          path=path)

        for name in meta:
            if name.startswith("org.freedesktop.DBus."):
                continue
            try:
                props, = await self.bus.call_method_async(self.name, path,
                                                          'org.freedesktop.DBus.Properties',
                                                          'GetAll', 's', name)
                notify_update(notify, path, name, props)
            except BusError:
                # Best effort: skip interfaces whose properties can't be read
                pass

    async def do_watch(self, message):
        """Set up the watch described by ``message['watch']``.

        The watch may give ``path`` or ``path_namespace`` and optionally
        ``interface``.  On success "meta", "notify" and an empty "reply" are
        sent, in that order, while holding the processing lock.  A BusError
        is reported as an "error" message.
        """
        watch = message['watch']
        path = watch.get('path')
        path_namespace = watch.get('path_namespace')
        interface_name = watch.get('interface')
        cookie = message.get('id')

        path = path or path_namespace
        # Only a watch that came in via path_namespace is recursive
        recursive = path == path_namespace

        if path is None or cookie is None:
            logger.debug('ignored incomplete watch request %s', message)
            # NOTE(review): both an error and an empty reply are sent for the
            # same id here - confirm that clients depend on this before
            # changing it.
            self.send_json(error=['x.y.z', ['Not Implemented']], id=cookie)
            self.send_json(reply=[], id=cookie)
            return

        try:
            async with self.watch_processing_lock:
                meta = {}
                notify = {}
                await self.setup_path_watch(path, interface_name, recursive, meta, notify)
                if recursive:
                    await self.setup_objectmanager_watch(path, interface_name, meta, notify)
                self.send_json(meta=meta)
                self.send_json(notify=notify)
                self.send_json(reply=[], id=cookie)
        except BusError as error:
            logger.debug("do_watch(%s) caught D-Bus error: %s", message, error.message)
            self.send_json(error=[error.name, [error.message]], id=cookie)

    async def do_meta(self, message):
        """Add the interface descriptions in ``message['meta']`` to the cache."""
        self.cache.inject(message['meta'])

    def do_data(self, data):
        """Decode one JSON request and start a task for its handler.

        Requests without a "call", "add-match", "watch" or "meta" key are
        logged and ignored.
        """
        message = json.loads(data)
        logger.debug('receive dbus request %s %s', self.name, message)

        if 'call' in message:
            self.create_task(self.do_call(message))
        elif 'add-match' in message:
            self.create_task(self.do_add_match(message))
        elif 'watch' in message:
            self.create_task(self.do_watch(message))
        elif 'meta' in message:
            self.create_task(self.do_meta(message))
        else:
            logger.debug('ignored dbus request %s', message)

    def do_close(self):
        """Cancel every installed match and close the channel."""
        for slot in self.matches:
            slot.cancel()
        self.matches = []
        self.close()