Cyber threats are a growing concern for organizations worldwide. These threats take many forms, including malware, phishing, and denial-of-service (DoS) attacks, and can compromise sensitive information and disrupt operations. The increasing sophistication and frequency of these attacks make it imperative for organizations to adopt advanced security measures. Traditional threat detection methods often fall short because they cannot adapt to new and evolving threats. This is where deep learning models come into play.

Deep learning models can analyze vast amounts of data and identify patterns that may not be immediately obvious to human analysts. By leveraging these models, organizations can proactively detect and mitigate cyber threats, safeguarding their sensitive information and ensuring operational continuity.

As a cybersecurity analyst, you identify and mitigate these threats. In this project, you will design and implement a deep learning model to detect cyber threats. The BETH dataset simulates real-world logs, providing a rich source of information for training and testing your model. The data has already undergone preprocessing, and we have a target label, sus_label, indicating whether an event is malicious (1) or benign (0).

By successfully developing this model, you will contribute to enhancing cybersecurity measures and protecting organizations from potentially devastating cyber attacks.

The Data

Column | Description
processId | Unique identifier for the process that generated the event (int64)
threadId | ID of the thread spawning the log (int64)
parentProcessId | Label for the process spawning this log (int64)
userId | ID of the user spawning the log (int64)
mountNamespace | Mounting restrictions the process log works within (int64)
argsNum | Number of arguments passed to the event (int64)
returnValue | Value returned from the event log, usually 0 (int64)
sus_label | Binary label indicating whether the event is suspicious (1) or not (0) (int64)

More information on the dataset: BETH dataset.

# Import required libraries
import pandas as pd
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
# Load preprocessed data
train_df = pd.read_csv('labelled_train.csv')
test_df = pd.read_csv('labelled_test.csv')
val_df = pd.read_csv('labelled_validation.csv')

# View the first 5 rows of training set
train_df.head()
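# Optional sanity check (not part of the original project steps): system-event
# logs like these are often heavily imbalanced, so it helps to inspect the
# class balance of sus_label before training.
print(train_df['sus_label'].value_counts(normalize=True))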
# 1. Loading and Scaling Data
# Separate features and labels
feature_cols = ['processId', 'threadId', 'parentProcessId', 'userId', 
                'mountNamespace', 'argsNum', 'returnValue']

X_train = train_df[feature_cols].values
y_train = train_df['sus_label'].values

X_val = val_df[feature_cols].values
y_val = val_df['sus_label'].values

X_test = test_df[feature_cols].values
y_test = test_df['sus_label'].values

# Scale the features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)

# Convert to PyTorch tensors
X_train_tensor = torch.FloatTensor(X_train_scaled)
y_train_tensor = torch.LongTensor(y_train)

X_val_tensor = torch.FloatTensor(X_val_scaled)
y_val_tensor = torch.LongTensor(y_val)


X_test_tensor = torch.FloatTensor(X_test_scaled)
y_test_tensor = torch.LongTensor(y_test)

# Create data loaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

print("Data preprocessing completed!")
# 2. Define the Neural Network Model
class CyberThreatDetector(nn.Module):
    def __init__(self, input_size):
        super(CyberThreatDetector, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 2)  # Binary classification (benign/malicious)
        self.dropout = nn.Dropout(0.3)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x

# Initialize model
input_size = X_train_scaled.shape[1]
model = CyberThreatDetector(input_size)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

print(f"Model initialized with input size: {input_size}")
print(f"Model architecture: {input_size} -> 128 -> 64 -> 32 -> 2")
# 3. Train and evaluate the model
def evaluate_model(model, data_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total

# Training loop
num_epochs = 10
train_losses = []
val_accuracies = []

print("\nStarting training...")
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    for batch_idx, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    # Calculate validation accuracy
    val_accuracy = evaluate_model(model, val_loader)
    val_accuracies.append(val_accuracy)
    
    avg_loss = running_loss / len(train_loader)
    train_losses.append(avg_loss)
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')
# Final validation accuracy, rounded to two decimal places
val_accuracy = round(val_accuracies[-1], 2)
print(f"\nFinal validation accuracy: {val_accuracy:.2f}")

# Test the model on test set
test_accuracy = evaluate_model(model, test_loader)
print(f"Test accuracy: {test_accuracy:.4f}")

# Check if we achieved the target accuracy
if val_accuracy >= 0.6:
    print("✅ Target accuracy of 0.6 achieved!")
else:
    print("❌ Target accuracy of 0.6 not achieved. Consider:")
    print("- Adjusting learning rate")
    print("- Modifying network architecture")
    print("- Training for more epochs")

print(f"\nval_accuracy = {val_accuracy}")