diff options
Diffstat (limited to 'apt/progress/text.py')
| -rw-r--r-- | apt/progress/text.py | 261 |
1 files changed, 261 insertions, 0 deletions
diff --git a/apt/progress/text.py b/apt/progress/text.py new file mode 100644 index 00000000..5e45c1db --- /dev/null +++ b/apt/progress/text.py @@ -0,0 +1,261 @@ +# Copyright (c) 2009 Julian Andres Klode <jak@debian.org> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; either version 2 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +# USA +"""Progress reporting for text interfaces.""" +import sys + +import apt_pkg +from apt.progress import base + +__all__ = ['AcquireProgress', 'CdromProgress', 'OpProgress'] + + +def _(msg): + """Translate the message, also try apt if translation is missing.""" + res = apt_pkg.gettext(msg) + if res == msg: + res = apt_pkg.gettext(msg, "apt") + return res + + +class TextProgress(object): + """Internal Base class for text progress classes.""" + + def __init__(self, outfile=None): + self._file = outfile or sys.stdout + self._width = 0 + + def _write(self, msg, newline=True, maximize=False): + """Write the message on the terminal, fill remaining space.""" + self._file.write("\r") + self._file.write(msg) + + # Fill remaining stuff with whitespace + if self._width > len(msg): + self._file.write((self._width - len(msg)) * ' ') + elif maximize: # Needed for OpProgress. + self._width = max(self._width, len(msg)) + if newline: + self._file.write("\n") + else: + #self._file.write("\r") + self._file.flush() + + +class OpProgress(base.OpProgress, TextProgress): + """Operation progress reporting. + + This closely resembles OpTextProgress in libapt-pkg. + """ + + def __init__(self, outfile=None): + TextProgress.__init__(self, outfile) + base.OpProgress.__init__(self) + self.old_op = "" + + def update(self, percent=None): + """Called periodically to update the user interface.""" + base.OpProgress.update(self, percent) + if self.major_change and self.old_op: + self._write(self.old_op) + self._write("%s... %i%%\r" % (self.op, self.percent), False, True) + self.old_op = self.op + + def done(self): + """Called once an operation has been completed.""" + base.OpProgress.done(self) + if self.old_op: + self._write(_("%c%s... Done") % ('\r', self.old_op), True, True) + self.old_op = "" + + +class AcquireProgress(base.AcquireProgress, TextProgress): + """AcquireProgress for the text interface.""" + + def __init__(self, outfile=None): + TextProgress.__init__(self, outfile) + base.AcquireProgress.__init__(self) + self._signal = None + self._width = 80 + self._id = 1 + + def start(self): + """Start an Acquire progress. + + In this case, the function sets up a signal handler for SIGWINCH, i.e. + window resize signals. And it also sets id to 1. + """ + base.AcquireProgress.start(self) + import signal + self._signal = signal.signal(signal.SIGWINCH, self._winch) + # Get the window size. + self._winch() + self._id = 1L + + def _winch(self, *dummy): + """Signal handler for window resize signals.""" + import fcntl + import termios + import struct + buf = fcntl.ioctl(self._file, termios.TIOCGWINSZ, 8 * ' ') + dummy, col, dummy, dummy = struct.unpack('hhhh', buf) + self._width = col - 1 # 1 for the cursor + + def ims_hit(self, item): + """Called when an item is update (e.g. not modified on the server).""" + base.AcquireProgress.ims_hit(self, item) + line = _('Hit ') + item.description + if item.owner.filesize: + line += ' [%sB]' % apt_pkg.size_to_str(item.owner.filesize) + self._write(line) + + def fail(self, item): + """Called when an item is failed.""" + base.AcquireProgress.fail(self, item) + if item.owner.status == item.owner.STAT_DONE: + self._write(_("Ign ") + item.description) + else: + self._write(_("Err ") + item.description) + self._write(" %s" % item.owner.error_text) + + def fetch(self, item): + """Called when some of the item's data is fetched.""" + base.AcquireProgress.fetch(self, item) + # It's complete already (e.g. Hit) + if item.owner.complete: + return + item.owner.id = self._id + self._id += 1 + line = _("Get:") + "%s %s" % (item.owner.id, item.description) + if item.owner.filesize: + line += (" [%sB]" % apt_pkg.size_to_str(item.owner.filesize)) + + self._write(line) + + def pulse(self, owner): + """Periodically invoked while the Acquire process is underway. + + Return False if the user asked to cancel the whole Acquire process.""" + base.AcquireProgress.pulse(self, owner) + percent = (((self.current_bytes + self.current_items) * 100.0) / + float(self.total_bytes + self.total_items)) + + shown = False + tval = '%i%%' % percent + + end = "" + if self.current_cps: + eta = int(float(self.total_bytes - self.current_bytes) / + self.current_cps) + end = " %sB/s %s" % (apt_pkg.size_to_str(self.current_cps), + apt_pkg.time_to_str(eta)) + + for worker in owner.workers: + val = '' + if not worker.current_item: + if worker.status: + val = ' [%s]' % worker.status + if len(tval) + len(val) + len(end) >= self._width: + break + tval += val + shown = True + continue + shown = True + + if worker.current_item.owner.id: + val += " [%i %s" % (worker.current_item.owner.id, + worker.current_item.shortdesc) + else: + val += ' [%s' % worker.current_item.description + if worker.current_item.owner.mode: + val += ' %s' % worker.current_item.owner.mode + + val += ' %sB' % apt_pkg.size_to_str(worker.current_size) + + # Add the total size and percent + if worker.total_size and not worker.current_item.owner.complete: + val += "/%sB %i%%" % (apt_pkg.size_to_str(worker.total_size), + worker.current_size*100.0/worker.total_size) + + val += ']' + + if len(tval) + len(val) + len(end) >= self._width: + # Display as many items as screen width + break + else: + tval += val + + if not shown: + tval += _(" [Working]") + + if self.current_cps: + tval += (self._width - len(end) - len(tval)) * ' ' + end + + self._write(tval, False) + return True + + def media_change(self, medium, drive): + """Prompt the user to change the inserted removable media.""" + base.AcquireProgress.media_change(self, medium, drive) + self._write(_("Media change: please insert the disc labeled\n" + " '%s'\n" + "in the drive '%s' and press enter\n") % (medium, drive)) + return raw_input() not in ('c', 'C') + + def stop(self): + """Invoked when the Acquire process stops running.""" + base.AcquireProgress.stop(self) + # Trick for getting a translation from apt + self._write((_("Fetched %sB in %s (%sB/s)\n") % ( + apt_pkg.size_to_str(self.fetched_bytes), + apt_pkg.time_to_str(self.elapsed_time), + apt_pkg.size_to_str(self.current_cps))).rstrip("\n")) + + # Delete the signal again. + import signal + signal.signal(signal.SIGWINCH, self._signal) + + +class CdromProgress(base.CdromProgress, TextProgress): + """Text CD-ROM progress.""" + + def ask_cdrom_name(self): + """Ask the user to provide a name for the disc.""" + base.CdromProgress.ask_cdrom_name(self) + self._write(_("Please provide a name for this Disc, such as " + "'Debian 2.1r1 Disk 1'"), False) + try: + return raw_input(":") + except KeyboardInterrupt: + return + + def update(self, text, current): + """Set the current progress.""" + base.CdromProgress.update(self, text, current) + if text: + self._write(text, False) + + def change_cdrom(self): + """Ask the user to change the CD-ROM.""" + base.CdromProgress.change_cdrom(self) + self._write(_("Please insert a Disc in the drive and press enter"), + False) + try: + return (raw_input() == '') + except KeyboardInterrupt: + return False + +InstallProgress = base.InstallProgress |
