Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modernize #28

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
3 changes: 0 additions & 3 deletions .gitmodules

This file was deleted.

12 changes: 9 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from python:2.7-onbuild
FROM --platform=$TARGETPLATFORM python:3.10-slim

expose 54321
ENV PYTHONUNBUFFERED=1
ENV SOURCE_URL="http://localhost:8081/?action=stream"

entrypoint ["python", "relay.py"]
COPY . /
RUN pip3 install -r /requirements.txt

EXPOSE 54321
EXPOSE 54322
CMD ["python", "/relay.py"]
36 changes: 36 additions & 0 deletions Jenkinsfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
pipeline{

agent any

environment {
IMAGE_NAME="hdavid0510/mjpeg-relay"
IMAGE_TAG="latest"
REGISTRY_CREDENTIALS=credentials('dockerhub-credential')
}

stages {
stage('Init') {
steps {
echo 'Initializing.'

sh 'echo $REGISTRY_CREDENTIALS_PSW | docker login -u $REGISTRY_CREDENTIALS_USR --password-stdin'
echo "Running ${env.BUILD_ID} on ${env.JENKINS_URL}"
echo "Building ${IMAGE_NAME} on branch ${IMAGE_TAG}"
}
}
stage('Build/Push') {
steps {
echo 'Building image and pushing to DockerHub.'

sh 'docker buildx build --push --platform linux/386,linux/amd64,linux/arm/v5,linux/arm/v7,linux/arm64,linux/ppc64le,linux/s390x -t $IMAGE_NAME:$IMAGE_TAG .'
}
}
}

post {
always {
sh 'docker logout'
}
}

}
59 changes: 46 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
mjpeg-relay
===========
# mjpeg-relay

mjpeg-relay is a simple Python script which accepts input from an existing MJPEG stream, and relays it to any number of clients. This is intended for a scenario where the original MJPEG stream is hosted on a low-bandwidth internet connection (such as a home connection) or from a low-resource device (such as a Raspberry Pi or IP camera), and you have a server from which you wish to relay the stream to multiple clients without placing extra demand on the original stream.

The script is designed to be simple to use with minimal configuration. All video parameters are defined by the source stream, ensuring mjpeg-relay is as transparent as possible. Rather than creating its own MJPEG stream, mjpeg-relay simply re-streams the original MJPEG stream directly. This is a faster and more transparent approach.

# Features
**Python 3.6+ is required.**


## Features
- Low resource
- Low latency
- Status page
- Option to stream to clients via WebSockets

# Installation

## Installation
1. Clone this repository with `git clone <URL>`
2. Ensure submodules are correctly installed by running `git submodule update --init`
2. Ensure dependencies are correctly installed by running `pip install -r requirements.txt`

# Usage

## Usage
`relay.py [-p <relay port>] [-w <WebSocket port>] [-q] [-d] stream-source-url`

- **-p \<relay port\>**: Port that the stream will be relayed on (default is 54321)
Expand All @@ -30,16 +34,18 @@ Once it is running, you can access the following URLs:
* `/stream`: the MJPEG stream. This can be embedded directly into an `<img>` tag on modern browsers like so: `<img src="http://localhost:54321/stream">`
* `/snapshot`: the latest JPEG frame from the MJPEG stream

# Example

**Relaying MJPEG stream at 192.0.2.1:1234/?action=stream on port 54017**
## Example

**Situation:** Relaying MJPEG stream at 192.0.2.1:1234/?action=stream on port 54017

1. Start the relay: `python relay.py -p 54017 "http://192.0.2.1:1234/?action=stream"`
2. Confirm that mjpeg-relay has connected to the remote stream
3. Connect to the relayed stream at `http://localhost:54017/stream`. This can be embedded directly into an `<img>` tag on modern browsers like so: `<img src="http://localhost:54017/stream">`
4. The status of mjpeg-relay is displayed at `http://localhost:54017/status`

# WebSocket Example

## WebSocket Example

**As above, but also relaying the MJPEG stream via WebSockets on port 54018**

Expand All @@ -48,11 +54,38 @@ Once it is running, you can access the following URLs:
3. Copy and paste the example HTML and JavaScript in the file `websocketexample.html` into your website, and adapt as necessary
4. The status of mjpeg-relay is displayed at `http://localhost:54017/status`

# Docker

Here is the same example done with docker:

# mjpeg-relay Docker image

