Commit b6a79ec5 authored by Victor Lindquist's avatar Victor Lindquist
Browse files

Refactored NetworkEngine and PointDataMessageHandler into Data-Oriented Design...

Refactored NetworkEngine and PointDataMessageHandler into Data-Oriented Design to simplify development
parent 7b2aafcc
No related merge requests found
Showing with 672 additions and 388 deletions
+672 -388
......@@ -26,9 +26,9 @@ include(${OPENSPACE_CMAKE_EXT_DIR}/module_definition.cmake)
set(HEADER_FILES
${CMAKE_CURRENT_SOURCE_DIR}/softwareintegrationmodule.h
${CMAKE_CURRENT_SOURCE_DIR}/pointdatamessagehandler.h
${CMAKE_CURRENT_SOURCE_DIR}/messagehandler.h
${CMAKE_CURRENT_SOURCE_DIR}/network/network.h
${CMAKE_CURRENT_SOURCE_DIR}/network/softwareconnection.h
${CMAKE_CURRENT_SOURCE_DIR}/network/networkengine.h
${CMAKE_CURRENT_SOURCE_DIR}/rendering/renderablepointscloud.h
${CMAKE_CURRENT_SOURCE_DIR}/utils.h
${CMAKE_CURRENT_SOURCE_DIR}/syncablefloatdatastorage.h
......@@ -41,9 +41,9 @@ source_group("Header Files" FILES ${HEADER_FILES})
set(SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/softwareintegrationmodule.cpp
${CMAKE_CURRENT_SOURCE_DIR}/softwareintegrationmodule_lua.inl
${CMAKE_CURRENT_SOURCE_DIR}/pointdatamessagehandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/messagehandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/network/network.cpp
${CMAKE_CURRENT_SOURCE_DIR}/network/softwareconnection.cpp
${CMAKE_CURRENT_SOURCE_DIR}/network/networkengine.cpp
${CMAKE_CURRENT_SOURCE_DIR}/rendering/renderablepointscloud.cpp
${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
${CMAKE_CURRENT_SOURCE_DIR}/syncablefloatdatastorage.cpp
......
......@@ -2,7 +2,7 @@
* *
* OpenSpace *
* *
* Copyright (c) 2014-2021 *
* Copyright (c) 2014-2022 *
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy of this *
* software and associated documentation files (the "Software"), to deal in the Software *
......@@ -22,8 +22,8 @@
* OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. *
****************************************************************************************/
#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___POINTDATAMESSAGEHANDLER___H__
#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___POINTDATAMESSAGEHANDLER___H__
#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___MESSAGEHANDLER___H__
#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___MESSAGEHANDLER___H__
#include <unordered_map>
......@@ -31,76 +31,20 @@
#include <modules/softwareintegration/network/softwareconnection.h>
namespace openspace {
namespace openspace::softwareintegration::network {
class Renderable;
class PointDataMessageHandler {
struct Callback {
std::function<void()> function;
std::vector<softwareintegration::storage::Key> waitForData = {};
std::string description = "???"; // To help debugging. Maybe remove?
};
using CallbackList = std::vector<Callback>;
using CallbackMap = std::unordered_map<std::string, CallbackList>;
public:
void handlePointDataMessage(const std::vector<char>& message, std::shared_ptr<SoftwareConnection> connection);
void handleVelocityDataMessage(const std::vector<char>& message, std::shared_ptr<SoftwareConnection> connection);
void handleFixedColorMessage(const std::vector<char>& message, std::shared_ptr<SoftwareConnection> connection);
void handleColormapMessage(const std::vector<char>& message, std::shared_ptr<SoftwareConnection> connection);
void handleAttributeDataMessage(const std::vector<char>& message, std::shared_ptr<SoftwareConnection> connection);
void handleOpacityMessage(const std::vector<char>& message, std::shared_ptr<SoftwareConnection> connection);
void handleFixedPointSizeMessage(const std::vector<char>& message, std::shared_ptr<SoftwareConnection> connection);
void handleLinearPointSizeMessage(const std::vector<char>& message, std::shared_ptr<SoftwareConnection> connection);
void handleVisibilityMessage(const std::vector<char>& message, std::shared_ptr<SoftwareConnection> connection);
void handleRemoveSGNMessage(const std::vector<char>& message, std::shared_ptr<SoftwareConnection> connection);
void postSync();
private:
const Renderable* getRenderable(const std::string& identifier);
void checkRenderable(
const std::vector<char>& message, size_t& messageOffset,
std::shared_ptr<SoftwareConnection> connection, std::string& identifier
);
void addCallback(
const std::string& identifier,
const Callback& newCallback
);
CallbackMap _onceNodeExistsCallbacks;
std::mutex _onceNodeExistsCallbacksMutex;
size_t _onceNodeExistsCallbacksRetries{0};
struct Callback {
std::function<void()> function;
std::vector<softwareintegration::storage::Key> waitForData = {};
std::string description = "???"; // To help debugging. Maybe remove?
};
using CallbackList = std::vector<Callback>;
using CallbackMap = std::unordered_map<std::string, CallbackList>;
void onFixedColorChange(
properties::Property* property,
const std::string& identifier,
std::shared_ptr<SoftwareConnection> connection
);
void onOpacityChange(
properties::Property* property,
const std::string& identifier,
std::shared_ptr<SoftwareConnection> connection
);
void onFixedPointSizeChange(
properties::Property* property,
const std::string& identifier,
std::shared_ptr<SoftwareConnection> connection
);
void onVisibilityChange(
properties::Property* property,
const std::string& identifier,
std::shared_ptr<SoftwareConnection> connection
);
void postSyncCallbacks();
void convertToMeterPerSecond(
softwareintegration::simp::LengthUnit currLengthUnit,
std::vector<float>& data
);
};
void handleMessage(IncomingMessage& incomingMessage);
} // namespace openspace
} // namespace openspace::softwareintegration::messagehandler
#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___POINTDATAMESSAGEHANDLER___H__
#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___MESSAGEHANDLER___H__
......@@ -22,8 +22,10 @@
* OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. *
****************************************************************************************/
#include <modules/softwareintegration/network/networkengine.h>
#include <modules/softwareintegration/network/network.h>
#include <modules/softwareintegration/messagehandler.h>
#include <modules/softwareintegration/utils.h>
#include <openspace/engine/globals.h>
#include <openspace/engine/globalscallbacks.h>
#include <openspace/scene/scene.h>
......@@ -32,214 +34,99 @@
namespace {
constexpr const char* _loggerCat = "NetworkEngine";
} // namespace
namespace openspace {
using namespace softwareintegration;
NetworkEngine::NetworkEngine(const int port)
: _port{port}
{}
NetworkEngine::~NetworkEngine() {
stop();
}
void NetworkEngine::start() {
_socketServer.listen(_port);
constexpr const char* _loggerCat = "NetworkEngine";
_serverThread = std::thread([this]() { handleNewSoftwareConnections(); });
_eventLoopThread = std::thread([this]() { eventLoop(); });
}
} // namespace
void NetworkEngine::stop() {
_shouldStopServerThread = true;
namespace openspace::softwareintegration::network {
namespace {
{
std::lock_guard guardSoftwareConnections(_softwareConnectionsMutex);
for (auto& [id, connectionPtr] : _softwareConnections) {
SoftwareConnection::NetworkEngineFriends::stopThread(connectionPtr);
void eventLoop(std::weak_ptr<NetworkState> networkStateWeakPtr) {
while (!networkStateWeakPtr.expired()) {
auto networkState = networkStateWeakPtr.lock();
if (networkState->shouldStopThreads) break;
// The call to "pop" below will block execution
// on this thread until interrupt is called
try {
auto pm = networkState->incomingMessages.pop();
handleMessage(pm);
}
catch (const ghoul::RuntimeError&) {
break;
}
}
_incomingMessages.interrupt();
_shouldStopEventThread = true;
_socketServer.close();
_softwareConnections.clear();
if (_serverThread.joinable()) {
_serverThread.join();
}
if (_eventLoopThread.joinable()) {
_eventLoopThread.join();
}
}
void NetworkEngine::postSync() {
_pointDataMessageHandler.postSync();
}
void NetworkEngine::handleNewSoftwareConnections() {
while (!_shouldStopServerThread) {
std::unique_ptr<ghoul::io::TcpSocket> socket = _socketServer.awaitPendingTcpSocket();
void serverLoop(std::weak_ptr<NetworkState> networkStateWeakPtr) {
while (!networkStateWeakPtr.expired()) {
auto networkState = networkStateWeakPtr.lock();
if (networkState->shouldStopThreads) break;
std::unique_ptr<ghoul::io::TcpSocket> socket = networkState->server.awaitPendingTcpSocket();
if (!socket) return;
socket->startStreams();
auto p = std::make_shared<SoftwareConnection>(std::move(socket));
std::lock_guard guard(_softwareConnectionsMutex);
auto [it, peerInserted] = _softwareConnections.emplace(p->id(), p);
std::lock_guard guard(networkState->softwareConnectionsMutex);
auto [it, peerInserted] = networkState->softwareConnections.emplace(p->id(), std::move(p));
if (peerInserted) {
auto& connectionPtr = it->second;
auto connectionWeak = std::weak_ptr<SoftwareConnection>{ it->second };
auto thread = std::thread{
[this, &connectionPtr] {
peerEventLoop(connectionPtr->id());
[connectionWeak, networkStateWeakPtr] {
connection::eventLoop(connectionWeak, networkStateWeakPtr);
}
};
connectionPtr->setThread(thread);
it->second->setThread(thread);
}
}
}
void NetworkEngine::peerEventLoop(size_t connection_id) {
using namespace std::literals::chrono_literals;
auto connectionPtr = getSoftwareConnection(connection_id);
} // namespace
while (!connectionPtr->shouldStopThread()) {
try {
SoftwareConnection::Message m = connectionPtr->receiveMessageFromSoftware();
_incomingMessages.push({ connection_id, m });
}
catch (const SoftwareConnection::SoftwareConnectionLostError& err) {
if (connectionPtr->shouldStopThread()) break;
if (connectionPtr && (!connectionPtr->shouldStopThread() || !connectionPtr->isConnectedOrConnecting())) {
LDEBUG(fmt::format("Connection lost to {}: {}", connection_id, err.message));
_incomingMessages.push({
connection_id,
SoftwareConnection::Message{ simp::MessageType::InternalDisconnection }
});
}
break;
}
}
}
std::shared_ptr<NetworkState> serve(const int port) {
auto networkState = std::make_shared<NetworkState>();
void NetworkEngine::eventLoop() {
while (!_shouldStopEventThread) {
// The call to "pop" below will block execution
// on this thread until interrupt is called
try {
auto pm = _incomingMessages.pop();
handleIncomingMessage(pm);
}
catch (const ghoul::RuntimeError&) {
break;
}
}
}
// 4700, is the defualt port where the tcp socket will be opened to the ext. software
networkState->server.listen(port);
std::shared_ptr<SoftwareConnection> NetworkEngine::getSoftwareConnection(size_t id) {
std::lock_guard guard(_softwareConnectionsMutex);
auto it = _softwareConnections.find(id);
if (it == _softwareConnections.end()) return nullptr;
return it->second;
}
std::weak_ptr<NetworkState> networkStateWeakPtr = networkState;
networkState->serverThread = std::thread{ [networkStateWeakPtr] {
serverLoop(networkStateWeakPtr);
} };
void NetworkEngine::handleIncomingMessage(IncomingMessage incomingMessage) {
auto connectionPtr = getSoftwareConnection(incomingMessage.connection_id);
networkState->eventLoopThread = std::thread{ [networkStateWeakPtr] {
eventLoop(networkStateWeakPtr);
} };
if(!connectionPtr) {
LDEBUG(fmt::format("Trying to handle message from disconnected peer. Aborting."));
return;
}
return networkState;
};
const simp::MessageType messageType = incomingMessage.message.type;
std::vector<char>& message = incomingMessage.message.content;
void stopServer(std::shared_ptr<NetworkState> networkState) {
networkState->shouldStopThreads = true;
switch (messageType) {
case simp::MessageType::Connection: {
LDEBUG(fmt::format("Message recieved... Connection: {}", incomingMessage.connection_id));
size_t offset = 0;
const std::string software = simp::readString(message, offset);
networkState->incomingMessages.interrupt();
// Send back message to software to complete handshake
connectionPtr->sendMessage(simp::formatConnectionMessage(software));
LINFO(fmt::format("OpenSpace has connected with {} through socket", software));
break;
}
case simp::MessageType::PointData: {
LDEBUG("Message recieved... Point data");
_pointDataMessageHandler.handlePointDataMessage(message, connectionPtr);
break;
}
case simp::MessageType::VelocityData: {
LDEBUG("Message recieved... Velocity data");
_pointDataMessageHandler.handleVelocityDataMessage(message, connectionPtr);
break;
}
case simp::MessageType::RemoveSceneGraphNode: {
LDEBUG(fmt::format("Message recieved... Remove SGN"));
_pointDataMessageHandler.handleRemoveSGNMessage(message, connectionPtr);
break;
}
case simp::MessageType::Color: {
LDEBUG(fmt::format("Message recieved... Color"));
_pointDataMessageHandler.handleFixedColorMessage(message, connectionPtr);
break;
}
case simp::MessageType::Colormap: {
LDEBUG(fmt::format("Message recieved... Colormap"));
_pointDataMessageHandler.handleColormapMessage(message, connectionPtr);
break;
}
case simp::MessageType::AttributeData: {
LDEBUG(fmt::format("Message recieved... Attribute data"));
_pointDataMessageHandler.handleAttributeDataMessage(message, connectionPtr);
break;
}
case simp::MessageType::Opacity: {
LDEBUG(fmt::format("Message recieved... Opacity"));
_pointDataMessageHandler.handleOpacityMessage(message, connectionPtr);
break;
}
case simp::MessageType::FixedSize: {
LDEBUG(fmt::format("Message recieved... Size"));
_pointDataMessageHandler.handleFixedPointSizeMessage(message, connectionPtr);
break;
}
case simp::MessageType::LinearSize: {
LDEBUG(fmt::format("Message recieved... Linear size"));
_pointDataMessageHandler.handleLinearPointSizeMessage(message, connectionPtr);
break;
}
case simp::MessageType::Visibility: {
LDEBUG(fmt::format("Message recieved... Visibility"));
_pointDataMessageHandler.handleVisibilityMessage(message, connectionPtr);
break;
}
case simp::MessageType::InternalDisconnection: {
LDEBUG(fmt::format("Message recieved... Disconnection from software connection: {}", incomingMessage.connection_id));
std::lock_guard guard(_softwareConnectionsMutex);
SoftwareConnection::NetworkEngineFriends::stopThread(connectionPtr);
if (_softwareConnections.count(incomingMessage.connection_id)) {
_softwareConnections.erase(incomingMessage.connection_id);
}
break;
}
default: {
LERROR(fmt::format(
"Unsupported message type: {}", incomingMessage.message.rawMessageType
));
break;
}
networkState->server.close();
{
std::lock_guard guardSoftwareConnections(networkState->softwareConnectionsMutex);
networkState->softwareConnections.clear();
}
if (networkState->serverThread.joinable()) {
networkState->serverThread.join();
}
if (networkState->eventLoopThread.joinable()) {
networkState->eventLoopThread.join();
}
}
} // namespace openspace
SoftwareConnectionLostError::SoftwareConnectionLostError(const std::string& msg)
: ghoul::RuntimeError(fmt::format("{}{}", "Software connection lost", msg), "SoftwareConnection")
{}
} // namespace openspace::softwareintegration::network
......@@ -26,54 +26,45 @@
#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___NETWORKENGINE___H__
#include <modules/softwareintegration/network/softwareconnection.h>
#include <modules/softwareintegration/pointdatamessagehandler.h>
#include <modules/softwareintegration/interruptibleconcurrentqueue.h>
#include <modules/softwareintegration/utils.h>
#include <ghoul/io/socket/tcpsocketserver.h>
#include <modules/softwareintegration/interruptibleconcurrentqueue.h>
namespace openspace {
#include <functional>
#include <unordered_map>
class NetworkEngine {
namespace openspace::softwareintegration::network {
class SoftwareConnectionLostError : public ghoul::RuntimeError {
public:
NetworkEngine(const int port = 4700);
~NetworkEngine();
explicit SoftwareConnectionLostError(const std::string& msg);
};
struct IncomingMessage {
size_t connection_id;
SoftwareConnection::Message message{ softwareintegration::simp::MessageType::Unknown };
};
struct IncomingMessage {
std::weak_ptr<SoftwareConnection> connection;
softwareintegration::simp::MessageType type{ softwareintegration::simp::MessageType::Unknown };
std::vector<char> content{};
std::string rawMessageType{""};
};
void start();
void stop();
void postSync();
struct NetworkState {
ghoul::io::TcpSocketServer server;
private:
void handleNewSoftwareConnections();
void handleIncomingMessage(IncomingMessage incomingMessage);
void peerEventLoop(size_t connection_id);
void eventLoop();
std::thread serverThread;
std::thread eventLoopThread;
// The destuction of the object a shared_ptr is pointing to, occurs when the pointer no longer has any owners
std::shared_ptr<SoftwareConnection> getSoftwareConnection(size_t id);
std::unordered_map<size_t, std::shared_ptr<SoftwareConnection>> softwareConnections{};
std::mutex softwareConnectionsMutex{};
std::unordered_map<size_t, std::shared_ptr<SoftwareConnection>> _softwareConnections;
std::mutex _softwareConnectionsMutex;
ghoul::io::TcpSocketServer _socketServer;
std::thread _serverThread;
std::atomic_bool _shouldStopServerThread = false;
std::thread _eventLoopThread;
std::atomic_bool _shouldStopEventThread = false;
std::atomic_bool shouldStopThreads{ false };
const int _port;
InterruptibleConcurrentQueue<IncomingMessage> incomingMessages{};
};
// Message handlers
PointDataMessageHandler _pointDataMessageHandler;
std::shared_ptr<NetworkState> serve(const int port = 4700);
InterruptibleConcurrentQueue<IncomingMessage> _incomingMessages;
};
void stopServer(std::shared_ptr<NetworkState> networkState);
} // namespace openspace
} // namespace openspace::softwareintegration::network
#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___NETWORKENGINE___H__
......@@ -2,7 +2,7 @@
* *
* OpenSpace *
* *
* Copyright (c) 2014-2021 *
* Copyright (c) 2014-2022 *
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy of this *
* software and associated documentation files (the "Software"), to deal in the Software *
......@@ -24,6 +24,7 @@
#include <modules/softwareintegration/network/softwareconnection.h>
#include <modules/softwareintegration/network/network.h>
#include <ghoul/logging/logmanager.h>
#include <openspace/engine/globals.h>
#include <openspace/engine/syncengine.h>
......@@ -39,10 +40,6 @@ namespace openspace {
std::atomic_size_t SoftwareConnection::_nextConnectionId = 1;
SoftwareConnection::SoftwareConnectionLostError::SoftwareConnectionLostError(const std::string& msg)
: ghoul::RuntimeError(fmt::format("{}{}", "Software connection lost", msg), "SoftwareConnection")
{}
SoftwareConnection::SoftwareConnection(std::unique_ptr<ghoul::io::TcpSocket> socket)
: _id{ _nextConnectionId++ }, _socket{ std::move(socket) }, _sceneGraphNodes{},
_thread{}, _shouldStopThread{ false }
......@@ -52,7 +49,7 @@ SoftwareConnection::SoftwareConnection(std::unique_ptr<ghoul::io::TcpSocket> soc
SoftwareConnection::SoftwareConnection(SoftwareConnection&& sc)
: _id{ std::move(sc._id) }, _socket{ std::move(sc._socket) },
_isConnected{ sc._isConnected }, _sceneGraphNodes{ std::move(sc._sceneGraphNodes) },
_sceneGraphNodes{ std::move(sc._sceneGraphNodes) },
_thread{}, _shouldStopThread{ false }
{}
......@@ -61,16 +58,13 @@ SoftwareConnection::~SoftwareConnection() {
// destructor is called when disconnecting external
// since NetworkEngine and MessageHandler has
// shared_ptrs to SoftwareConnection, which can cause
// bugs if not handled properly.
// bugs if not handled properly.
// Tips: use weak_ptr instead of shared_ptr in callbacks.
LDEBUG(fmt::format("Removing software connection {}", _id));
if (!_isConnected) return;
_isConnected = false;
if (_socket) {
_socket->disconnect();
}
_shouldStopThread = true;
_thread.detach();
disconnect();
}
void SoftwareConnection::addPropertySubscription(
......@@ -199,8 +193,7 @@ void SoftwareConnection::removePropertySubscriptions(const std::string& identifi
_subscribedProperties.erase(propertySubscriptions);
}
void SoftwareConnection::PointDataMessageHandlerFriends::removePropertySubscription(
std::shared_ptr<SoftwareConnection> connectionPtr,
void SoftwareConnection::removePropertySubscription(
const std::string& propertyName,
const std::string& identifier
) {
......@@ -224,8 +217,8 @@ void SoftwareConnection::PointDataMessageHandlerFriends::removePropertySubscript
auto property = r->property(propertyName);
auto propertySubscriptions = connectionPtr->_subscribedProperties.find(identifier);
if (propertySubscriptions != connectionPtr->_subscribedProperties.end()) {
auto propertySubscriptions = _subscribedProperties.find(identifier);
if (propertySubscriptions != _subscribedProperties.end()) {
// At least one property have been subscribed to on this SGN
auto propertySubscription = propertySubscriptions->second.find(propertyName);
if (propertySubscription != propertySubscriptions->second.end()) {
......@@ -246,11 +239,11 @@ void SoftwareConnection::disconnect() {
}
bool SoftwareConnection::isConnected() const {
return _isConnected && _socket && _socket->isConnected();
return _socket && _socket->isConnected();
}
bool SoftwareConnection::isConnectedOrConnecting() const {
return _isConnected && _socket && (_socket->isConnected() || _socket->isConnecting());
return _socket && (_socket->isConnected() || _socket->isConnecting());
}
bool SoftwareConnection::sendMessage(const std::string& message) {
......@@ -285,7 +278,51 @@ void SoftwareConnection::removeSceneGraphNode(const std::string& identifier) {
}
}
SoftwareConnection::Message SoftwareConnection::receiveMessageFromSoftware() {
size_t SoftwareConnection::id() {
return _id;
}
void SoftwareConnection::setThread(std::thread& t) {
_thread = std::move(t);
}
ghoul::io::TcpSocket* SoftwareConnection::socket() {
return _socket.get();
}
namespace softwareintegration::network::connection {
void eventLoop(
std::weak_ptr<SoftwareConnection> connectionWeakPtr,
std::weak_ptr<NetworkState> networkStateWeakPtr
) {
while (!connectionWeakPtr.expired()) {
auto connectionPtr = connectionWeakPtr.lock();
if (connectionPtr->_shouldStopThread) break;
try {
IncomingMessage m = receiveMessageFromSoftware(connectionPtr);
if (networkStateWeakPtr.expired()) break;
networkStateWeakPtr.lock()->incomingMessages.push(m);
}
catch (const SoftwareConnectionLostError& err) {
if (!networkStateWeakPtr.expired()
& (!connectionPtr->_shouldStopThread || !connectionPtr->isConnectedOrConnecting())
) {
LDEBUG(fmt::format("Connection lost to {}: {}", connectionPtr->id(), err.message));
auto networkState = networkStateWeakPtr.lock();
if (networkState->softwareConnections.count(connectionPtr->id())) {
networkState->softwareConnections.erase(connectionPtr->id());
}
}
break;
}
}
}
IncomingMessage receiveMessageFromSoftware(
std::shared_ptr<SoftwareConnection> connectionPtr
) {
// Header consists of version (3 char), message type (4 char) & subject size (15 char)
size_t headerSize = 22 * sizeof(char);
......@@ -294,7 +331,7 @@ SoftwareConnection::Message SoftwareConnection::receiveMessageFromSoftware() {
std::vector<char> subjectBuffer;
// Receive the header data
if (!_socket->get(headerBuffer.data(), headerSize)) {
if (!connectionPtr->socket()->get(headerBuffer.data(), headerSize)) {
throw SoftwareConnectionLostError("Failed to read header from socket. Disconnecting.");
}
......@@ -329,34 +366,16 @@ SoftwareConnection::Message SoftwareConnection::receiveMessageFromSoftware() {
auto typeEnum = softwareintegration::simp::getMessageType(type);
// Receive the message data
if (typeEnum != softwareintegration::simp::MessageType::InternalDisconnection && typeEnum != softwareintegration::simp::MessageType::Unknown) {
if (typeEnum != softwareintegration::simp::MessageType::Unknown) {
subjectBuffer.resize(subjectSize);
if (!_socket->get(subjectBuffer.data(), subjectSize)) {
if (!connectionPtr->socket()->get(subjectBuffer.data(), subjectSize)) {
throw SoftwareConnectionLostError("Failed to read message from socket. Disconnecting.");
}
}
return Message{ typeEnum, subjectBuffer, type };
return { connectionPtr, typeEnum, subjectBuffer, type };
}
bool SoftwareConnection::shouldStopThread() {
return _shouldStopThread;
}
size_t SoftwareConnection::id() {
return _id;
}
void SoftwareConnection::setThread(std::thread& t) {
_thread = std::move(t);
}
void SoftwareConnection::NetworkEngineFriends::stopThread(std::shared_ptr<SoftwareConnection> connectionPtr) {
connectionPtr->_shouldStopThread = true;
connectionPtr->disconnect();
if (connectionPtr->_thread.joinable()) {
connectionPtr->_thread.join();
}
}
} // namespace softwareintegration::network::connection
} // namespace openspace
......@@ -2,7 +2,7 @@
* *
* OpenSpace *
* *
* Copyright (c) 2014-2021 *
* Copyright (c) 2014-2022 *
* *
* Permission is hereby granted, free of charge, to any person obtaining a copy of this *
* software and associated documentation files (the "Software"), to deal in the Software *
......@@ -25,8 +25,8 @@
#ifndef __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SOFTWARECONNECTION___H__
#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SOFTWARECONNECTION___H__
#include <openspace/network/messagestructures.h>
#include <modules/softwareintegration/utils.h>
#include <openspace/network/messagestructures.h>
#include <ghoul/io/socket/tcpsocket.h>
#include <openspace/properties/property.h>
......@@ -36,8 +36,30 @@
namespace openspace {
class Renderable;
class SoftwareConnection;
namespace softwareintegration::network {
struct NetworkState;
struct IncomingMessage;
namespace connection {
void eventLoop(
std::weak_ptr<SoftwareConnection> connectionWeakPtr,
std::weak_ptr<softwareintegration::network::NetworkState> networkStateWeakPtr
);
IncomingMessage receiveMessageFromSoftware(
std::shared_ptr<SoftwareConnection> connectionPtr
);
} // namespace connection
} // namespace softwareintegration::network
using namespace softwareintegration::network;
class SoftwareConnection {
public:
using OnChangeHandle = properties::Property::OnChangeHandle;
struct PropertySubscription {
......@@ -49,14 +71,6 @@ public:
using Identifier = std::string;
using SubscribedProperties = std::unordered_map<Identifier, PropertySubscriptions>;
struct Message {
softwareintegration::simp::MessageType type;
std::vector<char> content{};
std::string rawMessageType{""};
};
class SoftwareConnectionLostError;
explicit SoftwareConnection(std::unique_ptr<ghoul::io::TcpSocket> socket);
SoftwareConnection(SoftwareConnection&& p);
~SoftwareConnection();
......@@ -76,40 +90,33 @@ public:
// const std::string& identifier
// );
SoftwareConnection::Message receiveMessageFromSoftware();
void addSceneGraphNode(const std::string& identifier);
void removeSceneGraphNode(const std::string& identifier);
size_t id();
size_t nConnections();
void setThread(std::thread& t);
bool shouldStopThread();
class NetworkEngineFriends {
private:
static void stopThread(std::shared_ptr<SoftwareConnection> connectionPtr);
friend class NetworkEngine;
};
friend void connection::eventLoop(
std::weak_ptr<SoftwareConnection> connectionWeakPtr,
std::weak_ptr<NetworkState> networkStateWeakPtr
);
class PointDataMessageHandlerFriends {
private:
static void removePropertySubscription(
std::shared_ptr<SoftwareConnection> connectionPtr,
const std::string& propertyName,
const std::string& identifier
);
friend class PointDataMessageHandler;
};
friend IncomingMessage connection::receiveMessageFromSoftware(
std::shared_ptr<SoftwareConnection> connectionPtr
);
void removePropertySubscription(const std::string& propertyName, const std::string& identifier);
private:
void removePropertySubscriptions(const std::string& identifier);
ghoul::io::TcpSocket* socket();
private:
SubscribedProperties _subscribedProperties;
std::unordered_set<std::string> _sceneGraphNodes;
bool _isConnected = true;
std::unique_ptr<ghoul::io::TcpSocket> _socket;
size_t _id;
......@@ -119,11 +126,6 @@ private:
static std::atomic_size_t _nextConnectionId;
};
class SoftwareConnection::SoftwareConnectionLostError : public ghoul::RuntimeError {
public:
explicit SoftwareConnectionLostError(const std::string& msg);
};
} // namespace openspace
#endif // __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SOFTWAREINTEGRATIONMODULE___H__
......@@ -24,13 +24,13 @@
#include <modules/softwareintegration/softwareintegrationmodule.h>
#include <ghoul/filesystem/filesystem.h>
#include <modules/softwareintegration/rendering/renderablepointscloud.h>
#include <modules/softwareintegration/messagehandler.h>
#include <ghoul/logging/logmanager.h>
#include <openspace/engine/globals.h>
#include <openspace/engine/syncengine.h>
#include <openspace/engine/moduleengine.h>
#include <openspace/scripting/scriptengine.h>
#include <modules/softwareintegration/rendering/renderablepointscloud.h>
#include <openspace/documentation/documentation.h>
#include <openspace/engine/globalscallbacks.h>
#include <openspace/engine/windowdelegate.h>
......@@ -48,14 +48,9 @@ constexpr const char* _loggerCat = "SoftwareIntegrationModule";
namespace openspace {
SoftwareIntegrationModule::SoftwareIntegrationModule() : OpenSpaceModule(Name) {
if (global::windowDelegate->isMaster()) {
// The Master node will handle all communication with the external software
// and forward it to the Client nodes
// 4700, is the defualt port where the tcp socket will be opened to the ext. software
_networkEngine = std::make_unique<NetworkEngine>();
}
}
SoftwareIntegrationModule::SoftwareIntegrationModule()
: OpenSpaceModule(Name)
{}
SoftwareIntegrationModule::~SoftwareIntegrationModule() {
internalDeinitialize();
......@@ -89,17 +84,21 @@ void SoftwareIntegrationModule::internalInitialize(const ghoul::Dictionary&) {
fRenderable->registerClass<RenderablePointsCloud>("RenderablePointsCloud");
if (global::windowDelegate->isMaster()) {
_networkEngine->start();
// The Master node will handle all communication with the external software
// and forward it to the Client nodes
_networkState = softwareintegration::network::serve();
global::callback::postSyncPreDraw->emplace_back([this]() {
if (!_networkEngine) return;
_networkEngine->postSync();
softwareintegration::network::postSyncCallbacks();
});
}
}
void SoftwareIntegrationModule::internalDeinitialize() {
global::syncEngine->removeSyncables(getSyncables());
if (_networkState) {
softwareintegration::network::stopServer(_networkState);
}
}
std::vector<documentation::Documentation> SoftwareIntegrationModule::documentations() const {
......
......@@ -26,10 +26,9 @@
#define __OPENSPACE_MODULE_SOFTWAREINTEGRATION___SOFTWAREINTEGRATIONMODULE___H__
#include <openspace/util/openspacemodule.h>
#include <modules/softwareintegration/network/networkengine.h>
#include <modules/softwareintegration/syncablefloatdatastorage.h>
#include <openspace/documentation/documentation.h>
#include <modules/softwareintegration/network/network.h>
namespace openspace {
......@@ -64,8 +63,7 @@ private:
// Centralized storage for datasets
SyncableFloatDataStorage _syncableFloatDataStorage;
// Network engine
std::unique_ptr<NetworkEngine> _networkEngine;
std::shared_ptr<softwareintegration::network::NetworkState> _networkState;
};
} // namespace openspace
......
......@@ -38,6 +38,20 @@ namespace openspace {
namespace softwareintegration {
namespace storage {
// Anonymous namespace
namespace {
const std::unordered_map<std::string, Key> _keyStringFromKey{
{ "DataPoints", Key::DataPoints },
{ "VelocityData", Key::VelocityData },
{ "Colormap", Key::Colormap },
{ "ColormapAttributeData", Key::ColormapAttrData },
{ "LinearSizeAttributeData", Key::LinearSizeAttrData },
};
} // namespace
bool hasStorageKey(const std::string& key) {
return _keyStringFromKey.count(key) > 0;
}
......@@ -72,6 +86,7 @@ namespace {
const std::unordered_map<std::string, MessageType> _messageTypeFromSIMPType{
{ "CONN", MessageType::Connection },
{ "PDAT", MessageType::PointData },
{ "VDAT", MessageType::VelocityData },
{ "RSGN", MessageType::RemoveSceneGraphNode },
{ "FCOL", MessageType::Color },
{ "LCOL", MessageType::Colormap },
......
......@@ -40,13 +40,6 @@ enum class Key : uint8_t {
Unknown
};
const std::unordered_map<std::string, Key> _keyStringFromKey{
{ "DataPoints", Key::DataPoints },
{ "Colormap", Key::Colormap },
{ "ColormapAttributeData", Key::ColormapAttrData },
{ "LinearSizeAttributeData", Key::LinearSizeAttrData },
};
Key getStorageKey(const std::string& key);
std::string getStorageKeyString(const Key key);
......@@ -73,7 +66,6 @@ enum class MessageType : uint32_t {
FixedSize,
LinearSize,
Visibility,
InternalDisconnection,
Unknown
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment