根据安全公告中的描述,我们可以简单地获得导致漏洞的一些关键点。
在配置中将 作为 ErrorHandlingDeserializer Kafka 记录中的键和/或值。
将布尔类型属性 checkDeserExWhenKeyNull 和/或 checkDeserExWhenValueNull 设置为 true。
用户可以发布 Kafka 主题,无需任何验证。
在深入研究该漏洞之前,我们及时回顾了 Kafka 服务的一些相关概念。
Producer:我们调用发布记录卡夫卡主题制作人的对象
Topic:记录由 Kafka 服务分类,每个分类都命名为 Topic。
Broker:发布的消息存储在一组服务器中,我们称之为Kafka集群。每个服务器都是一个代理。消费者可以获得数据形式 Broker 并使用多个主题。
consumer:用于订阅消息并与已发布消息一起处理的对象称为 Kafka topi 消费者。消耗消息是基于主题的。
此外,有必要回顾kafka记录的结构。
Kafka 记录,我们也称它为由标头和正文组成的消息或事件。标头数据实际上等于元数据,包括主题、页码和时间戳等基本元素。它们存储为一对键/值。正文数据通常也是存储为键/值结构的相关业务数据。
复现:
部署Kafka服务之前需要Zookeeper服务器。
1.通过docker安装Zookeeper服务器
docker run -d --name zookeeper -p 2181:2181 -t zookeeper:latest2.通过docker部署Kafka服务器
docker run -d --name kafka -p 9092:9092 \-e KAFKA_ZOOKEEPER_CONNECT=192.168.5.102:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.5.102:9092 \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \-e TZ="Asia/Shanghai" \wurstmeister/kafka:latest
3.Spring Boot项目导入受影响的Kafka依赖
受影响版本:
2.8.1至2.9.10
3.0.0 至 3.0.
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>
4.更新配置application.yaml
5.演示类
1)Kafka生产者类
package com.example.SpringKafkaDemo.producer;import com.example.SpringKafkaDemo.model.KafkaMessage;import org.apache.kafka.clients.producer.ProducerRecord;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.*;import java.util.HashMap;@RestControllerpublic class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/message/send")public String sendMessage(@RequestBody KafkaMessage message) {String topic = message.getTopic();String data = message.getData();HashMap<String, String> headers = message.getHeaders();ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, data);for (String s : headers.keySet()) {if (s.equals("springDeserializerExceptionKey")) {String exceptData = headers.get(s);byte[] exceptHandler = KafkaProducer.hexStringtoBytes(exceptData);producerRecord.headers().add(s, exceptHandler);continue;}producerRecord.headers().add(s, headers.get(s).getBytes());}kafkaTemplate.send(producerRecord);String jsonString="{\"code\":\"200\", \"status\":\"success\"}";return jsonString;}private static byte[] hexStringtoBytes(String hexString) {byte[] excepetionMessage = new byte[hexString.length() / 2];for (int i = 0; i < excepetionMessage.length; i++) {excepetionMessage[i] = (byte) Integer.parseInt(hexString.substring(i * 2, i * 2 + 2), 16);}return excepetionMessage;}}
顺便说一句,这里我们使用了Java语言中的一种设计模式, 模板方法模式。在此演示中,我插入了一个名为 的模板kafkaTemplate。
代码片段的高亮显示
private KafkaTemplate<String, String> kafkaTemplate;2)Kafka消费类
package com.example.SpringKafkaDemo.consumer;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Service;@Servicepublic class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group-id")public void consume(String message) {System.out.println("Received message: " + message);}}
3)消费者配置类
package com.example.SpringKafkaDemo.config;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;import java.util.Map;@Configuration@EnableKafkapublic class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);factory.getContainerProperties().setCheckDeserExWhenValueNull(true);return factory;}}
根据官方公告中的漏洞描述,我们应该将checkDeserExWhenKeyNull和checkDeserExWhenValueNull属性都设置为true。
factory.getContainerProperties().setCheckDeserExWhenKeyNull(true)factory.getContainerProperties().setCheckDeserExWhenValueNull(true)
在函数处设置断点getExceptionFromHeader,然后启动服务器。
进入函数后invokeIfHaveRecords,记录对象将被反序列化。
回到getExceptionFromHeader函数。
该函数将 的值转化springDeserializerExceptionKey为record.headers()变量的值headerName并进行传递header。
然后将价值交付给byteArrayToDeserializationException职能。
进入byteArrayToDeserializationException功能。
该resolveClass函数被重写以限制任意 Java 类反序列化。实际上,我们可以在很多项目中找到防止Java反序列化漏洞的方法,比如Apache Shiro、Fastjson等。
显然,只有类org.springframework.kafka.support.serializer.DeserializationException可以反序列化。
进入DeserializationException函数,它由四个参数组成。其中之一是cause用于调用实例类。
编写一个恶意类并使其继承父类Throwable。
springDeserializerExceptionKey最后,用生成的Java序列化填充JSON数据中的键值。发送HTTP请求后触发远程代码执行。
可点击阅读原文跳转到原文地址
感谢您抽出
.
.
来阅读本文
点它,分享点赞在看都在这里