[![](https://img.shields.io/docker/pulls/hdavid0510/mjpeg-relay?style=flat-square)](https://hub.docker.com/r/hdavid0510/mjpeg-relay) [![](https://img.shields.io/github/issues/hdavid0510/mjpeg-relay?style=flat-square)](https://github.com/hdavid0510/mjpeg-relay/issues)
Docker image which all the scripts in this repository is preinstalled on [python:alpine](https://hub.docker.com/r/_/python)


## Tags

### latest
[![](https://img.shields.io/docker/v/hdavid0510/mjpeg-relay/latest?style=flat-square)]() [![](https://img.shields.io/docker/image-size/hdavid0510/mjpeg-relay/latest?style=flat-square)]()
Built from `master` branch


## Environment Variables

### `SOURCE_URL`
* URL of the source of the stream, in form of `http://user:password@ip:port/path/to/stream/`.
* **DEFAULT** `http://localhost:8081/?action=stream`


## Port Bindings
| Option | Port# | Type | Service |
| ------ | ----- | ---- | ------- |
|__Required__|54321|tcp| Relayed stream is shown through this port. |
|_Optional_|54322|tcp| Relayed stream **via WebSocket** is shown through this port.|


## Build

``` shell
docker build -t relay .
docker run -d -p 54017:54321 relay "http://192.0.2.1:1234/?action=stream"
docker build -t IMAGE_NAME .
docker run -it -p 54017:54321 -e "http://192.0.2.1:1234/?action=stream" IMAGE_NAME
```
1 change: 0 additions & 1 deletion app/SimpleWebSocketServer
Submodule SimpleWebSocketServer deleted from 9da9b7
83 changes: 46 additions & 37 deletions app/broadcaster.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
import Queue
import sys
sys.tracebacklimit = 0
import traceback
import threading
import logging
import requests
import re
import time
import base64
from status import Status
from .status import Status

class HTTPBasicThenDigestAuth(requests.auth.HTTPDigestAuth):
"""Try HTTPBasicAuth, then HTTPDigestAuth."""

def __init__(self):
super(HTTPBasicThenDigestAuth, self).__init__(None, None)
def __init__(self):
super(HTTPBasicThenDigestAuth, self).__init__(None, None)

def __call__(self, r):
# Extract auth from URL
self.username, self.password = requests.utils.get_auth_from_url(r.url)
def __call__(self, r):
# Extract auth from URL
self.username, self.password = requests.utils.get_auth_from_url(r.url)

# Prepare basic auth
r = requests.auth.HTTPBasicAuth(self.username, self.password).__call__(r)
# Prepare basic auth
r = requests.auth.HTTPBasicAuth(self.username, self.password).__call__(r)

# Let HTTPDigestAuth handle the 401
return super(HTTPBasicThenDigestAuth, self).__call__(r)
# Let HTTPDigestAuth handle the 401
return super(HTTPBasicThenDigestAuth, self).__call__(r)

class Broadcaster:
"""Handles relaying the source MJPEG stream to connected clients"""
Expand All @@ -40,8 +42,8 @@ def __init__(self, url):
self.broadcastThread = threading.Thread(target = self.streamFromSource)
self.broadcastThread.daemon = True

self.lastFrame = ""
self.lastFrameBuffer = ""
self.lastFrame: bytes = b""
self.lastFrameBuffer: bytes = b""

self.connected = False
self.broadcasting = False
Expand All @@ -50,33 +52,37 @@ def __init__(self, url):
feedLostFile = open("app/resources/feedlost.jpeg", "rb") #read-only, binary
feedLostImage = feedLostFile.read()
feedLostFile.close()

self.feedLostFrame = "Content-Type: image/jpeg\r\n"\
"Content-Length: {}\r\n\r\n"\
"{}".format(len(feedLostImage), feedLostImage)
except IOError, e:
logging.warning("Unable to read feedlost.jpeg: {}".format(e))
self.feedLostFrame = bytearray(f"Content-Type: image/jpeg\r\nContent-Length: {len(feedLostImage)}\r\n\r\n",'utf-8')
self.feedLostFrame.extend(feedLostImage)
except IOError as e:
logging.warning("Unable to read feedlost.jpeg")
# traceback.print_exc()
self.feedLostFrame = False

Broadcaster._instance = self

def start(self):
if (self.connectToStream()):
self.broadcasting = True
logging.info("Connected to stream source, boundary separator: {}".format(self.boundarySeparator))
logging.info(f"Connected to stream source, boundary separator: {self.boundarySeparator}")
self.broadcastThread.start()

#
# Connects to the stream source
#
def connectToStream(self):
try:
self.sourceStream = requests.get(self.url, stream = True, timeout = 10, auth = HTTPBasicThenDigestAuth())
except Exception, e:
logging.error("Error: Unable to connect to stream source at {}: {}".format(self.url, e))
self.sourceStream = requests.get(self.url, stream = True, timeout = 3, auth = HTTPBasicThenDigestAuth())
except Exception as e:
logging.error(f"[ERROR] Unable to connect to stream source at {self.url}")
traceback.print_exc()
return False
except:
logging.error("[ERROR] failed to connect to stream source.")
pass


self.boundarySeparator = self.parseStreamHeader(self.sourceStream.headers['Content-Type'])
self.boundarySeparator: bytes = self.parseStreamHeader(self.sourceStream.headers['Content-Type']).encode()

if (not self.boundarySeparator):
logging.error("Unable to find boundary separator in the header returned from the stream source")
Expand All @@ -88,7 +94,7 @@ def connectToStream(self):
#
# Parses the stream header and returns the boundary separator
#
def parseStreamHeader(self, header):
def parseStreamHeader(self, header) -> str:
if (not isinstance(header, str)):
return None

Expand All @@ -99,8 +105,8 @@ def parseStreamHeader(self, header):
boundary = "--" + boundary
return boundary
except:
logging.error("Unexpected header returned from stream source: unable to parse boundary")
logging.error(header)
logging.error("[ERROR] Unexpected header returned from stream source; unable to parse boundary")
logging.debug(f"header={header}")
return None

#
Expand All @@ -112,7 +118,7 @@ def getClientCount(self):
#
# Process data in frame buffer, extract frames when present
#
def extractFrames(self, frameBuffer):
def extractFrames(self, frameBuffer: bytes):
if (frameBuffer.count(self.boundarySeparator) >= 2):
#calculate the start and end points of the frame
start = frameBuffer.find(self.boundarySeparator)
Expand All @@ -122,7 +128,7 @@ def extractFrames(self, frameBuffer):
mjpegFrame = frameBuffer[start:end]

#extract frame data
frameStart = frameBuffer.find("\r\n\r\n", start) + len("\r\n\r\n")
frameStart = frameBuffer.find(b"\r\n\r\n", start) + len(b"\r\n\r\n")
frame = frameBuffer[frameStart:end]

#process for WebSocket clients
Expand All @@ -135,17 +141,17 @@ def extractFrames(self, frameBuffer):
#
# Broadcast data to a list of StreamingClients
#
def broadcastToStreamingClients(self, clients, data):
def broadcastToStreamingClients(self, clients, data: bytes):
for client in clients:
if (not client.connected):
clients.remove(client)
logging.info("Client left. Client count: {}".format(self.getClientCount()))
logging.info(f"\nClient left; {self.getClientCount()} client(s) connected")
client.bufferStreamData(data)

#
# Broadcast data to all connected clients
#
def broadcast(self, data):
def broadcast(self, data: bytes):
self.lastFrameBuffer += data

mjpegFrame, webSocketFrame, frame, bufferProcessedTo = self.extractFrames(self.lastFrameBuffer)
Expand Down Expand Up @@ -177,15 +183,18 @@ def streamFromSource(self):
self.broadcast(data)
self.status.addToBytesIn(len(data))
self.status.addToBytesOut(len(data)*self.getClientCount())
except Exception, e:
logging.error("Lost connection to the stream source: {}".format(e))
except Exception as e:
logging.error(f"Lost connection to the stream source: \n")
finally:
#flush the frame buffer to avoid conflicting with future frame data
self.lastFrameBuffer = ""
self.lastFrameBuffer = b""
self.connected = False
while (not self.connected):
data_during_lost = bytearray(self.boundarySeparator)
data_during_lost.extend(b"\r\n")
data_during_lost.extend(self.feedLostFrame)
data_during_lost.extend(b"\r\n")
if (self.feedLostFrame):
data = self.boundarySeparator + "\r\n" + self.feedLostFrame + "\r\n"
self.broadcast(data)
self.broadcast(data_during_lost)
time.sleep(5)
self.connectToStream()
Loading