| | |
| | """ |
| | GPU Reader for AMD Radeon Pro VII |
| | Reads GPU sensor data using rocm-smi and sysfs fallback |
| | """ |
| |
|
| | import subprocess |
| | import os |
| | import glob |
| | import time |
| | from typing import Dict, Optional, Tuple |
| |
|
class GPUReader:
    """Read AMD GPU sensor values from sysfs and the ``rocm-smi`` CLI.

    Every getter is best-effort: when a source is missing or its content
    cannot be parsed, the getter returns ``None`` (or a tuple of ``None``)
    instead of raising.
    """

    # Seconds to wait for a single rocm-smi invocation before giving up.
    ROCM_SMI_TIMEOUT = 5

    def __init__(self, base_path: str = "/sys/class/drm/card1/device"):
        """Create a reader for the GPU whose sysfs device directory is *base_path*.

        Args:
            base_path: Device directory holding ``gpu_busy_percent``,
                ``mem_info_vram_*``, ``pp_dpm_*`` and the ``hwmon`` folder.
                The default keeps the previous hard-coded ``card1`` location.
        """
        self.base_path = base_path
        self._find_hwmon_path()

    def _find_hwmon_path(self):
        """Set ``self.hwmon_path`` to the first hwmon directory, or ``None``."""
        self.hwmon_path = None
        hwmon_base = os.path.join(self.base_path, "hwmon")
        try:
            # Sorted so the choice does not depend on directory listing order.
            entries = sorted(os.listdir(hwmon_base))
        except OSError:
            return
        if entries:
            self.hwmon_path = os.path.join(hwmon_base, entries[0])

    def read_file(self, path: str) -> Optional[str]:
        """Return the stripped text content of *path*, or ``None`` on failure."""
        if not path:
            return None
        try:
            with open(path, 'r') as f:
                return f.read().strip()
        except (OSError, ValueError):
            # ValueError covers undecodable bytes and malformed path strings.
            return None

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _read_device(self, name: str) -> Optional[str]:
        """Read attribute *name* from the device directory."""
        return self.read_file(os.path.join(self.base_path, name))

    def _read_hwmon(self, name: str) -> Optional[str]:
        """Read attribute *name* from the hwmon directory, if one was found."""
        if not self.hwmon_path:
            return None
        return self.read_file(os.path.join(self.hwmon_path, name))

    def _run_rocm_smi(self, *args: str) -> Optional[str]:
        """Run ``rocm-smi`` with *args*; return stdout, or ``None`` on any failure."""
        try:
            result = subprocess.run(['rocm-smi', *args],
                                    capture_output=True, text=True,
                                    timeout=self.ROCM_SMI_TIMEOUT)
        except (OSError, ValueError, subprocess.SubprocessError):
            # Tool not installed, not executable, timed out, or bad output bytes.
            return None
        if result.returncode != 0:
            return None
        return result.stdout

    @staticmethod
    def _last_field(line: str, *units: str) -> str:
        """Return the text after the last ``:`` in *line* with *units* removed.

        Using the last field handles both ``label: value`` and lines that
        carry an extra prefix such as ``GPU[0] : label: value``.
        NOTE(review): the exact rocm-smi layout varies by version - confirm
        against the installed tool.
        """
        value = line.rsplit(':', 1)[-1]
        for unit in units:
            value = value.replace(unit, '')
        return value.strip()

    @staticmethod
    def _parse_float(text: Optional[str]) -> Optional[float]:
        """Convert *text* to ``float``; ``None`` when missing or malformed."""
        try:
            return float(text)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _parse_int(text: Optional[str], via_float: bool = False) -> Optional[int]:
        """Convert *text* to ``int``; ``None`` when missing or malformed.

        With ``via_float`` the text may be a decimal number such as
        ``"1700.0"``, which is truncated.
        """
        try:
            return int(float(text)) if via_float else int(text)
        except (TypeError, ValueError, OverflowError):
            return None

    @classmethod
    def _active_dpm_clock(cls, table: Optional[str]) -> Optional[int]:
        """Return the MHz value of the first ``*``-marked row of a pp_dpm table."""
        if not table:
            return None
        for row in table.splitlines():
            if '*' not in row:
                continue
            # Rows look like "1: 1700Mhz *"; the value follows the first colon.
            fields = row.partition(':')[2].split()
            if not fields:
                return None
            return cls._parse_int(fields[0].replace('Mhz', '').replace('MHz', ''))
        return None

    # ------------------------------------------------------------------
    # Public getters
    # ------------------------------------------------------------------

    def get_gpu_usage(self) -> Optional[float]:
        """Get GPU usage percentage from rocm-smi, falling back to sysfs.

        The sysfs ``gpu_busy_percent`` fallback is used whenever rocm-smi
        yields no usable value, not only when running it raises.
        """
        output = self._run_rocm_smi('--showuse')
        if output:
            for line in output.splitlines():
                if 'GPU use' in line:
                    usage = self._parse_float(self._last_field(line, '%'))
                    if usage is not None:
                        return usage
        return self._parse_float(self._read_device("gpu_busy_percent"))

    def get_vram_usage(self) -> Tuple[Optional[int], Optional[int]]:
        """Get VRAM usage as ``(used, total)`` in MB, or ``(None, None)``."""
        output = self._run_rocm_smi('--showmeminfo', 'vram')
        if output:
            used_mb, total_mb = None, None
            for line in output.splitlines():
                if 'VRAM Total' in line:
                    total_mb = self._parse_int(self._last_field(line, 'MB'))
                elif 'VRAM Used' in line:
                    used_mb = self._parse_int(self._last_field(line, 'MB'))
            if used_mb is not None and total_mb is not None:
                return used_mb, total_mb

        # sysfs reports byte counts; convert to whole MB.
        used_bytes = self._parse_int(self._read_device("mem_info_vram_used"))
        total_bytes = self._parse_int(self._read_device("mem_info_vram_total"))
        if used_bytes is None or total_bytes is None:
            return None, None
        return used_bytes // (1024 * 1024), total_bytes // (1024 * 1024)

    def get_temperature(self) -> Optional[float]:
        """Get GPU temperature in Celsius (hwmon first, then rocm-smi)."""
        raw = self._parse_int(self._read_hwmon("temp1_input"))
        if raw is not None:
            # True division keeps the fractional part, matching the float
            # returned by the rocm-smi path and the declared return type.
            return raw / 1000

        output = self._run_rocm_smi('--showtemp')
        if output:
            for line in output.splitlines():
                if 'Temperature' in line and 'GPU' in line:
                    temp = self._parse_float(self._last_field(line, 'c', 'C'))
                    if temp is not None:
                        return temp
        return None

    def get_power_draw(self) -> Optional[float]:
        """Get GPU power draw in Watts (hwmon first, then rocm-smi)."""
        raw = self._parse_int(self._read_hwmon("power1_input"))
        if raw is not None:
            # True division: do not truncate to whole Watts.
            return raw / 1000000

        output = self._run_rocm_smi('--showpower')
        if output:
            for line in output.splitlines():
                if 'Average Graphics Package Power' in line:
                    power = self._parse_float(self._last_field(line, 'W'))
                    if power is not None:
                        return power
        return None

    def get_fan_speed(self) -> Tuple[Optional[int], Optional[int]]:
        """Get fan speed as ``(RPM, PWM percentage)`` from hwmon.

        Either element is ``None`` when its hwmon attribute is missing or
        unparsable. The former rocm-smi fallback discarded the tool's output,
        so it was removed rather than spawning a process for nothing.
        """
        pwm_raw = self._parse_int(self._read_hwmon("pwm1"))
        # pwm1 is scaled against 255 to obtain a percentage.
        pwm_pct = None if pwm_raw is None else (pwm_raw * 100) // 255
        rpm = self._parse_int(self._read_hwmon("fan1_input"))
        return rpm, pwm_pct

    def get_clocks(self) -> Tuple[Optional[int], Optional[int]]:
        """Get GPU clocks as ``(core, memory)`` in MHz.

        The active (``*``-marked) rows of ``pp_dpm_sclk`` / ``pp_dpm_mclk``
        are preferred; rocm-smi fills in whichever value is still missing.
        """
        core_clock = self._active_dpm_clock(self._read_device("pp_dpm_sclk"))
        mem_clock = self._active_dpm_clock(self._read_device("pp_dpm_mclk"))

        if core_clock is None or mem_clock is None:
            output = self._run_rocm_smi('--showclocks')
            if output:
                for line in output.splitlines():
                    if 'GPU clock' in line and core_clock is None:
                        core_clock = self._parse_int(
                            self._last_field(line, 'Mhz', 'MHz'), via_float=True)
                    elif 'Memory clock' in line and mem_clock is None:
                        mem_clock = self._parse_int(
                            self._last_field(line, 'Mhz', 'MHz'), via_float=True)

        return core_clock, mem_clock

    def get_all_data(self) -> Dict[str, Optional[float]]:
        """Get all GPU data in a single call.

        Returns:
            Mapping with keys ``gpu_usage``, ``vram_used``, ``vram_total``,
            ``temperature``, ``power_draw``, ``fan_rpm``, ``fan_pwm``,
            ``core_clock`` and ``mem_clock``; unavailable values are ``None``.
        """
        vram_used, vram_total = self.get_vram_usage()
        fan_rpm, fan_pwm = self.get_fan_speed()
        core_clock, mem_clock = self.get_clocks()
        return {
            'gpu_usage': self.get_gpu_usage(),
            'vram_used': vram_used,
            'vram_total': vram_total,
            'temperature': self.get_temperature(),
            'power_draw': self.get_power_draw(),
            'fan_rpm': fan_rpm,
            'fan_pwm': fan_pwm,
            'core_clock': core_clock,
            'mem_clock': mem_clock,
        }
| |
|
if __name__ == "__main__":
    # Manual smoke test: take one snapshot of every reading and print it.
    snapshot = GPUReader().get_all_data()
    print("GPU Data:")
    for field in snapshot:
        print(f" {field}: {snapshot[field]}")