Skip to content
This repository has been archived by the owner on May 28, 2019. It is now read-only.

Commit

Permalink
Merge pull request #479 from taotaowill/master
Browse files Browse the repository at this point in the history
support best-effort type job (cpu and memory excess)
  • Loading branch information
haolifei authored Dec 19, 2016
2 parents 46348f2 + e5fb5d5 commit 03cbc71
Show file tree
Hide file tree
Showing 16 changed files with 485 additions and 161 deletions.
30 changes: 0 additions & 30 deletions sandbox/quick_test.sh

This file was deleted.

101 changes: 92 additions & 9 deletions sandbox/start_all.sh
Original file line number Diff line number Diff line change
@@ -1,13 +1,96 @@
#!/bin/bash

echo "1.start nexus"
cd ../thirdsrc/ins/sandbox && nohup ./start_all.sh > ins_start.log 2>&1 &
sleep 2
# Host identity shared by the generated ins.flag / galaxy.flag files.
HOSTNAME=$(hostname)
SERVERS="${HOSTNAME}:8868,${HOSTNAME}:8869,${HOSTNAME}:8870,${HOSTNAME}:8871,${HOSTNAME}:8872"
ROOTPATH=$(pwd)
IP=$(hostname -i)

#galaxyflag=`pwd`/galaxy.flag
##echo "2.start appmaster"
##nohup ../appmaster --flagfile=$galaxyflag >appmaster.log 2>&1 &
#
#echo "3.start appworker"
#nohup ../appworker --flagfile=$galaxyflag >/dev/null 2>&1 &
# Check that `ps xf` lists a process whose command line matches pattern $1.
# Prints "<pattern> ok" when a match is found; otherwise prints "<pattern> err"
# and terminates the whole script with exit status 1.
function assert_process_ok() {
    if ps xf | grep "$1" | grep -v grep >/dev/null; then
        echo "$1 ok"
        return
    fi

    echo "$1 err"
    exit 1
}

# copy files
# Stage the nexus (ins) binary and the galaxy binaries in the current directory.
for binary in \
    ../thirdsrc/ins/output/bin/ins \
    ../resman \
    ../agent \
    ../appmaster \
    ../appworker \
    ../galaxy_client \
    ../galaxy_res_client
do
    cp -rf "$binary" ./
done

# prepare ins.flag
echo "--cluster_members=$SERVERS" >ins.flag

# 1.start nexus
# One ins process per cluster member (server ids 1..5), all sharing ins.flag.
echo "start nexus"
for server_id in 1 2 3 4 5; do
    nohup ./ins --flagfile=ins.flag --server_id=$server_id >/dev/null 2>&1 &
done
sleep 10
assert_process_ok "./ins --flagfile=ins.flag"

# prepare galaxy.flag
# The file is written in one shot and truncated first (">" rather than ">>"),
# so running this script again does not append a second copy of every flag.
# $SERVERS, $ROOTPATH, $IP and $HOSTNAME are expanded by the here-document.
cat > galaxy.flag <<EOF
--logbufsecs=0
--alsologtostderr=1
--username=test
--token=test
# resman
--nexus_addr=$SERVERS
--nexus_root=/galaxy_test
--resman_path=/resman
--safe_mode_percent=0.7
--sched_interval=10
--agent_query_interval=2
# appmaster
--appmaster_port=8123
# agent
--nexus_root_path=/galaxy_test
--nexus_servers=$SERVERS
--cmd_line=appworker --nexus_addr=$SERVERS --nexus_root_path=/galaxy_test
--cpu_resource=4000
--memory_resource=4294967296
--mount_templat=/bin,/boot,/cgroups,/dev,/etc,/lib,/lib64,/lost+found,/media,/misc,/mnt,/opt,/sbin,/selinux,/srv,/sys,/tmp,/usr,/var,/noah,/noah/download,/noah/modules,/noah/tmp,/noah/bin,/proc,/cgroups/cpu,/cgroups/memory,/cgroups/cpuacct,/cgroups/tcp_throt,/cgroups/blkio,/cgroups/freezer,/cgroups/net_cls,/home/opt
--agent_port=1025
--master_path=/resman
--galaxy_root_path=$ROOTPATH
--cgroup_root_path=/cgroups
--gc_delay_time=86400
--agent_ip=$IP
--agent_hostname=$HOSTNAME
--volum_resource=/dev/vdb:2906620387328:DISK:/home/galaxy
EOF

# 2. start resman
echo "start resman"
nohup ./resman --flagfile=galaxy.flag >resman.log 2>&1 &
sleep 5
assert_process_ok "./resman"

# 3.start appmaster
echo "start appmaster"
nohup ./appmaster --flagfile=galaxy.flag >appmaster.log 2>&1 &
sleep 5
assert_process_ok "./appmaster"

