====== Intro ======
This is a demo API application showing a simple way to look up leaked password hashes.
It uses a SQLite3 database in which the hashes are stored (see [[howtos:import_hashes_to_sqlite3|here]] for how to build it). Specify the database location either in the script or through the environment variable "DATABASE". The value is a SQLAlchemy database URL; the script's default is "sqlite:///leaked_hashes.db".
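For trying the application without the full import, a small test database can be enough. The following is a minimal sketch, assuming the schema implied by the API's query (a table "hashes" with the columns "hash" and "count"). The helper names create_sample_db and sqlite_url, the use of uppercase SHA-1, and the sample count are illustrative assumptions, not part of the original setup.

```python
import hashlib
import sqlite3

# Assumption: hashes are stored as uppercase SHA-1 hex digests. The import
# how-to defines the real format; adjust this to match it.
SAMPLE_HASH = hashlib.sha1(b"password").hexdigest().upper()


def create_sample_db(path):
    """Create a tiny test database with the columns the API query reads."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS hashes (hash TEXT PRIMARY KEY, count INTEGER)"
    )
    # The count of 3 is a made-up value for illustration.
    conn.execute(
        "INSERT OR REPLACE INTO hashes (hash, count) VALUES (?, ?)",
        (SAMPLE_HASH, 3),
    )
    conn.commit()
    return conn


def sqlite_url(path):
    """Build a SQLAlchemy SQLite URL from a POSIX-style file path."""
    # Relative paths end up with three slashes, absolute paths with four.
    return "sqlite:///" + path
```

The URL returned by sqlite_url is what the "DATABASE" environment variable would hold, for example sqlite_url("leaked_hashes.db") for a file in the working directory.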
===== The API Script =====
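The API can be queried with curl using either a GET or a POST request. Replace <host>, <port>, and <hash> with the server address, its port (5000 by default), and the hash to look up:
curl http://<host>:<port>/check_hash/<hash>
curl -X POST -H "Content-Type: application/json" -d '{"hash":"<hash>"}' http://<host>:<port>/check_hash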
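The same two requests can also be issued from Python. This is a minimal sketch using only the standard library; the function names and the base URL "http://localhost:5000" are assumptions for illustration and depend on where the application actually runs.

```python
import json
import urllib.request


def build_get_request(base_url, hash_value):
    """GET variant: the hash is passed as the last path segment."""
    return urllib.request.Request(
        f"{base_url}/check_hash/{hash_value}", method="GET"
    )


def build_post_request(base_url, hash_value):
    """POST variant: the hash is passed in a JSON body under the key 'hash'."""
    body = json.dumps({"hash": hash_value}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/check_hash",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(req, timeout=5):
    """Send a prepared request and decode the JSON response."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.load(resp)
```

With the application running locally, send(build_get_request("http://localhost:5000", some_hash)) would return the decoded JSON answer as a dictionary.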
#!/usr/bin/env python3
"""
A simple Flask application that checks whether a given hash exists in a
database of leaked passwords.
"""
import logging
import os

from flask import Flask, request, jsonify
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Load configuration from environment variables, falling back to defaults.
DATABASE = os.getenv('DATABASE', 'sqlite:///leaked_hashes.db')
HOST = os.getenv('HOST', '0.0.0.0')
PORT = int(os.getenv('PORT', '5000'))
# bool() of any non-empty string is True (even "False"), so compare against
# known truthy values instead.
DEBUG = os.getenv('DEBUG', 'false').lower() in ('1', 'true', 'yes')

# Set up logging at INFO level
logging.basicConfig(level=logging.INFO)

# Set up the SQLAlchemy engine and session factory for the DATABASE URL
engine = create_engine(DATABASE)
Session = sessionmaker(bind=engine)

# Create the Flask application instance
app = Flask(__name__)


# GET /check_hash/<hash_to_check>: the hash is taken from the URL path.
@app.route('/check_hash/<hash_to_check>', methods=['GET'])
def check_hash_get(hash_to_check):
    return query_hash(hash_to_check)


# POST /check_hash: the hash is taken from the "hash" key of the JSON body.
@app.route('/check_hash', methods=['POST'])
def check_hash_post():
    # silent=True returns None instead of raising on a missing or malformed body
    payload = request.get_json(silent=True)
    hash_to_check = payload.get('hash', '') if isinstance(payload, dict) else ''
    return query_hash(hash_to_check)


def query_hash(hash_to_check):
    """Query the database for the given hash and return a JSON response."""
    # Create a new SQLAlchemy session
    session = Session()
    try:
        # Run a parameterized query; the hash is bound, never interpolated
        result = session.execute(
            text('SELECT count FROM hashes WHERE hash=:hash'),
            {'hash': hash_to_check},
        ).fetchone()
    except Exception as e:
        # Log the exception and return a generic error response
        logging.error(f"Error querying database: {e}")
        return jsonify({"error": "Internal server error"}), 500
    finally:
        # Always close the session
        session.close()

    # Return the result as a JSON response
    if result:
        return jsonify({"leaked": True, "count": result[0]})
    else:
        return jsonify({"leaked": False})


# Run the Flask application if the script is run directly
if __name__ == '__main__':
    app.run(host=HOST, port=PORT, debug=DEBUG)
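
The lookup done by query_hash can be tried in isolation, without Flask or SQLAlchemy. The sketch below swaps in the standard library's sqlite3 module and an in-memory database; the function name lookup, the table contents, and the hash value "ABC123" are made up for illustration. It returns plain dictionaries with the same keys as the API's JSON responses.

```python
import sqlite3


def lookup(conn, hash_to_check):
    """Return a dict shaped like the API response for the given hash."""
    row = conn.execute(
        "SELECT count FROM hashes WHERE hash = :hash", {"hash": hash_to_check}
    ).fetchone()
    if row:
        return {"leaked": True, "count": row[0]}
    return {"leaked": False}


# In-memory database with one made-up row, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE hashes (hash TEXT PRIMARY KEY, count INTEGER)")
conn.execute("INSERT INTO hashes VALUES ('ABC123', 7)")

print(lookup(conn, "ABC123"))  # {'leaked': True, 'count': 7}
print(lookup(conn, "abc123"))  # {'leaked': False}
```

The second call shows that SQLite compares text case-sensitively by default, so a client has to send the hash in the same case in which it is stored.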