Skip to content

Commit

Permalink
extract interface SessionContextStore
Browse files Browse the repository at this point in the history
reason: to enable multiple implementations
  • Loading branch information
goekay committed Feb 8, 2025
1 parent 887b55c commit 4779754
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public abstract class AbstractWebSocketEndpoint extends ConcurrentWebSocketHandl

public static final String CHARGEBOX_ID_KEY = "CHARGEBOX_ID_KEY";

private final SessionContextStore sessionContextStore = new SessionContextStore();
private final SessionContextStore sessionContextStore = new SessionContextStoreImpl();
private final List<Consumer<String>> connectedCallbackList = new ArrayList<>();
private final List<Consumer<String>> disconnectedCallbackList = new ArrayList<>();
private final Object sessionContextLock = new Object();
Expand Down
122 changes: 9 additions & 113 deletions src/main/java/de/rwth/idsg/steve/ocpp/ws/SessionContextStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,135 +18,31 @@
*/
package de.rwth.idsg.steve.ocpp.ws;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Striped;
import de.rwth.idsg.steve.SteveException;
import de.rwth.idsg.steve.ocpp.ws.custom.WsSessionSelectStrategy;
import de.rwth.idsg.steve.ocpp.ws.data.SessionContext;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.web.socket.WebSocketSession;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;

import static de.rwth.idsg.steve.SteveConfiguration.CONFIG;

