gpu_monitor / gpu_reader.py
meccatronis's picture
Upload gpu_reader.py with huggingface_hub
1170bc5 verified
#!/usr/bin/env python3
"""
GPU Reader for AMD Radeon Pro VII
Reads GPU sensor data using rocm-smi and sysfs fallback
"""
import subprocess
import os
import glob
import time
from typing import Dict, Optional, Tuple
class GPUReader:
def __init__(self):
self.base_path = "/sys/class/drm/card1/device"
self._find_hwmon_path()
def _find_hwmon_path(self):
"""Find the hwmon path for AMD GPU"""
self.hwmon_path = None
hwmon_base = os.path.join(self.base_path, "hwmon")
if os.path.exists(hwmon_base):
hwmons = os.listdir(hwmon_base)
if hwmons:
self.hwmon_path = os.path.join(hwmon_base, hwmons[0])
def read_file(self, path: str) -> Optional[str]:
"""Safely read a file and return its content"""
if not path or not os.path.exists(path):
return None
try:
with open(path, 'r') as f:
return f.read().strip()
except Exception:
return None
def get_gpu_usage(self) -> Optional[float]:
"""Get GPU usage percentage using rocm-smi"""
try:
result = subprocess.run(['rocm-smi', '--showuse'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
lines = result.stdout.split('\n')
for line in lines:
if 'GPU use' in line:
usage = line.split(':')[1].strip().replace('%', '')
return float(usage)
except:
# Fallback to sysfs
usage = self.read_file(os.path.join(self.base_path, "gpu_busy_percent"))
if usage:
return float(usage)
return None
def get_vram_usage(self) -> Tuple[Optional[int], Optional[int]]:
"""Get VRAM usage (used, total) in MB"""
try:
# Try rocm-smi first
result = subprocess.run(['rocm-smi', '--showmeminfo', 'vram'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
lines = result.stdout.split('\n')
used_mb, total_mb = None, None
for line in lines:
if 'VRAM Total' in line:
total_mb = int(line.split(':')[1].strip().replace('MB', ''))
elif 'VRAM Used' in line:
used_mb = int(line.split(':')[1].strip().replace('MB', ''))
if used_mb is not None and total_mb is not None:
return used_mb, total_mb
except:
pass
# Fallback to sysfs
vram_used = self.read_file(os.path.join(self.base_path, "mem_info_vram_used"))
vram_total = self.read_file(os.path.join(self.base_path, "mem_info_vram_total"))
if vram_used and vram_total:
used_mb = int(vram_used) // (1024*1024)
total_mb = int(vram_total) // (1024*1024)
return used_mb, total_mb
return None, None
def get_temperature(self) -> Optional[float]:
"""Get GPU temperature in Celsius"""
if self.hwmon_path:
temp_raw = self.read_file(os.path.join(self.hwmon_path, "temp1_input"))
if temp_raw:
return int(temp_raw) // 1000
# Fallback to rocm-smi
try:
result = subprocess.run(['rocm-smi', '--showtemp'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
lines = result.stdout.split('\n')
for line in lines:
if 'Temperature' in line and 'GPU' in line:
temp_str = line.split(':')[1].strip().replace('c', '').replace('C', '')
return float(temp_str)
except:
pass
return None
def get_power_draw(self) -> Optional[float]:
"""Get GPU power draw in Watts"""
if self.hwmon_path:
power_raw = self.read_file(os.path.join(self.hwmon_path, "power1_input"))
if power_raw:
return int(power_raw) // 1000000
# Fallback to rocm-smi
try:
result = subprocess.run(['rocm-smi', '--showpower'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
lines = result.stdout.split('\n')
for line in lines:
if 'Average Graphics Package Power' in line:
power_str = line.split(':')[1].strip().replace('W', '')
return float(power_str)
except:
pass
return None
def get_fan_speed(self) -> Tuple[Optional[int], Optional[int]]:
"""Get fan speed (RPM, PWM percentage)"""
rpm, pwm_pct = None, None
if self.hwmon_path:
# Get PWM value
fan_pwm = self.read_file(os.path.join(self.hwmon_path, "pwm1"))
if fan_pwm:
pwm_pct = (int(fan_pwm) * 100) // 255
# Get RPM
fan_rpm = self.read_file(os.path.join(self.hwmon_path, "fan1_input"))
if fan_rpm:
rpm = int(fan_rpm)
# Fallback to rocm-smi if needed
if rpm is None or pwm_pct is None:
try:
result = subprocess.run(['rocm-smi', '--showfan'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
lines = result.stdout.split('\n')
for line in lines:
if 'Fan Speed' in line:
# Parse fan info
pass # Implementation would depend on exact output format
except:
pass
return rpm, pwm_pct
def get_clocks(self) -> Tuple[Optional[int], Optional[int]]:
"""Get GPU clocks (core, memory) in MHz"""
core_clock, mem_clock = None, None
# Try parsing sysfs
sclk_raw = self.read_file(os.path.join(self.base_path, "pp_dpm_sclk"))
mclk_raw = self.read_file(os.path.join(self.base_path, "pp_dpm_mclk"))
if sclk_raw:
for line in sclk_raw.split('\n'):
if '*' in line:
core_clock = int(line.split(':')[1].strip().split(' ')[0].replace('Mhz', ''))
break
if mclk_raw:
for line in mclk_raw.split('\n'):
if '*' in line:
mem_clock = int(line.split(':')[1].strip().split(' ')[0].replace('Mhz', ''))
break
# Fallback to rocm-smi
if core_clock is None or mem_clock is None:
try:
result = subprocess.run(['rocm-smi', '--showclocks'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
lines = result.stdout.split('\n')
for line in lines:
if 'GPU clock' in line and core_clock is None:
core_str = line.split(':')[1].strip().replace('Mhz', '').replace('MHz', '')
core_clock = int(float(core_str))
elif 'Memory clock' in line and mem_clock is None:
mem_str = line.split(':')[1].strip().replace('Mhz', '').replace('MHz', '')
mem_clock = int(float(mem_str))
except:
pass
return core_clock, mem_clock
def get_all_data(self) -> Dict[str, any]:
"""Get all GPU data in a single call"""
data = {
'gpu_usage': self.get_gpu_usage(),
'vram_used': None,
'vram_total': None,
'temperature': self.get_temperature(),
'power_draw': self.get_power_draw(),
'fan_rpm': None,
'fan_pwm': None,
'core_clock': None,
'mem_clock': None
}
# Get VRAM usage
vram_used, vram_total = self.get_vram_usage()
data['vram_used'] = vram_used
data['vram_total'] = vram_total
# Get fan speed
fan_rpm, fan_pwm = self.get_fan_speed()
data['fan_rpm'] = fan_rpm
data['fan_pwm'] = fan_pwm
# Get clocks
core_clock, mem_clock = self.get_clocks()
data['core_clock'] = core_clock
data['mem_clock'] = mem_clock
return data
if __name__ == "__main__":
# Test the GPU reader
reader = GPUReader()
data = reader.get_all_data()
print("GPU Data:")
for key, value in data.items():
print(f" {key}: {value}")