====== Intro ====== This is a demo API application showing a simple way to lookup leaked password hashes. It make use of a Sqlite3 database where the hashes are stored in (look [[howtos:import_hashes_to_sqlite3|here]] for how to build it) for the application. You just have to specify its location in the script or as the environment variable "DATABASE". ===== The API Script ===== Here are examples of how to query the API with curl with a GET and a POST request: curl http://:/check_hash/ curl -X POST -H "Content-Type: application/json" -d '{"hash":""}' http://:/check_hash #!/usr/bin/env python3 """ This script is a simple Flask application that checks if a given hash exists in a database of leaked passwords. """ import os from flask import Flask, request, jsonify from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker import logging # Load configuration from environment variables. If the variables are not set, default values are used. DATABASE = os.getenv('DATABASE', 'sqlite:///leaked_hashes.db') HOST = os.getenv('HOST', '0.0.0.0') PORT = int(os.getenv('PORT', 5000)) DEBUG = bool(os.getenv('DEBUG', False)) # Setup logging to INFO level logging.basicConfig(level=logging.INFO) # Setup SQLAlchemy engine and sessionmaker. The engine is configured with the DATABASE URL. engine = create_engine(DATABASE) Session = sessionmaker(bind=engine) # Create a Flask application instance app = Flask(__name__) # Define a route for GET requests to '/check_hash/'. The function query_hash is called with the hash_to_check parameter. @app.route('/check_hash/', methods=['GET']) def check_hash_get(hash_to_check): return query_hash(hash_to_check) # Define a route for POST requests to '/check_hash'. The function query_hash is called with the hash from the request JSON. 
@app.route('/check_hash', methods=['POST'])
def check_hash_post():
    """Handle POST /check_hash: look up the hash supplied in the JSON body.

    The body is expected to be a JSON object carrying the hash under the
    "hash" key. A missing "hash" key is treated as an empty string.

    Returns:
        The response produced by query_hash, or a JSON error with status 400
        when the body is not a JSON object or "hash" is not a string.
    """
    # silent=True makes get_json return None instead of raising on a missing,
    # mistyped or malformed JSON body, so every rejection is answered in the
    # same JSON error shape below.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (e.g. [] or "abc") has no .get();
        # reject it here instead of failing with an AttributeError (HTTP 500).
        return jsonify({"error": "Request body must be a JSON object"}), 400

    hash_to_check = payload.get('hash', '')
    if not isinstance(hash_to_check, str):
        # Never hand lists/objects/numbers to the database as a bound parameter.
        return jsonify({"error": "'hash' must be a string"}), 400

    return query_hash(hash_to_check)


def query_hash(hash_to_check):
    """Query the database for a given hash and return a JSON response.

    Args:
        hash_to_check: The hash to search for. It is passed as a bound
            parameter (:hash), never interpolated into the SQL text.

    Returns:
        A JSON response {"leaked": True, "count": <count>} when a matching row
        exists, {"leaked": False} when none does, or a
        ({"error": "Internal server error"}, 500) pair if the query fails.
    """
    # Create a new SQLAlchemy session for this lookup
    session = Session()
    try:
        row = session.execute(
            text('SELECT count FROM hashes WHERE hash=:hash'),
            {'hash': hash_to_check},
        ).fetchone()
    except Exception:
        # Request boundary: keep the full traceback in the server log, but
        # send the client only a generic message.
        logging.exception("Error querying database")
        return jsonify({"error": "Internal server error"}), 500
    finally:
        # Always release the session, on success and on failure alike
        session.close()

    # fetchone() returns None when no row matched
    if row is None:
        return jsonify({"leaked": False})
    return jsonify({"leaked": True, "count": row[0]})


# Run the Flask application if the script is run directly
if __name__ == '__main__':
    app.run(host=HOST, port=PORT, debug=DEBUG)