Elasticsearch Source Code Analysis 08: Search Query Flow
Search Query Flow
The REST routes for search requests are also registered in ActionModule:
// Search operation
registerHandler.accept(new RestSearchAction());

@Override
public List<Route> routes() {
    return unmodifiableList(asList(
        new Route(GET, "/_search"),
        new Route(POST, "/_search"),
        new Route(GET, "/{index}/_search"),
        new Route(POST, "/{index}/_search"),
        // Deprecated typed endpoints.
        new Route(GET, "/{index}/{type}/_search"),
        new Route(POST, "/{index}/{type}/_search")));
}
As with a Get request, the request goes through dispatchRequest, which looks up the handler registered for the route and then executes it:
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
    // Handle favicon requests
    if (request.rawPath().equals("/favicon.ico")) {
        handleFavicon(request.method(), request.uri(), channel);
        return;
    }
    try {
        // Handle the REST request
        tryAllHandlers(request, channel, threadContext);
    } catch (Exception e) {
        try {
            channel.sendResponse(new BytesRestResponse(channel, e));
        } catch (Exception inner) {
            inner.addSuppressed(e);
            logger.error(() ->
                new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
        }
    }
}
This calls the handleRequest method of BaseRestHandler:
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    // prepare the request for execution; has the side effect of touching the request parameters
    // Build the action that will handle the request
    final RestChannelConsumer action = prepareRequest(request, client);

    // validate unconsumed params, but we must exclude params used to format the response
    // use a sorted set so the unconsumed parameters appear in a reliable sorted order
    final SortedSet<String> unconsumedParams =
        request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

    // validate the non-response params
    if (!unconsumedParams.isEmpty()) {
        final Set<String> candidateParams = new HashSet<>();
        candidateParams.addAll(request.consumedParams());
        candidateParams.addAll(responseParams());
        throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
    }

    if (request.hasContent() && request.isContentConsumed() == false) {
        throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
    }

    usageCount.increment();
    // execute the action
    action.accept(channel);
}
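The unconsumed-parameter check in handleRequest can be illustrated with a small standalone sketch. It is not Elasticsearch code: TrackedParams and UnconsumedParamsDemo are hypothetical names that only imitate the idea that reading a parameter marks it as consumed, and that anything left over is rejected.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for RestRequest: remembers which parameters were read.
class TrackedParams {
    private final Map<String, String> params;
    private final Set<String> consumed = new HashSet<>();

    TrackedParams(Map<String, String> params) {
        this.params = new HashMap<>(params);
    }

    // Reading a parameter marks it as consumed.
    String param(String key) {
        consumed.add(key);
        return params.get(key);
    }

    // Parameters that no handler read, in sorted order.
    SortedSet<String> unconsumed() {
        SortedSet<String> rest = new TreeSet<>(params.keySet());
        rest.removeAll(consumed);
        return rest;
    }
}

class UnconsumedParamsDemo {
    // Imitates the validation step: reject parameters nobody consumed.
    static void validate(TrackedParams request) {
        SortedSet<String> unconsumed = request.unconsumed();
        if (!unconsumed.isEmpty()) {
            throw new IllegalArgumentException("unrecognized parameters: " + unconsumed);
        }
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("size", "10");
        raw.put("sizee", "10"); // typo: nothing reads this key
        TrackedParams request = new TrackedParams(raw);
        request.param("size");
        try {
            validate(request);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This is why prepareRequest has to run before the validation: parameters only count as consumed once the handler has actually read them.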
The request object is built first. RestSearchAction overrides prepareRequest for this purpose:
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    // Create the search request object
    SearchRequest searchRequest = new SearchRequest();
    IntConsumer setSize = size -> searchRequest.source().size(size);
    request.withContentOrSourceParamParserOrNull(parser ->
        parseSearchRequest(searchRequest, request, parser, setSize));

    return channel -> {
        RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
        cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
    };
}
The incoming REST request is first parsed into a search request:
public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
                                      XContentParser requestContentParser,
                                      IntConsumer setSize) throws IOException {
    // Create an empty search source if the request does not have one yet
    if (searchRequest.source() == null) {
        searchRequest.source(new SearchSourceBuilder());
    }
    // Indices to search
    searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));
    if (requestContentParser != null) {
        searchRequest.source().parseXContent(requestContentParser, true);
    }
    // Protects the coordinating node: number of shard results reduced at once
    final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
    searchRequest.setBatchedReduceSize(batchedReduceSize);
    // Pre-filter shard size
    if (request.hasParam("pre_filter_shard_size")) {
        searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE));
    }
    // Number of concurrent shard requests
    if (request.hasParam("max_concurrent_shard_requests")) {
        // only set if we have the parameter since we auto adjust the max concurrency on the coordinator
        // based on the number of nodes in the cluster
        final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests",
            searchRequest.getMaxConcurrentShardRequests());
        searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
    }
    // Allow partial search results
    if (request.hasParam("allow_partial_search_results")) {
        // only set if we have the parameter passed to override the cluster-level default
        searchRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", null));
    }

    // do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
    // from the REST layer. these modes are an internal optimization and should
    // not be specified explicitly by the user.
    // Search type, query_then_fetch by default
    String searchType = request.param("search_type");
    // These two search types are no longer accepted
    if ("query_and_fetch".equals(searchType) ||
            "dfs_query_and_fetch".equals(searchType)) {
        throw new IllegalArgumentException("Unsupported search type [" + searchType + "]");
    } else {
        searchRequest.searchType(searchType);
    }
    parseSearchSource(searchRequest.source(), request, setSize);
    // Request cache
    searchRequest.requestCache(request.paramAsBoolean("request_cache", searchRequest.requestCache()));

    // Scroll search
    String scroll = request.param("scroll");
    if (scroll != null) {
        searchRequest.scroll(new Scroll(parseTimeValue(scroll, null, "scroll")));
    }

    // The type field is deprecated
    if (request.hasParam("type")) {
        deprecationLogger.deprecatedAndMaybeLog("search_with_types", TYPES_DEPRECATION_MESSAGE);
        searchRequest.types(Strings.splitStringByCommaToArray(request.param("type")));
    }
    searchRequest.routing(request.param("routing"));
    searchRequest.preference(request.param("preference"));
    searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
    searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));

    checkRestTotalHits(request, searchRequest);
}

private static void parseSearchSource(final SearchSourceBuilder searchSourceBuilder, RestRequest request, IntConsumer setSize) {
    QueryBuilder queryBuilder = RestActions.urlParamsToQueryBuilder(request);
    if (queryBuilder != null) {
        searchSourceBuilder.query(queryBuilder);
    }

    // Pagination
    int from = request.paramAsInt("from", -1);
    if (from != -1) {
        searchSourceBuilder.from(from);
    }
    int size = request.paramAsInt("size", -1);
    if (size != -1) {
        setSize.accept(size);
    }

    // Return an explanation of how each document matched the query
    if (request.hasParam("explain")) {
        searchSourceBuilder.explain(request.paramAsBoolean("explain", null));
    }
    // Return the document version with each hit
    if (request.hasParam("version")) {
        searchSourceBuilder.version(request.paramAsBoolean("version", null));
    }
    // Whether to return seqNo and primaryTerm
    if (request.hasParam("seq_no_primary_term")) {
        searchSourceBuilder.seqNoAndPrimaryTerm(request.paramAsBoolean("seq_no_primary_term", null));
    }
    // Timeout
    if (request.hasParam("timeout")) {
        searchSourceBuilder.timeout(request.paramAsTime("timeout", null));
    }
    // Stop the search after collecting this many documents
    if (request.hasParam("terminate_after")) {
        int terminateAfter = request.paramAsInt("terminate_after",
            SearchContext.DEFAULT_TERMINATE_AFTER);
        if (terminateAfter < 0) {
            throw new IllegalArgumentException("terminateAfter must be > 0");
        } else if (terminateAfter > 0) {
            searchSourceBuilder.terminateAfter(terminateAfter);
        }
    }

    // How stored fields should be fetched
    StoredFieldsContext storedFieldsContext =
        StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), request);
    if (storedFieldsContext != null) {
        searchSourceBuilder.storedFields(storedFieldsContext);
    }
    // Fields to load from doc values
    String sDocValueFields = request.param("docvalue_fields");
    if (sDocValueFields != null) {
        if (Strings.hasText(sDocValueFields)) {
            String[] sFields = Strings.splitStringByCommaToArray(sDocValueFields);
            for (String field : sFields) {
                searchSourceBuilder.docValueField(field, null);
            }
        }
    }
    // How to fetch _source
    FetchSourceContext fetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
    if (fetchSourceContext != null) {
        searchSourceBuilder.fetchSource(fetchSourceContext);
    }
    // Applies when sorting: controls whether scores are still tracked
    if (request.hasParam("track_scores")) {
        searchSourceBuilder.trackScores(request.paramAsBoolean("track_scores", false));
    }

    // Whether, or up to what count, total hits are tracked
    if (request.hasParam("track_total_hits")) {
        if (Booleans.isBoolean(request.param("track_total_hits"))) {
            searchSourceBuilder.trackTotalHits(
                request.paramAsBoolean("track_total_hits", true)
            );
        } else {
            searchSourceBuilder.trackTotalHitsUpTo(
                request.paramAsInt("track_total_hits", SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO)
            );
        }
    }
    // Sort fields
    String sSorts = request.param("sort");
    if (sSorts != null) {
        String[] sorts = Strings.splitStringByCommaToArray(sSorts);
        for (String sort : sorts) {
            int delimiter = sort.lastIndexOf(":");
            if (delimiter != -1) {
                String sortField = sort.substring(0, delimiter);
                String reverse = sort.substring(delimiter + 1);
                if ("asc".equals(reverse)) {
                    searchSourceBuilder.sort(sortField, SortOrder.ASC);
                } else if ("desc".equals(reverse)) {
                    searchSourceBuilder.sort(sortField, SortOrder.DESC);
                }
            } else {
                searchSourceBuilder.sort(sort);
            }
        }
    }
    // Stats groups this request will be aggregated under
    String sStats = request.param("stats");
    if (sStats != null) {
        searchSourceBuilder.stats(Arrays.asList(Strings.splitStringByCommaToArray(sStats)));
    }
    // Suggestions
    String suggestField = request.param("suggest_field");
    if (suggestField != null) {
        String suggestText = request.param("suggest_text", request.param("q"));
        int suggestSize = request.paramAsInt("suggest_size", 5);
        String suggestMode = request.param("suggest_mode");
        searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(suggestField,
            termSuggestion(suggestField)
                .text(suggestText).size(suggestSize)
                .suggestMode(SuggestMode.resolve(suggestMode))));
    }
}
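The handling of the sort URL parameter in parseSearchSource is easy to try out in isolation. The sketch below is a simplified imitation, not the Elasticsearch implementation: SortParamDemo and SortSpec are hypothetical names, and String.split stands in for Strings.splitStringByCommaToArray, which may differ on edge cases.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class SortParamDemo {
    // Hypothetical holder for one sort clause; order is null when the
    // parameter names only a field.
    static final class SortSpec {
        final String field;
        final String order;

        SortSpec(String field, String order) {
            this.field = field;
            this.order = order;
        }

        @Override
        public String toString() {
            return order == null ? field : field + " " + order;
        }
    }

    // Parses values such as "timestamp:desc,_score".
    static List<SortSpec> parse(String sortParam) {
        List<SortSpec> specs = new ArrayList<>();
        for (String sort : sortParam.split(",")) {
            // lastIndexOf keeps field names that themselves contain ':' intact
            int delimiter = sort.lastIndexOf(':');
            if (delimiter == -1) {
                specs.add(new SortSpec(sort, null));
                continue;
            }
            String field = sort.substring(0, delimiter);
            String order = sort.substring(delimiter + 1);
            if ("asc".equals(order) || "desc".equals(order)) {
                specs.add(new SortSpec(field, order.toUpperCase(Locale.ROOT)));
            }
            // as in the code above, any other suffix is silently dropped
        }
        return specs;
    }

    public static void main(String[] args) {
        System.out.println(parse("timestamp:desc,_score,name:up"));
    }
}
```

Note the silent drop in the last branch: a clause with a suffix other than asc or desc adds no sort at all, which matches the if / else if structure shown above.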
A cancellable REST client is then created and the request is sent:
return channel -> {
    RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
    cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
};
The execute call is carried out by executeLocally:
public <Request extends ActionRequest,
        Response extends ActionResponse> Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {
    // Look up the transport action and execute it
    return transportAction(action).execute(request, listener);
}
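Eventually the doExecute method of TransportSearchAction is called.
A search sends requests to every shard of the target indices; for each shard, the request goes to only one copy, either the primary or a replica.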
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
    // Relative start time of the request
    final long relativeStartNanos = System.nanoTime();
    final SearchTimeProvider timeProvider =
        new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
    ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
        if (source != searchRequest.source()) {
            // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
            // situations when source is rewritten to null due to a bug
            searchRequest.source(source);
        }
        // Group the indices into local-cluster and remote-cluster indices
        final ClusterState clusterState = clusterService.state();
        final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
            searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
        OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
        // Search the local cluster only
        if (remoteClusterIndices.isEmpty()) {
            executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener);
        } else {
            // Minimize the number of round trips
            if (shouldMinimizeRoundtrips(searchRequest)) {
                ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider,
                    searchService.aggReduceContextBuilder(searchRequest),
                    remoteClusterService, threadPool, listener,
                    (r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));
            } else {
                AtomicInteger skippedClusters = new AtomicInteger(0);
                // Collect all shards that need to be searched
                collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
                    skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
                    ActionListener.wrap(
                        // Responses of the search-shards lookup
                        searchShardsResponses -> {
                            List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
                            Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
                            BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(
                                searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
                            // Number of clusters involved
                            int localClusters = localIndices == null ? 0 : 1;
                            int totalClusters = remoteClusterIndices.size() + localClusters;
                            int successfulClusters = searchShardsResponses.size() + localClusters;
                            // Execute the search
                            executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
                                remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
                                new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
                        },
                        listener::onFailure));
            }
        }
    }, listener::onFailure);
    if (searchRequest.source() == null) {
        rewriteListener.onResponse(searchRequest.source());
    } else {
        Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
            rewriteListener);
    }
}
This step collects the indices to search in the local cluster and in remote clusters. Here we mainly analyze the local-cluster search.
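Elasticsearch mainly supports two search types:
- QUERY_THEN_FETCH: the default. A request is first sent to all shards, and each shard returns only document IDs plus ranking information such as scores. The coordinating node then re-sorts the documents by the scores returned from each shard and keeps the top size documents, and finally fetches the documents from the relevant shards by ID. The number of returned documents is exact, but the ranking can be inaccurate because each shard scores with its own local term statistics.
- DFS_QUERY_THEN_FETCH: adds a DFS step before the flow above. Before running the query, a request is sent to all shards to gather the scoring inputs, such as term frequencies and document frequencies, from every shard into one place; the rest then proceeds as before. The ranking is accurate, but performance is worse than with QUERY_THEN_FETCH.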
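The coordinating-node side of QUERY_THEN_FETCH, merging the per-shard hits into one global top size list, can be sketched as follows. This is only an illustration under simplifying assumptions: TopHitsMergeDemo and Hit are hypothetical names, sorting is by score only, and ties are broken by shard and doc ID purely for determinism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class TopHitsMergeDemo {
    // Hypothetical query-phase hit: shard, doc ID within the shard, and score.
    static final class Hit {
        final int shard;
        final int doc;
        final float score;

        Hit(int shard, int doc, float score) {
            this.shard = shard;
            this.doc = doc;
            this.score = score;
        }

        @Override
        public String toString() {
            return "shard" + shard + "/doc" + doc + "@" + score;
        }
    }

    // Merge the hits of all shards and keep the best `size` of them.
    static List<Hit> mergeTopN(List<List<Hit>> perShardHits, int size) {
        List<Hit> all = new ArrayList<>();
        for (List<Hit> hits : perShardHits) {
            all.addAll(hits);
        }
        // Highest score first; shard and doc ID break ties deterministically.
        all.sort(Comparator.comparingDouble((Hit h) -> -h.score)
            .thenComparingInt(h -> h.shard)
            .thenComparingInt(h -> h.doc));
        return new ArrayList<>(all.subList(0, Math.min(size, all.size())));
    }

    public static void main(String[] args) {
        List<Hit> shard0 = Arrays.asList(new Hit(0, 3, 2.5f), new Hit(0, 7, 1.0f));
        List<Hit> shard1 = Arrays.asList(new Hit(1, 1, 3.0f), new Hit(1, 4, 0.5f));
        // Only the winners are fetched from their shards in the fetch phase.
        System.out.println(mergeTopN(Arrays.asList(shard0, shard1), 2));
    }
}
```

Because the scores being compared were computed independently on each shard, the merged order is only as good as those local scores, which is the inaccuracy the DFS variant is meant to remove.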
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
                           OriginalIndices localIndices, List<SearchShardIterator> remoteShardIterators,
                           BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
                           Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener,
                           SearchResponse.Clusters clusters) {

    clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
    // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
    // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
    // of just for the _search api
    final Index[] indices = resolveLocalIndices(localIndices, searchRequest.indicesOptions(), clusterState, timeProvider);
    Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
    Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
        searchRequest.indices());
    routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
    String[] concreteIndices = new String[indices.length];
    for (int i = 0; i < indices.length; i++) {
        concreteIndices[i] = indices[i].getName();
    }
    // Number of pending search requests per node
    Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
    GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
        concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
    // Merge the local and remote shards that need to be searched
    GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
        searchRequest.getLocalClusterAlias(), remoteShardIterators);
    // Fail if the number of shards to search exceeds the limit
    failIfOverShardCountLimit(clusterService, shardIterators.size());

    Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);

    // optimize search type for cases where there is only one shard group to search on
    if (shardIterators.size() == 1) {
        // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
        searchRequest.searchType(QUERY_THEN_FETCH);
    }
    if (searchRequest.allowPartialSearchResults() == null) {
        // No user preference defined in search request - apply cluster service default
        searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
    }
    if (searchRequest.isSuggestOnly()) {
        // disable request cache if we have only suggest
        searchRequest.requestCache(false);
        switch (searchRequest.searchType()) {
            case DFS_QUERY_THEN_FETCH:
                // convert to Q_T_F if we have only suggest
                searchRequest.searchType(QUERY_THEN_FETCH);
                break;
        }
    }

    final DiscoveryNodes nodes = clusterState.nodes();
    BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
        nodes::get, remoteConnections, searchTransportService::getConnection);
    // Whether to pre-filter shards
    boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, indices, shardIterators.size());
    // Run the search asynchronously
    searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState,
        Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
}
A different action is created for each of the two search types:
AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction;
switch (searchRequest.searchType()) {
    case DFS_QUERY_THEN_FETCH:
        searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
            aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
            shardIterators, timeProvider, clusterState, task, clusters);
        break;
    case QUERY_THEN_FETCH:
        searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
            aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, /* callback invoked after the search */ listener,
            shardIterators, timeProvider, clusterState, task, clusters);
        break;
    default:
        throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
}
return searchAsyncAction;
Once the action has been created, it is started:
public final void start() {
    if (getNumShards() == 0) {
        //no search shards to search on, bail with empty response
        //(it happens with search across _all with no indices around and consistent with broadcast operations)
        int trackTotalHitsUpTo = request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO :
            request.source().trackTotalHitsUpTo() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO :
                request.source().trackTotalHitsUpTo();
        // total hits is null in the response if the tracking of total hits is disabled
        boolean withTotalHits = trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED;
        listener.onResponse(new SearchResponse(InternalSearchResponse.empty(withTotalHits), null, 0, 0, 0, buildTookInMillis(),
            ShardSearchFailure.EMPTY_ARRAY, clusters));
        return;
    }
    // Execute the search
    executePhase(this);
}

private void executePhase(SearchPhase phase) {
    try {
        phase.run();
    } catch (Exception e) {
        if (logger.isDebugEnabled()) {
            logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
        }
        onPhaseFailure(phase, "", e);
    }
}
The phase iterates over all shards, sends a request to each one, and registers a callback that receives the shard's result:
public final void run() {
    for (final SearchShardIterator iterator : toSkipShardsIts) {
        assert iterator.skip();
        skipShard(iterator);
    }
    if (shardsIts.size() > 0) {
        ...
        for (int index = 0; index < shardsIts.size(); index++) {
            // Routing information of the shard
            final SearchShardIterator shardRoutings = shardsIts.get(index);
            assert shardRoutings.skip() == false;
            // Execute on the shard
            performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
        }
    }
}

private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
    if (shard == null) {
        fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
    } else {
        // Throttle concurrent requests per node
        final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
            pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
            : null;
        Runnable r = () -> {
            final Thread thread = Thread.currentThread();
            try {
                // Execute the query
                executePhaseOnShard(shardIt, shard,
                    new SearchActionListener<Result>(shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {
                        @Override
                        public void innerOnResponse(Result result) {
                            try {
                                // The shard returned a result
                                onShardResult(result, shardIt);
                            } finally {
                                executeNext(pendingExecutions, thread);
                            }
                        }

                        @Override
                        public void onFailure(Exception t) {
                            try {
                                onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
                            } finally {
                                executeNext(pendingExecutions, thread);
                            }
                        }
                    });
            } catch (final Exception e) {
                try {
                    fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
                } finally {
                    executeNext(pendingExecutions, thread);
                }
            }
        };
        // Throttle if enabled
        if (throttleConcurrentRequests) {
            pendingExecutions.tryRun(r);
        } else {
            r.run();
        }
    }
}
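The per-node throttling used by performPhaseOnShard can be pictured as a permit counter with a queue. The sketch below is an assumption about the general technique, not a copy of PendingExecutions: NodeThrottleDemo and its method names are hypothetical, and request completion is simulated by calling finishAndRunNext by hand.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical per-node limiter: at most `permits` requests in flight.
class NodeThrottleDemo {
    private final int permits;
    private int running = 0;
    private final Queue<Runnable> queue = new ArrayDeque<>();

    NodeThrottleDemo(int permits) {
        this.permits = permits;
    }

    // Start the task now if a permit is free, otherwise queue it.
    void tryRun(Runnable task) {
        Runnable toRun = null;
        synchronized (this) {
            if (running < permits) {
                running++;
                toRun = task;
            } else {
                queue.add(task);
            }
        }
        if (toRun != null) {
            toRun.run();
        }
    }

    // Called when a request completes: pass the permit to the next queued
    // task, or release it if nothing is waiting.
    void finishAndRunNext() {
        Runnable next;
        synchronized (this) {
            next = queue.poll();
            if (next == null) {
                running--;
            }
        }
        if (next != null) {
            next.run();
        }
    }

    public static void main(String[] args) {
        NodeThrottleDemo node = new NodeThrottleDemo(2);
        List<Integer> started = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            final int shard = i;
            node.tryRun(() -> started.add(shard)); // "send" the shard request
        }
        System.out.println("in flight: " + started);
        node.finishAndRunNext(); // one response arrives, a queued request starts
        System.out.println("after one response: " + started);
    }
}
```

In the real code the equivalent of finishAndRunNext is the executeNext call in the finally blocks, which is why it runs on both the success and the failure path.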
The default search type is QUERY_THEN_FETCH, so the executePhaseOnShard method of SearchQueryThenFetchAsyncAction is called:
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
                                   final SearchActionListener<SearchPhaseResult> listener) {
    // Build the shard-level request
    ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt));
    getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
        request, getTask(), listener);
}
It obtains a connection and then sends an RPC request whose action is indices:data/read/search[phase/query]:
public void sendExecuteQuery(Transport.Connection connection, final ShardSearchRequest request, SearchTask task,
                             final SearchActionListener<SearchPhaseResult> listener) {
    // we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request
    // this used to be the QUERY_AND_FETCH which doesn't exist anymore.
    final boolean fetchDocuments = request.numberOfShards() == 1;
    Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;

    final ActionListener handler = responseWrapper.apply(connection, listener);
    // Send the query request
    transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
        new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId()));
}
- Data node
The handler that the data node registers for this RPC:
// Register the handler for shard-level query requests
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new,
    (request, channel, task) -> {
        // Execute the query
        searchService.executeQueryPhase(request, (SearchShardTask) task,
            /* callback invoked after the query */ new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request));
    });
The data node receives the RPC request and runs the query phase:
public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
    assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
        : "empty responses require more than one shard";
    // Index service
    IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
    // Index shard
    IndexShard shard = indexService.getShard(request.shardId().id());
    rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
        @Override
        public void onResponse(ShardSearchRequest orig) {
            // A null response may be returned if the query matches no documents
            if (orig.canReturnNullResponseIfMatchNoDocs()) {
                ShardSearchRequest canMatchRequest = new ShardSearchRequest(orig);
                try (Engine.Searcher searcher = shard.acquireCanMatchSearcher()) {
                    QueryShardContext context = indexService.newQueryShardContext(canMatchRequest.shardId().id(), searcher,
                        canMatchRequest::nowInMillis, canMatchRequest.getClusterAlias());
                    Rewriteable.rewrite(canMatchRequest.getRewriteable(), context, true);
                } catch (Exception exc) {
                    listener.onFailure(exc);
                    return;
                }
                if (canRewriteToMatchNone(canMatchRequest.source())
                    && canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) {
                    assert canMatchRequest.scroll() == null : "must always create search context for scroll requests";
                    listener.onResponse(QuerySearchResult.nullInstance());
                    return;
                }
            }
            // fork the execution in the search thread pool
            runAsync(shard, () -> executeQueryPhase(orig, task), listener);
        }

        @Override
        public void onFailure(Exception exc) {
            listener.onFailure(exc);
        }
    });
}
The search itself is then executed:
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception {
    final SearchContext context = createAndPutContext(request, task);
    context.incRef();
    try {
        final long afterQueryTime;
        try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
            contextProcessing(context);
            // Run the query phase
            loadOrExecuteQueryPhase(request, context);
            if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
                freeContext(context.id());
            } else {
                contextProcessedSuccessfully(context);
            }
            afterQueryTime = executor.success();
        }
        // With a single shard, run the fetch phase right away
        if (request.numberOfShards() == 1) {
            return executeFetchPhase(context, afterQueryTime);
        }
        return context.queryResult();
    } catch (Exception e) {
        // execution exception can happen while loading the cache, strip it
        if (e instanceof ExecutionException) {
            e = (e.getCause() == null || e.getCause() instanceof Exception) ?
                (Exception) e.getCause() : new ElasticsearchException(e.getCause());
        }
        logger.trace("Query phase failed", e);
        processFailure(context, e);
        throw e;
    } finally {
        cleanContext(context);
    }
}
A SearchContext object is created to track the whole search, and the query results are also stored in it:
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
    final boolean canCache = indicesService.canCache(request, context);
    context.getQueryShardContext().freezeContext();
    if (canCache) {
        indicesService.loadIntoContext(request, context, queryPhase);
    } else {
        // Execute the query
        queryPhase.execute(context);
    }
}

public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
    if (searchContext.hasOnlySuggest()) {
        suggestPhase.execute(searchContext);
        searchContext.queryResult().topDocs(new TopDocsAndMaxScore(
            new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
            new DocValueFormat[0]);
        return;
    }

    if (LOGGER.isTraceEnabled()) {
        LOGGER.trace("{}", new SearchContextSourcePrinter(searchContext));
    }

    // Pre-process aggregations as late as possible. For a DFS_Q_T_F request,
    // preProcess is called in the DFS phase, which is why they are pre-processed
    // here: to make sure it happens during the QUERY phase
    aggregationPhase.preProcess(searchContext);
    // Lucene search
    boolean rescore = executeInternal(searchContext);

    if (rescore) { // only if we do a regular search
        // Recompute scores
        rescorePhase.execute(searchContext);
    }
    // Suggestions
    suggestPhase.execute(searchContext);
    // Aggregations
    aggregationPhase.execute(searchContext);

    if (searchContext.getProfilers() != null) {
        ProfileShardResult shardResults = SearchProfileShardResults
            .buildShardResults(searchContext.getProfilers());
        searchContext.queryResult().profileResults(shardResults);
    }
}
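As shown here, the Lucene query runs first; the results are then rescored where applicable, suggestions and aggregations are executed, and a SearchPhaseResult is produced and returned.
Once the data node has finished the query it responds to the coordinating node, so we return to the callback passed to executePhaseOnShard: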
// Execute the query
executePhaseOnShard(shardIt, shard,
    new SearchActionListener<Result>(shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {
        @Override
        public void innerOnResponse(Result result) {
            try {
                // The shard returned a result
                onShardResult(result, shardIt);
            } finally {
                executeNext(pendingExecutions, thread);
            }
        }

        @Override
        public void onFailure(Exception t) {
            try {
                onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
            } finally {
                executeNext(pendingExecutions, thread);
            }
        }
    });
The data returned by the shard is stored in results and the success counter is incremented:
protected void onShardResult(Result result, SearchShardIterator shardIt) {
    assert result.getShardIndex() != -1 : "shard index is not set";
    assert result.getSearchShardTarget() != null : "search shard target must not be null";
    successfulOps.incrementAndGet();
    results.consumeResult(result);
    hasShardResponse.set(true);
    if (logger.isTraceEnabled()) {
        logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
    }
    AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
    if (shardFailures != null) {
        shardFailures.set(result.getShardIndex(), null);
    }
    successfulShardExecution(shardIt);
}
It then checks whether every shard has responded; if so, the next phase, that is, the fetch phase, is executed:
private void successfulShardExecution(SearchShardIterator shardsIt) {
    final int remainingOpsOnIterator;
    if (shardsIt.skip()) {
        remainingOpsOnIterator = shardsIt.remaining();
    } else {
        remainingOpsOnIterator = shardsIt.remaining() + 1;
    }
    final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
    // All shards have responded
    if (xTotalOps == expectedTotalOps) {
        onPhaseDone();
    } else if (xTotalOps > expectedTotalOps) {
        throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["
            + expectedTotalOps + "]");
    }
}

final void onPhaseDone() {  // as a tribute to @kimchy aka. finishHim()
    executeNextPhase(this, getNextPhase(results, this));
}

@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
    // The fetch phase
    return new FetchSearchPhase(results, searchPhaseController, context, clusterState());
}

FetchSearchPhase(SearchPhaseResults<SearchPhaseResult> resultConsumer,
                 SearchPhaseController searchPhaseController,
                 SearchPhaseContext context,
                 ClusterState clusterState) {
    this(resultConsumer, searchPhaseController, context, clusterState,
        // Final wrap-up phase
        (response, scrollId) -> new ExpandSearchPhase(context, response, scrollId));
}
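The "move on once the last shard has answered" logic relies on an atomic counter so that exactly one callback triggers the next phase, even when responses arrive on different threads. A minimal sketch follows, assuming one operation per shard (the real method also accounts for the remaining shard copies); PhaseCountdownDemo is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: runs `nextPhase` once, when the last shard reports in.
class PhaseCountdownDemo {
    private final int expectedOps;
    private final AtomicInteger totalOps = new AtomicInteger();
    private final Runnable nextPhase;

    PhaseCountdownDemo(int expectedOps, Runnable nextPhase) {
        this.expectedOps = expectedOps;
        this.nextPhase = nextPhase;
    }

    // Called from each shard callback, possibly on different threads.
    void onShardDone() {
        // incrementAndGet is atomic, so only one caller observes expectedOps.
        int done = totalOps.incrementAndGet();
        if (done == expectedOps) {
            nextPhase.run();
        } else if (done > expectedOps) {
            throw new AssertionError("unexpected higher total ops [" + done
                + "] compared to expected [" + expectedOps + "]");
        }
    }

    public static void main(String[] args) {
        PhaseCountdownDemo phase = new PhaseCountdownDemo(3,
            () -> System.out.println("all shards responded, starting fetch phase"));
        phase.onShardDone();
        phase.onShardDone();
        phase.onShardDone(); // third response triggers the next phase
    }
}
```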
- Fetch phase
@Override
public void run() {
    context.execute(new AbstractRunnable() {
        @Override
        protected void doRun() throws Exception {
            // we do the heavy lifting in this inner run method where we reduce aggs etc. that's why we fork this phase
            // off immediately instead of forking when we send back the response to the user since there we only need
            // to merge together the fetched results which is a linear operation.
            innerRun();
        }

        @Override
        public void onFailure(Exception e) {
            context.onPhaseFailure(FetchSearchPhase.this, "", e);
        }
    });
}

private void innerRun() throws IOException {
    // Number of shards searched
    final int numShards = context.getNumShards();
    // Whether this is a scroll search
    final boolean isScrollSearch = context.getRequest().scroll() != null;
    // Results returned by the query phase
    final List<SearchPhaseResult> phaseResults = queryResults.asList();
    final String scrollId;
    // Build the scroll ID for scroll searches
    if (isScrollSearch) {
        final boolean includeContextUUID = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_7_7_0);
        scrollId = TransportSearchHelper.buildScrollId(queryResults, includeContextUUID);
    } else {
        scrollId = null;
    }
    final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();
    final boolean queryAndFetchOptimization = queryResults.length() == 1;
    // Task to run once the fetch has finished
    final Runnable finishPhase = ()
        -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
        queryResults : fetchResults);
    if (queryAndFetchOptimization) {
        assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()
            + "], single result: " + phaseResults.get(0).fetchResult();
        // query AND fetch optimization
        finishPhase.run();
    } else {
        ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
        final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, scoreDocs);
        // no docs to fetch -- sidestep everything and return
        if (scoreDocs.length == 0) {
            // we have to release contexts here to free up resources
            phaseResults.stream()
                .map(SearchPhaseResult::queryResult)
                .forEach(this::releaseIrrelevantSearchContext);
            finishPhase.run();
        } else {
            final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
                searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
                : null;
            final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
                docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
                finishPhase, context);
            for (int i = 0; i < docIdsToLoad.length; i++) {
                IntArrayList entry = docIdsToLoad[i];
                SearchPhaseResult queryResult = queryResults.get(i);
                if (entry == null) { // no results for this shard ID
                    if (queryResult != null) {
                        // if we got some hits from this shard we have to release the context there
                        // we do this as we go since it will free up resources and passing on the request on the
                        // transport layer is cheap.
                        releaseIrrelevantSearchContext(queryResult.queryResult());
                        progressListener.notifyFetchResult(i);
                    }
                    // in any case we count down this result since we don't talk to this shard anymore
                    counter.countDown();
                } else {
                    SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
                    Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),
                        searchShardTarget.getNodeId());
                    // Create the fetch request
                    ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getContextId(), i, entry,
                        lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
                    // Execute the fetch
                    executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
                        connection);
                }
            }
        }
    }
}
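The role of fillDocIdsToLoad in innerRun is to turn the globally sorted top hits back into one list of doc IDs per shard, so that each shard receives a single fetch request. The sketch below shows that grouping in plain Java; it is an illustration only, DocIdsByShardDemo is a hypothetical name, and each hit is represented as an int pair {shardIndex, docId} instead of a Lucene ScoreDoc.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class DocIdsByShardDemo {
    // Groups the merged top hits by shard. Shards that contributed no hit
    // keep a null entry, which the fetch phase treats as "nothing to fetch".
    static List<List<Integer>> groupByShard(int numShards, int[][] sortedHits) {
        List<List<Integer>> docIdsToLoad =
            new ArrayList<>(Collections.<List<Integer>>nCopies(numShards, null));
        for (int[] hit : sortedHits) {
            int shardIndex = hit[0];
            List<Integer> ids = docIdsToLoad.get(shardIndex);
            if (ids == null) {
                ids = new ArrayList<>();
                docIdsToLoad.set(shardIndex, ids);
            }
            ids.add(hit[1]);
        }
        return docIdsToLoad;
    }

    public static void main(String[] args) {
        // Three shards; the merged top hits came from shards 1, 0 and 1.
        int[][] sortedHits = { {1, 10}, {0, 4}, {1, 2} };
        System.out.println(groupByShard(3, sortedHits));
    }
}
```

A null entry corresponds to the entry == null branch above: no fetch request is sent to that shard, its search context is released, and the counter is still counted down.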
Create the fetch request and execute the fetch:
private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,
                          final CountedCollector<FetchSearchResult> counter,
                          final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,
                          final Transport.Connection connection) {
    // send the fetch request
    context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),
        new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
            @Override
            public void innerOnResponse(FetchSearchResult result) {
                try {
                    progressListener.notifyFetchResult(shardIndex);
                    counter.onResult(result);
                } catch (Exception e) {
                    context.onPhaseFailure(FetchSearchPhase.this, "", e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.contextId()), e);
                    progressListener.notifyFetchFailure(shardIndex, shardTarget, e);
                    counter.onFailure(shardIndex, shardTarget, e);
                } finally {
                    releaseIrrelevantSearchContext(querySearchResult);
                }
            }
        });
}
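Both callbacks above hand the outcome to the counter, which runs finishPhase once every shard has been accounted for, whether it succeeded, failed, or was skipped. The following is a minimal sketch of that counting pattern, assuming a simplified collector; CountingCollector is a hypothetical name and this is not the actual CountedCollector class, which also reports shard failures to the search context.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical, simplified collector that fires a callback after N operations. */
final class CountingCollector<R> {
    private final Consumer<R> resultConsumer;
    private final AtomicInteger remaining;
    private final Runnable onFinish;

    CountingCollector(Consumer<R> resultConsumer, int expectedOps, Runnable onFinish) {
        if (expectedOps <= 0) {
            throw new IllegalArgumentException("expectedOps must be positive");
        }
        this.resultConsumer = resultConsumer;
        this.remaining = new AtomicInteger(expectedOps);
        this.onFinish = onFinish;
    }

    /** Marks one operation as done; the last one triggers onFinish. */
    void countDown() {
        int left = remaining.decrementAndGet();
        if (left == 0) {
            onFinish.run();
        } else if (left < 0) {
            throw new IllegalStateException("counted down more often than expected");
        }
    }

    /** Stores a successful result, then counts down even if the consumer throws. */
    void onResult(R result) {
        try {
            resultConsumer.accept(result);
        } finally {
            countDown();
        }
    }

    /** A failed shard still counts as finished so the phase cannot hang. */
    void onFailure(Exception e) {
        countDown();
    }
}
```

The key design point in this sketch is that every path (result, failure, or a shard with nothing to fetch) decrements the same counter exactly once, so the finish callback runs exactly once.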
The fetch request is sent as an RPC with the action name indices:data/read/search[phase/fetch/id]:
public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task,
                             final SearchActionListener<FetchSearchResult> listener) {
    sendExecuteFetch(connection, FETCH_ID_ACTION_NAME, request, task, listener);
}
The handler registered on the data node:
// fetch request
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new,
    (request, channel, task) -> {
        // execute the request
        searchService.executeFetchPhase(request, (SearchShardTask) task,
            new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request));
    });
- The data node executes the fetch phase
public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
    runAsync(request.contextId(), () -> {
        final SearchContext context = findContext(request.contextId(), request);
        context.incRef();
        try {
            context.setTask(task);
            contextProcessing(context);
            if (request.lastEmittedDoc() != null) {
                context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
            }
            context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
            try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) {
                // run the fetch
                fetchPhase.execute(context);
                if (fetchPhaseShouldFreeContext(context)) {
                    freeContext(request.contextId());
                } else {
                    contextProcessedSuccessfully(context);
                }
                executor.success();
            }
            // return the result
            return context.fetchResult();
        } catch (Exception e) {
            logger.trace("Fetch phase failed", e);
            processFailure(context, e);
            throw e;
        } finally {
            cleanContext(context);
        }
    }, listener);
}
Iterate over the document IDs and load the details of each document:
@Override
public void execute(SearchContext context) {
    if (LOGGER.isTraceEnabled()) {
        LOGGER.trace("{}", new SearchContextSourcePrinter(context));
    }

    final FieldsVisitor fieldsVisitor;
    Map<String, Set<String>> storedToRequestedFields = new HashMap<>();
    StoredFieldsContext storedFieldsContext = context.storedFieldsContext();

    if (storedFieldsContext == null) {
        // no fields specified, default to return source if no explicit indication
        if (!context.hasScriptFields() && !context.hasFetchSourceContext()) {
            context.fetchSourceContext(new FetchSourceContext(true));
        }
        fieldsVisitor = new FieldsVisitor(context.sourceRequested());
    } else if (storedFieldsContext.fetchFields() == false) {
        // disable stored fields entirely
        fieldsVisitor = null;
    } else {
        for (String fieldNameOrPattern : context.storedFieldsContext().fieldNames()) {
            if (fieldNameOrPattern.equals(SourceFieldMapper.NAME)) {
                FetchSourceContext fetchSourceContext = context.hasFetchSourceContext() ? context.fetchSourceContext()
                    : FetchSourceContext.FETCH_SOURCE;
                context.fetchSourceContext(new FetchSourceContext(true, fetchSourceContext.includes(), fetchSourceContext.excludes()));
                continue;
            }
            Collection<String> fieldNames = context.mapperService().simpleMatchToFullName(fieldNameOrPattern);
            for (String fieldName : fieldNames) {
                MappedFieldType fieldType = context.smartNameFieldType(fieldName);
                if (fieldType == null) {
                    // Only fail if we know it is a object field, missing paths / fields shouldn't fail.
                    if (context.getObjectMapper(fieldName) != null) {
                        throw new IllegalArgumentException("field [" + fieldName + "] isn't a leaf field");
                    }
                } else {
                    String storedField = fieldType.name();
                    Set<String> requestedFields = storedToRequestedFields.computeIfAbsent(
                        storedField, key -> new HashSet<>());
                    requestedFields.add(fieldName);
                }
            }
        }
        boolean loadSource = context.sourceRequested();
        if (storedToRequestedFields.isEmpty()) {
            // empty list specified, default to disable _source if no explicit indication
            fieldsVisitor = new FieldsVisitor(loadSource);
        } else {
            fieldsVisitor = new CustomFieldsVisitor(storedToRequestedFields.keySet(), loadSource);
        }
    }

    try {
        SearchHit[] hits = new SearchHit[context.docIdsToLoadSize()];
        FetchSubPhase.HitContext hitContext = new FetchSubPhase.HitContext();
        for (int index = 0; index < context.docIdsToLoadSize(); index++) {
            if (context.isCancelled()) {
                throw new TaskCancelledException("cancelled");
            }
            // document ID
            int docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];
            int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
            LeafReaderContext subReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
            int subDocId = docId - subReaderContext.docBase;

            final SearchHit searchHit;
            int rootDocId = findRootDocumentIfNested(context, subReaderContext, subDocId);
            if (rootDocId != -1) {
                searchHit = createNestedSearchHit(context, docId, subDocId, rootDocId,
                    storedToRequestedFields, subReaderContext);
            } else {
                searchHit = createSearchHit(context, fieldsVisitor, docId, subDocId,
                    storedToRequestedFields, subReaderContext);
            }

            hits[index] = searchHit;
            hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher());
            // run the fetch sub-phases that gather additional metadata for this hit
            for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
                fetchSubPhase.hitExecute(context, hitContext);
            }
        }
        if (context.isCancelled()) {
            throw new TaskCancelledException("cancelled");
        }
        // run the fetch sub-phases that gather additional metadata across all hits
        for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
            fetchSubPhase.hitsExecute(context, hits);
            if (context.isCancelled()) {
                throw new TaskCancelledException("cancelled");
            }
        }

        TotalHits totalHits = context.queryResult().getTotalHits();
        context.fetchResult().hits(new SearchHits(hits, totalHits, context.queryResult().getMaxScore()));
    } catch (IOException e) {
        throw ExceptionsHelper.convertToElastic(e);
    }
}

private SearchHit createSearchHit(SearchContext context,
                                  FieldsVisitor fieldsVisitor,
                                  int docId,
                                  int subDocId,
                                  Map<String, Set<String>> storedToRequestedFields,
                                  LeafReaderContext subReaderContext) {
    DocumentMapper documentMapper = context.mapperService().documentMapper();
    Text typeText = documentMapper.typeText();
    if (fieldsVisitor == null) {
        return new SearchHit(docId, null, typeText, null, null);
    }
    // load the fields stored in Lucene
    Map<String, DocumentField> searchFields = getSearchFields(context, fieldsVisitor, subDocId,
        storedToRequestedFields, subReaderContext);

    Map<String, DocumentField> metaFields = new HashMap<>();
    Map<String, DocumentField> documentFields = new HashMap<>();
    SearchHit.splitFieldsByMetadata(searchFields, documentFields, metaFields);

    SearchHit searchHit = new SearchHit(docId, fieldsVisitor.uid().id(), typeText, documentFields, metaFields);
    // Set _source if requested.
    SourceLookup sourceLookup = context.lookup().source();
    sourceLookup.setSegmentAndDocument(subReaderContext, subDocId);
    if (fieldsVisitor.source() != null) {
        sourceLookup.setSource(fieldsVisitor.source());
    }
    return searchHit;
}
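The fetch loop converts each shard-level docId into a segment and a segment-local ID (readerIndex and subDocId above). The sketch below models that lookup as a binary search over the segments' docBase values. It is an illustration under assumptions, not Lucene's ReaderUtil.subIndex: SegmentLookup is a hypothetical class, and a plain int[] of ascending docBase values stands in for the list of LeafReaderContext objects.

```java
/** Hypothetical helper modelling how a shard-level doc ID maps to a segment. */
final class SegmentLookup {
    private SegmentLookup() {}

    /**
     * Returns the index of the last segment whose docBase is <= docId.
     * docBases must be sorted ascending and start at 0.
     */
    static int subIndex(int docId, int[] docBases) {
        int lo = 0;
        int hi = docBases.length - 1;
        while (lo < hi) {
            // round up so the loop always makes progress when lo + 1 == hi
            int mid = (lo + hi + 1) >>> 1;
            if (docBases[mid] <= docId) {
                lo = mid;
            } else {
                hi = mid - 1;
            }
        }
        return lo;
    }

    /** Converts a shard-level doc ID into a segment-local doc ID. */
    static int toSegmentLocal(int docId, int[] docBases) {
        return docId - docBases[subIndex(docId, docBases)];
    }
}
```

For example, with three segments starting at docBase 0, 10 and 25, the shard-level ID 12 falls in the second segment (index 1) and becomes segment-local ID 2.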
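Once the data node has fetched the documents, it returns them to the coordinating node. Execution continues in the callback of the executeFetch method: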
@Override
public void innerOnResponse(FetchSearchResult result) {
    try {
        progressListener.notifyFetchResult(shardIndex);
        counter.onResult(result);
    } catch (Exception e) {
        context.onPhaseFailure(FetchSearchPhase.this, "", e);
    }
}
The returned data is stored in fetchResults, and once all shards have responded the finish phase runs:
final Runnable finishPhase = ()
    -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
    queryResults : fetchResults);

private void moveToNextPhase(SearchPhaseController searchPhaseController,
                             String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
                             AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
    final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
        reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
    context.executeNextPhase(this, nextPhaseFactory.apply(internalResponse, scrollId));
}
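The merge step has to put the fetched hits back into the global sort order produced by the query phase. One way to picture it, as a simplified model rather than the code of SearchPhaseController.merge, is to walk the sorted hits and take the next unread hit from the shard each one belongs to. HitMerger is a hypothetical name, and strings stand in for SearchHit objects.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; a simplified model of reassembling fetched hits. */
final class HitMerger {
    private HitMerger() {}

    /**
     * shardOfEachHit lists, in global sort order, the shard each top hit came from.
     * fetchedPerShard holds each shard's fetched hits in the order they were requested;
     * a null entry models a shard that returned no fetch result.
     */
    static List<String> merge(int[] shardOfEachHit, List<List<String>> fetchedPerShard) {
        // one read cursor per shard, advanced each time a hit is consumed
        int[] cursor = new int[fetchedPerShard.size()];
        List<String> merged = new ArrayList<>(shardOfEachHit.length);
        for (int shard : shardOfEachHit) {
            List<String> shardHits = fetchedPerShard.get(shard);
            if (shardHits == null) {
                continue; // nothing was fetched from this shard, skip its hits
            }
            merged.add(shardHits.get(cursor[shard]++));
        }
        return merged;
    }
}
```

This works in the model because each shard receives its doc IDs in global sort order, so the hits it returns are already in the order in which they will be consumed.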
The next phase executes the run method of ExpandSearchPhase:
@Override
public void run() {
    if (isCollapseRequest() && searchResponse.hits().getHits().length > 0) {
        SearchRequest searchRequest = context.getRequest();
        CollapseBuilder collapseBuilder = searchRequest.source().collapse();
        final List<InnerHitBuilder> innerHitBuilders = collapseBuilder.getInnerHits();
        MultiSearchRequest multiRequest = new MultiSearchRequest();
        if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
            multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
        }
        for (SearchHit hit : searchResponse.hits().getHits()) {
            BoolQueryBuilder groupQuery = new BoolQueryBuilder();
            Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
            if (collapseValue != null) {
                groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
            } else {
                groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
            }
            QueryBuilder origQuery = searchRequest.source().query();
            if (origQuery != null) {
                groupQuery.must(origQuery);
            }
            for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
                CollapseBuilder innerCollapseBuilder = innerHitBuilder.getInnerCollapseBuilder();
                SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(innerHitBuilder, innerCollapseBuilder)
                    .query(groupQuery)
                    .postFilter(searchRequest.source().postFilter());
                SearchRequest groupRequest = new SearchRequest(searchRequest);
                groupRequest.source(sourceBuilder);
                multiRequest.add(groupRequest);
            }
        }
        context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(),
            ActionListener.wrap(response -> {
                Iterator<MultiSearchResponse.Item> it = response.iterator();
                for (SearchHit hit : searchResponse.hits.getHits()) {
                    for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
                        MultiSearchResponse.Item item = it.next();
                        if (item.isFailure()) {
                            context.onPhaseFailure(this, "failed to expand hits", item.getFailure());
                            return;
                        }
                        SearchHits innerHits = item.getResponse().getHits();
                        if (hit.getInnerHits() == null) {
                            hit.setInnerHits(new HashMap<>(innerHitBuilders.size()));
                        }
                        hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);
                    }
                }
                context.sendSearchResponse(searchResponse, scrollId);
            }, context::onFailure)
        );
    } else {
        // return the search result
        context.sendSearchResponse(searchResponse, scrollId);
    }
}
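The expand phase depends on the multi-search responses arriving in the same order in which the group requests were added: hits in the outer loop, inner-hit builders in the inner loop. The sketch below models only that pairing, using strings in place of requests and responses; ExpandPairing and its methods are hypothetical names, not part of Elasticsearch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper modelling how expand requests and responses are paired. */
final class ExpandPairing {
    private ExpandPairing() {}

    /** Builds one request per (hit, inner hit name) pair, hit-major. */
    static List<String> buildRequests(List<String> hits, List<String> innerHitNames) {
        List<String> requests = new ArrayList<>();
        for (String hit : hits) {
            for (String name : innerHitNames) {
                requests.add(hit + "/" + name);
            }
        }
        return requests;
    }

    /**
     * Consumes the responses with a single iterator, using the same nesting as
     * buildRequests, so each response lands on the hit that requested it.
     */
    static Map<String, Map<String, String>> attach(List<String> hits,
                                                   List<String> innerHitNames,
                                                   List<String> responses) {
        Iterator<String> it = responses.iterator();
        Map<String, Map<String, String>> innerHitsPerHit = new LinkedHashMap<>();
        for (String hit : hits) {
            Map<String, String> innerHits = new LinkedHashMap<>();
            for (String name : innerHitNames) {
                innerHits.put(name, it.next());
            }
            innerHitsPerHit.put(hit, innerHits);
        }
        return innerHitsPerHit;
    }
}
```

If the two loops ever used a different nesting order, responses would be attached to the wrong hit, which is why the callback mirrors the request-building loops exactly.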
Return the search result:
@Override
public void sendSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
    ShardSearchFailure[] failures = buildShardFailures();
    Boolean allowPartialResults = request.allowPartialSearchResults();
    assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
    if (allowPartialResults == false && failures.length > 0){
        raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
    } else {
        listener.onResponse(buildSearchResponse(internalSearchResponse, scrollId, failures));
    }
}
The listener here is the new RestStatusToXContentListener<>(channel) instance:
@Override
public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
    assert response.isFragment() == false; //would be nice if we could make default methods final
    response.toXContent(builder, channel.request());
    RestResponse restResponse = new BytesRestResponse(response.status(), builder);
    if (RestStatus.CREATED == restResponse.status()) {
        final String location = extractLocation.apply(response);
        if (location != null) {
            restResponse.addHeader("Location", location);
        }
    }
    return restResponse;
}

@Override
public final RestResponse buildResponse(Response response) throws Exception {
    return buildResponse(response, channel.newBuilder());
}

protected final void processResponse(Response response) throws Exception {
    channel.sendResponse(buildResponse(response));
}
Finally, the result is returned to the client through the channel.