Using Fail2Ban with Cloudflare on a free account

Kovasky Buezo | Mar 25, 2024

edited on: May 17, 2024

Intro

To better protect my web-accessible services, I decided to use Fail2Ban together with Cloudflare’s WAF to block IPs with more than 3 failed login attempts. I already use Cloudflare’s CDN and geo-blocking on my homelab, so I opted to use their WAF to block potential attackers as well.

On the free tier, the WAF is limited to 5 rules, a limit that is easy to reach. A workaround is to create a single rule that references a blocklist and then update that list through the Lists API as required.

Creating an API token

Log in to Cloudflare and navigate to My Profile -> API Tokens -> Create a Token -> Create a Custom Token. Give your token a name and make sure the permissions are as follows:

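[Screenshot: cloudflare token. The Lists API calls used later need the Account > Account Filter Lists > Edit permission.]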

Keep this token handy as it will be used later.

Creating a list

Log in to Cloudflare and navigate to Manage Account -> Configurations -> Lists. Under Lists, create a new list with type IP and name it “block_list”. It should look something like:

[Screenshot: block_list]

With the list in place, one of the five WAF custom rules can reference it with the expression (ip.src in $block_list) and the Block action.

Make sure to write down your account ID and the ID of this list. You can get both by clicking edit and looking at the URL, where the first ID is the account ID and the last one is the list ID. It should look something like:

https://dash.cloudflare.com/<account id>/configurations/lists/<list id>
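
If copying the two IDs by hand feels error-prone, they can be pulled out of the URL programmatically. This is only a minimal sketch, assuming the URL has exactly the shape shown above; parse_list_url and items_endpoint are hypothetical helper names and are not part of the scripts in this post.

```python
from urllib.parse import urlparse


def parse_list_url(url: str) -> tuple[str, str]:
    """Return (first_id, list_id) from a dashboard list URL.

    The first ID in the path is what the API calls the account ID.
    """
    parts = [p for p in urlparse(url).path.split('/') if p]
    # Expected path: /<id>/configurations/lists/<list id>
    if len(parts) < 4 or parts[1:3] != ['configurations', 'lists']:
        raise ValueError(f"Unexpected list URL: {url}")
    return parts[0], parts[3]


def items_endpoint(account_id: str, list_id: str) -> str:
    """Build the list items endpoint that the script in the next section calls."""
    return (
        "https://api.cloudflare.com/client/v4/accounts/"
        f"{account_id}/rules/lists/{list_id}/items"
    )
```

Calling parse_list_url on the copied URL and passing the result to items_endpoint gives the same endpoint string the script assembles from accountId and listId.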

Modifying the list

To modify the list, I wrote a small Python script that uses the details recorded earlier.

#!/usr/bin/python3

""" 
Called by entryPoint.py, performs the addition or deletion of an item in a Cloudflare custom list.
Support for IPv6 is limited, as it blocks the entire /64 subnet.
"""

import sys
import requests
from requests import Response
import json
import ipaddress

def getIPList(apiEndpoint : str, headers : dict) -> dict:
    response = requests.get(apiEndpoint, headers=headers)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Failed to fetch existing IP list. Status code: {response.status_code}")
        print(response.text)
        sys.exit(1)

def addIPtoList(ipAddr : str, apiEndpoint : str, headers : dict) -> Response:
    payload = [{"ip": ipAddr}]
    response = requests.post(apiEndpoint, headers=headers, data=json.dumps(payload))
    return response

def removeIPFromList(ipId : str, apiEndpoint : str, headers : dict) -> Response:
    payload = {"items": [{"id": ipId}]}
    response = requests.delete(apiEndpoint, headers=headers, data=json.dumps(payload))
    return response

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: ./modifyBanList.py <ip> <add|del>")
        sys.exit(1)

    ipAddr = sys.argv[1]

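    try:
        # IPv6 addresses are widened to their /64 prefix.
        addr = ipaddress.IPv6Address(ipAddr)
        first_64_bits = str(addr.exploded).split(':')[:4]
        ipAddr = ':'.join(first_64_bits) + '::/64'
    except ValueError:
        # Not an IPv6 address; use the value unchanged.
        pass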

    action = sys.argv[2]
    listId = ''
    accountId = ''
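    apiKey = ''  # the API token created earlier
    apiEndpoint = f'https://api.cloudflare.com/client/v4/accounts/{accountId}/rules/lists/{listId}/items'

    # API tokens are sent as a Bearer token. The X-Auth-Email / X-Auth-Key
    # header pair only works with the legacy Global API Key.
    headers = {
        'Authorization': f'Bearer {apiKey}',
        'Content-Type': 'application/json'
    }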

    existingIpList = getIPList(apiEndpoint, headers)
    response = None

    if action == "del":
        # The API deletes by item ID, so look up the ID that matches this address.
        ipId = next((item['id'] for item in existingIpList['result'] if item['ip'] == ipAddr), None)
        if ipId is not None:
            response = removeIPFromList(ipId, apiEndpoint, headers)
    elif not any(item['ip'] == ipAddr for item in existingIpList['result']):
        # Only add the address if it is not already on the list.
        response = addIPtoList(ipAddr, apiEndpoint, headers)

    if response is None:
        print(f"Nothing to do for IP address {ipAddr} (action: {action}).")
    elif response.status_code == 200:
        print(f"Action '{action}' for IP address {ipAddr} succeeded.")
    else:
        print(f"Action '{action}' for IP address {ipAddr} failed. Status code: {response.status_code}")

This should add or remove any desired IP from the block list.
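
The script builds the /64 prefix by splitting the exploded address by hand. An alternative sketch that relies only on the standard ipaddress module is shown here. The name to_list_entry is hypothetical, and note that it yields the compressed form (2001:db8:1:2::/64) rather than the zero-padded form the script produces. Which form Cloudflare returns when listing items is not verified here, so compare against real API output before swapping it in, since the delete path matches on the exact string.

```python
import ipaddress


def to_list_entry(ip: str) -> str:
    """Return the value to store in the list.

    IPv4 addresses are returned unchanged; IPv6 addresses are widened
    to the /64 network that contains them.
    """
    addr = ipaddress.ip_address(ip)  # raises ValueError on malformed input
    if addr.version == 6:
        # strict=False masks the host bits instead of raising an error.
        return str(ipaddress.ip_network(f"{addr}/64", strict=False))
    return str(addr)
```

A side effect of this version is that anything that is not a valid IP address raises ValueError instead of being passed through to the API.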

Entry Point

My Fail2Ban instance runs in a container which, luckily enough, ships with Python. To make sure the script has all of its dependencies, I made an entry point that sets them up and then calls the previous script.

#!/usr/bin/python3

""" 
Creates a virtual environment, installs dependencies, and then calls modifyBanList.py
"""

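import os
import subprocess
import sys

if len(sys.argv) < 3:
    print("Usage: ./entryPoint.py <ip> <add|del>")
    sys.exit(1)

venv_dir = 'env'

# Create the virtual environment and install dependencies on the first run only.
# ipaddress ships with Python 3, so only requests needs to be installed.
if not os.path.exists(venv_dir):
    subprocess.run([sys.executable, '-m', 'venv', venv_dir], check=True)
    subprocess.run([os.path.join(venv_dir, 'bin', 'pip'), 'install', '--upgrade', 'requests'], check=True)

# Pass the arguments as a list so they are never interpreted by a shell.
result = subprocess.run([os.path.join(venv_dir, 'bin', 'python'), '/data/action.d/modifyBanList.py', sys.argv[1], sys.argv[2]])
sys.exit(result.returncode)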

Cloudflare.conf

To glue all of it together, a conf file will tell Fail2Ban how to call the script when a ban or unban action needs to take place.

[Definition]
actionban = /data/action.d/entryPoint.py <ip> add
actionunban = /data/action.d/entryPoint.py <ip> del
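
Since Fail2Ban substitutes <ip> into the command line, it can be worth checking both arguments before anything is sent to Cloudflare. The following is a rough sketch of such a check, not part of the original scripts; validate_args and VALID_ACTIONS are hypothetical names.

```python
import ipaddress

VALID_ACTIONS = {"add", "del"}


def validate_args(argv: list[str]) -> tuple[str, str]:
    """Return (ip, action), or raise ValueError if either one is unusable."""
    if len(argv) < 3:
        raise ValueError("Usage: ./entryPoint.py <ip> <add|del>")
    ip, action = argv[1], argv[2]
    ipaddress.ip_address(ip)  # raises ValueError for anything that is not an IP
    if action not in VALID_ACTIONS:
        raise ValueError(f"Unknown action: {action}")
    return ip, action
```

In the entry point this would be called as validate_args(sys.argv), exiting with a non-zero status when ValueError is raised.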

Jail.d

In my config, I use jail.local directly, and it looks something like:

[DEFAULT]
action = cloudflare

Done!

Some next steps are to configure a filter, add it to the jail, and then use a VPN to generate some unsuccessful login events. Once the ban takes place, it may take a few seconds to propagate on Cloudflare.

You can take a look at these scripts here.