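Hi all, today I'll show a way to dynamically generate multiple consumer groups with Spring Kafka. Before getting to that method, let's first do it the usual way, with annotations. We simply create a configuration class containing a Spring @Bean that produces our KafkaListenerContainerFactory.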
Sample consumer config file:
bootstrap.servers=127.0.0.1:9093
key.deserializer=org.apache.kafka.common.serialization.LongDeserializer
value.deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.json.trusted.packages=com.sample.pojo,java.util, java.lang
group.id=CONSUMER-1-GROUP
auto.offset.reset=earliest
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class GeneralUtils {

    // Loads a Kafka properties file into a map.
    // Returns an empty map if the file is missing or cannot be read.
    public static Map<String, Object> provideKafkaConfig(String property) {
        Map<String, Object> configProps = new HashMap<>();
        // try-with-resources closes the stream even if load() throws
        try (InputStream input = new FileInputStream(property)) {
            Properties prop = new Properties();
            prop.load(input);
            for (String key : prop.stringPropertyNames()) {
                configProps.put(key, prop.getProperty(key));
            }
        } catch (IOException e) {
            // FileNotFoundException is an IOException, so a missing file is reported here
            log.error("Sorry, unable to load " + property, e);
        }
        return configProps;
    }
}
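import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@ComponentScan({"com.sample.config.*"})
@EnableKafka
@Slf4j
@Configuration
public class KafkaConsumerConfig {

    public Map<String, Object> configMap(boolean isOnline) {
        Map<String, Object> configProps = new HashMap<>();
        // Path of the consumer properties file, read from the JVM system property
        // kafkaConsumerPropFilePath (set with -DkafkaConsumerPropFilePath on the command line)
        String property = System.getProperty("kafkaConsumerPropFilePath");
        if (null != property) {
            configProps = GeneralUtils.provideKafkaConfig(property);
        }
        return configProps;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Long, LogDay>> onlineKafkaListenerContainerFactory() {
        Map<String, Object> propMap = configMap(true);
        ConcurrentKafkaListenerContainerFactory<Long, LogDay> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(5); // up to five consumer threads in this group
        factory.getContainerProperties().setPollTimeout(1000L);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(propMap));
        return factory;
    }
}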
Then we write our listener method, which uses our KafkaListenerContainerFactory:
…//
}
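So, how can we read our consumer groups from a file or a database and generate them dynamically?
Just take a look at the example below. We don't use any Spring Kafka annotations, and we don't define a KafkaListenerContainerFactory as a @Bean.
We create a DefaultKafkaConsumerFactory object from the map of Kafka consumer properties. Then we create a ContainerProperties object for the relevant topic and set our message listener on it.
Finally, we create a ConcurrentMessageListenerContainer and start it.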
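import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
// In Spring Kafka versions before 2.2 this class lives in org.springframework.kafka.listener.config
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ListenerBean {

    public Map<String, Object> configMap(String propName) {
        Map<String, Object> configProps = new HashMap<>();
        // propName is the name of a JVM system property holding the properties file path
        String property = System.getProperty(propName);
        if (null != property) {
            configProps = GeneralUtils.provideKafkaConfig(property);
        }
        return configProps;
    }

    @EventListener
    public void handleEvent(ContextRefreshedEvent event) {
        List<TargetSystemDto> targetSystemDtoList = new ArrayList<>();

        TargetSystemDto targetSystemDto = new TargetSystemDto();
        targetSystemDto.setConsumerPropName("targetSystem1PropFilePath");
        targetSystemDto.setSystemName("TARGET-SYSTEM-1");
        targetSystemDto.setTopicName("target-system-1-topic"); // placeholder topic; the original never sets one
        targetSystemDto.setWsUrl("https://sample.com.tr:443/sampleRestApi");
        targetSystemDtoList.add(targetSystemDto);

        targetSystemDto = new TargetSystemDto();
        targetSystemDto.setSystemName("TARGET-SYSTEM-2");
        targetSystemDto.setConsumerPropName("targetSystem2PropFilePath");
        targetSystemDto.setTopicName("target-system-2-topic"); // placeholder topic; the original never sets one
        targetSystemDto.setWsUrl("https://sample.com.tr:443/sampleRestApi2");
        targetSystemDtoList.add(targetSystemDto);

        targetSystemDtoList.forEach(this::generateAndStartConsumerGroup);
    }

    private void generateAndStartConsumerGroup(TargetSystemDto dto) {
        Map<String, Object> propMap = configMap(dto.getConsumerPropName());
        DefaultKafkaConsumerFactory<Long, PaymentPojo> factory = new DefaultKafkaConsumerFactory<>(propMap);
        ContainerProperties containerProperties = new ContainerProperties(dto.getTopicName());
        // The key type is Long to match the LongDeserializer in the consumer config
        containerProperties.setMessageListener(
                (MessageListener<Long, PaymentPojo>) messageObject -> {
                    PaymentPojo paymentPojo = messageObject.value();
                    /* for instance, do some condition check
                       for the related consumer group and call a REST API which has standard parameters
                       or transfer it to a different topic
                       ......
                     */
                });
        ConcurrentMessageListenerContainer<Long, PaymentPojo> container =
                new ConcurrentMessageListenerContainer<>(factory, containerProperties);
        container.start();
        log.info("{} consumer is configured and started", dto.getSystemName());
    }
}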
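The handleEvent method above still hard-codes its two target systems. As a rough sketch of the "read them from a file" idea, the helper below parses a list of target systems from a java.util.Properties object and derives one consumer config per group from a shared base map. Everything in it is an assumption for illustration and not part of the original example: the class name TargetSystemLoader, the stand-in TargetSystem type, and the key layout (systems, <name>.topic, <name>.group.id). It uses only the JDK, so it can be tried without a Kafka broker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of the original example.
final class TargetSystemLoader {

    // Minimal stand-in for TargetSystemDto, holding only the fields needed here.
    static final class TargetSystem {
        final String systemName;
        final String topicName;
        final String groupId;

        TargetSystem(String systemName, String topicName, String groupId) {
            this.systemName = systemName;
            this.topicName = topicName;
            this.groupId = groupId;
        }
    }

    // Parses entries of the assumed form:
    //   systems=TARGET-SYSTEM-1,TARGET-SYSTEM-2
    //   TARGET-SYSTEM-1.topic=some-topic
    //   TARGET-SYSTEM-1.group.id=CONSUMER-1-GROUP
    static List<TargetSystem> parse(Properties props) {
        List<TargetSystem> result = new ArrayList<>();
        String names = props.getProperty("systems", "");
        for (String raw : names.split(",")) {
            String name = raw.trim();
            if (name.isEmpty()) {
                continue; // tolerate an empty list and stray commas
            }
            String topic = props.getProperty(name + ".topic");
            String groupId = props.getProperty(name + ".group.id");
            if (topic == null || groupId == null) {
                throw new IllegalArgumentException("Incomplete definition for " + name);
            }
            result.add(new TargetSystem(name, topic, groupId));
        }
        return result;
    }

    // Copies the shared consumer settings and overrides group.id for one system,
    // leaving the base map untouched.
    static Map<String, Object> consumerConfigFor(Map<String, Object> base, TargetSystem system) {
        Map<String, Object> copy = new HashMap<>(base);
        copy.put("group.id", system.groupId);
        return copy;
    }
}
```

With something like this in place, handleEvent could loop over the parsed entries instead of building DTOs by hand, and generateAndStartConsumerGroup could pass the map returned by consumerConfigFor to DefaultKafkaConsumerFactory. Sharing one base file this way only makes sense if all groups use the same brokers and deserializers; otherwise the one-file-per-system approach shown above is the simpler choice.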