# 4.start agent
echo "start agent"
# presumably needed so the agent can resolve the bare "appworker" name used in
# --cmd_line; verify against the agent implementation
export PATH=$PATH:$ROOTPATH
nohup ./agent --flagfile=galaxy.flag >agent.log 2>&1 &
# Same grace period as resman/appmaster: checking right after the background
# launch would pass even if the agent exits during startup.
sleep 5
assert_process_ok "./agent"

# galaxy options
# Register the local agent using the same address it was started with
# (--agent_ip=$IP, --agent_port=1025) instead of a hard-coded IP.
./galaxy_res_client add_agent -e "${IP}:1025" -p main_pool
./galaxy_res_client add_user -u test -t test
./galaxy_res_client grant_user -u test -p main_pool -o add -a create_container,remove_container,update_container,list_containers,submit_job,remove_job,update_job,list_jobs
# NOTE(review): the lone "e" after "-s 1M" looks like a typo; kept unchanged
# until confirmed against the galaxy_res_client usage.
./galaxy_res_client assign_quota -u test -c 4000 -d 4G -s 1M e -m 4G -r 1000

# Wait before submitting the sample jobs.
sleep 20
./galaxy_client submit -f test.json
./galaxy_client submit -f over.json
33 changes: 29 additions & 4 deletions sandbox/stop_all.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
#!/bin/bash

pkill ins
echo "stop ins"
# remove jobs
# Job ids are the second column of every `galaxy_client list` line containing "job".
job_ids=$(./galaxy_client list | grep job | awk '{print $2}')
if [ -n "$job_ids" ]; then
    for job_id in $job_ids; do
        ./galaxy_client remove -i $job_id
    done
    # Pause after the removals before the daemons are stopped below.
    sleep 10
fi

pkill appworker
echo "stop appworker"
# 1.kill appworker
# killall appworker

# 2.stop appmaster, 3.stop resman, 4.stop agent, 5.stop nexus -- in this order
for daemon in appmaster resman agent ins; do
    killall "$daemon"
done

# Remove the binaries copied in by start_all.sh.
rm -rf ./ins ./appmaster ./appworker ./resman ./agent ./galaxy_client ./galaxy_res_client
# Remove the generated flag files. start_all.sh writes ins.flag into the
# current directory, so it must be addressed as ./ins.flag, not /ins.flag.
rm -rf ./ins.flag ./galaxy.flag
# Remove logs and runtime state left behind by the daemons.
rm -rf ./binlog *.log *.INFO* *.WARNING*
rm -rf ./data ./gc_dir/ ./work_dir/
4 changes: 4 additions & 0 deletions src/agent/agent_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ DEFINE_int64(gc_delay_time, 43200, "");
DEFINE_int64(volum_collect_cycle, 18000, "");
DEFINE_int64(cgroup_collect_cycle, 5000, "");
DEFINE_string(v2_prefix, "/home/baidulinux/V2", "v2 prefix");

// ContainerManager::Setup() only schedules CheckAssignRoutine when this is > 0.
// NOTE(review): the meaning of the individual levels 1..3 is not visible here.
DEFINE_int32(assign_level, 2, "assign level: {0, 1, 2, 3}");
// Delay handed to check_assign_pool_.DelayTask() between two CheckAssignRoutine
// runs (unit presumably milliseconds -- TODO confirm against ThreadPool).
DEFINE_int32(check_assign_interval, 5000, "check assign interval");

12 changes: 6 additions & 6 deletions src/agent/agent_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void AgentImpl::Setup()
LOG(INFO) << "init resource manager watcher successfully";


baidu::galaxy::collector::CollectorEngine::GetInstance()->Setup();
baidu::galaxy::collector::CollectorEngine::GetInstance()->Setup();

