Intro
To better protect my web-accessible services, I decided to use Fail2Ban in conjunction with Cloudflare’s WAF to block IPs with more than 3 failed login attempts. I already leverage Cloudflare’s CDN and geo-blocking capabilities on my homelab, so I opted to use their WAF to block potential attackers as well.
On the free tier, the WAF is limited to 5 rules, a limit that is easy to reach if every banned IP gets its own rule. A workaround is to create a single rule that references a blocklist and then update that list through the Lists API as required.
Creating an API key
Log in to Cloudflare and navigate to My Profile -> API Tokens -> Create a Token -> Create a Custom Token. Give your token a name and make sure the permissions are as follows:
Keep this token handy as it will be used later.
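One detail worth keeping in mind: the script later in this post sends X-Auth-Email and X-Auth-Key headers, which is how Cloudflare's account-wide API keys authenticate, whereas a scoped API token is sent as a bearer token. Below is a minimal sketch of a helper that builds either set of headers. The function name build_headers and its parameters are my own for illustration and are not part of the original scripts.

```python
def build_headers(api_token: str = "", email: str = "", api_key: str = "") -> dict:
    """Return Cloudflare request headers for a token or for an email/key pair."""
    headers = {"Content-Type": "application/json"}
    if api_token:
        # Scoped API tokens are sent as a bearer token.
        headers["Authorization"] = f"Bearer {api_token}"
    elif email and api_key:
        # Account-wide API keys are sent together with the account email.
        headers["X-Auth-Email"] = email
        headers["X-Auth-Key"] = api_key
    else:
        raise ValueError("Provide either api_token, or both email and api_key")
    return headers


if __name__ == "__main__":
    # Placeholder value only; never hard-code a real credential.
    print(build_headers(api_token="example-token"))
```

If you created a token in the step above, the bearer form is most likely the one you want.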
Creating a list
Log in to Cloudflare and navigate to Manage Account -> Configurations -> Lists. Under Lists, create a new list with type IP and name it “block_list”. It should look something like:
Make sure to write down your account ID (shown as <user id> in the URL below) plus the ID of this list. You can get both by clicking Edit and taking a look at the URL. It should look something like:
https://dash.cloudflare.com/<user id>/configurations/lists/<list id>
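To avoid copying the two IDs by hand, they can be pulled out of that URL. This is a rough sketch that assumes the dashboard URL keeps the shape shown above; the helper names ids_from_dashboard_url and items_endpoint are hypothetical, and the endpoint format is the one used by the script in the next section.

```python
from urllib.parse import urlparse

API_BASE = "https://api.cloudflare.com/client/v4"


def ids_from_dashboard_url(url: str) -> tuple:
    """Return (account_id, list_id) from a Lists edit-page URL."""
    parts = [p for p in urlparse(url).path.split("/") if p]
    # Expected path: <account id>/configurations/lists/<list id>
    if len(parts) < 4 or parts[1:3] != ["configurations", "lists"]:
        raise ValueError(f"Unexpected dashboard URL: {url}")
    return parts[0], parts[3]


def items_endpoint(account_id: str, list_id: str) -> str:
    """Build the list-items endpoint used to read, add, and delete entries."""
    return f"{API_BASE}/accounts/{account_id}/rules/lists/{list_id}/items"


if __name__ == "__main__":
    # Made-up IDs, for illustration only.
    account, list_id = ids_from_dashboard_url(
        "https://dash.cloudflare.com/acc123/configurations/lists/list456"
    )
    print(items_endpoint(account, list_id))
```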
Modifying the list
To modify the list, I wrote a small Python script that uses the details recorded earlier.
#!/usr/bin/python3
"""
Called by entryPoint.py; adds an item to, or deletes an item from, a Cloudflare custom list.
IPv6 support is limited: the entire /64 subnet containing the address is blocked.
"""
import ipaddress
import json
import sys

import requests
from requests import Response


def getIPList(apiEndpoint: str, headers: dict) -> dict:
    """Fetch the current list items; exit if the request fails."""
    response = requests.get(apiEndpoint, headers=headers)
    if response.status_code == 200:
        return response.json()
    print(f"Failed to fetch existing IP list. Status code: {response.status_code}")
    print(response.text)
    sys.exit(1)


def addIPtoList(ipAddr: str, apiEndpoint: str, headers: dict) -> Response:
    """Add a single IP address or CIDR range to the list."""
    payload = [{"ip": ipAddr}]
    return requests.post(apiEndpoint, headers=headers, data=json.dumps(payload))


def removeIPFromList(ipId: str, apiEndpoint: str, headers: dict) -> Response:
    """Remove the list item with the given item ID."""
    payload = {"items": [{"id": ipId}]}
    return requests.delete(apiEndpoint, headers=headers, data=json.dumps(payload))


if __name__ == "__main__":
    if len(sys.argv) < 3 or sys.argv[2] not in ("add", "del"):
        print("Usage: ./modifyBanList.py <ip> <add|del>")
        sys.exit(1)

    ipAddr = sys.argv[1]
    action = sys.argv[2]

    # Widen an IPv6 address to its /64 subnet; IPv4 addresses pass through unchanged.
    try:
        addr = ipaddress.IPv6Address(ipAddr)
        first_64_bits = addr.exploded.split(':')[:4]
        ipAddr = ':'.join(first_64_bits) + '::/64'
    except ipaddress.AddressValueError:
        pass

    # Fill these in with the values recorded earlier.
    listId = ''
    accountId = ''
    email = ''
    apiKey = ''

    apiEndpoint = f'https://api.cloudflare.com/client/v4/accounts/{accountId}/rules/lists/{listId}/items'
    # X-Auth-Email / X-Auth-Key is Cloudflare's API-key scheme; a scoped API token
    # is instead sent as 'Authorization: Bearer <token>'.
    headers = {
        'X-Auth-Email': email,
        'X-Auth-Key': apiKey,
        'Content-Type': 'application/json'
    }

    existingIpList = getIPList(apiEndpoint, headers)
    print(existingIpList)  # debug output: current list contents
    items = existingIpList.get('result') or []

    # Look up the list item (if any) that already holds this IP.
    ipId = next((item['id'] for item in items if item['ip'] == ipAddr), None)

    response = None
    if action == "del" and ipId is not None:
        response = removeIPFromList(ipId, apiEndpoint, headers)
    elif action == "add" and ipId is None:
        response = addIPtoList(ipAddr, apiEndpoint, headers)

    if response is not None and response.status_code == 200:
        print(f"Action '{action}' for IP address {ipAddr} on the custom IP list succeeded.")
    else:
        # Also reached when there was nothing to do (already listed, or not found).
        print(f"Action '{action}' for IP address {ipAddr} on the custom IP list failed.")
This should add or remove any desired IP from the block list.
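Two places where the script could be hardened are IPv6 matching and long lists. The sketch below is not part of the original script. It assumes that list items come back from the API in compressed IPv6 notation and that multi-page responses carry the next cursor under result_info.cursors.after; both are assumptions worth checking against the Lists API documentation. The names normalize_for_list, collect_items, and fetch_page are hypothetical.

```python
import ipaddress
from typing import Callable, Optional


def normalize_for_list(ip: str) -> str:
    """Return IPv4 addresses unchanged and IPv6 addresses as a compressed /64 network."""
    addr = ipaddress.ip_address(ip)  # raises ValueError on malformed input
    if addr.version == 6:
        # strict=False masks the host bits instead of raising an error.
        return str(ipaddress.ip_network(f"{addr}/64", strict=False))
    return str(addr)


def collect_items(fetch_page: Callable[[Optional[str]], dict]) -> list:
    """Gather items from every page; fetch_page(cursor) returns one decoded response."""
    items, cursor = [], None
    while True:
        page = fetch_page(cursor)
        items.extend(page.get("result") or [])
        # Assumed response shape: result_info.cursors.after holds the next cursor.
        cursor = ((page.get("result_info") or {}).get("cursors") or {}).get("after")
        if not cursor:
            return items
```

In the script, fetch_page would wrap the existing requests.get call and pass the cursor along as a query parameter. Comparing normalize_for_list output on both sides (the banned IP and each item's ip field) sidesteps mismatches between exploded and compressed IPv6 forms.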
Entry Point
My Fail2Ban instance runs in a container which, luckily enough, ships with Python. To make sure the script has all of its required dependencies, I made an entry point that sets them up and then calls the previous script.
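#!/usr/bin/python3
"""
Creates a virtual environment, installs dependencies, and then calls modifyBanList.py
"""
import os
import subprocess
import sys

if len(sys.argv) < 3:
    print("Usage: ./entryPoint.py <ip> <add|del>")
    sys.exit(1)

venv_dir = 'env'
if not os.path.exists(venv_dir):
    # First run: create the virtual environment and install requests.
    # (ipaddress is part of the Python 3 standard library, so it needs no install.)
    subprocess.run([sys.executable, '-m', 'venv', venv_dir], check=True)
    subprocess.run([os.path.join(venv_dir, 'bin', 'pip'), 'install', '--upgrade', 'requests'], check=True)

# Pass the arguments as a list so they are never interpreted by a shell.
result = subprocess.run([os.path.join(venv_dir, 'bin', 'python'),
                         '/data/action.d/modifyBanList.py', sys.argv[1], sys.argv[2]])
sys.exit(result.returncode)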
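Because the entry point refers to the virtual environment by the relative path 'env', where it ends up depends on the working directory Fail2Ban runs the action from. One possible variation, sketched here under the assumption that both scripts live in the same directory, is to derive every path from the script's own location. The helper name venv_paths is hypothetical.

```python
import os


def venv_paths(script_file: str, venv_name: str = "env") -> dict:
    """Return absolute paths for a virtual environment stored beside script_file."""
    base = os.path.dirname(os.path.abspath(script_file))
    venv_dir = os.path.join(base, venv_name)
    return {
        "venv": venv_dir,
        "pip": os.path.join(venv_dir, "bin", "pip"),
        "python": os.path.join(venv_dir, "bin", "python"),
        # Assumes modifyBanList.py sits next to the entry point.
        "target": os.path.join(base, "modifyBanList.py"),
    }


if __name__ == "__main__":
    for name, path in venv_paths(__file__).items():
        print(f"{name}: {path}")
```

Inside entryPoint.py this would be called as venv_paths(__file__), and the resulting paths would replace the hard-coded ones.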
Cloudflare.conf
To glue all of it together, a conf file will tell Fail2Ban how to call the script when a ban or unban action needs to take place.
[Definition]
actionban = /data/action.d/entryPoint.py <ip> add
actionunban = /data/action.d/entryPoint.py <ip> del
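Fail2Ban replaces <ip> with the offending address before running the command, so the script receives exactly two positional arguments. As a sketch of that contract (not taken from the original scripts), the arguments could be validated with argparse before anything is sent to Cloudflare. The function name parse_action_args is hypothetical.

```python
import argparse
import ipaddress


def parse_action_args(argv: list) -> argparse.Namespace:
    """Validate the <ip> and <add|del> arguments passed in by Fail2Ban."""
    parser = argparse.ArgumentParser(prog="entryPoint.py")
    # ip_address raises ValueError on bad input, which argparse reports as a usage error.
    parser.add_argument("ip", type=ipaddress.ip_address, help="IPv4 or IPv6 address to act on")
    parser.add_argument("action", choices=["add", "del"], help="add on ban, del on unban")
    return parser.parse_args(argv)


if __name__ == "__main__":
    # Example invocation with a documentation-range address.
    args = parse_action_args(["203.0.113.7", "add"])
    print(args.ip, args.action)
```

With this in place, a malformed address or an unknown action makes the script exit with a usage message instead of reaching the API.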
Jail.d
In my config, I use jail.local directly, and it looks something like:
[DEFAULT]
action = cloudflare
Done!
Some next steps are to configure a filter, add it to the jail, and then use a VPN to generate some unsuccessful login events. Once the ban takes place, it may take a few seconds to propagate on Cloudflare.
You can take a look at these scripts here.