| description | Guide for implementing Python framework models and security queries in CodeQL |
|---|
This document provides specific, actionable instructions for implementing Python framework models and security queries in CodeQL using the ApiGraph library.
For Framework Modeling (.qll files):
// Standard framework modeling imports - USE THESE FOR ALL FRAMEWORK MODELS
private import python
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.Concepts
private import semmle.python.ApiGraphs
private import semmle.python.frameworks.internal.InstanceTaintStepsHelper
private import semmle.python.frameworks.data.ModelsAsData
// Optional: Include if framework has existing PEP249 database compliance
// private import semmle.python.frameworks.PEP249For Security Query Implementation (.ql/.qll files):
// Standard security query imports - USE THESE FOR ALL SECURITY QUERIES
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.Concepts
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.dataflow.new.BarrierGuards
// Optional: Include path graph support for path-problem queries
// import DataFlow::PathGraphCritical patterns that LLMs must understand for proper implementation:
// STEP 1: Create base module reference - ALWAYS start here
API::Node frameworkRef() { result = API::moduleImport("framework_name") }
// STEP 2: Create submodule references if framework has submodules
API::Node submoduleRef() { result = frameworkRef().getMember("submodule_name") }
// STEP 3: Create class references for important security-relevant classes
API::Node classRef() { result = frameworkRef().getMember("ImportantClass") }
// STEP 4: Create function references for security-relevant functions
API::Node functionRef() { result = frameworkRef().getMember("important_function") }
// STEP 5: Handle aliased imports if framework commonly uses them
API::Node aliasedRef() {
result = frameworkRef() or
result = API::moduleImport("framework_alias")
}// BASIC NAVIGATION - Learn these patterns first
node.getMember("member_name") // Access any attribute/method/class
node.getACall() // Get all calls to this function/method
node.getReturn() // Get return value from function/method
node.getParameter(0) // Get 1st parameter (0-based indexing)
node.getParameter(n) // Get nth parameter where n is integer
node.getKeywordParameter("param_name") // Get named/keyword parameter
node.getSelfParameter() // Get "self" parameter (for methods)
// ADVANCED NAVIGATION - Use for complex frameworks
node.getASubclass() // Direct subclasses only
node.getASubclass*() // All subclasses (transitive closure)
node.getAnInstance() // Instances of this class
// DATA FLOW INTEGRATION - Connect to data flow analysis
node.asSource() // Create DataFlow::Node as source
node.asSink() // Create DataFlow::Node as sink
// ARGUMENT ACCESS PATTERNS - Critical for sink modeling
call.getArg(0) // First positional argument
call.getArg(n) // nth positional argument
call.getArgByName("param") // Named argument by parameter name
call.getAnArg() // Any argument (positional or named)Template A: Direct Function Call Sinks
// For functions that directly execute dangerous operations
// Pattern: framework.dangerous_function(user_input)
this = API::moduleImport("framework_name").getMember("execute_sql").getACall()
this = API::moduleImport("subprocess").getMember(["run", "call", "Popen"]).getACall()
this = API::moduleImport("os").getMember(["system", "popen"]).getACall()Template B: Method Call on Instance Sinks
// For methods called on framework instances
// Pattern: instance.method(user_input)
exists(DataFlow::AttrRead ar |
ar.getObject() = /* instance reference */ and
ar.getAttributeName() in ["execute", "query", "run"] and
this = ar.getACall()
)Template C: Constructor with Dangerous Parameters
// For class constructors that create dangerous objects
// Pattern: FrameworkClass(dangerous_param=user_input)
this = API::moduleImport("framework").getMember("DangerousClass").getACall()Template D: Chained API Calls
// For frameworks with fluent/builder APIs
// Pattern: framework.builder().set_param(user_input).execute()
this = API::moduleImport("framework")
.getMember("QueryBuilder")
.getACall() // get builder instance
.getMember("where") // call where method
.getACall() // get result of where call/**
* Provides classes modeling security-relevant aspects of the `FRAMEWORK_NAME` PyPI package.
* See FRAMEWORK_DOCUMENTATION_URL
*/
private import python
private import semmle.python.dataflow.new.DataFlow
private import semmle.python.Concepts
private import semmle.python.ApiGraphs
private import semmle.python.frameworks.internal.InstanceTaintStepsHelper
private import semmle.python.frameworks.data.ModelsAsData
/**
* Provides models for the `FRAMEWORK_NAME` PyPI package.
* See FRAMEWORK_DOCUMENTATION_URL
*/
module FRAMEWORK_MODULE_NAME {
// Replace with actual implementation based on framework analysis
}LLM Implementation Guidance:
- Replace
FRAMEWORK_NAMEwith actual PyPI package name (e.g., "flask", "django") - Replace
FRAMEWORK_DOCUMENTATION_URLwith actual framework docs URL - Replace
FRAMEWORK_MODULE_NAMEwith PascalCase module name (e.g., Flask, Django) - Keep ALL imports exactly as shown - they provide essential functionality
- Use
private importfor all imports in framework files
Critical: Always start with these fundamental access patterns
/** Gets a reference to the core module */
API::Node moduleRef() { result = API::moduleImport("framework_name") }
/** Gets a reference to important classes */
API::Node classRef(string className) {
result = moduleRef().getMember(className)
}
/** Gets a reference to important functions */
API::Node functionRef(string functionName) {
result = moduleRef().getMember(functionName)
}For classes that need instance tracking, use this exact pattern:
/**
* A source of instances of `framework_name.ClassName`.
* Include direct instantiation, factory functions, and callback parameters.
*/
abstract class ClassInstanceSource extends DataFlow::LocalSourceNode { }
/** Direct class instantiation */
private class DirectInstantiation extends ClassInstanceSource, DataFlow::CallCfgNode {
DirectInstantiation() { this = classRef("ClassName").getACall() }
}
/** Factory function that returns an instance */
private class FactoryInstantiation extends ClassInstanceSource, DataFlow::CallCfgNode {
FactoryInstantiation() { this = functionRef("create_instance").getACall() }
}
/** Type tracking for instances */
private DataFlow::TypeTrackingNode instanceTracking(DataFlow::TypeTracker t) {
t.start() and result instanceof ClassInstanceSource
or
exists(DataFlow::TypeTracker t2 | result = instanceTracking(t2).track(t2, t))
}
/** Gets a reference to a tracked instance */
DataFlow::Node instance() { instanceTracking(DataFlow::TypeTracker::end()).flowsTo(result) }Use InstanceTaintStepsHelper for automatic taint propagation:
/**
* Taint propagation for `framework_name.ClassName`.
* This automatically handles method calls and attribute access on instances.
*/
private class InstanceTaintSteps extends InstanceTaintStepsHelper {
InstanceTaintSteps() { this = "framework_name.ClassName" }
override DataFlow::Node getInstance() { result = instance() }
override string getAttributeName() {
// List attributes that should propagate taint
result in ["data", "content", "body", "text", "value"]
}
override string getMethodName() {
// List methods that should propagate taint from self to return value
result in ["get_data", "read", "decode", "format"]
}
override string getAsyncMethodName() {
// List async methods that should propagate taint
result in ["aread", "aformat"]
}
}Extend existing security concepts rather than creating new ones:
/** SQL execution using this framework */
private class FrameworkSqlExecution extends SqlExecution::Range, DataFlow::CallCfgNode {
FrameworkSqlExecution() {
// Pattern: instance.method_name(sql_query, ...)
this = any(DataFlow::AttrRead ar |
ar.getObject() = instance() and ar.getAttributeName() = "execute"
).getACall()
}
override DataFlow::Node getSql() {
// SQL is typically the first argument
result in [this.getArg(0), this.getArgByName("query"), this.getArgByName("sql")]
}
}
/** HTTP client request using this framework */
private class FrameworkHttpRequest extends Http::Client::Request::Range, DataFlow::CallCfgNode {
FrameworkHttpRequest() {
this = functionRef(["get", "post", "put", "delete", "request"]).getACall()
}
override DataFlow::Node getUrl() { result = this.getArg(0) }
override string getFramework() { result = "framework_name" }
override DataFlow::Node getResponseBody() { result = this.getReturn() }
}
/** Remote flow source from framework request objects */
private class FrameworkRemoteFlowSource extends RemoteFlowSource::Range {
FrameworkRemoteFlowSource() {
// Pattern: request.attribute_name
exists(DataFlow::AttrRead ar |
ar.getObject().asExpr().(Name).getId() = "request" and
ar.getAttributeName() in ["data", "json", "form", "args", "values"] and
this = ar
)
}
override string getSourceType() { result = "framework_name request data" }
}Account for different ways frameworks can be imported:
/** Gets references accounting for various import patterns */
API::Node frameworkRef() {
// Direct import: import framework_name
result = API::moduleImport("framework_name")
or
// Submodule import: from framework_name import submodule
result = API::moduleImport("framework_name").getMember("submodule")
or
// Alias import: import framework_name as fn
result = API::moduleImport("framework_name").getMember("submodule").getMember("ClassName")
}ALL security queries follow this exact pattern:
- Customizations Module (
*Customizations.qll) - Defines Sources, Sinks, Sanitizers - Query Module (
*Query.qll) - Defines flow configuration - Final Query (
*.ql) - Implements path-problem query
/**
* Provides sources, sinks and sanitizers for detecting [VULNERABILITY_NAME] vulnerabilities.
*/
module VulnerabilityName {
/** A data flow source for [VULNERABILITY_NAME] vulnerabilities */
abstract class Source extends DataFlow::Node { }
/** A data flow sink for [VULNERABILITY_NAME] vulnerabilities */
abstract class Sink extends DataFlow::Node { }
/** A sanitizer for [VULNERABILITY_NAME] vulnerabilities */
abstract class Sanitizer extends DataFlow::Node { }
// Use existing threat-model sources (PREFERRED)
private class ActiveThreatModelSourceAsSource extends Source, ActiveThreatModelSource { }
// OR define specific sources
private class SpecificSourceAsSource extends Source {
SpecificSourceAsSource() {
// Define specific source patterns
this = API::moduleImport("framework").getMember("get_user_input").getACall()
}
}
// Define sinks based on security concepts
private class ConceptSinkAsSink extends Sink {
ConceptSinkAsSink() {
this = any(SqlExecution e).getSql() // SQL injection
// OR this = any(SystemCommandExecution e).getCommand() // Command injection
// OR this = any(Http::Client::Request e).getUrl() // SSRF
}
}
// Define sanitizers (validation, escaping, etc.)
private class ValidationSanitizer extends Sanitizer {
ValidationSanitizer() {
// Pattern: validation function calls
exists(DataFlow::CallCfgNode call |
call.getFunction().asExpr().(Name).getId() = "validate_input" and
this = call
)
}
}
}/**
* Provides a taint-tracking configuration for detecting [VULNERABILITY_NAME] vulnerabilities.
*/
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import VulnerabilityNameCustomizations::VulnerabilityName
module VulnerabilityNameConfig implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) { source instanceof Source }
predicate isSink(DataFlow::Node sink) { sink instanceof Sink }
predicate isBarrier(DataFlow::Node node) { node instanceof Sanitizer }
// Add additional flow steps if needed
predicate isAdditionalFlowStep(DataFlow::Node node1, DataFlow::Node node2) {
// Custom flow steps for framework-specific patterns
none()
}
}
module VulnerabilityNameFlow = TaintTracking::Global<VulnerabilityNameConfig>;/**
* @name [Human-readable vulnerability name]
* @description [Description of the vulnerability and its impact]
* @kind path-problem
* @problem.severity [error|warning|recommendation]
* @security-severity [0.0-10.0]
* @precision [low|medium|high|very-high]
* @id [language]/[cwe-category]/[specific-id]
* @tags security
* external/cwe/cwe-[number]
*/
import python
import VulnerabilityNameQuery
import DataFlow::PathGraph
from VulnerabilityNameFlow::PathNode source, VulnerabilityNameFlow::PathNode sink
where VulnerabilityNameFlow::flowPath(source, sink)
select sink.getNode(), source, sink, "[Alert message with $@ placeholder]",
source.getNode(), "[source description]"All tests should be stored in the languages/python/<framework-name>/test/<QueryBasename>/ directory.
Each test should be stored in its own folder, named after the framework or module being tested.
Tests should contain a ${frameworkName}.ql file that contains the test cases for the framework or module.
Test file should be written in the language it is testing (e.g., Python, JavaScript, etc.).
The test file should import the necessary classes and predicates from the framework or module being tested. Use inline queries to test the functionality of the framework or module being tested.
Running the Tests:
codeql test run --show-extractor-output -- languages/python/<framework-name>/test/Once run check the output of the command to ensure that all tests have passed. If the test has failed, check the test file and the implementation of the class to ensure that the test is correct. Iterate on the implementation of the class and the test until the test passes.
Reference resources/cli/codeql/codeql_test_run.prompt.md for more details on how to use the codeql test run command.
Inline tests are used to test specific functionality of the library.
Inline tests should be stored in the Inline${TestName}.ql file.
For testing AST, CFG, or DataFlow the query should tests the functionality being implemented.
For queries, the inline test should be a query that tests sources, sinks, and sanitizers are inplace.
There should only be one inline test per file.
Multiple tests can be true on the same line, each test is seperated by a space.
Example: `// $ Source1 Source2=source
The inline test hasActualResult predicate has the following parameters:
Location location: The location of the element being tested.string element: The name of the element being tested.string tag: The tag used to identify the test (e.g.,${Test1},${Test2}, etc.).string value: The expected value of the element being tested.- Value is set to the expected value of the element being tested and requires the inline test to check the value.
- Example:
// ${Test1}=${ExpectedValue}
Example template:
import utils.InlineExpectationsTest
module InlineTest implements TestSig {
string getARelevantTag() { result = ["${test1}", "${test2}"] }
predicate hasActualResult(Location location, string element, string tag, string value) {
tag = "${Test1}" and
exists(Variable var |
element = var.getName() and
value = typedecl.toString() and
location = typedecl.getLocation()
)
or
tag = "${Test2}" and
exists(Variable var |
element = var.getName() and
value = typedecl.toString() and
location = typedecl.getLocation()
)
// Add more tests as needed
}
}
import MakeTest<InlineTest>Check other inline tests in the languages/python/*/test/ directories for examples of how to implement inline tests.
The inline test .expected file should be empty which means that the inline test is expected to pass.
If a test is expected to fail, the .expected file should contain the expected output of the test.
- SqlExecution - SQL query execution
- SystemCommandExecution - OS command execution
- Http::Client::Request - Outgoing HTTP requests
- Http::Server::RequestInputAccess - Incoming request data
- FileSystemWrite - File write operations
- FileSystemRead - File read operations
- Decoding - Data decoding operations
- RemoteFlowSource - Remote user input
SQL Execution Pattern:
private class FrameworkSqlExecution extends SqlExecution::Range, DataFlow::CallCfgNode {
FrameworkSqlExecution() {
this = API::moduleImport("orm_framework").getMember(["execute", "raw", "query"]).getACall()
}
override DataFlow::Node getSql() { result = this.getArg(0) }
}Command Execution Pattern:
private class FrameworkCommandExecution extends SystemCommandExecution::Range, DataFlow::CallCfgNode {
FrameworkCommandExecution() {
this = API::moduleImport("framework").getMember("run_command").getACall()
}
override DataFlow::Node getCommand() { result = this.getArg(0) }
}HTTP Client Pattern:
private class FrameworkHttpRequest extends Http::Client::Request::Range, DataFlow::CallCfgNode {
FrameworkHttpRequest() {
this = API::moduleImport("http_client").getMember(["get", "post"]).getACall()
}
override DataFlow::Node getUrl() { result = this.getArg(0) }
override string getFramework() { result = "framework_name" }
override DataFlow::Node getResponseBody() { result = this.getReturn() }
}File System Access Pattern:
private class FrameworkFileWrite extends FileSystemWrite::Range, DataFlow::CallCfgNode {
FrameworkFileWrite() {
this = API::moduleImport("framework").getMember("write_file").getACall()
}
override DataFlow::Node getAPathArg() { result = this.getArg(0) }
override DataFlow::Node getADataArg() { result = this.getArg(1) }
}| Vulnerability | Primary Sinks | Secondary Sinks |
|---|---|---|
| SQL Injection | SqlExecution::getSql() |
SqlConstruction::getSql() |
| Command Injection | SystemCommandExecution::getCommand() |
- |
| SSRF | Http::Client::Request::getUrl() |
Http::Client::Request::getHost() |
| Path Traversal | FileSystemAccess::getAPathArg() |
- |
| XSS | Http::Server::HttpResponse::getBody() |
Template rendering |
| LDAP Injection | LdapBind, LdapSearch |
- |
| XXE | XmlParsing::getSourceArg() |
- |
Framework files must be placed in:
python/ql/lib/semmle/python/frameworks/FrameworkName.qll
Query files must be placed in appropriate directories:
- Source queries:
python/ql/src/Security/CWE/CWE-XXX/VulnerabilityName.ql - Library queries:
python/ql/lib/semmle/python/security/dataflow/VulnerabilityNameQuery.qll - Customizations:
python/ql/lib/semmle/python/security/dataflow/VulnerabilityNameCustomizations.qll
Every class and predicate MUST have documentation:
/**
* A SQL execution using the ExampleORM framework.
*
* Example usage:
* ```python
* from example_orm import Database
* db = Database()
* db.execute("SELECT * FROM users WHERE id = ?", user_id) // Sink: first argument
* ```
*/
private class ExampleOrmSqlExecution extends SqlExecution::Range, DataFlow::CallCfgNode {
// Implementation
}Create test files in: languages/python/<framework-name>/test/
Test structure:
framework_name/
├── FrameworkName.ql // Test query
├── test_data.py // Python code to analyze
└── FrameworkName.expected // Expected results
Test query template:
import python
import semmle.python.frameworks.FrameworkName
from DataFlow::Node source
where source instanceof FrameworkName::SomeClass
select source, "Found framework usage"- Don't hardcode module paths - Use API::moduleImport consistently
- Always handle multiple import patterns - Check
from x import yandimport x.y - Use existing concepts - Don't create new security concepts unnecessarily
- Test thoroughly - Ensure models detect real-world usage patterns
- Document with examples - Include Python code examples in comments
- Use
privatevisibility for internal classes - Prefer specific API patterns over broad catches
- Chain navigation predicates efficiently
- Use
exists()clauses to limit scope when needed
Before submitting framework models, verify:
- All imports are correct and minimal
- Module follows established naming patterns
- Documentation includes Python usage examples
- Test cases cover primary API patterns
- Security concepts are properly extended
- Type tracking works for relevant classes
- Multiple import patterns are handled