Skip to content

Commit

Permalink
Merge pull request #27 from mtstickney/stream_partial_writes
Browse files Browse the repository at this point in the history
Stream partial writes
  • Loading branch information
Roman Isaikin authored Mar 2, 2017
2 parents 9d01728 + ca56d9e commit e64de7e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 19 deletions.
67 changes: 48 additions & 19 deletions src/msgpackstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@
return retVal;

MsgPackStream::MsgPackStream() :
dev(0), owndev(false), q_status(Ok)
dev(0), owndev(false), q_status(Ok), flushWrites(false)
{ }

MsgPackStream::MsgPackStream(QIODevice *d) :
dev(d), owndev(false)
dev(d), owndev(false), q_status(Ok), flushWrites(false)
{ }

MsgPackStream::MsgPackStream(QByteArray *a, QIODevice::OpenMode mode) :
owndev(true), q_status(Ok)
owndev(true), q_status(Ok), flushWrites(false)
{
QBuffer *buf = new QBuffer(a);
buf->open(mode);
dev = buf;
}

MsgPackStream::MsgPackStream(const QByteArray &a) :
owndev(true), q_status(Ok)
owndev(true), q_status(Ok), flushWrites(false)
{
QBuffer *buf = new QBuffer();
buf->setData(a);
Expand Down Expand Up @@ -87,6 +87,16 @@ void MsgPackStream::setStatus(Status status)
q_status = status;
}

void MsgPackStream::setFlushWrites(bool flush)
{
flushWrites = flush;
}

bool MsgPackStream::willFlushWrites()
{
return flushWrites;
}

MsgPackStream &MsgPackStream::operator>>(bool &b)
{
CHECK_STREAM_PRECOND(*this)
Expand Down Expand Up @@ -385,7 +395,7 @@ MsgPackStream &MsgPackStream::operator<<(bool b)
CHECK_STREAM_WRITE_PRECOND(*this);
quint8 m = b == true ?
MsgPack::FirstByte::MTRUE : MsgPack::FirstByte::MFALSE;
if (dev->write((char *)&m, 1) != 1)
if (writeBytes((char *)&m, 1) != 1)
setStatus(WriteFailed);
return *this;
}
Expand All @@ -395,7 +405,7 @@ MsgPackStream &MsgPackStream::operator<<(quint32 u32)
CHECK_STREAM_WRITE_PRECOND(*this);
quint8 p[5];
quint8 sz = MsgPackPrivate::pack_uint(u32, p, true) - p;
if (dev->write((char *)p, sz) != sz)
if (writeBytes((char *)p, sz) != sz)
setStatus(WriteFailed);
return *this;
}
Expand All @@ -405,7 +415,7 @@ MsgPackStream &MsgPackStream::operator<<(quint64 u64)
CHECK_STREAM_WRITE_PRECOND(*this);
quint8 p[9];
quint8 sz = MsgPackPrivate::pack_ulonglong(u64, p, true) - p;
if (dev->write((char *)p, sz) != sz)
if (writeBytes((char *)p, sz) != sz)
setStatus(WriteFailed);
return *this;
}
Expand All @@ -415,7 +425,7 @@ MsgPackStream &MsgPackStream::operator<<(qint32 i32)
CHECK_STREAM_WRITE_PRECOND(*this);
quint8 p[5];
quint8 sz = MsgPackPrivate::pack_int(i32, p, true) - p;
if (dev->write((char *)p, sz) != sz)
if (writeBytes((char *)p, sz) != sz)
setStatus(WriteFailed);
return *this;
}
Expand All @@ -425,7 +435,7 @@ MsgPackStream &MsgPackStream::operator<<(qint64 i64)
CHECK_STREAM_WRITE_PRECOND(*this);
quint8 p[9];
quint8 sz = MsgPackPrivate::pack_longlong(i64, p, true) - p;
if (dev->write((char *)p, sz) != sz)
if (writeBytes((char *)p, sz) != sz)
setStatus(WriteFailed);
return *this;
}
Expand All @@ -435,7 +445,7 @@ MsgPackStream &MsgPackStream::operator<<(float f)
CHECK_STREAM_WRITE_PRECOND(*this);
quint8 p[5];
quint8 sz = MsgPackPrivate::pack_float(f, p, true) - p;
if (dev->write((char *)p, sz) != sz)
if (writeBytes((char *)p, sz) != sz)
setStatus(WriteFailed);
return *this;
}
Expand All @@ -445,7 +455,7 @@ MsgPackStream &MsgPackStream::operator<<(double d)
CHECK_STREAM_WRITE_PRECOND(*this);
quint8 p[9];
quint8 sz = MsgPackPrivate::pack_double(d, p, true) - p;
if (dev->write((char *)p, sz) != sz)
if (writeBytes((char *)p, sz) != sz)
setStatus(WriteFailed);
return *this;
}
Expand All @@ -457,7 +467,7 @@ MsgPackStream &MsgPackStream::operator<<(QString str)
quint32 sz = MsgPackPrivate::pack_string(str, p, false) - p;
quint8 *data = new quint8[sz];
MsgPackPrivate::pack_string(str, data, true);
if (dev->write((char *)data, sz) != sz)
if (writeBytes((char *)data, sz) != sz)
setStatus(WriteFailed);
delete[] data;
return *this;
Expand All @@ -471,7 +481,7 @@ MsgPackStream &MsgPackStream::operator<<(const char *str)
quint32 sz = MsgPackPrivate::pack_string_raw(str, str_len, p, false) - p;
quint8 *data = new quint8[sz];
MsgPackPrivate::pack_string_raw(str, str_len, data, true);
if (dev->write((char *)data, sz) != sz)
if (writeBytes((char *)data, sz) != sz)
setStatus(WriteFailed);
delete[] data;
return *this;
Expand All @@ -483,21 +493,40 @@ MsgPackStream &MsgPackStream::operator<<(QByteArray array)
quint8 p[5];
quint32 len = array.length();
quint8 header_len = MsgPackPrivate::pack_bin_header(len, p, true) - p;
if (dev->write((char *)p, header_len) != header_len) {
if (writeBytes((char *)p, header_len) != header_len) {
setStatus(WriteFailed);
return *this;
}
if (dev->write(array.data(), len) != len)
if (writeBytes(array.data(), len) != len)
setStatus(WriteFailed);
return *this;
}

