Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

report error creating cron #732

Merged
merged 18 commits into from
Jan 16, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.mantisrx.master.jobcluster.job.MantisJobMetadataView;
import io.mantisrx.master.jobcluster.job.worker.IMantisWorkerMetadata;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.CreateJobClusterResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DeleteJobClusterResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterRequest;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto.DisableJobClusterResponse;
Expand Down Expand Up @@ -701,7 +702,7 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
// create sla enforcer
slaEnforcer = new SLAEnforcer(jobClusterMetadata.getJobClusterDefinition().getSLA());
long expireFrequency = ConfigurationProvider.getConfig().getCompletedJobPurgeFrequencySeqs();

String jobClusterName = jobClusterMetadata.getJobClusterDefinition().getName();
// If cluster is disabled
if(jobClusterMetadata.isDisabled()) {
logger.info("Cluster {} initialized but is Disabled", jobClusterMetadata
Expand All @@ -718,7 +719,7 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
int count = 50;
if(!initReq.jobList.isEmpty()) {
logger.info("Cluster {} is disabled however it has {} active/accepted jobs",
jobClusterMetadata.getJobClusterDefinition().getName(), initReq.jobList.size());
jobClusterName, initReq.jobList.size());
for(IMantisJobMetadata jobMeta : initReq.jobList) {
try {
if(count == 0) {
Expand All @@ -728,7 +729,7 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
if(!JobState.isTerminalState(jobMeta.getState())) {
logger.info("Job {} is in non terminal state {} for disabled cluster {}."
+ "Marking it complete", jobMeta.getJobId(), jobMeta.getState(),
jobClusterMetadata.getJobClusterDefinition().getName());
jobClusterName);
count--;
jobManager.markCompleted(jobMeta);
jobStore.archiveJob(jobMeta);
Expand Down Expand Up @@ -758,7 +759,7 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
eventPublisher.publishAuditEvent(
new LifecycleEventsProto.AuditEvent(
LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_CREATE,
jobClusterMetadata.getJobClusterDefinition().getName(),
jobClusterName,
"saved job cluster " + name)
);
logger.info("successfully saved job cluster {}", name);
Expand Down Expand Up @@ -790,6 +791,10 @@ public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest i
cronManager = new CronManager(name, getSelf(), jobClusterMetadata.getJobClusterDefinition().getSLA());
} catch (Exception e) {
logger.warn("Exception initializing cron", e);
getSender().tell(new JobClusterManagerProto.CreateJobClusterResponse(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe split this catch into two cases: a known error (SchedulerException) and any other error (Exception), each with its own error code and message.

initReq.requestId, e instanceof SchedulerException?CLIENT_ERROR:SERVER_ERROR,
"Job Cluster " + jobClusterName + " could not be created due to cron initialization error" + e.getMessage(),
jobClusterName), getSelf());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, don't you want to stop here and not call initRunningJobs?

}
initRunningJobs(initReq, sender);

Expand Down Expand Up @@ -916,7 +921,7 @@ public void onJobClusterUpdate(final UpdateJobClusterRequest request) {
} catch (Exception e) {
logger.error("job cluster not created");
sender.tell(new UpdateJobClusterResponse(request.requestId, SERVER_ERROR, name
+ " Job cluster updation failed " + e.getMessage()), getSelf());
+ " Job cluster update failed " + e.getMessage()), getSelf());
numJobClusterUpdateErrors.increment();
}
}
Expand Down Expand Up @@ -1271,13 +1276,13 @@ public void onJobClusterEnable(final EnableJobClusterRequest req) {
.withLastJobCount(this.jobClusterMetadata.getLastJobCount())
.withJobClusterDefinition((JobClusterDefinitionImpl)this.jobClusterMetadata.getJobClusterDefinition())
.build();
//update store
jobStore.updateJobCluster(jobClusterMetadata);
this.jobClusterMetadata = jobClusterMetadata;
if (cronManager == null) {
cronManager = new CronManager(name, getSelf(), jobClusterMetadata.getJobClusterDefinition().getSLA());
}
this.cronManager.initCron();
//update store after cron init
jobStore.updateJobCluster(jobClusterMetadata);
this.jobClusterMetadata = jobClusterMetadata;
// change behavior to enabled
getContext().become(initializedBehavior);

Expand All @@ -1294,7 +1299,7 @@ public void onJobClusterEnable(final EnableJobClusterRequest req) {
} catch(Exception e) {
String errorMsg = String.format("Exception enabling cluster %s due to %s", name, e.getMessage());
logger.error(errorMsg,e);
sender.tell(new EnableJobClusterResponse(req.requestId, SERVER_ERROR, errorMsg), getSelf());
sender.tell(new EnableJobClusterResponse(req.requestId, e instanceof SchedulerException?CLIENT_ERROR:SERVER_ERROR, errorMsg), getSelf());
numJobClusterEnableErrors.increment();
}
if(logger.isTraceEnabled()) { logger.trace("Enter onJobClusterEnable"); }
Expand Down Expand Up @@ -2122,24 +2127,23 @@ public void onJobClusterUpdateSLA(UpdateJobClusterSLARequest slaRequest) {
.withJobClusterDefinition(updatedDefn)
.build();

updateAndSaveJobCluster(jobCluster);
if(cronManager != null)
cronManager.destroyCron();
this.cronManager = new CronManager(name, getSelf(), newSla);

updateAndSaveJobCluster(jobCluster); //update after cron succeeds
sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, SUCCESS, name + " SLA updated"), getSelf());

eventPublisher.publishAuditEvent(
new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_UPDATE,
jobClusterMetadata.getJobClusterDefinition().getName(), name+" SLA update")
);
} catch(IllegalArgumentException e) {
} catch(IllegalArgumentException | SchedulerException e) {
logger.error("Invalid arguement job cluster not updated ", e);
sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, CLIENT_ERROR, name + " Job cluster SLA updation failed " + e.getMessage()), getSelf());
sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, CLIENT_ERROR, name + " Job cluster SLA update failed " + e.getMessage()), getSelf());

} catch(Exception e) {
logger.error("job cluster not updated ", e);
sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, SERVER_ERROR, name + " Job cluster SLA updation failed " + e.getMessage()), getSelf());
sender.tell(new UpdateJobClusterSLAResponse(slaRequest.requestId, SERVER_ERROR, name + " Job cluster SLA update failed " + e.getMessage()), getSelf());
}
if(logger.isTraceEnabled()) { logger.trace("Exit onJobClusterUpdateSLA {}", slaRequest); }
}
Expand Down Expand Up @@ -3217,7 +3221,7 @@ private void initCron() throws Exception{
isCronActive = true;
} catch (IllegalArgumentException e) {
destroyCron();
logger.error("Failed to start cron for {}: {}", jobClusterName, e);
logger.error("Failed to start cron for {}: {}. The format of the cron schedule may be incorrect.", jobClusterName, e.getStackTrace());
throw new SchedulerException(e.getMessage(), e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.mantisrx.server.master.domain;

import com.netflix.fenzo.triggers.TriggerUtils;
import io.mantisrx.common.Label;
import io.mantisrx.master.jobcluster.LabelManager.SystemLabels;
import io.mantisrx.master.jobcluster.job.JobState;
Expand Down Expand Up @@ -64,6 +65,8 @@ public JobClusterDefinitionImpl(@JsonProperty("name") String name,
) {
Preconditions.checkNotNull(jobClusterConfigs);
Preconditions.checkArgument(!jobClusterConfigs.isEmpty());
if (sla != null && sla.getCronSpec() != null)
TriggerUtils.validateCronExpression(sla.getCronSpec());
this.owner = owner;
this.name = name;
this.sla = Optional.ofNullable(sla).orElse(new SLA(0, 0, null, CronPolicy.KEEP_EXISTING));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,52 @@ public void testCronTriggersSLAToKillOld() {

}

/**
 * Verifies that building a job cluster definition with a malformed cron expression is rejected.
 *
 * <p>The test passes only if an {@link IllegalArgumentException} escapes the method body.
 */
@Test(expected = IllegalArgumentException.class)
public void testInvalidCronDefined() {
    String clusterName = "testInvalidCronSubmit";
    SLA sla = new SLA(1, 1, "a b * * * * * * *", IJobClusterDefinition.CronPolicy.KEEP_NEW);

    // Should throw IllegalArgumentException because of the invalid cron spec; the result is
    // intentionally discarded.
    createFakeJobClusterDefn(clusterName, Lists.newArrayList(), sla);
}

/**
 * Verifies that an SLA update carrying a malformed cron expression is rejected with
 * {@code CLIENT_ERROR} and leaves the job cluster untouched.
 *
 * <p>After the failed update the cluster is fetched again to confirm that the SLA min/max are
 * still 0 and that the job store was asked to create the cluster once but never to update it.
 *
 * @throws Exception if a mocked job store call declared as throwing fails
 */
@Test
public void testInvalidCronSLAUpdate() throws Exception {
    TestKit probe = new TestKit(system);
    String clusterName = "testJobClusterInvalidSLAUpdateIgnored";
    MantisScheduler schedulerMock = mock(MantisScheduler.class);
    MantisJobStore jobStoreMock = mock(MantisJobStore.class);

    // Bring up a cluster actor with the default fake definition.
    final JobClusterDefinitionImpl fakeJobCluster = createFakeJobClusterDefn(clusterName);
    ActorRef jobClusterActor = system.actorOf(
        props(clusterName, jobStoreMock, jobDfn -> schedulerMock, eventPublisher, costsCalculator, 0));
    jobClusterActor.tell(
        new JobClusterProto.InitializeJobClusterRequest(fakeJobCluster, user, probe.getRef()),
        probe.getRef());
    JobClusterProto.InitializeJobClusterResponse createResp =
        probe.expectMsgClass(JobClusterProto.InitializeJobClusterResponse.class);
    assertEquals(SUCCESS, createResp.responseCode);

    // Attempt an SLA update whose cron spec is not a valid cron expression.
    UpdateJobClusterSLARequest updateSlaReq = new UpdateJobClusterSLARequest(
        clusterName, 2, 1, "a b * * * * * * *", IJobClusterDefinition.CronPolicy.KEEP_NEW, false, "user");
    jobClusterActor.tell(updateSlaReq, probe.getRef());
    UpdateJobClusterSLAResponse resp = probe.expectMsgClass(UpdateJobClusterSLAResponse.class);

    assertEquals(CLIENT_ERROR, resp.responseCode);
    assertEquals(jobClusterActor, probe.getLastSender());

    // Re-read the cluster to confirm the rejected update did not change it.
    jobClusterActor.tell(new GetJobClusterRequest(clusterName), probe.getRef());
    GetJobClusterResponse resp3 = probe.expectMsgClass(GetJobClusterResponse.class);

    assertEquals(SUCCESS, resp3.responseCode);
    // Check presence rather than non-null: the value is unwrapped with get() below.
    assertTrue(resp3.getJobCluster().isPresent());
    System.out.println("Job cluster " + resp3.getJobCluster());
    assertEquals(clusterName, resp3.getJobCluster().get().getName());
    // No changes to original SLA
    assertEquals(0, resp3.getJobCluster().get().getSla().getMin());
    assertEquals(0, resp3.getJobCluster().get().getSla().getMax());

    verify(jobStoreMock, times(1)).createJobCluster(any());
    verify(jobStoreMock, times(0)).updateJobCluster(any());
}

@Test
public void testJobSubmitWithUnique() {
TestKit probe = new TestKit(system);
Expand Down
Loading