Skip to content

Commit

Permalink
Merge pull request #2883 from murgatroid99/grpc-js-xds_config_selecto…
Browse files Browse the repository at this point in the history
…r_cluster_ref

grpc-js-xds: Reference clusters for ConfigSelector lifetime
  • Loading branch information
murgatroid99 authored Jan 15, 2025
2 parents 22bbe8a + bfd87a9 commit 5a942ed
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 66 deletions.
108 changes: 79 additions & 29 deletions packages/grpc-js-xds/src/resolver-xds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,30 @@ const RETRY_CODES: {[key: string]: status} = {
export const XDS_CONFIG_KEY = `${experimental.SUBCHANNEL_ARGS_EXCLUDE_KEY_PREFIX}.xds_config`;
export const XDS_CLIENT_KEY = 'grpc.internal.xds_client';

/**
* Tracks a dynamic subscription to a cluster that is currently or previously
* referenced in a RouteConfiguration.
*/
class ClusterRef {
private refCount = 0;
constructor(private unsubscribe: () => void) {}

ref() {
this.refCount += 1;
}

unref() {
this.refCount -= 1;
if (this.refCount <= 0) {
this.unsubscribe();
}
}

hasRef() {
return this.refCount > 0;
}
}

class XdsResolver implements Resolver {

private listenerResourceName: string | null = null;
Expand All @@ -93,6 +117,7 @@ class XdsResolver implements Resolver {

private xdsConfigWatcher: XdsConfigWatcher;
private xdsDependencyManager: XdsDependencyManager | null = null;
private clusterRefs: Map<string, ClusterRef> = new Map();

constructor(
private target: GrpcUri,
Expand Down Expand Up @@ -123,11 +148,20 @@ class XdsResolver implements Resolver {
}
}

private pruneUnusedClusters() {
for (const [cluster, clusterRef] of this.clusterRefs) {
if (!clusterRef.hasRef()) {
this.clusterRefs.delete(cluster);
}
}
}

private async handleXdsConfig(xdsConfig: XdsConfig) {
/* We need to load the xxhash API before this function finishes, because
* it is invoked in the config selector, which can be called immediately
* after this function returns. */
await loadXxhashApi();
this.pruneUnusedClusters();
const httpConnectionManager = decodeSingleResource(HTTP_CONNECTION_MANGER_TYPE_URL, xdsConfig.listener.api_listener!.api_listener!.value);
const configDefaultTimeout = httpConnectionManager.common_http_protocol_options?.idle_timeout;
let defaultTimeout: Duration | undefined = undefined;
Expand Down Expand Up @@ -312,44 +346,60 @@ class XdsResolver implements Resolver {
const routeMatcher = getPredicateForMatcher(route.match!);
matchList.push({matcher: routeMatcher, action: routeAction});
}
const configSelector: ConfigSelector = (methodName, metadata, channelId) => {
for (const {matcher, action} of matchList) {
if (matcher.apply(methodName, metadata)) {
const clusterResult = action.getCluster();
const unrefCluster = this.xdsDependencyManager!.addClusterSubscription(clusterResult.name);
const onCommitted = () => {
unrefCluster();
}
let hash: string;
if (EXPERIMENTAL_RING_HASH) {
hash = `${action.getHash(metadata, channelId)}`;
} else {
hash = '';
for (const cluster of allConfigClusters) {
let clusterRef = this.clusterRefs.get(cluster);
if (!clusterRef) {
clusterRef = new ClusterRef(this.xdsDependencyManager!.addClusterSubscription(cluster));
this.clusterRefs.set(cluster, clusterRef);
}
clusterRef.ref();
}
const configSelector: ConfigSelector = {
invoke: (methodName, metadata, channelId) => {
for (const {matcher, action} of matchList) {
if (matcher.apply(methodName, metadata)) {
const clusterResult = action.getCluster();
const clusterRef = this.clusterRefs.get(clusterResult.name)!;
clusterRef.ref();
const onCommitted = () => {
clusterRef.unref();
}
let hash: string;
if (EXPERIMENTAL_RING_HASH) {
hash = `${action.getHash(metadata, channelId)}`;
} else {
hash = '';
}
return {
methodConfig: clusterResult.methodConfig,
onCommitted: onCommitted,
pickInformation: {cluster: clusterResult.name, hash: hash},
status: status.OK,
dynamicFilterFactories: clusterResult.dynamicFilterFactories
};
}
return {
methodConfig: clusterResult.methodConfig,
onCommitted: onCommitted,
pickInformation: {cluster: clusterResult.name, hash: hash},
status: status.OK,
dynamicFilterFactories: clusterResult.dynamicFilterFactories
};
}
return {
methodConfig: {name: []},
// These fields won't be used here, but they're set because of some TypeScript weirdness
pickInformation: {cluster: '', hash: ''},
status: status.UNAVAILABLE,
dynamicFilterFactories: []
};
},
unref: () => {
for (const cluster of allConfigClusters) {
this.clusterRefs.get(cluster)?.unref();
}
}
return {
methodConfig: {name: []},
// These fields won't be used here, but they're set because of some TypeScript weirdness
pickInformation: {cluster: '', hash: ''},
status: status.UNAVAILABLE,
dynamicFilterFactories: []
};
};
}
trace('Created ConfigSelector with configuration:');
for (const {matcher, action} of matchList) {
trace(matcher.toString());
trace('=> ' + action.toString());
}
const clusterConfigMap: {[key: string]: {child_policy: LoadBalancingConfig[]}} = {};
for (const clusterName of allConfigClusters) {
for (const clusterName of this.clusterRefs.keys()) {
clusterConfigMap[clusterName] = {child_policy: [{cds: {cluster: clusterName}}]};
}
const lbPolicyConfig = {xds_cluster_manager: {children: clusterConfigMap}};
Expand Down
5 changes: 4 additions & 1 deletion packages/grpc-js/src/internal-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ export class InternalChannel {
'Address resolution succeeded'
);
}
this.configSelector?.unref();
this.configSelector = configSelector;
this.currentResolutionError = null;
/* We process the queue asynchronously to ensure that the corresponding
Expand Down Expand Up @@ -568,7 +569,7 @@ export class InternalChannel {
if (this.configSelector) {
return {
type: 'SUCCESS',
config: this.configSelector(method, metadata, this.randomChannelId),
config: this.configSelector.invoke(method, metadata, this.randomChannelId),
};
} else {
if (this.currentResolutionError) {
Expand Down Expand Up @@ -790,6 +791,8 @@ export class InternalChannel {
}

this.subchannelPool.unrefUnusedSubchannels();
this.configSelector?.unref();
this.configSelector = null;
}

getTarget() {
Expand Down
3 changes: 2 additions & 1 deletion packages/grpc-js/src/resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ export interface CallConfig {
* https://github.com/grpc/proposal/blob/master/A31-xds-timeout-support-and-config-selector.md#new-functionality-in-grpc
*/
export interface ConfigSelector {
(methodName: string, metadata: Metadata, channelId: number): CallConfig;
invoke(methodName: string, metadata: Metadata, channelId: number): CallConfig;
unref(): void;
}

/**
Expand Down
74 changes: 39 additions & 35 deletions packages/grpc-js/src/resolving-load-balancer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,43 +103,46 @@ function findMatchingConfig(
function getDefaultConfigSelector(
serviceConfig: ServiceConfig | null
): ConfigSelector {
return function defaultConfigSelector(
methodName: string,
metadata: Metadata
) {
const splitName = methodName.split('/').filter(x => x.length > 0);
const service = splitName[0] ?? '';
const method = splitName[1] ?? '';
if (serviceConfig && serviceConfig.methodConfig) {
/* Check for the following in order, and return the first method
* config that matches:
* 1. A name that exactly matches the service and method
* 2. A name with no method set that matches the service
* 3. An empty name
*/
for (const matchLevel of NAME_MATCH_LEVEL_ORDER) {
const matchingConfig = findMatchingConfig(
service,
method,
serviceConfig.methodConfig,
matchLevel
);
if (matchingConfig) {
return {
methodConfig: matchingConfig,
pickInformation: {},
status: Status.OK,
dynamicFilterFactories: [],
};
return {
invoke(
methodName: string,
metadata: Metadata
) {
const splitName = methodName.split('/').filter(x => x.length > 0);
const service = splitName[0] ?? '';
const method = splitName[1] ?? '';
if (serviceConfig && serviceConfig.methodConfig) {
/* Check for the following in order, and return the first method
* config that matches:
* 1. A name that exactly matches the service and method
* 2. A name with no method set that matches the service
* 3. An empty name
*/
for (const matchLevel of NAME_MATCH_LEVEL_ORDER) {
const matchingConfig = findMatchingConfig(
service,
method,
serviceConfig.methodConfig,
matchLevel
);
if (matchingConfig) {
return {
methodConfig: matchingConfig,
pickInformation: {},
status: Status.OK,
dynamicFilterFactories: [],
};
}
}
}
}
return {
methodConfig: { name: [] },
pickInformation: {},
status: Status.OK,
dynamicFilterFactories: [],
};
return {
methodConfig: { name: [] },
pickInformation: {},
status: Status.OK,
dynamicFilterFactories: [],
};
},
unref() {}
};
}

Expand Down Expand Up @@ -298,6 +301,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
'All load balancer options in service config are not compatible',
metadata: new Metadata(),
});
configSelector?.unref();
return;
}
this.childLoadBalancer.updateAddressList(
Expand Down

0 comments on commit 5a942ed

Please sign in to comment.