elasticsearch源码分析-08Serch查询流程
Serch查询流程
查询请求Rest路由注册也是在actionModule中
//查询操作
registerHandler.accept(new RestSearchAction());@Override
public List<Route> routes() {return unmodifiableList(asList(new Route(GET, "/_search"),new Route(POST, "/_search"),new Route(GET, "/{index}/_search"),new Route(POST, "/{index}/_search"),// Deprecated typed endpoints.new Route(GET, "/{index}/{type}/_search"),new Route(POST, "/{index}/{type}/_search")));
}
和Get查询一样,请求会经过转发dispatchRequest,查询路由对应的handler处理类,然后执行处理
@Overridepublic void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {//小图标请求处理if (request.rawPath().equals("/favicon.ico")) {handleFavicon(request.method(), request.uri(), channel);return;}try {//处理rest请求tryAllHandlers(request, channel, threadContext);} catch (Exception e) {try {channel.sendResponse(new BytesRestResponse(channel, e));} catch (Exception inner) {inner.addSuppressed(e);logger.error(() ->new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);}}}
调用BaseRestHandler的handleRequest方法
@Overridepublic final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {// prepare the request for execution; has the side effect of touching the request parameters//处理请求final RestChannelConsumer action = prepareRequest(request, client);// validate unconsumed params, but we must exclude params used to format the response// use a sorted set so the unconsumed parameters appear in a reliable sorted orderfinal SortedSet<String> unconsumedParams =request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));// validate the non-response paramsif (!unconsumedParams.isEmpty()) {final Set<String> candidateParams = new HashSet<>();candidateParams.addAll(request.consumedParams());candidateParams.addAll(responseParams());throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));}if (request.hasContent() && request.isContentConsumed() == false) {throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");}usageCount.increment();// execute the actionaction.accept(channel);}
首先构造请求request对象,RestSearchAction对该方法进行了重写
@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {//创建查询请求对象SearchRequest searchRequest = new SearchRequest();IntConsumer setSize = size -> searchRequest.source().size(size);request.withContentOrSourceParamParserOrNull(parser ->parseSearchRequest(searchRequest, request, parser, setSize));return channel -> {RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));};}
首先根据请求的request解析成查询请求
public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,XContentParser requestContentParser,IntConsumer setSize) throws IOException {//查询源包含哪些字段,为空查询所有字段if (searchRequest.source() == null) {searchRequest.source(new SearchSourceBuilder());}//查询索引searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));if (requestContentParser != null) {searchRequest.source().parseXContent(requestContentParser, true);}//协调节点的保护,从每个shard获取的数据最大数据量final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());searchRequest.setBatchedReduceSize(batchedReduceSize);//预过滤分片数量if (request.hasParam("pre_filter_shard_size")) {searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE));}//分片并发查询数量if (request.hasParam("max_concurrent_shard_requests")) {// only set if we have the parameter since we auto adjust the max concurrency on the coordinator// based on the number of nodes in the clusterfinal int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests",searchRequest.getMaxConcurrentShardRequests());searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);}//允许部分搜索结果if (request.hasParam("allow_partial_search_results")) {// only set if we have the parameter passed to override the cluster-level defaultsearchRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", null));}// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types// from the REST layer. these modes are an internal optimization and should// not be specified explicitly by the user.//搜索类型,默认query_then_fetchString searchType = request.param("search_type");//下面两种查询已经被废弃if ("query_and_fetch".equals(searchType) ||"dfs_query_and_fetch".equals(searchType)) {throw new IllegalArgumentException("Unsupported search type [" + searchType + "]");} else {searchRequest.searchType(searchType);}parseSearchSource(searchRequest.source(), request, setSize);//查询缓存searchRequest.requestCache(request.paramAsBoolean("request_cache", searchRequest.requestCache()));//滚动查询String scroll = request.param("scroll");if (scroll != null) {searchRequest.scroll(new Scroll(parseTimeValue(scroll, null, "scroll")));}//type字段已废弃if (request.hasParam("type")) {deprecationLogger.deprecatedAndMaybeLog("search_with_types", TYPES_DEPRECATION_MESSAGE);searchRequest.types(Strings.splitStringByCommaToArray(request.param("type")));}searchRequest.routing(request.param("routing"));searchRequest.preference(request.param("preference"));searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));checkRestTotalHits(request, searchRequest);}private static void parseSearchSource(final SearchSourceBuilder searchSourceBuilder, RestRequest request, IntConsumer setSize) {QueryBuilder queryBuilder = RestActions.urlParamsToQueryBuilder(request);if (queryBuilder != null) {searchSourceBuilder.query(queryBuilder);}//分页查询int from = request.paramAsInt("from", -1);if (from != -1) {searchSourceBuilder.from(from);}int size = request.paramAsInt("size", -1);if (size != -1) {setSize.accept(size);}//特定文档如何与查询匹配的详细信息和解释if (request.hasParam("explain")) {searchSourceBuilder.explain(request.paramAsBoolean("explain", null));}//带版本号查询if (request.hasParam("version")) {searchSourceBuilder.version(request.paramAsBoolean("version", null));}//是否返回seqNo和primaryTermif (request.hasParam("seq_no_primary_term")) {searchSourceBuilder.seqNoAndPrimaryTerm(request.paramAsBoolean("seq_no_primary_term", null));}//超时时间if (request.hasParam("timeout")) {searchSourceBuilder.timeout(request.paramAsTime("timeout", null));}//收集后终止搜索if (request.hasParam("terminate_after")) {int terminateAfter = request.paramAsInt("terminate_after",SearchContext.DEFAULT_TERMINATE_AFTER);if (terminateAfter < 0) {throw new IllegalArgumentException("terminateAfter must be > 0");} else if (terminateAfter > 0) {searchSourceBuilder.terminateAfter(terminateAfter);}}//指示应如何获取存储的字段StoredFieldsContext storedFieldsContext =StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), request);if (storedFieldsContext != null) {searchSourceBuilder.storedFields(storedFieldsContext);}//从doc_value字段查询String sDocValueFields = request.param("docvalue_fields");if (sDocValueFields != null) {if (Strings.hasText(sDocValueFields)) {String[] sFields = Strings.splitStringByCommaToArray(sDocValueFields);for (String field : sFields) {searchSourceBuilder.docValueField(field, null);}}}//从_source获取结果FetchSourceContext fetchSourceContext = FetchSourceContext.parseFromRestRequest(request);if (fetchSourceContext != null) {searchSourceBuilder.fetchSource(fetchSourceContext);}//在排序时应用,并控制是否也会跟踪分数if (request.hasParam("track_scores")) {searchSourceBuilder.trackScores(request.paramAsBoolean("track_scores", false));}//如果值未设置,则返回应跟踪的总命中数或 nullif (request.hasParam("track_total_hits")) {if (Booleans.isBoolean(request.param("track_total_hits"))) {searchSourceBuilder.trackTotalHits(request.paramAsBoolean("track_total_hits", true));} else {searchSourceBuilder.trackTotalHitsUpTo(request.paramAsInt("track_total_hits", SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO));}}//排序字段String sSorts = request.param("sort");if (sSorts != null) {String[] sorts = Strings.splitStringByCommaToArray(sSorts);for (String sort : sorts) {int delimiter = sort.lastIndexOf(":");if (delimiter != -1) {String sortField = sort.substring(0, delimiter);String reverse = sort.substring(delimiter + 1);if ("asc".equals(reverse)) {searchSourceBuilder.sort(sortField, SortOrder.ASC);} else if ("desc".equals(reverse)) {searchSourceBuilder.sort(sortField, SortOrder.DESC);}} else {searchSourceBuilder.sort(sort);}}}//此请求将汇总的统计信息组String sStats = request.param("stats");if (sStats != null) {searchSourceBuilder.stats(Arrays.asList(Strings.splitStringByCommaToArray(sStats)));}//推荐词String suggestField = request.param("suggest_field");if (suggestField != null) {String suggestText = request.param("suggest_text", request.param("q"));int suggestSize = request.paramAsInt("suggest_size", 5);String suggestMode = request.param("suggest_mode");searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(suggestField,termSuggestion(suggestField).text(suggestText).size(suggestSize).suggestMode(SuggestMode.resolve(suggestMode))));}}
创建rest client开始发送请求
return channel -> {RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));};
执行execute方法
public < Request extends ActionRequest,Response extends ActionResponse> Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {//获取transportAction开始执行return transportAction(action).execute(request, listener);}
最终调用到TransportSearchAction的doExecute方法
search操作会发送请求到索引下的所有分片,相同分片只会发送到主分片或副本分片
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {//服务启动相对时间final long relativeStartNanos = System.nanoTime();final SearchTimeProvider timeProvider =new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {if (source != searchRequest.source()) {// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch// situations when source is rewritten to null due to a bugsearchRequest.source(source);}//对其他集群的索引和当前集群的索引分组,合并本集群shard列表和远程集群的shardfinal ClusterState clusterState = clusterService.state();final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);//执行本集群搜索if (remoteClusterIndices.isEmpty()) {executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener);} else {//最小化查询次数if (shouldMinimizeRoundtrips(searchRequest)) {ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider,searchService.aggReduceContextBuilder(searchRequest),remoteClusterService, threadPool, listener,(r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));} else {AtomicInteger skippedClusters = new AtomicInteger(0);//收集所有需要查询的分片collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,ActionListener.wrap(//查询分片返回的结果searchShardsResponses -> {List<SearchShardIterator> remoteShardIterators = new ArrayList<>();Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);//查询集群数int localClusters = localIndices == null ? 0 : 1;int totalClusters = remoteClusterIndices.size() + localClusters;int successfulClusters = searchShardsResponses.size() + localClusters;//执行查询executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));},listener::onFailure));}}}, listener::onFailure);if (searchRequest.source() == null) {rewriteListener.onResponse(searchRequest.source());} else {Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),rewriteListener);}}
这里收集本集群和跨集群及其他集群需要查询的索引,这里我们主要是分析本集群搜索
ES中查询主要包括两个方式:
- QUERY_THEN_FETCH:默认搜索方式, 先向所有的 shard 发出请求, 各分片只返回文档 id和排
名相关的信息及文档的打分然后按照各分片返回的文档的分数进行重新排序和排名, 取前
size 个文档。然后根据文档 id 去相关的 shard 取 document。 数据量是准确的,数据排名不准确 - DFS_QUERY_THEN_FETCH:在上面的查询方式之前多个一个DFS的过程,也就是在进行查询之前, 先对所有分片发送请求, 把所有分片中的词频和文档频率等打分依据全部汇总到一块, 再执行后面的操作,数据排名准确,但是性能比上面性能要差
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,OriginalIndices localIndices, List<SearchShardIterator> remoteShardIterators,BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener,SearchResponse.Clusters clusters) {clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead// of just for the _search apifinal Index[] indices = resolveLocalIndices(localIndices, searchRequest.indicesOptions(), clusterState, timeProvider);Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),searchRequest.indices());routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);String[] concreteIndices = new String[indices.length];for (int i = 0; i < indices.length; i++) {concreteIndices[i] = indices[i].getName();}//每个节点查询次数Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);//合并需要查询的shard分片GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,searchRequest.getLocalClusterAlias(), remoteShardIterators);//检查需要查询分片数量是否超过阈值failIfOverShardCountLimit(clusterService, shardIterators.size());Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);// optimize search type for cases where there is only one shard group to search on//针对只有一个分片组要搜索的情况优化搜索类型if (shardIterators.size() == 1) {// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shardsearchRequest.searchType(QUERY_THEN_FETCH);}if (searchRequest.allowPartialSearchResults() == null) {// No user preference defined in search request - apply cluster service default//允许部分搜索结果返回searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());}if (searchRequest.isSuggestOnly()) {// disable request cache if we have only suggest// 如果我们只有建议,则禁用请求缓存searchRequest.requestCache(false);switch (searchRequest.searchType()) {case DFS_QUERY_THEN_FETCH:// convert to Q_T_F if we have only suggestsearchRequest.searchType(QUERY_THEN_FETCH);break;}}final DiscoveryNodes nodes = clusterState.nodes();BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),nodes::get, remoteConnections, searchTransportService::getConnection);//提前过滤分片boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, indices, shardIterators.size());//异步查询searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState,Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();}
根据两个搜索方式创建不同的action
AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction;switch (searchRequest.searchType()) {case DFS_QUERY_THEN_FETCH:searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,shardIterators, timeProvider, clusterState, task, clusters);break;case QUERY_THEN_FETCH:searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, /*查询后的回调*/listener,shardIterators, timeProvider, clusterState, task, clusters);break;default:throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");}return searchAsyncAction;
创建action后开始执行
public final void start() {if (getNumShards() == 0) {//no search shards to search on, bail with empty response//(it happens with search across _all with no indices around and consistent with broadcast operations)int trackTotalHitsUpTo = request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO :request.source().trackTotalHitsUpTo() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO :request.source().trackTotalHitsUpTo();// total hits is null in the response if the tracking of total hits is disabledboolean withTotalHits = trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED;listener.onResponse(new SearchResponse(InternalSearchResponse.empty(withTotalHits), null, 0, 0, 0, buildTookInMillis(),ShardSearchFailure.EMPTY_ARRAY, clusters));return;}//执行搜索executePhase(this);
}private void executePhase(SearchPhase phase) {try {phase.run();} catch (Exception e) {if (logger.isDebugEnabled()) {logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);}onPhaseFailure(phase, "", e);}
}
遍历所有分片发送请求,并设置回调函数接收分片返回结果
public final void run() {for (final SearchShardIterator iterator : toSkipShardsIts) {assert iterator.skip();skipShard(iterator);}if (shardsIts.size() > 0) {...for (int index = 0; index < shardsIts.size(); index++) {//分片路由信息final SearchShardIterator shardRoutings = shardsIts.get(index);assert shardRoutings.skip() == false;//分片执行performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());}}}private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {if (shard == null) {fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));} else {//节流并发请求final PendingExecutions pendingExecutions = throttleConcurrentRequests ?pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode)): null;Runnable r = () -> {final Thread thread = Thread.currentThread();try {//执行查询executePhaseOnShard(shardIt, shard,new SearchActionListener<Result>(shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {@Overridepublic void innerOnResponse(Result result) {try {//分片返回结果onShardResult(result, shardIt);} finally {executeNext(pendingExecutions, thread);}}@Overridepublic void onFailure(Exception t) {try {onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);} finally {executeNext(pendingExecutions, thread);}}});} catch (final Exception e) {try { fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));} finally {executeNext(pendingExecutions, thread);}}};//是否限流if (throttleConcurrentRequests) {pendingExecutions.tryRun(r);} else {r.run();}}}
默认为QUERY_THEN_FETCH方式,调用SearchQueryThenFetchAsyncAction的executePhaseOnShard方法
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,final SearchActionListener<SearchPhaseResult> listener) {//封装请求ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt));getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),request, getTask(), listener);}
获取连接然后发送action为indices:data/read/search[phase/query]的RPC请求
public void sendExecuteQuery(Transport.Connection connection, final ShardSearchRequest request, SearchTask task,final SearchActionListener<SearchPhaseResult> listener) {// we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request// this used to be the QUERY_AND_FETCH which doesn't exist anymore.final boolean fetchDocuments = request.numberOfShards() == 1;Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;final ActionListener handler = responseWrapper.apply(connection, listener);//发送查询transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId()));}
- 数据节点
数据节点对应的RPC注册的处理handler
//注册分片查询处理transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new,(request, channel, task) -> {//执行查询searchService.executeQueryPhase(request, (SearchShardTask) task,/*查询后回调*/new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request));});
数据节点接收RPC请求执行query阶段
public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1: "empty responses require more than one shard";//索引服务IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());//索引分片IndexShard shard = indexService.getShard(request.shardId().id());rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {@Overridepublic void onResponse(ShardSearchRequest orig) {//如果没有查询到doc可以返回nullif (orig.canReturnNullResponseIfMatchNoDocs()) { ShardSearchRequest canMatchRequest = new ShardSearchRequest(orig);try (Engine.Searcher searcher = shard.acquireCanMatchSearcher()) {QueryShardContext context = indexService.newQueryShardContext(canMatchRequest.shardId().id(), searcher,canMatchRequest::nowInMillis, canMatchRequest.getClusterAlias());Rewriteable.rewrite(canMatchRequest.getRewriteable(), context, true);} catch (Exception exc) {listener.onFailure(exc);return;}if (canRewriteToMatchNone(canMatchRequest.source())&& canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) {assert canMatchRequest.scroll() == null : "must always create search context for scroll requests";listener.onResponse(QuerySearchResult.nullInstance());return;}}// fork the execution in the search thread pool//在搜索线程池中执行runAsync(shard, () -> executeQueryPhase(orig, task), listener);}@Overridepublic void onFailure(Exception exc) {listener.onFailure(exc);}});}
执行search操作
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception {final SearchContext context = createAndPutContext(request, task);context.incRef();try {final long afterQueryTime;try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {contextProcessing(context);//执行查询阶段loadOrExecuteQueryPhase(request, context);if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {freeContext(context.id());} else {contextProcessedSuccessfully(context);}afterQueryTime = executor.success();}//如果只有一个分片,执行fetch阶段if (request.numberOfShards() == 1) {return executeFetchPhase(context, afterQueryTime);}return context.queryResult();} catch (Exception e) {// execution exception can happen while loading the cache, strip itif (e instanceof ExecutionException) {e = (e.getCause() == null || e.getCause() instanceof Exception) ?(Exception) e.getCause() : new ElasticsearchException(e.getCause());}logger.trace("Query phase failed", e);processFailure(context, e);throw e;} finally {cleanContext(context);}}
创建查询上下文对象SearchContext跟踪整个search流程,并且查询后的结果也会填充到SearchContext中
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {final boolean canCache = indicesService.canCache(request, context);context.getQueryShardContext().freezeContext();if (canCache) {indicesService.loadIntoContext(request, context, queryPhase);} else {//执行查询queryPhase.execute(context);}}public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {if (searchContext.hasOnlySuggest()) {suggestPhase.execute(searchContext);searchContext.queryResult().topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),new DocValueFormat[0]);return;}if (LOGGER.isTraceEnabled()) {LOGGER.trace("{}", new SearchContextSourcePrinter(searchContext));}//尽可能晚地预处理聚合。在 DFS_Q_T_F 的情况下// 请求,preProcess 在 DFS 阶段调用,这就是我们预处理它们的原因//这里是为了确保它发生在QUERY阶段aggregationPhase.preProcess(searchContext);//lucene搜索boolean rescore = executeInternal(searchContext);if (rescore) { // only if we do a regular search//重新计算打分rescorePhase.execute(searchContext);}//纠错suggestPhase.execute(searchContext);//聚合aggregationPhase.execute(searchContext);if (searchContext.getProfilers() != null) {ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());searchContext.queryResult().profileResults(shardResults);}}
这里可以看到首先会执行lucene的查询,然后对查询结果进行打分、执行聚合逻辑,然后生成SearchPhaseResult返回
数据节点查询数据后返回协调节点,我们继续回到executePhaseOnShard方法执行回调
//执行查询
executePhaseOnShard(shardIt, shard,new SearchActionListener<Result>(shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {@Overridepublic void innerOnResponse(Result result) {try {//分片返回结果onShardResult(result, shardIt);} finally {executeNext(pendingExecutions, thread);}}@Overridepublic void onFailure(Exception t) {try {onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);} finally {executeNext(pendingExecutions, thread);}}});
保存分片查询返回的数据到results数组中并进行计数
protected void onShardResult(Result result, SearchShardIterator shardIt) {assert result.getShardIndex() != -1 : "shard index is not set";assert result.getSearchShardTarget() != null : "search shard target must not be null";successfulOps.incrementAndGet();results.consumeResult(result);hasShardResponse.set(true);if (logger.isTraceEnabled()) {logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);}AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();if (shardFailures != null) {shardFailures.set(result.getShardIndex(), null);}successfulShardExecution(shardIt);}
判断所有分片查询是否都已经返回,如果都已经返回则执行下一阶段及fetch阶段
private void successfulShardExecution(SearchShardIterator shardsIt) {final int remainingOpsOnIterator;if (shardsIt.skip()) {remainingOpsOnIterator = shardsIt.remaining();} else {remainingOpsOnIterator = shardsIt.remaining() + 1;}final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);//所有分片都已经返回if (xTotalOps == expectedTotalOps) {onPhaseDone();} else if (xTotalOps > expectedTotalOps) {throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["+ expectedTotalOps + "]");}}final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()executeNextPhase(this, getNextPhase(results, this));}@Overrideprotected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {//获取数据阶段return new FetchSearchPhase(results, searchPhaseController, context, clusterState());}FetchSearchPhase(SearchPhaseResults<SearchPhaseResult> resultConsumer,SearchPhaseController searchPhaseController,SearchPhaseContext context,ClusterState clusterState) {this(resultConsumer, searchPhaseController, context, clusterState,//最后收尾阶段(response, scrollId) -> new ExpandSearchPhase(context, response, scrollId));}
- fetch阶段
@Overridepublic void run() {context.execute(new AbstractRunnable() {@Overrideprotected void doRun() throws Exception {// we do the heavy lifting in this inner run method where we reduce aggs etc. that's why we fork this phase// off immediately instead of forking when we send back the response to the user since there we only need// to merge together the fetched results which is a linear operation.innerRun();}@Overridepublic void onFailure(Exception e) {context.onPhaseFailure(FetchSearchPhase.this, "", e);}});}private void innerRun() throws IOException {//查询分片数量final int numShards = context.getNumShards();//是否是滚动查询final boolean isScrollSearch = context.getRequest().scroll() != null;//search阶段返回结果final List<SearchPhaseResult> phaseResults = queryResults.asList();final String scrollId;//如果是滚动查询则生成scrollIdif (isScrollSearch) {final boolean includeContextUUID = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_7_7_0);scrollId = TransportSearchHelper.buildScrollId(queryResults, includeContextUUID);} else {scrollId = null;}final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();final boolean queryAndFetchOptimization = queryResults.length() == 1;//fetch结束后执行任务final Runnable finishPhase = ()-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?queryResults : fetchResults);if (queryAndFetchOptimization) {assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()+ "], single result: " + phaseResults.get(0).fetchResult();// query AND fetch optimizationfinishPhase.run();} else {ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, scoreDocs);// no docs to fetch -- sidestep everything and return//无文档执行fetchif (scoreDocs.length == 0) {// we have to release contexts here to free up resourcesphaseResults.stream().map(SearchPhaseResult::queryResult).forEach(this::releaseIrrelevantSearchContext);finishPhase.run();} else {final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards): null;final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or notfinishPhase, context);for (int i = 0; i < docIdsToLoad.length; i++) {IntArrayList entry = docIdsToLoad[i];SearchPhaseResult queryResult = queryResults.get(i);if (entry == null) { // no results for this shard IDif (queryResult != null) {// if we got some hits from this shard we have to release the context there// we do this as we go since it will free up resources and passing on the request on the// transport layer is cheap.releaseIrrelevantSearchContext(queryResult.queryResult());progressListener.notifyFetchResult(i);}// in any case we count down this result since we don't talk to this shard anymorecounter.countDown();} else {SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),searchShardTarget.getNodeId());//创建fetch请求ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getContextId(), i, entry,lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());//执行fetchexecuteFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),connection);}}}}}
创建fetch请求,执行fetch
private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,final CountedCollector<FetchSearchResult> counter,final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,final Transport.Connection connection) {//发送fetch请求context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {@Overridepublic void innerOnResponse(FetchSearchResult result) {try {progressListener.notifyFetchResult(shardIndex);counter.onResult(result);} catch (Exception e) {context.onPhaseFailure(FetchSearchPhase.this, "", e);}}@Overridepublic void onFailure(Exception e) {try {logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.contextId()), e);progressListener.notifyFetchFailure(shardIndex, shardTarget, e);counter.onFailure(shardIndex, shardTarget, e);} finally {releaseIrrelevantSearchContext(querySearchResult);}}});}
发送RPC为indices:data/read/search[phase/fetch/id]的fetch请求
public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task,final SearchActionListener<FetchSearchResult> listener) {sendExecuteFetch(connection, FETCH_ID_ACTION_NAME, request, task, listener);}
数据节点注册的handler
//fetch请求transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new,(request, channel, task) -> {//执行请求searchService.executeFetchPhase(request, (SearchShardTask) task,new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request));});
- 数据节点执行fetch阶段
public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {runAsync(request.contextId(), () -> {final SearchContext context = findContext(request.contextId(), request);context.incRef();try {context.setTask(task);contextProcessing(context);if (request.lastEmittedDoc() != null) {context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();}context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) {//fetch查询fetchPhase.execute(context);if (fetchPhaseShouldFreeContext(context)) {freeContext(request.contextId());} else {contextProcessedSuccessfully(context);}executor.success();}//返回结果return context.fetchResult();} catch (Exception e) {logger.trace("Fetch phase failed", e);processFailure(context, e);throw e;} finally {cleanContext(context);}}, listener);}
遍历文档id集合,查询详细信息
@Overridepublic void execute(SearchContext context) {if (LOGGER.isTraceEnabled()) {LOGGER.trace("{}", new SearchContextSourcePrinter(context));}final FieldsVisitor fieldsVisitor;Map<String, Set<String>> storedToRequestedFields = new HashMap<>();StoredFieldsContext storedFieldsContext = context.storedFieldsContext();if (storedFieldsContext == null) {// no fields specified, default to return source if no explicit indicationif (!context.hasScriptFields() && !context.hasFetchSourceContext()) {context.fetchSourceContext(new FetchSourceContext(true));}fieldsVisitor = new FieldsVisitor(context.sourceRequested());} else if (storedFieldsContext.fetchFields() == false) {// disable stored fields entirelyfieldsVisitor = null;} else {for (String fieldNameOrPattern : context.storedFieldsContext().fieldNames()) {if (fieldNameOrPattern.equals(SourceFieldMapper.NAME)) {FetchSourceContext fetchSourceContext = context.hasFetchSourceContext() ? context.fetchSourceContext(): FetchSourceContext.FETCH_SOURCE;context.fetchSourceContext(new FetchSourceContext(true, fetchSourceContext.includes(), fetchSourceContext.excludes()));continue;}Collection<String> fieldNames = context.mapperService().simpleMatchToFullName(fieldNameOrPattern);for (String fieldName : fieldNames) {MappedFieldType fieldType = context.smartNameFieldType(fieldName);if (fieldType == null) {// Only fail if we know it is a object field, missing paths / fields shouldn't fail.if (context.getObjectMapper(fieldName) != null) {throw new IllegalArgumentException("field [" + fieldName + "] isn't a leaf field");}} else {String storedField = fieldType.name();Set<String> requestedFields = storedToRequestedFields.computeIfAbsent(storedField, key -> new HashSet<>());requestedFields.add(fieldName);}}}boolean loadSource = context.sourceRequested();if (storedToRequestedFields.isEmpty()) {// empty list specified, default to disable _source if no explicit indicationfieldsVisitor = new FieldsVisitor(loadSource);} else {fieldsVisitor = new CustomFieldsVisitor(storedToRequestedFields.keySet(), loadSource);}}try {SearchHit[] hits = new SearchHit[context.docIdsToLoadSize()];FetchSubPhase.HitContext hitContext = new FetchSubPhase.HitContext();for (int index = 0; index < context.docIdsToLoadSize(); index++) {if (context.isCancelled()) {throw new TaskCancelledException("cancelled");}//文档idint docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());LeafReaderContext subReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);int subDocId = docId - subReaderContext.docBase;final SearchHit searchHit;int rootDocId = findRootDocumentIfNested(context, subReaderContext, subDocId);if (rootDocId != -1) {searchHit = createNestedSearchHit(context, docId, subDocId, rootDocId,storedToRequestedFields, subReaderContext);} else {searchHit = createSearchHit(context, fieldsVisitor, docId, subDocId,storedToRequestedFields, subReaderContext);}hits[index] = searchHit;hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher());//执行各种元数据获取for (FetchSubPhase fetchSubPhase : fetchSubPhases) {fetchSubPhase.hitExecute(context, hitContext);}}if (context.isCancelled()) {throw new TaskCancelledException("cancelled");}//执行各种元数据获取for (FetchSubPhase fetchSubPhase : fetchSubPhases) {fetchSubPhase.hitsExecute(context, hits);if (context.isCancelled()) {throw new TaskCancelledException("cancelled");}}TotalHits totalHits = context.queryResult().getTotalHits();context.fetchResult().hits(new SearchHits(hits, totalHits, context.queryResult().getMaxScore()));} catch (IOException e) {throw ExceptionsHelper.convertToElastic(e);}}private SearchHit createSearchHit(SearchContext context,FieldsVisitor fieldsVisitor,int docId,int subDocId,Map<String, Set<String>> storedToRequestedFields,LeafReaderContext subReaderContext) {DocumentMapper documentMapper = context.mapperService().documentMapper();Text typeText = documentMapper.typeText();if (fieldsVisitor == null) {return new SearchHit(docId, null, typeText, null, null);}//查询lucene保存的字段Map<String, DocumentField> searchFields = getSearchFields(context, fieldsVisitor, subDocId,storedToRequestedFields, subReaderContext);Map<String, DocumentField> metaFields = new HashMap<>();Map<String, DocumentField> documentFields = new HashMap<>();SearchHit.splitFieldsByMetadata(searchFields, documentFields, metaFields);SearchHit searchHit = new SearchHit(docId, fieldsVisitor.uid().id(), typeText, documentFields, metaFields);// Set _source if requested.SourceLookup sourceLookup = context.lookup().source();sourceLookup.setSegmentAndDocument(subReaderContext, subDocId);if (fieldsVisitor.source() != null) {sourceLookup.setSource(fieldsVisitor.source());}return searchHit;}
数据节点fetch数据后返回,协调节点。继续回到executeFetch方法的回调函数
@Override
public void innerOnResponse(FetchSearchResult result) {try {progressListener.notifyFetchResult(shardIndex);counter.onResult(result);} catch (Exception e) {context.onPhaseFailure(FetchSearchPhase.this, "", e);}
}
将返回的数据填充到fetchResults中,最后执行finish阶段
final Runnable finishPhase = ()-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?queryResults : fetchResults);private void moveToNextPhase(SearchPhaseController searchPhaseController,String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);context.executeNextPhase(this, nextPhaseFactory.apply(internalResponse, scrollId));}
执行ExpandSearchPhase的run方法
@Overridepublic void run() {if (isCollapseRequest() && searchResponse.hits().getHits().length > 0) {SearchRequest searchRequest = context.getRequest();CollapseBuilder collapseBuilder = searchRequest.source().collapse();final List<InnerHitBuilder> innerHitBuilders = collapseBuilder.getInnerHits();MultiSearchRequest multiRequest = new MultiSearchRequest();if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());}for (SearchHit hit : searchResponse.hits().getHits()) {BoolQueryBuilder groupQuery = new BoolQueryBuilder();Object collapseValue = hit.field(collapseBuilder.getField()).getValue();if (collapseValue != null) {groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));} else {groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));}QueryBuilder origQuery = searchRequest.source().query();if (origQuery != null) {groupQuery.must(origQuery);}for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {CollapseBuilder innerCollapseBuilder = innerHitBuilder.getInnerCollapseBuilder();SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(innerHitBuilder, innerCollapseBuilder).query(groupQuery).postFilter(searchRequest.source().postFilter());SearchRequest groupRequest = new SearchRequest(searchRequest);groupRequest.source(sourceBuilder);multiRequest.add(groupRequest);}}context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(),ActionListener.wrap(response -> {Iterator<MultiSearchResponse.Item> it = response.iterator();for (SearchHit hit : searchResponse.hits.getHits()) {for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {MultiSearchResponse.Item item = it.next();if (item.isFailure()) {context.onPhaseFailure(this, "failed to expand hits", item.getFailure());return;}SearchHits innerHits = item.getResponse().getHits();if (hit.getInnerHits() == null) {hit.setInnerHits(new HashMap<>(innerHitBuilders.size()));}hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);}}context.sendSearchResponse(searchResponse, scrollId);}, context::onFailure));} else {//返回查询结果context.sendSearchResponse(searchResponse, scrollId);}}
返回查询结果
@Overridepublic void sendSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {ShardSearchFailure[] failures = buildShardFailures();Boolean allowPartialResults = request.allowPartialSearchResults();assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";if (allowPartialResults == false && failures.length > 0){raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));} else {listener.onResponse(buildSearchResponse(internalSearchResponse, scrollId, failures));}}
这里的listener就是new RestStatusToXContentListener<>(channel)
@Overridepublic RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {assert response.isFragment() == false; //would be nice if we could make default methods finalresponse.toXContent(builder, channel.request());RestResponse restResponse = new BytesRestResponse(response.status(), builder);if (RestStatus.CREATED == restResponse.status()) {final String location = extractLocation.apply(response);if (location != null) {restResponse.addHeader("Location", location);}}return restResponse;}@Overridepublic final RestResponse buildResponse(Response response) throws Exception {return buildResponse(response, channel.newBuilder());}protected final void processResponse(Response response) throws Exception {channel.sendResponse(buildResponse(response));}
最后通过channel将结果返回客户端
相关文章:
elasticsearch源码分析-08Serch查询流程
Serch查询流程 查询请求Rest路由注册也是在actionModule中 //查询操作 registerHandler.accept(new RestSearchAction());Override public List<Route> routes() {return unmodifiableList(asList(new Route(GET, "/_search"),new Route(POST, "/_searc…...

【协作提效 Go - gin ! swagger】
什么是swagger Swagger 是一个用于设计、构建、记录和使用 RESTful Web 服务的工具集。它的主要作用包括: API 文档生成:Swagger 可以自动生成详细的 API 文档,包括每个端点的请求和响应格式、参数、状态码等。这使得开发者和用户可以轻松理…...
栈和队列——3.滑动窗口最大值
力扣题目链接 给定一个数组 nums,有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。返回滑动窗口中的最大值。 示例: 输入:nums[1,3,-1,-3,5,3,6,7],k 3 …...
嵌入式智能手表开发系列文章之开篇
不好意思,朋友们,我回来了。想想已经断更了好久了。在这段断更的日子里。开拓了个新领域,不搞android 产品,而是去搞嵌入式智能手表啦。 接下来我会用几篇文章来介绍下我对这个领域的看法体会,以及我自己所负责领域的…...
24.8.2数据结构|双链表
双链表 1、定义结构:2个指针域、数据域 2、初始化:创建一个含有N个结点的带头结点双链表head (双链表头结点的前驱与和尾节点的后继与置为空) 3、求表长:返回双链表head的长度 4、取元素:取出双链表head中…...

RabbitMQ高级特性 - 事务消息
文章目录 RabbitMQ 事务消息概述实现原理代码实现不采用事务采用事务 RabbitMQ 事务消息 概述 RabbitMQ 的 AMQP 协议实现了事务机制,允许开发者保证消息的发送和接收时原子性的,也就是说,要么消息全都发送成功,要么全都发送失败…...
leetcode:心算挑战
题目: 心算项目的挑战比赛中,要求选手从N张卡牌中选出cnt张卡牌,若这cnt张卡牌数字总和为偶数,则选手成绩「有效」且得分为cnt张卡牌数字总和。给定数组cards和cnt,其中cards[i]表示第i张卡牌上的数字。 请帮参赛选手计…...

docker部署java项目(war包方式)
场景描述:java项目war包,在开发开电脑上使用dockerfile构建镜像,上传镜像到客户服务器中使用docker加载docker镜像,然后部署。 目录 一、本地环境安装 docker git 二、服务器环境安装 docker 三、构建docker镜像(win系统) 四、注意事项 (1)系统架构 (2)使…...

jsp 自定义taglib
一、简介 我们在javaWeb开发中,经常会用到jsp的taglib标签,有时候并不能满足我们的实际需要,这就需要我们自定义taglib标签, 二、开发步骤 1、编写control方法,继承BodyTagSupport 2、定义zdytaglib.tld标签文件 3、…...

从一到无穷大 #32 TimeCloth,云上的快速 Point-in-Time Recovery
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。 本作品 (李兆龙 博文, 由 李兆龙 创作),由 李兆龙 确认,转载请注明版权。 文章目录 引言解决方案FAST FINE-GRAINED PITRLog FilterInter-Record Dependency ResolutionL…...
时间序列论文1——Forecasting at Scale
目录 0. AI总结0.1 文章概述0.2 研究背景0.3 研究思路0.4 研究结论与讨论1. Introduction2 Features of Business Time Series3 The Prophet Forecasting Model3.1 The Trend Model3.2 Seasonality3.3 Holidays and Events3.4 Model Fitting3.5 Analyst-in-the-Loop Modeling4 …...

HDFS常用命令
HDFS常用命令 1.HDFS命令介绍1.1基本语法格式1.2常用命令 1.HDFS命令介绍 HDFS 提供了一组命令行工具,用于管理和操作 HDFS 文件系统。 1.1基本语法格式 hdfs dfs -<命令> [选项] <参数>1.2常用命令 1.显示<path>指定的文件的详细信息。 had…...

请问如何做好软件测试工作呢?
一、明确测试目标和范围 理解测试目的:在开始测试之前,首先要明确测试的目标和范围,确保测试计划 与需求相匹配。这有助于测试人员聚焦在关键功能上,避免浪费时间和资源。制定详细的测试计划:根据项目需求࿰…...
单片机开发与Linux开发的区别
引言 单片机(MCU)和Linux开发是嵌入式系统领域的两大主要方向。它们在硬件平台、开发环境、应用场景和开发难度上存在显著区别。本文将系统性地比较单片机开发和Linux开发,探讨它们的主要区别及各自的应用场景和难度体系。 一、基本概念 1…...

【机器学习】回归类算法-相关性分析
一、前言 前面的几篇博客我们学习了分类算法,今天我们来了解一下回归类的算法吧。首先我们来谈谈两者有什么区别,首先是我们在之前的分类算法,这类算法可以将让我们学会如何将不同的数据划分到不同的类里面,输出的是一些离散的值。…...

java基础 之 集合与栈的使用(三)
文章目录 Map接口(一)实现类:HashMap特点HashMap集合的一些方法 (二)实现类: TreeMap特点【自然排序】代码【定制排序】代码TreeMap集合的一些方法 HashMap 和 TreeMap的区别 前文回顾: 戳这里 …...

JDK-java.nio包详解
JDK-java.nio包详解 概述 一直以来Java三件套(集合、io、多线程)都是最热门的Java基础技术点,我们要深入掌握好这三件套才能在日常开发中得心应手,之前有编写集合相关的文章,这里出一篇文章来梳理一下io相关的知识点。…...
虚拟机与服务器的区别是什么?虚拟机与服务器的区别和联系
服务器和虚拟机是两个不同的概念,它们在计算机领域有着不同的含义和作用。今天飞飞就和你分享虚拟机和服务器的区别和联系,希望可以帮助到你~ 1、物理形态 a)服务器是实实在在的物理设备,拥有独立的硬件架构。如CPU、硬盘、内存等 b)虚拟机…...
Linux CentOS stream9 命令
初学linux,对字符界面的命令并不陌生。问到什么是linux命令直接答cd、pwd、ls是linux命令。对于命令的定义并熟悉,也不太关心命令的底层执行逻辑,更关心录入命令,马上获取需要的结果。 本文就命令的定义、分类或执行优先级作一简单介绍。 一、定义 搜索网上对linux命令的…...

JavaScript基础——JavaScript变量声明
变量是存储数据的容器,可以变的量,值可以改变,在JavaScript中,变量声明的关键字有var、let,其中,var是ES5的语法,let是ES6的语法,变量需要先声明,在使用。 声明一个age变…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...

cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

前端开发面试题总结-JavaScript篇(一)
文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包(Closure)?闭包有什么应用场景和潜在问题?2.解释 JavaScript 的作用域链(Scope Chain) 二、原型与继承3.原型链是什么?如何实现继承&a…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
uniapp中使用aixos 报错
问题: 在uniapp中使用aixos,运行后报如下错误: AxiosError: There is no suitable adapter to dispatch the request since : - adapter xhr is not supported by the environment - adapter http is not available in the build 解决方案&…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...