File size: 8,956 Bytes
#!/usr/bin/env python3
"""
GPU Reader for AMD Radeon Pro VII
Reads GPU sensor data using rocm-smi and sysfs fallback
"""
import subprocess
import os
import glob
import time
from typing import Dict, Optional, Tuple
class GPUReader:
    """Collect AMD GPU metrics from ``rocm-smi`` and the GPU's sysfs directory.

    Each getter reports a metric it cannot obtain as ``None`` instead of
    raising when the tool is missing, a file is absent, or a value is
    malformed.
    """

    _BYTES_PER_MB = 1024 * 1024
    _ROCM_SMI_TIMEOUT = 5  # seconds allowed for each rocm-smi invocation

    def __init__(self, base_path: str = "/sys/class/drm/card1/device"):
        """Create a reader for the sysfs device directory *base_path*.

        Args:
            base_path: Device directory of the GPU. The default is the
                ``card1`` location; pass another directory when the GPU is
                exposed under a different card node.
        """
        self.base_path = base_path
        self._find_hwmon_path()

    def _find_hwmon_path(self):
        """Set ``self.hwmon_path`` to the first hwmon directory, or ``None``."""
        self.hwmon_path = None
        hwmon_base = os.path.join(self.base_path, "hwmon")
        try:
            # Sorted so the choice does not depend on directory listing order.
            entries = sorted(os.listdir(hwmon_base))
        except OSError:
            return
        if entries:
            self.hwmon_path = os.path.join(hwmon_base, entries[0])

    def read_file(self, path: str) -> Optional[str]:
        """Return the stripped content of *path*, or ``None`` if unreadable."""
        if not path:
            return None
        try:
            with open(path, 'r') as f:
                return f.read().strip()
        except (OSError, ValueError):
            # ValueError covers undecodable bytes and invalid path strings.
            return None

    def _read_number(self, path: str, cast=int):
        """Read *path* and convert it with *cast*; ``None`` if missing/invalid."""
        raw = self.read_file(path)
        if not raw:
            return None
        try:
            return cast(raw)
        except ValueError:
            return None

    def _run_rocm_smi(self, *args: str) -> Optional[str]:
        """Run ``rocm-smi`` with *args* and return its stdout.

        Returns ``None`` when the tool cannot be started, times out, exits
        with a non-zero status, or produces undecodable output.
        """
        try:
            result = subprocess.run(
                ['rocm-smi', *args],
                capture_output=True,
                text=True,
                timeout=self._ROCM_SMI_TIMEOUT,
            )
        except (OSError, subprocess.SubprocessError, ValueError):
            return None
        if result.returncode != 0:
            return None
        return result.stdout

    @staticmethod
    def _last_field(line: str) -> str:
        """Return the stripped text after the final ``:`` in *line*.

        NOTE(review): rocm-smi appears to print lines shaped like
        ``GPU[0] : GPU use (%): 12``; taking the last field handles that
        shape as well as a plain ``label: value`` line. Confirm against the
        installed rocm-smi version.
        """
        return line.rpartition(':')[2].strip()

    @staticmethod
    def _parse_float(text: str, *units: str) -> Optional[float]:
        """Strip every unit marker in *units* from *text* and parse a float."""
        for unit in units:
            text = text.replace(unit, '')
        try:
            return float(text.strip())
        except ValueError:
            return None

    @classmethod
    def _parse_vram_output(
        cls, output: str
    ) -> Tuple[Optional[int], Optional[int]]:
        """Extract ``(used_mb, total_mb)`` from ``rocm-smi --showmeminfo vram``.

        Either element is ``None`` when no matching line could be parsed.
        """
        used_mb, total_mb = None, None
        for line in output.splitlines():
            if 'VRAM' not in line:
                continue
            label, sep, raw = line.rpartition(':')
            if not sep:
                continue
            try:
                value = int(raw.replace('MB', '').strip())
            except ValueError:
                continue
            # NOTE(review): a "(B)" marker in the label is taken to mean the
            # value is in bytes - confirm against the installed rocm-smi.
            if '(B)' in label:
                value //= cls._BYTES_PER_MB
            # "Used" is tested first because a label such as
            # "VRAM Total Used Memory" also contains "Total".
            if 'Used' in label:
                used_mb = value
            elif 'Total' in label:
                total_mb = value
        return used_mb, total_mb

    @staticmethod
    def _active_dpm_clock(raw: str) -> Optional[int]:
        """Return the MHz value of the ``*``-marked line in a pp_dpm_* file.

        Lines are expected to look like ``1: 1500Mhz *``; lines that do not
        parse are skipped.
        """
        for line in raw.splitlines():
            if '*' not in line:
                continue
            tokens = line.partition(':')[2].split()
            if not tokens:
                continue
            try:
                return int(tokens[0].lower().replace('mhz', ''))
            except ValueError:
                continue
        return None

    @classmethod
    def _parse_mhz(cls, text: str) -> Optional[int]:
        """Parse a clock value such as ``1500Mhz`` into whole MHz."""
        value = cls._parse_float(text, 'Mhz', 'MHz')
        if value is None:
            return None
        try:
            return int(value)
        except (ValueError, OverflowError):
            # float() accepts "nan"/"inf", which int() rejects.
            return None

    def get_gpu_usage(self) -> Optional[float]:
        """Get GPU usage percentage from rocm-smi, falling back to sysfs."""
        output = self._run_rocm_smi('--showuse')
        if output:
            for line in output.splitlines():
                if 'GPU use' in line:
                    usage = self._parse_float(self._last_field(line), '%')
                    if usage is not None:
                        return usage
        # The sysfs fallback applies whenever rocm-smi yielded no value,
        # not only when launching it raised.
        return self._read_number(
            os.path.join(self.base_path, "gpu_busy_percent"), float
        )

    def get_vram_usage(self) -> Tuple[Optional[int], Optional[int]]:
        """Get VRAM usage as ``(used, total)`` in MB.

        Tries rocm-smi first, then the ``mem_info_vram_*`` sysfs files,
        whose contents are divided by 1024*1024. Returns ``(None, None)``
        when neither source provides both values.
        """
        output = self._run_rocm_smi('--showmeminfo', 'vram')
        if output:
            used_mb, total_mb = self._parse_vram_output(output)
            if used_mb is not None and total_mb is not None:
                return used_mb, total_mb

        used = self._read_number(
            os.path.join(self.base_path, "mem_info_vram_used")
        )
        total = self._read_number(
            os.path.join(self.base_path, "mem_info_vram_total")
        )
        if used is not None and total is not None:
            return used // self._BYTES_PER_MB, total // self._BYTES_PER_MB
        return None, None

    def get_temperature(self) -> Optional[float]:
        """Get GPU temperature in Celsius.

        Reads hwmon ``temp1_input`` and divides by 1000 (true division, so
        fractional degrees are kept), then falls back to rocm-smi.
        """
        if self.hwmon_path:
            raw = self._read_number(
                os.path.join(self.hwmon_path, "temp1_input")
            )
            if raw is not None:
                return raw / 1000

        output = self._run_rocm_smi('--showtemp')
        if output:
            for line in output.splitlines():
                if 'Temperature' in line and 'GPU' in line:
                    temp = self._parse_float(self._last_field(line), 'c', 'C')
                    if temp is not None:
                        return temp
        return None

    def get_power_draw(self) -> Optional[float]:
        """Get GPU power draw in Watts.

        Reads hwmon ``power1_input`` and divides by 1,000,000 (true
        division, so fractional watts are kept), then falls back to
        rocm-smi.
        """
        if self.hwmon_path:
            raw = self._read_number(
                os.path.join(self.hwmon_path, "power1_input")
            )
            if raw is not None:
                return raw / 1000000

        output = self._run_rocm_smi('--showpower')
        if output:
            for line in output.splitlines():
                if 'Average Graphics Package Power' in line:
                    power = self._parse_float(self._last_field(line), 'W')
                    if power is not None:
                        return power
        return None

    def get_fan_speed(self) -> Tuple[Optional[int], Optional[int]]:
        """Get fan speed as ``(RPM, PWM percentage)`` from hwmon.

        The PWM percentage is ``pwm1`` scaled from 0-255 to 0-100 with
        integer division. Either element is ``None`` when its file is
        missing or malformed.
        """
        rpm, pwm_pct = None, None
        if self.hwmon_path:
            pwm = self._read_number(os.path.join(self.hwmon_path, "pwm1"))
            if pwm is not None:
                pwm_pct = (pwm * 100) // 255
            rpm = self._read_number(
                os.path.join(self.hwmon_path, "fan1_input")
            )
        # TODO: add a `rocm-smi --showfan` fallback once its output format is
        # pinned down; no parser exists yet, so the tool is not invoked here.
        return rpm, pwm_pct

    def get_clocks(self) -> Tuple[Optional[int], Optional[int]]:
        """Get GPU clocks as ``(core, memory)`` in MHz.

        Uses the ``*``-marked entry of ``pp_dpm_sclk`` / ``pp_dpm_mclk`` and
        asks rocm-smi only for whichever value sysfs did not provide.
        """
        core_clock, mem_clock = None, None

        sclk_raw = self.read_file(os.path.join(self.base_path, "pp_dpm_sclk"))
        mclk_raw = self.read_file(os.path.join(self.base_path, "pp_dpm_mclk"))
        if sclk_raw:
            core_clock = self._active_dpm_clock(sclk_raw)
        if mclk_raw:
            mem_clock = self._active_dpm_clock(mclk_raw)

        if core_clock is None or mem_clock is None:
            output = self._run_rocm_smi('--showclocks') or ''
            for line in output.splitlines():
                if 'GPU clock' in line and core_clock is None:
                    core_clock = self._parse_mhz(self._last_field(line))
                elif 'Memory clock' in line and mem_clock is None:
                    mem_clock = self._parse_mhz(self._last_field(line))
        return core_clock, mem_clock

    def get_all_data(self) -> Dict[str, Optional[float]]:
        """Get all GPU data in a single call.

        Returns:
            A dict with the keys ``gpu_usage``, ``vram_used``,
            ``vram_total``, ``temperature``, ``power_draw``, ``fan_rpm``,
            ``fan_pwm``, ``core_clock`` and ``mem_clock``; each value is a
            number or ``None`` when the metric is unavailable.
        """
        vram_used, vram_total = self.get_vram_usage()
        fan_rpm, fan_pwm = self.get_fan_speed()
        core_clock, mem_clock = self.get_clocks()
        return {
            'gpu_usage': self.get_gpu_usage(),
            'vram_used': vram_used,
            'vram_total': vram_total,
            'temperature': self.get_temperature(),
            'power_draw': self.get_power_draw(),
            'fan_rpm': fan_rpm,
            'fan_pwm': fan_pwm,
            'core_clock': core_clock,
            'mem_clock': mem_clock,
        }
def main() -> None:
    """Print every metric returned by ``GPUReader.get_all_data()``."""
    reader = GPUReader()
    data = reader.get_all_data()
    print("GPU Data:")
    for key, value in data.items():
        print(f" {key}: {value}")


if __name__ == "__main__":
    # Manual smoke test of the GPU reader.
    main()