Skip to content

Commit

Permalink
GH-1089: Support of ObserveComposite for timestamped data on server side
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Bernard <[email protected]>
  • Loading branch information
JaroslawLegierski and sbernard31 committed Jan 15, 2024
1 parent 4689260 commit b303aa4
Show file tree
Hide file tree
Showing 18 changed files with 634 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,13 +367,13 @@ public void visit(ReadCompositeRequest request) {

@Override
public void visit(ObserveCompositeRequest request) {
response = new ObserveCompositeResponse(code, null, errorMessage, null, null);
response = new ObserveCompositeResponse(code, null, null, null, errorMessage, null);
}

@Override
public void visit(CancelCompositeObservationRequest request) {
// TODO TL : we should check if this is really handle
response = new CancelCompositeObservationResponse(code, null, errorMessage, null, null);
response = new CancelCompositeObservationResponse(code, null, null, null, errorMessage, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,55 @@ public static Builder builder() {
return new Builder();
}

public static Builder builder(List<LwM2mPath> paths) {
return new Builder(paths);
}

public static class Builder {

/**
* Create a Builder to create timestamped nodes for given paths.
* <p>
* If there is not exactly one entry for each path by timestamp for each given <code>paths</code>, then an
* {@link IllegalArgumentException} will be raised on {@link #build()}.
* <p>
* e.g. if you provide "/1/0/1" and "/3/0/15" as path
*
* <pre>
* {@code
* // valid
* t1 => {
* "/1/0/1" => LwM2mResource
* "/3/0/15" => LwM2mResource
* },
* // valid
* t2 => {
* "/1/0/1" => LwM2mResource
* "/3/0/15" => null
* },
* // invalid : 3/0/15 is missing
* t3 => {
* "/1/0/1" => LwM2mResource
* },
* // invalid : 3/0/1 should not be here
* t4 => {
* "/1/0/1" => LwM2mResource
* "/3/0/1" => LwM2mResource
* "/3/0/15" => LwM2mResource
* }
* }
* </pre>
*
* @param paths list of allowed {@link LwM2mPath}
*/
public Builder(List<LwM2mPath> paths) {
this.paths = paths;
}

public Builder() {
this.paths = null;
}

private static class InternalNode {
Instant timestamp;
LwM2mPath path;
Expand All @@ -124,6 +171,7 @@ public InternalNode(Instant timestamp, LwM2mPath path, LwM2mNode node) {

private final List<InternalNode> nodes = new ArrayList<>();
private boolean noDuplicate = true;
private final List<LwM2mPath> paths;

public Builder raiseExceptionOnDuplicate(boolean raiseException) {
noDuplicate = raiseException;
Expand Down Expand Up @@ -180,6 +228,13 @@ public TimestampedLwM2mNodes build() throws IllegalArgumentException {
}
}

// validate path is included in expected paths
if (paths != null && !paths.contains(internalNode.path)) {
throw new IllegalArgumentException(String.format(
"Unable to create TimestampedLwM2mNodes : Unexpected path %s, only %s are expected.",
internalNode.path, paths));
}

// add to the map
Map<LwM2mPath, LwM2mNode> pathToNode = timestampToPathToNode.get(internalNode.timestamp);
if (pathToNode == null) {
Expand All @@ -188,13 +243,25 @@ public TimestampedLwM2mNodes build() throws IllegalArgumentException {
pathToNode.put(internalNode.path, internalNode.node);
} else {
LwM2mNode previous = pathToNode.put(internalNode.path, internalNode.node);
// check for duplicate
if (noDuplicate && previous != null) {
throw new IllegalArgumentException(String.format(
"Unable to create TimestampedLwM2mNodes : duplicate value for path %s. (%s, %s)",
internalNode.path, internalNode.node, previous));
}
}
}

// When paths is provided, validate there is not missing path
if (paths != null) {
timestampToPathToNode.forEach((timestamp, pathToNodes) -> {
if (!pathToNodes.keySet().containsAll(paths)) {
throw new IllegalArgumentException(String.format(
"Unable to create TimestampedLwM2mNodes : Some path are missing for timestamp %s, expected %s but get %s.",
timestamp, paths, pathToNodes.keySet()));
}
});
}
return new TimestampedLwM2mNodes(timestampToPathToNode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ public List<TimestampedLwM2mNode> decodeTimestampedData(byte[] content, ContentF
}

@Override
public TimestampedLwM2mNodes decodeTimestampedNodes(byte[] content, ContentFormat format, LwM2mModel model)
throws CodecException {
public TimestampedLwM2mNodes decodeTimestampedNodes(byte[] content, ContentFormat format, List<LwM2mPath> paths,
LwM2mModel model) throws CodecException {
LOG.trace("Decoding value for format {}: {}", format, content);

if (format == null) {
Expand All @@ -221,10 +221,10 @@ public TimestampedLwM2mNodes decodeTimestampedNodes(byte[] content, ContentForma
}

if (decoder instanceof TimestampedMultiNodeDecoder) {
return ((TimestampedMultiNodeDecoder) decoder).decodeTimestampedNodes(content, model);
return ((TimestampedMultiNodeDecoder) decoder).decodeTimestampedNodes(content, paths, model);
} else if (decoder instanceof MultiNodeDecoder) {
return new TimestampedLwM2mNodes.Builder()
.addNodes(((MultiNodeDecoder) decoder).decodeNodes(content, null, model)).build();
return new TimestampedLwM2mNodes.Builder(paths)
.addNodes(((MultiNodeDecoder) decoder).decodeNodes(content, paths, model)).build();
} else {
throw new CodecException(
"Decoder does not support multiple nodes decoding for this content format %s [%s] ", format);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,14 @@ List<TimestampedLwM2mNode> decodeTimestampedData(byte[] content, ContentFormat f
* @param content the content
* @param format the content format
* @param model the collection of supported object models
* @param paths the list of path of node to build. The list of path can be <code>null</code> meaning that we don't
* know which kind of {@link LwM2mNode} is encoded. In this case, let's assume this is a list of
* {@link LwM2mSingleResource} or {@link LwM2mResourceInstance}.
* @return the decoded timestamped nodes represented by {@link TimestampedLwM2mNodes}
* @throws CodecException if content is malformed.
*/
TimestampedLwM2mNodes decodeTimestampedNodes(byte[] content, ContentFormat format, LwM2mModel model)
throws CodecException;
TimestampedLwM2mNodes decodeTimestampedNodes(byte[] content, ContentFormat format, List<LwM2mPath> paths,
LwM2mModel model) throws CodecException;

/**
* Deserializes a binary content into a list of {@link LwM2mPath}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
*******************************************************************************/
package org.eclipse.leshan.core.node.codec;

import java.util.List;

import org.eclipse.leshan.core.model.LwM2mModel;
import org.eclipse.leshan.core.node.LwM2mNode;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.LwM2mResourceInstance;
import org.eclipse.leshan.core.node.LwM2mSingleResource;
import org.eclipse.leshan.core.node.TimestampedLwM2mNodes;

/**
Expand All @@ -29,10 +35,14 @@ public interface TimestampedMultiNodeDecoder {
* <p>
*
* @param content the content
* @param paths the list of path of node to build. The list of path can be <code>null</code> meaning that we don't
* know which kind of {@link LwM2mNode} is encoded. In this case, let's assume this is a list of
* {@link LwM2mSingleResource} or {@link LwM2mResourceInstance}.
* @param model the collection of supported object models
* @return the decoded timestamped nodes represented by {@link TimestampedLwM2mNodes}
* @throws CodecException if content is malformed.
*/
TimestampedLwM2mNodes decodeTimestampedNodes(byte[] content, LwM2mModel model) throws CodecException;
TimestampedLwM2mNodes decodeTimestampedNodes(byte[] content, List<LwM2mPath> paths, LwM2mModel model)
throws CodecException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public Map<LwM2mPath, LwM2mNode> decodeNodes(byte[] content, List<LwM2mPath> pat

Map<LwM2mPath, LwM2mNode> nodes = new HashMap<>();
if (paths != null) {
// Resolve records & Group it by time-stamp
// Resolve records & Group it by path
Map<LwM2mPath, Collection<LwM2mResolvedSenMLRecord>> recordsByPath = groupByPath(pack.getRecords(),
paths);

Expand Down Expand Up @@ -193,23 +193,59 @@ public List<TimestampedLwM2mNode> decodeTimestampedData(byte[] content, LwM2mPat
}

@Override
public TimestampedLwM2mNodes decodeTimestampedNodes(byte[] content, LwM2mModel model) throws CodecException {
public TimestampedLwM2mNodes decodeTimestampedNodes(byte[] content, List<LwM2mPath> paths, LwM2mModel model)
throws CodecException {
try {
// Decode SenML pack
SenMLPack pack = decoder.fromSenML(content);

TimestampedLwM2mNodes.Builder nodes = TimestampedLwM2mNodes.builder();
TimestampedLwM2mNodes.Builder nodes;

LwM2mSenMLResolver resolver = new LwM2mSenMLResolver();
for (SenMLRecord record : pack.getRecords()) {
LwM2mResolvedSenMLRecord resolvedRecord = resolver.resolve(record);
LwM2mPath path = resolvedRecord.getPath();
LwM2mNode node = parseRecords(Arrays.asList(resolvedRecord), path, model,
DefaultLwM2mDecoder.nodeClassFromPath(path));
nodes.put(TimestampUtil.fromSeconds(resolvedRecord.getTimeStamp()), path, node);
if (paths != null && !paths.isEmpty()) {
nodes = TimestampedLwM2mNodes.builder(paths);

// Group by time-stamp
SortedMap<BigDecimal, Collection<LwM2mResolvedSenMLRecord>> recordsByTimestamp = groupRecordByTimestamp(
pack.getRecords(), null);

// For each time-stamp
for (Entry<BigDecimal, Collection<LwM2mResolvedSenMLRecord>> entryByTimestamp : recordsByTimestamp
.entrySet()) {
// Group records by path
Map<LwM2mPath, Collection<LwM2mResolvedSenMLRecord>> recordsByPath = groupResolvedRecordByPath(
entryByTimestamp.getValue(), paths);

for (LwM2mPath path : paths) {
Collection<LwM2mResolvedSenMLRecord> records = recordsByPath.get(path);
if (records.isEmpty()) {
// Node can be null as the LWM2M specification says that "Read-Composite operation is
// treated as non-atomic and handled as best effort by the client. That is, if any of the
// requested
// resources do not have a valid value to return, they will not be included in the
// response".
// Meaning that a given path could have no corresponding value.
nodes.put(TimestampUtil.fromSeconds(entryByTimestamp.getKey()), path, null);
} else {
LwM2mNode node = parseRecords(records, path, model,
DefaultLwM2mDecoder.nodeClassFromPath(path));
nodes.put(TimestampUtil.fromSeconds(entryByTimestamp.getKey()), path, node);
}
}
}
} else {
nodes = TimestampedLwM2mNodes.builder();
LwM2mSenMLResolver resolver = new LwM2mSenMLResolver();
for (SenMLRecord record : pack.getRecords()) {
LwM2mResolvedSenMLRecord resolvedRecord = resolver.resolve(record);
LwM2mPath path = resolvedRecord.getPath();
LwM2mNode node = parseRecords(Arrays.asList(resolvedRecord), path, model,
DefaultLwM2mDecoder.nodeClassFromPath(path));
nodes.put(TimestampUtil.fromSeconds(resolvedRecord.getTimeStamp()), path, node);
}
}

return nodes.build();

} catch (SenMLException | IllegalArgumentException e) {
String hexValue = content != null ? Hex.encodeHexString(content) : "";
throw new CodecException(e, "Unable to decode nodes : %s", hexValue, e);
Expand Down Expand Up @@ -362,6 +398,33 @@ private Map<LwM2mPath, Collection<LwM2mResolvedSenMLRecord>> groupByPath(List<Se
return result;
}

/**
* Group Resolved Record by LwM2mPath
*/
private Map<LwM2mPath, Collection<LwM2mResolvedSenMLRecord>> groupResolvedRecordByPath(
Collection<LwM2mResolvedSenMLRecord> records, List<LwM2mPath> paths) throws SenMLException {

// Prepare map result
Map<LwM2mPath, Collection<LwM2mResolvedSenMLRecord>> result = new HashMap<>(paths.size());
for (LwM2mPath path : paths) {
result.put(path, new ArrayList<LwM2mResolvedSenMLRecord>());
}

// Add it to the map
for (LwM2mResolvedSenMLRecord resolvedRecord : records) {

// Find the corresponding path for this record.
LwM2mPath selectedPath = selectPath(resolvedRecord.getPath(), paths);
if (selectedPath == null) {
throw new CodecException("Invalid path [%s] for resource, it should start by one of %s",
resolvedRecord.getPath(), paths);
}

result.get(selectedPath).add(resolvedRecord);
}
return result;
}

/**
* Search in the list <code>paths<code> which one is a "start" for the given path.
* <p>
Expand All @@ -379,6 +442,8 @@ private LwM2mPath selectPath(LwM2mPath recordPath, List<LwM2mPath> paths) {
/**
* Resolved record then group it by time-stamp
*
* @param records list of records to group
* @param requestPath If not <code>null</code> then all record should belong to this path
* @return a sorted map (timestamp => collection of record) order by descending time-stamp (most recent one at first
* place). If null time-stamp (meaning no time information) exists it always at first place.
*/
Expand Down Expand Up @@ -409,7 +474,7 @@ public int compare(BigDecimal o1, BigDecimal o2) {
"Invalid path [%s] for resource, it should be a resource or a resource instance path",
resolvedRecord.getName());
}
if (!resolvedRecord.getPath().startWith(requestPath)) {
if (requestPath != null && !resolvedRecord.getPath().startWith(requestPath)) {
throw new CodecException("Invalid path [%s] for resource, it should start by %s",
resolvedRecord.getName(), requestPath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,27 @@
import org.eclipse.leshan.core.ResponseCode;
import org.eclipse.leshan.core.node.LwM2mNode;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.TimestampedLwM2mNodes;
import org.eclipse.leshan.core.observation.CompositeObservation;

public class CancelCompositeObservationResponse extends ObserveCompositeResponse {

public CancelCompositeObservationResponse(ResponseCode code, Map<LwM2mPath, LwM2mNode> content, String errorMessage,
Object coapResponse, CompositeObservation observation) {
super(code, content, errorMessage, coapResponse, observation);
public CancelCompositeObservationResponse(ResponseCode code, Map<LwM2mPath, LwM2mNode> content,
TimestampedLwM2mNodes timestampedValues, CompositeObservation observation, String errorMessage,
Object coapResponse) {
super(code, content, null, observation, errorMessage, coapResponse);
}

@Override
public String toString() {
if (errorMessage != null) {
return String.format("CancelCompositeObservationResponse [code=%s, errormessage=%s]", code, errorMessage);
} else {
return String.format("CancelCompositeObservationResponse [code=%s, content=%s, observation=%s]", code,
content, observation);
}
if (errorMessage != null)
return String.format("CancelCompositeObservationResponse [code=%s, errorMessage=%s]", code, errorMessage);
else if (timestampedValues != null)
return String.format(
"CancelCompositeObservationResponse [code=%s, content=%s, observation=%s, timestampedValues= %d values]",
code, content, observation, timestampedValues.getTimestamps().size());
else
return String.format("CancelCompositeObservationResponse [code=%s, content=%s, observation=%s]", code,
observation, content);
}
}
Loading

0 comments on commit b303aa4

Please sign in to comment.