# Receiver_Data.py
import datetime
import os
import time
from abc import ABCMeta, abstractmethod
from enum import Enum, unique
from functools import wraps
from ipaddress import IPv4Address
from queue import Queue, Empty

import numpy as np
import serial
# NOTE: the next import rebinds the name ``serial`` to ``digi.xbee.serial``;
# the original import order is kept so the final binding is unchanged.
from digi.xbee import serial
from digi.xbee.devices import XBeeDevice
from digi.xbee.serial import FlowControl, XBeeSerialPort
from digi.xbee.util import utils
from xbee import XBee
# Default directory that save_file() writes received payloads into.
# Placeholder value: point it at a real, writable directory before running.
dir_path = "/path/to/directory"
def save_file(data, directory=None, filename='received_file'):
    """Write *data* to ``<directory>/<filename>``, replacing any existing file.

    The file is opened in binary write mode, so every call overwrites the
    previous contents of the target file.

    Args:
        data: Bytes-like payload to store.
        directory: Directory to write into. When omitted, the module-level
            ``dir_path`` is used.
        filename: Name of the file created inside *directory*.

    Raises:
        OSError: If the directory cannot be created or the file cannot be
            written.
    """
    target_dir = dir_path if directory is None else directory
    # Create the destination on demand so a missing directory does not abort
    # the receiver with FileNotFoundError.
    os.makedirs(target_dir, exist_ok=True)
    file_path = os.path.join(target_dir, filename)
    with open(file_path, 'wb') as file:
        file.write(data)
def main(port="/dev/ttyUSB0", baudrate=9600):
    """Receive XBee frames on a serial port and store each payload on disk.

    Opens *port*, wraps it in an ``XBee`` object, and blocks on
    ``wait_read_frame()`` in a loop. The ``'rf_data'`` entry of every frame
    is passed to ``save_file``. The loop ends on Ctrl-C (KeyboardInterrupt),
    and the serial port is closed on every exit path.

    Args:
        port: Serial device the XBee is plugged into. To find it on a
            Raspberry Pi, run ``dmesg | grep tty`` and look for a line such
            as "now attached to ttyUSB0".
        baudrate: Baud rate used to open the serial port.
    """
    # Imported locally: the module-level name ``serial`` is rebound by
    # ``from digi.xbee import serial`` in the import block, so it is not
    # guaranteed to refer to pyserial here.
    from serial import Serial

    ser = Serial(port, baudrate)
    try:
        xbee = XBee(ser)
        while True:
            try:
                response = xbee.wait_read_frame()
                payload = response.get('rf_data')
                # Frames without an 'rf_data' entry are skipped rather than
                # raising KeyError and killing the receiver.
                if payload is not None:
                    save_file(payload)
            except KeyboardInterrupt:
                break
    finally:
        # Release the port even when an unexpected error escapes the loop.
        ser.close()
# Start the receiver only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()