Writing a Burp Suite Extension Using Python

Hello, Habr!





Many of you probably know Burp Suite from PortSwigger, a popular platform for auditing web application security. Besides shipping with a large set of useful features, Burp lets users write their own extensions, which can greatly expand the application's built-in functionality.





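Extensions can be written in Python as well as in Java, the language Burp itself is written in, which helps anyone who does not know Java, Bug Bounty hunters included. In this article we will write a Burp Suite extension in Python that looks for CORS misconfiguration.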





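Because Burp is written in Java, running Python extensions requires Jython Standalone Edition, which is linked from the PortSwigger site. In Burp, open Extender - Options - Python environment and set the path to the Jython file.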





Setting up the Python Environment

To debug Burp extensions comfortably we also need burp-exceptions. Without it, errors are reported as Java stack traces:





Java
java.lang.RuntimeException: org.python.core.PyException
  at burp.fl.a(Unknown Source)
  at burp.edd.a(Unknown Source)
  at burp.e2g.a(Unknown Source)
  at burp.e2g.g(Unknown Source)
  at burp.i1c.stateChanged(Unknown Source)
  at javax.swing.JTabbedPane.fireStateChanged(JTabbedPane.java:416)
  at javax.swing.JTabbedPane$ModelListener.stateChanged(JTabbedPane.java:270)
  ...
      
      



With it, exceptions raised in Python code are shown as Python tracebacks rather than Java ones:





Python
*** PYTHON EXCEPTION
Traceback (most recent call last):
  File "/Users/mb/Desktop/burp extension/exceptions_fix.py", line 8, in decorated_function
    return original_function(*args, **kwargs)
  File "/Users/mb/Desktop/burp extension/CustomEditorTab.py", line 78, in setMessage
    self._txtInput.setEsditable(self._editable)
AttributeError: 'burp.ul' object has no attribute 'setEsditable'
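
The traceback above passes through a wrapper named decorated_function in exceptions_fix.py. The following is a minimal sketch of how a wrapper of that kind can surface a Python traceback. It is an illustration under my own assumptions, not the source of burp-exceptions; the names show_python_traceback and log are hypothetical.

```python
import functools
import sys
import traceback


def show_python_traceback(original_function, log=sys.stderr):
    """Wrap a callable so that any exception is logged as a Python traceback.

    Hypothetical helper for illustration; burp-exceptions may work differently.
    """
    @functools.wraps(original_function)
    def decorated_function(*args, **kwargs):
        try:
            return original_function(*args, **kwargs)
        except Exception:
            # Write a Python-style traceback, then re-raise so the caller
            # still sees the failure.
            log.write('*** PYTHON EXCEPTION\n')
            log.write(traceback.format_exc())
            raise
    return decorated_function
```

Wrapping a method this way keeps its name and behaviour; the only addition is the traceback written to log before the exception propagates.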
      
      



, .





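To install it:

  1. In Burp, open Extender - Options - Python environment and set Folder for loading modules.

  2. Download exceptions_fix.py into that folder.

  3. Add the import and the FixBurpExceptions() call to the extension, as in the code in this article.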





, CORS . .





Create a Python file. Let's call it, for example, cors-scanner.py.









try:
  from exceptions_fix import FixBurpExceptions
except ImportError:
  pass
      
      



From the burp module, import the interfaces we need:





from burp import IBurpExtender, IScannerCheck, IScanIssue
      
      







from java.io import PrintWriter
import sys
      
      



At the end of the file, add:





try:
    FixBurpExceptions()
except:
    pass
      
      



,





BurpExtender

- . , activeScan++, Python. , .





:





class BurpExtender(IBurpExtender, IScannerCheck):
      
      



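IBurpExtender is the interface that every Burp extension must implement.

IScannerCheck is the interface for custom scanner checks, active and/or passive.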









def registerExtenderCallbacks(self, callbacks):

  sys.stdout = PrintWriter(callbacks.getStdout(), True)

  self._callbacks = callbacks
  self._helpers = callbacks.getHelpers()

  callbacks.setExtensionName('CORS Passive Scanner')

  callbacks.registerScannerCheck(self)
      
      



, .





callbacks.registerScannerCheck(self)
      
      



registerScannerCheck(..) registers a custom scanner check. Its argument is the object that implements IScannerCheck, in our case self.





,





def doPassiveScan(self, baseRequestResponse):
      
      



baseRequestResponse is the base HTTP request/response pair that Burp hands to the check for passive scanning.





: CORS misconfiguration? - . :





  1. Access-Control-Allow-Origin





  2. Access-Control-Allow-Credentials





. , .





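The _helpers object provides analyzeResponse(..), which parses a response. analyzeResponse(..) takes the raw response as a byte array; baseRequestResponse gives it to us through getResponse(). analyzeResponse(..) returns an IResponseInfo object, and calling getHeaders() on it returns the response headers. That is a Java list, so we convert it to a Python list.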





def doPassiveScan(self, baseRequestResponse):

  response_headers = list(self._helpers.analyzeResponse(baseRequestResponse.getResponse()).getHeaders()) 

      
      



Now check whether Access-Control-Allow-Origin or Access-Control-Allow-Credentials is among the headers.





def doPassiveScan(self, baseRequestResponse):
        
  response_headers = list(self._helpers.analyzeResponse(baseRequestResponse.getResponse()).getHeaders()) 

  for response_header in response_headers:
    if 'Access-Control-Allow-Origin' in response_header or 'Access-Control-Allow-Credentials' in response_header:
      
      







sys.stdout.println(response_headers)
      
      



:





[u'Access-Control-Allow-Credentials: true', u'cache-control: private, s-maxage=0, no-store, no-cache']
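
The substring test used in doPassiveScan is case-sensitive and would also match a header whose value merely mentions one of the names, while HTTP header names are case-insensitive. As a hedged alternative, here is a small sketch that parses 'Name: value' strings of the shape printed above. The names CORS_HEADERS and find_cors_headers are hypothetical and not part of the extension.

```python
CORS_HEADERS = ('access-control-allow-origin', 'access-control-allow-credentials')


def find_cors_headers(header_lines):
    """Return a dict of the CORS response headers found in header_lines.

    header_lines is a list of 'Name: value' strings, the shape produced by
    list(getHeaders()). Keys in the result are lower-cased header names.
    """
    found = {}
    for line in header_lines:
        name, sep, value = line.partition(':')
        if not sep:
            continue  # a status line such as "HTTP/1.1 200 OK" has no colon
        name = name.strip().lower()
        if name in CORS_HEADERS:
            found[name] = value.strip()
    return found
```

An empty result means there is nothing to test; a non-empty one gives direct access to the header values.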
      
      



If one of them is present, we get the request headers and URL.





request_url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
request_headers = self._helpers.analyzeRequest(baseRequestResponse).getHeaders()
      
      



, http, ( , . , , , , , , , , , ). issues



, , .





:





def doPassiveScan(self, baseRequestResponse):
        
  response_headers = list(self._helpers.analyzeResponse(baseRequestResponse.getResponse()).getHeaders()) 

  for response_header in response_headers:
    if 'Access-Control-Allow-Origin' in response_header or 'Access-Control-Allow-Credentials' in response_header:

      request_url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
      request_headers = self._helpers.analyzeRequest(baseRequestResponse).getHeaders()

      issues = []
      
      



payloads.





Origin

- . , Python - CORScanner, . ( misconfiguration, ), - .





- URL.





def _generate_payloads(self, url):

  host = url.getHost()
  protocol = url.getProtocol()

  payloads = {}
      
      



URL - . . ( ), . , . , . :





{'trust_any_origin': {'origin': 'XXX', 'description': 'YYY', 'severity': 'ZZZ'}}
      
      



severity, Burp. , , URL , , . , ,





def _generate_payloads(self, url):

  host = url.getHost()
  protocol = url.getProtocol()

  payloads = {}

  # trust any origin
  payload_url = '{}://evil.com'.format(protocol)
  payloads['trust_any_origin'] = {'origin': payload_url, 'description': 'Site trust any origin', 'severity': 'High'}

  # trust any subdomain
  payload_url = '{}://evil.{}'.format(protocol, host)
  payloads['trust_any_subdomain'] = {'origin': payload_url, 'description': 'Site trust any subdomain', 'severity': 'High'}

  # trust insecure protocol
  if protocol == 'https':
    payload_url = 'http://evil.{}'.format(host)
    payloads['trust_http'] = {'origin': payload_url, 'description': 'Site trust insecure protocol', 'severity': 'Medium'}

  # trust null
  payload_url = 'null'
  payloads['trust_null'] = {'origin': payload_url, 'description': 'Site trust null origin', 'severity': 'High'}

  # prefix match full url
  payload_url = '{}://{}.evil.com'.format(protocol, host)
  payloads['trust_prefix'] = {'origin': payload_url, 'description': 'Site trust prefix', 'severity': 'High'}

  # trust invalid dot escape
  splitted_host = host.split('.')
  payload_host = '{}A{}.{}'.format('.'.join(splitted_host[:-1]), splitted_host[-1], splitted_host[-1])
  payload_url = '{}://{}'.format(protocol, payload_host)
  payloads['trust_invalid_regex'] = {'origin': payload_url, 'description': 'Site trust origin with unescaped dot', 'severity': 'High'}

  return payloads
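
To see what these Origin values look like without starting Burp, the same string formatting can be restated as a standalone function. This is only a sketch for experimenting: generate_origins is a hypothetical name, and it takes plain strings instead of the java.net.URL object that Burp passes in.

```python
def generate_origins(protocol, host):
    """Build the Origin values that _generate_payloads would produce.

    Standalone restatement for use outside Burp; descriptions and
    severities are left out to keep the focus on the strings.
    """
    labels = host.split('.')
    origins = {
        'trust_any_origin': '{}://evil.com'.format(protocol),
        'trust_any_subdomain': '{}://evil.{}'.format(protocol, host),
        'trust_null': 'null',
        'trust_prefix': '{}://{}.evil.com'.format(protocol, host),
        # Swap the dot before the last label for a letter, then append the
        # last label again so the result is still a valid host name.
        'trust_invalid_regex': '{}://{}A{}.{}'.format(
            protocol, '.'.join(labels[:-1]), labels[-1], labels[-1]),
    }
    if protocol == 'https':
        origins['trust_http'] = 'http://evil.{}'.format(host)
    return origins
```

For protocol 'https' and host 'www.example.com' this yields six origins, and the trust_invalid_regex entry is https://www.exampleAcom.com.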
      
      



{'severity': 'Medium'}



http . , , , , Google - . HackerOne (#629892). , - N/A. , Medium, , , .





, doPassiveScan



.





def _add_origin(self, headers, value):
  headers = list(headers)
  headers.append('Origin: {}'.format(value))
  return headers
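
One thing to keep in mind: _add_origin appends a header, so a captured request that already carries an Origin header would go out with two. A possible variation, which is my assumption and not part of the original extension, drops any existing Origin first. The name set_origin is hypothetical.

```python
def set_origin(headers, value):
    """Return a copy of headers with exactly one Origin header set to value.

    headers is a list of strings whose first item is the request line,
    the shape returned by getHeaders() on an analyzed request.
    """
    # Header names are case-insensitive, so compare in lower case.
    kept = [h for h in headers if not h.lower().startswith('origin:')]
    kept.append('Origin: {}'.format(value))
    return kept
```

The input list is left untouched, so the same request_headers can be reused for every payload.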
      
      



, , Origin .





For each payload we build payload_headers, the original request headers plus our Origin, using _add_origin.





def doPassiveScan(self, baseRequestResponse):

  response_headers = list(self._helpers.analyzeResponse(baseRequestResponse.getResponse()).getHeaders()) 

  for response_header in response_headers:
    if 'Access-Control-Allow-Origin' in response_header or 'Access-Control-Allow-Credentials' in response_header:

      request_url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
      request_headers = self._helpers.analyzeRequest(baseRequestResponse).getHeaders()

      issues = []
      payloads = self._generate_payloads(request_url)

      for payload in payloads.values():
        payload_headers = self._add_origin(request_headers, payload['origin'])
      
      



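The request may also have a body that has to be carried over. We take the body offset and slice the raw request from it.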





body_offset = self._helpers.analyzeRequest(baseRequestResponse).getBodyOffset()
request_body = baseRequestResponse.getRequest()[body_offset:]
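
The slicing above can be tried outside Burp on a raw request. In this sketch find_body_offset is a hypothetical stand-in for getBodyOffset() that looks for the blank line ending the headers; Burp's own parsing may handle edge cases differently.

```python
def find_body_offset(raw_request):
    """Return the index of the first body byte in a raw HTTP request.

    Stand-in for getBodyOffset(): the body starts right after the first
    blank line (CRLF CRLF). Without one, the whole message is headers.
    """
    marker = b'\r\n\r\n'
    index = raw_request.find(marker)
    if index == -1:
        return len(raw_request)
    return index + len(marker)


def split_body(raw_request, body_offset):
    """Return (head, body), using the same slice as the extension."""
    return raw_request[:body_offset], raw_request[body_offset:]
```

A GET request without a body produces an empty body slice, which is the case the next step handles.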
      
      



If the body is empty, pass None instead.





if len(request_body) == 0:
  request = self._helpers.buildHttpMessage(payload_headers, None)
else:
  request = self._helpers.buildHttpMessage(payload_headers, request_body)
      
      



, POST , . . . ,





response = self._callbacks.makeHttpRequest(baseRequestResponse.getHttpService(), request)
response_headers = list(self._helpers.analyzeResponse(response.getResponse()).getHeaders())
      
      



? - Access-Control-Allow-Origin



, , Origin .





for response_header in response_headers:
  if 'Access-Control-Allow-Origin' in response_header:
      
      



- . Burp, . .
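
The check above treats any Access-Control-Allow-Origin header in the response as a hit. A server that always answers with one fixed, legitimate origin would then be reported for every payload. A stricter variant, offered as an assumption and not as the author's method, compares the header value with the Origin that was sent. The name origin_is_reflected is hypothetical.

```python
def origin_is_reflected(response_headers, sent_origin):
    """True if Access-Control-Allow-Origin echoes the Origin we sent.

    response_headers is a list of 'Name: value' strings; only the first
    Access-Control-Allow-Origin header is considered.
    """
    for line in response_headers:
        name, sep, value = line.partition(':')
        if sep and name.strip().lower() == 'access-control-allow-origin':
            return value.strip() == sent_origin
    return False
```

Inside the payload loop this would be called with the response headers and payload['origin'].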





, :





cors-scanner.py
def doPassiveScan(self, baseRequestResponse):

  response_headers = list(self._helpers.analyzeResponse(baseRequestResponse.getResponse()).getHeaders()) 

  for response_header in response_headers:
    if 'Access-Control-Allow-Origin' in response_header or 'Access-Control-Allow-Credentials' in response_header:

      request_url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
      request_headers = self._helpers.analyzeRequest(baseRequestResponse).getHeaders()

      issues = []
      payloads = self._generate_payloads(request_url)

      for payload in payloads.values():
        payload_headers = self._add_origin(request_headers, payload['origin'])

        body_offset = self._helpers.analyzeRequest(baseRequestResponse).getBodyOffset()
        request_body = baseRequestResponse.getRequest()[body_offset:]

        if len(request_body) == 0:
          request = self._helpers.buildHttpMessage(payload_headers, None)
        else:
          request = self._helpers.buildHttpMessage(payload_headers, request_body)

        response = self._callbacks.makeHttpRequest(baseRequestResponse.getHttpService(), request)
        response_headers = list(self._helpers.analyzeResponse(response.getResponse()).getHeaders())

        for response_header in response_headers:
          if 'Access-Control-Allow-Origin' in response_header:
            pass  # the issue is created here, see below

      return issues

      
      



, . !





To report a finding, we need a class that implements IScanIssue:





class CustomScanIssue(IScanIssue):
      
      



__init__







def __init__(self, httpService, url, httpMessages, name, detail, severity):
  self._httpService = httpService
  self._url = url
  self._httpMessages = httpMessages
  self._name = name
  self._detail = detail
  self._severity = severity
  self._confidence = 'Certain'
  return
      
      



. URL, , , severity, confidence . , Dashboard URL, . , Burp . :





class CustomScanIssue(IScanIssue):
  def __init__(self, httpService, url, httpMessages, name, detail, severity):
    self._httpService = httpService
    self._url = url
    self._httpMessages = httpMessages
    self._name = name
    self._detail = detail
    self._severity = severity
    self._confidence = 'Certain'

  def getUrl(self):
    return self._url

  def getIssueName(self):
    return self._name

  def getIssueType(self):
    return 0

  def getSeverity(self):
    return self._severity

  def getConfidence(self):
    return self._confidence

  def getIssueBackground(self):
    return None

  def getRemediationBackground(self):
    return None

  def getIssueDetail(self):
    return self._detail

  def getRemediationDetail(self):
    return None

  def getHttpMessages(self):
    return self._httpMessages

  def getHttpService(self):
    return self._httpService
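
Burp expects getSeverity() and getConfidence() to return one of a fixed set of strings, so a typo in a payload's severity is easy to miss. A small sanity check can be run before creating the issue. The helper name check_issue_fields is hypothetical, and the value lists are the ones given in the IScanIssue documentation.

```python
SEVERITIES = ('High', 'Medium', 'Low', 'Information', 'False positive')
CONFIDENCES = ('Certain', 'Firm', 'Tentative')


def check_issue_fields(severity, confidence):
    """Raise ValueError unless both strings are values Burp expects."""
    if severity not in SEVERITIES:
        raise ValueError('unexpected severity: {!r}'.format(severity))
    if confidence not in CONFIDENCES:
        raise ValueError('unexpected confidence: {!r}'.format(confidence))
    return severity, confidence
```

The values are case-sensitive, so 'high' is rejected just like an unknown word.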
      
      



.





Now, when Access-Control-Allow-Origin is present in the response, we add an issue:





for response_header in response_headers:
  if 'Access-Control-Allow-Origin' in response_header:

    issues.append(
      CustomScanIssue(
        baseRequestResponse.getHttpService(),
        request_url,
        [response],
        'CORS Misconfiguration',
        payload['description'],
        payload['severity']
      )
    )
                            
    break
      
      



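The constructor arguments are:

  1. The HTTP service.

  2. The URL the issue was found at.

  3. response, the object returned by makeHttpRequest, which Burp shows in the UI.

  4. The issue name and the description taken from the payload.

  5. Severity, also taken from the payload.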





, , , .





, Access-Control-Allow-Origin: *







if response_header == 'Access-Control-Allow-Origin: *':
  # doPassiveScan must return a list of issues, so wrap the single issue
  return [CustomScanIssue(
    baseRequestResponse.getHttpService(),
    request_url,
    [baseRequestResponse],
    'CORS Misconfiguration',
    'Site trust *',
    'Medium'
  )]
      
      



, URL, Allow-Origin



wildcard.





The last method is consolidateDuplicateIssues. Burp calls it when a check reports several issues for the same URL.





def consolidateDuplicateIssues(self, existingIssue, newIssue):
  if existingIssue.getIssueDetail() == newIssue.getIssueDetail():
    return -1

  return 0
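
The return value tells Burp what to keep: -1 reports only the existing issue, 0 reports both, and 1 reports only the new one. The comparison can be exercised outside Burp with a tiny stand-in object. FakeIssue and consolidate are hypothetical names used only for this sketch.

```python
class FakeIssue(object):
    """Minimal stand-in for an IScanIssue, just enough for the comparison."""

    def __init__(self, detail):
        self._detail = detail

    def getIssueDetail(self):
        return self._detail


def consolidate(existing_issue, new_issue):
    """Same rule as consolidateDuplicateIssues in the extension."""
    # -1: keep only the existing issue; 0: keep both.
    if existing_issue.getIssueDetail() == new_issue.getIssueDetail():
        return -1
    return 0
```

Two findings with the same description collapse into one, while findings from different payloads are both kept.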
      
      



, . URL - .





The complete code:





cors-scanner.py
from burp import IBurpExtender, IScannerCheck, IScanIssue
from java.io import PrintWriter
import sys

try:
    from exceptions_fix import FixBurpExceptions
except ImportError:
    pass

class BurpExtender(IBurpExtender, IScannerCheck):

  def registerExtenderCallbacks(self, callbacks):

    sys.stdout = PrintWriter(callbacks.getStdout(), True)

    self._callbacks = callbacks
    self._helpers = callbacks.getHelpers()

    callbacks.setExtensionName('CORS Passive Scanner')

    callbacks.registerScannerCheck(self)


  def _add_origin(self, headers, value):
    headers = list(headers)
    headers.append('Origin: {}'.format(value))
    return headers


  def _generate_payloads(self, url):

    host = url.getHost()
    protocol = url.getProtocol()

    payloads = {}

    # trust any origin
    payload_url = '{}://evil.com'.format(protocol)
    payloads['trust_any_origin'] = {'origin': payload_url, 'description': 'Site trust any origin', 'severity': 'High'}

    # trust any subdomain
    payload_url = '{}://evil.{}'.format(protocol, host)
    payloads['trust_any_subdomain'] = {'origin': payload_url, 'description': 'Site trust any subdomain', 'severity': 'High'}

    # trust insecure protocol
    if protocol == 'https':
      payload_url = 'http://evil.{}'.format(host)
      payloads['trust_http'] = {'origin': payload_url, 'description': 'Site trust insecure protocol', 'severity': 'Medium'}

    # trust null
    payload_url = 'null'
    payloads['trust_null'] = {'origin': payload_url, 'description': 'Site trust null origin', 'severity': 'High'}

    # prefix match full url
    payload_url = '{}://{}.evil.com'.format(protocol, host)
    payloads['trust_prefix'] = {'origin': payload_url, 'description': 'Site trust prefix', 'severity': 'High'}

    # trust invalid regex dot escape
    splitted_host = host.split('.')
    payload_host = '{}A{}.{}'.format('.'.join(splitted_host[:-1]), splitted_host[-1], splitted_host[-1])
    payload_url = '{}://{}'.format(protocol, payload_host)
    payloads['trust_invalid_regex'] = {'origin': payload_url, 'description': 'Site trust origin with unescaped dot', 'severity': 'High'}

    return payloads


  def doPassiveScan(self, baseRequestResponse):
        
    response_headers = list(self._helpers.analyzeResponse(baseRequestResponse.getResponse()).getHeaders()) 

    for response_header in response_headers:
      if 'Access-Control-Allow-Origin' in response_header or 'Access-Control-Allow-Credentials' in response_header:

        request_url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
        request_headers = self._helpers.analyzeRequest(baseRequestResponse).getHeaders()


        if response_header == 'Access-Control-Allow-Origin: *':
          # doPassiveScan must return a list of issues
          return [CustomScanIssue(
            baseRequestResponse.getHttpService(),
            request_url,
            [baseRequestResponse],
            'CORS Misconfiguration',
            'Site trust any origin',
            'Medium'
          )]

        issues = []
        payloads = self._generate_payloads(request_url)

        for payload in payloads.values():
          payload_headers = self._add_origin(request_headers, payload['origin'])

          body_offset = self._helpers.analyzeRequest(baseRequestResponse).getBodyOffset()
          request_body = baseRequestResponse.getRequest()[body_offset:]

          if len(request_body) == 0:
            request = self._helpers.buildHttpMessage(payload_headers, None)
          else:
            request = self._helpers.buildHttpMessage(payload_headers, request_body)

          response = self._callbacks.makeHttpRequest(baseRequestResponse.getHttpService(), request)
          response_headers = list(self._helpers.analyzeResponse(response.getResponse()).getHeaders())

          for response_header in response_headers:
            if 'Access-Control-Allow-Origin' in response_header:

              issues.append(
                CustomScanIssue(
                  baseRequestResponse.getHttpService(),
                  request_url,
                  [response],
                  'CORS Misconfiguration',
                  payload['description'],
                  payload['severity']
                )
              )
                            
              break

        return issues
      
  def consolidateDuplicateIssues(self, existingIssue, newIssue):
    if existingIssue.getIssueDetail() == newIssue.getIssueDetail():
      return -1
    return 0

class CustomScanIssue(IScanIssue):
  def __init__(self, httpService, url, httpMessages, name, detail, severity):
    self._httpService = httpService
    self._url = url
    self._httpMessages = httpMessages
    self._name = name
    self._detail = detail
    self._severity = severity
    self._confidence = 'Certain'

  def getUrl(self):
    return self._url

  def getIssueName(self):
    return self._name

  def getIssueType(self):
    return 0

  def getSeverity(self):
    return self._severity

  def getConfidence(self):
    return self._confidence

  def getIssueBackground(self):
    return None

  def getRemediationBackground(self):
    return None

  def getIssueDetail(self):
    return self._detail

  def getRemediationDetail(self):
    return None

  def getHttpMessages(self):
    return self._httpMessages

  def getHttpService(self):
    return self._httpService

try:
  FixBurpExceptions()
except:
  pass
      
      



, .





To load the extension into Burp, open Extender, then Extensions -> Add.

Extension type - Python

Extension file - the path to cors-scanner.py





Next, . , .





HackerOne , CORS misconfiguration, Origin, .





- PortSwigger.





- Origin Reflect. , . , wiener:peter



. , , . , 6 High .





All 6 payloads worked, which is correct

- , Origin: null





, wiener:peter



.  ,





null origin

- . , . , :





Medium as intended

(, ), , , , , . , XSS , subdomain-takeover. URL, :





Dashboard reporting
Response headers

Burp Suite Python. , . , . Java Python. , - CORS misconfiguration. - , XSS, . , , . CORS , Information Disclosure OTA.





A Burp scanner for this class of vulnerability is undoubtedly useful, especially since the extension store does not seem to offer one for CORS (or I missed it).





Burp is a great tool both for a hobby and for making money. Bug hunters know how tough the competition can be, and automating everything that can be automated plays a huge role. I have found bugs at well-known companies only because I automate every process I can, then regularly review the automated scan results and investigate them by hand.





Readers - thanks for reading, bug hunters - good luck!





Never stop learning and picking up new tools.