heartbeat_pool_.AddTask(boost::bind(&AgentImpl::KeepAlive, this, FLAGS_keepalive_interval));
LOG(INFO) << "start keep alive thread, interval is " << FLAGS_keepalive_interval << "ms";
Expand Down Expand Up @@ -128,7 +128,7 @@ void AgentImpl::CreateContainer(::google::protobuf::RpcController* controller,
{
LOG(INFO) << "recv create container request: " << request->DebugString();
int64_t x = baidu::common::timer::get_micros();
std::cerr <<x<< "create " << request->id() << std::endl;
std::cerr << x << "create " << request->id() << std::endl;

baidu::galaxy::container::ContainerId id(request->container_group_id(), request->id());
baidu::galaxy::proto::ErrorCode* ec = response->mutable_code();
Expand Down Expand Up @@ -187,7 +187,6 @@ void AgentImpl::Query(::google::protobuf::RpcController* controller,
ai->set_start_time(start_time_);
ai->set_version(version_);


bool full_report = false;
if (request->has_full_report() && request->full_report()) {
full_report = true;
Expand All @@ -206,8 +205,10 @@ void AgentImpl::Query(::google::protobuf::RpcController* controller,

std::map<std::string, int64_t> volum_used;
for (size_t i = 0; i < cis.size(); i++) {
cpu_used += cis[i]->cpu_used();
memory_used += cis[i]->memory_used();
if (cis[i]->container_desc().priority() != baidu::galaxy::proto::kJobBestEffort) {
cpu_used += cis[i]->cpu_used();
memory_used += cis[i]->memory_used();
}

for (int j = 0; j < cis[i]->volum_used_size(); j++) {
if (cis[i]->volum_used(j).medium() == baidu::galaxy::proto::kTmpfs) {
Expand Down Expand Up @@ -259,6 +260,5 @@ void AgentImpl::Query(::google::protobuf::RpcController* controller,
done->Run();
}


}
}
2 changes: 1 addition & 1 deletion src/agent/collector/collector_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class CollectorEngine {
bool running_;
baidu::common::ThreadPool fast_collector_pool_; // for fast
baidu::common::ThreadPool collector_pool_; // for slow
baidu::common::Thread main_collect_thread_;;
baidu::common::Thread main_collect_thread_;

};
}
Expand Down
121 changes: 95 additions & 26 deletions src/agent/container/container_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@
#include "boost/bind.hpp"
#include <glog/logging.h>

DECLARE_int32(check_assign_interval);
DECLARE_int64(cpu_resource);
DECLARE_int64(memory_resource);
DECLARE_int32(assign_level);

namespace baidu {
namespace galaxy {
namespace container {

ContainerManager::ContainerManager(boost::shared_ptr<baidu::galaxy::resource::ResourceManager> resman) :
res_man_(resman),
check_assign_pool_(1),
running_(false),
serializer_(new Serializer()),
container_gc_(new ContainerGc()) {
Expand Down Expand Up @@ -66,6 +72,11 @@ void ContainerManager::Setup() {
LOG(INFO) << "setup container gc successful";
running_ = true;
this->keep_alive_thread_.Start(boost::bind(&ContainerManager::KeepAliveRoutine, this));
if (FLAGS_assign_level > 0) {
this->check_assign_pool_.DelayTask(
FLAGS_check_assign_interval,
boost::bind(&ContainerManager::CheckAssignRoutine, this));
}
}

void ContainerManager::KeepAliveRoutine() {
Expand All @@ -84,6 +95,90 @@ void ContainerManager::KeepAliveRoutine() {
}
}

// Periodic check that best-effort containers do not push the agent past its
// configured resources.
//
// For every container returned by ListContainers():
//   - non best-effort containers contribute their reported cpu_used() /
//     memory_used();
//   - best-effort containers contribute what their description declares:
//     cpu milli cores and memory size of each cgroup, plus the size of each
//     tmpfs data volum (counted as memory).
// If the memory sum exceeds FLAGS_memory_resource, EvictAssignedContainer() is
// called with kEvictTypeMemory; otherwise, if the cpu sum exceeds
// FLAGS_cpu_resource, it is called with kEvictTypeCpu. Finally the routine
// re-schedules itself on check_assign_pool_ after FLAGS_check_assign_interval.
void ContainerManager::CheckAssignRoutine() {
    std::vector<boost::shared_ptr<baidu::galaxy::proto::ContainerInfo> > cis;
    ListContainers(cis, true);

    int64_t cpu_used = 0L;
    int64_t cpu_deep_assigned = 0L;
    int64_t memory_used = 0L;
    int64_t memory_deep_assigned = 0L;

    for (size_t i = 0; i < cis.size(); i++) {
        const baidu::galaxy::proto::ContainerDescription& desc = cis[i]->container_desc();

        if (desc.priority() != baidu::galaxy::proto::kJobBestEffort) {
            cpu_used += cis[i]->cpu_used();
            memory_used += cis[i]->memory_used();
            continue;
        }

        // calculate resource
        // Distinct index names: the inner loops must not shadow the container index.
        for (int cg = 0; cg < desc.cgroups_size(); cg++) {
            cpu_deep_assigned += desc.cgroups(cg).cpu().milli_core();
            memory_deep_assigned += desc.cgroups(cg).memory().size();
        }

        for (int dv = 0; dv < desc.data_volums_size(); dv++) {
            if (desc.data_volums(dv).medium() == baidu::galaxy::proto::kTmpfs) {
                // tmpfs as assigned
                memory_deep_assigned += desc.data_volums(dv).size();
            }
        }
    }

    VLOG(10)
            << "### check assign routine"
            << ", cpu_used: " << cpu_used
            << ", cpu_deep_assigned: " << cpu_deep_assigned
            << ", cpu: " << (cpu_used + cpu_deep_assigned)
            << ", cpu_total: " << FLAGS_cpu_resource
            << ", memory_used: " << memory_used
            << ", memory_deep_assigned: " << memory_deep_assigned
            << ", memory: " << (memory_used + memory_deep_assigned)
            << ", memory_total: " << FLAGS_memory_resource;

    // Memory is checked first; cpu is only considered when memory is within bounds.
    if (memory_used + memory_deep_assigned > FLAGS_memory_resource) {
        LOG(WARNING) << "memory_reserved is danger";
        EvictAssignedContainer(cis, kEvictTypeMemory);
    } else if (cpu_used + cpu_deep_assigned > FLAGS_cpu_resource) {
        LOG(WARNING) << "cpu_reserved is danger";
        EvictAssignedContainer(cis, kEvictTypeCpu);
    }

    check_assign_pool_.DelayTask(
            FLAGS_check_assign_interval,
            boost::bind(&ContainerManager::CheckAssignRoutine, this));
}

// Picks one best-effort container out of `cis` and releases it.
//
// Among the containers whose description has priority kJobBestEffort, the one
// with the largest memory_used() (evict_type == kEvictTypeMemory) or the
// largest cpu_used() (any other evict_type) is selected and passed to
// ReleaseContainer(). Nothing is released when no candidate was selected.
void ContainerManager::EvictAssignedContainer(
        std::vector<boost::shared_ptr<baidu::galaxy::proto::ContainerInfo> >& cis,
        EvictType evict_type) {
    VLOG(10) << "evict assigned container, by: " << evict_type;

    const bool by_memory = (kEvictTypeMemory == evict_type);
    ContainerId victim;
    int64_t highest = -1L;

    for (size_t idx = 0; idx < cis.size(); ++idx) {
        const boost::shared_ptr<baidu::galaxy::proto::ContainerInfo>& info = cis[idx];

        if (info->container_desc().priority() != baidu::galaxy::proto::kJobBestEffort) {
            continue;
        }

        LOG(INFO) << "find best effort container: " << info->id();

        const bool is_new_max = by_memory
                ? (info->memory_used() > highest)
                : (info->cpu_used() > highest);

        if (!is_new_max) {
            continue;
        }

        if (by_memory) {
            highest = info->memory_used();
        } else {
            highest = info->cpu_used();
        }

        victim.SetGroupId(info->group_id()).SetSubId(info->id());
    }

    if (victim.Empty()) {
        return;
    }

    VLOG(10) << "will evict: " << victim.ToString();
    ReleaseContainer(victim);
}

baidu::galaxy::util::ErrorCode ContainerManager::CreateContainer(const ContainerId& id, const baidu::galaxy::proto::ContainerDescription& desc) {
// enter creating stage, every time only one thread does creating
ScopedCreatingStage lock_stage(stage_, id.SubId());
Expand Down Expand Up @@ -352,31 +447,6 @@ int ContainerManager::Reload() {
return 0;
}


// Checks the volum containers referenced by a container description.
//
// Returns ERRORCODE_OK right away when the description is itself a volum
// container or references no volum containers. Otherwise, holding mutex_, one
// lookup in work_containers_ is performed per referenced volum container and
// an error is returned as soon as a lookup misses.
//
// NOTE(review): the lookup key `id` is default-constructed on every iteration
// and is never derived from desc.volum_containers(i) (see the commented-out
// line), so the referenced containers are not actually looked up -- confirm
// intent.
baidu::galaxy::util::ErrorCode ContainerManager::CheckDescription(const baidu::galaxy::proto::ContainerDescription& desc) {
// Nothing to verify for volum containers or for descriptions without dependencies.
if ((desc.has_container_type()
&& desc.container_type() == baidu::galaxy::proto::kVolumContainer)
|| desc.volum_containers_size() == 0) {
return ERRORCODE_OK;
}

int size = desc.volum_containers_size();
// Held until return while work_containers_ is searched.
boost::mutex::scoped_lock lock(mutex_);

for (int i = 0; i < size; i++) {
//std::string id = desc.volum_containers(i);
const ContainerId id;
std::map<ContainerId, boost::shared_ptr<baidu::galaxy::container::IContainer> >::iterator iter = work_containers_.find(id);

// A missing entry is reported as an error with code -1.
if (work_containers_.end() == iter) {
return ERRORCODE(-1, "%s donot exist", id.CompactId().c_str());
}
}

return ERRORCODE_OK;
}


baidu::galaxy::util::ErrorCode ContainerManager::DependentVolums(const baidu::galaxy::proto::ContainerDescription& desc,
std::map<std::string, std::string>& dv) {
if (desc.volum_containers_size() > 0) {
Expand Down Expand Up @@ -445,7 +515,6 @@ baidu::galaxy::util::ErrorCode ContainerManager::DependentVolums(const Container
return ERRORCODE_OK;
}


}
}
}
Loading

0 comments on commit 03cbc71

Please sign in to comment.