forked from Tribler/tribler
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconversion.py
35 lines (25 loc) · 1.08 KB
/
conversion.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
"""
Example file
"""
from struct import pack, unpack_from
from Tribler.dispersy.conversion import BinaryConversion
from Tribler.dispersy.message import DropPacket
class Conversion(BinaryConversion):
    """
    Binary conversion for the community's u"text" meta message.

    On the wire the text payload is one unsigned length byte followed by that
    many bytes of UTF-8 encoded text.
    """

    def __init__(self, community):
        """
        Register the encode/decode pair for the u"text" meta message.

        COMMUNITY is passed on to BinaryConversion and is asked for its
        u"text" meta message.
        """
        # NOTE(review): "\x02" is presumably the version identifier of this
        # conversion and chr(1) the on-the-wire identifier of the message --
        # confirm against BinaryConversion.
        super(Conversion, self).__init__(community, "\x02")
        self.define_meta_message(
            chr(1),
            community.get_meta_message(u"text"),
            self._encode_text,
            self._decode_text,
        )

    def _encode_text(self, message):
        """
        Encode the text payload of MESSAGE.

        Returns a (length, text) tuple where LENGTH is the number of UTF-8
        bytes packed as a single unsigned byte and TEXT is the UTF-8 encoded
        text itself.
        """
        # Encode once.  The limit applies to the encoded byte count (not the
        # number of characters) because the length prefix is a single byte.
        text = message.payload.text.encode("UTF-8")
        assert len(text) < 256
        return pack("!B", len(text)), text

    def _decode_text(self, placeholder, offset, data):
        """
        Decode a text payload from DATA, starting at OFFSET.

        Returns an (offset, payload) tuple where OFFSET points just past the
        decoded text and PAYLOAD is built with
        placeholder.meta.payload.implement(text).

        Raises DropPacket when DATA is too short to hold the length byte or
        the announced number of text bytes, or when the text is not valid
        UTF-8.
        """
        if len(data) < offset + 1:
            raise DropPacket("Insufficient packet size")
        text_length, = unpack_from("!B", data, offset)
        offset += 1

        # Slicing beyond the end of DATA silently yields a shorter string, so
        # a truncated packet must be rejected explicitly.  Otherwise a partial
        # text would be accepted and the returned offset would point past the
        # end of DATA.
        end = offset + text_length
        if len(data) < end:
            raise DropPacket("Insufficient packet size")

        # Only the decode itself can raise UnicodeError.
        try:
            text = data[offset:end].decode("UTF-8")
        except UnicodeError:
            raise DropPacket("Unable to decode UTF-8")

        return end, placeholder.meta.payload.implement(text)