diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index 61e3d26..0000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "SimpleWebSocketServer"] - path = app/SimpleWebSocketServer - url = https://github.com/OliverF/SimpleWebSocketServer diff --git a/Dockerfile b/Dockerfile index 2b5b985..1ed9b4f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,11 @@ -from python:2.7-onbuild +FROM --platform=$TARGETPLATFORM python:3.10-slim -expose 54321 +ENV PYTHONUNBUFFERED=1 +ENV SOURCE_URL="http://localhost:8081/?action=stream" -entrypoint ["python", "relay.py"] +COPY . / +RUN pip3 install -r /requirements.txt + +EXPOSE 54321 +EXPOSE 54322 +CMD ["python", "/relay.py"] diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 0000000..7803b96 --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,36 @@ +pipeline{ + + agent any + + environment { + IMAGE_NAME="hdavid0510/mjpeg-relay" + IMAGE_TAG="latest" + REGISTRY_CREDENTIALS=credentials('dockerhub-credential') + } + + stages { + stage('Init') { + steps { + echo 'Initializing.' + + sh 'echo $REGISTRY_CREDENTIALS_PSW | docker login -u $REGISTRY_CREDENTIALS_USR --password-stdin' + echo "Running ${env.BUILD_ID} on ${env.JENKINS_URL}" + echo "Building ${IMAGE_NAME} on branch ${IMAGE_TAG}" + } + } + stage('Build/Push') { + steps { + echo 'Building image and pushing to DockerHub.' + + sh 'docker buildx build --push --platform linux/386,linux/amd64,linux/arm/v5,linux/arm/v7,linux/arm64,linux/ppc64le,linux/s390x -t $IMAGE_NAME:$IMAGE_TAG .' + } + } + } + + post { + always { + sh 'docker logout' + } + } + +} diff --git a/README.md b/README.md index 2ec7e82..6399a2f 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,25 @@ -mjpeg-relay -=========== +# mjpeg-relay mjpeg-relay is a simple Python script which accepts input from an existing MJPEG stream, and relays it to any number of clients. This is intended for a scenario where the original MJPEG stream is hosted on a low-bandwidth internet connection (such as a home connection) or from a low-resource device (such as a Raspberry Pi or IP camera), and you have a server from which you wish to relay the stream to multiple clients without placing extra demand on the original stream. The script is designed to be simple to use with minimal configuration. All video parameters are defined by the source stream, ensuring mjpeg-relay is as transparent as possible. Rather than creating its own MJPEG stream, mjpeg-relay simply re-streams the original MJPEG stream directly. This is a faster and more transparent approach. -# Features +**Python 3.6+ is required.** + + +## Features - Low resource - Low latency - Status page - Option to stream to clients via WebSockets -# Installation + +## Installation 1. Clone this repository with `git clone ` -2. Ensure submodules are correctly installed by running `git submodule update --init` +2. Ensure dependencies are correctly installed by running `pip install -r requirements.txt` -# Usage + +## Usage `relay.py [-p ] [-w ] [-q] [-d] stream-source-url` - **-p \**: Port that the stream will be relayed on (default is 54321) @@ -30,16 +34,18 @@ Once it is running, you can access the following URLs: * `/stream`: the MJPEG stream. This can be embedded directly into an `` tag on modern browsers like so: `` * `/snapshot`: the latest JPEG frame from the MJPEG stream -# Example -**Relaying MJPEG stream at 192.0.2.1:1234/?action=stream on port 54017** +## Example + +**Situation:** Relaying MJPEG stream at 192.0.2.1:1234/?action=stream on port 54017 1. Start the relay: `python relay.py -p 54017 "http://192.0.2.1:1234/?action=stream"` 2. Confirm that mjpeg-relay has connected to the remote stream 3. Connect to the relayed stream at `http://localhost:54017/stream`. This can be embedded directly into an `` tag on modern browsers like so: `` 4. The status of mjpeg-relay is displayed at `http://localhost:54017/status` -# WebSocket Example + +## WebSocket Example **As above, but also relaying the MJPEG stream via WebSockets on port 54018** @@ -48,11 +54,38 @@ Once it is running, you can access the following URLs: 3. Copy and paste the example HTML and JavaScript in the file `websocketexample.html` into your website, and adapt as necessary 4. The status of mjpeg-relay is displayed at `http://localhost:54017/status` -# Docker -Here is the same example done with docker: + +# mjpeg-relay Docker image + +[![](https://img.shields.io/docker/pulls/hdavid0510/mjpeg-relay?style=flat-square)](https://hub.docker.com/r/hdavid0510/mjpeg-relay) [![](https://img.shields.io/github/issues/hdavid0510/mjpeg-relay?style=flat-square)](https://github.com/hdavid0510/mjpeg-relay/issues) +Docker image which all the scripts in this repository is preinstalled on [python:alpine](https://hub.docker.com/r/_/python) + + +## Tags + +### latest +[![](https://img.shields.io/docker/v/hdavid0510/mjpeg-relay/latest?style=flat-square)]() [![](https://img.shields.io/docker/image-size/hdavid0510/mjpeg-relay/latest?style=flat-square)]() +Built from `master` branch + + +## Environment Variables + +### `SOURCE_URL` +* URL of the source of the stream, in form of `http://user:password@ip:port/path/to/stream/`. +* **DEFAULT** `http://localhost:8081/?action=stream` + + +## Port Bindings +| Option | Port# | Type | Service | +| ------ | ----- | ---- | ------- | +|__Required__|54321|tcp| Relayed stream is shown through this port. | +|_Optional_|54322|tcp| Relayed stream **via WebSocket** is shown through this port.| + + +## Build ``` shell -docker build -t relay . -docker run -d -p 54017:54321 relay "http://192.0.2.1:1234/?action=stream" +docker build -t IMAGE_NAME . +docker run -it -p 54017:54321 -e "http://192.0.2.1:1234/?action=stream" IMAGE_NAME ``` diff --git a/app/SimpleWebSocketServer b/app/SimpleWebSocketServer deleted file mode 160000 index 9da9b75..0000000 --- a/app/SimpleWebSocketServer +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 9da9b759f63f8e2f5c1a9a215099c5a5f4c19852 diff --git a/app/broadcaster.py b/app/broadcaster.py index d2e018d..9ccc8da 100644 --- a/app/broadcaster.py +++ b/app/broadcaster.py @@ -1,27 +1,29 @@ -import Queue +import sys +sys.tracebacklimit = 0 +import traceback import threading import logging import requests import re import time import base64 -from status import Status +from .status import Status class HTTPBasicThenDigestAuth(requests.auth.HTTPDigestAuth): """Try HTTPBasicAuth, then HTTPDigestAuth.""" - def __init__(self): - super(HTTPBasicThenDigestAuth, self).__init__(None, None) + def __init__(self): + super(HTTPBasicThenDigestAuth, self).__init__(None, None) - def __call__(self, r): - # Extract auth from URL - self.username, self.password = requests.utils.get_auth_from_url(r.url) + def __call__(self, r): + # Extract auth from URL + self.username, self.password = requests.utils.get_auth_from_url(r.url) - # Prepare basic auth - r = requests.auth.HTTPBasicAuth(self.username, self.password).__call__(r) + # Prepare basic auth + r = requests.auth.HTTPBasicAuth(self.username, self.password).__call__(r) - # Let HTTPDigestAuth handle the 401 - return super(HTTPBasicThenDigestAuth, self).__call__(r) + # Let HTTPDigestAuth handle the 401 + return super(HTTPBasicThenDigestAuth, self).__call__(r) class Broadcaster: """Handles relaying the source MJPEG stream to connected clients""" @@ -40,8 +42,8 @@ def __init__(self, url): self.broadcastThread = threading.Thread(target = self.streamFromSource) self.broadcastThread.daemon = True - self.lastFrame = "" - self.lastFrameBuffer = "" + self.lastFrame: bytes = b"" + self.lastFrameBuffer: bytes = b"" self.connected = False self.broadcasting = False @@ -50,12 +52,11 @@ def __init__(self, url): feedLostFile = open("app/resources/feedlost.jpeg", "rb") #read-only, binary feedLostImage = feedLostFile.read() feedLostFile.close() - - self.feedLostFrame = "Content-Type: image/jpeg\r\n"\ - "Content-Length: {}\r\n\r\n"\ - "{}".format(len(feedLostImage), feedLostImage) - except IOError, e: - logging.warning("Unable to read feedlost.jpeg: {}".format(e)) + self.feedLostFrame = bytearray(f"Content-Type: image/jpeg\r\nContent-Length: {len(feedLostImage)}\r\n\r\n",'utf-8') + self.feedLostFrame.extend(feedLostImage) + except IOError as e: + logging.warning("Unable to read feedlost.jpeg") + # traceback.print_exc() self.feedLostFrame = False Broadcaster._instance = self @@ -63,7 +64,7 @@ def __init__(self, url): def start(self): if (self.connectToStream()): self.broadcasting = True - logging.info("Connected to stream source, boundary separator: {}".format(self.boundarySeparator)) + logging.info(f"Connected to stream source, boundary separator: {self.boundarySeparator}") self.broadcastThread.start() # @@ -71,12 +72,17 @@ def start(self): # def connectToStream(self): try: - self.sourceStream = requests.get(self.url, stream = True, timeout = 10, auth = HTTPBasicThenDigestAuth()) - except Exception, e: - logging.error("Error: Unable to connect to stream source at {}: {}".format(self.url, e)) + self.sourceStream = requests.get(self.url, stream = True, timeout = 3, auth = HTTPBasicThenDigestAuth()) + except Exception as e: + logging.error(f"[ERROR] Unable to connect to stream source at {self.url}") + traceback.print_exc() return False + except: + logging.error("[ERROR] failed to connect to stream source.") + pass + - self.boundarySeparator = self.parseStreamHeader(self.sourceStream.headers['Content-Type']) + self.boundarySeparator: bytes = self.parseStreamHeader(self.sourceStream.headers['Content-Type']).encode() if (not self.boundarySeparator): logging.error("Unable to find boundary separator in the header returned from the stream source") @@ -88,7 +94,7 @@ def connectToStream(self): # # Parses the stream header and returns the boundary separator # - def parseStreamHeader(self, header): + def parseStreamHeader(self, header) -> str: if (not isinstance(header, str)): return None @@ -99,8 +105,8 @@ def parseStreamHeader(self, header): boundary = "--" + boundary return boundary except: - logging.error("Unexpected header returned from stream source: unable to parse boundary") - logging.error(header) + logging.error("[ERROR] Unexpected header returned from stream source; unable to parse boundary") + logging.debug(f"header={header}") return None # @@ -112,7 +118,7 @@ def getClientCount(self): # # Process data in frame buffer, extract frames when present # - def extractFrames(self, frameBuffer): + def extractFrames(self, frameBuffer: bytes): if (frameBuffer.count(self.boundarySeparator) >= 2): #calculate the start and end points of the frame start = frameBuffer.find(self.boundarySeparator) @@ -122,7 +128,7 @@ def extractFrames(self, frameBuffer): mjpegFrame = frameBuffer[start:end] #extract frame data - frameStart = frameBuffer.find("\r\n\r\n", start) + len("\r\n\r\n") + frameStart = frameBuffer.find(b"\r\n\r\n", start) + len(b"\r\n\r\n") frame = frameBuffer[frameStart:end] #process for WebSocket clients @@ -135,17 +141,17 @@ def extractFrames(self, frameBuffer): # # Broadcast data to a list of StreamingClients # - def broadcastToStreamingClients(self, clients, data): + def broadcastToStreamingClients(self, clients, data: bytes): for client in clients: if (not client.connected): clients.remove(client) - logging.info("Client left. Client count: {}".format(self.getClientCount())) + logging.info(f"\nClient left; {self.getClientCount()} client(s) connected") client.bufferStreamData(data) # # Broadcast data to all connected clients # - def broadcast(self, data): + def broadcast(self, data: bytes): self.lastFrameBuffer += data mjpegFrame, webSocketFrame, frame, bufferProcessedTo = self.extractFrames(self.lastFrameBuffer) @@ -177,15 +183,18 @@ def streamFromSource(self): self.broadcast(data) self.status.addToBytesIn(len(data)) self.status.addToBytesOut(len(data)*self.getClientCount()) - except Exception, e: - logging.error("Lost connection to the stream source: {}".format(e)) + except Exception as e: + logging.error(f"Lost connection to the stream source: \n") finally: #flush the frame buffer to avoid conflicting with future frame data - self.lastFrameBuffer = "" + self.lastFrameBuffer = b"" self.connected = False while (not self.connected): + data_during_lost = bytearray(self.boundarySeparator) + data_during_lost.extend(b"\r\n") + data_during_lost.extend(self.feedLostFrame) + data_during_lost.extend(b"\r\n") if (self.feedLostFrame): - data = self.boundarySeparator + "\r\n" + self.feedLostFrame + "\r\n" - self.broadcast(data) + self.broadcast(data_during_lost) time.sleep(5) self.connectToStream() \ No newline at end of file diff --git a/app/httprequesthandler.py b/app/httprequesthandler.py index 2b6237c..80848ad 100644 --- a/app/httprequesthandler.py +++ b/app/httprequesthandler.py @@ -2,9 +2,10 @@ import threading import logging import re -from status import Status -from streaming import TCPStreamingClient -from broadcaster import Broadcaster +import traceback +from .status import Status +from .streaming import TCPStreamingClient +from .broadcaster import Broadcaster class HTTPRequestHandler: """Handles the initial connection with HTTP clients""" @@ -40,56 +41,57 @@ def start(self): # Thread to process client requests # def handleRequest(self, clientsock): - buff = "" + buff = bytearray() while True: try: data = clientsock.recv(64) - if (data == ""): + if (data == b""): break - buff += data + buff.extend(data) - if "\r\n\r\n" in buff or "\n\n" in buff: + if buff.find(b"\r\n\r\n") >= 0 or buff.find(b"\n\n") >= 0: break #as soon as the header is sent - we only care about GET requests - except Exception, e: - logging.info(e) + except Exception as e: + logging.error(f"Error on importing request data to buffer") + traceback.print_exc() break - if (buff != ""): + if (buff != b""): try: - match = re.search(r'GET (.*) ', buff) - + match = re.search(b'GET (.*) ', buff) requestPath = match.group(1) - except Exception, e: - logging.info("Client sent unexpected request: {}".format(buff)) + except Exception as e: + logging.error("Client sent unexpected request") + logging.debug(f"Request: {buff}") return #explicitly deal with individual requests. Verbose, but more secure - if ("/status" in requestPath): - clientsock.sendall('HTTP/1.0 200 OK\r\nContentType: text/html\r\n\r\n') - clientsock.sendall(self.statusHTML.format(clientcount = self.broadcast.getClientCount(), bwin = float(self.status.bandwidthIn*8)/1000000, bwout = float(self.status.bandwidthOut*8)/1000000)) + if (b"/status" in requestPath): + clientsock.sendall(b'HTTP/1.0 200 OK\r\nContentType: text/html\r\n\r\n') + clientsock.sendall(self.statusHTML.format(clientcount = self.broadcast.getClientCount(), bwin = float(self.status.bandwidthIn*8)/1000000, bwout = float(self.status.bandwidthOut*8)/1000000).encode()) clientsock.close() - elif ("/style.css" in requestPath): - clientsock.sendall('HTTP/1.0 200 OK\r\nContentType: text/html\r\n\r\n') - clientsock.sendall(self.statusCSS) + elif (b"/style.css" in requestPath): + clientsock.sendall(b'HTTP/1.0 200 OK\r\nContentType: text/html\r\n\r\n') + clientsock.sendall(self.statusCSS.encode()) clientsock.close() - elif ("/stream" in requestPath): + elif (b"/stream" in requestPath): if (self.broadcast.broadcasting): - clientsock.sendall(self.dummyHeader.format(boundaryKey = self.broadcast.boundarySeparator)) + clientsock.sendall(self.dummyHeader.format(boundaryKey = self.broadcast.boundarySeparator.decode()).encode()) client = TCPStreamingClient(clientsock) client.start() self.broadcast.clients.append(client) else: clientsock.close() - elif ("/snapshot" in requestPath): - clientsock.sendall('HTTP/1.0 200 OK\r\n') - clientsock.sendall('Content-Type: image/jpeg\r\n') - clientsock.sendall('Content-Length: {}\r\n\r\n'.format(len(self.broadcast.lastFrame))) + elif (b"/snapshot" in requestPath): + clientsock.sendall(b'HTTP/1.0 200 OK\r\n') + clientsock.sendall(b'Content-Type: image/jpeg\r\n') + clientsock.sendall(f'Content-Length: {len(self.broadcast.lastFrame)}\r\n\r\n'.encode()) clientsock.sendall(self.broadcast.lastFrame) clientsock.close() else: - clientsock.sendall('HTTP/1.0 302 FOUND\r\nLocation: /status') + clientsock.sendall(b'HTTP/1.0 302 FOUND\r\nLocation: /status') clientsock.close() else: logging.info("Client connected but didn't make a request") @@ -105,4 +107,4 @@ def acceptClients(self): clientsock.close() return handlethread = threading.Thread(target = self.handleRequest, args = (clientsock,)) - handlethread.start() \ No newline at end of file + handlethread.start() diff --git a/app/streaming.py b/app/streaming.py index ddeab5d..907d7d5 100644 --- a/app/streaming.py +++ b/app/streaming.py @@ -1,21 +1,21 @@ -import sys -import Queue +import queue import socket import threading -from broadcaster import Broadcaster +from .broadcaster import Broadcaster -try: - from SimpleWebSocketServer.SimpleWebSocketServer import WebSocket -except ImportError, e: - print "Failed to import dependency: {}".format(e) - print "Please ensure the SimpleWebSocketServer submodule has been correctly installed: git submodule update --init" - sys.exit(1) +# try: +# from .SimpleWebSocketServer.SimpleWebSocketServer import WebSocket +# except ImportError as e: +# print(("Failed to import dependency: {}".format(e))) +# print("Please ensure the SimpleWebSocketServer submodule has been correctly installed: git submodule update --init") +# sys.exit(1) +from SimpleWebSocketServer import WebSocket class StreamingClient(object): def __init__(self): - self.streamBuffer = "" - self.streamQueue = Queue.Queue() + self.streamBuffer: bytes = b"" + self.streamQueue = queue.Queue() self.streamThread = threading.Thread(target = self.stream) self.streamThread.daemon = True self.connected = True @@ -50,7 +50,7 @@ def stream(self): if (streamedTo and streamedTo >= 0): self.streamBuffer = self.streamBuffer[streamedTo:] else: - self.streamBuffer = "" + self.streamBuffer = b"" class TCPStreamingClient(StreamingClient): def __init__(self, sock): @@ -64,7 +64,7 @@ def stop(self): def transmit(self, data): try: return self.sock.send(data) - except socket.error, e: + except socket.error as e: self.connected = False self.sock.close() diff --git a/relay.py b/relay.py index 02c0778..cda19e3 100644 --- a/relay.py +++ b/relay.py @@ -1,24 +1,16 @@ import sys +sys.tracebacklimit = 0 import socket import threading -import time -from optparse import OptionParser import os -import Queue -import re import logging -import requests -import base64 +from SimpleWebSocketServer import SimpleWebSocketServer +from optparse import OptionParser from app.status import Status from app.broadcaster import Broadcaster from app.httprequesthandler import HTTPRequestHandler from app.streaming import WebSocketStreamingClient -try: - from app.SimpleWebSocketServer.SimpleWebSocketServer import SimpleWebSocketServer -except ImportError, e: - print "Failed to import dependency: {}".format(e) - print "Please ensure the SimpleWebSocketServer submodule has been correctly installed: git submodule update --init" - sys.exit(1) + # # Close threads gracefully @@ -42,14 +34,20 @@ def quit(): (options, args) = op.parse_args() if (len(args) != 1): - op.print_help() - sys.exit(1) + logging.info(f"ENV SOURCE_URL = {os.environ.get('SOURCE_URL', None)}") + if os.environ.get('SOURCE_URL', None) == None: + op.print_help() + sys.exit(1) + else: + source = os.environ.get('SOURCE_URL', None) + else: + source = args[0] logging.basicConfig(level=logging.WARNING if options.quiet else logging.INFO, format="%(message)s") logging.getLogger("requests").setLevel(logging.WARNING if options.quiet else logging.INFO) if options.debug: - from httplib import HTTPConnection + from http.client import HTTPConnection HTTPConnection.debuglevel = 1 logging.getLogger().setLevel(logging.DEBUG) logging.getLogger("requests").setLevel(logging.DEBUG) @@ -67,7 +65,7 @@ def quit(): statusThread.daemon = True statusThread.start() - broadcaster = Broadcaster(args[0]) + broadcaster = Broadcaster(source) broadcaster.start() requestHandler = HTTPRequestHandler(options.port) @@ -79,7 +77,7 @@ def quit(): webSocketHandlerThread.start() try: - while raw_input() != "quit": + while eval(input()) != "quit": continue quit() except KeyboardInterrupt: diff --git a/requirements.txt b/requirements.txt index f229360..a2d3bca 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ requests +SimpleWebSocketServer \ No newline at end of file