Programación y Hacking de Interfaces API – Parte N°2

Detalles básicos de OWASP Top 10 API

La cantidad de pruebas de seguridad a las que una interfaz API puede ser sometida, es amplia, tanto como las pruebas de seguridad contra aplicaciones web, razón por la cual tienen un Top 10 dedicado en OWASP; para quienes prefieren lecturas en castellano, la gente de Segu-Info tiene una buena traducción. El Top 10 de API en OWASP es del año 2019, así que a diferencia del Top 10 Web de OWASP del año 2017, la información es un poco más clara, precisa y actualizada.

API4:2019 – Lack of Resources & Rate Limiting y Insecure Direct Object Reference (IDOR)

Según el cheat sheet de OWASP, una vulnerabilidad de tipo IDOR se caracteriza por elementos que permiten realizar procedimientos de fuzzing, enumeración e incluso de fuerza bruta o diccionario. Por otro lado, API4:2019 hace referencia a rate limit…

Entonces, creamos un API endpoint que simule la URL de consumo basada en la columna SKU de la base de datos:

#!/usr/bin/python3
#_*_ coding: utf8 _*_

from flask import Flask,jsonify,make_response,request
from flaskext.mysql import MySQL

app = Flask(__name__)

app.config["MYSQL_DATABASE_HOST"] = "localhost"
app.config["MYSQL_DATABASE_DB"] = "db_api_pentesting"
app.config["MYSQL_DATABASE_USER"] = "root"
app.config["MYSQL_DATABASE_PASSWORD"] = ""

mysql = MySQL()
mysql.init_app(app)

@app.route("/", methods=["GET"])
def Home():
    return "Bienvenido al Home!"

@app.route("/api/v1/purchases/sku/<string:sku>", methods=["GET"])
def GETsku(sku):
    dbcursor = mysql.get_db().cursor()
    dbcursor.execute("SELECT * FROM `orders` WHERE `sku` = '"+sku+"'")
    skuFound = dbcursor.fetchone()
    if not skuFound:
        response = make_response("Nothing to see here!")
        response.headers["Server"] = "Servidor API Pentesting 1.0"
        return response
    else:
        skuFetched = jsonify({"id": skuFound[0],"sku": skuFound[1],"price": skuFound[2]})
        response = make_response(skuFetched)
        response.headers["Server"] = "Servidor API Pentesting 1.0"
        return response

if __name__ == "__main__":
    app.run(debug=True)

Como dije antes, no es necesario por ahora preocuparse con el uso de consultas de SQL parametrizadas o de un ORM como SQLALchemy, todo lo relacionado a inyección de SQL será para otro API endpoint. Entonces, ya podemos consumir este API endpoint:

Request
Response

Bastaría llevar el request a la opción intruder para realizar un ciclo de iteraciones, usando Burp Suite. Pero acabo de leer buenos comentarios sobre el módulo HTTPX, bastante parecido al módulo Requests, ambos para Python. En fin, aprovecharé este paso para probar HTTPX. Resultado:

#!/usr/bin/python3
#_*_ coding: utf8 _*_

import httpx

def R(sku):
    try:
        r = httpx.get("http://127.0.0.1:5000/api/v1/purchases/sku/"+sku)
        print(r.text)
    except KeyboardInterrupt:
        print("\nStop...")
        exit()

for x in range(0,5999):
    x = str(x)
    if len(x) == 1:
        R("ABC-000"+x)
    elif len(x) == 2:
        R("CDE-00"+x)
    elif len(x) == 3:
        R("FGH-0"+x)
    else:
        R("IJK-"+x)
else:
    print("End of loop!")

Ya con esto, es posible entonces, realizar una extracción masiva de datos por medio del endpoint.

Implementación de Rate Limit con Flask-Limiter

Rate limite es posible configurarlo con Cloudflare, Kong (el proxy), y muchos otros recursos, pero lo haré con Flask-Limiter. Si nos enfocamos en realizar un control y bloqueo temporal basado en la dirección IP, con key get_ipaddr:

#!/usr/bin/python3
#_*_ coding: utf8 _*_

from flask import Flask,jsonify,make_response,request
from flaskext.mysql import MySQL
from flask_limiter import Limiter
from flask_limiter.util import get_ipaddr

app = Flask(__name__)
limiter = Limiter(
    app,
    key_func=get_ipaddr,
    default_limits=["200 per day", "50 per hour"]
)

