| | import os |
| | import numpy as np |
| | import pandas as pd |
| | from sklearn.model_selection import train_test_split |
| | from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, balanced_accuracy_score |
| | import matplotlib.pyplot as plt |
| | import seaborn as sns |
| | |
| | import tensorflow as tf |
| |
|
| |
|
| | def read_binary_file(file_path, max_length=2_000_000): |
| | """ |
| | ๋ฐ์ด๋๋ฆฌ ํ์ผ์ ์ฝ์ด ์ ์ ๋ฐฐ์ด๋ก ๋ณํ |
| | ๋
ผ๋ฌธ ์ฌ์: 2MB๊น์ง ์ฒ๋ฆฌ |
| | """ |
| | try: |
| | with open(file_path, 'rb') as f: |
| | raw_bytes = f.read() |
| | |
| | |
| | byte_array = np.frombuffer(raw_bytes, dtype=np.uint8) |
| | |
| | if len(byte_array) > max_length: |
| | |
| | return byte_array[:max_length] |
| | else: |
| | |
| | padded = np.zeros(max_length, dtype=np.uint8) |
| | padded[:len(byte_array)] = byte_array |
| | return padded |
| | |
| | except Exception as e: |
| | print(f"ํ์ผ ์ฝ๊ธฐ ์ค๋ฅ {file_path}: {e}") |
| | return np.zeros(max_length, dtype=np.uint8) |
| |
|
| | def load_dataset_from_directory(malware_dir, benign_dir, max_length=2_000_000, max_samples_per_class=None): |
| | """ |
| | ๋๋ ํ ๋ฆฌ์์ ์ง์ ๋ฐ์ด๋๋ฆฌ ํ์ผ๋ค์ ๋ก๋ |
| | |
| | Args: |
| | malware_dir: ์
์ฑ์ฝ๋ ํ์ผ๋ค์ด ์๋ ๋๋ ํ ๋ฆฌ |
| | benign_dir: ์ ์ ํ์ผ๋ค์ด ์๋ ๋๋ ํ ๋ฆฌ |
| | max_length: ์ต๋ ๋ฐ์ดํธ ๊ธธ์ด |
| | max_samples_per_class: ํด๋์ค๋น ์ต๋ ์ํ ์ |
| | """ |
| | X, y = [], [] |
| | |
| | |
| | if os.path.exists(malware_dir): |
| | malware_files = [f for f in os.listdir(malware_dir) if os.path.isfile(os.path.join(malware_dir, f))] |
| | if max_samples_per_class: |
| | malware_files = malware_files[:max_samples_per_class] |
| | |
| | print(f"์
์ฑ์ฝ๋ ํ์ผ ๋ก๋ฉ ์ค... ({len(malware_files)}๊ฐ)") |
| | for i, filename in enumerate(malware_files): |
| | file_path = os.path.join(malware_dir, filename) |
| | byte_array = read_binary_file(file_path, max_length) |
| | X.append(byte_array) |
| | y.append(0) |
| | |
| | if (i + 1) % 100 == 0: |
| | print(f" {i + 1}/{len(malware_files)} ์ฒ๋ฆฌ ์๋ฃ") |
| | |
| | |
| | if os.path.exists(benign_dir): |
| | benign_files = [f for f in os.listdir(benign_dir) if os.path.isfile(os.path.join(benign_dir, f))] |
| | if max_samples_per_class: |
| | benign_files = benign_files[:max_samples_per_class] |
| | |
| | print(f"์ ์ ํ์ผ ๋ก๋ฉ ์ค... ({len(benign_files)}๊ฐ)") |
| | for i, filename in enumerate(benign_files): |
| | file_path = os.path.join(benign_dir, filename) |
| | byte_array = read_binary_file(file_path, max_length) |
| | X.append(byte_array) |
| | y.append(1) |
| | |
| | if (i + 1) % 100 == 0: |
| | print(f" {i + 1}/{len(benign_files)} ์ฒ๋ฆฌ ์๋ฃ") |
| | |
| | X = np.array(X) |
| | y = np.array(y) |
| | |
| | print(f"\n๋ฐ์ดํฐ์
๋ก๋ฉ ์๋ฃ:") |
| | print(f" ์ด ์ํ: {len(X)}") |
| | print(f" ์
์ฑ์ฝ๋: {np.sum(y == 0)}") |
| | print(f" ์ ์ํ์ผ: {np.sum(y == 1)}") |
| | |
| | return X, y |
| |
|
| | def load_dataset_from_csv(csv_path, max_length=2_000_000): |
| | """CSV ํ์ผ์์ ๋ฐ์ดํฐ์
๋ก๋""" |
| | df = pd.read_csv(csv_path) |
| | |
| | X, y = [], [] |
| | |
| | print("CSV์์ ํ์ผ ๋ก๋ฉ ์ค...") |
| | for idx, row in df.iterrows(): |
| | file_path = row['filepath'] |
| | label = row['label'] |
| | |
| | if os.path.exists(file_path): |
| | byte_array = read_binary_file(file_path, max_length) |
| | X.append(byte_array) |
| | y.append(label) |
| | else: |
| | print(f"ํ์ผ์ ์ฐพ์ ์ ์์ต๋๋ค: {file_path}") |
| | |
| | if (idx + 1) % 1000 == 0: |
| | print(f" {idx + 1} ํ์ผ ์ฒ๋ฆฌ ์๋ฃ") |
| | |
| | return np.array(X), np.array(y) |
| |
|
| | def configure_gpu_memory(): |
| | """GPU ๋ฉ๋ชจ๋ฆฌ ์ค์ """ |
| | gpus = tf.config.experimental.list_physical_devices('GPU') |
| | if gpus: |
| | try: |
| | for gpu in gpus: |
| | tf.config.experimental.set_memory_growth(gpu, True) |
| | print(f"GPU ์ค์ ์๋ฃ: {len(gpus)}๊ฐ GPU ์ฌ์ฉ") |
| | return True |
| | except RuntimeError as e: |
| | print(f"GPU ์ค์ ์ค๋ฅ: {e}") |
| | return False |
| |
|
| | def plot_training_history(history): |
| | """ํ๋ จ ํ์คํ ๋ฆฌ ์๊ฐํ""" |
| | fig, axes = plt.subplots(2, 2, figsize=(12, 10)) |
| | |
| | |
| | axes[0, 0].plot(history.history['loss'], label='Training Loss') |
| | if 'val_loss' in history.history: |
| | axes[0, 0].plot(history.history['val_loss'], label='Validation Loss') |
| | axes[0, 0].set_title('Model Loss') |
| | axes[0, 0].set_xlabel('Epoch') |
| | axes[0, 0].set_ylabel('Loss') |
| | axes[0, 0].legend() |
| | axes[0, 0].grid(True) |
| | |
| | |
| | axes[0, 1].plot(history.history['accuracy'], label='Training Accuracy') |
| | if 'val_accuracy' in history.history: |
| | axes[0, 1].plot(history.history['val_accuracy'], label='Validation Accuracy') |
| | axes[0, 1].set_title('Model Accuracy') |
| | axes[0, 1].set_xlabel('Epoch') |
| | axes[0, 1].set_ylabel('Accuracy') |
| | axes[0, 1].legend() |
| | axes[0, 1].grid(True) |
| | |
| | |
| | if 'auc' in history.history: |
| | axes[1, 0].plot(history.history['auc'], label='Training AUC') |
| | if 'val_auc' in history.history: |
| | axes[1, 0].plot(history.history['val_auc'], label='Validation AUC') |
| | axes[1, 0].set_title('Model AUC') |
| | axes[1, 0].set_xlabel('Epoch') |
| | axes[1, 0].set_ylabel('AUC') |
| | axes[1, 0].legend() |
| | axes[1, 0].grid(True) |
| | |
| | |
| | if 'lr' in history.history: |
| | axes[1, 1].plot(history.history['lr'], label='Learning Rate', color='red') |
| | axes[1, 1].set_title('Learning Rate Schedule') |
| | axes[1, 1].set_xlabel('Epoch') |
| | axes[1, 1].set_ylabel('Learning Rate') |
| | axes[1, 1].set_yscale('log') |
| | axes[1, 1].legend() |
| | axes[1, 1].grid(True) |
| | |
| | plt.tight_layout() |
| | plt.show() |
| |
|
| | def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"): |
| | """ํผ๋ ํ๋ ฌ ์๊ฐํ""" |
| | cm = confusion_matrix(y_true, y_pred) |
| | |
| | plt.figure(figsize=(8, 6)) |
| | sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', |
| | xticklabels=['Malware', 'Benign'], |
| | yticklabels=['Malware', 'Benign']) |
| | plt.title(title) |
| | plt.ylabel('True Label') |
| | plt.xlabel('Predicted Label') |
| | plt.show() |
| |
|
| | def evaluate_model(model, X_test, y_test, batch_size=16): |
| | """๋ชจ๋ธ ์ฑ๋ฅ ํ๊ฐ""" |
| | print("๋ชจ๋ธ ํ๊ฐ ์ค...") |
| | |
| | |
| | y_pred_prob = model.predict(X_test, batch_size=batch_size, verbose=1) |
| | y_pred = (y_pred_prob > 0.5).astype(int).flatten() |
| | |
| | |
| | accuracy = np.mean(y_pred == y_test) |
| | balanced_acc = balanced_accuracy_score(y_test, y_pred) |
| | auc_score = roc_auc_score(y_test, y_pred_prob) |
| | |
| | print(f"\n=== ํ๊ฐ ๊ฒฐ๊ณผ ===") |
| | print(f"Accuracy: {accuracy:.4f}") |
| | print(f"Balanced Accuracy: {balanced_acc:.4f}") |
| | print(f"AUC Score: {auc_score:.4f}") |
| | |
| | print(f"\n๋ถ๋ฅ ๋ฆฌํฌํธ:") |
| | print(classification_report(y_test, y_pred, target_names=['Malware', 'Benign'])) |
| | |
| | |
| | plot_confusion_matrix(y_test, y_pred, "MalConv Performance") |
| | |
| | return { |
| | 'accuracy': accuracy, |
| | 'balanced_accuracy': balanced_acc, |
| | 'auc': auc_score, |
| | 'predictions': y_pred_prob |
| | } |
| |
|
| | def get_file_paths_and_labels(malware_dir, benign_dir, max_samples_per_class=None): |
| | """ |
| | ๋๋ ํ ๋ฆฌ์์ ํ์ผ ๊ฒฝ๋ก์ ๋ ์ด๋ธ ๋ชฉ๋ก์ ๊ฐ์ ธ์ต๋๋ค. (๋ฉ๋ชจ๋ฆฌ์ ํ์ผ ๋ก๋ ์ํจ) |
| | """ |
| | filepaths = [] |
| | labels = [] |
| |
|
| | |
| | if os.path.exists(malware_dir): |
| | malware_files = [os.path.join(malware_dir, f) for f in os.listdir(malware_dir) if os.path.isfile(os.path.join(malware_dir, f))] |
| | if max_samples_per_class: |
| | malware_files = malware_files[:max_samples_per_class] |
| | filepaths.extend(malware_files) |
| | labels.extend([0] * len(malware_files)) |
| | print(f"์
์ฑ์ฝ๋ ํ์ผ ๊ฒฝ๋ก ๋ก๋ฉ: {len(malware_files)}๊ฐ") |
| |
|
| | |
| | if os.path.exists(benign_dir): |
| | benign_files = [os.path.join(benign_dir, f) for f in os.listdir(benign_dir) if os.path.isfile(os.path.join(benign_dir, f))] |
| | if max_samples_per_class: |
| | benign_files = benign_files[:max_samples_per_class] |
| | filepaths.extend(benign_files) |
| | labels.extend([1] * len(benign_files)) |
| | print(f"์ ์ ํ์ผ ๊ฒฝ๋ก ๋ก๋ฉ: {len(benign_files)}๊ฐ") |
| | |
| | print(f"\n์ด ํ์ผ ๊ฒฝ๋ก: {len(filepaths)}") |
| | print(f" ์
์ฑ์ฝ๋: {labels.count(0)}") |
| | print(f" ์ ์ํ์ผ: {labels.count(1)}") |
| |
|
| | |
| | indices = np.arange(len(filepaths)) |
| | np.random.shuffle(indices) |
| | filepaths = np.array(filepaths)[indices].tolist() |
| | labels = np.array(labels)[indices] |
| |
|
| | return filepaths, labels |
| |
|
| |
|
| | def data_generator(filepaths, labels, batch_size, max_length=2_000_000, shuffle=True): |
| | """ |
| | ๋ฐ์ดํฐ๋ฅผ ๋ฐฐ์น ๋จ์๋ก ์์ฑํ๋ ์ ๋๋ ์ดํฐ |
| | """ |
| | num_samples = len(filepaths) |
| | if num_samples == 0: |
| | return |
| | |
| | while True: |
| | indices = np.arange(num_samples) |
| | if shuffle: |
| | np.random.shuffle(indices) |
| | |
| | for i in range(0, num_samples, batch_size): |
| | batch_indices = indices[i:i+batch_size] |
| | |
| | X_batch = [] |
| | y_batch_list = [] |
| |
|
| | for j in batch_indices: |
| | try: |
| | X_batch.append(read_binary_file(filepaths[j], max_length)) |
| | y_batch_list.append(labels[j]) |
| | except Exception as e: |
| | print(f"Warning: Skipping file {filepaths[j]} due to error: {e}") |
| | continue |
| | |
| | if not X_batch: |
| | continue |
| |
|
| | yield np.array(X_batch), np.array(y_batch_list) |