Skip to content

Commit

Permalink
Fix registrar performance for large sites (#1071)
Browse files Browse the repository at this point in the history
* Fixing extras gateway proxy devices

* Add proper num_id handling for instantiating local models

* Add set size limiting

* Cleanup unbind string

* Fix binding output

* Fixing some NPEs

* Fixing unit tests
  • Loading branch information
grafnu authored Jan 21, 2025
1 parent 3608bd2 commit adfecda
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 25 deletions.
3 changes: 2 additions & 1 deletion bin/pull_pubsub
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ echo $project_spec | tr / ' ' > $TMP_FILE
read < $TMP_FILE schema project_id namespace
echo schema:$schema project_id:$project_id namespace:$namespace

suffix=${namespace#*+}

if [[ -z $namespace ]]; then
suffix=${project_id#*+}
project_id=${project_id%+*}
fi

suffix=${namespace#*+}
[[ $suffix != $namespace ]] || suffix=
namespace=${namespace%%+*}

Expand Down
4 changes: 2 additions & 2 deletions common/src/main/java/com/google/udmi/util/GeneralUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ public static String friendlyStackTrace(Throwable e) {
}

private static List<String> traceDetails(Throwable e) {
// Only include the base message and first line of output, which will have the offending line.
return Arrays.stream(stackTraceString(e).split("\n")).toList().subList(0, 2);
// Only include the first two lines of output, which will have the message and offending line.
return Arrays.stream(stackTraceString(e).split("\n")).limit(2).toList();
}

public static List<String> friendlyLineTrace(Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ class LocalDevice {
private String baseVersion;
private Date lastActive;
private boolean blocked;
private CloudModel cloudModel;

LocalDevice(
SiteModel siteModel, String deviceId, Map<String, JsonSchema> schemas,
Expand Down Expand Up @@ -229,6 +230,7 @@ public void initialize() {
prepareOutDir();
ifTrueThen(deviceKind == DeviceKind.LOCAL && metadata != null, this::validateMetadata);
config = configFrom(metadata, deviceId, siteModel);
ifTrueThen(deviceKind == DeviceKind.EXTRA, this::loadExtraCloudModel);
}

public static void parseMetadataValidateProcessingReport(ProcessingReport report)
Expand Down Expand Up @@ -378,7 +380,7 @@ public void loadCredentials() {
}
String authType = getAuthType();
Set<String> keyFiles = (authType.equals(ES_CERT_TYPE) || authType.equals(RSA_CERT_TYPE))
? ALL_CERT_FILES : ALL_KEY_FILES;
? ALL_CERT_FILES : ALL_KEY_FILES;
for (String keyFile : keyFiles) {
Credential deviceCredential = getDeviceCredential(keyFile);
if (deviceCredential != null) {
Expand Down Expand Up @@ -454,16 +456,17 @@ boolean isProxied() {
: config != null && config.isProxied();
}

private CloudModel extraCloudModel() {
return loadFile(CloudModel.class, new File(siteModel.getExtraDir(deviceId), CLOUD_MODEL_FILE));
private void loadExtraCloudModel() {
cloudModel = loadFile(CloudModel.class,
new File(siteModel.getExtraDir(deviceId), CLOUD_MODEL_FILE));
}

/** Reports whether this device's kind is {@code DeviceKind.EXTRA}. */
private boolean isExtraKind() {
  return DeviceKind.EXTRA == deviceKind;
}

String getGatewayId() {
GatewayModel gatewayModel = isExtraKind() ? extraCloudModel().gateway : metadata.gateway;
GatewayModel gatewayModel = isExtraKind() ? cloudModel.gateway : metadata.gateway;
return ifNotNullGet(gatewayModel, model -> model.gateway_id);
}

Expand All @@ -481,15 +484,15 @@ void initializeSettings() {
settings.credentials = deviceCredentials;
settings.generation = generation;
settings.blocked = deviceKind == DeviceKind.EXTRA;
settings.proxyDevices = getProxyDevicesList();
settings.deviceNumId = findDeviceNumId();

if (metadata == null) {
return;
}

settings.updated = config.getUpdatedTimestamp();
settings.metadata = deviceMetadataString();
settings.deviceNumId = ifNotNullGet(metadata.cloud, cloud -> cloud.num_id);
settings.proxyDevices = config.getProxyDevicesList();
settings.keyAlgorithm = getAuthType();
settings.keyBytes = getKeyBytes();
settings.config = deviceConfigString();
Expand All @@ -498,6 +501,18 @@ void initializeSettings() {
}
}

/**
 * Looks up the numeric id to record for this device.
 *
 * <p>Extra-kind devices read it directly from the loaded cloud model. All other devices read
 * {@code metadata.cloud.num_id} through {@code catchToNull} (presumably yielding null when
 * metadata or its cloud section is missing; confirm against GeneralUtils).
 *
 * @return the device numeric id, or whatever {@code catchToNull} yields on a failed lookup
 */
private String findDeviceNumId() {
  if (isExtraKind()) {
    return cloudModel.num_id;
  }
  return catchToNull(() -> metadata.cloud.num_id);
}

/**
 * Resolves the list of proxied device ids for this device.
 *
 * <p>Extra-kind devices take the list from their cloud model. All other devices take it from
 * the local config. The config may be absent ({@code isProxied} checks it for null as well), so
 * a missing config yields null here rather than a NullPointerException.
 *
 * @return the proxy device ids, or null when none can be determined
 */
private List<String> getProxyDevicesList() {
  if (isExtraKind()) {
    return getCloudModelProxyList();
  }
  // Guard against a device that has no config, e.g. one without usable metadata.
  return ifNotNullGet(config, deviceConfig -> deviceConfig.getProxyDevicesList());
}

/**
 * Reads the proxy id list from the cloud model's gateway section.
 *
 * <p>The lookup runs inside {@code catchToNull}, so a failure while walking
 * {@code cloudModel.gateway.proxy_ids} is converted by that helper instead of propagating
 * (presumably to null; confirm against GeneralUtils).
 */
private List<String> getCloudModelProxyList() {
  return catchToNull(() -> {
    GatewayModel gateway = cloudModel.gateway;
    return gateway.proxy_ids;
  });
}

public void updateModel(CloudModel device) {
setDeviceNumId(checkNotNull(device.num_id, "missing deviceNumId for " + deviceId));
setLastActive(device.last_event_time);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public class Registrar {
private static final String TOOL_NAME = "registrar";
private static final long DELETE_FLUSH_DELAY_MS = 10 * SEC_TO_MS;
public static final String REGISTRAR_TOOL_NAME = "registrar";
private static final int SET_SIZE_THRESHOLD = 10;
private final Map<String, JsonSchema> schemas = new HashMap<>();
private final String generation = JsonUtil.isoConvert();
private final Set<Summarizer> summarizers = new HashSet<>();
Expand Down Expand Up @@ -717,20 +718,30 @@ private void deleteDevice(Set<String> allDevices, String deviceId) {
}

/**
* Unbind all devices for the list of gateways. This is synchronized at the class level to ensure
* that only one instance at a time, since it's a global operation that doesn't need to be
* executed for each device individually.
* Unbind all devices for the list of gateways. This is synchronized to ensure that only one
* instance runs at a time, since it's a global operation that doesn't need to be executed for
* each device individually.
*/
private void unbindDevicesFromGateways(Set<String> allDevices,
Set<String> boundGateways) {
synchronized (Operation.UNBIND) {
boundGateways.forEach(gatewayId -> {
private void unbindDevicesFromGateways(Set<String> allDevices, Set<String> boundGateways) {
boundGateways.forEach(gatewayId -> {
synchronized (Operation.UNBIND) {
Map<String, CloudModel> boundDevices = cloudIotManager.fetchDevice(gatewayId).device_ids;
SetView<String> toUnbind = intersection(allDevices, boundDevices.keySet());
System.err.printf("Unbinding from gateway %s: %s%n", gatewayId, toUnbind);
cloudIotManager.bindDevices(toUnbind, gatewayId, false);
});
}
Set<String> toUnbind = new HashSet<>(intersection(allDevices, boundDevices.keySet()));
System.err.printf("Unbinding from gateway %s: %s%n", gatewayId, setOrSize(toUnbind));
boolean multiple = toUnbind.size() > SET_SIZE_THRESHOLD;
while (!toUnbind.isEmpty()) {
Set<String> limitedSet = limitSetSize(toUnbind, SET_SIZE_THRESHOLD);
ifTrueThen(multiple, () -> System.err.printf("Unbinding subset from %s: %s%n", gatewayId,
setOrSize(limitedSet)));
cloudIotManager.bindDevices(limitedSet, gatewayId, false);
toUnbind.removeAll(limitedSet);
}
}
});
}

/**
 * Copies at most {@code sizeLimit} elements of the given set into a new set.
 *
 * <p>Which elements are picked follows the encounter order of the source set's stream.
 *
 * @param toUnbind source set; it is not modified
 * @param sizeLimit maximum number of elements in the result
 * @return a new set holding no more than {@code sizeLimit} elements of {@code toUnbind}
 */
private Set<String> limitSetSize(Set<String> toUnbind, int sizeLimit) {
  var firstElements = toUnbind.stream().limit(sizeLimit);
  return firstElements.collect(Collectors.toSet());
}

private boolean processLocalDevice(String localName, AtomicInteger processedDeviceCount,
Expand Down Expand Up @@ -1010,18 +1021,18 @@ private void bindGatewayDevices(Map<String, LocalDevice> localDevices) {
.filter(LocalDevice::isProxied)
.collect(groupingBy(LocalDevice::getGatewayId, Collectors.toSet()));
AtomicInteger bindingCount = new AtomicInteger();
System.err.printf("Binding devices to gateways: %s%n", gatewayBindings.keySet());
System.err.printf("Binding devices to gateways: %s%n", setOrSize(gatewayBindings.keySet()));
gatewayBindings.forEach((gatewayId, proxiedDevices) -> {
parallelExecute(() -> {
try {
Set<String> proxyIds = proxiedDevices.stream().map(LocalDevice::getDeviceId)
.collect(Collectors.toSet());
Set<String> boundDevices = ofNullable(cloudIotManager.fetchBoundDevices(gatewayId))
.orElse(ImmutableSet.of());
System.err.printf("Already bound to %s: %s%n", gatewayId, boundDevices);
System.err.printf("Already bound to %s: %s%n", gatewayId, setOrSize(boundDevices));
SetView<String> toBind = difference(proxyIds, boundDevices);
int count = bindingCount.incrementAndGet();
System.err.printf("Binding %s to %s (%d/%d)%n", toBind, gatewayId, count,
System.err.printf("Binding %s to %s (%d/%d)%n", setOrSize(toBind), gatewayId, count,
gatewayBindings.size());
cloudIotManager.bindDevices(toBind, gatewayId, true);
} catch (Exception e) {
Expand All @@ -1042,6 +1053,11 @@ private void bindGatewayDevices(Map<String, LocalDevice> localDevices) {
}
}

/**
 * Renders a set for log output: the full contents when small, otherwise only a count.
 *
 * @param items set to describe
 * @return {@code items.toString()} when the set holds at most {@code SET_SIZE_THRESHOLD}
 *     elements, else a string of the form "N devices"
 */
private static String setOrSize(Set<String> items) {
  int count = items.size();
  if (count <= SET_SIZE_THRESHOLD) {
    return items.toString();
  }
  return format("%d devices", count);
}

private synchronized void dynamicTerminate(int expected) throws InterruptedException {
try {
if (executor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.google.daq.mqtt.util.TimePeriodConstants.THREE_MINUTES_MS;
import static com.google.daq.mqtt.util.TimePeriodConstants.TWO_MINUTES_MS;
import static com.google.udmi.util.GeneralUtils.CSV_JOINER;
import static com.google.udmi.util.GeneralUtils.catchToElse;
import static com.google.udmi.util.GeneralUtils.ifNotNullGet;
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
import static com.google.udmi.util.GeneralUtils.ifNotTrueGet;
Expand All @@ -18,6 +19,7 @@
import static udmi.schema.Category.POINTSET_POINT_INVALID_VALUE;
import static udmi.schema.FeatureDiscovery.FeatureStage.STABLE;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.daq.mqtt.sequencer.Feature;
import com.google.daq.mqtt.sequencer.PointsetBase;
Expand Down Expand Up @@ -67,7 +69,8 @@ private void untilPointsetSanity() {

waitUntil("pointset state matches config", EVENT_WAIT_DURATION, () -> {
Set<String> configPoints = deviceConfig.pointset.points.keySet();
Set<String> statePoints = deviceState.pointset.points.keySet();
Set<String> statePoints = catchToElse(() ->
deviceState.pointset.points.keySet(), ImmutableSet.of());
String prefix = format("config %s state %s differences: ",
isoConvert(deviceConfig.timestamp), isoConvert(deviceState.timestamp));
return prefixedDifference(prefix, configPoints, statePoints);
Expand Down

0 comments on commit adfecda

Please sign in to comment.