Jun 23, 2025

Building a Custom Intrusion Detection System (IDS) with Python and Machine Learning

 
Build a custom intrusion detection system with Python and ML. Monitor networks, detect anomalies, automate security tasks, and improve your cybersecurity posture.


Introduction: Why Build Your Own IDS?

Intrusion Detection Systems (IDS) are a critical component of any robust cybersecurity strategy. They monitor network traffic and system activity for malicious behavior or policy violations. While commercial IDS solutions offer comprehensive features, building your own IDS with Python and machine learning lets you tailor detection to your own traffic and gives you a deeper understanding of your network's security landscape. This article explores how to create a custom IDS, focusing on network monitoring with Scapy, anomaly detection with machine learning, and security automation.

Understanding Intrusion Detection Systems

An IDS analyzes network traffic and system logs, either by comparing them against a database of known attack signatures or by flagging behavior that deviates from a learned baseline. There are two primary types of IDS:

  • Network Intrusion Detection Systems (NIDS): Analyze network traffic for malicious activity.
  • Host Intrusion Detection Systems (HIDS): Monitor individual hosts for suspicious behavior.

This article will focus on building a NIDS, leveraging Python's capabilities for network packet analysis and machine learning for anomaly detection.

Setting Up the Development Environment

Before diving into the code, let's set up the development environment.

Installing Required Libraries

We'll need the following Python libraries:

  • Scapy: For capturing and analyzing network packets.
  • Scikit-learn: For machine learning algorithms.
  • Pandas: For data manipulation and analysis.
  • Numpy: For numerical computing.
  • Joblib: For saving and loading machine learning models.

Install these libraries using pip:


pip install scapy scikit-learn pandas numpy joblib

Choosing a Network Interface

Select the network interface you want to monitor. This is typically the interface connected to your local network or the internet gateway. Identify it using `ip addr` or `ifconfig` (Linux/macOS) or `ipconfig` (Windows). Packet capture usually requires root or administrator privileges, so make sure you have the necessary permissions on the selected interface.
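
As a quick alternative to those commands, the interface names can also be listed from Python. This is a minimal sketch that uses only the standard library's `socket.if_nameindex()`; the helper name `list_interfaces` is illustrative and not part of the article's scripts.

```python
import socket

def list_interfaces():
    """Return the names of the network interfaces known to the OS."""
    # socket.if_nameindex() yields (index, name) tuples
    return [name for _index, name in socket.if_nameindex()]

for name in list_interfaces():
    print(name)
```

Any of the printed names can then be passed as the `iface` argument when capturing packets.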

Network Monitoring with Scapy

Scapy is a powerful Python library for crafting, capturing, and analyzing network packets. It allows us to intercept network traffic and extract relevant information.

Capturing Network Packets

The following code snippet demonstrates how to capture network packets using Scapy:


from scapy.all import sniff, IP, TCP

def packet_callback(packet):
    if IP in packet:
        ip_src = packet[IP].src
        ip_dst = packet[IP].dst
        if TCP in packet:
            tcp_sport = packet[TCP].sport
            tcp_dport = packet[TCP].dport
            print(f"Source IP: {ip_src}, Destination IP: {ip_dst}, Source Port: {tcp_sport}, Destination Port: {tcp_dport}")

sniff(filter="ip", prn=packet_callback, store=False, iface="eth0")

Explanation:

  • `from scapy.all import sniff, IP, TCP`: Imports necessary Scapy functions and classes.
  • `packet_callback(packet)`: Defines a function to process each captured packet.
  • `if IP in packet`: Checks if the packet contains an IP layer.
  • `packet[IP].src` and `packet[IP].dst`: Extracts the source and destination IP addresses.
  • `if TCP in packet`: Checks if the packet contains a TCP layer.
  • `packet[TCP].sport` and `packet[TCP].dport`: Extracts the source and destination TCP ports.
  • `sniff(filter="ip", prn=packet_callback, store=False, iface="eth0")`: Starts capturing packets on the "eth0" interface, filtering for IP packets, and calling `packet_callback` for each packet. `store=False` prevents Scapy from storing packets in memory, improving performance.

Important: Replace `"eth0"` with your actual network interface name.

Extracting Relevant Features

To use machine learning, we need to extract relevant features from the captured packets. These features could include:

  • Protocol type (TCP, UDP, ICMP)
  • Packet size
  • Source and destination IP addresses
  • Source and destination ports
  • Flags (SYN, ACK, FIN)
  • Payload size

Modify the `packet_callback` function to extract these features and store them in a structured format like a Pandas DataFrame.


import pandas as pd
from scapy.all import sniff, IP, TCP, UDP, ICMP

packet_data = []

def packet_callback(packet):
    if IP in packet:
        ip_src = packet[IP].src
        ip_dst = packet[IP].dst
        protocol = packet[IP].proto  # Protocol number
        packet_len = len(packet)

        tcp_sport = None
        tcp_dport = None
        tcp_flags = None

        udp_sport = None
        udp_dport = None

        icmp_type = None
        icmp_code = None

        if TCP in packet:
            tcp_sport = packet[TCP].sport
            tcp_dport = packet[TCP].dport
            tcp_flags = str(packet[TCP].flags)  # Store as plain text, e.g. "S" or "SA"
        elif UDP in packet:
            udp_sport = packet[UDP].sport
            udp_dport = packet[UDP].dport
        elif ICMP in packet:
            icmp_type = packet[ICMP].type
            icmp_code = packet[ICMP].code

        packet_data.append({
            "ip_src": ip_src,
            "ip_dst": ip_dst,
            "protocol": protocol,
            "packet_len": packet_len,
            "tcp_sport": tcp_sport,
            "tcp_dport": tcp_dport,
            "tcp_flags": tcp_flags,
            "udp_sport": udp_sport,
            "udp_dport": udp_dport,
            "icmp_type": icmp_type,
            "icmp_code": icmp_code
        })

sniff(filter="ip", prn=packet_callback, store=False, iface="eth0", count=1000)  # Capture 1000 packets

df = pd.DataFrame(packet_data)
print(df.head())

# Save the capture so the training script can load it later
df.to_csv("network_traffic_data.csv", index=False)

Explanation:

  • The modified `packet_callback` function now extracts additional features such as protocol, packet length, TCP flags, and ICMP type/code.
  • It stores this data in a list called `packet_data`.
  • After capturing a defined number of packets (`count=1000`), the script creates a Pandas DataFrame from `packet_data`, prints the head of the DataFrame, and saves it to `network_traffic_data.csv`.

This DataFrame will serve as the input for our machine learning model.
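
Most models need numeric inputs, so the TCP flags are often more useful as separate 0/1 columns than as a single value. The sketch below assumes the flags are available in Scapy's short string form (for example `"S"` for SYN or `"SA"` for SYN+ACK); the helper name `flags_to_features` and the output column names are illustrative.

```python
# Map Scapy's one-letter TCP flag codes to readable feature names
FLAG_NAMES = {"F": "fin", "S": "syn", "R": "rst", "P": "psh", "A": "ack", "U": "urg"}

def flags_to_features(flags):
    """Turn a flag string such as "SA" into 0/1 indicator features."""
    flags = "" if flags is None else str(flags)
    return {f"flag_{name}": int(code in flags) for code, name in FLAG_NAMES.items()}

print(flags_to_features("SA"))   # SYN and ACK set
print(flags_to_features(None))   # Non-TCP packet: no flags set
```

The resulting dictionaries could be merged into each packet record before the DataFrame is built.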

Anomaly Detection with Machine Learning

Anomaly detection is the process of identifying unusual patterns that deviate significantly from the norm. In the context of network security, anomalies can indicate malicious activity.
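
To make "deviate significantly from the norm" concrete, here is a minimal statistical sketch that flags packet sizes lying more than three standard deviations from the mean of a baseline. The baseline values and the threshold of 3 are illustrative assumptions, not measurements from a real capture.

```python
import statistics

# Hypothetical baseline of packet sizes (bytes) observed during normal operation
baseline = [60, 64, 66, 70, 72, 74, 78, 80, 82, 90]
mean = statistics.mean(baseline)
stdev = statistics.stdev(baseline)

def is_anomalous(packet_len, threshold=3.0):
    """Flag a packet whose size is more than `threshold` standard deviations from the baseline mean."""
    z_score = abs(packet_len - mean) / stdev
    return z_score > threshold

print(is_anomalous(75))    # Close to the baseline mean
print(is_anomalous(1500))  # Far outside the baseline range
```

A single-feature rule like this is easy to reason about but misses anomalies that only show up across several features at once, which is where machine learning models help.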

Choosing a Machine Learning Algorithm

Several machine learning algorithms are suitable for anomaly detection:

  • Isolation Forest: An ensemble learning method that isolates anomalies by randomly partitioning the data space.
  • One-Class SVM: Aims to learn a boundary around the "normal" data points, identifying anything outside this boundary as an anomaly.
  • Local Outlier Factor (LOF): Measures the local density deviation of a given data point with respect to its neighbors.

For this example, we'll use Isolation Forest because it needs no labeled data, has few parameters to tune, and is simple to apply.
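
The idea behind Isolation Forest can be seen on a toy example before applying it to real traffic. The sketch below trains on synthetic "normal" points and scores one hypothetical outlier; all values are made up for illustration.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(42)
# Synthetic "normal" traffic: packet length and destination port, tightly clustered
normal = rng.normal(loc=[500.0, 443.0], scale=[50.0, 5.0], size=(300, 2))
# One hypothetical outlier: a very large packet sent to an unusual port
outlier = np.array([[9000.0, 31337.0]])

model = IsolationForest(n_estimators=100, random_state=42)
model.fit(normal)

# score_samples: lower scores mean the point was easier to isolate (more anomalous)
normal_scores = model.score_samples(normal)
outlier_score = model.score_samples(outlier)[0]
print(f"median normal score: {np.median(normal_scores):.3f}")
print(f"outlier score:       {outlier_score:.3f}")
```

Points that sit far from the bulk of the data are separated by fewer random splits, which is why the outlier receives a lower score than typical points.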

Training the Model

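First, prepare the data by cleaning and transforming it. This includes handling missing values (e.g., filling with 0) and encoding categorical features as numbers. The code below uses label encoding; one-hot encoding is a common alternative for features with only a few distinct values, such as the protocol.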


import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Load the DataFrame (replace with your actual data loading)
df = pd.read_csv('network_traffic_data.csv') #Example loading

# Preprocessing: Handling missing values
df = df.fillna(0)

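# Preprocessing: Encoding categorical features into new *_enc columns,
# keeping the original values (e.g., the real source IP) for later use
categorical_cols = ['ip_src', 'ip_dst', 'tcp_flags']  # Adjust to your DataFrame
for col in categorical_cols:
    if col in df.columns:
        le = LabelEncoder()
        # Cast to str so the 0 placeholder and text values can be encoded together
        df[col + '_enc'] = le.fit_transform(df[col].astype(str))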

# Selecting features for training
features = ['protocol', 'packet_len', 'tcp_sport', 'tcp_dport', 'udp_sport', 'udp_dport', 'icmp_type', 'icmp_code'] #Adjust to your DataFrame
features = [f for f in features if f in df.columns] #Ensure features exist
X = df[features]

# Splitting the data into training and testing sets
X_train, X_test = train_test_split(X, test_size=0.3, random_state=42)

# Training the Isolation Forest model
model = IsolationForest(n_estimators=100, contamination='auto', random_state=42)
model.fit(X_train)

# Making predictions on the test set
y_pred = model.predict(X_test)

# Adding the predictions to the test DataFrame
df_test = df.loc[X_test.index].copy()
df_test['anomaly'] = y_pred

# Displaying anomaly results
anomalies = df_test[df_test['anomaly'] == -1]
print("Anomalous Packets:")
print(anomalies.head())

Explanation:

  • The code loads the captured packet data from a CSV file.
  • It handles missing values by filling them with 0.
  • It encodes categorical features using LabelEncoder.
  • It selects the features to be used for training the Isolation Forest model.
  • The data is split into training and testing sets.
  • The Isolation Forest model is trained on the training data.
  • Predictions are made on the test data, and the results are added to a new 'anomaly' column in the test DataFrame.
  • Finally, the anomalous packets are filtered and displayed.
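
Joblib, installed earlier, can persist a trained model so that it does not have to be retrained every time the IDS starts. The sketch below uses stand-in random data and a temporary file path; the file name `ids_isolation_forest.joblib` is an arbitrary choice.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.ensemble import IsolationForest

# Stand-in training data; in practice this would be X_train from the script above
rng = np.random.default_rng(0)
X_demo = rng.normal(size=(200, 3))
model = IsolationForest(n_estimators=50, random_state=0).fit(X_demo)

# Save the fitted model, then load it back as a separate object
model_path = os.path.join(tempfile.mkdtemp(), "ids_isolation_forest.joblib")
joblib.dump(model, model_path)
restored = joblib.load(model_path)

# The restored model should reproduce the original predictions
same = np.array_equal(model.predict(X_demo), restored.predict(X_demo))
print(f"Predictions match after reload: {same}")
```

Only load model files from sources you trust, since joblib files are based on pickle and can execute code when loaded.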

Evaluating the Model

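Evaluate the model's performance using appropriate metrics such as precision, recall, and F1-score. These metrics require ground-truth labels, so evaluate on traffic where you know which packets are malicious, such as a labeled dataset or a capture that contains attacks you simulated yourself. The results will help you tune parameters such as `contamination` and reduce false positives.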
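
Because these metrics need ground-truth labels, the sketch below uses synthetic labeled data rather than the unlabeled capture from earlier. The data, the labels, and the `contamination=0.05` setting are all assumptions chosen for illustration.

```python
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.metrics import precision_recall_fscore_support

rng = np.random.default_rng(1)
# Synthetic labeled data: 1 = attack, 0 = benign (hypothetical ground truth)
benign = rng.normal(loc=0.0, scale=1.0, size=(500, 2))
attacks = rng.normal(loc=8.0, scale=1.0, size=(20, 2))
X = np.vstack([benign, attacks])
y_true = np.array([0] * len(benign) + [1] * len(attacks))

model = IsolationForest(n_estimators=100, contamination=0.05, random_state=1).fit(X)

# IsolationForest returns -1 for anomalies and 1 for inliers; map to 1/0
y_pred = (model.predict(X) == -1).astype(int)

precision, recall, f1, _ = precision_recall_fscore_support(
    y_true, y_pred, average="binary", zero_division=0
)
print(f"precision={precision:.2f} recall={recall:.2f} f1={f1:.2f}")
```

Low precision means many benign packets are flagged, while low recall means attacks are missed; which one matters more depends on how the alerts are used.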

Security Automation

Automating the response to detected anomalies is crucial for timely security interventions. This can involve actions such as:

  • Logging the event.
  • Alerting administrators.
  • Blocking the offending IP address.
  • Quarantining the affected system.
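
The first two actions, logging and alerting, can be sketched with the standard library alone. The record fields, the logger name `ids`, and the helper `format_alert` are illustrative choices; the IP addresses are documentation examples.

```python
import json
import logging
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("ids")

def format_alert(ip_src, ip_dst, reason):
    """Build a JSON alert record that can be logged or forwarded to administrators."""
    return json.dumps({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "ip_src": ip_src,
        "ip_dst": ip_dst,
        "reason": reason,
    })

alert = format_alert("203.0.113.7", "192.168.1.10", "isolation_forest_anomaly")
logger.warning(alert)
```

Structured records like this are easy to ship to a log collector or to attach to an email or chat notification.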

Implementing Automated Responses

Use Python to create scripts that automatically respond to detected anomalies. For example, you can use the `iptables` command (on Linux) to block malicious IP addresses.


import ipaddress
import subprocess

def block_ip(ip_address):
    """Blocks an IP address using iptables (requires root privileges)."""
    try:
        # Validate the input so only a well-formed IP address reaches iptables
        ip_address = str(ipaddress.ip_address(str(ip_address)))
        subprocess.run(["iptables", "-A", "INPUT", "-s", ip_address, "-j", "DROP"], check=True)
        print(f"IP address {ip_address} blocked.")
    except ValueError:
        print(f"Skipping invalid IP address: {ip_address!r}")
    except subprocess.CalledProcessError as e:
        print(f"Error blocking IP address: {e}")

# Example usage (from the anomaly detection script above)
if 'anomaly' in df_test.columns:
    anomalies = df_test[df_test['anomaly'] == -1]
    # Block each distinct source IP once rather than once per packet
    for ip_src in anomalies['ip_src'].unique():
        block_ip(ip_src)

Explanation:

  • The `block_ip` function validates the address and then uses the `iptables` command to block it. Running `iptables` requires root privileges.
  • The example usage collects the distinct source IP addresses of the anomalous packets and calls `block_ip` once for each.

Important: Exercise caution when implementing automated blocking mechanisms. Ensure that you have adequate safeguards in place to prevent blocking legitimate traffic. Consider using a testing environment to validate your automation scripts before deploying them in a production environment.
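
One possible safeguard is an allowlist combined with a dry-run mode that only prints the command it would execute. The networks in `ALLOWLIST` and the helper names below are hypothetical; adjust them to your environment.

```python
import ipaddress

# Hypothetical allowlist: networks that must never be blocked automatically
ALLOWLIST = [ipaddress.ip_network("192.168.1.0/24"), ipaddress.ip_network("127.0.0.0/8")]

def should_block(ip_text):
    """Return True only for valid IP addresses outside every allowlisted network."""
    try:
        ip = ipaddress.ip_address(ip_text)
    except ValueError:
        return False  # Not a valid IP address; never act on it
    return not any(ip in network for network in ALLOWLIST)

def build_block_command(ip_text):
    """Return the iptables command as a list without executing it (dry run)."""
    return ["iptables", "-A", "INPUT", "-s", ip_text, "-j", "DROP"]

for candidate in ["203.0.113.7", "192.168.1.20", "not-an-ip"]:
    if should_block(candidate):
        print("Would run:", " ".join(build_block_command(candidate)))
    else:
        print("Skipping:", candidate)
```

Reviewing the dry-run output for a while before enabling real blocking helps catch false positives without disrupting legitimate traffic.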

On Windows, `iptables` is not available; adapt the script to manage Windows Firewall rules with `netsh` or PowerShell cmdlets instead.
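
As a starting point for the Windows adaptation, the sketch below builds a `netsh advfirewall firewall add rule` command without running it. The rule naming scheme and the helper name `build_netsh_block_command` are assumptions; verify the command on your Windows version before executing it.

```python
def build_netsh_block_command(ip_address, rule_prefix="IDS-block"):
    """Return a netsh command (as an argument list) that blocks inbound traffic from one IP."""
    return [
        "netsh", "advfirewall", "firewall", "add", "rule",
        f"name={rule_prefix}-{ip_address}",
        "dir=in",
        "action=block",
        f"remoteip={ip_address}",
    ]

# Print the command instead of running it; once reviewed, the list can be
# passed to subprocess.run(..., check=True) from an elevated prompt.
print(build_netsh_block_command("203.0.113.7"))
```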

Advanced Techniques and Considerations

Building a robust IDS requires continuous improvement and adaptation. Consider the following advanced techniques:

  • Signature-Based Detection: Integrate a signature-based detection engine to identify known attack patterns.
  • Real-time Analysis: Optimize the code for real-time analysis to detect anomalies as they occur.
  • Cloud Integration: Deploy the IDS in the cloud to monitor network traffic across multiple environments.
  • Feedback Loops: Implement feedback loops to continuously improve the accuracy of the machine learning model.
  • Data Visualization: Use data visualization tools to gain insights into network traffic patterns and identify potential security threats.
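
As a small illustration of signature-based detection, the sketch below applies one hand-written rule: a source that sends bare SYN packets to many distinct ports is treated as a likely port scanner. It assumes packet records shaped like the `packet_data` dictionaries with flags stored as strings; the threshold of 20 ports and the function name `find_port_scanners` are illustrative.

```python
from collections import defaultdict

def find_port_scanners(packets, port_threshold=20):
    """Return source IPs that sent SYN-only packets to at least `port_threshold` distinct ports."""
    ports_by_source = defaultdict(set)
    for pkt in packets:
        # A bare SYN (no ACK) is the first step of a TCP handshake
        if pkt.get("tcp_flags") == "S":
            ports_by_source[pkt["ip_src"]].add(pkt["tcp_dport"])
    return sorted(ip for ip, ports in ports_by_source.items() if len(ports) >= port_threshold)

# Hypothetical records: one host probing 50 ports, one host repeatedly opening port 443
scan = [{"ip_src": "203.0.113.7", "tcp_dport": port, "tcp_flags": "S"} for port in range(1, 51)]
normal = [{"ip_src": "192.168.1.20", "tcp_dport": 443, "tcp_flags": "S"} for _ in range(50)]
print(find_port_scanners(scan + normal))
```

Rules like this complement the anomaly model: they are precise for known patterns, while the model covers behavior no rule anticipated.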
