Skip to content

Commit

Permalink
Add Advanced proxy routing via environment variables & refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
mhdzumair committed Nov 15, 2024
1 parent ff110e1 commit f9bda43
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 99 deletions.
76 changes: 68 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,87 @@ MediaFlow Proxy is a powerful and flexible solution for proxifying various types

## Features

### Stream Processing
- Convert MPEG-DASH streams (DRM-protected and non-protected) to HLS
- Support for Clear Key DRM-protected MPD DASH streams
- Support for non-DRM protected DASH live and VOD streams
- Proxy and modify HLS (M3U8) streams in real-time
- Proxy HTTP/HTTPS links with custom headers
- Proxy and modify HLS (M3U8) streams in real-time with custom headers and key URL modifications for bypassing some sneaky restrictions.
- Retrieve public IP address of the MediaFlow Proxy server for use with Debrid services

### Proxy & Routing
- Advanced proxy routing system with support for:
- Domain-based routing rules
- Protocol-specific routing (HTTP/HTTPS)
- Subdomain and wildcard patterns
- Port-specific routing
- Support for HTTP/HTTPS/SOCKS5 proxy forwarding
- Protect against unauthorized access and network bandwidth abuses
- Support for play expired or self-signed SSL certificates server streams `(verify_ssl=false)` default is `false`
- Flexible request proxy usage control per request `(use_request_proxy=true/false)` default is `true`
- Obfuscating endpoint parameters by encrypting them to hide sensitive information from third-party.
- Optional IP-based access control restriction & expiration for encrypted URLs to prevent unauthorized access
- Flexible SSL verification control per route
- Support for expired or self-signed SSL certificates
- Public IP address retrieval for Debrid services integration

### Security
- API password protection against unauthorized access & Network bandwidth abuse prevention
- Parameter encryption to hide sensitive information
- Optional IP-based access control for encrypted URLs
- URL expiration support for encrypted URLs

### Additional Features
- Built-in speed test for RealDebrid and AllDebrid services
- Custom header injection and modification
- Real-time HLS manifest manipulation
- HLS Key URL modifications for bypassing stream restrictions


## Configuration

Set the following environment variables:

- `API_PASSWORD`: Required. Protects against unauthorized access and API network abuses.
- `PROXY_URL`: Optional. HTTP/HTTPS/SOCKS5 proxy URL for forwarding network requests.
- `ENABLE_STREAMING_PROGRESS`: Optional. Enable streaming progress logging. Default is `false`.

### Proxy Configuration Examples

MediaFlow Proxy now supports advanced proxy routing using HTTPX's routing system. You can configure different proxy rules for different domains, protocols, and patterns. Here are some examples:

1. Basic proxy configuration with a default proxy:
```env
PROXY_DEFAULT_URL=http://default-proxy:8080
```

2. Advanced routing with multiple rules:
```env
PROXY_ROUTES='{
"all://*.debrid.com": {
"proxy_url": "socks5://debrid-proxy:8080"
},
"https://internal.company.com": {
"proxy_url": null,
"verify_ssl": false
},
"all://api.external.com": {
"proxy_url": "http://api-proxy:8080",
"verify_ssl": false
}
}'
```

Proxy routing supports various patterns:
- Domain routing: `"all://example.com"`
- Subdomain routing: `"all://*.example.com"`
- Protocol-specific routing: `"https://example.com"`
- Port-specific routing: `"all://*:1234"`
- Wildcard routing: `"all://"`

### Speed Test Feature

MediaFlow Proxy now includes a built-in speed test feature for testing RealDebrid and AllDebrid network speeds. To access the speed test:

1. Open your browser and navigate to `http://your-server:8888/speedtest.html`
2. The speed test page allows you to:
- Test download speeds from RealDebrid servers
- Test download speeds from AllDebrid servers


## Installation

### Option 1: Self-Hosted Deployment
Expand Down
40 changes: 39 additions & 1 deletion mediaflow_proxy/configs.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,47 @@
from typing import Dict, Optional

import httpx
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings


class ProxyRoute(BaseModel):
proxy_url: Optional[str] = None
verify_ssl: bool = True


class ProxyConfig(BaseSettings):
default_url: Optional[str] = None
routes: Dict[str, ProxyRoute] = Field(default_factory=dict)

def get_mounts(
self, async_http: bool = True
) -> Dict[str, Optional[httpx.HTTPTransport | httpx.AsyncHTTPTransport]]:
"""
Get a dictionary of httpx mount points to transport instances.
"""
mounts = {}
transport_cls = httpx.AsyncHTTPTransport if async_http else httpx.HTTPTransport

# Add specific routes
for pattern, route in self.routes.items():
mounts[pattern] = transport_cls(proxy=route.proxy_url, verify=route.verify_ssl) if route.proxy_url else None

# Set default proxy if specified
if self.default_url:
mounts["all://"] = transport_cls(proxy=self.default_url)

return mounts

class Config:
env_file = ".env"
env_prefix = "PROXY_"
extra = "ignore"


