diff --git a/qui/clipboard.py b/qui/clipboard.py
index aa1da44..0118bf9 100644
--- a/qui/clipboard.py
+++ b/qui/clipboard.py
@@ -22,8 +22,8 @@
#
# pylint: disable=import-error
-''' Sends notifications via Gio.Notification when something is Copy-Pasted
-via Qubes RPC '''
+""" Sends notifications via Gio.Notification when something is Copy-Pasted
+via Qubes RPC """
# pylint: disable=invalid-name,wrong-import-position
import asyncio
@@ -36,13 +36,15 @@
import qubesadmin.events
import gi
-gi.require_version('Gtk', '3.0') # isort:skip
+
+gi.require_version("Gtk", "3.0") # isort:skip
from gi.repository import Gtk, Gio, Gdk # isort:skip
import gbulb
import pyinotify
import gettext
+
t = gettext.translation("desktop-linux-manager", fallback=True)
_ = t.gettext
@@ -56,45 +58,51 @@
FROM_DIR = "/var/run/qubes/"
XEVENT = "/var/run/qubes/qubes-clipboard.bin.xevent"
APPVIEWER_LOCK = "/var/run/qubes/appviewer.lock"
-COPY_FEATURE = 'gui-default-secure-copy-sequence'
-PASTE_FEATURE = 'gui-default-secure-paste-sequence'
+COPY_FEATURE = "gui-default-secure-copy-sequence"
+PASTE_FEATURE = "gui-default-secure-paste-sequence"
# Defining all messages in one place for easy modification
-ERROR_MALFORMED_DATA = _( \
- "Malformed clipboard data received from " \
- "qube: {vmname}")
-ERROR_ON_COPY = _( \
- "Failed to fetch clipboard data from qube: {vmname}")
-ERROR_ON_PASTE = _( \
- "Failed to paste global clipboard contents to qube: " \
- "{vmname}")
-ERROR_OVERSIZED_DATA = _( \
- "Global clipboard size exceeded.\n" \
- "qube: {vmname} attempted to send {size} bytes to global clipboard."\
- "\nCurrent global clipboard limit is {limit}, increase limit or use " \
- "qvm-copy to transfer large amounts of data between qubes.")
-WARNING_POSSIBLE_TRUNCATION = _( \
- "Global clipboard size limit exceed.\n" \
- "qube: {vmname} attempted to send {size} bytes to global clipboard."\
- "\nGlobal clipboard might have been truncated.\n" \
- "Use qvm-copy to transfer large amounts of data between " \
- "qubes.")
-WARNING_EMPTY_CLIPBOARD = _( \
- "Empty source qube clipboard.\n" \
- "qube: {vmname} attempted to send 0 bytes to global " \
- "clipboard.")
-MSG_COPY_SUCCESS = _( \
- "Clipboard contents fetched from qube: '{vmname}'\n" \
- "Copied {size} to the global clipboard.\n" \
- "Press {shortcut} in qube to paste to local clipboard.")
+ERROR_MALFORMED_DATA = _(
+ "Malformed clipboard data received from qube: {vmname}"
+)
+ERROR_ON_COPY = _("Failed to fetch clipboard data from qube: {vmname}")
+ERROR_ON_PASTE = _(
+ "Failed to paste global clipboard contents to qube: {vmname}"
+)
+ERROR_OVERSIZED_DATA = _(
+ "Global clipboard size exceeded.\n"
+ "qube: {vmname} attempted to send {size} bytes to global clipboard."
+ "\nCurrent global clipboard limit is {limit}, increase limit or use "
+ "qvm-copy to transfer large amounts of data between qubes."
+)
+WARNING_POSSIBLE_TRUNCATION = _(
+ "Global clipboard size limit exceed.\n"
+ "qube: {vmname} attempted to send {size} bytes to global clipboard."
+ "\nGlobal clipboard might have been truncated.\n"
+ "Use qvm-copy to transfer large amounts of data between "
+ "qubes."
+)
+WARNING_EMPTY_CLIPBOARD = _(
+ "Empty source qube clipboard.\n"
+ "qube: {vmname} attempted to send 0 bytes to global "
+ "clipboard."
+)
+MSG_COPY_SUCCESS = _(
+ "Clipboard contents fetched from qube: '{vmname}'\n"
+ "Copied {size} to the global clipboard.\n"
+ "Press {shortcut} in qube to paste to local clipboard."
+)
MSG_WIPED = _("\nGlobal clipboard has been wiped")
-MSG_PASTE_SUCCESS_METADATA = _( \
- "Global clipboard copied {size} to {vmname}.\n" \
- "Global clipboard has been wiped.\n" \
- "Paste normally in qube (e.g. Ctrl+V).")
-MSG_PASTE_SUCCESS_LEGACY = _( \
- "Global clipboard copied to qube and wiped.\n" \
- "Paste normally in qube (e.g. Ctrl+V).")
+MSG_PASTE_SUCCESS_METADATA = _(
+ "Global clipboard copied {size} to {vmname}.\n"
+ "Global clipboard has been wiped.\n"
+ "Paste normally in qube (e.g. Ctrl+V)."
+)
+MSG_PASTE_SUCCESS_LEGACY = _(
+ "Global clipboard copied to qube and wiped.\n"
+ "Paste normally in qube (e.g. Ctrl+V)."
+)
+
@contextlib.contextmanager
def appviewer_lock():
@@ -106,62 +114,76 @@ def appviewer_lock():
fcntl.flock(fd, fcntl.LOCK_UN)
os.close(fd)
+
class EventHandler(pyinotify.ProcessEvent):
# pylint: disable=arguments-differ
def my_init(self, loop=None, gtk_app=None):
- ''' This method is called from ProcessEvent.__init__(). '''
+ """This method is called from ProcessEvent.__init__()."""
self.gtk_app = gtk_app
self.loop = loop if loop else asyncio.get_event_loop()
def _copy(self, metadata: dict) -> None:
- ''' Sends Copy notification via Gio.Notification
- '''
+ """Sends Copy notification via Gio.Notification"""
size = clipboard_formatted_size(metadata["sent_size"])
if metadata["malformed_request"]:
body = ERROR_MALFORMED_DATA.format(vmname=metadata["vmname"])
icon = "dialog-error"
- elif metadata["qrexec_clipboard"] and \
- metadata["sent_size"] >= metadata["buffer_size"]:
+ elif (
+ metadata["qrexec_clipboard"]
+ and metadata["sent_size"] >= metadata["buffer_size"]
+ ):
# Microsoft Windows clipboard case
body = WARNING_POSSIBLE_TRUNCATION.format(
- vmname=metadata["vmname"], size=size)
+ vmname=metadata["vmname"], size=size
+ )
icon = "dialog-warning"
elif metadata["oversized_request"]:
- body = ERROR_OVERSIZED_DATA.format(vmname=metadata["vmname"], \
- size=size, \
- limit=clipboard_formatted_size(metadata["buffer_size"]))
+ body = ERROR_OVERSIZED_DATA.format(
+ vmname=metadata["vmname"],
+ size=size,
+ limit=clipboard_formatted_size(metadata["buffer_size"]),
+ )
icon = "dialog-error"
- elif metadata["successful"] and metadata["cleared"] and \
- metadata["sent_size"] == 0:
+ elif (
+ metadata["successful"]
+ and metadata["cleared"]
+ and metadata["sent_size"] == 0
+ ):
body = WARNING_EMPTY_CLIPBOARD.format(vmname=metadata["vmname"])
icon = "dialog-warning"
elif not metadata["successful"]:
body = ERROR_ON_COPY.format(vmname=metadata["vmname"])
icon = "dialog-error"
else:
- body = MSG_COPY_SUCCESS.format(vmname=metadata["vmname"], \
- size=size, shortcut=self.gtk_app.paste_shortcut)
+ body = MSG_COPY_SUCCESS.format(
+ vmname=metadata["vmname"],
+ size=size,
+ shortcut=self.gtk_app.paste_shortcut,
+ )
icon = "dialog-information"
if metadata["cleared"]:
body += MSG_WIPED
- self.gtk_app.update_clipboard_contents(metadata["vmname"], size,
- message=body, icon=icon)
+ self.gtk_app.update_clipboard_contents(
+ metadata["vmname"], size, message=body, icon=icon
+ )
def _paste(self, metadata: dict) -> None:
- ''' Sends Paste notification via Gio.Notification.
- '''
+ """Sends Paste notification via Gio.Notification."""
if not metadata["successful"] or metadata["malformed_request"]:
body = ERROR_ON_PASTE.format(vmname=metadata["vmname"])
body += MSG_WIPED
icon = "dialog-error"
- elif "protocol_version_xside" in metadata.keys() and \
- metadata["protocol_version_xside"] >= 0x00010008:
- body = MSG_PASTE_SUCCESS_METADATA.format( \
- size=clipboard_formatted_size(metadata["sent_size"]), \
- vmname=metadata["vmname"])
+ elif (
+ "protocol_version_xside" in metadata.keys()
+ and metadata["protocol_version_xside"] >= 0x00010008
+ ):
+ body = MSG_PASTE_SUCCESS_METADATA.format(
+ size=clipboard_formatted_size(metadata["sent_size"]),
+ vmname=metadata["vmname"],
+ )
icon = "dialog-information"
else:
body = MSG_PASTE_SUCCESS_LEGACY
@@ -169,13 +191,15 @@ def _paste(self, metadata: dict) -> None:
self.gtk_app.update_clipboard_contents(message=body, icon=icon)
def process_IN_CLOSE_WRITE(self, _unused=None):
- ''' Reacts to modifications of the FROM file '''
+ """Reacts to modifications of the FROM file"""
metadata = {}
with appviewer_lock():
- if os.path.isfile(METADATA):
+ if os.path.isfile(METADATA) and os.path.getmtime(
+ METADATA
+ ) >= os.path.getmtime(DATA):
# parse JSON .metadata file if qubes-guid protocol 1.8 or newer
try:
- with open(METADATA, 'r', encoding='ascii') as metadata_file:
+ with open(METADATA, "r", encoding="ascii") as metadata_file:
metadata = json.loads(metadata_file.read())
except OSError:
return
@@ -184,8 +208,8 @@ def process_IN_CLOSE_WRITE(self, _unused=None):
else:
# revert to .source file on qubes-guid protocol 1.7 or older
# synthesize metadata based on limited available information
- with open(FROM, 'r', encoding='ascii') as vm_from_file:
- metadata["vmname"] = vm_from_file.readline().strip('\n')
+ with open(FROM, "r", encoding="ascii") as vm_from_file:
+ metadata["vmname"] = vm_from_file.readline().strip("\n")
metadata["copy_action"] = metadata["vmname"] != ""
metadata["paste_action"] = metadata["vmname"] == ""
@@ -196,7 +220,7 @@ def process_IN_CLOSE_WRITE(self, _unused=None):
metadata["sent_size"] = 0
metadata["cleared"] = metadata["sent_size"] == 0
- metadata["qrexec_request"] = False
+ metadata["qrexec_clipboard"] = False
metadata["malformed_request"] = False
metadata["oversized_request"] = metadata["sent_size"] >= 65000
metadata["buffer_size"] = 65000
@@ -212,11 +236,11 @@ def process_IN_CLOSE_WRITE(self, _unused=None):
self._paste(metadata=metadata)
def process_IN_MOVE_SELF(self, _unused):
- ''' Stop loop if file is moved '''
+ """Stop loop if file is moved"""
self.loop.stop()
def process_IN_DELETE(self, _unused):
- ''' Stop loop if file is deleted '''
+ """Stop loop if file is deleted"""
self.loop.stop()
def process_IN_CREATE(self, event):
@@ -226,7 +250,7 @@ def process_IN_CREATE(self, event):
def clipboard_formatted_size(size: int = None) -> str:
- units = ['B', 'KiB', 'MiB', 'GiB']
+ units = ["B", "KiB", "MiB", "GiB"]
try:
if size:
@@ -234,22 +258,25 @@ def clipboard_formatted_size(size: int = None) -> str:
else:
file_size = os.path.getsize(DATA)
except OSError:
- return _('? bytes')
+ return _("? bytes")
if file_size == 1:
- formatted_bytes = _('1 byte')
+ formatted_bytes = _("1 byte")
else:
- formatted_bytes = str(file_size) + _(' bytes')
+ formatted_bytes = str(file_size) + _(" bytes")
if file_size > 0:
magnitude = min(
- int(math.log(file_size) / math.log(2) * 0.1), len(units) - 1)
+ int(math.log(file_size) / math.log(2) * 0.1), len(units) - 1
+ )
if magnitude > 0:
# pylint: disable=consider-using-f-string
- return '%s (%.1f %s)' % (formatted_bytes,
- file_size / (2.0**(10 * magnitude)),
- units[magnitude])
+ return "%s (%.1f %s)" % (
+ formatted_bytes,
+ file_size / (2.0 ** (10 * magnitude)),
+ units[magnitude],
+ )
# pylint: disable=consider-using-f-string
- return '%s' % (formatted_bytes)
+ return "%s" % (formatted_bytes)
class NotificationApp(Gtk.Application):
@@ -263,11 +290,14 @@ def __init__(self, wm, qapp, dispatcher, **properties):
self.dispatcher = dispatcher
self.icon = Gtk.StatusIcon()
- self.icon.set_from_icon_name('edit-copy')
+ self.icon.set_from_icon_name("edit-copy")
self.icon.set_tooltip_markup(
- _('Global Clipboard\nInformation about the current'
- ' state of the global clipboard.'))
- self.icon.connect('button-press-event', self.show_menu)
+ _(
+ "Global Clipboard\nInformation about the current"
+ " state of the global clipboard."
+ )
+ )
+ self.icon.connect("button-press-event", self.show_menu)
self.menu = Gtk.Menu()
self.clipboard_label = Gtk.Label(xalign=0)
@@ -281,16 +311,19 @@ def __init__(self, wm, qapp, dispatcher, **properties):
if not os.path.exists(FROM):
# pylint: disable=no-member
- self.temporary_watch = \
- self.wm.add_watch(FROM_DIR, pyinotify.IN_CREATE, rec=False)
+ self.temporary_watch = self.wm.add_watch(
+ FROM_DIR, pyinotify.IN_CREATE, rec=False
+ )
else:
self.setup_watcher()
for feature in [COPY_FEATURE, PASTE_FEATURE]:
- self.dispatcher.add_handler(f'domain-feature-set:{feature}',
- self.setup_ui)
- self.dispatcher.add_handler(f'domain-feature-delete:{feature}',
- self.setup_ui)
+ self.dispatcher.add_handler(
+ f"domain-feature-set:{feature}", self.setup_ui
+ )
+ self.dispatcher.add_handler(
+ f"domain-feature-delete:{feature}", self.setup_ui
+ )
def setup_watcher(self):
if self.temporary_watch:
@@ -301,35 +334,43 @@ def setup_watcher(self):
def show_menu(self, _unused, event):
self.menu.show_all()
- self.menu.popup(None, # parent_menu_shell
- None, # parent_menu_item
- None, # func
- None, # data
- event.button, # button
- Gtk.get_current_event_time()) # activate_time
-
- def update_clipboard_contents(self, vm=None, size=0, message=None, \
- icon=None):
+ self.menu.popup(
+ None, # parent_menu_shell
+ None, # parent_menu_item
+ None, # func
+ None, # data
+ event.button, # button
+ Gtk.get_current_event_time(),
+ ) # activate_time
+
+ def update_clipboard_contents(
+ self, vm=None, size=0, message=None, icon=None
+ ):
if not vm or not size:
- self.clipboard_label.set_markup(_(
- "Global clipboard is empty"))
+ self.clipboard_label.set_markup(
+ _("Global clipboard is empty")
+ )
self.icon.set_from_icon_name("edit-copy")
# todo the icon should be empty and full depending on state
else:
self.clipboard_label.set_markup(
- _("Global clipboard contents: {0} from "
- "{1}").format(size, vm))
+ _(
+ "Global clipboard contents: {0} from {1}"
+ ).format(size, vm)
+ )
self.icon.set_from_icon_name("edit-copy")
if message:
self.send_notify(message, icon=icon)
def setup_ui(self, *_args, **_kwargs):
- self.copy_shortcut = self._prettify_shortcut(self.vm.features.get(
- COPY_FEATURE, 'Ctrl-Shift-C'))
- self.paste_shortcut = self._prettify_shortcut(self.vm.features.get(
- PASTE_FEATURE, 'Ctrl-Shift-V'))
+ self.copy_shortcut = self._prettify_shortcut(
+ self.vm.features.get(COPY_FEATURE, "Ctrl-Shift-C")
+ )
+ self.paste_shortcut = self._prettify_shortcut(
+ self.vm.features.get(PASTE_FEATURE, "Ctrl-Shift-V")
+ )
self.menu = Gtk.Menu()
@@ -350,9 +391,11 @@ def setup_ui(self, *_args, **_kwargs):
help_label = Gtk.Label(xalign=0)
help_label.set_markup(
- _("Use {copy} to copy and "
- "{paste} to paste.").format(
- copy=self.copy_shortcut, paste=self.paste_shortcut))
+ _(
+ "Use {copy} to copy and "
+ "{paste} to paste."
+ ).format(copy=self.copy_shortcut, paste=self.paste_shortcut)
+ )
help_item = Gtk.MenuItem()
help_item.set_margin_left(10)
help_item.set_sensitive(False)
@@ -362,7 +405,7 @@ def setup_ui(self, *_args, **_kwargs):
self.menu.append(Gtk.SeparatorMenuItem())
dom0_item = Gtk.MenuItem(_("Copy dom0 clipboard"))
- dom0_item.connect('activate', self.copy_dom0_clipboard)
+ dom0_item.connect("activate", self.copy_dom0_clipboard)
self.menu.append(dom0_item)
def copy_dom0_clipboard(self, *_args, **_kwargs):
@@ -370,40 +413,45 @@ def copy_dom0_clipboard(self, *_args, **_kwargs):
text = clipboard.wait_for_text()
if not text:
- self.send_notify(_("Dom0 clipboard is empty!"), \
- icon="dialog-information")
+ self.send_notify(
+ _("Dom0 clipboard is empty!"), icon="dialog-information"
+ )
return
try:
with appviewer_lock():
- with open(DATA, "w", encoding='utf-8') as contents:
+ with open(DATA, "w", encoding="utf-8") as contents:
contents.write(text)
- with open(FROM, "w", encoding='ascii') as source:
+ with open(FROM, "w", encoding="ascii") as source:
source.write("dom0")
- with open(XEVENT, "w", encoding='ascii') as timestamp:
+ with open(XEVENT, "w", encoding="ascii") as timestamp:
timestamp.write(str(Gtk.get_current_event_time()))
- with open(METADATA, "w", encoding='ascii') as metadata:
+ with open(METADATA, "w", encoding="ascii") as metadata:
metadata.write(
- "{{\n" \
- '"vmname":"dom0",\n' \
- '"xevent_timestamp":{xevent_timestamp},\n' \
- '"successful":1,\n' \
- '"copy_action":1,\n' \
- '"paste_action":0,\n' \
- '"malformed_request":0,\n' \
- '"cleared":0,\n' \
- '"qrexec_clipboard":0,\n' \
- '"sent_size":{sent_size},\n' \
- '"buffer_size":{buffer_size},\n' \
- '"protocol_version_xside":65544,\n' \
- '"protocol_version_vmside":65544,\n' \
- '}}\n'.format(xevent_timestamp= \
- str(Gtk.get_current_event_time()), \
- sent_size=os.path.getsize(DATA), \
- buffer_size="256000"))
+ "{{\n"
+ '"vmname":"dom0",\n'
+ '"xevent_timestamp":{xevent_timestamp},\n'
+ '"successful":1,\n'
+ '"copy_action":1,\n'
+ '"paste_action":0,\n'
+ '"malformed_request":0,\n'
+ '"cleared":0,\n'
+ '"qrexec_clipboard":0,\n'
+ '"sent_size":{sent_size},\n'
+ '"buffer_size":{buffer_size},\n'
+ '"protocol_version_xside":65544,\n'
+ '"protocol_version_vmside":65544,\n'
+ "}}\n".format(
+ xevent_timestamp=str(Gtk.get_current_event_time()),
+ sent_size=os.path.getsize(DATA),
+ buffer_size="256000",
+ )
+ )
except Exception: # pylint: disable=broad-except
- self.send_notify(_("Error while accessing global clipboard!"), \
- icon = "dialog-error")
+ self.send_notify(
+ _("Error while accessing global clipboard!"),
+ icon="dialog-error",
+ )
def send_notify(self, body, icon=None):
# pylint: disable=attribute-defined-outside-init
@@ -417,19 +465,20 @@ def send_notify(self, body, icon=None):
def _prettify_shortcut(self, shortcut: str):
"""Turn a keyboard shortcut into a nicer, more readable version,
e.g. convert 'Ctrl-Mod4-c' into 'Ctrl-Win-C'"""
- parts = shortcut.split('-')
+ parts = shortcut.split("-")
return "+".join([self._convert_to_readable(part) for part in parts])
@staticmethod
def _convert_to_readable(key: str):
- if key == 'Mod4':
- return 'Win'
- if key == 'Ins':
+ if key == "Mod4":
+ return "Win"
+ if key == "Ins":
return "Insert"
if len(key) == 1:
return key.upper()
return key
+
def main():
loop = asyncio.get_event_loop()
wm = pyinotify.WatchManager()
@@ -442,9 +491,12 @@ def main():
handler = EventHandler(loop=loop, gtk_app=gtk_app)
pyinotify.AsyncioNotifier(wm, loop, default_proc_fun=handler)
- return run_asyncio_and_show_errors(loop, [asyncio.ensure_future(
- dispatcher.listen_for_events())], _("Qubes Clipboard Widget"))
+ return run_asyncio_and_show_errors(
+ loop,
+ [asyncio.ensure_future(dispatcher.listen_for_events())],
+ _("Qubes Clipboard Widget"),
+ )
-if __name__ == '__main__':
+if __name__ == "__main__":
main()