Add PipelineJobPreparer (apache#32758)
* Add PipelineJobPreparer

terrymanu authored Sep 1, 2024
1 parent 3f2b1a3 commit dc00a1b
Showing 4 changed files with 71 additions and 50 deletions.
org/apache/shardingsphere/data/pipeline/core/job/preparer/PipelineJobPreparer.java (new file)
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.job.preparer;

import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;

import java.sql.SQLException;

/**
* Pipeline job preparer.
*
* @param <T> type of pipeline job item context
*/
public interface PipelineJobPreparer<T extends PipelineJobItemContext> {

/**
* Prepare before job execution.
*
* @param jobItemContext job item context
* @throws SQLException SQL exception
*/
void prepare(T jobItemContext) throws SQLException;
}
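For orientation, a minimal sketch of an implementation of the new interface; FixtureJobItemContext and the method body are illustrative assumptions, not part of this commit (the real implementor, MigrationJobPreparer, appears below):

// Minimal sketch: implementing the new preparer contract.
// FixtureJobItemContext is a hypothetical PipelineJobItemContext implementation.
public final class FixtureJobPreparer implements PipelineJobPreparer<FixtureJobItemContext> {

    @Override
    public void prepare(final FixtureJobItemContext jobItemContext) throws SQLException {
        // Check source/target data sources and build inventory and incremental tasks here.
    }
}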
org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
@@ -44,11 +44,11 @@ public final class IncrementalTaskPositionManager {

private final DatabaseType databaseType;

- private final DialectIncrementalPositionManager positionInitializer;
+ private final DialectIncrementalPositionManager dialectPositionManager;

public IncrementalTaskPositionManager(final DatabaseType databaseType) {
this.databaseType = databaseType;
- positionInitializer = DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, databaseType);
+ dialectPositionManager = DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, databaseType);
}

/**
@@ -68,7 +68,7 @@ public IngestPosition getPosition(final JobItemIncrementalTasksProgress initialP
return position.get();
}
}
- return positionInitializer.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId());
+ return dialectPositionManager.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId());
}

/**
@@ -82,11 +82,11 @@ public void destroyPosition(final String jobId, final PipelineDataSourceConfigur
final long startTimeMillis = System.currentTimeMillis();
log.info("Cleanup position, database type: {}, pipeline data source type: {}", databaseType.getType(), pipelineDataSourceConfig.getType());
if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
- destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer);
+ destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig, dialectPositionManager);
} else if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) {
- destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer);
+ destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig, dialectPositionManager);
}
log.info("destroyPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
log.info("Destroy position cost {} ms.", System.currentTimeMillis() - startTimeMillis);
}

private void destroyPosition(final String jobId,
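The rename above does not change call sites; a hedged usage sketch of the public API touched by this hunk, where databaseType, jobId, and sourceConfig are illustrative placeholders:

// Usage sketch: discard any dialect-specific replication position for a finished job.
// destroyPosition declares SQLException, so callers handle or propagate it.
IncrementalTaskPositionManager positionManager = new IncrementalTaskPositionManager(databaseType);
positionManager.destroyPosition(jobId, sourceConfig);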
org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -17,17 +17,23 @@

package org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor;

+ import lombok.extern.slf4j.Slf4j;
+ import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
+ import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
- import org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;

+ import java.sql.SQLException;
+ import java.util.Map.Entry;

/**
* Migration job configuration changed processor.
*/
+ @Slf4j
public final class MigrationJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor<MigrationJobConfiguration> {

@Override
@@ -37,7 +43,14 @@ public PipelineJob createJob(final MigrationJobConfiguration jobConfig) {

@Override
public void clean(final JobConfiguration jobConfig) {
- new MigrationJobPreparer().cleanup(new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter()));
+ MigrationJobConfiguration migrationJobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter());
+ for (Entry<String, PipelineDataSourceConfiguration> entry : migrationJobConfig.getSources().entrySet()) {
+ try {
+ new IncrementalTaskPositionManager(entry.getValue().getDatabaseType()).destroyPosition(migrationJobConfig.getJobId(), entry.getValue());
+ } catch (final SQLException ex) {
+ log.warn("Job destroying failed, jobId={}, dataSourceName={}", migrationJobConfig.getJobId(), entry.getKey(), ex);
+ }
+ }
}

@Override
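A sketch of how this processor's clean() could be driven; the ElasticJob builder arguments and the YAML job parameter are assumptions for illustration only:

// Sketch: the jobParameter carries the YAML-serialized MigrationJobConfiguration,
// which clean() swaps back into an object before destroying incremental positions.
JobConfiguration elasticJobConfig = JobConfiguration.newBuilder("illustrative_job_id", 1)
        .jobParameter(yamlMigrationJobConfigText)
        .build();
new MigrationJobConfigurationChangedProcessor().clean(elasticJobConfig);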
org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;

import lombok.extern.slf4j.Slf4j;
- import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.channel.IncrementalChannelCreator;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
@@ -41,6 +40,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+ import org.apache.shardingsphere.data.pipeline.core.job.preparer.PipelineJobPreparer;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
@@ -74,50 +74,35 @@
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
- import java.util.Map.Entry;

/**
* Migration job preparer.
*/
@Slf4j
- public final class MigrationJobPreparer {
+ public final class MigrationJobPreparer implements PipelineJobPreparer<MigrationJobItemContext> {

- private final MigrationJobType jobType = new MigrationJobType();
+ private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper());

- private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());

- /**
- * Do prepare work.
- *
- * @param jobItemContext job item context
- * @throws SQLException SQL exception
- * @throws PipelineJobCancelingException pipeline job canceled exception
- */
- public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException, PipelineJobCancelingException {
+ @Override
+ public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
- () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration"));
+ () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration."));
DatabaseType sourceDatabaseType = jobItemContext.getJobConfig().getSourceDatabaseType();
new PipelineDataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
- if (jobItemContext.isStopping()) {
- throw new PipelineJobCancelingException();
- }
+ ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
prepareAndCheckTargetWithLock(jobItemContext);
- if (jobItemContext.isStopping()) {
- throw new PipelineJobCancelingException();
- }
+ ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
boolean isIncrementalSupported = DatabaseTypedSPILoader.findService(DialectIncrementalDumperCreator.class, sourceDatabaseType).isPresent();
if (isIncrementalSupported) {
prepareIncremental(jobItemContext);
}
initInventoryTasks(jobItemContext);
if (isIncrementalSupported) {
initIncrementalTasks(jobItemContext);
- if (jobItemContext.isStopping()) {
- throw new PipelineJobCancelingException();
- }
+ ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), PipelineJobCancelingException::new);
}
log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
log.info("Prepare job, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
}

@@ -142,11 +127,11 @@ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItem
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getOffset().persist(jobId, new JobOffsetInfo(true));
}
} finally {
log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
log.info("Unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
lockContext.unlock(lockDefinition);
}
} else {
log.warn("try lock failed, jobId={}, shardingItem={}", jobId, jobItemContext.getShardingItem());
log.warn("Try lock failed, jobId={}, shardingItem={}", jobId, jobItemContext.getShardingItem());
}
}

@@ -204,19 +189,4 @@ private void initIncrementalTasks(final MigrationJobItemContext jobItemContext)
PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
jobItemContext.getIncrementalTasks().add(incrementalTask);
}

- /**
- * Do cleanup work.
- *
- * @param jobConfig job configuration
- */
- public void cleanup(final MigrationJobConfiguration jobConfig) {
- for (Entry<String, PipelineDataSourceConfiguration> entry : jobConfig.getSources().entrySet()) {
- try {
- new IncrementalTaskPositionManager(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(), entry.getValue());
- } catch (final SQLException ex) {
- log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
- }
- }
- }
}
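With MigrationJobPreparer now implementing the interface, callers can hold the abstraction instead of the concrete class; a minimal sketch, assuming an already-built MigrationJobItemContext:

// Sketch: programming against PipelineJobPreparer rather than MigrationJobPreparer.
PipelineJobPreparer<MigrationJobItemContext> preparer = new MigrationJobPreparer();
try {
    preparer.prepare(jobItemContext);
} catch (final SQLException ex) {
    // SQLException is the declared contract; PipelineJobCancelingException is unchecked.
    throw new IllegalStateException(ex);
}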
