Skip to content

Commit

Permalink
Merge pull request apache#3067 from dandsager1/STORM-3442
Browse files Browse the repository at this point in the history
STORM-3442 Add owner to supervisor summary
  • Loading branch information
Ethanlm authored Jul 5, 2019
2 parents 7ca1ffb + 926d7d1 commit 8d4f0c6
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ private NumErrorsChoice(int value) {
/**
* Get the integer value of this enum value, as defined in the Thrift IDL.
*/
@Override
public int getValue() {
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ private TopologyInitialStatus(int value) {
/**
* Get the integer value of this enum value, as defined in the Thrift IDL.
*/
@Override
public int getValue() {
return value;
}
Expand Down
114 changes: 110 additions & 4 deletions storm-client/src/jvm/org/apache/storm/generated/WorkerSummary.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class WorkerSummary implements org.apache.storm.thrift.TBase<WorkerSummar
private static final org.apache.storm.thrift.protocol.TField ASSIGNED_MEMONHEAP_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("assigned_memonheap", org.apache.storm.thrift.protocol.TType.DOUBLE, (short)524);
private static final org.apache.storm.thrift.protocol.TField ASSIGNED_MEMOFFHEAP_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("assigned_memoffheap", org.apache.storm.thrift.protocol.TType.DOUBLE, (short)525);
private static final org.apache.storm.thrift.protocol.TField ASSIGNED_CPU_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("assigned_cpu", org.apache.storm.thrift.protocol.TType.DOUBLE, (short)526);
private static final org.apache.storm.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("owner", org.apache.storm.thrift.protocol.TType.STRING, (short)527);

private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new WorkerSummaryStandardSchemeFactory();
private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new WorkerSummaryTupleSchemeFactory();
Expand All @@ -62,6 +63,7 @@ public class WorkerSummary implements org.apache.storm.thrift.TBase<WorkerSummar
private double assigned_memonheap; // optional
private double assigned_memoffheap; // optional
private double assigned_cpu; // optional
private @org.apache.storm.thrift.annotation.Nullable java.lang.String owner; // optional

/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.storm.thrift.TFieldIdEnum {
Expand All @@ -79,7 +81,8 @@ public enum _Fields implements org.apache.storm.thrift.TFieldIdEnum {
REQUESTED_CPU((short)523, "requested_cpu"),
ASSIGNED_MEMONHEAP((short)524, "assigned_memonheap"),
ASSIGNED_MEMOFFHEAP((short)525, "assigned_memoffheap"),
ASSIGNED_CPU((short)526, "assigned_cpu");
ASSIGNED_CPU((short)526, "assigned_cpu"),
OWNER((short)527, "owner");

private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();

Expand Down Expand Up @@ -125,6 +128,8 @@ public static _Fields findByThriftId(int fieldId) {
return ASSIGNED_MEMOFFHEAP;
case 526: // ASSIGNED_CPU
return ASSIGNED_CPU;
case 527: // OWNER
return OWNER;
default:
return null;
}
Expand Down Expand Up @@ -177,7 +182,7 @@ public java.lang.String getFieldName() {
private static final int __ASSIGNED_MEMOFFHEAP_ISSET_ID = 8;
private static final int __ASSIGNED_CPU_ISSET_ID = 9;
private short __isset_bitfield = 0;
private static final _Fields optionals[] = {_Fields.SUPERVISOR_ID,_Fields.HOST,_Fields.PORT,_Fields.TOPOLOGY_ID,_Fields.TOPOLOGY_NAME,_Fields.NUM_EXECUTORS,_Fields.COMPONENT_TO_NUM_TASKS,_Fields.TIME_SECS,_Fields.UPTIME_SECS,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU};
private static final _Fields optionals[] = {_Fields.SUPERVISOR_ID,_Fields.HOST,_Fields.PORT,_Fields.TOPOLOGY_ID,_Fields.TOPOLOGY_NAME,_Fields.NUM_EXECUTORS,_Fields.COMPONENT_TO_NUM_TASKS,_Fields.TIME_SECS,_Fields.UPTIME_SECS,_Fields.REQUESTED_MEMONHEAP,_Fields.REQUESTED_MEMOFFHEAP,_Fields.REQUESTED_CPU,_Fields.ASSIGNED_MEMONHEAP,_Fields.ASSIGNED_MEMOFFHEAP,_Fields.ASSIGNED_CPU,_Fields.OWNER};
public static final java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> metaDataMap;
static {
java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData>(_Fields.class);
Expand Down Expand Up @@ -213,6 +218,8 @@ public java.lang.String getFieldName() {
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.DOUBLE)));
tmpMap.put(_Fields.ASSIGNED_CPU, new org.apache.storm.thrift.meta_data.FieldMetaData("assigned_cpu", org.apache.storm.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.DOUBLE)));
tmpMap.put(_Fields.OWNER, new org.apache.storm.thrift.meta_data.FieldMetaData("owner", org.apache.storm.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.STRING)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.storm.thrift.meta_data.FieldMetaData.addStructMetaDataMap(WorkerSummary.class, metaDataMap);
}
Expand Down Expand Up @@ -251,6 +258,9 @@ public WorkerSummary(WorkerSummary other) {
this.assigned_memonheap = other.assigned_memonheap;
this.assigned_memoffheap = other.assigned_memoffheap;
this.assigned_cpu = other.assigned_cpu;
if (other.is_set_owner()) {
this.owner = other.owner;
}
}

public WorkerSummary deepCopy() {
Expand Down Expand Up @@ -284,6 +294,7 @@ public void clear() {
this.assigned_memoffheap = 0.0;
set_assigned_cpu_isSet(false);
this.assigned_cpu = 0.0;
this.owner = null;
}

@org.apache.storm.thrift.annotation.Nullable
Expand Down Expand Up @@ -637,6 +648,30 @@ public void set_assigned_cpu_isSet(boolean value) {
__isset_bitfield = org.apache.storm.thrift.EncodingUtils.setBit(__isset_bitfield, __ASSIGNED_CPU_ISSET_ID, value);
}

@org.apache.storm.thrift.annotation.Nullable
public java.lang.String get_owner() {
return this.owner;
}

public void set_owner(@org.apache.storm.thrift.annotation.Nullable java.lang.String owner) {
this.owner = owner;
}

public void unset_owner() {
this.owner = null;
}

/** Returns true if field owner is set (has been assigned a value) and false otherwise */
public boolean is_set_owner() {
return this.owner != null;
}

public void set_owner_isSet(boolean value) {
if (!value) {
this.owner = null;
}
}

public void setFieldValue(_Fields field, @org.apache.storm.thrift.annotation.Nullable java.lang.Object value) {
switch (field) {
case SUPERVISOR_ID:
Expand Down Expand Up @@ -759,6 +794,14 @@ public void setFieldValue(_Fields field, @org.apache.storm.thrift.annotation.Nul
}
break;

case OWNER:
if (value == null) {
unset_owner();
} else {
set_owner((java.lang.String)value);
}
break;

}
}

Expand Down Expand Up @@ -810,6 +853,9 @@ public java.lang.Object getFieldValue(_Fields field) {
case ASSIGNED_CPU:
return get_assigned_cpu();

case OWNER:
return get_owner();

}
throw new java.lang.IllegalStateException();
}
Expand Down Expand Up @@ -851,6 +897,8 @@ public boolean isSet(_Fields field) {
return is_set_assigned_memoffheap();
case ASSIGNED_CPU:
return is_set_assigned_cpu();
case OWNER:
return is_set_owner();
}
throw new java.lang.IllegalStateException();
}
Expand Down Expand Up @@ -1005,6 +1053,15 @@ public boolean equals(WorkerSummary that) {
return false;
}

boolean this_present_owner = true && this.is_set_owner();
boolean that_present_owner = true && that.is_set_owner();
if (this_present_owner || that_present_owner) {
if (!(this_present_owner && that_present_owner))
return false;
if (!this.owner.equals(that.owner))
return false;
}

return true;
}

Expand Down Expand Up @@ -1072,6 +1129,10 @@ public int hashCode() {
if (is_set_assigned_cpu())
hashCode = hashCode * 8191 + org.apache.storm.thrift.TBaseHelper.hashCode(assigned_cpu);

hashCode = hashCode * 8191 + ((is_set_owner()) ? 131071 : 524287);
if (is_set_owner())
hashCode = hashCode * 8191 + owner.hashCode();

return hashCode;
}

Expand Down Expand Up @@ -1233,6 +1294,16 @@ public int compareTo(WorkerSummary other) {
return lastComparison;
}
}
lastComparison = java.lang.Boolean.valueOf(is_set_owner()).compareTo(other.is_set_owner());
if (lastComparison != 0) {
return lastComparison;
}
if (is_set_owner()) {
lastComparison = org.apache.storm.thrift.TBaseHelper.compareTo(this.owner, other.owner);
if (lastComparison != 0) {
return lastComparison;
}
}
return 0;
}

