#!/usr/bin/env python # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """ A clone of top / htop. Author: Giampaolo Rodola' $ python examples/top.py CPU0 [| ] 4.9% CPU1 [||| ] 7.8% CPU2 [ ] 2.0% CPU3 [||||| ] 13.9% Mem [||||||||||||||||||| ] 49.8% 4920M/9888M Swap [ ] 0.0% 0M/0M Processes: 287 (running=1 sleeping=286) Load average: 0.34 0.54 0.46 Uptime: 3 days, 10:16:37 PID USER NI VIRT RES CPU% MEM% TIME+ NAME ------------------------------------------------------------ 989 giampaol 0 66M 12M 7.4 0.1 0:00.61 python 2083 root 0 506M 159M 6.5 1.6 0:29.26 Xorg 4503 giampaol 0 599M 25M 6.5 0.3 3:32.60 gnome-terminal 3868 giampaol 0 358M 8M 2.8 0.1 23:12.60 pulseaudio 3936 giampaol 0 1G 111M 2.8 1.1 33:41.67 compiz 4401 giampaol 0 536M 141M 2.8 1.4 35:42.73 skype 4047 giampaol 0 743M 76M 1.8 0.8 42:03.33 unity-panel-service 13155 giampaol 0 1G 280M 1.8 2.8 41:57.34 chrome 10 root 0 0B 0B 0.9 0.0 4:01.81 rcu_sched 339 giampaol 0 1G 113M 0.9 1.1 8:15.73 chrome ... """ from datetime import datetime, timedelta import atexit import os import time import sys try: import curses except ImportError: sys.exit('platform not supported') import psutil # --- curses stuff def tear_down(): win.keypad(0) curses.nocbreak() curses.echo() curses.endwin() win = curses.initscr() atexit.register(tear_down) curses.endwin() lineno = 0 def print_line(line, highlight=False): """A thin wrapper around curses's addstr().""" global lineno try: if highlight: line += " " * (win.getmaxyx()[1] - len(line)) win.addstr(lineno, 0, line, curses.A_REVERSE) else: win.addstr(lineno, 0, line, 0) except curses.error: lineno = 0 win.refresh() raise else: lineno += 1 # --- /curses stuff def bytes2human(n): """ >>> bytes2human(10000) '9K' >>> bytes2human(100001221) '95M' """ symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y') prefix = {} for i, s in enumerate(symbols): prefix[s] = 1 << (i + 1) * 10 for s in reversed(symbols): if n >= prefix[s]: value = int(float(n) / prefix[s]) return '%s%s' % (value, s) return "%sB" % n def poll(interval): # sleep some time time.sleep(interval) procs = [] procs_status = {} for p in psutil.process_iter(): try: p.dict = p.as_dict(['username', 'nice', 'memory_info', 'memory_percent', 'cpu_percent', 'cpu_times', 'name', 'status']) try: procs_status[p.dict['status']] += 1 except KeyError: procs_status[p.dict['status']] = 1 except psutil.NoSuchProcess: pass else: procs.append(p) # return processes sorted by CPU percent usage processes = sorted(procs, key=lambda p: p.dict['cpu_percent'], reverse=True) return (processes, procs_status) def print_header(procs_status, num_procs): """Print system-related info, above the process list.""" def get_dashes(perc): dashes = "|" * int((float(perc) / 10 * 4)) empty_dashes = " " * (40 - len(dashes)) return dashes, empty_dashes # cpu usage percs = psutil.cpu_percent(interval=0, percpu=True) for cpu_num, perc in enumerate(percs): dashes, empty_dashes = get_dashes(perc) print_line(" CPU%-2s [%s%s] %5s%%" % (cpu_num, dashes, empty_dashes, perc)) mem = psutil.virtual_memory() dashes, empty_dashes = get_dashes(mem.percent) used = mem.total - mem.available line = " Mem [%s%s] %5s%% %6s/%s" % ( dashes, empty_dashes, mem.percent, str(int(used / 1024 / 1024)) + "M", str(int(mem.total / 1024 / 1024)) + "M" ) print_line(line) # swap usage swap = psutil.swap_memory() dashes, empty_dashes = get_dashes(swap.percent) line = " Swap [%s%s] %5s%% %6s/%s" % ( dashes, empty_dashes, swap.percent, str(int(swap.used / 1024 / 1024)) + "M", str(int(swap.total / 1024 / 1024)) + "M" ) print_line(line) # processes number and status st = [] for x, y in procs_status.items(): if y: st.append("%s=%s" % (x, y)) st.sort(key=lambda x: x[:3] in ('run', 'sle'), reverse=1) print_line(" Processes: %s (%s)" % (num_procs, ' '.join(st))) # load average, uptime uptime = datetime.utcnow() - datetime.fromtimestamp(psutil.boot_time()) av1, av2, av3 = os.getloadavg() line = " Load average: %.2f %.2f %.2f Uptime: %s" \ % (av1, av2, av3, str(uptime).split('.')[0]) print_line(line) def refresh_window(procs, procs_status): """Print results on screen by using curses.""" curses.endwin() templ = "%-6s %-8s %4s %5s %5s %6s %4s %9s %2s" win.erase() header = templ % ("PID", "USER", "NI", "VIRT", "RES", "CPU%", "MEM%", "TIME+", "NAME") print_header(procs_status, len(procs)) print_line("") print_line(header, highlight=True) for p in procs: # TIME+ column shows process CPU cumulative time and it # is expressed as: "mm:ss.ms" if p.dict['cpu_times'] is not None: ctime = timedelta(seconds=sum(p.dict['cpu_times'])) ctime = "%s:%s.%s" % (ctime.seconds // 60 % 60, str((ctime.seconds % 60)).zfill(2), str(ctime.microseconds)[:2]) else: ctime = '' if p.dict['memory_percent'] is not None: p.dict['memory_percent'] = round(p.dict['memory_percent'], 1) else: p.dict['memory_percent'] = '' if p.dict['cpu_percent'] is None: p.dict['cpu_percent'] = '' if p.dict['username']: username = p.dict['username'][:8] else: username = "" line = templ % (p.pid, username, p.dict['nice'], bytes2human(getattr(p.dict['memory_info'], 'vms', 0)), bytes2human(getattr(p.dict['memory_info'], 'rss', 0)), p.dict['cpu_percent'], p.dict['memory_percent'], ctime, p.dict['name'] or '', ) try: print_line(line) except curses.error: break win.refresh() def main(): try: interval = 0 while True: args = poll(interval) refresh_window(*args) interval = 1 except (KeyboardInterrupt, SystemExit): pass if __name__ == '__main__': main()