class Settings(BaseSettings):
api_password: str # The password for accessing the API endpoints.
proxy_url: str | None = None # The URL of the proxy server to route requests through.
proxy_config: ProxyConfig = Field(default_factory=ProxyConfig) # Configuration for proxying requests.
enable_streaming_progress: bool = False # Whether to enable streaming progress tracking.

user_agent: str = (
Expand Down
9 changes: 3 additions & 6 deletions mediaflow_proxy/extractors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import httpx

from mediaflow_proxy.configs import settings
from mediaflow_proxy.utils.http_utils import create_httpx_client


class BaseExtractor(ABC):
Expand All @@ -17,19 +18,15 @@ def __init__(self, proxy_enabled: bool, request_headers: dict):
}
self.base_headers.update(request_headers)

async def _make_request(
self, url: str, headers: Optional[Dict] = None, follow_redirects: bool = True, **kwargs
) -> httpx.Response:
async def _make_request(self, url: str, headers: Optional[Dict] = None, **kwargs) -> httpx.Response:
"""Make HTTP request with error handling."""
try:
async with httpx.AsyncClient(proxy=self.proxy_url) as client:
async with create_httpx_client() as client:
request_headers = self.base_headers
request_headers.update(headers or {})
response = await client.get(
url,
headers=request_headers,
follow_redirects=follow_redirects,
timeout=30,
**kwargs,
)
response.raise_for_status()
Expand Down
52 changes: 10 additions & 42 deletions mediaflow_proxy/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,22 @@
request_with_retry,
EnhancedStreamingResponse,
ProxyRequestHeaders,
create_httpx_client,
)
from .utils.m3u8_processor import M3U8Processor
from .utils.mpd_utils import pad_base64

logger = logging.getLogger(__name__)


async def setup_client_and_streamer(use_request_proxy: bool, verify_ssl: bool) -> tuple[httpx.AsyncClient, Streamer]:
async def setup_client_and_streamer() -> tuple[httpx.AsyncClient, Streamer]:
"""
Set up an HTTP client and a streamer.
Args:
use_request_proxy (bool): Whether to use a proxy for the request.
verify_ssl (bool): Whether to verify SSL certificates.
Returns:
tuple: An httpx.AsyncClient instance and a Streamer instance.
"""
client = httpx.AsyncClient(
follow_redirects=True,
timeout=httpx.Timeout(30.0),
limits=httpx.Limits(max_keepalive_connections=10, max_connections=20),
proxy=settings.proxy_url if use_request_proxy else None,
verify=verify_ssl,
)
client = create_httpx_client()
return client, Streamer(client)


