Перейти к содержанию

Главная

%%{init: {"flowchart": {"htmlLabels": false}} }%%
flowchart LR
    subgraph CLIENT [Пользователь]
        style CLIENT color:#fff
        direction RL

        subgraph JETHUB_client_app[jethub_client_app.py]
            style JETHUB_client_app color:#fff

            client_token([CLIENT_TOKEN])
            style client_token color:#fff

            JETHUB_api([API_PATH])
            style JETHUB_api color:#fff
        end

        path([файл или папка])
        style path color:#fff

        path --> JETHUB_client_app

    end

    subgraph JETHUB [JetHub]
        style JETHUB color:#fff
        direction LR

        identification([идентификация пользователя])
        style identification color:#fff

        subgraph JETHUB_tools[Инструменты]
            style JETHUB_tools color:#fff
            direction LR

            analizator([статический анализатор])
            style analizator color:#fff

            data_leaks([утечка данных])
            style data_leaks color:#fff

        end

        post_processing([постобработка])
        style post_processing color:#fff

        report([jethub_report.json])
        style report color:#fff

        identification -.-> JETHUB_tools

        JETHUB_tools -.-> post_processing
        post_processing -.-> report

    end

    CLIENT <--> JETHUB

Приложение (на python3)


  • Установка зависимостей

    pip install click==8.1.7 requests==2.32.0 ujson==5.10.0
    
  • Программа

    Для быстрого доступа к системе предлагается использовать следующий python код.

    jethub_client_app.py
    import shutil
    import time
    import zipfile
    from pathlib import Path
    
    import click
    import requests
    import ujson
    
    
    API_PATH = "insert_api_path"
    CLIENT_TOKEN = "insert_your_private_token"
    
    
    def create_task(
        path: str,
        sast_python_skiptest: str,
        sast_python_exclude_path: str,
        data_leaks_exclude_path: str,
        global_exclude_path: str,
    ) -> str:
        """
        Sends a request to the API to create the task.
    
        Args:
            path (str): The path to the userdata.
    
        Returns:
            task_id (str): The task identifier inside the API db.
    
        Raises:
            Exception: If the specified path does not exist.
            Exception: If the API request fails.
        """
    
        # Check if the specified path exists
        if not Path(path).exists():
            raise Exception("The specified path does not exist")
    
        # Create the transfer archive
        transfer = "archive.zip"
        if Path(path).is_file():
            # Create archive from 1 file
            zipfile.ZipFile("archive.zip", "w", zipfile.ZIP_DEFLATED).write(path, Path(path))
        else:
            # Create archive from directory
            transfer = shutil.make_archive("archive", "zip", path)
    
        # Prepare the request data
        file = {"userdata": ("clientarchive.zip", open(transfer, "rb"), "application/zip")}
    
        # Send the API request
        r = requests.post(
            API_PATH
            + f"/tasks/create_task?sast_python_skiptest={sast_python_skiptest}&sast_python_exclude_path={sast_python_exclude_path}&data_leaks_exclude_path={data_leaks_exclude_path}&global_exclude_path={global_exclude_path}",
            files=file,
            headers={"Authorization": f"Bearer {CLIENT_TOKEN}"},
            timeout=30,
        )
    
        Path("archive.zip").unlink()
    
        # Check the API response
        if r.status_code == 200:
            return r.json()["task_id"]
    
        else:
            # Raise an exception with the error message from the API
            raise Exception(r.json()["msg"])
    
    
    def wait_results(taskid: str) -> dict:
        """
        Wait for task results from the API.
    
        This function continuously polls the API for task results until it
        receives a response containing the "data" key or reaches a maximum number of attempts.
    
        Args:
            taskid (str): Task identifier inside the API database.
    
        Returns:
            dict: Report.
    
        Raises:
            Exception: If the function fails to receive a response from the analyzer after 10 attempts.
        """
    
        # Initialize variables
        data = {}  # API response
        counter = 0  # Number of attempts
    
        # Continuously poll the API until a response is received or the maximum number of attempts is reached
        while "data" not in data and counter < 10:
            # Send a GET request to the API
            r = requests.get(
                API_PATH + f"/tasks/get_task?task_id={taskid}",
                headers={"Authorization": f"Bearer {CLIENT_TOKEN}"},
                timeout=30,
            )
    
            # Update the data variable with the JSON response
            data = r.json()
    
            # Wait for 3 seconds before sending the next request
            time.sleep(3)
    
            # Increment the counter
            counter += 1
    
        # Raise an exception if the function fails to receive a response from the analyzer after 10 attempts
        if counter == 10 and "data" not in data:
            raise Exception("Failed to get a response from the analyzer")
    
        # Return the SAST report
        return data
    
    
    def write_results(data: dict, out_file: str):
        with open(out_file, "w") as f:
            ujson.dump(data, f, indent=4, ensure_ascii=False)
    
    
    @click.command(context_settings={"ignore_unknown_options": True})
    @click.option("--path", help="Path for transfer to sast", required=True, type=str)
    @click.option("--output", help="Output file name", default="jethub_report.json", type=str)
    @click.option("--sast-python-skiptest", help="Skiptest for sast", type=str)
    @click.option("--sast-python-exclude-path", help="Exclude path for sast", type=str)
    @click.option("--data-leaks-exclude-path", help="Exclude path for data leaks", type=str)
    @click.option("--global-exclude-path", help="Exclude path for global", type=str)
    def main(
        path: str,
        output: str,
        sast_python_skiptest: str,
        sast_python_exclude_path: str,
        data_leaks_exclude_path: str,
        global_exclude_path: str,
    ) -> None:
        """
        The main function that creates a new task, waits for the result, and writes the result to a file.
        """
    
        # Create a new task
        taskid = create_task(
            path, sast_python_skiptest, sast_python_exclude_path, data_leaks_exclude_path, global_exclude_path
        )
    
        # Wait for the result of the task
        data = wait_results(taskid)
    
        # Write the result to a file
        write_results(data, output)
    
    
    if __name__ == "__main__":
        main()
    
  • API_PATH и CLIENT_TOKEN предоставляются отдельно

  • Аргументы

    Имя Описание Условие Замечение
    --path Путь файла или папки для анализа - -
    --output Путь-имя выходного отчёта (по-умолчанию jethub_report.json) - -
    --sast-python-skiptest Список идентификаторов уязвимостей (id) для игнорирования. В строке уязвимости можно использовать # nosec Через запятую, без пробела Только для САК-python
    --sast-python-exclude-path Список файлов-папок (path) для игнорирования Через запятую, без пробела Только для САК-python
    --data-leaks-exlude-path Список файлов-папок (path) для игнорирования. В строке утечки можно использовать # nosec или // nosec Через запятую, без пробела Только для утечки данных
    —-global-exlude-path Глобальный список файлов-папок (path) для игнорирования. Eсли path одинаковый для sast_python и data_leaks Через запятую, без пробела Для САК-python и утечки данных

    Пример

    python3 jethub_client_app.py --path src/core/ --sast-python-skiptest JP0817B,JP1219C --data-leaks-exlude-path design/,total_func.py,
    

