Skip to content

Commit

Permalink
Work in progress file transfer fixes and optimizations in streaming mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Ri0n committed Apr 21, 2024
1 parent 53ef4a4 commit 17201e5
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 106 deletions.
9 changes: 6 additions & 3 deletions src/xmpp/xmpp-im/jingle-connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ namespace XMPP { namespace Jingle {
return 0;
}

qint64 Connection::readData(char *, qint64)
qint64 Connection::readData(char *buf, qint64 maxSize)
{
qCritical("Calling unimplemented function readData");
return 0;
auto sz = readDataInternal(buf, maxSize);
if (sz != -1 && _readHook) {
_readHook(buf, sz);
}
return sz;
}

size_t Connection::blockSize() const
Expand Down
15 changes: 11 additions & 4 deletions src/xmpp/xmpp-im/jingle-connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ namespace XMPP { namespace Jingle {
class Connection : public ByteStream {
Q_OBJECT
public:
using Ptr = QSharedPointer<Connection>; // will be shared between transport and application
using Ptr = QSharedPointer<Connection>; // will be shared between transport and application
using ReadHook = std::function<void(char *, qint64)>;

virtual bool hasPendingDatagrams() const;
virtual QNetworkDatagram readDatagram(qint64 maxSize = -1);
virtual bool writeDatagram(const QNetworkDatagram &data);
Expand All @@ -46,17 +48,22 @@ namespace XMPP { namespace Jingle {
inline void setId(const QString &id) { _id = id; }
inline bool isRemote() const { return _isRemote; }
inline void setRemote(bool value) { _isRemote = value; }
inline void setReadHook(ReadHook hook) { _readHook = hook; }

signals:
void connected();
void disconnected();

protected:
qint64 writeData(const char *data, qint64 maxSize);
qint64 readData(char *data, qint64 maxSize);
qint64 readData(char *buf, qint64 maxSize) final;

// same rules as for QIOdevice::readData. It was just necessary to wrap it.
virtual qint64 readDataInternal(char *data, qint64 maxSize) = 0;

bool _isRemote = false;
QString _id;
bool _isRemote = false;
QString _id;
ReadHook _readHook;
};

using ConnectionAcceptorCallback = std::function<bool(Connection::Ptr)>;
Expand Down
136 changes: 86 additions & 50 deletions src/xmpp/xmpp-im/jingle-ft.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* jignle-ft.h - Jingle file transfer
* Copyright (C) 2019 Sergey Ilinykh
* Copyright (C) 2019-2024 Sergey Ilinykh
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
Expand Down Expand Up @@ -51,6 +51,22 @@ namespace XMPP { namespace Jingle { namespace FileTransfer {
static const QString CHECKSUM_TAG = QStringLiteral("checksum");
static const QString RECEIVED_TAG = QStringLiteral("received");

class Checksum : public ContentBase {
public:
inline Checksum() { }
Checksum(const QDomElement &file);
bool isValid() const;
QDomElement toXml(QDomDocument *doc) const;

File file;
};

class Received : public ContentBase {
public:
using ContentBase::ContentBase;
QDomElement toXml(QDomDocument *doc) const;
};

//----------------------------------------------------------------------------
// Checksum
//----------------------------------------------------------------------------
Expand Down Expand Up @@ -177,9 +193,21 @@ namespace XMPP { namespace Jingle { namespace FileTransfer {
setState(State::Finished);
}

void handleStreamFail()
void onIncomingChecksum(const QList<Hash> &hashes)
{
lastReason = Reason(Reason::Condition::FailedApplication, QString::fromLatin1("stream failed"));
if (!hasher || q->_senders != q->_pad->session()->peerRole()) {
qDebug("jignle-ft: unexpected incoming checksum. was it negotiated? %s",
qUtf8Printable(q->pad()->session()->peer().full()));
return;
}
incomingChecksum = hashes;
tryFinalizeIncoming();
}

void handleStreamFail(const QString &errorMsg = {})
{
lastReason = Reason(Reason::Condition::FailedApplication,
errorMsg.isEmpty() ? QString::fromLatin1("stream failed") : errorMsg);
setState(State::Finished);
}

Expand Down Expand Up @@ -218,6 +246,12 @@ namespace XMPP { namespace Jingle { namespace FileTransfer {
}
}

inline qint64 getBlockSize()
{
auto sz = qint64(connection->blockSize());
return sz ? sz : 8192;
}

void writeNextBlockToTransport()
{
if (bytesLeft && *bytesLeft == 0) {
Expand All @@ -232,20 +266,24 @@ namespace XMPP { namespace Jingle { namespace FileTransfer {
expectReceived();
return; // everything is written
}
auto sz = qint64(connection->blockSize());
sz = sz ? sz : 8192;
auto sz = getBlockSize();
if (bytesLeft && sz > *bytesLeft) {
sz = *bytesLeft;
}
QByteArray data;
if (device->isSequential()) {
if (!device->bytesAvailable())
sz = qMin(sz, device->bytesAvailable());
if (!sz)
return; // we will come back on readyRead
data = device->read(qMin(qint64(sz), device->bytesAvailable()));
} else {
data = device->read(sz);
}
if (data.isEmpty()) {
data.resize(sz);
sz = device->read(data.data(), sz);
if (sz == -1) {
handleStreamFail(QString::fromLatin1("source device failed"));
return;
}
data.resize(sz);
if (sz == 0) {
if (!bytesLeft) {
lastReason = Reason(Reason::Condition::Success);
if (hasher) {
Expand All @@ -261,11 +299,10 @@ namespace XMPP { namespace Jingle { namespace FileTransfer {
handleStreamFail();
}
return;
}
// qDebug("JINGLE-FT write %d bytes to connection", data.size());
if (hasher) {
} else if (hasher) {
hasher->addData(data);
}

if (connection->features() & TransportFeature::MessageOriented) {
if (!connection->writeDatagram(data)) {
handleStreamFail();
Expand Down Expand Up @@ -331,12 +368,27 @@ namespace XMPP { namespace Jingle { namespace FileTransfer {
qDebug("jingle-ft: connected. ready to transfer user data with %s",
qUtf8Printable(q->pad()->session()->peer().full()));
connection = newConnection;

lastReason = Reason();
lastError.reset();

if (streamingMode) {
qDebug("jingle-ft: streaming mode is active. giving up with handling on our own with %s",
qDebug("jingle-ft: streaming mode is active for %s",
qUtf8Printable(q->pad()->session()->peer().full()));
if (amIReceiver()) {
connection->setReadHook([this](char *buf, qint64 size) {
// in streaming mode we need this to compute hash sum and detect stream end is size was defined
if (hasher) {
hasher->addData(QByteArray::fromRawData(buf, size));
}
if (bytesLeft) {
*bytesLeft -= size;
}
if (bytesLeft && *bytesLeft == 0) {
tryFinalizeIncoming();
}
});
}
setState(State::Active);
emit q->connectionReady();
return;
Expand All @@ -362,8 +414,8 @@ namespace XMPP { namespace Jingle { namespace FileTransfer {
qUtf8Printable(q->pad()->session()->peer().full()));
writeLoggingStarted = true;
}
Q_UNUSED(bytes)
if (q->pad()->session()->role() == q->senders() && !connection->bytesToWrite()) {
auto bs = getBlockSize();
if (q->pad()->session()->role() == q->senders() && connection->bytesToWrite() < bs) {
writeNextBlockToTransport();
}
},
Expand Down Expand Up @@ -428,6 +480,18 @@ namespace XMPP { namespace Jingle { namespace FileTransfer {
outgoingReceived = true;
emit q->updated();
}

void prepareThumbnail(File &file)
{
if (file.thumbnail().data.size()) {
auto client = q->pad()->session()->manager()->client();
auto thumb = file.thumbnail();
auto bm = client->bobManager();
BoBData data = bm->append(thumb.data, thumb.mimeType);
thumb.uri = QLatin1String("cid:") + data.cid();
file.setThumbnail(thumb);
}
}
};

Application::Application(const QSharedPointer<Pad> &pad, const QString &contentName, Origin creator,
Expand Down Expand Up @@ -488,18 +552,6 @@ namespace XMPP { namespace Jingle { namespace FileTransfer {
return ret;
}

void Application::prepareThumbnail(File &file)
{
if (file.thumbnail().data.size()) {
auto client = _pad->session()->manager()->client();
auto thumb = file.thumbnail();
auto bm = client->bobManager();
BoBData data = bm->append(thumb.data, thumb.mimeType);
thumb.uri = QLatin1String("cid:") + data.cid();
d->file.setThumbnail(thumb);
}
}

QDomElement Application::makeLocalOffer()
{
if (!d->file.isValid()) {
Expand All @@ -508,7 +560,7 @@ namespace XMPP { namespace Jingle { namespace FileTransfer {
auto doc = _pad->doc();
auto el = doc->createElementNS(NS, "description");

prepareThumbnail(d->file);
d->prepareThumbnail(d->file);
el.appendChild(d->file.toXml(doc));
return el;
}
Expand Down Expand Up @@ -698,25 +750,6 @@ namespace XMPP { namespace Jingle { namespace FileTransfer {

Connection::Ptr Application::connection() const { return d->connection.staticCast<XMPP::Jingle::Connection>(); }

void Application::incomingChecksum(const QList<Hash> &hashes)
{
qDebug("jignle-ft: got checksum: %s for %s", qPrintable(hashes.value(0).toString()),
qUtf8Printable(pad()->session()->peer().full()));
if (!d->hasher || _senders != _pad->session()->peerRole()) {
qDebug("jignle-ft: unexpected incoming checksum. was it negotiated? %s",
qUtf8Printable(pad()->session()->peer().full()));
return;
}
d->incomingChecksum = hashes;
d->tryFinalizeIncoming();
}

void Application::incomingReceived()
{
qDebug("jingle-ft: got received for %s", qUtf8Printable(pad()->session()->peer().full()));
d->onReceived();
}

Pad::Pad(Manager *manager, Session *session) : _manager(manager), _session(session) { }

QDomElement Pad::takeOutgoingSessionInfoUpdate()
Expand Down Expand Up @@ -751,14 +784,17 @@ namespace XMPP { namespace Jingle { namespace FileTransfer {
Checksum checksum(el);
auto app = session()->content(checksum.name, checksum.creator);
if (app) {
static_cast<Application *>(app)->incomingChecksum(checksum.file.hashes());
qDebug("jignle-ft: got checksum: %s for %s", qPrintable(checksum.file.hashes().value(0).toString()),
qUtf8Printable(session()->peer().full()));
static_cast<Application *>(app)->d->onIncomingChecksum(checksum.file.hashes());
}
return true;
} else if (el.tagName() == RECEIVED_TAG) {
Received received(el);
auto app = session()->content(received.name, received.creator);
if (app) {
static_cast<Application *>(app)->incomingReceived();
qDebug("jingle-ft: got received for %s", qUtf8Printable(session()->peer().full()));
static_cast<Application *>(app)->d->onReceived();
}
return true;
} else {
Expand Down
Loading

0 comments on commit 17201e5

Please sign in to comment.