Intro
To better protect my web accessible services, I decided to use Fail2Ban in conjunction with Cloudflare’s WAF to block IPs with more than 3 failed login attempts. I already leverage Cloudflare’s CDN and geo-blocking capabilities on my homelab, so I opted to utilize their WAF to block access to potential attackers.
On the free tier, the WAF is limited to 5 rules, which can easily reach their limit. A workaround is to create a rule that uses a blocklist and then with the Lists API, update it as required.
Creating an API key
Log in to Cloudflare and navigate to My Profile -> API Tokens-> Create a Token-> Create a Custom Token. Give your token a name and make sure the permissions are as follows:

Keep this token handy as it will be used later.
Creating a list
Log in to Cloudflare, navigate to Manage Account -> Configurations -> Lists. Under Lists, create a new list with type IP and name it “block_list.” It should look something like:

Make sure to write down your user id plus the id of this list. You can get it by clicking edit and taking a look at the url. It should look something like:
https://dash.cloudflare.com/<user id>/configurations/lists/<list id>Modifying the list
To modify the list, I made a small python script which uses some of the details that were recorded before.
#!/usr/bin/python3
""" 
Called by entryPoint.py, performs the addition or deletion of an item in a Cloudflare custom list.
Support for IPV6 is limited as it blocks the entire /64 subnet.
"""
import sys
import requests
from requests import Response
import json
import ipaddress
def getIPList(apiEndpoint : str, headers : dict) -> json:
    response = requests.get(apiEndpoint, headers=headers)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Failed to fetch existing IP list. Status code: {response.status_code}")
        print(response.text)
        sys.exit(1)
def addIPtoList(ipAddr : str, apiEndpoint : str, headers : dict) -> Response:
    payload = [{"ip": ipAddr}]
    response = requests.post(apiEndpoint, headers=headers, data=json.dumps(payload))
    return response
def removeIPFromList(ipId : str, apiEndpoint : str, headers : dict) -> Response:
    payload = {"items": [{"id": ipId}]}
    response = requests.delete(apiEndpoint, headers=headers, data=json.dumps(payload))
    return response
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: ./modifyBanList.py <ip> <add|del>")
        sys.exit(1)
    ipAddr = sys.argv[1]
    try:
        addr = ipaddress.IPv6Address(ipAddr)
        first_64_bits = str(addr.exploded).split(':')[:4]
        ipAddr = ':'.join(first_64_bits) + '::/64'
    except:
        pass
    action = sys.argv[2]
    listId = ''
    accountId = ''
    email = ''
    apiKey = ''
    apiEndpoint = f'https://api.cloudflare.com/client/v4/accounts/{accountId}/rules/lists/{listId}/items'
    headers = {
        'X-Auth-Email': f'{email}',
        'X-Auth-Key': f'{apiKey}',
        'Content-Type': 'application/json'
    }
    existingIpList = getIPList(apiEndpoint,headers)
    print(existingIpList)
    response = None
    if action == "del":
        ipId = None
        for item in existingIpList['result']:
            if item['ip'] == ipAddr:
                ipId = item['id']
                break
        payload = {"items": [{"id": ipId}]}
        if ipId is not None:
            response = requests.delete(apiEndpoint,headers=headers,data=json.dumps(payload))
    elif not any(item['ip'] == ipAddr for item in existingIpList['result']):
        payload = [{
            "ip": ipAddr
        }]
        response = requests.post(apiEndpoint, headers=headers, data=json.dumps(payload))
    if response is not None and response.status_code == 200:
            print(f"IP address {ipAddr} {action} to the custom IP list successfully.")
    else:
        print(f"Failed to {action} IP address {ipAddr} to the custom IP list.")This should add or remove any desired IP from the block list.
Entry Point
My Fail2Ban instance is a container which luckily enough has python. To make sure the script has all required dependencies, I made an entrypoint which then calls the previous script.
#!/usr/bin/python3
""" 
Creates a virtual environment, installs dependencies, and then calls modifyBanList.py
"""
import os
import sys
if len(sys.argv) < 3:
    print("Usage: ./entryPoint.py <ip> <add|del>")
    sys.exit(1)
venv_dir = 'env'
if not os.path.exists(venv_dir):
    os.system(f"{sys.executable} -m venv {venv_dir}")
os.system(f"{os.path.join(venv_dir, 'bin', 'pip')} install --upgrade requests ipaddress")
os.system(f"{os.path.join(venv_dir, 'bin', 'python')} /data/action.d/modifyBanList.py {sys.argv[1]} {sys.argv[2]}")Cloudflare.conf
To glue all of it together, a conf file will tell Fail2Ban how to call the script when a ban or unban action needs to take place.
[Definition]
actionban = /data/action.d/entryPoint.py <ip> add
actionunban = /data/action.d/entryPoint.py <ip> delJail.d
In my config, I am directly using the jail.local and it looks something like:
[DEFAULT]
action = cloudflareDone!
Some next steps are to configure a filter, add it to the jail, and later use a VPN to generate some unsuccesful events. Once the ban takes place, it may take a few seconds to propapage on Cloudflare.
You can take a look at these scripts here.
