Skip to main content
Here is an example of how you can use our API in Python to solve a Cloudflare WAF challenge.
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import requests
import rnet # pip install rnet --pre --upgrade (asyncio ships with Python; do not pip-install it)
import asyncio
from pydantic import BaseModel

# Example configuration. API_KEY and PROXY are empty placeholders that must be
# filled in before running. PROXY is passed both to the local HTTP client and
# to the solver request.
URL = "https://page.systems/uam"  # test page
API_KEY = ""  # your solv.now API key
PROXY = ""  # must be static/sticky, in URL format: protocol://[user:pass@]host:port


class WAFRequest(BaseModel):
    # Description of a challenge to solve; solve_waf() dumps this model and
    # sends it as the "task_data" of the API request.
    url: str  # URL where the challenge is issued
    proxy: str  # proxy the solver uses while solving
    # User-Agent reported to the solver; the default is a Chrome 143 / Windows string.
    user_agent: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36"
    html: str | None = None  # challenge HTML; preferred but optional


class WAFSolution(BaseModel):
    # Solution validated from the "solution" object of the solver's response.
    clearance: str  # solved WAF/UAM token, required; main() stores it as the cf_clearance cookie
    cf_bm: str | None = None  # Cloudflare browser ID, optional; main() stores it as the __cf_bm cookie when present
    cf_rt: str  # token that must be appended to the Referer query parameters as __cf_chl_tk
    headers: dict[str, str]  # headers used for solving
    attributes: dict[str, str]  # attributes to post on the first request


def solve_waf(request: WAFRequest, *, timeout: float | None = 180.0) -> WAFSolution:
    """Submit a WAF task to the solv.now API and return the parsed solution.

    Args:
        request: Challenge description; serialized with ``model_dump()`` and
            sent as the task's ``task_data``.
        timeout: Seconds ``requests`` waits for the API before giving up.
            ``None`` disables the timeout and waits indefinitely.

    Returns:
        The ``data.solution`` object of the response, validated as a
        ``WAFSolution``.

    Raises:
        Exception: If the API answers with a non-200 status, with a body that
            is not a JSON object, or with a falsy ``success`` field.
        requests.RequestException: On connection errors or timeouts.
    """
    response = requests.post(
        "https://api.solv.now/v1/task/execute",
        headers={
            "Content-Type": "application/json",
            "X-Api-Key": API_KEY,
        },
        json={
            "task_type": "waf",
            "task_data": request.model_dump(),
        },
        # Without a timeout, requests would block forever on a stalled connection.
        timeout=timeout,
    )

    # An error page or empty body is not JSON; report the HTTP status instead
    # of letting a bare decode error escape.
    try:
        data = response.json()
    except ValueError as err:
        raise Exception(
            "Error occurred while solving WAF: "
            f"HTTP {response.status_code} with a non-JSON response body"
        ) from err

    # Guard the .get() calls below against valid JSON that is not an object.
    if not isinstance(data, dict):
        raise Exception(
            "Error occurred while solving WAF: "
            f"HTTP {response.status_code} with an unexpected JSON payload"
        )

    if response.status_code != 200 or not data.get("success"):
        raise Exception(f"Error occurred while solving WAF: {data.get('message')}")
    return WAFSolution.model_validate(data["data"]["solution"])


def add_query_param(url: str, **params: str) -> str:
    """Return *url* with the given query parameters set.

    Existing query parameters are kept, including those with an empty value
    (``?a=&b=1``). A name given in *params* replaces every existing
    occurrence of that name. The query string is re-encoded with
    ``urlencode``; the other URL components are passed through
    ``urlparse``/``urlunparse``.

    Args:
        url: The URL to extend.
        **params: Query parameter names and their values.

    Returns:
        The rebuilt URL string.
    """
    parts = urlparse(url)
    # parse_qs drops blank-valued parameters unless told otherwise, which
    # would silently remove existing "a=" entries from the URL.
    query = parse_qs(parts.query, keep_blank_values=True)
    query.update(params)
    return urlunparse(parts._replace(query=urlencode(query, doseq=True)))


def build_waf_attributes(attributes: dict[str, str]) -> str:
    """Join *attributes* into a ``key=value&key=value`` request body.

    Pairs are emitted in the dict's iteration order and are written verbatim.
    """
    # NOTE(review): keys and values are not percent-encoded here; presumably
    # the solver returns them already encoded -- confirm before changing.
    pairs = (f"{name}={content}" for name, content in attributes.items())
    return "&".join(pairs)


async def main() -> None:
    """Fetch URL, solve the Cloudflare challenge it returns, and replay the solution.

    Steps: request the page through the proxy, check that Cloudflare issued a
    challenge, send the challenge HTML to the solver, store the returned
    cookies in the client's jar, then POST the solved attributes back to the
    same URL and print the final response.

    Raises:
        Exception: If the first response does not carry
            ``Cf-Mitigated: challenge`` (nothing to solve), or if the solver
            reports a failure.
    """
    # The emulated browser (Chrome 143 on Windows) matches the default
    # user_agent that WAFRequest sends to the solver.
    emulation = rnet.EmulationOption(
        emulation=rnet.Emulation.Chrome143, emulation_os=rnet.EmulationOS.Windows
    )
    # We need to create our own cookie jar so we can add the solved cookies later.
    jar = rnet.Jar()
    # Create the client with our proxy, cookie jar and emulation.
    client = rnet.Client(
        proxies=[rnet.Proxy.all(PROXY)], emulation=emulation, cookie_provider=jar
    )

    # Fetch the protected WAF site.
    # We don't set all headers here because the rnet library handles most of
    # them automatically, but other libraries may not.
    # We only override some, such as Accept-Encoding, because rnet's default
    # does not match Chrome's.
    response = await client.get(
        URL,
        headers={
            "Accept-Encoding": "gzip, deflate, br, zstd",
        },
    )
    # Check if we've been mitigated by Cloudflare
    # https://developers.cloudflare.com/cloudflare-challenges/challenge-types/challenge-pages/detect-response/
    cf_mitigated = response.headers.get("Cf-Mitigated")
    if not cf_mitigated or cf_mitigated.decode() != "challenge":
        # No challenge was issued, so there is nothing to solve; this example
        # treats that as an error and stops.
        raise Exception("No mitigation from Cloudflare!")

    # Let's solve the challenge.
    html = await response.text()
    # solve_waf() performs a blocking HTTP request; run it in a worker thread
    # so the event loop is not stalled while the solver works.
    solution = await asyncio.to_thread(
        solve_waf,
        WAFRequest(
            url=URL,
            html=html,
            proxy=PROXY,
        ),
    )

    # Add the solved cookies to the cookie jar.
    jar.add("cf_clearance=" + solution.clearance, URL)
    if solution.cf_bm is not None:
        jar.add("__cf_bm=" + solution.cf_bm, URL)

    print("Solved!")

    # Post the solved attributes back to the page; the Referer carries the
    # cf_rt token as the __cf_chl_tk query parameter.
    response = await client.post(
        URL,
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Referer": add_query_param(URL, __cf_chl_tk=solution.cf_rt),
            "Accept-Encoding": "gzip, deflate, br, zstd",
        },
        body=build_waf_attributes(solution.attributes),
    )

    print("Response status:", response.status)
    print("Response body:", await response.text())


# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())