File size: 8,956 Bytes
1170bc5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#!/usr/bin/env python3
"""
GPU Reader for AMD Radeon Pro VII
Reads GPU sensor data using rocm-smi and sysfs fallback
"""

import subprocess
import os
import glob
import time
from typing import Dict, Optional, Tuple

class GPUReader:
    """Read AMD GPU sensor values from sysfs/hwmon and the ``rocm-smi`` CLI.

    Every getter is best-effort: when neither source yields a usable value
    the getter returns ``None`` (or a tuple of ``None``) instead of raising.

    Args:
        base_path: sysfs device directory of the GPU. Defaults to
            ``/sys/class/drm/card1/device``; pass another card's directory
            when the GPU is not ``card1``.
    """

    # Seconds to wait for a single rocm-smi invocation before giving up.
    _ROCM_SMI_TIMEOUT = 5
    _BYTES_PER_MB = 1024 * 1024

    def __init__(self, base_path: str = "/sys/class/drm/card1/device"):
        self.base_path = base_path
        self._find_hwmon_path()

    def _find_hwmon_path(self):
        """Set ``self.hwmon_path`` to the first hwmon directory, or ``None``."""
        self.hwmon_path = None
        hwmon_base = os.path.join(self.base_path, "hwmon")
        try:
            # Sorted so the choice is deterministic if several entries exist.
            entries = sorted(os.listdir(hwmon_base))
        except OSError:
            return
        if entries:
            self.hwmon_path = os.path.join(hwmon_base, entries[0])

    def read_file(self, path: str) -> Optional[str]:
        """Return the stripped content of ``path``, or ``None`` if unreadable."""
        if not path:
            return None
        try:
            with open(path, 'r') as f:
                return f.read().strip()
        except (OSError, ValueError):
            # ValueError covers undecodable bytes (UnicodeDecodeError).
            return None

    @staticmethod
    def _to_int(text: Optional[str]) -> Optional[int]:
        """Convert ``text`` to ``int``; ``None`` for empty or malformed input."""
        if not text:
            return None
        try:
            return int(text)
        except ValueError:
            return None

    @staticmethod
    def _to_float(text: Optional[str]) -> Optional[float]:
        """Convert ``text`` to ``float``; ``None`` for empty or malformed input."""
        if not text:
            return None
        try:
            return float(text)
        except ValueError:
            return None

    def _run_rocm_smi(self, *args: str) -> list:
        """Run ``rocm-smi`` with ``args`` and return its stdout lines.

        Returns an empty list when the tool is missing, times out, exits
        with a non-zero status, or produces undecodable output.
        """
        try:
            result = subprocess.run(['rocm-smi', *args],
                                    capture_output=True, text=True,
                                    timeout=self._ROCM_SMI_TIMEOUT)
        except (OSError, subprocess.SubprocessError, ValueError):
            return []
        if result.returncode != 0:
            return []
        return result.stdout.splitlines()

    @staticmethod
    def _parse_value(lines, required: Tuple[str, ...],
                     suffixes: Tuple[str, ...] = ()) -> Optional[float]:
        """Return the number on the first line containing all ``required`` tokens.

        The value is taken after the LAST colon, so both ``label: 12%`` and
        lines carrying a device prefix such as ``GPU[0] : GPU use (%): 12``
        are handled. Each string in ``suffixes`` (unit markers) is removed
        before conversion. Lines that match but do not parse are skipped.
        """
        for line in lines:
            if not all(token in line for token in required):
                continue
            value = line.rpartition(':')[2].strip()
            for suffix in suffixes:
                value = value.replace(suffix, '')
            try:
                return float(value)
            except ValueError:
                continue
        return None

    @classmethod
    def _parse_vram(cls, lines) -> Tuple[Optional[int], Optional[int]]:
        """Extract ``(used_mb, total_mb)`` from ``rocm-smi --showmeminfo vram`` lines.

        A label containing ``(B)`` is treated as a byte count and converted
        to MB; otherwise the value is taken as MB (an optional ``MB`` suffix
        is stripped). "Used" is checked before "Total" because a label such
        as ``VRAM Total Used Memory`` contains both words.
        """
        used_mb, total_mb = None, None
        for line in lines:
            label, _, value = line.rpartition(':')
            if 'VRAM' not in label:
                continue
            amount = cls._to_int(value.replace('MB', '').strip())
            if amount is None:
                continue
            if '(B)' in label:
                amount //= cls._BYTES_PER_MB
            if 'Used' in label:
                used_mb = amount
            elif 'Total' in label:
                total_mb = amount
        return used_mb, total_mb

    @staticmethod
    def _parse_active_clock(raw: Optional[str]) -> Optional[int]:
        """Return the MHz value of the ``*``-marked line of a ``pp_dpm_*`` file.

        Expects lines shaped like ``1: 1700Mhz *``; returns ``None`` when no
        marked line can be parsed.
        """
        if not raw:
            return None
        for line in raw.splitlines():
            if '*' not in line:
                continue
            fields = line.partition(':')[2].split()
            if not fields:
                continue
            try:
                return int(fields[0].replace('Mhz', '').replace('MHz', ''))
            except ValueError:
                continue
        return None

    def get_gpu_usage(self) -> Optional[float]:
        """Get GPU usage percentage from rocm-smi, falling back to sysfs."""
        usage = self._parse_value(self._run_rocm_smi('--showuse'),
                                  ('GPU use',), ('%',))
        if usage is not None:
            return usage
        # Fallback to sysfs whenever rocm-smi gave no usable value, not only
        # when it raised.
        return self._to_float(
            self.read_file(os.path.join(self.base_path, "gpu_busy_percent")))

    def get_vram_usage(self) -> Tuple[Optional[int], Optional[int]]:
        """Get VRAM usage (used, total) in MB; ``(None, None)`` if unavailable."""
        # Try rocm-smi first
        used_mb, total_mb = self._parse_vram(
            self._run_rocm_smi('--showmeminfo', 'vram'))
        if used_mb is not None and total_mb is not None:
            return used_mb, total_mb

        # Fallback to sysfs, which reports byte counts
        vram_used = self._to_int(
            self.read_file(os.path.join(self.base_path, "mem_info_vram_used")))
        vram_total = self._to_int(
            self.read_file(os.path.join(self.base_path, "mem_info_vram_total")))
        if vram_used is not None and vram_total is not None:
            return (vram_used // self._BYTES_PER_MB,
                    vram_total // self._BYTES_PER_MB)
        return None, None

    def get_temperature(self) -> Optional[float]:
        """Get GPU temperature in Celsius (hwmon first, then rocm-smi)."""
        if self.hwmon_path:
            temp_raw = self._to_int(
                self.read_file(os.path.join(self.hwmon_path, "temp1_input")))
            if temp_raw is not None:
                # hwmon value is in millidegrees; true division keeps the
                # fractional part and matches the float return annotation.
                return temp_raw / 1000

        # Fallback to rocm-smi
        return self._parse_value(self._run_rocm_smi('--showtemp'),
                                 ('Temperature', 'GPU'), ('c', 'C'))

    def get_power_draw(self) -> Optional[float]:
        """Get GPU power draw in Watts (hwmon first, then rocm-smi)."""
        if self.hwmon_path:
            # power1_input is tried first; power1_average is the attribute
            # amdgpu documents for average power and may be the only one
            # present, depending on kernel/ASIC.
            for name in ("power1_input", "power1_average"):
                power_raw = self._to_int(
                    self.read_file(os.path.join(self.hwmon_path, name)))
                if power_raw is not None:
                    # hwmon value is in microwatts.
                    return power_raw / 1000000

        # Fallback to rocm-smi
        return self._parse_value(self._run_rocm_smi('--showpower'),
                                 ('Average Graphics Package Power',), ('W',))

    def get_fan_speed(self) -> Tuple[Optional[int], Optional[int]]:
        """Get fan speed (RPM, PWM percentage) from hwmon.

        Either element is ``None`` when its hwmon file is missing or
        malformed.
        """
        rpm, pwm_pct = None, None

        if self.hwmon_path:
            # pwm1 is scaled from its 0-255 range to a percentage.
            fan_pwm = self._to_int(
                self.read_file(os.path.join(self.hwmon_path, "pwm1")))
            if fan_pwm is not None:
                pwm_pct = (fan_pwm * 100) // 255

            rpm = self._to_int(
                self.read_file(os.path.join(self.hwmon_path, "fan1_input")))

        # NOTE: no rocm-smi fallback is implemented. The previous code ran
        # `rocm-smi --showfan` but discarded its output, so the call was
        # removed rather than paying for a subprocess with no effect.
        return rpm, pwm_pct

    def get_clocks(self) -> Tuple[Optional[int], Optional[int]]:
        """Get GPU clocks (core, memory) in MHz (sysfs first, then rocm-smi)."""
        # Try parsing sysfs
        core_clock = self._parse_active_clock(
            self.read_file(os.path.join(self.base_path, "pp_dpm_sclk")))
        mem_clock = self._parse_active_clock(
            self.read_file(os.path.join(self.base_path, "pp_dpm_mclk")))

        # Fallback to rocm-smi, only for the values still missing
        if core_clock is None or mem_clock is None:
            lines = self._run_rocm_smi('--showclocks')
            if core_clock is None:
                value = self._parse_value(lines, ('GPU clock',),
                                          ('Mhz', 'MHz'))
                if value is not None:
                    core_clock = int(value)
            if mem_clock is None:
                value = self._parse_value(lines, ('Memory clock',),
                                          ('Mhz', 'MHz'))
                if value is not None:
                    mem_clock = int(value)

        return core_clock, mem_clock

    def get_all_data(self) -> Dict[str, Optional[float]]:
        """Get all GPU data in a single call.

        Returns:
            A dict with the keys ``gpu_usage``, ``vram_used``, ``vram_total``,
            ``temperature``, ``power_draw``, ``fan_rpm``, ``fan_pwm``,
            ``core_clock`` and ``mem_clock``; a value is ``None`` when that
            reading is unavailable.
        """
        vram_used, vram_total = self.get_vram_usage()
        fan_rpm, fan_pwm = self.get_fan_speed()
        core_clock, mem_clock = self.get_clocks()
        return {
            'gpu_usage': self.get_gpu_usage(),
            'vram_used': vram_used,
            'vram_total': vram_total,
            'temperature': self.get_temperature(),
            'power_draw': self.get_power_draw(),
            'fan_rpm': fan_rpm,
            'fan_pwm': fan_pwm,
            'core_clock': core_clock,
            'mem_clock': mem_clock,
        }

if __name__ == "__main__":
    # Manual smoke test: take one snapshot of every reading and print it.
    snapshot = GPUReader().get_all_data()
    report = ["GPU Data:"]
    report.extend(f"  {name}: {reading}" for name, reading in snapshot.items())
    print("\n".join(report))