Sending Spark Logs to Kafka with Log4j
Table of Contents
- Custom KafkaAppender
- Modify the log4j.properties configuration
- Add parameters to the launch command
- Check the data sent to Kafka after startup
- Time zone issue: fixed with a custom JSONLayout
- Custom JSONLayout.java
- One-step integration
- Possible exceptions
- ClassNotFoundException: xxx.KafkaLog4jAppender
- Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException
- References
Custom KafkaAppender
Note: if you want to use the appender that ships with Kafka, you can reference it directly; use the version that matches the Kafka you are running:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>2.4.1</version>
</dependency>
The dependencies are:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.28</version>
    <scope>compile</scope>
</dependency>
The custom KafkaLog4jAppender.java is shown below.
On top of the stock appender, it adds package-name filtering.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaLog4jAppender extends AppenderSkeleton {

    /** Include rules */
    private Set<String> includeSet = new HashSet<>();
    private Set<String> includeMatchSet = new HashSet<>();
    /** Exclude rules */
    private Set<String> excludeSet = new HashSet<>();
    private Set<String> excludeMatchSet = new HashSet<>();

    private String brokerList;
    private String topic;
    private String compressionType;
    private String securityProtocol;
    private String sslTruststoreLocation;
    private String sslTruststorePassword;
    private String sslKeystoreType;
    private String sslKeystoreLocation;
    private String sslKeystorePassword;
    private String saslKerberosServiceName;
    private String saslMechanism;
    private String clientJaasConfPath;
    private String clientJaasConf;
    private String kerb5ConfPath;
    private Integer maxBlockMs;
    private int retries = 2147483647;
    private int requiredNumAcks = 1;
    private int deliveryTimeoutMs = 120000;
    private boolean ignoreExceptions = true;
    private boolean syncSend;
    private Producer<byte[], byte[]> producer;
    private String includes;
    private String excludes;

    public String getIncludes() { return includes; }
    public void setIncludes(String includes) { this.includes = includes; }
    public String getExcludes() { return excludes; }
    public void setExcludes(String excludes) { this.excludes = excludes; }

    public KafkaLog4jAppender() {}

    public Producer<byte[], byte[]> getProducer() { return this.producer; }
    public String getBrokerList() { return this.brokerList; }
    public void setBrokerList(String brokerList) { this.brokerList = brokerList; }
    public int getRequiredNumAcks() { return this.requiredNumAcks; }
    public void setRequiredNumAcks(int requiredNumAcks) { this.requiredNumAcks = requiredNumAcks; }
    public int getRetries() { return this.retries; }
    public void setRetries(int retries) { this.retries = retries; }
    public int getDeliveryTimeoutMs() { return this.deliveryTimeoutMs; }
    public void setDeliveryTimeoutMs(int deliveryTimeoutMs) { this.deliveryTimeoutMs = deliveryTimeoutMs; }
    public String getCompressionType() { return this.compressionType; }
    public void setCompressionType(String compressionType) { this.compressionType = compressionType; }
    public String getTopic() { return this.topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public boolean getIgnoreExceptions() { return this.ignoreExceptions; }
    public void setIgnoreExceptions(boolean ignoreExceptions) { this.ignoreExceptions = ignoreExceptions; }
    public boolean getSyncSend() { return this.syncSend; }
    public void setSyncSend(boolean syncSend) { this.syncSend = syncSend; }
    public String getSslTruststorePassword() { return this.sslTruststorePassword; }
    public String getSslTruststoreLocation() { return this.sslTruststoreLocation; }
    public String getSecurityProtocol() { return this.securityProtocol; }
    public void setSecurityProtocol(String securityProtocol) { this.securityProtocol = securityProtocol; }
    public void setSslTruststoreLocation(String sslTruststoreLocation) { this.sslTruststoreLocation = sslTruststoreLocation; }
    public void setSslTruststorePassword(String sslTruststorePassword) { this.sslTruststorePassword = sslTruststorePassword; }
    public void setSslKeystorePassword(String sslKeystorePassword) { this.sslKeystorePassword = sslKeystorePassword; }
    public void setSslKeystoreType(String sslKeystoreType) { this.sslKeystoreType = sslKeystoreType; }
    public void setSslKeystoreLocation(String sslKeystoreLocation) { this.sslKeystoreLocation = sslKeystoreLocation; }
    public void setSaslKerberosServiceName(String saslKerberosServiceName) { this.saslKerberosServiceName = saslKerberosServiceName; }
    public void setClientJaasConfPath(String clientJaasConfPath) { this.clientJaasConfPath = clientJaasConfPath; }
    public void setKerb5ConfPath(String kerb5ConfPath) { this.kerb5ConfPath = kerb5ConfPath; }
    public String getSslKeystoreLocation() { return this.sslKeystoreLocation; }
    public String getSslKeystoreType() { return this.sslKeystoreType; }
    public String getSslKeystorePassword() { return this.sslKeystorePassword; }
    public String getSaslKerberosServiceName() { return this.saslKerberosServiceName; }
    public String getClientJaasConfPath() { return this.clientJaasConfPath; }
    public void setSaslMechanism(String saslMechanism) { this.saslMechanism = saslMechanism; }
    public String getSaslMechanism() { return this.saslMechanism; }
    public void setClientJaasConf(String clientJaasConf) { this.clientJaasConf = clientJaasConf; }
    public String getClientJaasConf() { return this.clientJaasConf; }
    public String getKerb5ConfPath() { return this.kerb5ConfPath; }
    public int getMaxBlockMs() { return this.maxBlockMs; }
    public void setMaxBlockMs(int maxBlockMs) { this.maxBlockMs = maxBlockMs; }

    @Override
    public void activateOptions() {
        // Load the filter rules
        setFilterRules(includes, includeMatchSet, includeSet);
        setFilterRules(excludes, excludeMatchSet, excludeSet);

        Properties props = new Properties();
        if (this.brokerList != null) {
            props.put("bootstrap.servers", this.brokerList);
        }
        if (props.isEmpty()) {
            throw new ConfigException("The bootstrap servers property should be specified");
        } else if (this.topic == null) {
            throw new ConfigException("Topic must be specified by the Kafka log4j appender");
        } else {
            if (this.compressionType != null) {
                props.put("compression.type", this.compressionType);
            }
            props.put("acks", Integer.toString(this.requiredNumAcks));
            props.put("retries", this.retries);
            props.put("delivery.timeout.ms", this.deliveryTimeoutMs);
            if (this.securityProtocol != null) {
                props.put("security.protocol", this.securityProtocol);
            }
            if (this.securityProtocol != null && this.securityProtocol.contains("SSL")
                    && this.sslTruststoreLocation != null && this.sslTruststorePassword != null) {
                props.put("ssl.truststore.location", this.sslTruststoreLocation);
                props.put("ssl.truststore.password", this.sslTruststorePassword);
                if (this.sslKeystoreType != null && this.sslKeystoreLocation != null
                        && this.sslKeystorePassword != null) {
                    props.put("ssl.keystore.type", this.sslKeystoreType);
                    props.put("ssl.keystore.location", this.sslKeystoreLocation);
                    props.put("ssl.keystore.password", this.sslKeystorePassword);
                }
            }
            if (this.securityProtocol != null && this.securityProtocol.contains("SASL")
                    && this.saslKerberosServiceName != null && this.clientJaasConfPath != null) {
                props.put("sasl.kerberos.service.name", this.saslKerberosServiceName);
                System.setProperty("java.security.auth.login.config", this.clientJaasConfPath);
            }
            if (this.kerb5ConfPath != null) {
                System.setProperty("java.security.krb5.conf", this.kerb5ConfPath);
            }
            if (this.saslMechanism != null) {
                props.put("sasl.mechanism", this.saslMechanism);
            }
            if (this.clientJaasConf != null) {
                props.put("sasl.jaas.config", this.clientJaasConf);
            }
            if (this.maxBlockMs != null) {
                props.put("max.block.ms", this.maxBlockMs);
            }
            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());
            System.out.println("Properties:" + props);
            this.producer = this.getKafkaProducer(props);
            LogLog.debug("Kafka producer connected to " + this.brokerList);
            LogLog.debug("Logging for topic: " + this.topic);
        }
    }

    /**
     * Load one comma-separated rule string into the exact-match and prefix-match sets.
     *
     * @name setFilterRules
     * @date 2023/3/2 1:57 PM
     * @return void
     * @param excludes
     * @param excludeMatchSet
     * @param excludeSet
     * @author Jast
     */
    private void setFilterRules(String excludes, Set<String> excludeMatchSet, Set<String> excludeSet) {
        if (excludes != null) {
            for (String exclude : excludes.split(",")) {
                if (exclude.length() > 0) {
                    if (exclude.endsWith(".*")) {
                        // A trailing ".*" means prefix match; otherwise the rule is an exact logger name
                        excludeMatchSet.add(exclude.replace(".*", ""));
                    } else {
                        excludeSet.add(exclude);
                    }
                }
            }
        }
    }

    protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
        return new KafkaProducer<>(props);
    }

    @Override
    protected void append(LoggingEvent event) {
        if (filterPackageName(event)) {
            return;
        }
        String message = this.subAppend(event);
        LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
        Future<RecordMetadata> response = this.producer.send(
                new ProducerRecord<>(this.topic, message.getBytes(StandardCharsets.UTF_8)));
        if (this.syncSend) {
            try {
                response.get();
            } catch (ExecutionException | InterruptedException var5) {
                if (!this.ignoreExceptions) {
                    throw new RuntimeException(var5);
                }
                LogLog.debug("Exception while getting response", var5);
            }
        }
    }

    private String subAppend(LoggingEvent event) {
        return this.layout == null ? event.getRenderedMessage() : this.layout.format(event);
    }

    @Override
    public void close() {
        if (!this.closed) {
            this.closed = true;
            this.producer.close();
        }
    }

    @Override
    public boolean requiresLayout() {
        return true;
    }

    /**
     * Filter on the logger (package) name; if this returns true the event is not sent to Kafka.
     *
     * @name filterPackageName
     * @date 2023/2/28 4:07 PM
     * @return boolean
     * @param event
     * @author Jast
     */
    private boolean filterPackageName(LoggingEvent event) {
        boolean flag = true;
        if (includeSet.size() == 0 && includeMatchSet.size() == 0
                && excludeSet.size() == 0 && excludeMatchSet.size() == 0) {
            return false;
        }
        if (includeSet.size() == 0 && includeMatchSet.size() == 0) {
            flag = false;
        }
        /* Logger name, i.e. the class that emitted the event */
        String loggerName = event.getLoggerName();
        for (String include : includeSet) {
            if (loggerName.equals(include)) {
                flag = false;
            }
        }
        for (String include : includeMatchSet) {
            if (loggerName.startsWith(include)) {
                flag = false;
            }
        }
        for (String exclude : excludeMatchSet) {
            if (loggerName.startsWith(exclude)) {
                flag = true;
            }
        }
        for (String exclude : excludeSet) {
            if (loggerName.equals(exclude)) {
                flag = true;
            }
        }
        return flag;
    }
}
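Before moving on to the log4j.properties wiring, here is a minimal, hypothetical smoke test of the class above that shows how the includes/excludes rules behave. The broker address, topic, and package names are made up, and in the article the appender is configured through log4j.properties rather than in code; this sketch only illustrates the rule semantics.

import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

// Assumed to live in the same package as KafkaLog4jAppender.
public class KafkaAppenderSmokeTest {
    public static void main(String[] args) {
        KafkaLog4jAppender appender = new KafkaLog4jAppender();
        appender.setBrokerList("localhost:9092"); // assumed local broker
        appender.setTopic("test");
        appender.setSyncSend(false);
        // "com.example.jobs.*" is a prefix rule, "com.example.Main" an exact rule
        appender.setIncludes("com.example.jobs.*,com.example.Main");
        appender.setExcludes("com.example.jobs.internal.*");
        appender.setLayout(new PatternLayout("%m%n"));
        appender.activateOptions(); // parses the rules and builds the KafkaProducer

        // Loggers matching an include rule (and no exclude rule) are forwarded to Kafka.
        Logger kept = Logger.getLogger("com.example.jobs.Etl");
        kept.addAppender(appender);
        kept.info("sent to Kafka");

        // Loggers matching an exclude rule are dropped by filterPackageName().
        Logger dropped = Logger.getLogger("com.example.jobs.internal.Debug");
        dropped.addAppender(appender);
        dropped.info("filtered out by the exclude rule");

        appender.close();
    }
}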
Modify the log4j.properties configuration
Modify Spark's own configuration file, located at /opt/spark-client/conf.
The configuration directory may differ between clusters.
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# Everything from here down is newly added
# log4j.logger.com means INFO logs from classes whose package name starts with com are sent to Kafka; multiple such entries can be configured. Note the prefix must be a complete package segment, e.g. com, not co
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# Use the custom KafkaAppender
log4j.appender.kafka=com.avris.KafkaLog4jAppender
# The KafkaAppender shipped with Kafka
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Target topic
log4j.appender.kafka.topic=test
# Kafka broker list
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=net.logstash.log4j.JSONEventLayoutV1
log4j.appender.kafka.layout.UserFields=app_name:xxx
Add parameters to the launch command
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \
Notes:
- kafka-appender-1.0.0.jar is the jar built from the custom KafkaLog4jAppender class above.
- The slf4j-api-1.8.0-beta2.jar and slf4j-log4j12-1.8.0-beta2.jar versions are used to avoid a NullPointerException caused by a logging-library version conflict; see the exceptions section below.
- slf4j-api-1.8.0-beta2.jar and slf4j-log4j12-1.8.0-beta2.jar must also be placed in the lib directory and submitted via --jars.
Check the data sent to Kafka after startup
{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 14:38:35 - 本轮任务计算完成,休眠 10000","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02T06:38:35.858Z","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.util.Print","class":"com.avris.util.Print$"}
There is one problem here:
The @timestamp produced by net.logstash.log4j.JSONEventLayoutV1 is in the wrong time zone (UTC rather than local time), so we need to adjust it. Below is a custom Layout implementation that fixes this.
Time zone issue: fixed with a custom JSONLayout
A JSON layout has a noticeable performance cost; with large log volumes it is better not to use one, and instead to collect the relevant fields yourself and log them as plain messages.
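As a rough illustration of that advice (the class, method, and field names here are invented), build the one line you care about in application code and log it as a plain message, so the layout does not need to serialize JSON or look up caller location for every event:

import org.apache.log4j.Logger;

public class BatchReport {
    private static final Logger LOG = Logger.getLogger(BatchReport.class);

    /** Collect the fields you care about yourself and emit one compact line. */
    public static void logBatchDone(String jobName, long rows, long costMs) {
        LOG.info("job=" + jobName + " rows=" + rows + " costMs=" + costMs);
    }
}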
Custom JSONLayout.java
Create a JSONLayout.java class in the same project as the custom KafkaAppender, with the following content.
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.log4j.Layout;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LocationInfo;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.ThrowableInformation;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;

public class JSONLayout extends Layout {

    private boolean locationInfo = false;
    private String customUserFields;
    private boolean ignoreThrowable = false;
    private boolean activeIgnoreThrowable = ignoreThrowable;
    private String hostname = InetAddress.getLocalHost().getHostName();
    private String threadName;
    private long timestamp;
    private String ndc;
    private Map mdc;
    private LocationInfo info;
    private HashMap<String, Object> exceptionInformation;
    private static Integer version = 1;
    private JSONObject logstashEvent;

    public static final TimeZone GMT_8 = TimeZone.getTimeZone("GMT+8");
    public static final FastDateFormat ISO_DATETIME_TIME_ZONE_FORMAT_WITH_MILLIS =
            FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", GMT_8);
    public static final String ADDITIONAL_DATA_PROPERTY = "com.avris.JSONLayout.UserFields";

    public static String dateFormat(long timestamp) {
        return ISO_DATETIME_TIME_ZONE_FORMAT_WITH_MILLIS.format(timestamp);
    }

    /**
     * For backwards compatibility, the default is to generate location information in the log
     * messages.
     */
    public JSONLayout() throws UnknownHostException {
        this(true);
    }

    /**
     * Creates a layout that optionally inserts location information into log messages.
     *
     * @param locationInfo whether or not to include location information in the log messages.
     */
    public JSONLayout(boolean locationInfo) throws UnknownHostException {
        this.locationInfo = locationInfo;
    }

    @Override
    public String format(LoggingEvent loggingEvent) {
        threadName = loggingEvent.getThreadName();
        timestamp = loggingEvent.getTimeStamp();
        exceptionInformation = new HashMap<String, Object>();
        mdc = loggingEvent.getProperties();
        ndc = loggingEvent.getNDC();
        logstashEvent = new JSONObject();
        String whoami = this.getClass().getSimpleName();

        /*
         * All v1 of the event format requires is "@timestamp" and "@version". Every other field is
         * arbitrary.
         */
        logstashEvent.put("@version", version);
        logstashEvent.put("@timestamp", dateFormat(timestamp));

        /* Extract and add fields from log4j config, if defined */
        if (getUserFields() != null) {
            String userFlds = getUserFields();
            LogLog.debug("[" + whoami + "] Got user data from log4j property: " + userFlds);
            addUserFields(userFlds);
        }

        /*
         * Extract fields from system properties, if defined. Note that CLI props will override
         * conflicts with log4j config.
         */
        if (System.getProperty(ADDITIONAL_DATA_PROPERTY) != null) {
            if (getUserFields() != null) {
                LogLog.warn("[" + whoami + "] Loading UserFields from command-line. This will override any UserFields set in the log4j configuration file");
            }
            String userFieldsProperty = System.getProperty(ADDITIONAL_DATA_PROPERTY);
            LogLog.debug("[" + whoami + "] Got user data from system property: " + userFieldsProperty);
            addUserFields(userFieldsProperty);
        }

        /* Now we start injecting our own stuff. */
        logstashEvent.put("source_host", hostname);
        logstashEvent.put("message", loggingEvent.getRenderedMessage());

        if (loggingEvent.getThrowableInformation() != null) {
            final ThrowableInformation throwableInformation = loggingEvent.getThrowableInformation();
            if (throwableInformation.getThrowable().getClass().getCanonicalName() != null) {
                exceptionInformation.put("exception_class",
                        throwableInformation.getThrowable().getClass().getCanonicalName());
            }
            if (throwableInformation.getThrowable().getMessage() != null) {
                exceptionInformation.put("exception_message", throwableInformation.getThrowable().getMessage());
            }
            if (throwableInformation.getThrowableStrRep() != null) {
                String stackTrace = StringUtils.join(throwableInformation.getThrowableStrRep(), "\n");
                exceptionInformation.put("stacktrace", stackTrace);
            }
            addEventData("exception", exceptionInformation);
        }

        if (locationInfo) {
            info = loggingEvent.getLocationInformation();
            addEventData("file", info.getFileName());
            addEventData("line_number", info.getLineNumber());
            addEventData("class", info.getClassName());
            addEventData("method", info.getMethodName());
        }

        addEventData("logger_name", loggingEvent.getLoggerName());
        addEventData("mdc", mdc);
        addEventData("ndc", ndc);
        addEventData("level", loggingEvent.getLevel().toString());
        addEventData("thread_name", threadName);
        return logstashEvent.toString() + "\n";
    }

    @Override
    public boolean ignoresThrowable() {
        return ignoreThrowable;
    }

    /**
     * Query whether log messages include location information.
     *
     * @return true if location information is included in log messages, false otherwise.
     */
    public boolean getLocationInfo() {
        return locationInfo;
    }

    /**
     * Set whether log messages should include location information.
     *
     * @param locationInfo true if location information should be included, false otherwise.
     */
    public void setLocationInfo(boolean locationInfo) {
        this.locationInfo = locationInfo;
    }

    public String getUserFields() { return customUserFields; }
    public void setUserFields(String userFields) { this.customUserFields = userFields; }

    @Override
    public void activateOptions() {
        activeIgnoreThrowable = ignoreThrowable;
    }

    private void addUserFields(String data) {
        if (null != data) {
            String[] pairs = data.split(",");
            for (String pair : pairs) {
                String[] userField = pair.split(":", 2);
                if (userField[0] != null) {
                    String key = userField[0];
                    String val = userField[1];
                    addEventData(key, val);
                }
            }
        }
    }

    private void addEventData(String keyname, Object keyval) {
        if (null != keyval) {
            logstashEvent.put(keyname, keyval);
        }
    }
}
Related dependencies:
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.21</version>
    <scope>provided</scope>
</dependency>
Package the project and upload it to the server.
In the launch command, add kafka-appender-1.0.0.jar together with its dependencies:
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar:lib/fastjson-2.0.7.jar:lib/fastjson2-2.0.7.jar:lib/fastjson2-extension-2.0.7.jar" \
After startup, check the data again; the @timestamp field now shows the correct time:
{"source_host":"bigdata-24-194","method":"println","level":"INFO","message":"2023-03-02 16:41:24 - 本轮自定义任务计算完成","mdc":{},"app_name":"xxx","@timestamp":"2023-03-02 16:41:24","file":"Print.scala","line_number":"24","thread_name":"main","@version":1,"logger_name":"com.avris.util.Print","class":"com.avris.util.Print$"}
One-step integration
Before using this section, please read through the steps explained above, otherwise some details may not be clear.
To make this easier to apply, I have packaged the classes above and published them to the Maven Central repository, so they can be referenced and used directly. The steps are as follows:
- Add the Maven dependency
<dependency>
    <groupId>com.gitee.jastee</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>1.0.5</version>
</dependency>
- Use a logger in your code to print logs (see the sketch below)
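For example, a minimal sketch (package and class names are invented) of a class whose logger name starts with com, so that the log4j.logger.com=info,kafka rule in the configuration below routes its INFO logs to the kafka appender:

package com.example.job;

import org.apache.log4j.Logger;

public class WordCountJob {
    // The logger name is the class name, so it matches the "log4j.logger.com" prefix below.
    private static final Logger LOG = Logger.getLogger(WordCountJob.class);

    public static void main(String[] args) {
        LOG.info("job started");   // goes to the console and, via the config below, to Kafka
        LOG.info("job finished");
    }
}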
- Modify the Spark configuration file log4j.properties
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %l %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# Everything from here down is newly added
# log4j.logger.com means INFO logs from classes whose package name starts with com are sent to Kafka; multiple such entries can be configured. Note the prefix must be a complete package segment, e.g. com, not co
log4j.logger.com=info,kafka
log4j.logger.net.jast.xxx=info,kafka
log4j.logger.org.apache.kafka.clients.Metadata=ERROR
# Use the custom KafkaAppender
log4j.appender.kafka=com.gitee.jastee.kafka.appender.KafkaLog4jAppender
# The KafkaAppender shipped with Kafka
#log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Target topic
log4j.appender.kafka.topic=test
# Kafka broker list
log4j.appender.kafka.brokerList=172.16.24.194:9092,172.16.24.195:9092,172.16.24.196:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.Threshold=INFO
log4j.appender.kafka.requiredNumAcks=-1
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=5000
log4j.appender.kafka.layout=com.gitee.jastee.kafka.appender.JSONLayout
log4j.appender.kafka.layout.UserFields=app_name:xxx
- Add to the launch command
Too many jars to list in --conf? You can merge the three jars into one: unpack them with jar xf, repackage them with jar -cvfM, and point --conf at the single merged jar. Because different environments may use different slf4j versions, I did not merge them here; merge them in your own projects as needed.
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-log4j-appender-1.0.5.jar" \
--jars $(echo $JAR_PATH/lib/*.jar | tr ' ' ',') \
- Start the program
Possible exceptions
ClassNotFoundException: xxx.KafkaLog4jAppender
The program fails at startup with the following error, even though the jar was submitted with the job:
log4j:ERROR Could not instantiate class [com.xxx.KafkaLog4jAppender].
java.lang.ClassNotFoundException: com.xxx.KafkaLog4jAppender
Cause:
When Spark starts up, the jars passed via --jars have not been loaded yet at the point where log4j is initialized. Submit our custom kafka-appender-1.0.0.jar (the jar containing KafkaLog4jAppender.java) via spark.executor.extraClassPath and spark.driver.extraClassPath instead; if you use the official appender, submit the official jar the same way.
Solution:
Add the following to the launch script:
--conf "spark.driver.extraClassPath=kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=kafka-appender-1.0.0.jar" \
Unexpected problem occured during version sanity check Reported exception: java.lang.NullPointerException
Unexpected problem occured during version sanity check
Reported exception:
java.lang.NullPointerException
    at org.slf4j.LoggerFactory.versionSanityCheck(LoggerFactory.java:272)
    at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:126)
    at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:417)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)
    at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)
    at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at com.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)
    at com.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)
    at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
    at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
    at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
    at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
    at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
    at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
    at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
    at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
    at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
    at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
    at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
    at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
    at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
    at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
    at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
    at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:341)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at com.avris.KafkaLog4jAppender.getKafkaProducer(KafkaLog4jAppender.java:285)
    at com.avris.KafkaLog4jAppender.activateOptions(KafkaLog4jAppender.java:278)
    at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
    at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
    at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
    at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
    at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
    at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
    at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
    at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
    at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
    at org.slf4j.impl.StaticLoggerBinder.<init>(StaticLoggerBinder.java:72)
    at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:45)
    at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
    at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
    at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
    at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
    at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
    at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:423)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:362)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:388)
    at org.apache.kafka.clients.CommonClientConfigs.<clinit>(CommonClientConfigs.java:32)
    ... 27 more
Cause:
A logging-library version conflict; the version in use was slf4j-log4j12-1.7.30.jar.
Solution:
Use slf4j version 1.8.0-beta2:
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.8.0-beta2</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.8.0-beta2</version>
</dependency>
Submit the jars via the spark.driver.extraClassPath and spark.executor.extraClassPath parameters:
--conf "spark.driver.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
--conf "spark.executor.extraClassPath=slf4j-api-1.8.0-beta2.jar:slf4j-log4j12-1.8.0-beta2.jar:kafka-appender-1.0.0.jar" \
References
https://www.jianshu.com/p/cde2b4712859
https://blog.csdn.net/epitomizelu/article/details/123687998