Expand Down Expand Up @@ -1363,6 +1434,16 @@ public java.lang.String toString() {
sb.append(this.assigned_cpu);
first = false;
}
if (is_set_owner()) {
if (!first) sb.append(", ");
sb.append("owner:");
if (this.owner == null) {
sb.append("null");
} else {
sb.append(this.owner);
}
first = false;
}
sb.append(")");
return sb.toString();
}
Expand Down Expand Up @@ -1540,6 +1621,14 @@ public void read(org.apache.storm.thrift.protocol.TProtocol iprot, WorkerSummary
org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 527: // OWNER
if (schemeField.type == org.apache.storm.thrift.protocol.TType.STRING) {
struct.owner = iprot.readString();
struct.set_owner_isSet(true);
} else {
org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
Expand Down Expand Up @@ -1646,6 +1735,13 @@ public void write(org.apache.storm.thrift.protocol.TProtocol oprot, WorkerSummar
oprot.writeDouble(struct.assigned_cpu);
oprot.writeFieldEnd();
}
if (struct.owner != null) {
if (struct.is_set_owner()) {
oprot.writeFieldBegin(OWNER_FIELD_DESC);
oprot.writeString(struct.owner);
oprot.writeFieldEnd();
}
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
Expand Down Expand Up @@ -1709,7 +1805,10 @@ public void write(org.apache.storm.thrift.protocol.TProtocol prot, WorkerSummary
if (struct.is_set_assigned_cpu()) {
optionals.set(14);
}
oprot.writeBitSet(optionals, 15);
if (struct.is_set_owner()) {
optionals.set(15);
}
oprot.writeBitSet(optionals, 16);
if (struct.is_set_supervisor_id()) {
oprot.writeString(struct.supervisor_id);
}
Expand Down Expand Up @@ -1762,12 +1861,15 @@ public void write(org.apache.storm.thrift.protocol.TProtocol prot, WorkerSummary
if (struct.is_set_assigned_cpu()) {
oprot.writeDouble(struct.assigned_cpu);
}
if (struct.is_set_owner()) {
oprot.writeString(struct.owner);
}
}

@Override
public void read(org.apache.storm.thrift.protocol.TProtocol prot, WorkerSummary struct) throws org.apache.storm.thrift.TException {
org.apache.storm.thrift.protocol.TTupleProtocol iprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot;
java.util.BitSet incoming = iprot.readBitSet(15);
java.util.BitSet incoming = iprot.readBitSet(16);
if (incoming.get(0)) {
struct.supervisor_id = iprot.readString();
struct.set_supervisor_id_isSet(true);
Expand Down Expand Up @@ -1839,6 +1941,10 @@ public void read(org.apache.storm.thrift.protocol.TProtocol prot, WorkerSummary
struct.assigned_cpu = iprot.readDouble();
struct.set_assigned_cpu_isSet(true);
}
if (incoming.get(15)) {
struct.owner = iprot.readString();
struct.set_owner_isSet(true);
}
}
}

Expand Down
14 changes: 13 additions & 1 deletion storm-client/src/py/storm/ttypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4555,11 +4555,12 @@ class WorkerSummary(object):
- assigned_memonheap
- assigned_memoffheap
- assigned_cpu
- owner
"""


def __init__(self, supervisor_id=None, host=None, port=None, topology_id=None, topology_name=None, num_executors=None, component_to_num_tasks=None, time_secs=None, uptime_secs=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None,):
def __init__(self, supervisor_id=None, host=None, port=None, topology_id=None, topology_name=None, num_executors=None, component_to_num_tasks=None, time_secs=None, uptime_secs=None, requested_memonheap=None, requested_memoffheap=None, requested_cpu=None, assigned_memonheap=None, assigned_memoffheap=None, assigned_cpu=None, owner=None,):
self.supervisor_id = supervisor_id
self.host = host
self.port = port
Expand All @@ -4575,6 +4576,7 @@ def __init__(self, supervisor_id=None, host=None, port=None, topology_id=None, t
self.assigned_memonheap = assigned_memonheap
self.assigned_memoffheap = assigned_memoffheap
self.assigned_cpu = assigned_cpu
self.owner = owner

def read(self, iprot):
if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
Expand Down Expand Up @@ -4666,6 +4668,11 @@ def read(self, iprot):
self.assigned_cpu = iprot.readDouble()
else:
iprot.skip(ftype)
elif fid == 527:
if ftype == TType.STRING:
self.owner = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString()
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
Expand Down Expand Up @@ -4740,6 +4747,10 @@ def write(self, oprot):
oprot.writeFieldBegin('assigned_cpu', TType.DOUBLE, 526)
oprot.writeDouble(self.assigned_cpu)
oprot.writeFieldEnd()
if self.owner is not None:
oprot.writeFieldBegin('owner', TType.STRING, 527)
oprot.writeString(self.owner.encode('utf-8') if sys.version_info[0] == 2 else self.owner)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()

Expand Down Expand Up @@ -12094,6 +12105,7 @@ def __ne__(self, other):
(524, TType.DOUBLE, 'assigned_memonheap', None, None, ), # 524
(525, TType.DOUBLE, 'assigned_memoffheap', None, None, ), # 525
(526, TType.DOUBLE, 'assigned_cpu', None, None, ), # 526
(527, TType.STRING, 'owner', 'UTF8', None, ), # 527
)
all_structs.append(SupervisorPageInfo)
SupervisorPageInfo.thrift_spec = (
Expand Down
1 change: 1 addition & 0 deletions storm-client/src/storm.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ struct WorkerSummary {
524: optional double assigned_memonheap;
525: optional double assigned_memoffheap;
526: optional double assigned_cpu;
527: optional string owner;
}

struct SupervisorPageInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private List<WorkerSummary> checkWorkerStats(boolean includeSys, boolean userAut
List<WorkerSummary> summaries =
StatsUtil.aggWorkerStats("my-storm-id", "my-storm-name",
task2Component, beats, exec2NodePort, nodeHost, worker2Resources,
includeSys, userAuthorized, filterSupervisor);
includeSys, userAuthorized, filterSupervisor, null);
for (WorkerSummary ws : summaries) {
String host = ws.get_host();
int port = ws.get_port();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4031,6 +4031,7 @@ public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolea
if (base == null) {
throw new WrappedNotAliveException(topoId);
}
String owner = base.get_owner();
Map<WorkerSlot, WorkerResources> workerToResources = getWorkerResourcesForTopology(topoId);
List<WorkerSummary> workerSummaries = null;
Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
Expand All @@ -4051,7 +4052,9 @@ public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolea
nodeToHost,
workerToResources,
includeSys,
true); //this is the topology page, so we know the user is authorized
true, //this is the topology page, so we know the user is authorized
null,
owner);
}

TopologyPageInfo topoPageInfo = StatsUtil.aggTopoExecsStats(topoId,
Expand Down Expand Up @@ -4281,9 +4284,10 @@ public SupervisorPageInfo getSupervisorPageInfo(String superId, String host, boo
}
Map<WorkerSlot, WorkerResources> workerResources = getWorkerResourcesForTopology(topoId);
boolean isAllowed = userTopologies.contains(topoId);
String owner = (common.base == null) ? null : common.base.get_owner();
for (WorkerSummary workerSummary : StatsUtil.aggWorkerStats(topoId, topoName, taskToComp, beats,
exec2NodePort, nodeToHost, workerResources, includeSys,
isAllowed, sid)) {
isAllowed, sid, owner)) {
pageInfo.add_to_worker_summaries(workerSummary);
}
}
Expand Down
Loading

0 comments on commit 8d4f0c6

Please sign in to comment.