diff --git a/sandbox/quick_test.sh b/sandbox/quick_test.sh deleted file mode 100755 index 0efe389f..00000000 --- a/sandbox/quick_test.sh +++ /dev/null @@ -1,30 +0,0 @@ -#! /bin/bash - -hn=`hostname` - -## prepare cgroups -#CGROUP_ROOT=/cgroups -#mkdir -p $CGROUP_ROOT/cpu && mount -t cgroup -ocpu none $CGROUP_ROOT/cpu >/dev/null 2>&1 -#mkdir -p $CGROUP_ROOT/memory && mount -t cgroup -omemory none $CGROUP_ROOT/memory >/dev/null 2>&1 -#mkdir -p $CGROUP_ROOT/cpuacct && mount -t cgroup -ocpuacct none $CGROUP_ROOT/cpuacct >/dev/null 2>&1 -#mkdir -p $CGROUP_ROOT/freezer && mount -t cgroup -ofreezer none $CGROUP_ROOT/freezer >/dev/null 2>&1 - -# start nexus -rm -rf galaxy.flag -touch galaxy.flag -echo "--nexus_servers=$hn:8868,$hn:8869,$hn:8870,$hn:8871,$hn:8872" >> galaxy.flag -echo "--appworker_endpoint=127.0.0.1:8221" >> galaxy.flag -echo "--appworker_container_id=container-1" >> galaxy.flag -echo "--logbufsecs=0" >> galaxy.flag - -## stop galaxy -#./stop_all.sh -# -## start galaxy -#./start_all.sh -# -### submit job -##../galaxy submit -f sample.json -##../galaxy jobs -##../galaxy agents - diff --git a/sandbox/start_all.sh b/sandbox/start_all.sh index ea8e83fd..3d77c406 100755 --- a/sandbox/start_all.sh +++ b/sandbox/start_all.sh @@ -1,13 +1,96 @@ #!/bin/bash -echo "1.start nexus" -cd ../thirdsrc/ins/sandbox && nohup ./start_all.sh > ins_start.log 2>&1 & -sleep 2 +HOSTNAME=`hostname` +SERVERS="$HOSTNAME:8868,$HOSTNAME:8869,$HOSTNAME:8870,$HOSTNAME:8871,$HOSTNAME:8872" +ROOTPATH=`pwd` +IP=`hostname -i` -#galaxyflag=`pwd`/galaxy.flag -##echo "2.start appmaster" -##nohup ../appmaster --flagfile=$galaxyflag >appmaster.log 2>&1 & -# -#echo "3.start appworker" -#nohup ../appworker --flagfile=$galaxyflag >/dev/null 2>&1 & +function assert_process_ok() { + ps xf | grep "$1" | grep -v grep >/dev/null + if [ $? -eq 0 ]; then + echo "$1 ok" + else + echo "$1 err" + exit 1 + fi +} +# copy files +cp -rf ../thirdsrc/ins/output/bin/ins ./ +cp -rf ../resman ./ +cp -rf ../agent ./ +cp -rf ../appmaster ./ +cp -rf ../appworker ./ +cp -rf ../galaxy_client ./ +cp -rf ../galaxy_res_client ./ + +# prepare ins.flag +echo "--cluster_members=$SERVERS" >ins.flag + +# 1.start nexus +echo "start nexus" +nohup ./ins --flagfile=ins.flag --server_id=1 >/dev/null 2>&1 & +nohup ./ins --flagfile=ins.flag --server_id=2 >/dev/null 2>&1 & +nohup ./ins --flagfile=ins.flag --server_id=3 >/dev/null 2>&1 & +nohup ./ins --flagfile=ins.flag --server_id=4 >/dev/null 2>&1 & +nohup ./ins --flagfile=ins.flag --server_id=5 >/dev/null 2>&1 & +sleep 10 +assert_process_ok "./ins --flagfile=ins.flag" + +# prepare galaxy.flag +echo "--logbufsecs=0" >> galaxy.flag +echo "--alsologtostderr=1" >> galaxy.flag +echo "--username=test" >> galaxy.flag +echo "--token=test" >> galaxy.flag +echo "# resman" >> galaxy.flag +echo "--nexus_addr=$SERVERS" >> galaxy.flag +echo "--nexus_root=/galaxy_test" >> galaxy.flag +echo "--resman_path=/resman" >> galaxy.flag +echo "--safe_mode_percent=0.7" >> galaxy.flag +echo "--sched_interval=10" >> galaxy.flag +echo "--agent_query_interval=2" >> galaxy.flag +echo "# appmaster" >> galaxy.flag +echo "--appmaster_port=8123" >> galaxy.flag +echo "# agent" >> galaxy.flag +echo "--nexus_root_path=/galaxy_test" >> galaxy.flag +echo "--nexus_servers=$SERVERS" >> galaxy.flag +echo "--cmd_line=appworker --nexus_addr=$SERVERS --nexus_root_path=/galaxy_test" >> galaxy.flag +echo "--cpu_resource=4000" >> galaxy.flag +echo "--memory_resource=4294967296" >> galaxy.flag +echo "--mount_templat=/bin,/boot,/cgroups,/dev,/etc,/lib,/lib64,/lost+found,/media,/misc,/mnt,/opt,/sbin,/selinux,/srv,/sys,/tmp,/usr,/var,/noah,/noah/download,/noah/modules,/noah/tmp,/noah/bin,/proc,/cgroups/cpu,/cgroups/memory,/cgroups/cpuacct,/cgroups/tcp_throt,/cgroups/blkio,/cgroups/freezer,/cgroups/net_cls,/home/opt" >> galaxy.flag +echo "--agent_port=1025" >> galaxy.flag +echo "--master_path=/resman" >> galaxy.flag +echo "--galaxy_root_path=$ROOTPATH" >> galaxy.flag +echo "--cgroup_root_path=/cgroups" >> galaxy.flag +echo "--gc_delay_time=86400" >> galaxy.flag +echo "--agent_ip=$IP" >> galaxy.flag +echo "--agent_hostname=$HOSTNAME" >> galaxy.flag +echo "--volum_resource=/dev/vdb:2906620387328:DISK:/home/galaxy" >> galaxy.flag + +# 2. start resman +echo "start resman" +nohup ./resman --flagfile=galaxy.flag >resman.log 2>&1 & +sleep 5 +assert_process_ok "./resman" + +# 3.start appmaster +echo "start appmaster" +nohup ./appmaster --flagfile=galaxy.flag >appmaster.log 2>&1 & +sleep 5 +assert_process_ok "./appmaster" + +# 4.start agent +echo "start agent" +export PATH=$PATH:$ROOTPATH +nohup ./agent --flagfile=galaxy.flag >agent.log 2>&1 & +assert_process_ok "./agent" + +# galaxy options +./galaxy_res_client add_agent -e 10.100.40.100:1025 -p main_pool +./galaxy_res_client add_user -u test -t test +./galaxy_res_client grant_user -u test -p main_pool -o add -a create_container,remove_container,update_container,list_containers,submit_job,remove_job,update_job,list_jobs +./galaxy_res_client assign_quota -u test -c 4000 -d 4G -s 1M e -m 4G -r 1000 + +sleep 20 +./galaxy_client submit -f test.json +./galaxy_client submit -f over.json diff --git a/sandbox/stop_all.sh b/sandbox/stop_all.sh index 1669763a..8f40a090 100755 --- a/sandbox/stop_all.sh +++ b/sandbox/stop_all.sh @@ -1,7 +1,32 @@ #!/bin/bash -pkill ins -echo "stop ins" +# remove jobs +jobs=`./galaxy_client list | grep job | awk '{print $2}'` +if [ -n "$jobs" ] + then + for jobid in $jobs + do + ./galaxy_client remove -i $jobid + done + sleep 10 +fi -pkill appworker -echo "stop appworker" +# 1.kill appworker +# killall appworker + +# 2.stop appmaster +killall appmaster + +# 3.stop resman +killall resman + +# 4.stop agent +killall agent + +# 5.stop nexus +killall ins + +rm -rf ./ins ./appmaster ./appworker ./resman ./agent ./galaxy_client ./galaxy_res_client +rm -rf /ins.flag ./galaxy.flag +rm -rf ./binlog *.log *.INFO* *.WARNING* +rm -rf ./data ./gc_dir/ ./work_dir/ diff --git a/src/agent/agent_flags.cc b/src/agent/agent_flags.cc index f154ced4..f5891200 100644 --- a/src/agent/agent_flags.cc +++ b/src/agent/agent_flags.cc @@ -28,3 +28,7 @@ DEFINE_int64(gc_delay_time, 43200, ""); DEFINE_int64(volum_collect_cycle, 18000, ""); DEFINE_int64(cgroup_collect_cycle, 5000, ""); DEFINE_string(v2_prefix, "/home/baidulinux/V2", "v2 prefix"); + +DEFINE_int32(assign_level, 2, "assign level: {0, 1, 2, 3}"); +DEFINE_int32(check_assign_interval, 5000, "check assign interval"); + diff --git a/src/agent/agent_impl.cc b/src/agent/agent_impl.cc index 60701a16..9b740480 100644 --- a/src/agent/agent_impl.cc +++ b/src/agent/agent_impl.cc @@ -96,7 +96,7 @@ void AgentImpl::Setup() LOG(INFO) << "init resource manager watcher successfully"; - baidu::galaxy::collector::CollectorEngine::GetInstance()->Setup(); + baidu::galaxy::collector::CollectorEngine::GetInstance()->Setup(); heartbeat_pool_.AddTask(boost::bind(&AgentImpl::KeepAlive, this, FLAGS_keepalive_interval)); LOG(INFO) << "start keep alive thread, interval is " << FLAGS_keepalive_interval << "ms"; @@ -128,7 +128,7 @@ void AgentImpl::CreateContainer(::google::protobuf::RpcController* controller, { LOG(INFO) << "recv create container request: " << request->DebugString(); int64_t x = baidu::common::timer::get_micros(); - std::cerr <id() << std::endl; + std::cerr << x << "create " << request->id() << std::endl; baidu::galaxy::container::ContainerId id(request->container_group_id(), request->id()); baidu::galaxy::proto::ErrorCode* ec = response->mutable_code(); @@ -187,7 +187,6 @@ void AgentImpl::Query(::google::protobuf::RpcController* controller, ai->set_start_time(start_time_); ai->set_version(version_); - bool full_report = false; if (request->has_full_report() && request->full_report()) { full_report = true; @@ -206,8 +205,10 @@ void AgentImpl::Query(::google::protobuf::RpcController* controller, std::map volum_used; for (size_t i = 0; i < cis.size(); i++) { - cpu_used += cis[i]->cpu_used(); - memory_used += cis[i]->memory_used(); + if (cis[i]->container_desc().priority() != baidu::galaxy::proto::kJobBestEffort) { + cpu_used += cis[i]->cpu_used(); + memory_used += cis[i]->memory_used(); + } for (int j = 0; j < cis[i]->volum_used_size(); j++) { if (cis[i]->volum_used(j).medium() == baidu::galaxy::proto::kTmpfs) { @@ -259,6 +260,5 @@ void AgentImpl::Query(::google::protobuf::RpcController* controller, done->Run(); } - } } diff --git a/src/agent/collector/collector_engine.h b/src/agent/collector/collector_engine.h index b0fe3570..022b604a 100644 --- a/src/agent/collector/collector_engine.h +++ b/src/agent/collector/collector_engine.h @@ -122,7 +122,7 @@ class CollectorEngine { bool running_; baidu::common::ThreadPool fast_collector_pool_; // for fast baidu::common::ThreadPool collector_pool_; // for slow - baidu::common::Thread main_collect_thread_;; + baidu::common::Thread main_collect_thread_; }; } diff --git a/src/agent/container/container_manager.cc b/src/agent/container/container_manager.cc index 1409febe..4a65d53d 100644 --- a/src/agent/container/container_manager.cc +++ b/src/agent/container/container_manager.cc @@ -11,12 +11,18 @@ #include "boost/bind.hpp" #include +DECLARE_int32(check_assign_interval); +DECLARE_int64(cpu_resource); +DECLARE_int64(memory_resource); +DECLARE_int32(assign_level); + namespace baidu { namespace galaxy { namespace container { ContainerManager::ContainerManager(boost::shared_ptr resman) : res_man_(resman), + check_assign_pool_(1), running_(false), serializer_(new Serializer()), container_gc_(new ContainerGc()) { @@ -66,6 +72,11 @@ void ContainerManager::Setup() { LOG(INFO) << "setup container gc successful"; running_ = true; this->keep_alive_thread_.Start(boost::bind(&ContainerManager::KeepAliveRoutine, this)); + if (FLAGS_assign_level > 0) { + this->check_assign_pool_.DelayTask( + FLAGS_check_assign_interval, + boost::bind(&ContainerManager::CheckAssignRoutine, this)); + } } void ContainerManager::KeepAliveRoutine() { @@ -84,6 +95,90 @@ void ContainerManager::KeepAliveRoutine() { } } +void ContainerManager::CheckAssignRoutine() { + std::vector > cis; + ListContainers(cis, true); + + int64_t cpu_used = 0L; + int64_t cpu_deep_assigned = 0L; + int64_t memory_used = 0L; + int64_t memory_deep_assigned = 0L; + + for (size_t i = 0; i < cis.size(); i++) { + const baidu::galaxy::proto::ContainerDescription& desc = cis[i]->container_desc(); + if (desc.priority() != baidu::galaxy::proto::kJobBestEffort) { + cpu_used += cis[i]->cpu_used(); + memory_used += cis[i]->memory_used(); + } else { + // calculate resource + for (int i = 0; i < desc.cgroups_size(); i++) { + cpu_deep_assigned += desc.cgroups(i).cpu().milli_core(); + memory_deep_assigned += desc.cgroups(i).memory().size(); + } + for (int i = 0; i < desc.data_volums_size(); i++) { + if (desc.data_volums(i).medium() == baidu::galaxy::proto::kTmpfs) { + // tmpfs as assigned + memory_deep_assigned += desc.data_volums(i).size(); + } + } + } + } + + VLOG(10) + << "### check assign routine" + << ", cpu_used: " << cpu_used + << ", cpu_deep_assigned: " << cpu_deep_assigned + << ", cpu: " << (cpu_used + cpu_deep_assigned) + << ", cpu_total: " << FLAGS_cpu_resource + << ", memory_used: " << memory_used + << ", memory_deep_assigned: " << memory_deep_assigned + << ", memory: " << (memory_used + memory_deep_assigned) + << ", memory_total: " << FLAGS_memory_resource; + + if (memory_used + memory_deep_assigned > FLAGS_memory_resource) { + LOG(WARNING) << "memory_reserved is danger"; + EvictAssignedContainer(cis, kEvictTypeMemory); + } else { + if (cpu_used + cpu_deep_assigned > FLAGS_cpu_resource) { + LOG(WARNING) << "cpu_reserved is danger"; + EvictAssignedContainer(cis, kEvictTypeCpu); + } + } + + check_assign_pool_.DelayTask( + FLAGS_check_assign_interval, + boost::bind(&ContainerManager::CheckAssignRoutine, this)); +} + +void ContainerManager::EvictAssignedContainer( + std::vector >& cis, + EvictType evict_type) { + VLOG(10) << "evict assigned container, by: " << evict_type; + + ContainerId container_id; + int64_t max_used = -1L; + for (size_t i = 0; i < cis.size(); i++) { + if (cis[i]->container_desc().priority() == baidu::galaxy::proto::kJobBestEffort) { + LOG(INFO) << "find best effort container: " << cis[i]->id(); + if (kEvictTypeMemory == evict_type) { + if (cis[i]->memory_used() > max_used) { + max_used = cis[i]->memory_used(); + container_id.SetGroupId(cis[i]->group_id()).SetSubId(cis[i]->id()); + } + } else { + if (cis[i]->cpu_used() > max_used) { + max_used = cis[i]->cpu_used(); + container_id.SetGroupId(cis[i]->group_id()).SetSubId(cis[i]->id()); + } + } + } + } + if (!container_id.Empty()) { + VLOG(10) << "will evict: " << container_id.ToString(); + ReleaseContainer(container_id); + } +} + baidu::galaxy::util::ErrorCode ContainerManager::CreateContainer(const ContainerId& id, const baidu::galaxy::proto::ContainerDescription& desc) { // enter creating stage, every time only one thread does creating ScopedCreatingStage lock_stage(stage_, id.SubId()); @@ -352,31 +447,6 @@ int ContainerManager::Reload() { return 0; } - -baidu::galaxy::util::ErrorCode ContainerManager::CheckDescription(const baidu::galaxy::proto::ContainerDescription& desc) { - if ((desc.has_container_type() - && desc.container_type() == baidu::galaxy::proto::kVolumContainer) - || desc.volum_containers_size() == 0) { - return ERRORCODE_OK; - } - - int size = desc.volum_containers_size(); - boost::mutex::scoped_lock lock(mutex_); - - for (int i = 0; i < size; i++) { - //std::string id = desc.volum_containers(i); - const ContainerId id; - std::map >::iterator iter = work_containers_.find(id); - - if (work_containers_.end() == iter) { - return ERRORCODE(-1, "%s donot exist", id.CompactId().c_str()); - } - } - - return ERRORCODE_OK; -} - - baidu::galaxy::util::ErrorCode ContainerManager::DependentVolums(const baidu::galaxy::proto::ContainerDescription& desc, std::map& dv) { if (desc.volum_containers_size() > 0) { @@ -445,7 +515,6 @@ baidu::galaxy::util::ErrorCode ContainerManager::DependentVolums(const Container return ERRORCODE_OK; } - } } } diff --git a/src/agent/container/container_manager.h b/src/agent/container/container_manager.h index c33ae88b..b0821652 100644 --- a/src/agent/container/container_manager.h +++ b/src/agent/container/container_manager.h @@ -9,9 +9,9 @@ #include "serializer.h" #include "boost/shared_ptr.hpp" -#include "boost/thread/mutex.hpp" - +#include "boost/thread/mutex.hpp" #include "thread.h" +#include "thread_pool.h" #include "container_gc.h" #include @@ -21,6 +21,11 @@ namespace baidu { namespace galaxy { namespace container { +enum EvictType { + kEvictTypeMemory = 1, + kEvictTypeCpu = 2 +}; + class ContainerManager { public: explicit ContainerManager(boost::shared_ptr resman); @@ -33,9 +38,6 @@ class ContainerManager { baidu::galaxy::util::ErrorCode ReleaseContainer(const ContainerId& id); void ListContainers(std::vector >& cis, bool fullinfo); - // check dependence - baidu::galaxy::util::ErrorCode CheckDescription(const baidu::galaxy::proto::ContainerDescription& desc); - private: baidu::galaxy::util::ErrorCode DependentVolums(const baidu::galaxy::proto::ContainerDescription& desc, std::map& dv); @@ -48,6 +50,10 @@ class ContainerManager { const baidu::galaxy::proto::ContainerDescription& desc); void KeepAliveRoutine(); + void CheckAssignRoutine(); + void EvictAssignedContainer( + std::vector >& cis, + EvictType evict_type); int Reload(); void DumpProperty(boost::shared_ptr container); @@ -58,8 +64,9 @@ class ContainerManager { ContainerStage stage_; - boost::unordered_map > contianer_info_; + std::vector > container_infos_; baidu::common::Thread keep_alive_thread_; + baidu::common::ThreadPool check_assign_pool_; bool running_; boost::shared_ptr serializer_; diff --git a/src/agent/resource/resource_manager.cc b/src/agent/resource/resource_manager.cc index df7c3b3a..e45f442c 100644 --- a/src/agent/resource/resource_manager.cc +++ b/src/agent/resource/resource_manager.cc @@ -56,14 +56,15 @@ baidu::galaxy::util::ErrorCode ResourceManager::Allocate(const baidu::galaxy::pr CalResource(desc, cpu_millicores, memroy_require, vv); boost::mutex::scoped_lock lock(mutex_); - // allocate cpu - if (0 != cpu_->Allocate(cpu_millicores)) { - return ERRORCODE(-1, "allocat cpu resource failed"); - } - - if (0 != memory_->Allocate(memroy_require)) { - cpu_->Release(cpu_millicores); - return ERRORCODE(-1, "allocat memory resource failed"); + if (desc.priority() != proto::kJobBestEffort) { + // allocate cpu + if (0 != cpu_->Allocate(cpu_millicores)) { + return ERRORCODE(-1, "allocat cpu resource failed"); + } + if (0 != memory_->Allocate(memroy_require)) { + cpu_->Release(cpu_millicores); + return ERRORCODE(-1, "allocat memory resource failed"); + } } baidu::galaxy::util::ErrorCode ec = this->Allocate(vv); @@ -109,16 +110,18 @@ baidu::galaxy::util::ErrorCode ResourceManager::Release(const baidu::galaxy::pro std::vector vv; CalResource(desc, cpu_millicores, memroy_require, vv); boost::mutex::scoped_lock lock(mutex_); - int ret = cpu_->Release(cpu_millicores); + if (desc.priority() != proto::kJobBestEffort) { + int ret = cpu_->Release(cpu_millicores); - if (0 != ret) { - return ERRORCODE(-1, "release cpu resource failed"); - } + if (0 != ret) { + return ERRORCODE(-1, "release cpu resource failed"); + } - ret = memory_->Release(memroy_require); + ret = memory_->Release(memroy_require); - if (0 != ret) { - return ERRORCODE(-1, "release memory resource failed"); + if (0 != ret) { + return ERRORCODE(-1, "release memory resource failed"); + } } for (size_t i = 0; i < vv.size(); i++) { @@ -184,8 +187,10 @@ void ResourceManager::CalResource(const baidu::galaxy::proto::ContainerDescripti vv.clear(); for (int i = 0; i < desc.cgroups_size(); i++) { - memroy_require += desc.cgroups(i).memory().size(); - cpu_millicores += desc.cgroups(i).cpu().milli_core(); + if (desc.priority() != proto::kJobBestEffort) { + memroy_require += desc.cgroups(i).memory().size(); + cpu_millicores += desc.cgroups(i).cpu().milli_core(); + } } vv.push_back(&desc.workspace_volum()); diff --git a/src/agent/volum/bind_volum.cc b/src/agent/volum/bind_volum.cc index 45d8ae8a..b9fd63b2 100644 --- a/src/agent/volum/bind_volum.cc +++ b/src/agent/volum/bind_volum.cc @@ -41,7 +41,6 @@ baidu::galaxy::util::ErrorCode BindVolum::Construct_() { const boost::shared_ptr vr = Description(); assert(vr->medium() == baidu::galaxy::proto::kDisk || vr->medium() == baidu::galaxy::proto::kSsd); - assert(!vr->use_symlink()); boost::system::error_code ec; boost::filesystem::path target_path(this->TargetPath()); diff --git a/src/appworker/process_manager.cc b/src/appworker/process_manager.cc index 73a70177..cdfec768 100644 --- a/src/appworker/process_manager.cc +++ b/src/appworker/process_manager.cc @@ -13,6 +13,8 @@ #include #include +#include +#include #include #include #include @@ -147,10 +149,19 @@ int ProcessManager::CreateProcess(const ProcessEnv& env, // if package exist if (!file::IsExists(download_context->package)) { - cmd = "wget --timeout=" + boost::lexical_cast(FLAGS_process_manager_download_timeout) - + " -O " + download_context->package - + " " + download_context->src_path - + " && " + cmd; + // p2p or wget + if (boost::contains(download_context->src_path, "ftp://") + || boost::contains(download_context->src_path, "http://")) { + cmd = "wget --timeout=" + boost::lexical_cast(FLAGS_process_manager_download_timeout) + + " -O " + download_context->package + + " " + download_context->src_path + + " && " + cmd; + } else { + cmd = "gko3 down --hang-time " + boost::lexical_cast(FLAGS_process_manager_download_timeout) + + " -n " + download_context->package + + " -i " + download_context->src_path + + " && " + cmd; + } } } @@ -374,7 +385,8 @@ void ProcessManager::LoopWaitProcesses() { LOG(INFO) << "process: " << it->second->process_id << ", " << "pid: " << pid << ", " - << "exit code: " << it->second->exit_code; + << "exit code: " << it->second->exit_code << ", " + << "exit status: " << proto::ProcessStatus_Name(it->second->status); break; } } diff --git a/src/protocol/galaxy.proto b/src/protocol/galaxy.proto index 0339fea5..31bde50a 100755 --- a/src/protocol/galaxy.proto +++ b/src/protocol/galaxy.proto @@ -402,7 +402,7 @@ message ContainerInfo { optional string id = 1; optional string group_id = 2; optional int64 created_time = 3; - optional ContainerStatus status = 4; + optional ContainerStatus status = 4; optional ContainerDescription container_desc = 5; //when RM need full report // resource? optional int64 cpu_used = 6; diff --git a/src/resman/resman_flags.cc b/src/resman/resman_flags.cc index 99a3189a..2772d18b 100755 --- a/src/resman/resman_flags.cc +++ b/src/resman/resman_flags.cc @@ -11,3 +11,6 @@ DEFINE_int32(container_group_max_replica, 100000, "max replica allowed for one g DEFINE_double(safe_mode_percent, 0.85, "when agent alive percent bigger than this, leave safe mode"); DEFINE_bool(check_container_version, false, "by default, AM will handle that"); DEFINE_int32(max_batch_pods, 12, "max batch pods per agent"); + +DEFINE_int32(overassign_level, 2, "overassign level: {0, 1, 2, 3}"); +DEFINE_double(reserved_percent, 2.0, "resource reserved percent"); diff --git a/src/resman/resman_impl.cc b/src/resman/resman_impl.cc index 10feb80d..e0614286 100755 --- a/src/resman/resman_impl.cc +++ b/src/resman/resman_impl.cc @@ -41,7 +41,7 @@ namespace galaxy { ResManImpl::ResManImpl() : scheduler_(new sched::Scheduler()), safe_mode_(true), - force_safe_mode_(false), + force_safe_mode_(false), start_time_(0) { nexus_ = new InsSDK(FLAGS_nexus_addr); } @@ -64,7 +64,7 @@ bool ResManImpl::Init() { const proto::AgentMeta& agent_meta = agent_it->second; pools_[agent_meta.pool()].insert(endpoint); } - + std::map tag_map; load_ok = LoadObjects(sTagPrefix, tag_map); if (!load_ok) { @@ -86,7 +86,7 @@ bool ResManImpl::Init() { LOG(WARNING) << "fail to load user meta"; return false; } - + ReloadUsersAuth(); load_ok = LoadObjects(sContainerGroupPrefix, container_groups_); if (!load_ok) { @@ -187,10 +187,15 @@ void ResManImpl::LeaveSafeMode(::google::protobuf::RpcController* controller, ::google::protobuf::Closure* done) { MutexLock lock(&mu_); if (safe_mode_) { - safe_mode_ = false; - force_safe_mode_ = false; - scheduler_->Start(); - response->mutable_error_code()->set_status(proto::kOk); + if (force_safe_mode_) { + safe_mode_ = false; + force_safe_mode_ = false; + scheduler_->Start(); + response->mutable_error_code()->set_status(proto::kOk); + } else { + response->mutable_error_code()->set_status(proto::kError); + response->mutable_error_code()->set_reason("invalide op, cluster in auto safemode"); + } } else { response->mutable_error_code()->set_status(proto::kError); response->mutable_error_code()->set_reason("invalid op, cluster is not in safe mode"); @@ -360,7 +365,8 @@ void ResManImpl::QueryAgentCallback(std::string agent_endpoint, const std::set& tags = agent_tags_[agent_endpoint]; std::string pool_name = agent_meta.pool(); sched::Agent::Ptr agent(new sched::Agent(agent_endpoint, - cpu, memory, + cpu, + memory, volums, tags, pool_name)); @@ -387,8 +393,8 @@ void ResManImpl::QueryAgentCallback(std::string agent_endpoint, return; } agent_stats_[agent_endpoint].info = response->agent_info(); - if (!force_safe_mode_ && - safe_mode_ && + if (!force_safe_mode_ && + safe_mode_ && agent_stats_.size() > (double)agents_.size() * FLAGS_safe_mode_percent) { int64_t running_time = (common::timer::get_micros() - start_time_) / 1000000; LOG(INFO) << "running time: " << running_time << " seconds"; diff --git a/src/resman/scheduler.cc b/src/resman/scheduler.cc index 0141d0f1..1c9d767e 100755 --- a/src/resman/scheduler.cc +++ b/src/resman/scheduler.cc @@ -20,6 +20,7 @@ DECLARE_int64(sched_interval); DECLARE_int64(container_group_gc_check_interval); DECLARE_bool(check_container_version); DECLARE_int32(max_batch_pods); +DECLARE_double(reserved_percent); namespace baidu { namespace galaxy { @@ -38,8 +39,14 @@ Agent::Agent(const AgentEndpoint& endpoint, endpoint_ = endpoint; cpu_total_ = cpu; cpu_assigned_ = 0; + cpu_reserved_ = 0; + cpu_deep_assigned_ = 0; + cpu_deep_reserved_ = 0; memory_total_ = memory; memory_assigned_ = 0; + memory_reserved_ = 0; + memory_deep_assigned_ = 0; + memory_deep_reserved_ = 0; volum_total_ = volums; port_total_ = sMaxPort - sMinPort + 1; tags_ = tags; @@ -54,12 +61,16 @@ ContainerGroupId Agent::ExtractGroupId(const ContainerId& container_id) { } void Agent::SetAssignment(int64_t cpu_assigned, + int64_t cpu_deep_assigned, int64_t memory_assigned, + int64_t memory_deep_assigned, const std::map& volum_assigned, const std::set port_assigned, const std::map& containers) { cpu_assigned_ = cpu_assigned; + cpu_deep_assigned_ = cpu_deep_assigned; memory_assigned_ = memory_assigned; + memory_deep_assigned_ = memory_deep_assigned; volum_assigned_ = volum_assigned; port_assigned_ = port_assigned; containers_ = containers; @@ -79,7 +90,7 @@ void Agent::SetAssignment(int64_t cpu_assigned, << endpoint_; } if (container->priority == proto::kJobBatch) { - batch_container_count_ ++; + batch_container_count_ ++; } } @@ -93,15 +104,36 @@ void Agent::SetAssignment(int64_t cpu_assigned, } } +void Agent::SetReserved(int64_t cpu_reserved, + int64_t cpu_deep_reserved, + int64_t memory_reserved, + int64_t memory_deep_reserved) { + LOG(INFO) + << "# cpu_reserved: " << cpu_reserved + << ", cpu_deep_reserved: " << cpu_deep_reserved + << ", memory_reserved: " << memory_reserved + << ", memory_deep_reserved: " << memory_deep_reserved; + cpu_reserved_ = cpu_reserved; + cpu_deep_reserved_ = cpu_deep_reserved; + memory_reserved_ = memory_reserved; + memory_deep_reserved_ = memory_deep_reserved; +} bool Agent::TryPut(const Container* container, ResourceError& err) { + LOG(INFO) + << "### TryPut, agent: " << endpoint_ + << ", container: " << container->id + << ", cpu[a/r/da/dr]: " + << cpu_assigned_ << "," << cpu_reserved_ << "," << cpu_deep_assigned_ << "," << cpu_deep_reserved_ + << ", mem[a/r/da/dr]: " + << memory_assigned_ << "," << memory_reserved_ << "," << memory_deep_assigned_ << "," << memory_deep_reserved_; if (!container->require->tag.empty() && tags_.find(container->require->tag) == tags_.end()) { err = proto::kTagMismatch; return false; } - if (container->require->pool_names.find(pool_name_) - == container->require->pool_names.end()) { + if (container->require->pool_names.find(pool_name_) + == container->require->pool_names.end()) { err = proto::kPoolMismatch; return false; } @@ -114,17 +146,28 @@ bool Agent::TryPut(const Container* container, ResourceError& err) { if (cur_counts >= container->require->max_per_host) { err = proto::kTooManyPods; return false; - } + } } } - if (container->require->CpuNeed() + cpu_assigned_ > cpu_total_) { - err = proto::kNoCpu; - return false; - } - if (container->require->MemoryNeed() + memory_assigned_ > memory_total_) { - err = proto::kNoMemory; - return false; + if (container->priority != proto::kJobBestEffort) { + if (container->require->CpuNeed() + cpu_assigned_ > cpu_total_) { + err = proto::kNoCpu; + return false; + } + if (container->require->MemoryNeed() + memory_assigned_ > memory_total_) { + err = proto::kNoMemory; + return false; + } + } else { + if (cpu_reserved_ + cpu_deep_assigned_ + container->require->CpuNeed() > cpu_total_) { + err = proto::kNoCpu; + return false; + } + if (memory_reserved_ + memory_deep_assigned_ + container->require->MemoryNeed() > memory_total_) { + err = proto::kNoMemory; + return false; + } } int64_t size_ramdisk = 0; @@ -139,9 +182,16 @@ bool Agent::TryPut(const Container* container, ResourceError& err) { } } - if (size_ramdisk + memory_assigned_ + container->require->MemoryNeed()> memory_total_) { - err = proto::kNoMemoryForTmpfs; - return false; + if (container->priority != proto::kJobBestEffort) { + if (size_ramdisk + memory_assigned_ + container->require->MemoryNeed()> memory_total_) { + err = proto::kNoMemoryForTmpfs; + return false; + } + } else { + if (size_ramdisk + memory_assigned_ > memory_total_) { + err = proto::kNoMemoryForTmpfs; + return false; + } } std::vector devices; @@ -182,11 +232,16 @@ bool Agent::TryPut(const Container* container, ResourceError& err) { void Agent::Put(Container::Ptr container) { assert(container->status == kContainerPending); assert(container->allocated_agent.empty()); - //cpu - cpu_assigned_ += container->require->CpuNeed(); - assert(cpu_assigned_ <= cpu_total_); - //memory - memory_assigned_ += container->require->MemoryNeed(); + if (container->priority != proto::kJobBestEffort) { + //cpu + cpu_assigned_ += container->require->CpuNeed(); + assert(cpu_assigned_ <= cpu_total_); + //memory + memory_assigned_ += container->require->MemoryNeed(); + } else { + cpu_deep_assigned_ += container->require->CpuNeed(); + memory_deep_assigned_ += container->require->MemoryNeed(); + } int64_t size_ramdisk = 0; std::vector volums_no_ramdisk; BOOST_FOREACH(const proto::VolumRequired& v, container->require->volums) { @@ -359,12 +414,20 @@ void Agent::Evict(Container::Ptr container) { LOG(WARNING) << "invalid evict, no such container:" << container->id; return; } - //cpu - cpu_assigned_ -= container->require->CpuNeed(); - assert(cpu_assigned_ >= 0); - //memory - memory_assigned_ -= container->require->MemoryNeed(); - assert(memory_assigned_ >= 0); + if (container->priority != proto::kJobBestEffort) { + //cpu + cpu_assigned_ -= container->require->CpuNeed(); + assert(cpu_assigned_ >= 0); + //memory + memory_assigned_ -= container->require->MemoryNeed(); + assert(memory_assigned_ >= 0); + } else { + cpu_deep_assigned_ -= container->require->CpuNeed(); + memory_deep_assigned_ -= container->require->MemoryNeed(); + if (container->require->TmpfsNeed()) { + memory_assigned_ -= container->require->TmpfsNeed(); + } + } int64_t size_ramdisk = 0; std::vector volums_no_ramdisk; BOOST_FOREACH(const proto::VolumRequired& v, container->require->volums) { @@ -515,9 +578,15 @@ void Scheduler::SetRequirement(Requirement::Ptr require, void Scheduler::AddAgent(Agent::Ptr agent, const proto::AgentInfo& agent_info) { MutexLock locker(&mu_); - + int64_t cpu_assigned = 0; + int64_t cpu_reserved = 0; + int64_t cpu_deep_assigned = 0; + int64_t cpu_deep_reserved = 0; int64_t memory_assigned = 0; + int64_t memory_reserved = 0; + int64_t memory_deep_assigned = 0; + int64_t memory_deep_reserved = 0; std::map volum_assigned; std::set port_assigned; std::map containers; @@ -549,13 +618,13 @@ void Scheduler::AddAgent(Agent::Ptr agent, const proto::AgentInfo& agent_info) { continue; } } - + container->allocated_ports.clear(); container->allocated_volums.clear(); container->allocated_volum_containers.clear(); - Requirement::Ptr require(new Requirement()); - const proto::ContainerDescription& container_desc = container_info.container_desc(); + Requirement::Ptr require(new Requirement()); + const proto::ContainerDescription& container_desc = container_info.container_desc(); SetRequirement(require, container_desc); if (container_group->require->version == require->version) { require = container_group->require; @@ -565,8 +634,25 @@ void Scheduler::AddAgent(Agent::Ptr agent, const proto::AgentInfo& agent_info) { container->priority = container_desc.priority(); container->status = container_info.status(); container->require = require; - cpu_assigned += require->CpuNeed(); - memory_assigned += require->MemoryNeed(); + if (container->priority != proto::kJobBestEffort) { + cpu_assigned += require->CpuNeed(); + cpu_reserved += std::min( + static_cast(container_info.cpu_used() * FLAGS_reserved_percent), + require->CpuNeed()); + memory_assigned += require->MemoryNeed(); + memory_reserved += std::min( + static_cast(container_info.memory_used() * FLAGS_reserved_percent), + require->MemoryNeed()); + } else { + cpu_deep_assigned += require->CpuNeed(); + cpu_deep_reserved += std::min( + static_cast(container_info.cpu_used() * FLAGS_reserved_percent), + require->CpuNeed()); + memory_deep_assigned += require->MemoryNeed(); + memory_deep_reserved += std::min( + static_cast(container_info.memory_used() * FLAGS_reserved_percent), + require->MemoryNeed()); + } for (int j = 0; j < container_desc.cgroups_size(); j++) { const proto::Cgroup& cgroup = container_desc.cgroups(j); for (int k = 0; k < cgroup.ports_size(); k++) { @@ -598,6 +684,7 @@ void Scheduler::AddAgent(Agent::Ptr agent, const proto::AgentInfo& agent_info) { data_volum.size = container_desc.data_volums(j).size(); if (data_volum.medium == proto::kTmpfs) { memory_assigned += data_volum.size; + memory_reserved += data_volum.size; continue; } data_volum.exclusive = container_desc.data_volums(j).exclusive(); @@ -621,8 +708,14 @@ void Scheduler::AddAgent(Agent::Ptr agent, const proto::AgentInfo& agent_info) { container->allocated_agent = agent->endpoint_; ChangeStatus(container, container->status); } - agent->SetAssignment(cpu_assigned, memory_assigned, volum_assigned, - port_assigned, containers); + agent->SetAssignment( + cpu_assigned, cpu_deep_assigned, + memory_assigned, memory_deep_assigned, + volum_assigned, + port_assigned, + containers); + agent->SetReserved(cpu_reserved, cpu_deep_reserved, + memory_reserved, memory_deep_reserved); agents_[agent->endpoint_] = agent; } @@ -1072,10 +1165,10 @@ void Scheduler::ScheduleNextAgent(AgentEndpoint pre_endpoint) { if (agents_.empty()) { VLOG(16) << "no alive agents for scheduler."; } - sched_pool_.DelayTask(FLAGS_sched_interval, + sched_pool_.DelayTask(FLAGS_sched_interval, boost::bind(&Scheduler::ScheduleNextAgent, this, pre_endpoint)); return; - } + } std::map::iterator it; it = agents_.upper_bound(pre_endpoint); if (it != agents_.end()) { @@ -1099,7 +1192,7 @@ void Scheduler::ScheduleNextAgent(AgentEndpoint pre_endpoint) { continue; // no pending pods } ContainerId last_id = container_group->last_sched_container_id; - ContainerMap::iterator container_it = + ContainerMap::iterator container_it = container_group->states[kContainerPending].upper_bound(last_id); if (container_it == container_group->states[kContainerPending].end()) { container_it = container_group->states[kContainerPending].begin(); @@ -1114,16 +1207,16 @@ void Scheduler::ScheduleNextAgent(AgentEndpoint pre_endpoint) { || container->last_res_err == proto::kTooManyPods) { container->last_res_err = res_err; } - VLOG(10) << "try put fail: " << container->id + VLOG(10) << "try put fail: " << container->id << " agent:" << endpoint - << ", err:" << proto::ResourceError_Name(res_err); + << ", err:" << proto::ResourceError_Name(res_err); continue; //no feasiable } agent->Put(container); ChangeStatus(container, kContainerAllocating); } //scheduling round for the next agent - sched_pool_.DelayTask(FLAGS_sched_interval, + sched_pool_.DelayTask(FLAGS_sched_interval, boost::bind(&Scheduler::ScheduleNextAgent, this, endpoint)); } @@ -1228,7 +1321,7 @@ bool Scheduler::Update(const ContainerGroupId& container_group_id, } void Scheduler::MakeCommand(const std::string& agent_endpoint, - const proto::AgentInfo& agent_info, + const proto::AgentInfo& agent_info, std::vector& commands) { MutexLock locker(&mu_); if (stop_) { @@ -1250,6 +1343,11 @@ void Scheduler::MakeCommand(const std::string& agent_endpoint, return; } Agent::Ptr agent = it->second; + + int64_t cpu_reserved = 0; + int64_t cpu_deep_reserved = 0; + int64_t memory_reserved = 0; + int64_t memory_deep_reserved = 0; ContainerMap containers_local = agent->containers_; std::map remote_status; for (int i = 0; i < agent_info.container_info_size(); i++) { @@ -1264,6 +1362,26 @@ void Scheduler::MakeCommand(const std::string& agent_endpoint, commands.push_back(cmd); continue; } + + // get reserved + if (it_local->second->priority != proto::kJobBestEffort) { + cpu_reserved += std::min( + static_cast(container_remote.cpu_used() * FLAGS_reserved_percent), + it_local->second->require->CpuNeed()); + memory_reserved += it_local->second->require->TmpfsNeed(); + memory_reserved += std::min( + static_cast(container_remote.memory_used() * FLAGS_reserved_percent), + it_local->second->require->MemoryNeed()); + } else { + cpu_deep_reserved += std::min( + static_cast(container_remote.cpu_used() * FLAGS_reserved_percent), + it_local->second->require->CpuNeed()); + memory_reserved += it_local->second->require->TmpfsNeed(); + memory_deep_reserved += std::min( + static_cast(container_remote.memory_used() * FLAGS_reserved_percent), + it_local->second->require->MemoryNeed()); + } + const std::string& local_version = it_local->second->require->version; const std::string& remote_version = container_remote.container_desc().version(); if (local_version != remote_version) { @@ -1282,6 +1400,11 @@ void Scheduler::MakeCommand(const std::string& agent_endpoint, container_local->remote_info.mutable_volum_used()->CopyFrom(container_remote.volum_used()); container_local->remote_info.mutable_port_used()->CopyFrom(container_remote.port_used()); } + + // set resource reserved + agent->SetReserved(cpu_reserved, cpu_deep_reserved, + memory_reserved, memory_deep_reserved); + BOOST_FOREACH(ContainerMap::value_type& pair, containers_local) { Container::Ptr container_local = pair.second; AgentCommand cmd; @@ -1645,8 +1768,10 @@ void Scheduler::ShowUserAlloc(const std::string& user_name, proto::Quota& alloc) } int64_t replica = container_group->Replica(); replica_alloc += replica; - cpu_alloc += container_group->require->CpuNeed() * replica; - memory_alloc += container_group->require->MemoryNeed() * replica; + if (container_group->priority != proto::kJobBestEffort) { + cpu_alloc += container_group->require->CpuNeed() * replica; + memory_alloc += container_group->require->MemoryNeed() * replica; + } memory_alloc += container_group->require->TmpfsNeed() * replica; disk_alloc += container_group->require->DiskNeed() * replica; ssd_alloc += container_group->require->SsdNeed() * replica; @@ -1676,8 +1801,12 @@ void Scheduler::MetaToQuota(const proto::ContainerGroupMeta& meta, proto::Quota& SetRequirement(require, meta.desc()); int64_t replica = meta.replica(); quota.set_replica(replica); - quota.set_millicore(require->CpuNeed() * replica); - quota.set_memory( (require->MemoryNeed() + require->TmpfsNeed()) * replica); + if (meta.desc().priority() != proto::kJobBestEffort) { + quota.set_millicore(require->CpuNeed() * replica); + quota.set_memory((require->MemoryNeed() + require->TmpfsNeed()) * replica); + } else { + quota.set_memory(require->TmpfsNeed() * replica); + } quota.set_disk(require->DiskNeed() * replica); quota.set_ssd(require->SsdNeed() * replica); } diff --git a/src/resman/scheduler.h b/src/resman/scheduler.h index 463bfe78..df1296d3 100755 --- a/src/resman/scheduler.h +++ b/src/resman/scheduler.h @@ -98,7 +98,7 @@ struct Requirement { total += volums[i].size(); } } - return total; + return total; } typedef boost::shared_ptr Ptr; }; @@ -174,10 +174,16 @@ class Agent { const std::set& tags, const std::string& pool_name); void SetAssignment(int64_t cpu_assigned, + int64_t cpu_deep_assigned, int64_t memory_assigned, + int64_t memory_deep_assigned, const std::map& volum_assigned, const std::set port_assigned, const std::map& containers); + void SetReserved(int64_t cpu_reserved, + int64_t cpu_deep_reserved, + int64_t memory_reserved, + int64_t memory_deep_reserved); bool TryPut(const Container* container, ResourceError& err); void Put(Container::Ptr container); void Evict(Container::Ptr container); @@ -199,8 +205,14 @@ class Agent { std::string pool_name_; int64_t cpu_total_; int64_t cpu_assigned_; + int64_t cpu_reserved_; + int64_t cpu_deep_assigned_; + int64_t cpu_deep_reserved_; int64_t memory_total_; int64_t memory_assigned_; + int64_t memory_reserved_; + int64_t memory_deep_assigned_; + int64_t memory_deep_reserved_; std::map volum_total_; std::map volum_assigned_; std::set port_assigned_;