import ctypes
from ctypes import byref
from ctypes import Structure, Union
from ctypes.wintypes import *
import threading
__author__ = 'Shao-chuan Wang'

# 64-bit signed integer alias used for the 'largeValue' union member.
LONGLONG = ctypes.c_longlong

# PDH query and counter handles are represented as plain Win32 HANDLEs.
HQUERY = HANDLE
HCOUNTER = HANDLE

# pdh.dll, loaded through ctypes' stdcall loader (Windows only).
pdh = ctypes.windll.pdh

# Status value the PDH calls in this module are compared against for success.
Error_Success = 0x00000000
class PDH_Counter_Union(ctypes.Union):
    """Value slot of a formatted PDH counter.

    All members share the same storage (ctypes ``Union`` semantics); the
    caller reads the member that matches the format it asked PDH for.
    """

    _fields_ = [
        ('longValue', LONG),
        ('doubleValue', ctypes.c_double),
        ('largeValue', ctypes.c_longlong),
        ('AnsiStringValue', LPCSTR),
        ('WideStringValue', LPCWSTR),
    ]
class PDH_FMT_COUNTERVALUE(ctypes.Structure):
    """Output record passed by reference to ``PdhGetFormattedCounterValue``.

    ``CStatus`` is a DWORD status field and ``union`` holds the formatted
    value itself (see ``PDH_Counter_Union`` for the available members).
    """

    _fields_ = [
        ('CStatus', DWORD),
        ('union', PDH_Counter_Union),
    ]
# Most recent reading of the '% Processor Time' counter. It starts at 0 and is
# overwritten by QueryCPUUsageThread.run() with each getCPUUsage() result.
g_cpu_usage = 0
class QueryCPUUsageThread(threading.Thread):
    """Thread that repeatedly samples total CPU usage through Windows PDH.

    The constructor opens a PDH query and attaches the
    ``\\Processor(_Total)\\% Processor Time`` counter to it.  Once started,
    the thread loops forever, storing each reading in the module-level
    ``g_cpu_usage`` and printing it.

    Raises:
        Exception: from the constructor or ``getCPUUsage`` when a PDH call
            returns a status other than ``Error_Success``; the message names
            the failing call and its status code.
    """

    def __init__(self):
        super(QueryCPUUsageThread, self).__init__()
        self.hQuery = HQUERY()
        self.hCounter = HCOUNTER()

        status = pdh.PdhOpenQueryW(None, 0, byref(self.hQuery))
        if status != Error_Success:
            raise self._pdh_error('PdhOpenQueryW', status)

        status = pdh.PdhAddCounterW(self.hQuery,
                                    u'\\Processor(_Total)\\% Processor Time',
                                    0,
                                    byref(self.hCounter))
        if status != Error_Success:
            # The query was opened successfully above; release it so a failed
            # construction does not leak the handle.
            pdh.PdhCloseQuery(self.hQuery)
            raise self._pdh_error('PdhAddCounterW', status)

    @staticmethod
    def _pdh_error(api_name, status):
        """Build the Exception describing a failed PDH call."""
        # ctypes returns the status as a signed C int; mask it so the code is
        # shown as the unsigned 32-bit hex value used in the PDH headers.
        return Exception('%s failed with PDH status 0x%08X'
                         % (api_name, status & 0xFFFFFFFF))

    def run(self):
        """Sample forever, publishing each reading to ``g_cpu_usage``.

        Every iteration blocks for about one second inside ``getCPUUsage``.
        An exception raised there propagates and ends the thread.
        """
        global g_cpu_usage
        while True:
            g_cpu_usage = self.getCPUUsage()
            # Parenthesised single-argument form prints the same text under
            # both Python 2 and Python 3.
            print('cpu_usage: %d' % g_cpu_usage)

    def getCPUUsage(self):
        """Return the current total CPU usage as an integer.

        Collects two samples one second apart and asks PDH to format the
        counter as a LONG.

        Returns:
            The ``longValue`` member of the formatted counter value.

        Raises:
            Exception: if collecting a sample or formatting the value fails.
        """
        PDH_FMT_LONG = 0x00000100

        # Two collections are taken with a pause in between; per the PDH
        # documentation, rate counters need two samples to produce a value.
        status = pdh.PdhCollectQueryData(self.hQuery)
        if status != Error_Success:
            raise self._pdh_error('PdhCollectQueryData', status)
        ctypes.windll.kernel32.Sleep(1000)
        status = pdh.PdhCollectQueryData(self.hQuery)
        if status != Error_Success:
            raise self._pdh_error('PdhCollectQueryData', status)

        dwType = DWORD(0)
        value = PDH_FMT_COUNTERVALUE()
        status = pdh.PdhGetFormattedCounterValue(self.hCounter,
                                                 PDH_FMT_LONG,
                                                 byref(dwType),
                                                 byref(value))
        if status != Error_Success:
            raise self._pdh_error('PdhGetFormattedCounterValue', status)
        return value.union.longValue
if __name__ == '__main__':
    # Script entry point: create the sampling thread and start it.
    thread = QueryCPUUsageThread()
    thread.start()