| description | Guidance for developing CodeQL queries targeting Python code |
|---|---|
For general CodeQL query development guidance, see Common Query Development.
- Python AST Reference - Complete guide to Python AST node types
- Python Security Query Guide - Comprehensive framework modeling and security query implementation
```ql
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.security.dataflow.SqlInjectionQuery
import semmle.python.ApiGraphs
```

Common expression nodes:

- `Call` - Function/method calls (`func(args)`)
- `Attribute` - Attribute access (`obj.attr`, `module.function`)
- `Subscript` - Subscript operations (`obj[key]`, `list[0]`)
- `Name` - Variable references and identifiers
- `StringLiteral` - String literals (`"hello"`, `'world'`)
- `List`/`Dict` - Collection literals
- `BinaryExpr`/`UnaryExpr` - Operations

Common statement and definition nodes:

- `FunctionDef`/`FunctionExpr`/`Function` - Function definitions and objects
- `ClassDef`/`ClassExpr`/`Class` - Class definitions and objects
- `Import`/`ImportFrom`/`ImportExpr` - Import statements
- `Assign`/`AssignStmt` - Assignment statements
- `If`/`For`/`While` - Control flow statements
- `Return`/`ExprStmt`/`Pass` - Other statements
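CodeQL's Python AST broadly follows the shape of CPython's own `ast` module, although the class names do not always match one-to-one (string literals, for instance, are `ast.Constant` nodes in CPython but `StringLiteral` in CodeQL). As a rough, CodeQL-independent way to see which node kinds a snippet contains before writing a query, the sketch below counts CPython `ast` node classes; the helper `node_types` and the sample snippet are hypothetical.

```python
import ast
from collections import Counter


def node_types(source: str) -> Counter:
    """Count the CPython ast node class names that appear in `source`."""
    tree = ast.parse(source)
    return Counter(type(node).__name__ for node in ast.walk(tree))


SNIPPET = '''
def handler(request):
    user_id = request.args["id"]
    query = "SELECT * FROM users WHERE id = " + user_id
    cursor.execute(query)
'''

counts = node_types(SNIPPET)
for name in ["Call", "Attribute", "Subscript", "Name", "Constant", "BinOp", "FunctionDef"]:
    print(name, counts[name])
```

The snippet is only parsed, never executed, so the undefined `cursor` is harmless.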
The ApiGraphs library is the preferred method for tracking Python APIs and data flow:
```ql
// Track module imports
API::Node flask() { result = API::moduleImport("flask") }

// Track class instantiation: the object returned by `flask.Flask(...)`
API::Node flaskApp() { result = flask().getMember("Flask").getReturn() }

// Track method references on that object, e.g. `app.route`
API::Node route() { result = flaskApp().getMember("route") }
```

```ql
// Connect API tracking to data flow analysis
API::Node userInput() {
  result = API::moduleImport("flask").getMember("request").getMember(["args", "form", "json"])
}
```
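In Flask handlers, user input is typically read as `request.args["id"]` or `request.args.get("id")`. Only the `.get(...)` form contains a call; `request.args` itself is an attribute read. The standard-library `ast` module makes that shape visible without installing Flask, because the expressions are only parsed. The helper names below are hypothetical.

```python
import ast


def parse_expr(expr: str) -> ast.expr:
    """Parse a single Python expression and return its root node."""
    return ast.parse(expr, mode="eval").body


def contains_call(expr: str) -> bool:
    """True if any Call node appears anywhere in the expression."""
    return any(isinstance(n, ast.Call) for n in ast.walk(parse_expr(expr)))


# Subscript on an attribute chain: no Call node at all
print(contains_call('request.args["id"]'))  # False
# A Call whose callee is the attribute chain `request.args.get`
print(contains_call('request.args.get("id")'))  # True
print(ast.dump(parse_expr('request.args.get("id")')))
```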
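```ql
// Use in taint tracking configurations. `request.args` and similar members are
// attribute reads rather than calls, so take the API node's source node.
predicate isSource(DataFlow::Node source) {
  source = userInput().asSource()
}
```

```ql
// Flask route detection. `@app.route("/path")` is a Call whose callee is the
// attribute `app.route`, so the check looks through the Call. This purely
// syntactic version only matches a receiver literally named `app`.
predicate isFlaskRoute(Function func) {
  exists(Call decorator | decorator = func.getADecorator() |
    decorator.getFunc().(Attribute).getName() = "route" and
    decorator.getFunc().(Attribute).getObject().(Name).getId() = "app"
  )
}
```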
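A decorator written with arguments, such as `@app.route("/users")`, is a call expression: the decorator node is a `Call` whose callee is the attribute `app.route`. A bare `@app.route` without parentheses would be the `Attribute` itself. The sketch below shows the difference with CPython's `ast` module; `decorator_kinds` is a hypothetical helper, and CodeQL's own AST classes are named differently.

```python
import ast

SOURCE = '''
@app.route("/users")
def list_users():
    pass

@app.route
def bare():
    pass
'''


def decorator_kinds(source: str) -> dict:
    """Map each function name to the ast class names of its decorators."""
    tree = ast.parse(source)
    return {
        node.name: [type(d).__name__ for d in node.decorator_list]
        for node in ast.walk(tree)
        if isinstance(node, ast.FunctionDef)
    }


print(decorator_kinds(SOURCE))  # {'list_users': ['Call'], 'bare': ['Attribute']}

# The route decorator's callee is the attribute `app.route`
route_call = ast.parse(SOURCE).body[0].decorator_list[0]
print(route_call.func.value.id, route_call.func.attr)  # app route
```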
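```ql
// Django view detection (heuristic): a parameter annotated as `HttpRequest`,
// either as a bare name or as an attribute such as `http.HttpRequest`
predicate isDjangoView(Function func) {
  exists(Parameter p | p = func.getAnArg() |
    p.getAnnotation().(Name).getId() = "HttpRequest" or
    p.getAnnotation().(Attribute).getName() = "HttpRequest"
  )
}
```

```ql
import python

// Heuristic only: flags any execute()/executemany() call whose argument is a
// binary expression (e.g. concatenation) or a call (e.g. "...".format(...))
from Call call
where
  // Database cursor execute methods
  call.getFunc().(Attribute).getName() in ["execute", "executemany"] and
  exists(Expr arg | arg = call.getAnArg() |
    // Check if argument contains string formatting
    arg instanceof BinaryExpr or
    arg instanceof Call
  )
select call, "Potential SQL injection vulnerability"
```

```ql
// Track Flask request data
API::Node flaskRequest() {
  result = API::moduleImport("flask").getMember("request")
}

// `request.args`, `request.form`, etc. are attribute reads, not calls
predicate isUserInput(DataFlow::Node node) {
  node = flaskRequest().getMember(["args", "form", "json", "data"]).asSource()
}
```

Follow the three-part pattern documented in the Security Query Guide: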
- Customizations module (`*Customizations.qll`) - Define sources, sinks, and sanitizers
- Query module (`*Query.qll`) - Define the flow configuration
- Final query (`*.ql`) - Implement the path-problem query
Path Traversal
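Before the CodeQL side, it helps to know what a guarded Python pattern looks like. A common mitigation is to resolve the requested path and confirm it stays inside an allowed base directory. The sketch below shows one such check; `safe_open_path` is a hypothetical helper, and this guide does not claim that CodeQL's standard path-injection modeling recognizes this exact shape as a sanitizer.

```python
import os
import tempfile


def safe_open_path(base_dir: str, user_path: str) -> str:
    """Resolve user_path under base_dir, rejecting anything that escapes it."""
    base = os.path.realpath(base_dir)
    candidate = os.path.realpath(os.path.join(base, user_path))
    # commonpath avoids the prefix bug where "/srv/data2".startswith("/srv/data")
    if os.path.commonpath([base, candidate]) != base:
        raise ValueError(f"path escapes base directory: {user_path!r}")
    return candidate


base = tempfile.mkdtemp()
print(safe_open_path(base, "report.txt"))  # inside base: allowed
try:
    safe_open_path(base, "../../etc/passwd")  # escapes base: rejected
except ValueError as err:
    print("rejected:", err)
```

Absolute inputs such as `/etc/passwd` are also rejected, because `os.path.join` discards `base` when the second argument is absolute and the result then falls outside it.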
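```ql
// Taint tracking from user input (see `isUserInput`) to paths passed to open()/file()
module PathTraversalConfig implements DataFlow::ConfigSig {
  predicate isSource(DataFlow::Node source) { isUserInput(source) }

  predicate isSink(DataFlow::Node sink) {
    exists(Call call |
      call.getFunc().(Name).getId() in ["open", "file"] and
      sink.asExpr() = call.getAnArg()
    )
  }
}

module PathTraversalFlow = TaintTracking::Global<PathTraversalConfig>;

predicate isPathTraversal(Call call) {
  exists(DataFlow::Node source, DataFlow::Node sink |
    PathTraversalFlow::flow(source, sink) and
    sink.asExpr() = call.getAnArg()
  )
}
```

Command Injection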
```ql
import python
import semmle.python.security.dataflow.CommandInjectionQuery
import CommandInjectionFlow::PathGraph

from CommandInjectionFlow::PathNode source, CommandInjectionFlow::PathNode sink
where CommandInjectionFlow::flowPath(source, sink)
select sink.getNode(), source, sink, "Command injection from $@.", source.getNode(), "user input"
```

Common dangerous sinks in Python:

- `os.system()`, `subprocess.call()` - Command execution
- `eval()`, `exec()` - Code evaluation
- `pickle.loads()` - Deserialization
- `execute()` on a `sqlite3` connection or cursor - Database operations
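The usual fix for command-execution sinks is to avoid a shell and pass arguments as a list, so user input stays one argument instead of being parsed as shell syntax. A minimal, runnable sketch using only the standard library; it launches the current Python interpreter as a stand-in for an external program, and `echo_argument` is a hypothetical name.

```python
import subprocess
import sys


def echo_argument(user_input: str) -> str:
    """Run a child process with user_input as a single argv entry (no shell)."""
    result = subprocess.run(
        [sys.executable, "-c", "import sys; print(sys.argv[1])", user_input],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


payload = "hello; echo injected"
# Without a shell, ';' is just a character in the argument, not a command separator.
# By contrast, os.system("echo " + payload) would hand the whole string to the shell.
print(echo_argument(payload))  # hello; echo injected
```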
Framework-specific patterns:

- Flask: `@app.route`, `request.args`, `render_template`
- Django: `HttpRequest`, `render`, `Q` objects
- FastAPI: `@app.get`, `Depends`, `Request`
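The difference between the non-compliant and compliant SQL patterns that follow can be observed at runtime with the standard-library `sqlite3` module, which uses `?` placeholders. The sketch builds a throwaway in-memory table; the helper names, table, and payload are made up for illustration.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create a throwaway in-memory table with two example users."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
    conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "alice"), (2, "bob")])
    return conn


def find_unsafe(conn, user_id):
    # NON_COMPLIANT: user_id is spliced into the SQL text
    sql = "SELECT name FROM users WHERE id = " + user_id + " ORDER BY id"
    return conn.execute(sql).fetchall()


def find_safe(conn, user_id):
    # COMPLIANT: user_id is bound as a single parameter value
    sql = "SELECT name FROM users WHERE id = ? ORDER BY id"
    return conn.execute(sql, (user_id,)).fetchall()


conn = make_db()
payload = "1 OR 1=1"  # attacker-controlled "id"
print(find_unsafe(conn, payload))  # [('alice',), ('bob',)]
print(find_safe(conn, payload))  # []
conn.close()
```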
```python
# NON_COMPLIANT: Vulnerable pattern
def vulnerable_function(cursor, user_input):
    query = "SELECT * FROM users WHERE id = " + user_input  # SQL injection
    cursor.execute(query)


# COMPLIANT: Safe pattern
def safe_function(cursor, user_input):
    query = "SELECT * FROM users WHERE id = ?"  # placeholder style depends on the driver
    cursor.execute(query, (user_input,))  # Parameterized query
```

Test files can mark expected results with inline expectation comments:

```python
def test_function():
    source = get_user_input()  # $ Source
    sink(source)  # $ Alert=source
```

```ql
/**
 * @name SQL injection in Python
 * @description User input flows into SQL query without sanitization
 * @kind path-problem
 * @problem.severity error
 * @security-severity 8.8
 * @precision high
 * @id py/sql-injection
 * @tags security
 *       external/cwe/cwe-089
 */

import python
import semmle.python.security.dataflow.SqlInjectionQuery
import SqlInjectionFlow::PathGraph

from SqlInjectionFlow::PathNode source, SqlInjectionFlow::PathNode sink
where SqlInjectionFlow::flowPath(source, sink)
select sink.getNode(), source, sink, "SQL query built from $@.", source.getNode(), "user-provided value"
```

When existing framework support is insufficient, create comprehensive models following the patterns in the Security Query Guide:
- Use `ApiGraphs` for API tracking
- Extend `Concepts` classes for security-relevant operations
- Implement `InstanceTaintStepsHelper` for method chaining
- Use `ModelsAsData` for external API specifications
Best practices:

- Prefer `ApiGraphs` over string-based matching
- Use specific AST node types rather than generic `Expr`
- Implement efficient predicate logic with proper ordering of conditions
Tools:

- `qlt query generate new-query` - Generate scaffolding for a new CodeQL query with packs and tests
- `codeql query format`
- `codeql query compile`
- `codeql query run`
- `codeql test run`