Skip to content

Commit

Permalink
FFmpegSource支持sendFilterCmd和enableFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
cqm committed Apr 8, 2024
1 parent 73bb600 commit 8d22e16
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 14 deletions.
9 changes: 8 additions & 1 deletion server/FFmpegSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ void FFmpegSource::setupRecordFlag(bool enable_hls, bool enable_mp4){
_enable_mp4 = enable_mp4;
}

void FFmpegSource::sendFilterCmd(const char *filter, const char *cmd, const char *args) {
char line[1024];
int n = snprintf(line, sizeof(line) - 1, "c%s -1 %s %s\n", filter, cmd, args);
InfoL << line;
_process.write(line, n);
}

void FFmpegSource::play(const string &ffmpeg_cmd_key, const string &src_url, const string &dst_url, int timeout_ms, const onPlay &cb) {
GET_CONFIG(string, ffmpeg_bin, FFmpeg::kBin);
GET_CONFIG(string, ffmpeg_cmd_default, FFmpeg::kCmd);
Expand Down Expand Up @@ -100,7 +107,7 @@ void FFmpegSource::play(const string &ffmpeg_cmd_key, const string &src_url, con
char cmd[2048] = { 0 };
snprintf(cmd, sizeof(cmd), ffmpeg_cmd.data(), File::absolutePath("", ffmpeg_bin).data(), src_url.data(), dst_url.data());
auto log_file = ffmpeg_log.empty() ? "" : File::absolutePath("", ffmpeg_log);
_process.run(cmd, log_file);
_process.run(cmd, log_file, true);
InfoL << cmd;

if (is_local_ip(_media_info.host)) {
Expand Down
4 changes: 4 additions & 0 deletions server/FFmpegSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ class FFmpegSource : public std::enable_shared_from_this<FFmpegSource> , public
*/
void setupRecordFlag(bool enable_hls, bool enable_mp4);

void sendFilterCmd(const char *filter, const char *cmd, const char *args);
void enableFilter(const char *filter, bool enable) {
sendFilterCmd(filter, "enable", enable ? "1" : "0");
}
private:
void findAsync(int maxWaitMS ,const std::function<void(const mediakit::MediaSource::Ptr &src)> &cb);
void startTimer(int timeout_ms);
Expand Down
80 changes: 69 additions & 11 deletions server/Process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "Util/File.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/onceToken.h"
#include "Util/uv_errno.h"
#include "Poller/EventPoller.h"

Expand All @@ -46,19 +47,30 @@ static void setupChildProcess() {
signal(SIGSEGV, SIG_DFL);
signal(SIGABRT, SIG_DFL);
}
struct ChildArgs {
std::string cmd, log_file;
int pipeIn[2] = {0};
};

/* Start function for cloned child */
static int runChildProcess(string cmd, string log_file) {
static int runChildProcess(ChildArgs* args) {
setupChildProcess();

string log_file = args->log_file;
if (log_file.empty()) {
//未指定子进程日志文件时,重定向至/dev/null
log_file = "/dev/null";
} else {
log_file = StrPrinter << log_file << "." << getpid();
}

if (isatty(STDIN_FILENO)) {
if (args->pipeIn[0]) {
if (dup2(args->pipeIn[0], STDIN_FILENO) < 0) {
fprintf(stderr, "dup2 stdin failed:%d(%s)\r\n", get_uv_error(), get_uv_errmsg());
}
close(agrs->pipeIn[0]);
close(agrs->pipeIn[1]);
}
else if (isatty(STDIN_FILENO)) {
/* bb_error_msg("ignoring input"); */
close(STDIN_FILENO);
open("/dev/null", O_RDONLY, 0666); /* will be fd 0 (STDIN_FILENO) */
Expand All @@ -80,9 +92,9 @@ static int runChildProcess(string cmd, string log_file) {
// 关闭日志文件
::fclose(fp);
}
fprintf(stderr, "\r\n\r\n#### pid=%d,cmd=%s #####\r\n\r\n", getpid(), cmd.data());
fprintf(stderr, "\r\n\r\n#### pid=%d,cmd=%s #####\r\n\r\n", getpid(), args->cmd.data());

auto params = split(cmd, " ");
auto params = split(args->cmd, " ");
// memory leak in child process, it's ok.
char **charpv_params = new char *[params.size() + 1];
for (int i = 0; i < (int)params.size(); i++) {
Expand All @@ -102,13 +114,12 @@ static int runChildProcess(string cmd, string log_file) {
}

static int cloneFunc(void *ptr) {
auto pair = reinterpret_cast<std::pair<string, string> *>(ptr);
return runChildProcess(pair->first, pair->second);
return runChildProcess(reinterpret_cast<ChildArgs *>(ptr));
}

#endif

void Process::run(const string &cmd, string log_file) {
void Process::run(const string &cmd, string log_file, bool pipeIn) {
kill(2000);
#ifdef _WIN32
STARTUPINFO si = { 0 };
Expand All @@ -133,6 +144,15 @@ void Process::run(const string &cmd, string log_file) {
si.dwFlags = STARTF_USESHOWWINDOW | STARTF_USESTDHANDLES;
si.hStdError = log_fd;
si.hStdOutput = log_fd;
if (pipeIn) {
SECURITY_ATTRIBUTES saAttr;
saAttr.nLength = sizeof(SECURITY_ATTRIBUTES);
saAttr.bInheritHandle = TRUE;
saAttr.lpSecurityDescriptor = NULL;
if (!CreatePipe(&si.hStdInput, &hPipeIn, &saAttr, 0)) {
WarnL << "create stdin pipe error";
}
}
}

LPTSTR lpDir = const_cast<char *>(cmd.data());
Expand All @@ -146,12 +166,24 @@ void Process::run(const string &cmd, string log_file) {
} else {
WarnL << "start child process fail: " << get_uv_errmsg();
}
if (si.hStdInput) {
CloseHandle(si.hStdInput);
}
fclose(fp);
#else

ChildArgs args;
args.cmd = cmd;
args.log_file = log_file;
if (pipeIn) {
static toolkit::onceToken s_token([]() { signal(SIGPIPE, SIG_IGN); });
if (-1 == pipe(args.pipeIn)) {
WarnL << "pip error: " << get_uv_errmsg();
} else {
hPipeIn = args.pipeIn[1];
}
}
#if (defined(__linux) || defined(__linux__))
_process_stack = malloc(STACK_SIZE);
auto args = std::make_pair(cmd, log_file);
_pid = clone(reinterpret_cast<int (*)(void *)>(&cloneFunc), (char *)_process_stack + STACK_SIZE, CLONE_FS | SIGCHLD, (void *)(&args));
if (_pid == -1) {
WarnL << "clone process failed:" << get_uv_errmsg();
Expand All @@ -166,9 +198,12 @@ void Process::run(const string &cmd, string log_file) {
}
if (_pid == 0) {
//子进程
exit(runChildProcess(cmd, log_file));
exit(runChildProcess(&args));
}
#endif
if (pipeIn) {
close(args.pipeIn[0]);
}
if (log_file.empty()) {
//未指定子进程日志文件时,重定向至/dev/null
log_file = "/dev/null";
Expand Down Expand Up @@ -310,12 +345,35 @@ static void s_kill(pid_t pid, void *handle, int max_delay, bool force) {
});
}

int Process::write(const char *buff, int size) {
int ret = -1;
if (hPipeIn) {
#if _WIN32
DWORD temp = 0;
if (WriteFile(hPipeIn, buff, size, &temp, NULL))
ret = temp;
#else
ret = ::write(hPipeIn, buff, size);
#endif
}
return ret;
}

void Process::kill(int max_delay, bool force) {
if (_pid <= 0) {
return;
}
s_kill(_pid, _handle, max_delay, force);
_pid = -1;
if (hPipeIn) {
InfoL << "close pipe " << hPipeIn;
#if _WIN32
CloseHandle(hPipeIn);
#else
close(hPipeIn);
#endif
hPipeIn = 0;
}
#ifdef _WIN32
if (_handle) {
CloseHandle(_handle);
Expand Down
9 changes: 7 additions & 2 deletions server/Process.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#if !defined(__MINGW32__)
typedef int pid_t;
#endif
typedef void *HPIPE;
#else
typedef int HPIPE;
#include <sys/wait.h>
#endif // _WIN32

Expand All @@ -26,14 +28,17 @@ class Process {
public:
Process();
~Process();
void run(const std::string &cmd, std::string log_file);
void kill(int max_delay,bool force = false);
void run(const std::string &cmd, std::string log_file, bool pipeIn = false);
// write to stdin, useable when pipeIn = true
int write(const char *buff, int size);
void kill(int max_delay, bool force = false);
bool wait(bool block = true);
int exit_code();
private:
int _exit_code = 0;
pid_t _pid = -1;
void *_handle = nullptr;
HPIPE hPipeIn = 0;
#if (defined(__linux) || defined(__linux__))
void *_process_stack = nullptr;
#endif
Expand Down

0 comments on commit 8d22e16

Please sign in to comment.