app.config["MYSQL_DATABASE_HOST"] = "localhost"
app.config["MYSQL_DATABASE_DB"] = "db_api_pentesting"
app.config["MYSQL_DATABASE_USER"] = "root"
app.config["MYSQL_DATABASE_PASSWORD"] = ""

mysql = MySQL()
mysql.init_app(app)

@app.route("/", methods=["GET"])
def Home():
    return "Bienvenido al Home!"

@app.route("/api/v1/purchases/sku/<string:sku>", methods=["GET"])
@limiter.limit("200 per day")
def GETsku(sku):
    dbcursor = mysql.get_db().cursor()
    dbcursor.execute("SELECT * FROM `orders` WHERE `sku` = '"+sku+"'")
    skuFound = dbcursor.fetchone()
    if not skuFound:
        response = make_response("Nothing to see here!")
        response.headers["Server"] = "Servidor API Pentesting 1.0"
        return response
    else:
        skuFetched = jsonify({"id": skuFound[0],"sku": skuFound[1],"price": skuFound[2]})
        response = make_response(skuFetched)
        response.headers["Server"] = "Servidor API Pentesting 1.0"
        return response

if __name__ == "__main__":
    app.run(debug=True)

Y al ejecutar el script, al llegar al request N°200, se obtiene:

Evasión de Rate Limit basado en cabeceras (headers)

Una forma conocida de evadir el rate limit, es usando direcciones IP de servicios proxy o similares, para rotar o cambiar la dirección de origen para los requests.

Sin embargo, los desarrolladores pueden cometer la mala práctica de usar código y características innecesarias para el endpoint o la interfaz API, como por ejemplo, la utilización de whitelist:

#!/usr/bin/python3
#_*_ coding: utf8 _*_

from flask import Flask,jsonify,make_response,request
from flaskext.mysql import MySQL
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

app = Flask(__name__)
limiter = Limiter(
    app,
    key_func=get_remote_address,
    default_limits=["5000 per day", "1000 per hour"]
)

app.config["MYSQL_DATABASE_HOST"] = "localhost"
app.config["MYSQL_DATABASE_DB"] = "db_api_pentesting"
app.config["MYSQL_DATABASE_USER"] = "root"
app.config["MYSQL_DATABASE_PASSWORD"] = ""

mysql = MySQL()
mysql.init_app(app)

@app.route("/", methods=["GET"])
def Home():
    return "Bienvenido al Home!"

@app.route("/api/v1/purchases/sku/<string:sku>", methods=["GET"])
@limiter.limit("5 per day")
def GETsku(sku):
    dbcursor = mysql.get_db().cursor()
    dbcursor.execute("SELECT * FROM `orders` WHERE `sku` = '"+sku+"'")
    skuFound = dbcursor.fetchone()
    if not skuFound:
        response = make_response("Nothing to see here!")
        response.headers["Server"] = "Servidor API Pentesting 1.0"
        return response
    else:
        skuFetched = jsonify({"id": skuFound[0],"sku": skuFound[1],"price": skuFound[2]})
        response = make_response(skuFetched)
        response.headers["Server"] = "Servidor API Pentesting 1.0"
        return response

@limiter.request_filter
def hWL():
    return request.headers.get("X-Host", "") == "127.0.0.1"

if __name__ == "__main__":
    app.run(debug=True)

Y nos vamos al Intruder:

Paso N°1: Ejecutamos 10 request para llegar al límite

Paso N°2: Verificación de Error 429
Paso N°3: En el intruder, seteamos una posición nueva para una lista de cabeceras
Paso N°4: Cargamos la lista de posibles cabeceras

Paso N°5: Identificamos la cabecera “X-Host”
Paso N°6: Response con estado 200
Paso N°7: Ahora, volvemos a ejecutar el mismo ataque pero especificando la cabecera para cada request

Paso N°8: Evadimos satisfactoriamente el rate limit para consumo del API endpoint

Finalmente, entonces, agrego la cabecera como parámetro en el request:

#!/usr/bin/python3
#_*_ coding: utf8 _*_

import httpx

header = {
"X-Host": "127.0.0.1",
}

def R(sku):
    try:
        r = httpx.get("http://127.0.0.1:5000/api/v1/purchases/sku/"+sku,headers=header)
        print(r.text)
    except KeyboardInterrupt:
        print("\nStop...")
        exit()

for x in range(0,5999):
    x = str(x)
    if len(x) == 1:
        R("ABC-000"+x)
    elif len(x) == 2:
        R("CDE-00"+x)
    elif len(x) == 3:
        R("FGH-0"+x)
    else:
        R("IJK-"+x)
else:
    print("End of loop!")