bool MsgPackStream::writeBytes(const char *data, uint len)
{
CHECK_STREAM_WRITE_PRECOND(false);
if (dev->write(data, len) != len) {
setStatus(WriteFailed);
return false;
uint written = 0;
uint thisWrite;
while (written < len) {
thisWrite = dev->write(data, len - written);
if (thisWrite < 0) {
setStatus(WriteFailed);
return false;
}
/* Apparently on Windows, the buffer size for named pipes is 0, and
* any data that is written before the remote end reads it is
* dropped (!!) without error (see https://bugreports.qt.io/browse/QTBUG-18385).
* We must be very sure that the data has been written before we try
* another write. This degrades performance in other cases, so callers
* must enable this behavior explicitly.
*/
if (this->flushWrites) {
dev->waitForBytesWritten(-1);
}

/* Increment the write pointer and the total byte count. */
data += thisWrite;
written += thisWrite;
}
return true;
}
Expand Down Expand Up @@ -534,7 +563,7 @@ bool MsgPackStream::writeExtHeader(quint32 len, qint8 msgpackType)
d[5] = msgpackType;
sz = 6;
}
if (dev->write((const char *)d, sz) != sz) {
if (writeBytes((const char *)d, sz) != sz) {
setStatus(WriteFailed);
return false;
}
Expand Down
3 changes: 3 additions & 0 deletions src/msgpackstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class MSGPACK_EXPORT MsgPackStream
Status status() const;
void resetStatus();
void setStatus(Status status);
void setFlushWrites(bool flushWrites);
bool willFlushWrites();

MsgPackStream &operator>>(bool &b);
MsgPackStream &operator>>(quint8 &u8);
Expand Down Expand Up @@ -60,6 +62,7 @@ class MSGPACK_EXPORT MsgPackStream
QIODevice *dev;
bool owndev;
Status q_status;
bool flushWrites;

bool unpack_longlong(qint64 &i64);
bool unpack_ulonglong(quint64 &u64);
Expand Down

0 comments on commit e64de7e

Please sign in to comment.