
        How to implement a Kafka consumer with an idempotency template

        Implementation steps

        1. Switch Kafka from automatic offset commit to manual commit

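        spring:
          kafka:
            consumer:
              # Whether offsets are committed automatically. The default is true.
              # To avoid duplicate and lost data, set it to false and commit offsets manually.
              enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:false}

        Note: for the Acknowledgment parameter used in the next step to be injected, the listener container also has to run in a manual ack mode (in Spring Boot, spring.kafka.listener.ack-mode set to manual or manual_immediate).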
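        Why manual commit makes an idempotency check necessary can be shown with a small simulation. This is only a sketch: SimulatedPartition is a hypothetical in-memory model, not a Kafka API, and it assumes that a restarted consumer resumes from the last committed offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of one partition; not a Kafka API.
class SimulatedPartition {
    private final List<String> records;
    private int committedOffset = 0; // next offset to read after a restart

    SimulatedPartition(List<String> records) {
        this.records = records;
    }

    // Returns every record from the last committed offset onward.
    List<String> pollFromCommitted() {
        return new ArrayList<>(records.subList(committedOffset, records.size()));
    }

    // Manual commit: the caller decides when a record counts as done.
    void commit(int nextOffset) {
        committedOffset = nextOffset;
    }
}

class ManualCommitDemo {
    public static void main(String[] args) {
        SimulatedPartition partition = new SimulatedPartition(Arrays.asList("a", "b", "c"));
        List<String> handled = new ArrayList<>();

        // First run: "a" is processed and committed; "b" is processed,
        // but the consumer stops before committing it.
        List<String> batch = partition.pollFromCommitted();
        handled.add(batch.get(0));
        partition.commit(1);
        handled.add(batch.get(1)); // no commit follows

        // Restart: consumption resumes at the committed offset, so "b" arrives again.
        handled.addAll(partition.pollFromCommitted());
        System.out.println(handled); // prints [a, b, b, c]
    }
}
```

        Committing only after processing avoids losing "b", at the price of seeing it twice. The duplicate check in the template is what absorbs that second delivery.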

        2. Define an abstract base class as the consumer-side template

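        import lombok.extern.slf4j.Slf4j;
        import org.springframework.kafka.annotation.KafkaHandler;
        import org.springframework.kafka.support.Acknowledgment;
        import org.springframework.kafka.support.KafkaHeaders;
        import org.springframework.messaging.handler.annotation.Header;
        import org.springframework.messaging.handler.annotation.Payload;

        @Slf4j
        public abstract class BaseComusmeListener {

            // Template method: final, so subclasses cannot bypass the duplicate check.
            @KafkaHandler
            public final void receive(@Payload String data,
                                      @Header(value = KafkaHeaders.RECEIVED_TOPIC, required = false) String receivedTopic,
                                      @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String receivedMessageKey,
                                      @Header(value = KafkaHeaders.RECEIVED_TIMESTAMP, required = false) long receivedTimestamp,
                                      Acknowledgment ack) {

                KafkaComsumePayLoad kafkaComsumePayLoad =
                        buildKafkaComsumePayLoad(data, receivedTimestamp, receivedTopic, receivedMessageKey);
                boolean isRepeateConsume = isRepeateConsume(kafkaComsumePayLoad);
                if (isRepeateConsume) {
                    log.warn("messageKey: [{}], topic: [{}] duplicate message data --> [{}]",
                            receivedMessageKey, receivedTopic, data);
                    // Acknowledge manually so the duplicate is not delivered again
                    ack.acknowledge();
                    return;
                }

                if (doBiz(kafkaComsumePayLoad)) {
                    // Acknowledge manually only after the business logic succeeded
                    ack.acknowledge();
                }
            }

            /**
             * Whether this message has already been consumed.
             * @param kafkaComsumePayLoad the received message
             * @return true if the message is a duplicate
             */
            public abstract boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad);

            /**
             * Business processing.
             * @param kafkaComsumerPayLoad the received message
             * @return true if processing succeeded and the offset may be committed
             */
            public abstract boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad);

            private KafkaComsumePayLoad buildKafkaComsumePayLoad(String data, long receivedTimestamp,
                                                                 String receivedTopic, String receivedMessageKey) {
                return KafkaComsumePayLoad.builder()
                        .data(data)
                        .receivedTimestamp(receivedTimestamp)
                        .receivedTopic(receivedTopic)
                        .receivedMessageKey(receivedMessageKey)
                        .build();
            }
        }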
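        The control flow of receive() can be exercised without a broker or Spring. The sketch below is a dependency-free approximation: CountingAck, IdempotentTemplate and InMemoryConsumer are hypothetical names introduced only for illustration, and the in-memory set stands in for whatever store a real isRepeateConsume would query.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Spring Kafka's Acknowledgment; it only counts calls.
class CountingAck {
    int acknowledged = 0;

    void acknowledge() {
        acknowledged++;
    }
}

// Same control flow as the receive() template above, minus the Kafka annotations.
abstract class IdempotentTemplate {
    final void receive(String key, String data, CountingAck ack) {
        if (isRepeatConsume(key)) {
            ack.acknowledge(); // duplicate: skip the business logic, still commit
            return;
        }
        if (doBiz(key, data)) {
            ack.acknowledge(); // commit only after the business logic succeeded
        }
        // doBiz returned false: nothing is acknowledged here
    }

    abstract boolean isRepeatConsume(String key);

    abstract boolean doBiz(String key, String data);
}

// In-memory implementation used only for this illustration.
class InMemoryConsumer extends IdempotentTemplate {
    final Set<String> seen = new HashSet<>();
    final List<String> processed = new ArrayList<>();

    @Override
    boolean isRepeatConsume(String key) {
        return seen.contains(key);
    }

    @Override
    boolean doBiz(String key, String data) {
        if (data.isEmpty()) {
            return false; // simulated business failure
        }
        processed.add(data);
        seen.add(key);
        return true;
    }
}

class TemplateDemo {
    public static void main(String[] args) {
        InMemoryConsumer consumer = new InMemoryConsumer();
        CountingAck ack = new CountingAck();
        consumer.receive("k1", "alice", ack); // processed and acknowledged
        consumer.receive("k1", "alice", ack); // duplicate: acknowledged, not processed
        consumer.receive("k2", "", ack);      // failure: neither processed nor acknowledged
        System.out.println(consumer.processed + " acks=" + ack.acknowledged); // prints [alice] acks=2
    }
}
```

        The third call shows the one path where the template leaves the offset uncommitted. What happens to such a message afterwards depends on the container's error handling and is outside this sketch.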

        3. Custom listener annotation (optional)

        import java.lang.annotation.Documented;
        import java.lang.annotation.ElementType;
        import java.lang.annotation.Retention;
        import java.lang.annotation.RetentionPolicy;
        import java.lang.annotation.Target;

        import org.springframework.core.annotation.AliasFor;
        import org.springframework.kafka.annotation.KafkaListener;
        import org.springframework.kafka.annotation.TopicPartition;
        import org.springframework.stereotype.Component;

        // Meta-annotation: combines @KafkaListener and @Component and forwards every attribute.
        @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
        @Retention(RetentionPolicy.RUNTIME)
        @KafkaListener
        @Documented
        @Component
        public @interface LybGeekKafkaListener {

            @AliasFor(annotation = KafkaListener.class, attribute = "id")
            String id() default "";

            @AliasFor(annotation = KafkaListener.class, attribute = "containerFactory")
            String containerFactory() default "";

            @AliasFor(annotation = KafkaListener.class, attribute = "topics")
            String[] topics() default {};

            @AliasFor(annotation = KafkaListener.class, attribute = "topicPattern")
            String topicPattern() default "";

            @AliasFor(annotation = KafkaListener.class, attribute = "topicPartitions")
            TopicPartition[] topicPartitions() default {};

            @AliasFor(annotation = KafkaListener.class, attribute = "containerGroup")
            String containerGroup() default "";

            @AliasFor(annotation = KafkaListener.class, attribute = "errorHandler")
            String errorHandler() default "";

            @AliasFor(annotation = KafkaListener.class, attribute = "groupId")
            String groupId() default "";

            @AliasFor(annotation = KafkaListener.class, attribute = "idIsGroup")
            boolean idIsGroup() default true;

            @AliasFor(annotation = KafkaListener.class, attribute = "clientIdPrefix")
            String clientIdPrefix() default "";

            @AliasFor(annotation = KafkaListener.class, attribute = "beanRef")
            String beanRef() default "__listener";

            @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
            String concurrency() default "";

            @AliasFor(annotation = KafkaListener.class, attribute = "autoStartup")
            String autoStartup() default "";

            @AliasFor(annotation = KafkaListener.class, attribute = "properties")
            String[] properties() default {};

            @AliasFor(annotation = Component.class, attribute = "value")
            String value() default "";
        }

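        4. Override the KafkaListener annotation post-processor (optional)

        Note: the sample project uses a fairly old Spring Boot version, so using @LybGeekKafkaListener directly has no effect there.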

        public class LybGeekKafkaListenerAnnotationBeanPostProcessor<K, V>
                implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {

            private static final String GENERATED_ID_PREFIX = "org.springframework.kafka.KafkaListenerEndpointContainer#";

            /**
             * The bean name of the default {@link KafkaListenerContainerFactory}.
             */
            public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";

            private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));

            private final Log logger = LogFactory.getLog(getClass());

            private final ListenerScope listenerScope = new ListenerScope();

            private KafkaListenerEndpointRegistry endpointRegistry;

            private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;

            private DefaultListableBeanFactory beanFactory;

            private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =
                    new KafkaHandlerMethodFactoryAdapter();

            private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();

            private final AtomicInteger counter = new AtomicInteger();

            private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();

            private BeanExpressionContext expressionContext;

            private Charset charset = StandardCharsets.UTF_8;

            @Override
            public int getOrder() {
                return LOWEST_PRECEDENCE;
            }

            /**
             * Set the {@link KafkaListenerEndpointRegistry} that will hold the created
             * endpoint and manage the lifecycle of the related listener container.
             * @param endpointRegistry the {@link KafkaListenerEndpointRegistry} to set.
             */
            public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) {
                this.endpointRegistry = endpointRegistry;
            }

            /**
             * Set the name of the {@link KafkaListenerContainerFactory} to use by default.
             * <p>If none is specified, "kafkaListenerContainerFactory" is assumed to be defined.
             * @param containerFactoryBeanName the {@link KafkaListenerContainerFactory} bean name.
             */
            public void setDefaultContainerFactoryBeanName(String containerFactoryBeanName) {
                this.defaultContainerFactoryBeanName = containerFactoryBeanName;
            }

            /**
             * Set the {@link MessageHandlerMethodFactory} to use to configure the message
             * listener responsible to serve an endpoint detected by this processor.
             * <p>By default, {@link DefaultMessageHandlerMethodFactory} is used and it
             * can be configured further to support additional method arguments
             * or to customize conversion and validation support. See
             * {@link DefaultMessageHandlerMethodFactory} Javadoc for more details.
             * @param messageHandlerMethodFactory the {@link MessageHandlerMethodFactory} instance.
             */
            public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHandlerMethodFactory) {
                this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
            }

            /**
             * Making a {@link BeanFactory} available is optional; if not set,
             * {@link KafkaListenerConfigurer} beans won't get autodetected and an
             * {@link #setEndpointRegistry endpoint registry} has to be explicitly configured.
             * @param beanFactory the {@link BeanFactory} to be used.
             */
            @Override
            public void setBeanFactory(BeanFactory beanFactory) {
                this.beanFactory = (DefaultListableBeanFactory) beanFactory;
                if (beanFactory instanceof ConfigurableListableBeanFactory) {
                    this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
                    this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
                            this.listenerScope);
                }
            }

            /**
             * Set a charset to use when converting byte[] to String in method arguments.
             * Default UTF-8.
             * @param charset the charset.
             * @since 2.2
             */
            public void setCharset(Charset charset) {
                Assert.notNull(charset, "'charset' cannot be null");
                this.charset = charset;
            }

            @Override
            public void afterSingletonsInstantiated() {
                this.registrar.setBeanFactory(this.beanFactory);

                if (this.beanFactory instanceof ListableBeanFactory) {
                    Map<String, KafkaListenerConfigurer> instances =
                            ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
                    for (KafkaListenerConfigurer configurer : instances.values()) {
                        configurer.configureKafkaListeners(this.registrar);
                    }
                }

                if (this.registrar.getEndpointRegistry() == null) {
                    if (this.endpointRegistry == null) {
                        Assert.state(this.beanFactory != null,
                                "BeanFactory must be set to find endpoint registry by bean name");
                        this.endpointRegistry = this.beanFactory.getBean(
                                KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                                KafkaListenerEndpointRegistry.class);
                    }
                    this.registrar.setEndpointRegistry(this.endpointRegistry);
                }

                if (this.defaultContainerFactoryBeanName != null) {
                    this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
                }

                // Set the custom handler method factory once resolved by the configurer
                MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
                if (handlerMethodFactory != null) {
                    this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
                }
                else {
                    addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
                }

                // Actually register all listeners
                this.registrar.afterPropertiesSet();
                beanFactory.removeBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME);
            }

            @Override
            public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                return bean;
            }

            @Override
            public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
                if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
                    Class<?> targetClass = AopUtils.getTargetClass(bean);
                    Collection<LybGeekKafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
                    final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
                    final List<Method> multiMethods = new ArrayList<>();
                    Map<Method, Set<LybGeekKafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                            (MethodIntrospector.MetadataLookup<Set<LybGeekKafkaListener>>) method -> {
                                Set<LybGeekKafkaListener> listenerMethods = findListenerAnnotations(method);
                                return (!listenerMethods.isEmpty() ? listenerMethods : null);
                            });
                    if (hasClassLevelListeners) {
                        Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                                (ReflectionUtils.MethodFilter) method ->
                                        AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                        multiMethods.addAll(methodsWithHandler);
                    }
                    if (annotatedMethods.isEmpty()) {
                        this.nonAnnotatedClasses.add(bean.getClass());
                        if (this.logger.isTraceEnabled()) {
                            this.logger.trace("No @LybGeekKafkaListener annotations found on bean type: " + bean.getClass());
                        }
                    }
                    else {
                        // Non-empty set of methods
                        for (Map.Entry<Method, Set<LybGeekKafkaListener>> entry : annotatedMethods.entrySet()) {
                            Method method = entry.getKey();
                            for (LybGeekKafkaListener listener : entry.getValue()) {
                                processKafkaListener(listener, method, bean, beanName);
                            }
                        }
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug(annotatedMethods.size() + " @LybGeekKafkaListener methods processed on bean '"
                                    + beanName + "': " + annotatedMethods);
                        }
                    }
                    if (hasClassLevelListeners) {
                        processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
                    }
                }
                return bean;
            }

            /*
             * AnnotationUtils.getRepeatableAnnotations does not look at interfaces
             */
            private Collection<LybGeekKafkaListener> findListenerAnnotations(Class<?> clazz) {
                Set<LybGeekKafkaListener> listeners = new HashSet<>();
                LybGeekKafkaListener ann = AnnotationUtils.findAnnotation(clazz, LybGeekKafkaListener.class);
                if (ann != null) {
                    listeners.add(ann);
                }
                return listeners;
            }

            /*
             * AnnotationUtils.getRepeatableAnnotations does not look at interfaces
             */
            private Set<LybGeekKafkaListener> findListenerAnnotations(Method method) {
                Set<LybGeekKafkaListener> listeners = new HashSet<>();
                LybGeekKafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, LybGeekKafkaListener.class);
                if (ann != null) {
                    listeners.add(ann);
                }
                return listeners;
            }

            private void processMultiMethodListeners(Collection<LybGeekKafkaListener> classLevelListeners,
                    List<Method> multiMethods, Object bean, String beanName) {
                List<Method> checkedMethods = new ArrayList<>();
                Method defaultMethod = null;
                for (Method method : multiMethods) {
                    Method checked = checkProxy(method, bean);
                    KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
                    if (annotation != null && annotation.isDefault()) {
                        final Method toAssert = defaultMethod;
                        Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
                                + toAssert.toString() + " and " + method.toString());
                        defaultMethod = checked;
                    }
                    checkedMethods.add(checked);
                }
                for (LybGeekKafkaListener classLevelListener : classLevelListeners) {
                    MultiMethodKafkaListenerEndpoint<K, V> endpoint =
                            new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
                    processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
                }
            }

            protected void processKafkaListener(LybGeekKafkaListener kafkaListener, Method method, Object bean,
                    String beanName) {
                Method methodToUse = checkProxy(method, bean);
                MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
                endpoint.setMethod(methodToUse);
                processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
            }

            private Method checkProxy(Method methodArg, Object bean) {
                Method method = methodArg;
                if (AopUtils.isJdkDynamicProxy(bean)) {
                    try {
                        // Found a @LybGeekKafkaListener method on the target class for this JDK proxy ->
                        // is it also present on the proxy itself?
                        method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                        Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
                        for (Class<?> iface : proxiedInterfaces) {
                            try {
                                method = iface.getMethod(method.getName(), method.getParameterTypes());
                                break;
                            }
                            catch (NoSuchMethodException noMethod) {
                            }
                        }
                    }
                    catch (SecurityException ex) {
                        ReflectionUtils.handleReflectionException(ex);
                    }
                    catch (NoSuchMethodException ex) {
                        throw new IllegalStateException(String.format(
                                "@LybGeekKafkaListener method '%s' found on bean target class '%s', " +
                                "but not found in any interface(s) for bean JDK proxy. Either " +
                                "pull the method up to an interface or switch to subclass (CGLIB) " +
                                "proxies by setting proxy-target-class/proxyTargetClass " +
                                "attribute to 'true'", method.getName(),
                                method.getDeclaringClass().getSimpleName()), ex);
                    }
                }
                return method;
            }

            protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, LybGeekKafkaListener kafkaListener,
                    Object bean, Object adminTarget, String beanName) {
                String beanRef = kafkaListener.beanRef();
                if (StringUtils.hasText(beanRef)) {
                    this.listenerScope.addListener(beanRef, bean);
                }
                endpoint.setBean(bean);
                endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
                endpoint.setId(getEndpointId(kafkaListener));
                endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
                endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
                endpoint.setTopics(resolveTopics(kafkaListener));
                endpoint.setTopicPattern(resolvePattern(kafkaListener));
                endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
                String group = kafkaListener.containerGroup();
                if (StringUtils.hasText(group)) {
                    Object resolvedGroup = resolveExpression(group);
                    if (resolvedGroup instanceof String) {
                        endpoint.setGroup((String) resolvedGroup);
                    }
                }
                String concurrency = kafkaListener.concurrency();
                if (StringUtils.hasText(concurrency)) {
                    endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
                }
                String autoStartup = kafkaListener.autoStartup();
                if (StringUtils.hasText(autoStartup)) {
                    endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
                }
                resolveKafkaProperties(endpoint, kafkaListener.properties());

                KafkaListenerContainerFactory<?> factory = null;
                String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
                if (StringUtils.hasText(containerFactoryBeanName)) {
                    Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
                    try {
                        factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
                    }
                    catch (NoSuchBeanDefinitionException ex) {
                        throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                                + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                                + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
                    }
                }

                endpoint.setBeanFactory(this.beanFactory);
                String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
                if (StringUtils.hasText(errorHandlerBeanName)) {
                    endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
                }
                this.registrar.registerEndpoint(endpoint, factory);
                if (StringUtils.hasText(beanRef)) {
                    this.listenerScope.removeListener(beanRef);
                }
            }

            private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint, String[] propertyStrings) {
                if (propertyStrings.length > 0) {
                    Properties properties = new Properties();
                    for (String property : propertyStrings) {
                        String value = resolveExpressionAsString(property, "property");
                        if (value != null) {
                            try {
                                properties.load(new StringReader(value));
                            }
                            catch (IOException e) {
                                this.logger.error("Failed to load property " + property + ", continuing...", e);
                            }
                        }
                    }
                    endpoint.setConsumerProperties(properties);
                }
            }

            private String getEndpointId(LybGeekKafkaListener kafkaListener) {
                if (StringUtils.hasText(kafkaListener.id())) {
                    return resolveExpressionAsString(kafkaListener.id(), "id");
                }
                else {
                    return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
                }
            }

            private String getEndpointGroupId(LybGeekKafkaListener kafkaListener, String id) {
                String groupId = null;
                if (StringUtils.hasText(kafkaListener.groupId())) {
                    groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
                }
                if (groupId == null && kafkaListener.idIsGroup() && StringUtils.hasText(kafkaListener.id())) {
                    groupId = id;
                }
                return groupId;
            }

            private TopicPartitionInitialOffset[] resolveTopicPartitions(LybGeekKafkaListener kafkaListener) {
                TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
                List<TopicPartitionInitialOffset> result = new ArrayList<>();
                if (topicPartitions.length > 0) {
                    for (TopicPartition topicPartition : topicPartitions) {
                        result.addAll(resolveTopicPartitionsList(topicPartition));
                    }
                }
                return result.toArray(new TopicPartitionInitialOffset[0]);
            }

            private String[] resolveTopics(LybGeekKafkaListener kafkaListener) {
                String[] topics = kafkaListener.topics();
                List<String> result = new ArrayList<>();
                if (topics.length > 0) {
                    for (String topic1 : topics) {
                        Object topic = resolveExpression(topic1);
                        resolveAsString(topic, result);
                    }
                }
                return result.toArray(new String[0]);
            }

            private Pattern resolvePattern(LybGeekKafkaListener kafkaListener) {
                Pattern pattern = null;
                String text = kafkaListener.topicPattern();
                if (StringUtils.hasText(text)) {
                    Object resolved = resolveExpression(text);
                    if (resolved instanceof Pattern) {
                        pattern = (Pattern) resolved;
                    }
                    else if (resolved instanceof String) {
                        pattern = Pattern.compile((String) resolved);
                    }
                    else if (resolved != null) {
                        throw new IllegalStateException(
                                "topicPattern must resolve to a Pattern or String, not " + resolved.getClass());
                    }
                }
                return pattern;
            }

            private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {
                Object topic = resolveExpression(topicPartition.topic());
                Assert.state(topic instanceof String,
                        "topic in @TopicPartition must resolve to a String, not " + topic.getClass());
                Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty");
                String[] partitions = topicPartition.partitions();
                PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();
                Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
                        "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
                List<TopicPartitionInitialOffset> result = new ArrayList<>();
                for (String partition : partitions) {
                    resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
                }

                for (PartitionOffset partitionOffset : partitionOffsets) {
                    TopicPartitionInitialOffset topicPartitionOffset =
                            new TopicPartitionInitialOffset((String) topic,
                                    resolvePartition(topic, partitionOffset),
                                    resolveInitialOffset(topic, partitionOffset),
                                    isRelative(topic, partitionOffset));
                    if (!result.contains(topicPartitionOffset)) {
                        result.add(topicPartitionOffset);
                    }
                    else {
                        throw new IllegalArgumentException(
                                String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
                                        topicPartitionOffset));
                    }
                }
                return result;
            }

            private Integer resolvePartition(Object topic, PartitionOffset partitionOffset) {
                Object partitionValue = resolveExpression(partitionOffset.partition());
                Integer partition;
                if (partitionValue instanceof String) {
                    Assert.state(StringUtils.hasText((String) partitionValue),
                            "partition in @PartitionOffset for topic '" + topic + "' cannot be empty");
                    partition = Integer.valueOf((String) partitionValue);
                }
                else if (partitionValue instanceof Integer) {
                    partition = (Integer) partitionValue;
                }
                else {
                    throw new IllegalArgumentException(String.format(
                            "@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'",
                            topic, partitionOffset.partition(), partitionValue.getClass()));
                }
                return partition;
            }

            private Long resolveInitialOffset(Object topic, PartitionOffset partitionOffset) {
                Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
                Long initialOffset;
                if (initialOffsetValue instanceof String) {
                    Assert.state(StringUtils.hasText((String) initialOffsetValue),
                            "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
                    initialOffset = Long.valueOf((String) initialOffsetValue);
                }
                else if (initialOffsetValue instanceof Long) {
                    initialOffset = (Long) initialOffsetValue;
                }
                else {
                    throw new IllegalArgumentException(String.format(
                            "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",
                            topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));
                }
                return initialOffset;
            }

            private boolean isRelative(Object topic, PartitionOffset partitionOffset) {
                Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());
                Boolean relativeToCurrent;
                if (relativeToCurrentValue instanceof String) {
                    relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);
                }
                else if (relativeToCurrentValue instanceof Boolean) {
                    relativeToCurrent = (Boolean) relativeToCurrentValue;
                }
                else {
                    throw new IllegalArgumentException(String.format(
                            "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",
                            topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));
                }
                return relativeToCurrent;
            }

            @SuppressWarnings("unchecked")
            private void resolveAsString(Object resolvedValue, List<String> result) {
                if (resolvedValue instanceof String[]) {
                    for (Object object : (String[]) resolvedValue) {
                        resolveAsString(object, result);
                    }
                }
                else if (resolvedValue instanceof String) {
                    result.add((String) resolvedValue);
                }
                else if (resolvedValue instanceof Iterable) {
                    for (Object object : (Iterable<Object>) resolvedValue) {
                        resolveAsString(object, result);
                    }
                }
                else {
                    throw new IllegalArgumentException(String.format(
                            "@LybGeekKafkaListener can't resolve '%s' as a String", resolvedValue));
                }
            }

            @SuppressWarnings("unchecked")
            private void resolvePartitionAsInteger(String topic, Object resolvedValue,
                    List<TopicPartitionInitialOffset> result) {
                if (resolvedValue instanceof String[]) {
                    for (Object object : (String[]) resolvedValue) {
                        resolvePartitionAsInteger(topic, object, result);
                    }
                }
                else if (resolvedValue instanceof String) {
                    Assert.state(StringUtils.hasText((String) resolvedValue),
                            "partition in @TopicPartition for topic '" + topic + "' cannot be empty");
                    result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));
                }
                else if (resolvedValue instanceof Integer[]) {
                    for (Integer partition : (Integer[]) resolvedValue) {
                        result.add(new TopicPartitionInitialOffset(topic, partition));
                    }
                }
                else if (resolvedValue instanceof Integer) {
                    result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));
                }
                else if (resolvedValue instanceof Iterable) {
                    for (Object object : (Iterable<Object>) resolvedValue) {
                        resolvePartitionAsInteger(topic, object, result);
                    }
                }
                else {
                    throw new IllegalArgumentException(String.format(
                            "@LybGeekKafkaListener for topic '%s' can't resolve '%s' as an Integer or String",
                            topic, resolvedValue));
                }
            }

            private String resolveExpressionAsString(String value, String attribute) {
                Object resolved = resolveExpression(value);
                if (resolved instanceof String) {
                    return (String) resolved;
                }
                else if (resolved != null) {
                    throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
                            + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
                }
                return null;
            }

            private Integer resolveExpressionAsInteger(String value, String attribute) {
                Object resolved = resolveExpression(value);
                Integer result = null;
                if (resolved instanceof String) {
                    result = Integer.parseInt((String) resolved);
                }
                else if (resolved instanceof Number) {
                    result = ((Number) resolved).intValue();
                }
                else if (resolved != null) {
                    throw new IllegalStateException(
                            "The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. "
                                    + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
                }
                return result;
            }

            private Boolean resolveExpressionAsBoolean(String value, String attribute) {
                Object resolved = resolveExpression(value);
                Boolean result = null;
                if (resolved instanceof Boolean) {
                    result = (Boolean) resolved;
                }
                else if (resolved instanceof String) {
                    result = Boolean.parseBoolean((String) resolved);
                }
                else if (resolved != null) {
                    throw new IllegalStateException(
                            "The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. "
                                    + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
                }
                return result;
            }

            private Object resolveExpression(String value) {
                return this.resolver.evaluate(resolve(value), this.expressionContext);
            }

            /**
             * Resolve the specified value if possible.
             * @param value the value to resolve
             * @return the resolved value
             * @see ConfigurableBeanFactory#resolveEmbeddedValue
             */
            private String resolve(String value) {
                if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) {
                    return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
                }
                return value;
            }

            private void addFormatters(FormatterRegistry registry) {
                for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
                    registry.addConverter(converter);
                }
                for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
                    registry.addConverter(converter);
                }
                for (Formatter<?> formatter : getBeansOfType(Formatter.class)) {
                    registry.addFormatter(formatter);
                }
            }

            private <T> Collection<T> getBeansOfType(Class<T> type) {
                if (LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory) {
                    return ((ListableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory)
                            .getBeansOfType(type).values();
                }
                else {
                    return Collections.emptySet();
                }
            }

            /**
             * An {@link MessageHandlerMethodFactory} adapter that offers a configurable underlying
             * instance to use. Useful if the factory to use is determined once the endpoints
             * have been registered but not created yet.
             * @see KafkaListenerEndpointRegistrar#setMessageHandlerMethodFactory
             */
            private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {

                private final DefaultFormattingConversionService defaultFormattingConversionService =
                        new DefaultFormattingConversionService();

                private MessageHandlerMethodFactory messageHandlerMethodFactory;

                public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
                    this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1;
                }

                @Override
                public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
                    return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
                }

                private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
                    if (this.messageHandlerMethodFactory == null) {
                        this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();
                    }
                    return this.messageHandlerMethodFactory;
                }

                private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
                    DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
                    Validator validator = LybGeekKafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
                    if (validator != null) {
                        defaultFactory.setValidator(validator);
                    }
                    defaultFactory.setBeanFactory(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory);

                    ConfigurableBeanFactory cbf =
                            LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ConfigurableBeanFactory ?
                                    (ConfigurableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory :
                                    null;

                    this.defaultFormattingConversionService.addConverter(
                            new BytesToStringConverter(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.charset));

                    defaultFactory.setConversionService(this.defaultFormattingConversionService);

                    List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();

                    // Annotation-based argument resolution
                    argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf));
                    argumentResolvers.add(new HeadersMethodArgumentResolver());

                    // Type-based argument resolution
                    final GenericMessageConverter messageConverter =
                            new GenericMessageConverter(this.defaultFormattingConversionService);
                    argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
                    argumentResolvers.add(new PayloadArgumentResolver(messageConverter, validator) {

                        @Override
                        public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
                            Object resolved = super.resolveArgument(parameter, message);
                            /*
                             * Replace KafkaNull list elements with null.
                             */
                            if (resolved instanceof List) {
                                List<?> list = ((List<?>) resolved);
                                for (int i = 0; i < list.size(); i++) {
                                    if (list.get(i) instanceof KafkaNull) {
                                        list.set(i, null);
                                    }
                                }
                            }
                            return resolved;
                        }

                        @Override
                        protected boolean isEmptyPayload(Object payload) {
                            return payload == null || payload instanceof KafkaNull;
                        }

                    });
                    defaultFactory.setArgumentResolvers(argumentResolvers);

                    defaultFactory.afterPropertiesSet();
                    return defaultFactory;
                }

            }

            private static class BytesToStringConverter implements Converter<byte[], String> {

                private final Charset charset;

                BytesToStringConverter(Charset charset) {
                    this.charset = charset;
                }

                @Override
                public String convert(byte[] source) {
                    return new String(source, this.charset);
                }

            }

            private static class ListenerScope implements Scope {

                private final Map<String, Object> listeners = new HashMap<>();

                ListenerScope() {
                    super();
                }

                public void addListener(String key, Object bean) {
                    this.listeners.put(key, bean);
                }

                public void removeListener(String key) {
                    this.listeners.remove(key);
                }

                @Override
                public Object get(String name, ObjectFactory<?> objectFactory) {
                    return this.listeners.get(name);
                }

                @Override
                public Object remove(String name) {
                    return null;
                }

                @Override
                public void registerDestructionCallback(String name, Runnable callback) {
                }

                @Override
                public Object resolveContextualObject(String key) {
                    return this.listeners.get(key);
                }

                @Override
                public String getConversationId() {
                    return null;
                }

            }

        }

        How the business side uses it

        Example

        @LybGeekKafkaListener(id = "createUser", topics = Constant.USER_TOPIC)
        public class UserComsumer extends BaseComusmeListener {

            @Autowired
            private UserService userService;

            // Duplicate check: a user with the same username already exists
            @Override
            public boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad) {
                User user = JSON.parseObject(kafkaComsumePayLoad.getData(), User.class);
                return userService.isExistUserByUsername(user.getUsername());
            }

            // Business logic: persist the user; the return value decides whether the offset is committed
            @Override
            public boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad) {
                User user = JSON.parseObject(kafkaComsumerPayLoad.getData(), User.class);
                return userService.save(user);
            }
        }
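        One thing to keep in mind with this example: the check in isRepeateConsume and the write in doBiz are two separate steps, so if the same message can reach two consumer threads at once (for example around a rebalance), both may pass the check. A common way to close that gap is to claim the key atomically. The sketch below shows the idea with an in-memory set. ProcessedKeyStore is a hypothetical helper that is not part of the original example, and a real deployment would more likely rely on a database unique constraint or a shared store.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: claims a message key atomically instead of check-then-act.
class ProcessedKeyStore {
    private final Set<String> claimed = ConcurrentHashMap.newKeySet();

    // Returns true only for the first caller that claims the key.
    boolean tryClaim(String key) {
        return claimed.add(key);
    }

    // Releases a claim so that a message whose processing failed can be retried.
    void release(String key) {
        claimed.remove(key);
    }
}

class ClaimDemo {
    public static void main(String[] args) {
        ProcessedKeyStore store = new ProcessedKeyStore();
        System.out.println(store.tryClaim("user-1")); // prints true: first delivery
        System.out.println(store.tryClaim("user-1")); // prints false: duplicate
        store.release("user-1");                      // processing failed, allow a retry
        System.out.println(store.tryClaim("user-1")); // prints true again
    }
}
```

        In the template, isRepeateConsume could return the negation of tryClaim, and a failing doBiz could call release before returning false. That wiring is an assumption about how the two pieces would be combined, not something the original code does.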

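        Summary

        When we try to promote a practice, we often find that things still slip through even after we have repeated it N times. At that point it is worth turning the practice into tooling and letting the tooling enforce it. For example, some developers may not think through every case of a business flow. We can abstract the core scenarios of that flow into methods, and developers then only supply the concrete implementation of those abstract methods.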