/**
* @author Sevket Goekay <[email protected]>
* @since 17.03.2015
* @since 08.02.2025
*/
@Slf4j
public class SessionContextStore {

/**
* Key (String) = chargeBoxId
* Value (Deque<SessionContext>) = WebSocket session contexts
*/
private final ConcurrentHashMap<String, Deque<SessionContext>> lookupTable = new ConcurrentHashMap<>();

private final Striped<Lock> locks = Striped.lock(16);

private final WsSessionSelectStrategy wsSessionSelectStrategy = CONFIG.getOcpp().getWsSessionSelectStrategy();

public void add(String chargeBoxId, WebSocketSession session, ScheduledFuture pingSchedule) {
Lock l = locks.get(chargeBoxId);
l.lock();
try {
SessionContext context = new SessionContext(session, pingSchedule, DateTime.now());

Deque<SessionContext> endpointDeque = lookupTable.computeIfAbsent(chargeBoxId, str -> new ArrayDeque<>());
endpointDeque.addLast(context); // Adding at the end

log.debug("A new SessionContext is stored for chargeBoxId '{}'. Store size: {}",
chargeBoxId, endpointDeque.size());
} finally {
l.unlock();
}
}

public void remove(String chargeBoxId, WebSocketSession session) {
Lock l = locks.get(chargeBoxId);
l.lock();
try {
Deque<SessionContext> endpointDeque = lookupTable.get(chargeBoxId);
if (endpointDeque == null) {
log.debug("No session context to remove for chargeBoxId '{}'", chargeBoxId);
return;
}
public interface SessionContextStore {

// Prevent "java.util.ConcurrentModificationException: null"
// Reason: Cannot modify the set (remove the item) we are iterating
// Solution: Iterate the set, find the item, remove the item after the for-loop
//
SessionContext toRemove = null;
for (SessionContext context : endpointDeque) {
if (context.getSession().getId().equals(session.getId())) {
toRemove = context;
break;
}
}
void add(String chargeBoxId, WebSocketSession session, ScheduledFuture pingSchedule);

if (toRemove != null) {
// 1. Cancel the ping task
toRemove.getPingSchedule().cancel(true);
// 2. Delete from collection
if (endpointDeque.remove(toRemove)) {
log.debug("A SessionContext is removed for chargeBoxId '{}'. Store size: {}",
chargeBoxId, endpointDeque.size());
}
// 3. Delete empty collection from lookup table in order to correctly calculate
// the number of connected chargeboxes with getNumberOfChargeBoxes()
if (endpointDeque.size() == 0) {
lookupTable.remove(chargeBoxId);
}
}
} finally {
l.unlock();
}
}
void remove(String chargeBoxId, WebSocketSession session);

public WebSocketSession getSession(String chargeBoxId) {
Lock l = locks.get(chargeBoxId);
l.lock();
try {
Deque<SessionContext> endpointDeque = lookupTable.get(chargeBoxId);
if (endpointDeque == null) {
throw new NoSuchElementException();
}
return wsSessionSelectStrategy.getSession(endpointDeque);
} catch (NoSuchElementException e) {
throw new SteveException("No session context for chargeBoxId '%s'", chargeBoxId, e);
} finally {
l.unlock();
}
}
WebSocketSession getSession(String chargeBoxId);

public int getSize(String chargeBoxId) {
Deque<SessionContext> endpointDeque = lookupTable.get(chargeBoxId);
if (endpointDeque == null) {
return 0;
} else {
return endpointDeque.size();
}
}
int getSize(String chargeBoxId);

public int getNumberOfChargeBoxes() {
return lookupTable.size();
}
int getNumberOfChargeBoxes();

public List<String> getChargeBoxIdList() {
return Collections.list(lookupTable.keys());
}
List<String> getChargeBoxIdList();

public Map<String, Deque<SessionContext>> getACopy() {
return ImmutableMap.copyOf(lookupTable);
}
Map<String, Deque<SessionContext>> getACopy();
}
159 changes: 159 additions & 0 deletions src/main/java/de/rwth/idsg/steve/ocpp/ws/SessionContextStoreImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* SteVe - SteckdosenVerwaltung - https://github.com/steve-community/steve
* Copyright (C) 2013-2025 SteVe Community Team
* All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package de.rwth.idsg.steve.ocpp.ws;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Striped;
import de.rwth.idsg.steve.SteveException;
import de.rwth.idsg.steve.ocpp.ws.custom.WsSessionSelectStrategy;
import de.rwth.idsg.steve.ocpp.ws.data.SessionContext;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.web.socket.WebSocketSession;

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;

import static de.rwth.idsg.steve.SteveConfiguration.CONFIG;

/**
* @author Sevket Goekay <[email protected]>
* @since 17.03.2015
*/
@Slf4j
public class SessionContextStoreImpl implements SessionContextStore {

/**
* Key (String) = chargeBoxId
* Value (Deque<SessionContext>) = WebSocket session contexts
*/
private final ConcurrentHashMap<String, Deque<SessionContext>> lookupTable = new ConcurrentHashMap<>();

private final Striped<Lock> locks = Striped.lock(16);

private final WsSessionSelectStrategy wsSessionSelectStrategy = CONFIG.getOcpp().getWsSessionSelectStrategy();

@Override
public void add(String chargeBoxId, WebSocketSession session, ScheduledFuture pingSchedule) {
Lock l = locks.get(chargeBoxId);
l.lock();
try {
SessionContext context = new SessionContext(session, pingSchedule, DateTime.now());

Deque<SessionContext> endpointDeque = lookupTable.computeIfAbsent(chargeBoxId, str -> new ArrayDeque<>());
endpointDeque.addLast(context); // Adding at the end

log.debug("A new SessionContext is stored for chargeBoxId '{}'. Store size: {}",
chargeBoxId, endpointDeque.size());
} finally {
l.unlock();
}
}

@Override
public void remove(String chargeBoxId, WebSocketSession session) {
Lock l = locks.get(chargeBoxId);
l.lock();
try {
Deque<SessionContext> endpointDeque = lookupTable.get(chargeBoxId);
if (endpointDeque == null) {
log.debug("No session context to remove for chargeBoxId '{}'", chargeBoxId);
return;
}

// Prevent "java.util.ConcurrentModificationException: null"
// Reason: Cannot modify the set (remove the item) we are iterating
// Solution: Iterate the set, find the item, remove the item after the for-loop
//
SessionContext toRemove = null;
for (SessionContext context : endpointDeque) {
if (context.getSession().getId().equals(session.getId())) {
toRemove = context;
break;
}
}

if (toRemove != null) {
// 1. Cancel the ping task
toRemove.getPingSchedule().cancel(true);
// 2. Delete from collection
if (endpointDeque.remove(toRemove)) {
log.debug("A SessionContext is removed for chargeBoxId '{}'. Store size: {}",
chargeBoxId, endpointDeque.size());
}
// 3. Delete empty collection from lookup table in order to correctly calculate
// the number of connected chargeboxes with getNumberOfChargeBoxes()
if (endpointDeque.size() == 0) {
lookupTable.remove(chargeBoxId);
}
}
} finally {
l.unlock();
}
}

@Override
public WebSocketSession getSession(String chargeBoxId) {
Lock l = locks.get(chargeBoxId);
l.lock();
try {
Deque<SessionContext> endpointDeque = lookupTable.get(chargeBoxId);
if (endpointDeque == null) {
throw new NoSuchElementException();
}
return wsSessionSelectStrategy.getSession(endpointDeque);
} catch (NoSuchElementException e) {
throw new SteveException("No session context for chargeBoxId '%s'", chargeBoxId, e);
} finally {
l.unlock();
}
}

@Override
public int getSize(String chargeBoxId) {
Deque<SessionContext> endpointDeque = lookupTable.get(chargeBoxId);
if (endpointDeque == null) {
return 0;
} else {
return endpointDeque.size();
}
}

@Override
public int getNumberOfChargeBoxes() {
return lookupTable.size();
}

@Override
public List<String> getChargeBoxIdList() {
return Collections.list(lookupTable.keys());
}

@Override
public Map<String, Deque<SessionContext>> getACopy() {
return ImmutableMap.copyOf(lookupTable);
}
}

0 comments on commit 4779754

Please sign in to comment.