
[Flink CDC] Schema Evolution in Flink CDC: Source Code Analysis and Flow Charts

Flink CDC version: 3.2.1
Note: this article starts from the point where the SchemaOperator receives a schema change event; such events are produced on the source side, which is not covered here.
You may want to look at the flow charts first, then study the source code.
Reference:
Flink cdc3.0动态变更表结构——源码解析

1. Source Code Analysis

Take Sink to Doris as the example:

SchemaOperator

org.apache.flink.cdc.runtime.operators.schema.SchemaOperator
processElement checks whether the incoming event is a SchemaChangeEvent and, if so, calls processSchemaChangeEvents:

/** This method is guaranteed to not be called concurrently with other methods of the operator. */
@Override
public void processElement(StreamRecord<Event> streamRecord)
        throws InterruptedException, TimeoutException, ExecutionException {
    Event event = streamRecord.getValue();
    if (event instanceof SchemaChangeEvent) {
        // (0)
        processSchemaChangeEvents((SchemaChangeEvent) event);
    } else if (event instanceof DataChangeEvent) {
        // (13)
        processDataChangeEvents(streamRecord, (DataChangeEvent) event);
    } else {
        throw new RuntimeException("Unknown event type in Stream record: " + event);
    }
}

processSchemaChangeEvents then calls handleSchemaChangeEvent:

private void processSchemaChangeEvents(SchemaChangeEvent event)
        throws InterruptedException, TimeoutException, ExecutionException {
    TableId tableId = event.tableId();
    LOG.info(
            "{}> Table {} received SchemaChangeEvent {} and start to be blocked.",
            subTaskId,
            tableId,
            event);
    handleSchemaChangeEvent(tableId, event);
    // Update caches
    originalSchema.put(tableId, getLatestOriginalSchema(tableId));
    schemaDivergesMap.put(tableId, checkSchemaDiverges(tableId));
    List<TableId> optionalRoutedTable = getRoutedTables(tableId);
    if (!optionalRoutedTable.isEmpty()) {
        tableIdMappingCache
                .get(tableId)
                .forEach(routed -> evolvedSchema.put(routed, getLatestEvolvedSchema(routed)));
    } else {
        evolvedSchema.put(tableId, getLatestEvolvedSchema(tableId));
    }
}

handleSchemaChangeEvent calls requestSchemaChange to request the schema change:

response.isAccepted() means the registry accepted the change request. Inside the if branch comes the key step: output.collect(new StreamRecord<>(new FlushEvent(tableId)));. A new FlushEvent(tableId) is sent downstream. The SinkWriter uses this event as the signal to flush, i.e. to write its buffered data into the sink database, similar to a JDBC commit.
FlushEvent itself is minimal: it carries only a tableId. What matters is its type; the class Javadoc reads:

  • An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it
  • start flushing.
    In other words, FlushEvent is a special marker travelling in-band with the data: the DataSinkWriterOperator that receives it triggers its flushing operation, writing all data received so far into the target database. The stream can be pictured as (a minimal sketch of the event follows):
    data with the new schema --> FlushEvent (newly inserted) --> data with the old schema
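
To make the marker concrete, here is a minimal sketch of the event's shape. This is illustrative only, inferred from the usage above, not copied from the Flink CDC source:

// Illustrative sketch; the real class lives in flink-cdc-runtime.
public class FlushEvent implements Event {
    // The only payload: which table the downstream writers should flush for.
    private final TableId tableId;

    public FlushEvent(TableId tableId) {
        this.tableId = tableId;
    }

    public TableId getTableId() {
        return tableId;
    }
}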

After the FlushEvent is emitted, requestSchemaChangeResult is called. It is a blocking (while-loop) method: in short, it blocks until every writer has finished writing the pre-FlushEvent data (the old-schema data), and until then no new-schema data is sent downstream.

Finally, finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e))); emits the finished change events downstream. Briefly, these are the events processed in the handler's SchemaRegistryRequestHandler#applySchemaChange: the original SchemaChangeEvent is transformed according to Flink CDC's schema.change.behavior setting, whose options are listed below.
(Figure: the schema.change.behavior options: exception, evolve, try_evolve, lenient, ignore.)

How these events are used further downstream is not explored here.

private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)
        throws InterruptedException, TimeoutException {
    if (schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION
            && schemaChangeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
        // CreateTableEvent should be applied even in EXCEPTION mode
        throw new RuntimeException(
                String.format(
                        "Refused to apply schema change event %s in EXCEPTION mode.",
                        schemaChangeEvent));
    }

    // The request will block if another schema change event is being handled
    SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); // (1)
    if (response.isAccepted()) {
        // (3)
        LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId);
        output.collect(new StreamRecord<>(new FlushEvent(tableId))); // (4)
        List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents();
        schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());

        // The request will block until flushing finished in each sink writer
        SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult(); // (5)
        List<SchemaChangeEvent> finishedSchemaChangeEvents =
                schemaEvolveResponse.getFinishedSchemaChangeEvents();

        // Update evolved schema changes based on apply results
        finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));
    } else if (response.isDuplicate()) {
        LOG.info(
                "{}> Schema change event {} has been handled in another subTask already.",
                subTaskId,
                schemaChangeEvent);
    } else if (response.isIgnored()) {
        LOG.info(
                "{}> Schema change event {} has been ignored. No schema evolution needed.",
                subTaskId,
                schemaChangeEvent);
    } else {
        throw new IllegalStateException("Unexpected response status " + response);
    }
}

requestSchemaChange is a blocking method (while (true)): it keeps sending the SchemaChangeRequest until the response is no longer busy.

private SchemaChangeResponse requestSchemaChange(
        TableId tableId, SchemaChangeEvent schemaChangeEvent)
        throws InterruptedException, TimeoutException {
    long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
    while (true) {
        SchemaChangeResponse response =
                sendRequestToCoordinator(
                        new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId));
        if (response.isRegistryBusy()) {
            // (2)
            if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
                LOG.info(
                        "{}> Schema Registry is busy now, waiting for next request...",
                        subTaskId);
                Thread.sleep(1000);
            } else {
                throw new TimeoutException("TimeOut when requesting schema change");
            }
        } else {
            return response;
        }
    }
}

sendRequestToCoordinator goes through org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway, a Flink-internal interface.
Its implementations include:
(1) org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway
(2) org.apache.flink.runtime.taskexecutor.rpc.RpcTaskOperatorEventGateway
The internal mechanics are not covered here.
The request ultimately arrives at SchemaRegistry#handleCoordinationRequest.

private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse>
        RESPONSE sendRequestToCoordinator(REQUEST request) {
    try {
        CompletableFuture<CoordinationResponse> responseFuture =
                toCoordinator.sendRequestToCoordinator(
                        getOperatorID(), new SerializedValue<>(request));
        return CoordinationResponseUtils.unwrap(responseFuture.get());
    } catch (Exception e) {
        throw new IllegalStateException(
                "Failed to send request to coordinator: " + request.toString(), e);
    }
}

requestSchemaChangeResult does something very simple: it waits for the result. When the while loop exits and the method returns, the sink side has finished flushing all the old data; before that, the SchemaOperator sends nothing downstream, because everything after the FlushEvent is data in the new schema.

private SchemaChangeResultResponse requestSchemaChangeResult()
        throws InterruptedException, TimeoutException {
    CoordinationResponse coordinationResponse =
            sendRequestToCoordinator(new SchemaChangeResultRequest());
    long nextRpcTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
    while (coordinationResponse instanceof SchemaChangeProcessingResponse) {
        // (6) (7)
        if (System.currentTimeMillis() < nextRpcTimeOutMillis) {
            Thread.sleep(1000);
            coordinationResponse = sendRequestToCoordinator(new SchemaChangeResultRequest());
        } else {
            throw new TimeoutException("TimeOut when requesting release upstream");
        }
    }
    return ((SchemaChangeResultResponse) coordinationResponse);
}

Here toCoordinator.sendRequestToCoordinator again uses Flink's internal call path, which we will not dig into; it goes through the same sendRequestToCoordinator helper shown above.
This request is likewise received by org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry#handleCoordinationRequest and handled in its if (request instanceof SchemaChangeResultRequest) branch.


SchemaRegistry

org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry

toCoordinator.sendRequestToCoordinator is received by handleCoordinationRequest, which for a SchemaChangeRequest enters the request instanceof SchemaChangeRequest branch and calls handleSchemaChangeRequest.

@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
        CoordinationRequest request) {
    CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();
    runInEventLoop(
            () -> {
                try {
                    if (request instanceof SchemaChangeRequest) {
                        SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
                        requestHandler.handleSchemaChangeRequest(
                                schemaChangeRequest, responseFuture);
                    } else if (request instanceof SchemaChangeResultRequest) {
                        requestHandler.getSchemaChangeResult(responseFuture);
                    } else if (request instanceof GetEvolvedSchemaRequest) {
                        handleGetEvolvedSchemaRequest(
                                ((GetEvolvedSchemaRequest) request), responseFuture);
                    } else if (request instanceof GetOriginalSchemaRequest) {
                        handleGetOriginalSchemaRequest(
                                (GetOriginalSchemaRequest) request, responseFuture);
                    } else {
                        throw new IllegalArgumentException(
                                "Unrecognized CoordinationRequest type: " + request);
                    }
                } catch (Throwable t) {
                    context.failJob(t);
                    throw t;
                }
            },
            "handling coordination request %s",
            request);
    return responseFuture;
}

SchemaRegistry#handleEventFromOperator handles the FlushSuccessEvent sent by DataSinkWriterOperator#handleFlushEvent, again delegating the actual work to the handler: SchemaRegistryRequestHandler#flushSuccess.

  
@Override
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
    runInEventLoop(
            () -> {
                try {
                    if (event instanceof FlushSuccessEvent) {
                        FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
                        LOG.info(
                                "Sink subtask {} succeed flushing for table {}.",
                                flushSuccessEvent.getSubtask(),
                                flushSuccessEvent.getTableId().toString());
                        requestHandler.flushSuccess(
                                flushSuccessEvent.getTableId(),
                                flushSuccessEvent.getSubtask(),
                                currentParallelism);
                    } else if (event instanceof SinkWriterRegisterEvent) {
                        requestHandler.registerSinkWriter(
                                ((SinkWriterRegisterEvent) event).getSubtask());
                    } else {
                        throw new FlinkException("Unrecognized Operator Event: " + event);
                    }
                } catch (Throwable t) {
                    context.failJob(t);
                    throw t;
                }
            },
            "handling event %s from subTask %d",
            event,
            subtask);
}

SchemaRegistryRequestHandler

SchemaRegistryRequestHandler is the executor behind SchemaRegistry; its schemaChangeStatus field records the handler's own state.
pendingSubTaskIds records the subtask IDs waiting to be handled, one entry per parallel subtask of the operator.
In handleSchemaChangeRequest:
(1) pendingSubTaskIds is empty -> proceed.
(2) requestSubTaskId equals the head of the pending list -> remove the head and proceed.
(3) Otherwise, with pendingSubTaskIds non-empty -> return SchemaChangeResponse.busy() right away; this busy response is exactly what SchemaOperator checks with response.isRegistryBusy().
If it proceeds:
calculateDerivedSchemaChangeEvents transforms the event according to Flink CDC's schema evolution behavior; for example, an event can be dropped by returning an empty list (see the sketch after the quote below).

`schema.change.behavior` is of enum type, and could be set to `exception`, `evolve`, `try_evolve`, `lenient` or `ignore`.
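
As a hedged sketch (not the actual implementation; the real SchemaRegistryRequestHandler#calculateDerivedSchemaChangeEvents also applies table-route and merge rules), the filtering idea can be pictured like this, where an empty result makes the handler answer SchemaChangeResponse.ignored(). The isCompatibleWithDownstream helper is invented for illustration:

// Hypothetical sketch only; uses java.util.Collections / java.util.List.
private List<SchemaChangeEvent> calculateDerivedSchemaChangeEvents(SchemaChangeEvent event) {
    if (schemaChangeBehavior == SchemaChangeBehavior.LENIENT
            && !isCompatibleWithDownstream(event)) { // hypothetical helper
        return Collections.emptyList(); // empty list -> the request is answered as "ignored"
    }
    return Collections.singletonList(event);
}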

The handler's state is then changed to WAITING_FOR_FLUSH
and it replies with ResponseCode.ACCEPTED, at which point control returns to SchemaOperator#handleSchemaChangeEvent.

SchemaRegistryRequestHandler#handleSchemaChangeRequest:

  
/**
 * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing.
 *
 * @param request the received SchemaChangeRequest
 */
public void handleSchemaChangeRequest(
        SchemaChangeRequest request, CompletableFuture<CoordinationResponse> response) {

    // We use requester subTask ID as the pending ticket, because there will be at most 1 schema
    // change requests simultaneously from each subTask
    int requestSubTaskId = request.getSubTaskId();
    synchronized (schemaChangeRequestLock) {
        // Make sure we handle the first request in the pending list to avoid out-of-order
        // waiting and blocks checkpointing mechanism.
        if (schemaChangeStatus == RequestStatus.IDLE) {
            if (pendingSubTaskIds.isEmpty()) {
                LOG.info(
                        "Received schema change event request {} from table {} from subTask {}. Pending list is empty, handling this.",
                        request.getSchemaChangeEvent(),
                        request.getTableId().toString(),
                        requestSubTaskId);
            } else if (pendingSubTaskIds.get(0) == requestSubTaskId) {
                LOG.info(
                        "Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.",
                        request.getSchemaChangeEvent(),
                        request.getTableId().toString(),
                        requestSubTaskId);
                pendingSubTaskIds.remove(0);
            } else {
                LOG.info(
                        "Received schema change event request {} from table {} from subTask {}. It is not the first of the pending list ({}).",
                        request.getSchemaChangeEvent(),
                        request.getTableId().toString(),
                        requestSubTaskId,
                        pendingSubTaskIds);
                if (!pendingSubTaskIds.contains(requestSubTaskId)) {
                    pendingSubTaskIds.add(requestSubTaskId);
                }
                response.complete(wrap(SchemaChangeResponse.busy())); // (2)
                return;
            }

            SchemaChangeEvent event = request.getSchemaChangeEvent();

            // If this schema change event has been requested by another subTask, ignore it.
            if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
                LOG.info("Event {} has been addressed before, ignoring it.", event);
                clearCurrentSchemaChangeRequest();
                LOG.info(
                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
                        request);
                response.complete(wrap(SchemaChangeResponse.duplicate()));
                return;
            }
            schemaManager.applyOriginalSchemaChange(event);
            List<SchemaChangeEvent> derivedSchemaChangeEvents =
                    calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); // (14)

            // If this schema change event is filtered out by LENIENT mode or merging table
            // route strategies, ignore it.
            if (derivedSchemaChangeEvents.isEmpty()) {
                LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);
                clearCurrentSchemaChangeRequest();
                LOG.info(
                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
                        request);
                response.complete(wrap(SchemaChangeResponse.ignored()));
                return;
            }

            LOG.info(
                    "SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.");
            // This request has been accepted.
            schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH; // (3)
            currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
            response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents))); // (3)
        } else {
            LOG.info(
                    "Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).",
                    request,
                    requestSubTaskId,
                    pendingSubTaskIds);
            if (!pendingSubTaskIds.contains(requestSubTaskId)) {
                pendingSubTaskIds.add(requestSubTaskId);
            }
            response.complete(wrap(SchemaChangeResponse.busy())); // (2)
        }
    }
}

SchemaRegistryRequestHandler#getSchemaChangeResult:
It simply checks the member field SchemaRegistryRequestHandler#schemaChangeStatus:

  • FINISHED -> reset its own state and return the finished result
  • not FINISHED -> return a SchemaChangeProcessingResponse; on receiving it, SchemaOperator#requestSchemaChangeResult keeps looping and blocking in its while loop.
public void getSchemaChangeResult(CompletableFuture<CoordinationResponse> response) {
    Preconditions.checkState(
            schemaChangeStatus != RequestStatus.IDLE,
            "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
    if (schemaChangeStatus == RequestStatus.FINISHED) {
        // (12)
        schemaChangeStatus = RequestStatus.IDLE;
        LOG.info(
                "SchemaChangeStatus switched from FINISHED to IDLE for request {}",
                currentDerivedSchemaChangeEvents);

        // This request has been finished, return it and prepare for the next request
        List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest();
        SchemaChangeResultResponse resultResponse =
                new SchemaChangeResultResponse(finishedEvents);
        response.complete(wrap(resultResponse));
    } else {
        // Still working on schema change request, waiting it
        response.complete(wrap(new SchemaChangeProcessingResponse()));
    }
}

The flushSuccess method handles the FlushSuccessEvent returned by DataSinkWriterOperator. This part is slightly subtle.
activeSinkWriters records the indices of all registered writers: the writer parallelism may be greater than 1, and each FlushSuccessEvent comes from just one writer. The handler must wait until all writers have flushed before it can be sure that all old-schema data has been written to the database.
(1) if (activeSinkWriters.size() < parallelism) covers the case where not all writers have registered yet.
(2) if (flushedSinkWriters.equals(activeSinkWriters)) means every writer has finished flushing. The handler state then becomes RequestStatus.APPLYING, i.e. the handler is now applying the schema change, and applySchemaChange is executed.

/**
 * Record flushed sink subtasks after receiving FlushSuccessEvent.
 *
 * @param tableId the subtask in SchemaOperator and table that the FlushEvent is about
 * @param sinkSubtask the sink subtask succeed flushing
 */
public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
    flushedSinkWriters.add(sinkSubtask);
    if (activeSinkWriters.size() < parallelism) {
        LOG.info(
                "Not all active sink writers have been registered. Current {}, expected {}.",
                activeSinkWriters.size(),
                parallelism);
        return;
    }
    if (flushedSinkWriters.equals(activeSinkWriters)) {
        Preconditions.checkState(
                schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH,
                "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
                        + schemaChangeStatus);
        schemaChangeStatus = RequestStatus.APPLYING; // (9)
        LOG.info(
                "All sink subtask have flushed for table {}. Start to apply schema change.",
                tableId.toString());
        schemaChangeThreadPool.submit(
                () -> applySchemaChange(tableId, currentDerivedSchemaChangeEvents));
    }
}

SchemaRegistryRequestHandler#applySchemaChange:
Its core is metadataApplier.applySchemaChange(changeEvent), which performs the actual schema change against the external system, followed by schemaManager.applyEvolvedSchemaChange(changeEvent) to record the evolved schema. The Javadoc of the interface org.apache.flink.cdc.common.sink.MetadataApplier reads:

  • {@code MetadataApplier} is used to apply metadata changes to external systems.
    So the MetadataApplier applies schema changes to an external system, in practice the sink database; a typical implementation receives the change, assembles the corresponding SQL, and sends it to the database for execution.

Finally, the handler's state is changed to RequestStatus.FINISHED.
It may look as if the FlushSuccessEvent is never propagated back to SchemaOperator, but it does not need to be: SchemaOperator keeps polling SchemaRegistry (SchemaOperator#requestSchemaChangeResult),
and SchemaRegistry picks the response type based on the handler state
(SchemaRegistryRequestHandler#getSchemaChangeResult). Once the handler state is RequestStatus.FINISHED, SchemaRegistry completes the CompletableFuture with something other than a SchemaChangeProcessingResponse, SchemaOperator breaks out of its blocking loop, and data flows downstream again.

  
/**
 * Apply the schema change to the external system.
 *
 * @param tableId the table need to change schema
 * @param derivedSchemaChangeEvents list of the schema changes
 */
private void applySchemaChange(
        TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
    for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
        if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
            if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
                currentIgnoredSchemaChanges.add(changeEvent);
                continue;
            }
        }
        if (!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) {
            LOG.info("Ignored schema change {} to table {}.", changeEvent, tableId);
            currentIgnoredSchemaChanges.add(changeEvent);
        } else {
            try {
                metadataApplier.applySchemaChange(changeEvent);
                LOG.info("Applied schema change {} to table {}.", changeEvent, tableId);
                schemaManager.applyEvolvedSchemaChange(changeEvent);
                currentFinishedSchemaChanges.add(changeEvent);
            } catch (Throwable t) {
                LOG.error(
                        "Failed to apply schema change {} to table {}. Caused by: {}",
                        changeEvent,
                        tableId,
                        t);
                if (!shouldIgnoreException(t)) {
                    currentChangeException = t;
                    break;
                } else {
                    LOG.warn(
                            "Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}",
                            changeEvent,
                            t);
                }
            }
        }
    }
    Preconditions.checkState(
            schemaChangeStatus == RequestStatus.APPLYING,
            "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not "
                    + schemaChangeStatus);
    schemaChangeStatus = RequestStatus.FINISHED;
    LOG.info(
            "SchemaChangeStatus switched from APPLYING to FINISHED for request {}.",
            currentDerivedSchemaChangeEvents);
}

SchemaRegistryRequestHandler.RequestStatus enumerates the handler's states; the transitions are documented in the source comment:

// Schema change event state could transfer in the following way:  
//  
//      -------- B --------  
//      |                 |  
//      v                 |  
//  --------           ---------------------  
//  | IDLE | --- A --> | WAITING_FOR_FLUSH |  
//  --------           ---------------------  
//     ^                        |  
//      E                       C  
//       \                      v  
//  ------------          ------------  
//  | FINISHED | <-- D -- | APPLYING |  
//  ------------          ------------  
//  
//  A: When a request came to an idling request handler.  
//  B: When current request is duplicate or ignored by LENIENT / routed table merging  
// strategies.  
//  C: When schema registry collected enough flush success events, and actually started to apply  
// schema changes.  
//  D: When schema change application finishes (successfully or with exceptions)  
//  E: When current schema change request result has been retrieved by SchemaOperator, and ready  
// for the next request.  
private enum RequestStatus {
    IDLE,
    WAITING_FOR_FLUSH,
    APPLYING,
    FINISHED
}

Next, event handling on the sink side:

DataSinkWriterOperator

org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator

The key part of org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator#processElement is its handling of FlushEvent:

  
@Override
public void processElement(StreamRecord<Event> element) throws Exception {
    Event event = element.getValue();

    // FlushEvent triggers flush
    if (event instanceof FlushEvent) {
        handleFlushEvent(((FlushEvent) event));
        return;
    }

    // CreateTableEvent marks the table as processed directly
    if (event instanceof CreateTableEvent) {
        processedTableIds.add(((CreateTableEvent) event).tableId());
        this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
                .processElement(element);
        return;
    }

    // Check if the table is processed before emitting all other events, because we have to make
    // sure that sink have a view of the full schema before processing any change events,
    // including schema changes.
    ChangeEvent changeEvent = (ChangeEvent) event;
    if (!processedTableIds.contains(changeEvent.tableId())) {
        emitLatestSchema(changeEvent.tableId());
        processedTableIds.add(changeEvent.tableId());
    }
    processedTableIds.add(changeEvent.tableId());
    this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
            .processElement(element);
}

handleFlushEvent performs only two operations:

  • flush: write all data received so far into the target database (comparable to a JDBC commit);
  • notify: send a FlushSuccessEvent via notifyFlushSuccess; see the SchemaEvolutionClient class below.
private void handleFlushEvent(FlushEvent event) throws Exception {
    copySinkWriter.flush(false); // (8)
    schemaEvolutionClient.notifyFlushSuccess(
            getRuntimeContext().getIndexOfThisSubtask(), event.getTableId()); // (9)
}

SchemaEvolutionClient

org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient

org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient#notifyFlushSuccess sends a FlushSuccessEvent to SchemaRegistry#handleEventFromOperator:

public void notifyFlushSuccess(int subtask, TableId tableId) throws IOException {
    toCoordinator.sendOperatorEventToCoordinator(
            schemaOperatorID, new SerializedValue<>(new FlushSuccessEvent(subtask, tableId)));
}

TaskOperatorEventGateway

org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway
The toCoordinator fields in both SchemaOperator and DataSinkWriterOperator are objects of this type.

/*
 * Gateway to send an OperatorEvent or CoordinationRequest from a Task to the OperatorCoordinator
 * on the JobManager side.
 *
 * This is the first step in the chain of sending Operator Events and Requests from Operator to
 * Coordinator. Each layer adds further context, so that the inner layers do not need to know about
 * the complete context, which keeps dependencies small and makes testing easier:
 * OperatorEventGateway takes the event, enriches the event with the OperatorID, and forwards it to the
 * TaskOperatorEventGateway, which enriches the event with the ExecutionAttemptID and forwards it to the
 * JobMasterOperatorEventGateway, which is the RPC interface from the TaskManager to the JobManager.
 */
public interface TaskOperatorEventGateway {

    /**
     * Sends an event from the operator (identified by the given operator ID) to the operator
     * coordinator (identified by the same ID).
     */
    void sendOperatorEventToCoordinator(OperatorID operator, SerializedValue<OperatorEvent> event);

    /**
     * Sends a request from current operator to a specified operator coordinator which is
     * identified by the given operator ID and return the response.
     */
    CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
            OperatorID operator, SerializedValue<CoordinationRequest> request);
}

MetadataApplier

org.apache.flink.cdc.common.sink.MetadataApplier
Implementations of this interface turn schema change events into DDL and execute it against the target sink database.

/** {@code MetadataApplier} is used to apply metadata changes to external systems. */
@PublicEvolving
public interface MetadataApplier extends Serializable {

    /** Apply the given {@link SchemaChangeEvent} to external systems. */
    void applySchemaChange(SchemaChangeEvent schemaChangeEvent) throws SchemaEvolveException; // (10)

    /** Sets enabled schema evolution event types of current metadata applier. */
    default MetadataApplier setAcceptedSchemaEvolutionTypes(
            Set<SchemaChangeEventType> schemaEvolutionTypes) {
        return this;
    }

    /** Checks if this metadata applier should this event type. */
    default boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
        return true;
    }

    /** Checks what kind of schema change events downstream can handle. */
    default Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
        return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());
    }
}
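
For intuition, here is a minimal, hypothetical implementation of this interface: a toy applier that only logs events. It is not part of the Flink CDC codebase; a real applier, such as the DorisMetadataApplier below, turns each event into DDL:

// Hypothetical toy applier, for illustration only (not in the Flink CDC repo).
public class LoggingMetadataApplier implements MetadataApplier {

    @Override
    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
            throws SchemaEvolveException {
        // A production applier would translate the event into DDL here and
        // execute it against the sink database.
        System.out.println("Would apply schema change: " + schemaChangeEvent);
    }
}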

DorisMetadataApplier

org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier implements MetadataApplier.

org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier#applySchemaChange dispatches on the concrete event type:

// (10)
@Override
public void applySchemaChange(SchemaChangeEvent event) throws SchemaEvolveException {
    try {
        // send schema change op to doris
        if (event instanceof CreateTableEvent) {
            applyCreateTableEvent((CreateTableEvent) event);
        } else if (event instanceof AddColumnEvent) {
            applyAddColumnEvent((AddColumnEvent) event);
        } else if (event instanceof DropColumnEvent) {
            applyDropColumnEvent((DropColumnEvent) event);
        } else if (event instanceof RenameColumnEvent) {
            applyRenameColumnEvent((RenameColumnEvent) event);
        } else if (event instanceof AlterColumnTypeEvent) {
            applyAlterColumnTypeEvent((AlterColumnTypeEvent) event);
        } else {
            throw new UnsupportedSchemaChangeEventException(event);
        }
    } catch (Exception ex) {
        throw new SchemaEvolveException(event, ex.getMessage(), null);
    }
}

Take applyAddColumnEvent as an example: it only converts the added columns into Doris FieldSchema objects.

private void applyAddColumnEvent(AddColumnEvent event)
        throws IOException, IllegalArgumentException {
    TableId tableId = event.tableId();
    List<AddColumnEvent.ColumnWithPosition> addedColumns = event.getAddedColumns();
    for (AddColumnEvent.ColumnWithPosition col : addedColumns) {
        Column column = col.getAddColumn();
        FieldSchema addFieldSchema =
                new FieldSchema(
                        column.getName(),
                        buildTypeString(column.getType()),
                        column.getDefaultValueExpression(),
                        column.getComment());
        schemaChangeManager.addColumn(
                tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
    }
}

SchemaChangeManager

org.apache.doris.flink.sink.schema.SchemaChangeManager

In org.apache.doris.flink.sink.schema.SchemaChangeManager#addColumn, SchemaChangeHelper assembles the SQL and the schemaChange method sends the statement to the database for execution:

public boolean addColumn(String database, String table, FieldSchema field)
        throws IOException, IllegalArgumentException {
    if (checkColumnExists(database, table, field.getName())) {
        LOG.warn(
                "The column {} already exists in table {}, no need to add it again",
                field.getName(),
                table);
        return true;
    }
    String tableIdentifier = getTableIdentifier(database, table);
    String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field);
    return schemaChange(
            database, table, buildRequestParam(false, field.getName()), addColumnDDL);
}

SchemaChangeHelper

org.apache.doris.flink.sink.schema.SchemaChangeHelper

org.apache.doris.flink.sink.schema.SchemaChangeHelper#buildAddColumnDDL assembles the SQL from the ADD_DDL string template:

// (11)
private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
public static String buildAddColumnDDL(String tableIdentifier, FieldSchema fieldSchema) {
    String name = fieldSchema.getName();
    String type = fieldSchema.getTypeString();
    String defaultValue = fieldSchema.getDefaultValue();
    String comment = fieldSchema.getComment();
    StringBuilder addDDL =
            new StringBuilder(
                    String.format(
                            ADD_DDL,
                            DorisSchemaFactory.quoteTableIdentifier(tableIdentifier),
                            DorisSchemaFactory.identifier(name),
                            type));
    if (defaultValue != null) {
        addDDL.append(" DEFAULT ").append(DorisSchemaFactory.quoteDefaultValue(defaultValue));
    }
    commentColumn(addDDL, comment);
    return addDDL.toString();
}
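
As a rough illustration of the DDL this produces (hypothetical values; the exact quoting and comment clause come from DorisSchemaFactory and commentColumn):

// Hypothetical usage, assuming the FieldSchema(name, typeString, defaultValue, comment)
// constructor used in applyAddColumnEvent above.
FieldSchema field = new FieldSchema("age", "INT", null, "user age");
String ddl = SchemaChangeHelper.buildAddColumnDDL("db1.users", field);
// Expected shape (quoting may differ):
// ALTER TABLE `db1`.`users` ADD COLUMN `age` INT COMMENT 'user age'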

Flow summary:

  • SchemaOperator receives a SchemaChangeEvent and sends a SchemaChangeRequest to SchemaRegistry.
  • The executor inside SchemaRegistry is SchemaRegistryRequestHandler (the handler). The handler keeps its own state in schemaChangeStatus and uses it to decide whether a previous request is still being processed: if so it returns busy, otherwise accepted, and its state changes from RequestStatus.IDLE to RequestStatus.WAITING_FOR_FLUSH.
  • If SchemaOperator receives busy, it sleeps and retries, blocking until it receives accepted. It then sends one FlushEvent downstream, followed by a SchemaChangeResultRequest to SchemaRegistry. If the reply is a SchemaChangeProcessingResponse, the schema change is considered unfinished, so it sleeps and retries, blocking until the reply is no longer a SchemaChangeProcessingResponse. While blocked, it sends no new-schema data downstream.
  • When SchemaRegistry receives a SchemaChangeResultRequest, the handler checks its own schemaChangeStatus; if it is not RequestStatus.FINISHED, it returns a SchemaChangeProcessingResponse.
  • DataSinkWriterOperator receives the FlushEvent and performs the flush, writing all old-schema data received so far into the database, then sends a FlushSuccessEvent to SchemaRegistry.
  • The handler in SchemaRegistry collects the FlushSuccessEvents; once every subtask has reported, it changes its state to RequestStatus.APPLYING and uses the MetadataApplier to apply the schema change to the sink-side (external) database, after which it changes its state to RequestStatus.FINISHED.
  • When SchemaOperator next sends a SchemaChangeResultRequest and the handler's state is RequestStatus.FINISHED, SchemaRegistry replies with a non-SchemaChangeProcessingResponse; SchemaOperator stops blocking and resumes sending new-schema data downstream.

2. Flow Charts

[flow chart]

The numbered steps in the chart below correspond to the // (n) markers in the source snippets above; you can search for them in the code.
[flow chart]
