elasticsearch源码分析-08Serch查询流程
Serch查询流程
查询请求Rest路由注册也是在actionModule中
//查询操作
registerHandler.accept(new RestSearchAction());@Override
public List<Route> routes() {return unmodifiableList(asList(new Route(GET, "/_search"),new Route(POST, "/_search"),new Route(GET, "/{index}/_search"),new Route(POST, "/{index}/_search"),// Deprecated typed endpoints.new Route(GET, "/{index}/{type}/_search"),new Route(POST, "/{index}/{type}/_search")));
}
和Get查询一样,请求会经过转发dispatchRequest,查询路由对应的handler处理类,然后执行处理
@Overridepublic void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {//小图标请求处理if (request.rawPath().equals("/favicon.ico")) {handleFavicon(request.method(), request.uri(), channel);return;}try {//处理rest请求tryAllHandlers(request, channel, threadContext);} catch (Exception e) {try {channel.sendResponse(new BytesRestResponse(channel, e));} catch (Exception inner) {inner.addSuppressed(e);logger.error(() ->new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);}}}
调用BaseRestHandler的handleRequest方法
@Overridepublic final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {// prepare the request for execution; has the side effect of touching the request parameters//处理请求final RestChannelConsumer action = prepareRequest(request, client);// validate unconsumed params, but we must exclude params used to format the response// use a sorted set so the unconsumed parameters appear in a reliable sorted orderfinal SortedSet<String> unconsumedParams =request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));// validate the non-response paramsif (!unconsumedParams.isEmpty()) {final Set<String> candidateParams = new HashSet<>();candidateParams.addAll(request.consumedParams());candidateParams.addAll(responseParams());throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));}if (request.hasContent() && request.isContentConsumed() == false) {throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");}usageCount.increment();// execute the actionaction.accept(channel);}
首先构造请求request对象,RestSearchAction对该方法进行了重写
@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {//创建查询请求对象SearchRequest searchRequest = new SearchRequest();IntConsumer setSize = size -> searchRequest.source().size(size);request.withContentOrSourceParamParserOrNull(parser ->parseSearchRequest(searchRequest, request, parser, setSize));return channel -> {RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));};}
首先根据请求的request解析成查询请求
public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,XContentParser requestContentParser,IntConsumer setSize) throws IOException {//查询源包含哪些字段,为空查询所有字段if (searchRequest.source() == null) {searchRequest.source(new SearchSourceBuilder());}//查询索引searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));if (requestContentParser != null) {searchRequest.source().parseXContent(requestContentParser, true);}//协调节点的保护,从每个shard获取的数据最大数据量final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());searchRequest.setBatchedReduceSize(batchedReduceSize);//预过滤分片数量if (request.hasParam("pre_filter_shard_size")) {searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE));}//分片并发查询数量if (request.hasParam("max_concurrent_shard_requests")) {// only set if we have the parameter since we auto adjust the max concurrency on the coordinator// based on the number of nodes in the clusterfinal int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests",searchRequest.getMaxConcurrentShardRequests());searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);}//允许部分搜索结果if (request.hasParam("allow_partial_search_results")) {// only set if we have the parameter passed to override the cluster-level defaultsearchRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", null));}// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types// from the REST layer. these modes are an internal optimization and should// not be specified explicitly by the user.//搜索类型,默认query_then_fetchString searchType = request.param("search_type");//下面两种查询已经被废弃if ("query_and_fetch".equals(searchType) ||"dfs_query_and_fetch".equals(searchType)) {throw new IllegalArgumentException("Unsupported search type [" + searchType + "]");} else {searchRequest.searchType(searchType);}parseSearchSource(searchRequest.source(), request, setSize);//查询缓存searchRequest.requestCache(request.paramAsBoolean("request_cache", searchRequest.requestCache()));//滚动查询String scroll = request.param("scroll");if (scroll != null) {searchRequest.scroll(new Scroll(parseTimeValue(scroll, null, "scroll")));}//type字段已废弃if (request.hasParam("type")) {deprecationLogger.deprecatedAndMaybeLog("search_with_types", TYPES_DEPRECATION_MESSAGE);searchRequest.types(Strings.splitStringByCommaToArray(request.param("type")));}searchRequest.routing(request.param("routing"));searchRequest.preference(request.param("preference"));searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));checkRestTotalHits(request, searchRequest);}private static void parseSearchSource(final SearchSourceBuilder searchSourceBuilder, RestRequest request, IntConsumer setSize) {QueryBuilder queryBuilder = RestActions.urlParamsToQueryBuilder(request);if (queryBuilder != null) {searchSourceBuilder.query(queryBuilder);}//分页查询int from = request.paramAsInt("from", -1);if (from != -1) {searchSourceBuilder.from(from);}int size = request.paramAsInt("size", -1);if (size != -1) {setSize.accept(size);}//特定文档如何与查询匹配的详细信息和解释if (request.hasParam("explain")) {searchSourceBuilder.explain(request.paramAsBoolean("explain", null));}//带版本号查询if (request.hasParam("version")) {searchSourceBuilder.version(request.paramAsBoolean("version", null));}//是否返回seqNo和primaryTermif (request.hasParam("seq_no_primary_term")) {searchSourceBuilder.seqNoAndPrimaryTerm(request.paramAsBoolean("seq_no_primary_term", null));}//超时时间if (request.hasParam("timeout")) {searchSourceBuilder.timeout(request.paramAsTime("timeout", null));}//收集后终止搜索if (request.hasParam("terminate_after")) {int terminateAfter = request.paramAsInt("terminate_after",SearchContext.DEFAULT_TERMINATE_AFTER);if (terminateAfter < 0) {throw new IllegalArgumentException("terminateAfter must be > 0");} else if (terminateAfter > 0) {searchSourceBuilder.terminateAfter(terminateAfter);}}//指示应如何获取存储的字段StoredFieldsContext storedFieldsContext =StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), request);if (storedFieldsContext != null) {searchSourceBuilder.storedFields(storedFieldsContext);}//从doc_value字段查询String sDocValueFields = request.param("docvalue_fields");if (sDocValueFields != null) {if (Strings.hasText(sDocValueFields)) {String[] sFields = Strings.splitStringByCommaToArray(sDocValueFields);for (String field : sFields) {searchSourceBuilder.docValueField(field, null);}}}//从_source获取结果FetchSourceContext fetchSourceContext = FetchSourceContext.parseFromRestRequest(request);if (fetchSourceContext != null) {searchSourceBuilder.fetchSource(fetchSourceContext);}//在排序时应用,并控制是否也会跟踪分数if (request.hasParam("track_scores")) {searchSourceBuilder.trackScores(request.paramAsBoolean("track_scores", false));}//如果值未设置,则返回应跟踪的总命中数或 nullif (request.hasParam("track_total_hits")) {if (Booleans.isBoolean(request.param("track_total_hits"))) {searchSourceBuilder.trackTotalHits(request.paramAsBoolean("track_total_hits", true));} else {searchSourceBuilder.trackTotalHitsUpTo(request.paramAsInt("track_total_hits", SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO));}}//排序字段String sSorts = request.param("sort");if (sSorts != null) {String[] sorts = Strings.splitStringByCommaToArray(sSorts);for (String sort : sorts) {int delimiter = sort.lastIndexOf(":");if (delimiter != -1) {String sortField = sort.substring(0, delimiter);String reverse = sort.substring(delimiter + 1);if ("asc".equals(reverse)) {searchSourceBuilder.sort(sortField, SortOrder.ASC);} else if ("desc".equals(reverse)) {searchSourceBuilder.sort(sortField, SortOrder.DESC);}} else {searchSourceBuilder.sort(sort);}}}//此请求将汇总的统计信息组String sStats = request.param("stats");if (sStats != null) {searchSourceBuilder.stats(Arrays.asList(Strings.splitStringByCommaToArray(sStats)));}//推荐词String suggestField = request.param("suggest_field");if (suggestField != null) {String suggestText = request.param("suggest_text", request.param("q"));int suggestSize = request.paramAsInt("suggest_size", 5);String suggestMode = request.param("suggest_mode");searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(suggestField,termSuggestion(suggestField).text(suggestText).size(suggestSize).suggestMode(SuggestMode.resolve(suggestMode))));}}
创建rest client开始发送请求
return channel -> {RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));};
执行execute方法
public < Request extends ActionRequest,Response extends ActionResponse> Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {//获取transportAction开始执行return transportAction(action).execute(request, listener);}
最终调用到TransportSearchAction的doExecute方法
search操作会发送请求到索引下的所有分片,相同分片只会发送到主分片或副本分片
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {//服务启动相对时间final long relativeStartNanos = System.nanoTime();final SearchTimeProvider timeProvider =new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {if (source != searchRequest.source()) {// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch// situations when source is rewritten to null due to a bugsearchRequest.source(source);}//对其他集群的索引和当前集群的索引分组,合并本集群shard列表和远程集群的shardfinal ClusterState clusterState = clusterService.state();final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);//执行本集群搜索if (remoteClusterIndices.isEmpty()) {executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener);} else {//最小化查询次数if (shouldMinimizeRoundtrips(searchRequest)) {ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider,searchService.aggReduceContextBuilder(searchRequest),remoteClusterService, threadPool, listener,(r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));} else {AtomicInteger skippedClusters = new AtomicInteger(0);//收集所有需要查询的分片collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,ActionListener.wrap(//查询分片返回的结果searchShardsResponses -> {List<SearchShardIterator> remoteShardIterators = new ArrayList<>();Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);//查询集群数int localClusters = localIndices == null ? 0 : 1;int totalClusters = remoteClusterIndices.size() + localClusters;int successfulClusters = searchShardsResponses.size() + localClusters;//执行查询executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));},listener::onFailure));}}}, listener::onFailure);if (searchRequest.source() == null) {rewriteListener.onResponse(searchRequest.source());} else {Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),rewriteListener);}}
这里收集本集群和跨集群及其他集群需要查询的索引,这里我们主要是分析本集群搜索
ES中查询主要包括两个方式:
- QUERY_THEN_FETCH:默认搜索方式, 先向所有的 shard 发出请求, 各分片只返回文档 id和排
名相关的信息及文档的打分然后按照各分片返回的文档的分数进行重新排序和排名, 取前
size 个文档。然后根据文档 id 去相关的 shard 取 document。 数据量是准确的,数据排名不准确 - DFS_QUERY_THEN_FETCH:在上面的查询方式之前多个一个DFS的过程,也就是在进行查询之前, 先对所有分片发送请求, 把所有分片中的词频和文档频率等打分依据全部汇总到一块, 再执行后面的操作,数据排名准确,但是性能比上面性能要差
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,OriginalIndices localIndices, List<SearchShardIterator> remoteShardIterators,BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener,SearchResponse.Clusters clusters) {clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead// of just for the _search apifinal Index[] indices = resolveLocalIndices(localIndices, searchRequest.indicesOptions(), clusterState, timeProvider);Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),searchRequest.indices());routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);String[] concreteIndices = new String[indices.length];for (int i = 0; i < indices.length; i++) {concreteIndices[i] = indices[i].getName();}//每个节点查询次数Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);//合并需要查询的shard分片GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,searchRequest.getLocalClusterAlias(), remoteShardIterators);//检查需要查询分片数量是否超过阈值failIfOverShardCountLimit(clusterService, shardIterators.size());Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);// optimize search type for cases where there is only one shard group to search on//针对只有一个分片组要搜索的情况优化搜索类型if (shardIterators.size() == 1) {// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shardsearchRequest.searchType(QUERY_THEN_FETCH);}if (searchRequest.allowPartialSearchResults() == null) {// No user preference defined in search request - apply cluster service default//允许部分搜索结果返回searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());}if (searchRequest.isSuggestOnly()) {// disable request cache if we have only suggest// 如果我们只有建议,则禁用请求缓存searchRequest.requestCache(false);switch (searchRequest.searchType()) {case DFS_QUERY_THEN_FETCH:// convert to Q_T_F if we have only suggestsearchRequest.searchType(QUERY_THEN_FETCH);break;}}final DiscoveryNodes nodes = clusterState.nodes();BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),nodes::get, remoteConnections, searchTransportService::getConnection);//提前过滤分片boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, indices, shardIterators.size());//异步查询searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState,Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();}
根据两个搜索方式创建不同的action
AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction;switch (searchRequest.searchType()) {case DFS_QUERY_THEN_FETCH:searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,shardIterators, timeProvider, clusterState, task, clusters);break;case QUERY_THEN_FETCH:searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, /*查询后的回调*/listener,shardIterators, timeProvider, clusterState, task, clusters);break;default:throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");}return searchAsyncAction;
创建action后开始执行
public final void start() {if (getNumShards() == 0) {//no search shards to search on, bail with empty response//(it happens with search across _all with no indices around and consistent with broadcast operations)int trackTotalHitsUpTo = request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO :request.source().trackTotalHitsUpTo() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO :request.source().trackTotalHitsUpTo();// total hits is null in the response if the tracking of total hits is disabledboolean withTotalHits = trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED;listener.onResponse(new SearchResponse(InternalSearchResponse.empty(withTotalHits), null, 0, 0, 0, buildTookInMillis(),ShardSearchFailure.EMPTY_ARRAY, clusters));return;}//执行搜索executePhase(this);
}private void executePhase(SearchPhase phase) {try {phase.run();} catch (Exception e) {if (logger.isDebugEnabled()) {logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);}onPhaseFailure(phase, "", e);}
}
遍历所有分片发送请求,并设置回调函数接收分片返回结果
public final void run() {for (final SearchShardIterator iterator : toSkipShardsIts) {assert iterator.skip();skipShard(iterator);}if (shardsIts.size() > 0) {...for (int index = 0; index < shardsIts.size(); index++) {//分片路由信息final SearchShardIterator shardRoutings = shardsIts.get(index);assert shardRoutings.skip() == false;//分片执行performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());}}}private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {if (shard == null) {fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));} else {//节流并发请求final PendingExecutions pendingExecutions = throttleConcurrentRequests ?pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode)): null;Runnable r = () -> {final Thread thread = Thread.currentThread();try {//执行查询executePhaseOnShard(shardIt, shard,new SearchActionListener<Result>(shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {@Overridepublic void innerOnResponse(Result result) {try {//分片返回结果onShardResult(result, shardIt);} finally {executeNext(pendingExecutions, thread);}}@Overridepublic void onFailure(Exception t) {try {onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);} finally {executeNext(pendingExecutions, thread);}}});} catch (final Exception e) {try { fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));} finally {executeNext(pendingExecutions, thread);}}};//是否限流if (throttleConcurrentRequests) {pendingExecutions.tryRun(r);} else {r.run();}}}
默认为QUERY_THEN_FETCH方式,调用SearchQueryThenFetchAsyncAction的executePhaseOnShard方法
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,final SearchActionListener<SearchPhaseResult> listener) {//封装请求ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt));getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),request, getTask(), listener);}
获取连接然后发送action为indices:data/read/search[phase/query]的RPC请求
public void sendExecuteQuery(Transport.Connection connection, final ShardSearchRequest request, SearchTask task,final SearchActionListener<SearchPhaseResult> listener) {// we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request// this used to be the QUERY_AND_FETCH which doesn't exist anymore.final boolean fetchDocuments = request.numberOfShards() == 1;Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;final ActionListener handler = responseWrapper.apply(connection, listener);//发送查询transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId()));}
- 数据节点
数据节点对应的RPC注册的处理handler
//注册分片查询处理transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new,(request, channel, task) -> {//执行查询searchService.executeQueryPhase(request, (SearchShardTask) task,/*查询后回调*/new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request));});
数据节点接收RPC请求执行query阶段
public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener<SearchPhaseResult> listener) {assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1: "empty responses require more than one shard";//索引服务IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());//索引分片IndexShard shard = indexService.getShard(request.shardId().id());rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {@Overridepublic void onResponse(ShardSearchRequest orig) {//如果没有查询到doc可以返回nullif (orig.canReturnNullResponseIfMatchNoDocs()) { ShardSearchRequest canMatchRequest = new ShardSearchRequest(orig);try (Engine.Searcher searcher = shard.acquireCanMatchSearcher()) {QueryShardContext context = indexService.newQueryShardContext(canMatchRequest.shardId().id(), searcher,canMatchRequest::nowInMillis, canMatchRequest.getClusterAlias());Rewriteable.rewrite(canMatchRequest.getRewriteable(), context, true);} catch (Exception exc) {listener.onFailure(exc);return;}if (canRewriteToMatchNone(canMatchRequest.source())&& canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) {assert canMatchRequest.scroll() == null : "must always create search context for scroll requests";listener.onResponse(QuerySearchResult.nullInstance());return;}}// fork the execution in the search thread pool//在搜索线程池中执行runAsync(shard, () -> executeQueryPhase(orig, task), listener);}@Overridepublic void onFailure(Exception exc) {listener.onFailure(exc);}});}
执行search操作
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task) throws Exception {final SearchContext context = createAndPutContext(request, task);context.incRef();try {final long afterQueryTime;try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {contextProcessing(context);//执行查询阶段loadOrExecuteQueryPhase(request, context);if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {freeContext(context.id());} else {contextProcessedSuccessfully(context);}afterQueryTime = executor.success();}//如果只有一个分片,执行fetch阶段if (request.numberOfShards() == 1) {return executeFetchPhase(context, afterQueryTime);}return context.queryResult();} catch (Exception e) {// execution exception can happen while loading the cache, strip itif (e instanceof ExecutionException) {e = (e.getCause() == null || e.getCause() instanceof Exception) ?(Exception) e.getCause() : new ElasticsearchException(e.getCause());}logger.trace("Query phase failed", e);processFailure(context, e);throw e;} finally {cleanContext(context);}}
创建查询上下文对象SearchContext跟踪整个search流程,并且查询后的结果也会填充到SearchContext中
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {final boolean canCache = indicesService.canCache(request, context);context.getQueryShardContext().freezeContext();if (canCache) {indicesService.loadIntoContext(request, context, queryPhase);} else {//执行查询queryPhase.execute(context);}}public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {if (searchContext.hasOnlySuggest()) {suggestPhase.execute(searchContext);searchContext.queryResult().topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),new DocValueFormat[0]);return;}if (LOGGER.isTraceEnabled()) {LOGGER.trace("{}", new SearchContextSourcePrinter(searchContext));}//尽可能晚地预处理聚合。在 DFS_Q_T_F 的情况下// 请求,preProcess 在 DFS 阶段调用,这就是我们预处理它们的原因//这里是为了确保它发生在QUERY阶段aggregationPhase.preProcess(searchContext);//lucene搜索boolean rescore = executeInternal(searchContext);if (rescore) { // only if we do a regular search//重新计算打分rescorePhase.execute(searchContext);}//纠错suggestPhase.execute(searchContext);//聚合aggregationPhase.execute(searchContext);if (searchContext.getProfilers() != null) {ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());searchContext.queryResult().profileResults(shardResults);}}
这里可以看到首先会执行lucene的查询,然后对查询结果进行打分、执行聚合逻辑,然后生成SearchPhaseResult返回
数据节点查询数据后返回协调节点,我们继续回到executePhaseOnShard方法执行回调
//执行查询
executePhaseOnShard(shardIt, shard,new SearchActionListener<Result>(shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {@Overridepublic void innerOnResponse(Result result) {try {//分片返回结果onShardResult(result, shardIt);} finally {executeNext(pendingExecutions, thread);}}@Overridepublic void onFailure(Exception t) {try {onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);} finally {executeNext(pendingExecutions, thread);}}});
保存分片查询返回的数据到results数组中并进行计数
protected void onShardResult(Result result, SearchShardIterator shardIt) {assert result.getShardIndex() != -1 : "shard index is not set";assert result.getSearchShardTarget() != null : "search shard target must not be null";successfulOps.incrementAndGet();results.consumeResult(result);hasShardResponse.set(true);if (logger.isTraceEnabled()) {logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);}AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();if (shardFailures != null) {shardFailures.set(result.getShardIndex(), null);}successfulShardExecution(shardIt);}
判断所有分片查询是否都已经返回,如果都已经返回则执行下一阶段及fetch阶段
private void successfulShardExecution(SearchShardIterator shardsIt) {final int remainingOpsOnIterator;if (shardsIt.skip()) {remainingOpsOnIterator = shardsIt.remaining();} else {remainingOpsOnIterator = shardsIt.remaining() + 1;}final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);//所有分片都已经返回if (xTotalOps == expectedTotalOps) {onPhaseDone();} else if (xTotalOps > expectedTotalOps) {throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["+ expectedTotalOps + "]");}}final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()executeNextPhase(this, getNextPhase(results, this));}@Overrideprotected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {//获取数据阶段return new FetchSearchPhase(results, searchPhaseController, context, clusterState());}FetchSearchPhase(SearchPhaseResults<SearchPhaseResult> resultConsumer,SearchPhaseController searchPhaseController,SearchPhaseContext context,ClusterState clusterState) {this(resultConsumer, searchPhaseController, context, clusterState,//最后收尾阶段(response, scrollId) -> new ExpandSearchPhase(context, response, scrollId));}
- fetch阶段
@Overridepublic void run() {context.execute(new AbstractRunnable() {@Overrideprotected void doRun() throws Exception {// we do the heavy lifting in this inner run method where we reduce aggs etc. that's why we fork this phase// off immediately instead of forking when we send back the response to the user since there we only need// to merge together the fetched results which is a linear operation.innerRun();}@Overridepublic void onFailure(Exception e) {context.onPhaseFailure(FetchSearchPhase.this, "", e);}});}private void innerRun() throws IOException {//查询分片数量final int numShards = context.getNumShards();//是否是滚动查询final boolean isScrollSearch = context.getRequest().scroll() != null;//search阶段返回结果final List<SearchPhaseResult> phaseResults = queryResults.asList();final String scrollId;//如果是滚动查询则生成scrollIdif (isScrollSearch) {final boolean includeContextUUID = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_7_7_0);scrollId = TransportSearchHelper.buildScrollId(queryResults, includeContextUUID);} else {scrollId = null;}final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();final boolean queryAndFetchOptimization = queryResults.length() == 1;//fetch结束后执行任务final Runnable finishPhase = ()-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?queryResults : fetchResults);if (queryAndFetchOptimization) {assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()+ "], single result: " + phaseResults.get(0).fetchResult();// query AND fetch optimizationfinishPhase.run();} else {ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, scoreDocs);// no docs to fetch -- sidestep everything and return//无文档执行fetchif (scoreDocs.length == 0) {// we have to release contexts here to free up resourcesphaseResults.stream().map(SearchPhaseResult::queryResult).forEach(this::releaseIrrelevantSearchContext);finishPhase.run();} else {final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards): null;final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or notfinishPhase, context);for (int i = 0; i < docIdsToLoad.length; i++) {IntArrayList entry = docIdsToLoad[i];SearchPhaseResult queryResult = queryResults.get(i);if (entry == null) { // no results for this shard IDif (queryResult != null) {// if we got some hits from this shard we have to release the context there// we do this as we go since it will free up resources and passing on the request on the// transport layer is cheap.releaseIrrelevantSearchContext(queryResult.queryResult());progressListener.notifyFetchResult(i);}// in any case we count down this result since we don't talk to this shard anymorecounter.countDown();} else {SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),searchShardTarget.getNodeId());//创建fetch请求ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getContextId(), i, entry,lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());//执行fetchexecuteFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),connection);}}}}}
创建fetch请求,执行fetch
private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,final CountedCollector<FetchSearchResult> counter,final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,final Transport.Connection connection) {//发送fetch请求context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {@Overridepublic void innerOnResponse(FetchSearchResult result) {try {progressListener.notifyFetchResult(shardIndex);counter.onResult(result);} catch (Exception e) {context.onPhaseFailure(FetchSearchPhase.this, "", e);}}@Overridepublic void onFailure(Exception e) {try {logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.contextId()), e);progressListener.notifyFetchFailure(shardIndex, shardTarget, e);counter.onFailure(shardIndex, shardTarget, e);} finally {releaseIrrelevantSearchContext(querySearchResult);}}});}
发送RPC为indices:data/read/search[phase/fetch/id]的fetch请求
public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task,final SearchActionListener<FetchSearchResult> listener) {sendExecuteFetch(connection, FETCH_ID_ACTION_NAME, request, task, listener);}
数据节点注册的handler
//fetch请求transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new,(request, channel, task) -> {//执行请求searchService.executeFetchPhase(request, (SearchShardTask) task,new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request));});
- 数据节点执行fetch阶段
public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {runAsync(request.contextId(), () -> {final SearchContext context = findContext(request.contextId(), request);context.incRef();try {context.setTask(task);contextProcessing(context);if (request.lastEmittedDoc() != null) {context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();}context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) {//fetch查询fetchPhase.execute(context);if (fetchPhaseShouldFreeContext(context)) {freeContext(request.contextId());} else {contextProcessedSuccessfully(context);}executor.success();}//返回结果return context.fetchResult();} catch (Exception e) {logger.trace("Fetch phase failed", e);processFailure(context, e);throw e;} finally {cleanContext(context);}}, listener);}
遍历文档id集合,查询详细信息
@Overridepublic void execute(SearchContext context) {if (LOGGER.isTraceEnabled()) {LOGGER.trace("{}", new SearchContextSourcePrinter(context));}final FieldsVisitor fieldsVisitor;Map<String, Set<String>> storedToRequestedFields = new HashMap<>();StoredFieldsContext storedFieldsContext = context.storedFieldsContext();if (storedFieldsContext == null) {// no fields specified, default to return source if no explicit indicationif (!context.hasScriptFields() && !context.hasFetchSourceContext()) {context.fetchSourceContext(new FetchSourceContext(true));}fieldsVisitor = new FieldsVisitor(context.sourceRequested());} else if (storedFieldsContext.fetchFields() == false) {// disable stored fields entirelyfieldsVisitor = null;} else {for (String fieldNameOrPattern : context.storedFieldsContext().fieldNames()) {if (fieldNameOrPattern.equals(SourceFieldMapper.NAME)) {FetchSourceContext fetchSourceContext = context.hasFetchSourceContext() ? context.fetchSourceContext(): FetchSourceContext.FETCH_SOURCE;context.fetchSourceContext(new FetchSourceContext(true, fetchSourceContext.includes(), fetchSourceContext.excludes()));continue;}Collection<String> fieldNames = context.mapperService().simpleMatchToFullName(fieldNameOrPattern);for (String fieldName : fieldNames) {MappedFieldType fieldType = context.smartNameFieldType(fieldName);if (fieldType == null) {// Only fail if we know it is a object field, missing paths / fields shouldn't fail.if (context.getObjectMapper(fieldName) != null) {throw new IllegalArgumentException("field [" + fieldName + "] isn't a leaf field");}} else {String storedField = fieldType.name();Set<String> requestedFields = storedToRequestedFields.computeIfAbsent(storedField, key -> new HashSet<>());requestedFields.add(fieldName);}}}boolean loadSource = context.sourceRequested();if (storedToRequestedFields.isEmpty()) {// empty list specified, default to disable _source if no explicit indicationfieldsVisitor = new FieldsVisitor(loadSource);} else {fieldsVisitor = new CustomFieldsVisitor(storedToRequestedFields.keySet(), loadSource);}}try {SearchHit[] hits = new SearchHit[context.docIdsToLoadSize()];FetchSubPhase.HitContext hitContext = new FetchSubPhase.HitContext();for (int index = 0; index < context.docIdsToLoadSize(); index++) {if (context.isCancelled()) {throw new TaskCancelledException("cancelled");}//文档idint docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());LeafReaderContext subReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);int subDocId = docId - subReaderContext.docBase;final SearchHit searchHit;int rootDocId = findRootDocumentIfNested(context, subReaderContext, subDocId);if (rootDocId != -1) {searchHit = createNestedSearchHit(context, docId, subDocId, rootDocId,storedToRequestedFields, subReaderContext);} else {searchHit = createSearchHit(context, fieldsVisitor, docId, subDocId,storedToRequestedFields, subReaderContext);}hits[index] = searchHit;hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher());//执行各种元数据获取for (FetchSubPhase fetchSubPhase : fetchSubPhases) {fetchSubPhase.hitExecute(context, hitContext);}}if (context.isCancelled()) {throw new TaskCancelledException("cancelled");}//执行各种元数据获取for (FetchSubPhase fetchSubPhase : fetchSubPhases) {fetchSubPhase.hitsExecute(context, hits);if (context.isCancelled()) {throw new TaskCancelledException("cancelled");}}TotalHits totalHits = context.queryResult().getTotalHits();context.fetchResult().hits(new SearchHits(hits, totalHits, context.queryResult().getMaxScore()));} catch (IOException e) {throw ExceptionsHelper.convertToElastic(e);}}private SearchHit createSearchHit(SearchContext context,FieldsVisitor fieldsVisitor,int docId,int subDocId,Map<String, Set<String>> storedToRequestedFields,LeafReaderContext subReaderContext) {DocumentMapper documentMapper = context.mapperService().documentMapper();Text typeText = documentMapper.typeText();if (fieldsVisitor == null) {return new SearchHit(docId, null, typeText, null, null);}//查询lucene保存的字段Map<String, DocumentField> searchFields = getSearchFields(context, fieldsVisitor, subDocId,storedToRequestedFields, subReaderContext);Map<String, DocumentField> metaFields = new HashMap<>();Map<String, DocumentField> documentFields = new HashMap<>();SearchHit.splitFieldsByMetadata(searchFields, documentFields, metaFields);SearchHit searchHit = new SearchHit(docId, fieldsVisitor.uid().id(), typeText, documentFields, metaFields);// Set _source if requested.SourceLookup sourceLookup = context.lookup().source();sourceLookup.setSegmentAndDocument(subReaderContext, subDocId);if (fieldsVisitor.source() != null) {sourceLookup.setSource(fieldsVisitor.source());}return searchHit;}
数据节点fetch数据后返回,协调节点。继续回到executeFetch方法的回调函数
@Override
public void innerOnResponse(FetchSearchResult result) {try {progressListener.notifyFetchResult(shardIndex);counter.onResult(result);} catch (Exception e) {context.onPhaseFailure(FetchSearchPhase.this, "", e);}
}
将返回的数据填充到fetchResults中,最后执行finish阶段
final Runnable finishPhase = ()-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?queryResults : fetchResults);private void moveToNextPhase(SearchPhaseController searchPhaseController,String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);context.executeNextPhase(this, nextPhaseFactory.apply(internalResponse, scrollId));}
执行ExpandSearchPhase的run方法
@Overridepublic void run() {if (isCollapseRequest() && searchResponse.hits().getHits().length > 0) {SearchRequest searchRequest = context.getRequest();CollapseBuilder collapseBuilder = searchRequest.source().collapse();final List<InnerHitBuilder> innerHitBuilders = collapseBuilder.getInnerHits();MultiSearchRequest multiRequest = new MultiSearchRequest();if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());}for (SearchHit hit : searchResponse.hits().getHits()) {BoolQueryBuilder groupQuery = new BoolQueryBuilder();Object collapseValue = hit.field(collapseBuilder.getField()).getValue();if (collapseValue != null) {groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));} else {groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));}QueryBuilder origQuery = searchRequest.source().query();if (origQuery != null) {groupQuery.must(origQuery);}for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {CollapseBuilder innerCollapseBuilder = innerHitBuilder.getInnerCollapseBuilder();SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(innerHitBuilder, innerCollapseBuilder).query(groupQuery).postFilter(searchRequest.source().postFilter());SearchRequest groupRequest = new SearchRequest(searchRequest);groupRequest.source(sourceBuilder);multiRequest.add(groupRequest);}}context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(),ActionListener.wrap(response -> {Iterator<MultiSearchResponse.Item> it = response.iterator();for (SearchHit hit : searchResponse.hits.getHits()) {for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {MultiSearchResponse.Item item = it.next();if (item.isFailure()) {context.onPhaseFailure(this, "failed to expand hits", item.getFailure());return;}SearchHits innerHits = item.getResponse().getHits();if (hit.getInnerHits() == null) {hit.setInnerHits(new HashMap<>(innerHitBuilders.size()));}hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);}}context.sendSearchResponse(searchResponse, scrollId);}, context::onFailure));} else {//返回查询结果context.sendSearchResponse(searchResponse, scrollId);}}
返回查询结果
@Overridepublic void sendSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {ShardSearchFailure[] failures = buildShardFailures();Boolean allowPartialResults = request.allowPartialSearchResults();assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";if (allowPartialResults == false && failures.length > 0){raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));} else {listener.onResponse(buildSearchResponse(internalSearchResponse, scrollId, failures));}}
这里的listener就是new RestStatusToXContentListener<>(channel)
@Overridepublic RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {assert response.isFragment() == false; //would be nice if we could make default methods finalresponse.toXContent(builder, channel.request());RestResponse restResponse = new BytesRestResponse(response.status(), builder);if (RestStatus.CREATED == restResponse.status()) {final String location = extractLocation.apply(response);if (location != null) {restResponse.addHeader("Location", location);}}return restResponse;}@Overridepublic final RestResponse buildResponse(Response response) throws Exception {return buildResponse(response, channel.newBuilder());}protected final void processResponse(Response response) throws Exception {channel.sendResponse(buildResponse(response));}
最后通过channel将结果返回客户端
相关文章:
elasticsearch源码分析-08Serch查询流程
Serch查询流程 查询请求Rest路由注册也是在actionModule中 //查询操作 registerHandler.accept(new RestSearchAction());Override public List<Route> routes() {return unmodifiableList(asList(new Route(GET, "/_search"),new Route(POST, "/_searc…...

【协作提效 Go - gin ! swagger】
什么是swagger Swagger 是一个用于设计、构建、记录和使用 RESTful Web 服务的工具集。它的主要作用包括: API 文档生成:Swagger 可以自动生成详细的 API 文档,包括每个端点的请求和响应格式、参数、状态码等。这使得开发者和用户可以轻松理…...
栈和队列——3.滑动窗口最大值
力扣题目链接 给定一个数组 nums,有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。返回滑动窗口中的最大值。 示例: 输入:nums[1,3,-1,-3,5,3,6,7],k 3 …...
嵌入式智能手表开发系列文章之开篇
不好意思,朋友们,我回来了。想想已经断更了好久了。在这段断更的日子里。开拓了个新领域,不搞android 产品,而是去搞嵌入式智能手表啦。 接下来我会用几篇文章来介绍下我对这个领域的看法体会,以及我自己所负责领域的…...
24.8.2数据结构|双链表
双链表 1、定义结构:2个指针域、数据域 2、初始化:创建一个含有N个结点的带头结点双链表head (双链表头结点的前驱与和尾节点的后继与置为空) 3、求表长:返回双链表head的长度 4、取元素:取出双链表head中…...

RabbitMQ高级特性 - 事务消息
文章目录 RabbitMQ 事务消息概述实现原理代码实现不采用事务采用事务 RabbitMQ 事务消息 概述 RabbitMQ 的 AMQP 协议实现了事务机制,允许开发者保证消息的发送和接收时原子性的,也就是说,要么消息全都发送成功,要么全都发送失败…...
leetcode:心算挑战
题目: 心算项目的挑战比赛中,要求选手从N张卡牌中选出cnt张卡牌,若这cnt张卡牌数字总和为偶数,则选手成绩「有效」且得分为cnt张卡牌数字总和。给定数组cards和cnt,其中cards[i]表示第i张卡牌上的数字。 请帮参赛选手计…...

docker部署java项目(war包方式)
场景描述:java项目war包,在开发开电脑上使用dockerfile构建镜像,上传镜像到客户服务器中使用docker加载docker镜像,然后部署。 目录 一、本地环境安装 docker git 二、服务器环境安装 docker 三、构建docker镜像(win系统) 四、注意事项 (1)系统架构 (2)使…...

jsp 自定义taglib
一、简介 我们在javaWeb开发中,经常会用到jsp的taglib标签,有时候并不能满足我们的实际需要,这就需要我们自定义taglib标签, 二、开发步骤 1、编写control方法,继承BodyTagSupport 2、定义zdytaglib.tld标签文件 3、…...

从一到无穷大 #32 TimeCloth,云上的快速 Point-in-Time Recovery
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。 本作品 (李兆龙 博文, 由 李兆龙 创作),由 李兆龙 确认,转载请注明版权。 文章目录 引言解决方案FAST FINE-GRAINED PITRLog FilterInter-Record Dependency ResolutionL…...
时间序列论文1——Forecasting at Scale
目录 0. AI总结0.1 文章概述0.2 研究背景0.3 研究思路0.4 研究结论与讨论1. Introduction2 Features of Business Time Series3 The Prophet Forecasting Model3.1 The Trend Model3.2 Seasonality3.3 Holidays and Events3.4 Model Fitting3.5 Analyst-in-the-Loop Modeling4 …...

HDFS常用命令
HDFS常用命令 1.HDFS命令介绍1.1基本语法格式1.2常用命令 1.HDFS命令介绍 HDFS 提供了一组命令行工具,用于管理和操作 HDFS 文件系统。 1.1基本语法格式 hdfs dfs -<命令> [选项] <参数>1.2常用命令 1.显示<path>指定的文件的详细信息。 had…...

请问如何做好软件测试工作呢?
一、明确测试目标和范围 理解测试目的:在开始测试之前,首先要明确测试的目标和范围,确保测试计划 与需求相匹配。这有助于测试人员聚焦在关键功能上,避免浪费时间和资源。制定详细的测试计划:根据项目需求࿰…...
单片机开发与Linux开发的区别
引言 单片机(MCU)和Linux开发是嵌入式系统领域的两大主要方向。它们在硬件平台、开发环境、应用场景和开发难度上存在显著区别。本文将系统性地比较单片机开发和Linux开发,探讨它们的主要区别及各自的应用场景和难度体系。 一、基本概念 1…...

【机器学习】回归类算法-相关性分析
一、前言 前面的几篇博客我们学习了分类算法,今天我们来了解一下回归类的算法吧。首先我们来谈谈两者有什么区别,首先是我们在之前的分类算法,这类算法可以将让我们学会如何将不同的数据划分到不同的类里面,输出的是一些离散的值。…...

java基础 之 集合与栈的使用(三)
文章目录 Map接口(一)实现类:HashMap特点HashMap集合的一些方法 (二)实现类: TreeMap特点【自然排序】代码【定制排序】代码TreeMap集合的一些方法 HashMap 和 TreeMap的区别 前文回顾: 戳这里 …...

JDK-java.nio包详解
JDK-java.nio包详解 概述 一直以来Java三件套(集合、io、多线程)都是最热门的Java基础技术点,我们要深入掌握好这三件套才能在日常开发中得心应手,之前有编写集合相关的文章,这里出一篇文章来梳理一下io相关的知识点。…...
虚拟机与服务器的区别是什么?虚拟机与服务器的区别和联系
服务器和虚拟机是两个不同的概念,它们在计算机领域有着不同的含义和作用。今天飞飞就和你分享虚拟机和服务器的区别和联系,希望可以帮助到你~ 1、物理形态 a)服务器是实实在在的物理设备,拥有独立的硬件架构。如CPU、硬盘、内存等 b)虚拟机…...
Linux CentOS stream9 命令
初学linux,对字符界面的命令并不陌生。问到什么是linux命令直接答cd、pwd、ls是linux命令。对于命令的定义并熟悉,也不太关心命令的底层执行逻辑,更关心录入命令,马上获取需要的结果。 本文就命令的定义、分类或执行优先级作一简单介绍。 一、定义 搜索网上对linux命令的…...

JavaScript基础——JavaScript变量声明
变量是存储数据的容器,可以变的量,值可以改变,在JavaScript中,变量声明的关键字有var、let,其中,var是ES5的语法,let是ES6的语法,变量需要先声明,在使用。 声明一个age变…...

大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

华硕a豆14 Air香氛版,美学与科技的馨香融合
在快节奏的现代生活中,我们渴望一个能激发创想、愉悦感官的工作与生活伙伴,它不仅是冰冷的科技工具,更能触动我们内心深处的细腻情感。正是在这样的期许下,华硕a豆14 Air香氛版翩然而至,它以一种前所未有的方式&#x…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式
今天是关于AI如何在教学中增强学生的学习体验,我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育,这并非炒作,而是已经发生的巨大变革。教育机构和教育者不能忽视它,试图简单地禁止学生使…...

push [特殊字符] present
push 🆚 present 前言present和dismiss特点代码演示 push和pop特点代码演示 前言 在 iOS 开发中,push 和 present 是两种不同的视图控制器切换方式,它们有着显著的区别。 present和dismiss 特点 在当前控制器上方新建视图层级需要手动调用…...
Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换
目录 关键点 技术实现1 技术实现2 摘要: 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式(自动驾驶、人工驾驶、远程驾驶、主动安全),并通过实时消息推送更新车…...

宇树科技,改名了!
提到国内具身智能和机器人领域的代表企业,那宇树科技(Unitree)必须名列其榜。 最近,宇树科技的一项新变动消息在业界引发了不少关注和讨论,即: 宇树向其合作伙伴发布了一封公司名称变更函称,因…...
「全栈技术解析」推客小程序系统开发:从架构设计到裂变增长的完整解决方案
在移动互联网营销竞争白热化的当下,推客小程序系统凭借其裂变传播、精准营销等特性,成为企业抢占市场的利器。本文将深度解析推客小程序系统开发的核心技术与实现路径,助力开发者打造具有市场竞争力的营销工具。 一、系统核心功能架构&…...
学习一下用鸿蒙DevEco Studio HarmonyOS5实现百度地图
在鸿蒙(HarmonyOS5)中集成百度地图,可以通过以下步骤和技术方案实现。结合鸿蒙的分布式能力和百度地图的API,可以构建跨设备的定位、导航和地图展示功能。 1. 鸿蒙环境准备 开发工具:下载安装 De…...
离线语音识别方案分析
随着人工智能技术的不断发展,语音识别技术也得到了广泛的应用,从智能家居到车载系统,语音识别正在改变我们与设备的交互方式。尤其是离线语音识别,由于其在没有网络连接的情况下仍然能提供稳定、准确的语音处理能力,广…...