
Sending Spark Logs to Kafka with Log4j

Table of Contents

      • Custom KafkaAppender
      • Modifying the log4j.properties configuration
      • Adding parameters to the launch command
      • Checking the data sent to Kafka after startup
      • Timezone issue: fixed with a custom JSONLayout
        • Custom JSONLayout.java
      • One-step setup
      • Possible exceptions
        • ClassNotFoundException: xxx.KafkaLog4jAppender
        • Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException

Custom KafkaAppender

Note: if you want to use the appender that ships with Kafka, you can reference it directly; its version should match the Kafka version you are running.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>2.4.1</version>
</dependency>

The dependencies for the custom appender are:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.28</version>
    <scope>compile</scope>
</dependency>

The content of the custom KafkaLog4jAppender.java is as follows.

Here we also implement logger/package-name filtering.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaLog4jAppender extends AppenderSkeleton {

    /** Include rules (exact match and prefix match) */
    private Set<String> includeSet = new HashSet<>();
    private Set<String> includeMatchSet = new HashSet<>();
    /** Exclude rules (exact match and prefix match) */
    private Set<String> excludeSet = new HashSet<>();
    private Set<String> excludeMatchSet = new HashSet<>();

    private String brokerList;
    private String topic;
    private String compressionType;
    private String securityProtocol;
    private String sslTruststoreLocation;
    private String sslTruststorePassword;
    private String sslKeystoreType;
    private String sslKeystoreLocation;
    private String sslKeystorePassword;
    private String saslKerberosServiceName;
    private String saslMechanism;
    private String clientJaasConfPath;
    private String clientJaasConf;
    private String kerb5ConfPath;
    private Integer maxBlockMs;
    private int retries = 2147483647;
    private int requiredNumAcks = 1;
    private int deliveryTimeoutMs = 120000;
    private boolean ignoreExceptions = true;
    private boolean syncSend;
    private Producer<byte[], byte[]> producer;
    private String includes;
    private String excludes;

    public String getIncludes() { return includes; }
    public void setIncludes(String includes) { this.includes = includes; }
    public String getExcludes() { return excludes; }
    public void setExcludes(String excludes) { this.excludes = excludes; }

    public KafkaLog4jAppender() {}

    public Producer<byte[], byte[]> getProducer() { return this.producer; }
    public String getBrokerList() { return this.brokerList; }
    public void setBrokerList(String brokerList) { this.brokerList = brokerList; }
    public int getRequiredNumAcks() { return this.requiredNumAcks; }
    public void setRequiredNumAcks(int requiredNumAcks) { this.requiredNumAcks = requiredNumAcks; }
    public int getRetries() { return this.retries; }
    public void setRetries(int retries) { this.retries = retries; }
    public int getDeliveryTimeoutMs() { return this.deliveryTimeoutMs; }
    public void setDeliveryTimeoutMs(int deliveryTimeoutMs) { this.deliveryTimeoutMs = deliveryTimeoutMs; }
    public String getCompressionType() { return this.compressionType; }
    public void setCompressionType(String compressionType) { this.compressionType = compressionType; }
    public String getTopic() { return this.topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public boolean getIgnoreExceptions() { return this.ignoreExceptions; }
    public void setIgnoreExceptions(boolean ignoreExceptions) { this.ignoreExceptions = ignoreExceptions; }
    public boolean getSyncSend() { return this.syncSend; }
    public void setSyncSend(boolean syncSend) { this.syncSend = syncSend; }
    public String getSslTruststorePassword() { return this.sslTruststorePassword; }
    public String getSslTruststoreLocation() { return this.sslTruststoreLocation; }
    public String getSecurityProtocol() { return this.securityProtocol; }
    public void setSecurityProtocol(String securityProtocol) { this.securityProtocol = securityProtocol; }
    public void setSslTruststoreLocation(String sslTruststoreLocation) { this.sslTruststoreLocation = sslTruststoreLocation; }
    public void setSslTruststorePassword(String sslTruststorePassword) { this.sslTruststorePassword = sslTruststorePassword; }
    public void setSslKeystorePassword(String sslKeystorePassword) { this.sslKeystorePassword = sslKeystorePassword; }
    public void setSslKeystoreType(String sslKeystoreType) { this.sslKeystoreType = sslKeystoreType; }
    public void setSslKeystoreLocation(String sslKeystoreLocation) { this.sslKeystoreLocation = sslKeystoreLocation; }
    public void setSaslKerberosServiceName(String saslKerberosServiceName) { this.saslKerberosServiceName = saslKerberosServiceName; }
    public void setClientJaasConfPath(String clientJaasConfPath) { this.clientJaasConfPath = clientJaasConfPath; }
    public void setKerb5ConfPath(String kerb5ConfPath) { this.kerb5ConfPath = kerb5ConfPath; }
    public String getSslKeystoreLocation() { return this.sslKeystoreLocation; }
    public String getSslKeystoreType() { return this.sslKeystoreType; }
    public String getSslKeystorePassword() { return this.sslKeystorePassword; }
    public String getSaslKerberosServiceName() { return this.saslKerberosServiceName; }
    public String getClientJaasConfPath() { return this.clientJaasConfPath; }
    public void setSaslMechanism(String saslMechanism) { this.saslMechanism = saslMechanism; }
    public String getSaslMechanism() { return this.saslMechanism; }
    public void setClientJaasConf(String clientJaasConf) { this.clientJaasConf = clientJaasConf; }
    public String getClientJaasConf() { return this.clientJaasConf; }
    public String getKerb5ConfPath() { return this.kerb5ConfPath; }
    public int getMaxBlockMs() { return this.maxBlockMs; }
    public void setMaxBlockMs(int maxBlockMs) { this.maxBlockMs = maxBlockMs; }

    @Override
    public void activateOptions() {
        // Load the include/exclude filter rules
        setFilterRules(includes, includeMatchSet, includeSet);
        setFilterRules(excludes, excludeMatchSet, excludeSet);

        Properties props = new Properties();
        if (this.brokerList != null) {
            props.put("bootstrap.servers", this.brokerList);
        }
        if (props.isEmpty()) {
            throw new ConfigException("The bootstrap servers property should be specified");
        } else if (this.topic == null) {
            throw new ConfigException("Topic must be specified by the Kafka log4j appender");
        } else {
            if (this.compressionType != null) {
                props.put("compression.type", this.compressionType);
            }
            props.put("acks", Integer.toString(this.requiredNumAcks));
            props.put("retries", this.retries);
            props.put("delivery.timeout.ms", this.deliveryTimeoutMs);
            if (this.securityProtocol != null) {
                props.put("security.protocol", this.securityProtocol);
            }
            if (this.securityProtocol != null
                    && this.securityProtocol.contains("SSL")
                    && this.sslTruststoreLocation != null
                    && this.sslTruststorePassword != null) {
                props.put("ssl.truststore.location", this.sslTruststoreLocation);
                props.put("ssl.truststore.password", this.sslTruststorePassword);
                if (this.sslKeystoreType != null
                        && this.sslKeystoreLocation != null
                        && this.sslKeystorePassword != null) {
                    props.put("ssl.keystore.type", this.sslKeystoreType);
                    props.put("ssl.keystore.location", this.sslKeystoreLocation);
                    props.put("ssl.keystore.password", this.sslKeystorePassword);
                }
            }
            if (this.securityProtocol != null
                    && this.securityProtocol.contains("SASL")
                    && this.saslKerberosServiceName != null
                    && this.clientJaasConfPath != null) {
                props.put("sasl.kerberos.service.name", this.saslKerberosServiceName);
                System.setProperty("java.security.auth.login.config", this.clientJaasConfPath);
            }
            if (this.kerb5ConfPath != null) {
                System.setProperty("java.security.krb5.conf", this.kerb5ConfPath);
            }
            if (this.saslMechanism != null) {
                props.put("sasl.mechanism", this.saslMechanism);
            }
            if (this.clientJaasConf != null) {
                props.put("sasl.jaas.config", this.clientJaasConf);
            }
            if (this.maxBlockMs != null) {
                props.put("max.block.ms", this.maxBlockMs);
            }
            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());
            System.out.println("Properties:" + props);
            this.producer = this.getKafkaProducer(props);
            LogLog.debug("Kafka producer connected to " + this.brokerList);
            LogLog.debug("Logging for topic: " + this.topic);
        }
    }

    /**
     * Parse a comma-separated rule string and populate the exact-match and
     * prefix-match sets. Rules ending with ".*" are treated as prefix matches.
     *
     * @param excludes        comma-separated rule string (used for both include and exclude rules)
     * @param excludeMatchSet set collecting prefix-match rules
     * @param excludeSet      set collecting exact-match rules
     * @author Jast
     */
    private void setFilterRules(String excludes, Set<String> excludeMatchSet, Set<String> excludeSet) {
        if (excludes != null) {
            for (String exclude : excludes.split(",")) {
                if (exclude.length() > 0) {
                    if (exclude.endsWith(".*")) {
                        excludeMatchSet.add(exclude.replace(".*", ""));
                    } else {
                        excludeSet.add(exclude);
                    }
                }
            }
        }
    }

    protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
        return new KafkaProducer<>(props);
    }

    @Override
    protected void append(LoggingEvent event) {
        if (filterPackageName(event)) {
            return;
        }
        String message = this.subAppend(event);
        LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
        Future<RecordMetadata> response = this.producer.send(
                new ProducerRecord<>(this.topic, message.getBytes(StandardCharsets.UTF_8)));
        if (this.syncSend) {
            try {
                response.get();
            } catch (ExecutionException | InterruptedException e) {
                if (!this.ignoreExceptions) {
                    throw new RuntimeException(e);
                }
                LogLog.debug("Exception while getting response", e);
            }
        }
    }

    private String subAppend(LoggingEvent event) {
        return this.layout == null ? event.getRenderedMessage() : this.layout.format(event);
    }

    @Override
    public void close() {
        if (!this.closed) {
            this.closed = true;
            this.producer.close();
        }
    }

    @Override
    public boolean requiresLayout() {
        return true;
    }

    /**
     * Filter by logger/package name. Returns true if the event should NOT be sent to Kafka.
     *
     * @author Jast
     */
    private boolean filterPackageName(LoggingEvent event) {
        boolean flag = true;
        if (includeSet.size() == 0
                && includeMatchSet.size() == 0
                && excludeSet.size() == 0
                && excludeMatchSet.size() == 0) {
            return false;
        }
        if (includeSet.size() == 0 && includeMatchSet.size() == 0) {
            flag = false;
        }
        /* Name of the logger that produced the event */
        String loggerName = event.getLoggerName();
        for (String include : includeSet) {
            if (loggerName.equals(include)) {
                flag = false;
            }
        }
        for (String include : includeMatchSet) {
            if (loggerName.startsWith(include)) {
                flag = false;
            }
        }
        for (String exclude : excludeMatchSet) {
            if (loggerName.startsWith(exclude)) {
                flag = true;
            }
        }
        for (String exclude : excludeSet) {
            if (loggerName.equals(exclude)) {
                flag = true;
            }
        }
        return flag;
    }
}
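
The filter rules are fed in through the includes/excludes bean properties, so the appender can also be wired up programmatically, which is handy for smoke-testing it outside Spark. Below is a minimal sketch under assumptions: the broker address, topic, package names, and class name KafkaAppenderSmokeTest are all hypothetical, and a reachable Kafka broker is required because syncSend is enabled.

import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

public class KafkaAppenderSmokeTest {
    public static void main(String[] args) {
        KafkaLog4jAppender appender = new KafkaLog4jAppender();
        appender.setBrokerList("localhost:9092");            // hypothetical broker address
        appender.setTopic("test");                           // hypothetical topic
        appender.setIncludes("com.example.*");               // prefix rule: only loggers under com.example are sent
        appender.setExcludes("com.example.noisy.Spammer");   // exact rule: this one logger is dropped
        appender.setSyncSend(true);
        appender.setLayout(new PatternLayout("%d %p %c - %m%n"));
        appender.activateOptions();                          // parses the rules and builds the KafkaProducer

        Logger logger = Logger.getLogger("com.example.Demo");
        logger.addAppender(appender);
        logger.info("hello kafka");                          // matched by the include rule, sent to Kafka

        appender.close();
    }
}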

Modifying the log4j.properties configuration

Modify Spark's own configuration file, located at /opt/spark-client/conf.

The configuration directory may differ between clusters.

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# Everything below this line is newly added
# log4j.logger.com means INFO logs from classes whose package starts with "com" are sent to Kafka.
# Multiple loggers can be configured. Note the package prefix must be complete: "com" cannot be shortened to "co".
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# Use the custom KafkaAppender
log4j.appender.kafka=com.avris.KafkaLog4jAppender
# The official KafkaAppender
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Target topic
log4j.appender.kafka.topic=test
# Kafka broker list
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=app_name:xxx
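
With this configuration, any logger whose name starts with "com" logs at INFO level both to the console (via the root logger) and to the kafka appender. A hypothetical driver-side class that would produce such events is sketched below; the package, class name, and messages are made up for illustration.

package com.example.job;   // the package prefix "com" matches log4j.logger.com=info,kafka

import org.apache.log4j.Logger;

public class EtlDriver {
    private static final Logger LOG = Logger.getLogger(EtlDriver.class);

    public static void main(String[] args) {
        LOG.info("job started");            // INFO and above is forwarded to the Kafka topic
        LOG.debug("this stays local only"); // below the configured INFO level, not logged at all
    }
}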

Adding parameters to the launch command

--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \

Notes:

  • kafka-appender-1.0.0.jar is the jar built from the custom KafkaLog4jAppender class we just wrote
  • slf4j-api-1.8.0-beta2.jar and slf4j-log4j12-1.8.0-beta2.jar are pinned to this version to avoid a NullPointerException caused by mismatched logging versions, described in the exceptions section below
  • slf4j-api-1.8.0-beta2.jar and slf4j-log4j12-1.8.0-beta2.jar must also be placed in the lib directory and submitted with --jars

Checking the data sent to Kafka after startup

{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 14:38:35 - 本轮任务计算完成,休眠 10000","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02T06:38:35.858Z","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.util.Print","class":"com.avris.util.Print$"}

There is one problem: with the net.logstash.log4j.JSONEventLayoutV1 implementation, the timezone of @timestamp is wrong. We need to fix the timezone, so below we implement a custom Layout.
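
The root of the issue is that JSONEventLayoutV1 renders @timestamp in UTC while we want local (GMT+8) time. The sketch below illustrates the idea behind the fix using the same commons-lang FastDateFormat the custom layout relies on; the class name TimestampDemo and the example values in the comments are illustrative only.

import java.util.TimeZone;
import org.apache.commons.lang.time.FastDateFormat;

public class TimestampDemo {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();

        // UTC, similar to what JSONEventLayoutV1 emits
        FastDateFormat utc =
                FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", TimeZone.getTimeZone("UTC"));
        // GMT+8, what the custom JSONLayout below uses
        FastDateFormat gmt8 =
                FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT+8"));

        System.out.println("UTC   : " + utc.format(now));   // e.g. 2023-03-02T06:38:35.858Z
        System.out.println("GMT+8 : " + gmt8.format(now));  // e.g. 2023-03-02 14:38:35
    }
}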

Timezone issue: fixed with a custom JSONLayout

A JSON layout has a noticeable performance cost. For large log volumes it is better not to use one; collect the relevant fields yourself and log a plain message instead.

Custom JSONLayout.java

Create a JSONLayout.java class in the same project as the custom KafkaAppender, with the following content.

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.log4j.Layout;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LocationInfo;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.ThrowableInformation;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;

public class JSONLayout extends Layout {

    private boolean locationInfo = false;
    private String customUserFields;

    private boolean ignoreThrowable = false;
    private boolean activeIgnoreThrowable = ignoreThrowable;
    private String hostname = InetAddress.getLocalHost().getHostName();
    private String threadName;
    private long timestamp;
    private String ndc;
    private Map mdc;
    private LocationInfo info;
    private HashMap<String, Object> exceptionInformation;
    private static Integer version = 1;
    private JSONObject logstashEvent;

    public static final TimeZone GMT_8 = TimeZone.getTimeZone("GMT+8");
    public static final FastDateFormat ISO_DATETIME_TIME_ZONE_FORMAT_WITH_MILLIS =
            FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", GMT_8);
    public static final String ADDITIONAL_DATA_PROPERTY = "com.avris.JSONLayout.UserFields";

    public static String dateFormat(long timestamp) {
        return ISO_DATETIME_TIME_ZONE_FORMAT_WITH_MILLIS.format(timestamp);
    }

    /**
     * For backwards compatibility, the default is to generate location information in the log
     * messages.
     */
    public JSONLayout() throws UnknownHostException {
        this(true);
    }

    /**
     * Creates a layout that optionally inserts location information into log messages.
     *
     * @param locationInfo whether or not to include location information in the log messages.
     */
    public JSONLayout(boolean locationInfo) throws UnknownHostException {
        this.locationInfo = locationInfo;
    }

    @Override
    public String format(LoggingEvent loggingEvent) {
        threadName = loggingEvent.getThreadName();
        timestamp = loggingEvent.getTimeStamp();
        exceptionInformation = new HashMap<String, Object>();
        mdc = loggingEvent.getProperties();
        ndc = loggingEvent.getNDC();

        logstashEvent = new JSONObject();
        String whoami = this.getClass().getSimpleName();

        /*
         * All v1 of the event format requires is "@timestamp" and "@version". Every other field is
         * arbitrary.
         */
        logstashEvent.put("@version", version);
        logstashEvent.put("@timestamp", dateFormat(timestamp));

        /* Extract and add fields from log4j config, if defined */
        if (getUserFields() != null) {
            String userFlds = getUserFields();
            LogLog.debug("[" + whoami + "] Got user data from log4j property: " + userFlds);
            addUserFields(userFlds);
        }

        /*
         * Extract fields from system properties, if defined. Note that CLI props will override
         * conflicts with log4j config.
         */
        if (System.getProperty(ADDITIONAL_DATA_PROPERTY) != null) {
            if (getUserFields() != null) {
                LogLog.warn("[" + whoami
                        + "] Loading UserFields from command-line. This will override any UserFields set in the log4j configuration file");
            }
            String userFieldsProperty = System.getProperty(ADDITIONAL_DATA_PROPERTY);
            LogLog.debug("[" + whoami + "] Got user data from system property: " + userFieldsProperty);
            addUserFields(userFieldsProperty);
        }

        /* Now we start injecting our own stuff. */
        logstashEvent.put("source_host", hostname);
        logstashEvent.put("message", loggingEvent.getRenderedMessage());

        if (loggingEvent.getThrowableInformation() != null) {
            final ThrowableInformation throwableInformation = loggingEvent.getThrowableInformation();
            if (throwableInformation.getThrowable().getClass().getCanonicalName() != null) {
                exceptionInformation.put("exception_class",
                        throwableInformation.getThrowable().getClass().getCanonicalName());
            }
            if (throwableInformation.getThrowable().getMessage() != null) {
                exceptionInformation.put("exception_message",
                        throwableInformation.getThrowable().getMessage());
            }
            if (throwableInformation.getThrowableStrRep() != null) {
                String stackTrace = StringUtils.join(throwableInformation.getThrowableStrRep(), "\n");
                exceptionInformation.put("stacktrace", stackTrace);
            }
            addEventData("exception", exceptionInformation);
        }

        if (locationInfo) {
            info = loggingEvent.getLocationInformation();
            addEventData("file", info.getFileName());
            addEventData("line_number", info.getLineNumber());
            addEventData("class", info.getClassName());
            addEventData("method", info.getMethodName());
        }

        addEventData("logger_name", loggingEvent.getLoggerName());
        addEventData("mdc", mdc);
        addEventData("ndc", ndc);
        addEventData("level", loggingEvent.getLevel().toString());
        addEventData("thread_name", threadName);

        return logstashEvent.toString() + "\n";
    }

    @Override
    public boolean ignoresThrowable() {
        return ignoreThrowable;
    }

    /**
     * Query whether log messages include location information.
     *
     * @return true if location information is included in log messages, false otherwise.
     */
    public boolean getLocationInfo() {
        return locationInfo;
    }

    /**
     * Set whether log messages should include location information.
     *
     * @param locationInfo true if location information should be included, false otherwise.
     */
    public void setLocationInfo(boolean locationInfo) {
        this.locationInfo = locationInfo;
    }

    public String getUserFields() {
        return customUserFields;
    }

    public void setUserFields(String userFields) {
        this.customUserFields = userFields;
    }

    @Override
    public void activateOptions() {
        activeIgnoreThrowable = ignoreThrowable;
    }

    private void addUserFields(String data) {
        if (null != data) {
            String[] pairs = data.split(",");
            for (String pair : pairs) {
                String[] userField = pair.split(":", 2);
                if (userField[0] != null) {
                    String key = userField[0];
                    String val = userField[1];
                    addEventData(key, val);
                }
            }
        }
    }

    private void addEventData(String keyname, Object keyval) {
        if (null != keyval) {
            logstashEvent.put(keyname, keyval);
        }
    }
}
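
The layout can be exercised locally, without Spark or Kafka, by formatting a hand-built LoggingEvent. A minimal sketch follows; the class name JsonLayoutDemo, the logger name, and the user field value are hypothetical.

import java.net.UnknownHostException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;

public class JsonLayoutDemo {
    public static void main(String[] args) throws UnknownHostException {
        JSONLayout layout = new JSONLayout();
        layout.setUserFields("app_name:demo");   // same key the UserFields property injects

        LoggingEvent event = new LoggingEvent(
                Logger.class.getName(),                 // fully qualified name of the caller class
                Logger.getLogger("com.example.Print"),  // hypothetical logger
                Level.INFO,
                "round of computation finished",        // message
                null);                                  // no throwable

        // Prints one JSON line with @timestamp rendered in GMT+8
        System.out.print(layout.format(event));
    }
}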

Related dependencies

<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.21</version>
    <scope>provided</scope>
</dependency>

Package the jar and upload it to the server, ready to run.

Add kafka-appender-1.0.0.jar and its dependencies to the launch command:

--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \

After starting, check the data again; the @timestamp field is now correct.

{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 16:41:24 - 本轮自定义任务计算完成","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02 16:41:24","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.avris.util.Print","class":"com.avris.util.Print$"}

One-step setup

Before reading this section, please go through the steps explained above first, otherwise it may be hard to follow.

To make this easier to use, I packaged the classes above and published them to the Maven central repository, so they can be pulled in directly as a Maven dependency. The steps are as follows.

  1. Add the Maven dependency

<dependency>
    <groupId>com.gitee.jastee</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>1.0.5</version>
</dependency>

  2. Use a Log4j logger in your code to print logs
  3. Modify the Spark configuration file log4j.properties
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# Everything below this line is newly added
# log4j.logger.com means INFO logs from classes whose package starts with "com" are sent to Kafka.
# Multiple loggers can be configured. Note the package prefix must be complete: "com" cannot be shortened to "co".
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# Use the custom KafkaAppender
log4j.appender.kafka=com.gitee.jastee.kafka.appender.KafkaLog4jAppender
# The official KafkaAppender
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Target topic
log4j.appender.kafka.topic=test
# Kafka broker list
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=com.gitee.jastee.kafka.appender.JSONLayout
log4j.appender.kafka.layout.UserFields=app_name:xxx
  4. Add the following to the launch command

Too many jars to specify with --conf? You can merge the three jars into one: unpack them with jar xf, then repackage them with jar -cvfM into a single jar and reference just that one. Because different environments may use different slf4j versions, I did not merge them here; in practice, merge them as your own setup requires.

--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \
  5. Start the program

Possible exceptions

ClassNotFoundException: xxx.KafkaLog4jAppender

The program throws an exception at startup: even though the jar was submitted at launch, the class cannot be found.

log4j:ERROR Could not instantiate class [com.xxx.KafkaLog4jAppender].
java.lang.ClassNotFoundException: com.xxx.KafkaLog4jAppender

Cause:

When Spark starts, the jars passed via --jars have not been loaded yet. Submit our custom kafka-appender-1.0.0.jar (the jar containing KafkaLog4jAppender.java) via spark.executor.extraClassPath and spark.driver.extraClassPath instead; if you use the official appender, submit the official jar the same way.

Solution:

Add the following to the launch script:

--conf "spark.driver.extraClassPath=kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=kafka-appender-1.0.0.jar" \

Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException

Unexpected problem occured during version sanity check
Reported exception:
java.lang.NullPointerException
	at org.slf4j.LoggerFactory.versionSanityCheck(LoggerFactory.java:272)
	at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:126)
	at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:417)
	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)
	at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)
	at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at com.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)
	at com.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)
	at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
	at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
	at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
	at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
	at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
	at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
	at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
	at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
	at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
	at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
	at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
	at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
	at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
	at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" java.lang.ExceptionInInitializerError
	at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
	at com.avris.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)
	at com.avris.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)
	at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
	at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
	at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
	at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
	at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
	at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
	at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
	at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
	at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
	at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
	at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
	at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
	at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
	at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
	at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:423)
	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
	at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)
	at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)
	... 27 more

Cause:

A logging version conflict; the version currently on the classpath is slf4j-log4j12-1.7.30.jar.

Solution:

Use slf4j version 1.8.0-beta2:

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.8.0-beta2</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.8.0-beta2</version>
</dependency>

Submit them via the spark.driver.extraClassPath and spark.executor.extraClassPath parameters:

--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \

