Skip to content

Commit

Permalink
Fix summary and unbind error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Jan 16, 2025
1 parent 984a313 commit 2152f83
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -734,10 +734,15 @@ private void unbindDevice(String registryId, String gatewayId, String proxyId) {
}
}

private void unbindGatewayDevices(String registryId, Device gatewayDevice,
private boolean unbindGatewayDevices(String registryId, Device gatewayDevice,
Set<String> unbindIds, Consumer<Integer> progress) {
ImmutableSet<String> gatewayIds = ImmutableSet.of(gatewayDevice.toBuilder().getId());
bindDevicesGateways(registryId, gatewayIds, unbindIds, false, progress);
try {
ImmutableSet<String> gatewayIds = ImmutableSet.of(gatewayDevice.toBuilder().getId());
bindDevicesGateways(registryId, gatewayIds, unbindIds, false, progress);
return true;
} catch (Exception e) {
return false;
}
}

private CloudModel updateDevice(String registryId, Device device) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,14 @@ private CloudModel reflectModel(Envelope attributes, CloudModel request) {
return previewReflectResponse(attributes, request);
}

String deviceId = attributes.deviceId;
String deviceRegistryId = attributes.deviceRegistryId;

if (request.resource_type == REGISTRY) {
return iotAccess.modelRegistry(attributes.deviceRegistryId, attributes.deviceId, request);
return iotAccess.modelRegistry(deviceRegistryId, deviceId, request);
} else {
return iotAccess.modelDevice(attributes.deviceRegistryId, attributes.deviceId, request,
progress -> reflectUdmiLog(attributes, format("Processed %d entries...", progress)));
return iotAccess.modelDevice(deviceRegistryId, deviceId, request, progress ->
reflectUdmiLog(attributes, format("Processed %d entries for %s...", progress, deviceId)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ private void writeErrors() throws RuntimeException {
summarizers.forEach(summarizer -> {
File outFile = summarizer.outFile;
try {
summarizer.summarize(workingDevices, errorSummary, extraDevices);
summarizer.summarize(workingDevices, errorSummary, cloudModels);
System.err.println("Registration summary available in " + outFile.getAbsolutePath());
} catch (Exception e) {
throw new RuntimeException("While summarizing output to " + outFile.getAbsolutePath(), e);
Expand Down Expand Up @@ -525,11 +525,17 @@ private void processDevices(Runnable modelMunger) {

int total = 0;

System.err.printf("Processing %d new devices...%n", newDevices.size());
total += processLocalDevices(newDevices);
if (updateCloudIoT) {
System.err.printf("Processing %d new devices...%n", newDevices.size());
total += processLocalDevices(newDevices);

System.err.printf("Updating %d existing devices...%n", oldDevices.size());
total += processLocalDevices(oldDevices);
} else {
System.err.printf("Processing %d target devices...%n", targetDevices.size());
total += processLocalDevices(targetDevices);
}

System.err.printf("Updating %d existing devices...%n", oldDevices.size());
total += processLocalDevices(oldDevices);
System.err.printf("Finished processing %d/%d devices.%n", total, targetDevices.size());

if (updateCloudIoT) {
Expand Down Expand Up @@ -746,25 +752,23 @@ private boolean processLocalDevice(String localName, AtomicInteger processedDevi
Instant start = Instant.now();
int count = processedDeviceCount.incrementAndGet();
boolean created = false;
boolean error = false;
try {
localDevice.writeConfigFile();
if (cloudModels != null && updateCloudIoT) {
created = syncCloudIoT(localName, localDevice);
sendUpdateMessages(localDevice);
cloudModels.computeIfAbsent(localName, name -> fetchDevice(localName, true));
sendSwarmMessage(localDevice);

Duration between = Duration.between(start, Instant.now());
double seconds = (between.getSeconds() + between.getNano() / 1e9) / runnerThreads;
System.err.printf("Processed %s (%d/%d) in %.03fs (%s)%n", localName, count, totalCount,
seconds, created ? "add" : "update");
}
} catch (Exception e) {
System.err.printf("Deferring exception for %s: %s%n", localDevice.getDeviceId(), e);
System.err.printf("Error processing %s: %s%n", localDevice.getDeviceId(), e);
localDevice.captureError(LocalDevice.EXCEPTION_REGISTERING, e);
error = true;
}
Duration between = Duration.between(start, Instant.now());
double seconds = (between.getSeconds() + between.getNano() / 1e9) / runnerThreads;
String result = error ? "error" : (created ? "add" : "update");
System.err.printf("Processed %s (%d/%d) in %.03fs (%s)%n", localName, count, totalCount,
seconds, result);
return created;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Set;
import java.util.TreeSet;
import udmi.schema.CloudModel;
import udmi.schema.CloudModel.Operation;

abstract class Summarizer {

Expand Down Expand Up @@ -54,16 +55,16 @@ static class CsvSummarizer extends Summarizer {

@Override
public void summarize(Map<String, LocalDevice> localDevices, Map<String, Object> errorSummary,
Map<String, CloudModel> extraDevices)
Map<String, CloudModel> maybeCloudModels)
throws Exception {
Map<String, CloudModel> external = ofNullable(extraDevices).orElse(ImmutableMap.of());
Set<String> combined = new TreeSet<>(Sets.union(localDevices.keySet(), external.keySet()));
Map<String, CloudModel> cloudModels = ofNullable(maybeCloudModels).orElse(ImmutableMap.of());
Set<String> combined = new TreeSet<>(Sets.union(localDevices.keySet(), cloudModels.keySet()));
try (PrintWriter writer = new PrintWriter(outFile)) {
writer.println(CSV_JOINER.join(headers));
combined.forEach(deviceId -> {
Map<String, String> rowValues =
ofNullable(ifNotNullGet(localDevices.get(deviceId), this::extractDeviceRow))
.orElse(ifNotNullGet(external.get(deviceId), this::extractDeviceRow));
ofNullable(ifNotNullGet(cloudModels.get(deviceId), this::extractDeviceRow))
.orElse(ifNotNullGet(localDevices.get(deviceId), this::extractDeviceRow));
List<String> listValues = new ArrayList<>(headers.stream().map(rowValues::get).toList());
listValues.set(headers.indexOf(DEVICE_ID_HEADER), deviceId);
writer.println(CSV_JOINER.join(listValues));
Expand All @@ -74,7 +75,7 @@ public void summarize(Map<String, LocalDevice> localDevices, Map<String, Object>
private Map<String, String> extractDeviceRow(CloudModel cloudModel) {
return ImmutableMap.of(
NUM_ID_HEADER, ofNullable(cloudModel.num_id).orElse(UNKNOWN_NUM_ID),
STATUS_HEADER, cloudModel.operation.toString(),
STATUS_HEADER, ofNullable(cloudModel.operation).orElse(Operation.READ).toString(),
ACTIVE_HEADER, JsonUtil.isoConvert(cloudModel.last_event_time),
DETAIL_HEADER, ofNullable(cloudModel.detail).orElse(NO_DETAIL));
}
Expand Down

0 comments on commit 2152f83

Please sign in to comment.