背景

我们有类似waf的攻击检测产品,所以需要测试一些攻击是否能够正确识别,并且识别的原因是否正确,如果是自己去看规则,来编写,几十上百个规则太难了,要搞很久很久,而且是非常无聊的任务

大模型

用大模型来完成这个任务刚好,大模型懂SecRule语法,并且能够理解正则表达式,是干这个活的完美人选。
只要写一个不太烂的prompt约定一下产出的格式,我这里是产出的python代码,然后保存为文件,方便我以后经常调用验证产品。

import pandas as pd
import openai
import os

# 设置OpenAI API密钥和自定义endpoint
API_KEY = ''
# 替换为您的自定义endpoint
API_BASE = ''

# 读取CSV文件
input_csv_path = 'input.csv'
df = pd.read_csv(input_csv_path)

# 创建一个数据目录
output_directory = 'payload_output/'
os.makedirs(output_directory, exist_ok=True)


# 假设CSV文件中有一列名为'text',我们将对其进行文本生成
generated_texts = []

client = openai.OpenAI(
    api_key=API_KEY,
    base_url=API_BASE
)

for index, row in df.iterrows():
    print(f"正在处理第 {index + 1} 行... {row.iloc[0]}")
    prompt = row.iloc[1]

    # 调用OpenAI的API生成文本
    response = client.chat.completions.create(
        stream=False,
        model="deepseek-coder",  # 选择适当的模型
        messages=[
            {"role": "system", "content": """
                需要对WAF规则进行匹配测试,所以要根据用户提供的Secrule规则进行python poc的编写。
                请使用requests库来进行请求发送,函数名直接命名为attack,在函数的顶部用注释表明该模拟的意图,传入参数为url、cookie,url为模拟攻击网站host,cookie参数如果生成的poc脚本中用到了则拼接在一起,否则直接传递cookie参数,举例如下:
                ```python
                # 模拟cookie中超过3个符号
                def attack(url, cookie):
                  payload={}
                  headers = {
                    'Host': 'example.com',
                    'Cookie': 'TestCookie=-1-1-1-1-;' + cookie,
                    'User-Agent': 'Jetknown-Test-Case'
                  }

                  response = requests.request("GET", url, headers=headers, data=payload)

                  print(response.text)
                ```
                仅需输出代码块,不要进行解释和说明,也不要在代码块中引入requests
                """},
            {"role": "user", "content": "规则如下:\n" + prompt + "\n"}
        ],
        max_tokens=4000  # 根据需要调整生成文本的长度
    )

    generated_text = response.choices[0].message.content.strip()
    print(f"生成的文本:{generated_text}")

    # 从生成的文件的markdown中提取代码
    generated_text = generated_text.split("```python")[1].split("```")[0].strip()
    generated_text = "import requests \n\n" + generated_text
    # 将生成的文本保存到文件中
    file_name = f"{output_directory}attack_{row.iloc[0]}.py"
    # 写入文件
    with open(file_name, 'w') as f:
        f.write(generated_text)

然后写一个代码来动态引入python文件并且执行

def execute_attacks(directory, url, cookie):
    # 遍历指定目录中的所有Python文件
    for filename in os.listdir(directory):
        if filename.endswith('.py'):
            file_path = os.path.join(directory, filename)

            # 动态导入Python文件
            spec = importlib.util.spec_from_file_location(filename[:-3], file_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)

            # 检查是否存在attack函数
            if hasattr(module, 'attack'):
                attack_func = getattr(module, 'attack')

                # 执行attack函数
                try:
                    result = attack_func(url, cookie)
                    print(f"执行 {filename} 中的attack函数,结果: {result}")
                except Exception as e:
                    print(f"执行 {filename} 中的attack函数时出错: {str(e)}")
            else:
                print(f"{filename} 中没有找到attack函数")

大大减少手写测试用例的工作量,提高测试效率,而且可复用,方便后续持续验证产品。并且如果发现不理想的代码,可以挑出来再让大模型重新生成。


0 条评论

发表回复

Avatar placeholder

您的电子邮箱地址不会被公开。 必填项已用 * 标注