Expand Down Expand Up @@ -83,7 +74,7 @@ async def handle_hls_stream_proxy(
Returns:
Union[Response, EnhancedStreamingResponse]: Either a processed m3u8 playlist or a streaming response.
"""
client, streamer = await setup_client_and_streamer(hls_params.use_request_proxy, hls_params.verify_ssl)
client, streamer = await setup_client_and_streamer()

try:
if urlparse(hls_params.destination).path.endswith((".m3u", ".m3u8")):
Expand Down Expand Up @@ -111,16 +102,13 @@ async def handle_hls_stream_proxy(
background=BackgroundTask(streamer.close),
)
except Exception as e:
await client.aclose()
return handle_exceptions(e)


async def handle_stream_request(
method: str,
video_url: str,
proxy_headers: ProxyRequestHeaders,
verify_ssl: bool = True,
use_request_proxy: bool = True,
) -> Response:
"""
Handle general stream requests.
Expand All @@ -131,13 +119,11 @@ async def handle_stream_request(
method (str): The HTTP method (e.g., 'GET' or 'HEAD').
video_url (str): The URL of the video to stream.
proxy_headers (ProxyRequestHeaders): Headers to be used in the proxy request.
verify_ssl (bool, optional): Whether to verify SSL certificates. Defaults to True.
use_request_proxy (bool, optional): Whether to use a proxy for the request. Defaults to True.
Returns:
Union[Response, EnhancedStreamingResponse]: Either a HEAD response or a streaming response.
"""
client, streamer = await setup_client_and_streamer(use_request_proxy, verify_ssl)
client, streamer = await setup_client_and_streamer()

try:
response = await streamer.head(video_url, proxy_headers.request)
Expand Down Expand Up @@ -189,9 +175,7 @@ async def proxy_stream(method: str, stream_params: ProxyStreamParams, proxy_head
Returns:
Response: The HTTP response with the streamed content.
"""
return await handle_stream_request(
method, stream_params.destination, proxy_headers, stream_params.verify_ssl, stream_params.use_request_proxy
)
return await handle_stream_request(method, stream_params.destination, proxy_headers)


async def fetch_and_process_m3u8(
Expand Down Expand Up @@ -277,8 +261,6 @@ async def get_manifest(
manifest_params.destination,
headers=proxy_headers.request,
parse_drm=not manifest_params.key_id and not manifest_params.key,
verify_ssl=manifest_params.verify_ssl,
use_request_proxy=manifest_params.use_request_proxy,
)
except DownloadError as e:
raise HTTPException(status_code=e.status_code, detail=f"Failed to download MPD: {e.message}")
Expand Down Expand Up @@ -320,8 +302,6 @@ async def get_playlist(
headers=proxy_headers.request,
parse_drm=not playlist_params.key_id and not playlist_params.key,
parse_segment_profile_id=playlist_params.profile_id,
verify_ssl=playlist_params.verify_ssl,
use_request_proxy=playlist_params.use_request_proxy,
)
return await process_playlist(request, mpd_dict, playlist_params.profile_id, proxy_headers)

Expand All @@ -341,15 +321,8 @@ async def get_segment(
Response: The HTTP response with the processed segment.
"""
try:
init_content = await get_cached_init_segment(
segment_params.init_url, proxy_headers.request, segment_params.verify_ssl, segment_params.use_request_proxy
)
segment_content = await download_file_with_retry(
segment_params.segment_url,
proxy_headers.request,
verify_ssl=segment_params.verify_ssl,
use_request_proxy=segment_params.use_request_proxy,
)
init_content = await get_cached_init_segment(segment_params.init_url, proxy_headers.request)
segment_content = await download_file_with_retry(segment_params.segment_url, proxy_headers.request)
except Exception as e:
return handle_exceptions(e)

Expand All @@ -363,17 +336,12 @@ async def get_segment(
)


async def get_public_ip(use_request_proxy: bool = True):
async def get_public_ip():
"""
Retrieves the public IP address of the MediaFlow proxy.
Args:
use_request_proxy (bool, optional): Whether to use the proxy configuration from the user's MediaFlow config. Defaults to True.
Returns:
Response: The HTTP response with the public IP address.
"""
ip_address_data = await request_with_retry(
"GET", "https://api.ipify.org?format=json", {}, use_request_proxy=use_request_proxy
)
ip_address_data = await request_with_retry("GET", "https://api.ipify.org?format=json", {})
return ip_address_data.json()
6 changes: 2 additions & 4 deletions mediaflow_proxy/routes/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,11 @@ async def segment_endpoint(


@proxy_router.get("/ip")
async def get_mediaflow_proxy_public_ip(
use_request_proxy: bool = True,
):
async def get_mediaflow_proxy_public_ip():
"""
Retrieves the public IP address of the MediaFlow proxy server.
Returns:
Response: The HTTP response with the public IP address in the form of a JSON object. {"ip": "xxx.xxx.xxx.xxx"}
"""
return await get_public_ip(use_request_proxy)
return await get_public_ip()
3 changes: 0 additions & 3 deletions mediaflow_proxy/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ class GenerateUrlRequest(BaseModel):
class GenericParams(BaseModel):
model_config = ConfigDict(populate_by_name=True)

verify_ssl: bool = Field(False, description="Whether to verify the SSL certificate of the destination.")
use_request_proxy: bool = Field(True, description="Whether to use the MediaFlow proxy configuration.")


class HLSManifestParams(GenericParams):
destination: str = Field(..., description="The URL of the HLS manifest.", alias="d")
Expand Down
14 changes: 3 additions & 11 deletions mediaflow_proxy/utils/cache_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,7 @@ async def check_and_clean_file(file_path: Path):


# Specific cache implementations
async def get_cached_init_segment(
init_url: str, headers: dict, verify_ssl: bool = True, use_request_proxy: bool = True
) -> Optional[bytes]:
async def get_cached_init_segment(init_url: str, headers: dict) -> Optional[bytes]:
"""Get initialization segment from cache or download it."""
# Try cache first
cached_data = await INIT_SEGMENT_CACHE.get(init_url)
Expand All @@ -332,9 +330,7 @@ async def get_cached_init_segment(

# Download if not cached
try:
init_content = await download_file_with_retry(
init_url, headers, verify_ssl=verify_ssl, use_request_proxy=use_request_proxy
)
init_content = await download_file_with_retry(init_url, headers)
if init_content:
await INIT_SEGMENT_CACHE.set(init_url, init_content)
return init_content
Expand All @@ -348,8 +344,6 @@ async def get_cached_mpd(
headers: dict,
parse_drm: bool,
parse_segment_profile_id: str | None = None,
verify_ssl: bool = True,
use_request_proxy: bool = True,
) -> Optional[dict]:
"""Get MPD from cache or download and parse it."""
# Try cache first
Expand All @@ -363,9 +357,7 @@ async def get_cached_mpd(

# Download and parse if not cached
try:
mpd_content = await download_file_with_retry(
mpd_url, headers, verify_ssl=verify_ssl, use_request_proxy=use_request_proxy
)
mpd_content = await download_file_with_retry(mpd_url, headers)
mpd_dict = parse_mpd(mpd_content)
parsed_dict = parse_mpd_dict(mpd_dict, mpd_url, parse_drm, parse_segment_profile_id)

Expand Down
Loading

0 comments on commit f9bda43

Please sign in to comment.