Hello, Habr!
I think many people know about such a tool as Burp Suite from PortSwigger. Burp Suite is a popular web application security audit platform. In addition to the fact that Burp already contains a ton of useful features, it also allows users to create their own extensions that can incredibly increase the built-in functionality of the application.
, Python , , , Burp Java, , , Java. , , Bug Bounty. Burp Suite Python, CORS misconfiguration.
, Burp Java, Python Jython Standalone Edition, PortSwigger. Burp Extender - Options - Python environment Jython .
Burp , burp-exceptions. , Java:
Java
java.lang.RuntimeException: org.python.core.PyException
at burp.fl.a(Unknown Source)
at burp.edd.a(Unknown Source)
at burp.e2g.a(Unknown Source)
at burp.e2g.g(Unknown Source)
at burp.i1c.stateChanged(Unknown Source)
at javax.swing.JTabbedPane.fireStateChanged(JTabbedPane.java:416)
at javax.swing.JTabbedPane$ModelListener.stateChanged(JTabbedPane.java:270)
...
Python, Java, . Java Python :
Python
*** PYTHON EXCEPTION
Traceback (most recent call last):
File "/Users/mb/Desktop/burp extension/exceptions_fix.py", line 8, in decorated_function
return original_function(*args, **kwargs)
File "/Users/mb/Desktop/burp extension/CustomEditorTab.py", line 78, in setMessage
self._txtInput.setEsditable(self._editable)
AttributeError: 'burp.ul' object has no attribute 'setEsditable'
, .
:
Burp, Extender - Options - Python environment. , , Folder for loading modules.
-
,
, CORS . .
Python . , , cors-scanner.py.
try:
from exceptions_fix import FixBurpExceptions
except ImportError:
pass
burp
,
from burp import IBurpExtender, IScannerCheck, IScanIssue
from java.io import PrintWriter
import sys
:
try:
FixBurpExceptions()
except:
pass
,
BurpExtender
- . , activeScan++, Python. , .
:
class BurpExtender(IBurpExtender, IScannerCheck):
IBurpExtender - , Burp.
IScannerCheck - , / , .
def registerExtenderCallbacks(self, callbacks):
sys.stdout = PrintWriter(callbacks.getStdout(), True)
self._callbacks = callbacks
self._helpers = callbacks.getHelpers()
callbacks.setExtensionName('CORS Passive Scanner')
callbacks.registerScannerCheck(self)
, .
callbacks.registerScannerCheck(self)
-, registerScannerCheck(..)
. - self
.
,
def doPassiveScan(self, baseRequestResponse):
baseRequestResponse - , , .
: CORS misconfiguration? - . :
Access-Control-Allow-Origin
Access-Control-Allow-Credentials
. , .
_helpers
, , analyzeResponse(..), . analyzeResponse(..)
( ). baseRequestResponse
( , , ) getResponse()
. , analyzeResponse(..)
. "" - IResponseInfo. , , - . getHeaders()
. , Java .
def doPassiveScan(self, baseRequestResponse):
response_headers = list(self._helpers.analyzeResponse(baseRequestResponse.getResponse()).getHeaders())
Access-Control-Allow-Origin
Access-Control-Allow-Credentials
.
def doPassiveScan(self, baseRequestResponse):
response_headers = list(self._helpers.analyzeResponse(baseRequestResponse.getResponse()).getHeaders())
for response_header in response_headers:
if 'Access-Control-Allow-Origin' in response_header or 'Access-Control-Allow-Credentials' in response_header:
sys.stdout.println(response_headers)
:
[u'Access-Control-Allow-Credentials: true', u'cache-control: private, s-maxage=0, no-store, no-cache']
#
, , URL.
request_url = self._helpers.analyzeRequest(baseRequestResponse).getUrl() request_headers = self._helpers.analyzeRequest(baseRequestResponse).getHeaders()
, http, ( , . , , , , , , , , , ). issues
, , .
:
def doPassiveScan(self, baseRequestResponse):
response_headers = list(self._helpers.analyzeResponse(baseRequestResponse.getResponse()).getHeaders())
for response_header in response_headers:
if 'Access-Control-Allow-Origin' in response_header or 'Access-Control-Allow-Credentials' in response_header:
request_url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
request_headers = self._helpers.analyzeRequest(baseRequestResponse).getHeaders()
issues = []
payloads.
Origin
- . , Python - CORScanner, . ( misconfiguration, ), - .
- URL.
def _generate_payloads(self, url):
host = url.getHost()
protocol = url.getProtocol()
payloads = {}
URL - . . ( ), . , . , . :
{'trust_any_origin': {'payload_url': 'XXX', 'description': 'YYY', 'severity': 'ZZZ'}}
severity, Burp. , , URL , , . , ,
def _generate_payloads(self, url):
host = url.getHost()
protocol = url.getProtocol()
payloads = {}
# trust any origin
payload_url = '{}://evil.com'.format(protocol)
payloads['trust_any_origin'] = {'origin': payload_url, 'description': 'Site trust any origin', 'severity': 'High'}
# trust any subdomain
payload_url = '{}://evil.{}'.format(protocol, host)
payloads['trust_any_subdomain'] = {'origin': payload_url, 'description': 'Site trust any subdomain', 'severity': 'High'}
# trust insecure protocol
if protocol == 'https':
payload_url = 'http://evil.{}'.format(host)
payloads['trust_http'] = {'origin': payload_url, 'description': 'Site trust insecure protocol', 'severity': 'Medium'}
# trust null
payload_url = 'null'
payloads['trust_null'] = {'origin': payload_url, 'description': 'Site trust null origin', 'severity': 'High'}
# prefix match full url
payload_url = '{}://{}.evil.com'.format(protocol, host)
payloads['trust_prefix'] = {'origin': payload_url, 'description': 'Site trust prefix', 'severity': 'High'}
# trust invalid dot escape
splitted_host = host.split('.')
payload_host = '{}A{}.{}'.format('.'.join(splitted_host[:-1]), splitted_host[-1], splitted_host[-1])
payload_url = '{}://{}'.format(protocol, payload_host)
payloads['trust_invalid_regex'] = {'origin': payload_url, 'description': 'Site trust origin with unescaped dot', 'severity': 'High'}
return payloads
{'severity': 'Medium'}
http . , , , , Google - . HackerOne (#629892). , - N/A. , Medium, , , .
, doPassiveScan
.
def _add_origin(self, headers, value):
headers = list(headers)
headers.append('Origin: {}'.format(value))
return headers
, , Origin .
, payload_headers
, , _add_origin
.
def doPassiveScan(self, baseRequestResponse):
response_headers = list(self._helpers.analyzeResponse(baseRequestResponse.getResponse()).getHeaders())
for response_header in response_headers:
if 'Access-Control-Allow-Origin' in response_header or 'Access-Control-Allow-Credentials' in response_header:
request_url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
request_headers = self._helpers.analyzeRequest(baseRequestResponse).getHeaders()
issues = []
payloads = self._generate_payloads(request_url)
for payload in payloads.values():
payload_headers = self._add_origin(request_headers, payload['origin'])
, , . offset , , .
body_offset = self._helpers.analyzeRequest(baseRequestResponse).getBodyOffset() request_body = baseRequestResponse.getRequest()[body_offset:]
, , . - None
if len(request_body) == 0:
request = self._helpers.buildHttpMessage(payload_headers, None)
else:
request = self._helpers.buildHttpMessage(payload_headers, request_body)
, POST , . . . ,
response = self._callbacks.makeHttpRequest(baseRequestResponse.getHttpService(), request) response_headers = list(self._helpers.analyzeResponse(response.getResponse()).getHeaders())
? - Access-Control-Allow-Origin
, , Origin .
for response_header in response_headers:
if 'Access-Control-Allow-Origin' in response_header:
- . Burp, . .
, :
cors-scanner.py
def doPassiveScan(self, baseRequestResponse):
response_headers = list(self._helpers.analyzeResponse(baseRequestResponse.getResponse()).getHeaders())
for response_header in response_headers:
if 'Access-Control-Allow-Origin' in response_header or 'Access-Control-Allow-Credentials' in response_header:
request_url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
request_headers = self._helpers.analyzeRequest(baseRequestResponse).getHeaders()
issues = []
payloads = self._generate_payloads(request_url)
for payload in payloads.values():
payload_headers = self._add_origin(request_headers, payload['origin'])
body_offset = self._helpers.analyzeRequest(baseRequestResponse).getBodyOffset()
request_body = baseRequestResponse.getRequest()[body_offset:]
if len(request_body) == 0:
request = self._helpers.buildHttpMessage(payload_headers, None)
else:
request = self._helpers.buildHttpMessage(payload_headers, request_body)
response = self._callbacks.makeHttpRequest(baseRequestResponse.getHttpService(), request)
response_headers = list(self._helpers.analyzeResponse(response.getResponse()).getHeaders())
for response_header in response_headers:
if 'Access-Control-Allow-Origin' in response_header:
return issues
, . !
class CustomScanIssue(IScanIssue):
__init__
def __init__(self, httpService, url, httpMessages, name, detail, severity):
self._httpService = httpService
self._url = url
self._httpMessages = httpMessages
self._name = name
self._detail = detail
self._severity = severity
self._confidence = 'Certain'
return
. URL, , , severity, confidence . , Dashboard URL, . , Burp . :
class CustomScanIssue(IScanIssue):
def __init__(self, httpService, url, httpMessages, name, detail, severity):
self._httpService = httpService
self._url = url
self._httpMessages = httpMessages
self._name = name
self._detail = detail
self._severity = severity
self._confidence = 'Certain'
def getUrl(self):
return self._url
def getIssueName(self):
return self._name
def getIssueType(self):
return 0
def getSeverity(self):
return self._severity
def getConfidence(self):
return self._confidence
def getIssueBackground(self):
return None
def getRemediationBackground(self):
return None
def getIssueDetail(self):
return self._detail
def getRemediationDetail(self):
return None
def getHttpMessages(self):
return self._httpMessages
def getHttpService(self):
return self._httpService
.
, Access-Control-Allow-Origin
, :
for response_header in response_headers:
if 'Access-Control-Allow-Origin' in response_header:
issues.append(
CustomScanIssue(
baseRequestResponse.getHttpService(),
request_url,
[response],
'CORS Misconfiguration',
payload['description'],
payload['severity']
)
)
break
:
HTTP
URL,
response
,makeHttpRequest
, , UI,
,
Severity,
, , , .
, Access-Control-Allow-Origin: *
if response_header == 'Access-Control-Allow-Origin: *':
return CustomScanIssue(
baseRequestResponse.getHttpService(),
request_url,
[baseRequestResponse],
'CORS Misconfiguration',
'Site trust *',
'Medium'
)
, URL, Allow-Origin
wildcard.
consolidateDuplicateIssues
. , URL.
def consolidateDuplicateIssues(self, existingIssue, newIssue):
if existingIssue.getIssueDetail() == newIssue.getIssueDetail():
return -1
return 0
, . URL - .
:
cors-scanner.py
from burp import IBurpExtender, IScannerCheck, IScanIssue
from java.io import PrintWriter
import sys
try:
from exceptions_fix import FixBurpExceptions
except ImportError:
pass
class BurpExtender(IBurpExtender, IScannerCheck):
def registerExtenderCallbacks(self, callbacks):
sys.stdout = PrintWriter(callbacks.getStdout(), True)
self._callbacks = callbacks
self._helpers = callbacks.getHelpers()
callbacks.setExtensionName('CORS Passive Scanner')
callbacks.registerScannerCheck(self)
def _add_origin(self, headers, value):
headers = list(headers)
headers.append('Origin: {}'.format(value))
return headers
def _generate_payloads(self, url):
host = url.getHost()
protocol = url.getProtocol()
payloads = {}
# trust any origin
payload_url = '{}://evil.com'.format(protocol)
payloads['trust_any_origin'] = {'origin': payload_url, 'description': 'Site trust any origin', 'severity': 'High'}
# trust any subdomain
payload_url = '{}://evil.{}'.format(protocol, host)
payloads['trust_any_subdomain'] = {'origin': payload_url, 'description': 'Site trust any subdomain', 'severity': 'High'}
# trust insecure protocol
if protocol == 'https':
payload_url = 'http://evil.{}'.format(host)
payloads['trust_http'] = {'origin': payload_url, 'description': 'Site trust insecure protocol', 'severity': 'Medium'}
# trust null
payload_url = 'null'
payloads['trust_null'] = {'origin': payload_url, 'description': 'Site trust null origin', 'severity': 'High'}
# prefix match full url
payload_url = '{}://{}.evil.com'.format(protocol, host)
payloads['trust_prefix'] = {'origin': payload_url, 'description': 'Site trust prefix', 'severity': 'High'}
# trust invalid regex dot escape
splitted_host = host.split('.')
payload_host = '{}A{}.{}'.format('.'.join(splitted_host[:-1]), splitted_host[-1], splitted_host[-1])
payload_url = '{}://{}'.format(protocol, payload_host)
payloads['trust_invalid_regex'] = {'origin': payload_url, 'description': 'Site trust origin with unescaped dot', 'severity': 'High'}
return payloads
def doPassiveScan(self, baseRequestResponse):
response_headers = list(self._helpers.analyzeResponse(baseRequestResponse.getResponse()).getHeaders())
for response_header in response_headers:
if 'Access-Control-Allow-Origin' in response_header or 'Access-Control-Allow-Credentials' in response_header:
request_url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
request_headers = self._helpers.analyzeRequest(baseRequestResponse).getHeaders()
if response_header == 'Access-Control-Allow-Origin: *':
return CustomScanIssue(
baseRequestResponse.getHttpService(),
request_url,
[baseRequestResponse],
'CORS Misconfiguration',
'Site trust any origin',
'Medium'
)
issues = []
payloads = self._generate_payloads(request_url)
for payload in payloads.values():
payload_headers = self._add_origin(request_headers, payload['origin'])
body_offset = self._helpers.analyzeRequest(baseRequestResponse).getBodyOffset()
request_body = baseRequestResponse.getRequest()[body_offset:]
if len(request_body) == 0:
request = self._helpers.buildHttpMessage(payload_headers, None)
else:
request = self._helpers.buildHttpMessage(payload_headers, request_body)
response = self._callbacks.makeHttpRequest(baseRequestResponse.getHttpService(), request)
response_headers = list(self._helpers.analyzeResponse(response.getResponse()).getHeaders())
for response_header in response_headers:
if 'Access-Control-Allow-Origin' in response_header:
issues.append(
CustomScanIssue(
baseRequestResponse.getHttpService(),
request_url,
[response],
'CORS Misconfiguration',
payload['description'],
payload['severity']
)
)
break
return issues
def consolidateDuplicateIssues(self, existingIssue, newIssue):
if existingIssue.getIssueDetail() == newIssue.getIssueDetail():
return -1
return 0
class CustomScanIssue(IScanIssue):
def __init__(self, httpService, url, httpMessages, name, detail, severity):
self._httpService = httpService
self._url = url
self._httpMessages = httpMessages
self._name = name
self._detail = detail
self._severity = severity
self._confidence = 'Certain'
def getUrl(self):
return self._url
def getIssueName(self):
return self._name
def getIssueType(self):
return 0
def getSeverity(self):
return self._severity
def getConfidence(self):
return self._confidence
def getIssueBackground(self):
return None
def getRemediationBackground(self):
return None
def getIssueDetail(self):
return self._detail
def getRemediationDetail(self):
return None
def getHttpMessages(self):
return self._httpMessages
def getHttpService(self):
return self._httpService
try:
FixBurpExceptions()
except:
pass
, .
Burp
- Extender, Extensions -> Add.
Extension type - , Python
Extension file -
Next, . , .
HackerOne , CORS misconfiguration, Origin, .
- PortSwigger.
- Origin Reflect. , . , wiener:peter
. , , . , 6 High .
- , Origin: null
, wiener:peter
. ,
(, ), , , , , . , XSS , subdomain-takeover. URL, :
Burp Suite Python. , . , . Java Python. , - CORS misconfiguration. - , XSS, . , , . CORS , Information Disclosure OTA.
A scanner for this kind of vulnerabilities for Burp is undoubtedly a good thing, especially since extensions for CORS are not available in the store for some reason (or I did not notice).
Burp is a great tool for hobbies and money making. Baghunters know how tough the competition can be, and automation of everything that is possible plays a huge role in this. I found errors in well-known companies only because I automate all the processes, often looking at the results of automatic scans and studying by hand.
Readers - thanks for reading, baghunters - good luck!
Never stop learning and learning new tools.