Пример отчёта


Перечень ошибок (в настоящий момент доступен только для python) можно посмотреть во вкладке «Python».

jethub_report.json
{
    "code": 200,
    "msg": "Success response",
    "data": {
        "sast_python": [
            {
                "id": "JP0847B",
                "test_name": "blacklist",
                "path": "src/main.py",
                "code": "1 import pandas as pd\n2 from fastapi import *\n3",
                "line": 2,
                "line_range": [
                    2
                ],
                "col_offset": 0,
                "end_col_offset": 21,
                "severity": "HIGH",
                "confidence": "HIGH",
                "cwe_id": 1061,
                "cwe_link": "https://cwe.mitre.org/data/definitions/1061.html",
                "link_to_doc": "https://docs.jethub.pro/python/special/import/JP0847B-import_all"
            },
            {
                "id": "JP1225C",
                "test_name": "start_process_with_no_shell",
                "path": "src/utils.py",
                "code": "9 \n10 os.execl(path, arg0, arg1)\n",
                "line": 10,
                "line_range": [
                    10
                ],  
                "col_offset": 0,
                "end_col_offset": 26,
                "severity": "LOW",  
                "confidence": "MEDIUM",
                "cwe_id": 78,
                "cwe_link": "https://cwe.mitre.org/data/definitions/78.html",
                "link_to_doc": "https://docs.jethub.pro/python/common_errors/calls/injections/JP1225C-создание_процесса_через_os_без_shell"
            }
        ],
        "data_leaks": [
            {
                "path": "src/structure.py",
                "line": 10,
                "code": "AKIAQYLPMN5HHHFPZAM2"
            },
            {
                "path": "src/code/main.go",
                "line": 4,
                "code": "***the-internet.herokuapp.com"
            }
        ]
    }
}