Skip to content

Commit

Permalink
add information_schema.backend_kerberos_ticket_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Jan 31, 2025
1 parent fbecf5f commit 678ba24
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 37 deletions.
15 changes: 10 additions & 5 deletions be/src/common/kerberos/kerberos_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@ namespace doris::kerberos {
// Default constructor: initializes the refresh interval to 3600 and the
// minimum time before refresh to 600 (both members are named in seconds).
KerberosConfig::KerberosConfig()
: _refresh_interval_second(3600), _min_time_before_refresh_second(600) {}

std::string KerberosConfig::get_hash_code() {
// use md5(principal + keytab_path) as hash code
// so that same (principal + keytab_path) will have same name.
std::string combined = _principal + _keytab_path;
// Static entry point: returns the hash code for an explicit (principal, keytab)
// pair by forwarding to the private _get_hash_code() helper, so callers that do
// not hold a KerberosConfig instance get the same value as the instance method.
std::string KerberosConfig::get_hash_code(const std::string& principal, const std::string& keytab) {
return _get_hash_code(principal, keytab);
}

// Computes the hash code that identifies a (principal, keytab) pair.
//
// @param principal Kerberos principal name.
// @param keytab    Path of the keytab file.
// @return the hexadecimal MD5 digest of `principal + keytab`.
std::string KerberosConfig::_get_hash_code(const std::string& principal,
                                           const std::string& keytab) {
    // use md5(principal + keytab) as hash code
    // so that same (principal + keytab) will have same name.
    // NOTE(review): plain concatenation means ("ab", "c") and ("a", "bc") feed the
    // same bytes to MD5. Left as is, because changing the input would change every
    // existing hash code.
    std::string combined = principal + keytab;
    Md5Digest digest;
    digest.update(combined.c_str(), combined.length());
    digest.digest();
    // Only the correctly spelled accessor is kept; the stale `hext()` return that
    // preceded it made this line unreachable.
    return digest.hex();
}

} // namespace doris::kerberos
7 changes: 6 additions & 1 deletion be/src/common/kerberos/kerberos_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ class KerberosConfig {
// Get the minimum time before refresh in seconds
int32_t get_min_time_before_refresh_second() const { return _min_time_before_refresh_second; }

std::string get_hash_code() const { return _get_hash_code(_principal, _keytab_path); }

// Use principal and keytab to generate a hash code.
std::string get_hash_code() const;
static std::string get_hash_code(const std::string& principal, const std::string& keytab);

private:
static std::string _get_hash_code(const std::string& principal, const std::string& keytab);

private:
// Kerberos principal name (e.g., "[email protected]")
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/kerberos/kerberos_ticket_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ std::vector<KerberosTicketInfo> KerberosTicketCache::get_ticket_info() {
if (_krb5_interface->unparse_name(_context, creds.server, &service_name) == Status::OK()) {
info.service_principal = service_name;
_krb5_interface->free_unparsed_name(_context, service_name);
} else {
info.service_principal = "Unparse Error";
}
info.cache_path = _ticket_cache_path;
info.refresh_interval_second = _config.get_refresh_interval_second();
Expand Down
1 change: 1 addition & 0 deletions be/src/common/kerberos/kerberos_ticket_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct KerberosTicketInfo {
std::string keytab_path; // Path to keytab file
std::string service_principal; // Service principal this credential is for
std::string cache_path; // Path of ticket cache file
std::string hash_code; // the hash code from config
int64_t start_time; // Unix timestamp in seconds
int64_t expiry_time; // Unix timestamp in seconds
int64_t auth_time; // Unix timestamp in seconds
Expand Down
43 changes: 26 additions & 17 deletions be/src/common/kerberos/kerberos_ticket_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <sstream>

#include "common/logging.h"
#include "exec/schema_scanner/schema_scanner_helper.h"
#include "service/backend_options.h"
#include "vec/core/block.h"

namespace doris::kerberos {

Expand Down Expand Up @@ -125,7 +128,7 @@ Status KerberosTicketMgr::get_or_set_ticket_cache(

std::shared_ptr<KerberosTicketCache> KerberosTicketMgr::get_ticket_cache(
const std::string& principal, const std::string& keytab_path) {
std::string key = _generate_key(principal, keytab_path);
std::string key = KerberosConfig::get_hash_code(principal, keytab_path);

std::lock_guard<std::mutex> lock(_mutex);
auto it = _ticket_caches.find(key);
Expand All @@ -140,22 +143,6 @@ std::shared_ptr<KerberosTicketCache> KerberosTicketMgr::_make_new_ticket_cache(
return std::make_shared<KerberosTicketCache>(config, _root_path);
}

// Looks up the ticket cache registered for (principal, keytab_path) and writes
// the path of its cache file to *cache_path.
//
// Returns Status::NotFound when no entry exists for the generated key, otherwise
// Status::OK(). The map lookup is done while holding _mutex. The entry is only
// read here; nothing in this function updates it.
Status KerberosTicketMgr::get_cache_file_path(const std::string& principal,
const std::string& keytab_path,
std::string* cache_path) {
// The key is derived from both the principal and the keytab path.
std::string key = _generate_key(principal, keytab_path);

std::lock_guard<std::mutex> lock(_mutex);

auto it = _ticket_caches.find(key);
if (it == _ticket_caches.end()) {
return Status::NotFound("Kerberos ticket cache not found for principal: " + principal);
}

*cache_path = it->second.cache->get_ticket_cache_path();
return Status::OK();
}

std::vector<KerberosTicketInfo> KerberosTicketMgr::get_krb_ticket_cache_info() {
std::vector<KerberosTicketInfo> result;
std::lock_guard<std::mutex> lock(_mutex);
Expand All @@ -168,4 +155,26 @@ std::vector<KerberosTicketInfo> KerberosTicketMgr::get_krb_ticket_cache_info() {
return result;
}

// Appends one row per entry returned by get_krb_ticket_cache_info() to `block`.
//
// Each row starts with the local backend's id and host, followed by the ticket
// fields. Columns are written by position (0..11), so `block` must already
// contain the matching columns in that order.
//
// @param block Destination block; receives one value per column for every ticket.
// @param ctz   Time zone used to convert the ticket timestamps to datetime values.
void KerberosTicketMgr::get_ticket_cache_info_block(vectorized::Block* block,
                                                    const cctz::time_zone& ctz) {
    const TBackend local_backend = BackendOptions::get_local_backend();
    const int64_t backend_id = local_backend.id;
    const std::string backend_host = local_backend.host;

    const std::vector<KerberosTicketInfo> tickets = get_krb_ticket_cache_info();
    for (const KerberosTicketInfo& ticket : tickets) {
        // Running column position; restarts at 0 for every row.
        int col = 0;
        SchemaScannerHelper::insert_int64_value(col++, backend_id, block);
        SchemaScannerHelper::insert_string_value(col++, backend_host, block);
        SchemaScannerHelper::insert_string_value(col++, ticket.principal, block);
        SchemaScannerHelper::insert_string_value(col++, ticket.keytab_path, block);
        SchemaScannerHelper::insert_string_value(col++, ticket.service_principal, block);
        SchemaScannerHelper::insert_string_value(col++, ticket.cache_path, block);
        SchemaScannerHelper::insert_string_value(col++, ticket.hash_code, block);
        SchemaScannerHelper::insert_datetime_value(col++, ticket.start_time, ctz, block);
        SchemaScannerHelper::insert_datetime_value(col++, ticket.expiry_time, ctz, block);
        SchemaScannerHelper::insert_datetime_value(col++, ticket.auth_time, ctz, block);
        SchemaScannerHelper::insert_int64_value(col++, ticket.use_count, block);
        SchemaScannerHelper::insert_int64_value(col++, ticket.refresh_interval_second, block);
    }
}

} // namespace doris::kerberos
20 changes: 12 additions & 8 deletions be/src/common/kerberos/kerberos_ticket_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@
#include <unordered_map>
#include <vector>

#include "cctz/time_zone.h"
#include "common/kerberos/kerberos_config.h"
#include "common/kerberos/kerberos_ticket_cache.h"
#include "common/status.h"

namespace doris::kerberos {
namespace doris {

namespace vectorized {
class Block;
}

namespace kerberos {

// Structure to hold a ticket cache instance and its last access time
struct KerberosTicketEntry {
Expand All @@ -51,12 +58,6 @@ class KerberosTicketMgr {
Status get_or_set_ticket_cache(const KerberosConfig& config,
std::shared_ptr<KerberosTicketCache>* ticket_cache);

// Retrieve the cache file path for a given principal and keytab combination
// Logic: Generates key from principal and keytab, looks up in cache map,
// returns NotFound if there is no entry for that key
Status get_cache_file_path(const std::string& principal, const std::string& keytab_path,
std::string* cache_path);

Status remove_ticket_cache(const std::string& principal, const std::string& keytab_path);

// Get the ticket cache object. This is used by HdfsHandler to hold a reference
Expand All @@ -69,6 +70,8 @@ class KerberosTicketMgr {
// Set the cleanup interval for testing purpose
void set_cleanup_interval(std::chrono::seconds interval) { _cleanup_interval = interval; }

void get_ticket_cache_info_block(vectorized::Block* block, const cctz::time_zone& ctz);

virtual ~KerberosTicketMgr();

protected:
Expand Down Expand Up @@ -102,4 +105,5 @@ class KerberosTicketMgr {
std::chrono::seconds _cleanup_interval {3600}; // Default to 1 hour
};

} // namespace doris::kerberos
} // namespace kerberos
} // namespace doris
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "exec/schema_scanner/schema_active_queries_scanner.h"
#include "exec/schema_scanner/schema_backend_active_tasks.h"
#include "exec/schema_scanner/schema_backend_kerberos_ticket_cache.h"
#include "exec/schema_scanner/schema_catalog_meta_cache_stats_scanner.h"
#include "exec/schema_scanner/schema_charsets_scanner.h"
#include "exec/schema_scanner/schema_collations_scanner.h"
Expand Down Expand Up @@ -225,6 +226,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return SchemaFileCacheStatisticsScanner::create_unique();
case TSchemaTableType::SCH_CATALOG_META_CACHE_STATISTICS:
return SchemaCatalogMetaCacheStatsScanner::create_unique();
case TSchemaTableType::SCH_BACKEND_KERBEROS_TICKET_CACHE:
return SchemaBackendKerberosTicketCacheScanner::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/schema_scanner/schema_backend_kerberos_ticket_cache.h"

#include "common/kerberos/kerberos_ticket_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
#include "common/compile_check_begin.h"

// Column layout of the backend_kerberos_ticket_cache schema table.
// The order matters: get_next_block_internal() builds its block from this list
// in order, and the rows are filled by column position.
std::vector<SchemaScanner::ColumnDesc> SchemaBackendKerberosTicketCacheScanner::_s_tbls_columns = {
// name, type, size, 4th field (presumably the nullable flag; confirm against
// SchemaScanner::ColumnDesc)
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), true},
{"BE_IP", TYPE_STRING, sizeof(StringRef), true},
{"PRINCIPAL", TYPE_STRING, sizeof(StringRef), true},
{"KEYTAB", TYPE_STRING, sizeof(StringRef), true},
{"SERVICE_PRINCIPAL", TYPE_STRING, sizeof(StringRef), true},
{"TICKET_CACHE_PATH", TYPE_STRING, sizeof(StringRef), true},
{"HASH_CODE", TYPE_STRING, sizeof(StringRef), true},
{"START_TIME", TYPE_DATETIME, sizeof(int128_t), true},
{"EXPIRE_TIME", TYPE_DATETIME, sizeof(int128_t), true},
{"AUTH_TIME", TYPE_DATETIME, sizeof(int128_t), true},
{"REF_COUNT", TYPE_BIGINT, sizeof(int64_t), true},
{"REFRESH_INTERVAL_SECOND", TYPE_BIGINT, sizeof(int64_t), true}};

// Registers this scanner with the base SchemaScanner using the static column
// list and the SCH_BACKEND_KERBEROS_TICKET_CACHE table type.
SchemaBackendKerberosTicketCacheScanner::SchemaBackendKerberosTicketCacheScanner()
: SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_BACKEND_KERBEROS_TICKET_CACHE) {}

// Nothing to release by hand: members clean up through their own destructors.
SchemaBackendKerberosTicketCacheScanner::~SchemaBackendKerberosTicketCacheScanner() = default;

// Captures the per-query settings this scanner needs from the runtime state.
//
// @param state Runtime state of the query; supplies the time zone and batch size.
// @return always Status::OK().
Status SchemaBackendKerberosTicketCacheScanner::start(RuntimeState* state) {
    // Time zone passed on when the ticket info block is filled.
    _timezone_obj = state->timezone_obj();
    // Upper bound on the rows handed out per get_next_block_internal() call.
    _block_rows_limit = state->batch_size();
    return Status::OK();
}

// Returns the next batch of rows of the Kerberos ticket cache table.
//
// On the first call the full result is materialized into _info_block: one
// nullable column per entry of _s_tbls_columns, filled by
// KerberosTicketMgr::get_ticket_cache_info_block(). Every call then copies at
// most _block_rows_limit rows, starting at _row_idx, into `block`.
//
// @param block Output block that receives the rows of this batch.
// @param eos   Set to true once all rows have been handed out.
// @return InternalError if the scanner is not initialized or a pointer is null,
//         the error from MutableBlock::add_rows if copying fails, otherwise OK.
Status SchemaBackendKerberosTicketCacheScanner::get_next_block_internal(vectorized::Block* block,
                                                                        bool* eos) {
    if (!_is_init) {
        return Status::InternalError("Used before initialized.");
    }

    if (block == nullptr || eos == nullptr) {
        return Status::InternalError("input pointer is nullptr.");
    }

    if (_info_block == nullptr) {
        _info_block = vectorized::Block::create_unique();

        // Range-for avoids comparing a signed index against the unsigned size().
        for (const auto& column_desc : _s_tbls_columns) {
            TypeDescriptor descriptor(column_desc.type);
            // `true`: every column is created as nullable.
            auto data_type =
                    vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
            _info_block->insert(vectorized::ColumnWithTypeAndName(
                    data_type->create_column(), data_type, column_desc.name));
        }

        _info_block->reserve(_block_rows_limit);

        ExecEnv::GetInstance()->kerberos_ticket_mgr()->get_ticket_cache_info_block(
                _info_block.get(), _timezone_obj);
        _total_rows = static_cast<int>(_info_block->rows());
    }

    // Everything has already been returned by earlier calls.
    if (_row_idx == _total_rows) {
        *eos = true;
        return Status::OK();
    }

    const int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
    vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
    RETURN_IF_ERROR(mblock.add_rows(_info_block.get(), _row_idx, current_batch_rows));
    _row_idx += current_batch_rows;

    *eos = _row_idx == _total_rows;
    return Status::OK();
}

} // namespace doris
51 changes: 51 additions & 0 deletions be/src/exec/schema_scanner/schema_backend_kerberos_ticket_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <vector>

#include "cctz/time_zone.h"
#include "common/status.h"
#include "exec/schema_scanner.h"

namespace doris {
class RuntimeState;
namespace vectorized {
class Block;
} // namespace vectorized

// Schema scanner behind the SCH_BACKEND_KERBEROS_TICKET_CACHE table type.
// It materializes the ticket cache info once and hands it out in batches.
class SchemaBackendKerberosTicketCacheScanner : public SchemaScanner {
ENABLE_FACTORY_CREATOR(SchemaBackendKerberosTicketCacheScanner);

public:
SchemaBackendKerberosTicketCacheScanner();
~SchemaBackendKerberosTicketCacheScanner() override;

// Reads the batch size and time zone from the runtime state.
Status start(RuntimeState* state) override;
// Copies the next batch of rows into `block`; sets *eos when no rows remain.
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

// Column descriptors of the table, in output order.
static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;

private:
// Maximum rows returned per call; replaced by state->batch_size() in start().
int _block_rows_limit = 4096;
// Index of the next row of _info_block to return.
int _row_idx = 0;
// Number of rows in _info_block once it has been filled.
int _total_rows = 0;
// Full result set; created on the first get_next_block_internal() call.
std::unique_ptr<vectorized::Block> _info_block = nullptr;
// Time zone taken from the runtime state in start().
cctz::time_zone _timezone_obj;
};
}; // namespace doris
19 changes: 19 additions & 0 deletions be/src/exec/schema_scanner/schema_scanner_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "exec/schema_scanner/schema_scanner_helper.h"

#include "cctz/time_zone.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -49,6 +50,24 @@ void SchemaScannerHelper::insert_datetime_value(int col_index, const std::vector
nullable_column->get_null_map_data().emplace_back(0);
}

// Appends one non-null datetime cell, built from a unix timestamp, to the
// nullable column at `col_index` of `block`.
//
// @param col_index Position of the target column in `block`; the column must be
//                  a ColumnNullable wrapping a ColumnVector<Int64>.
// @param timestamp Unix timestamp to convert.
// @param ctz       Time zone used for the conversion.
// @param block     Block whose column receives the value.
void SchemaScannerHelper::insert_datetime_value(int col_index, int64_t timestamp,
                                                const cctz::time_zone& ctz,
                                                vectorized::Block* block) {
    vectorized::MutableColumnPtr mutable_col_ptr =
            std::move(*block->get_by_position(col_index).column).assume_mutable();
    auto* nullable_column = assert_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
    vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();

    // A single local value is enough; no std::vector<void*> or array is needed
    // just to pass its address.
    // NOTE(review): the outcome of from_unixtime() is not checked here; confirm
    // whether it can fail for out-of-range timestamps.
    VecDateTimeValue datetime_value;
    datetime_value.from_unixtime(timestamp, ctz);
    // The raw bytes of the value are copied into the Int64 column; this presumably
    // relies on VecDateTimeValue being 8 bytes wide (TODO confirm).
    assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
            reinterpret_cast<char*>(&datetime_value), 0);
    // 0 in the null map marks the new cell as not null.
    nullable_column->get_null_map_data().emplace_back(0);
}

void SchemaScannerHelper::insert_int64_value(int col_index, int64_t int_val,
vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/schema_scanner/schema_scanner_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <string>
#include <vector>

#include "cctz/time_zone.h"

// This is a utility class that can be used by all schema scanners.
// All common helper functions are added to this class.
namespace doris {
Expand All @@ -34,6 +36,8 @@ class SchemaScannerHelper {
static void insert_string_value(int col_index, std::string str_val, vectorized::Block* block);
static void insert_datetime_value(int col_index, const std::vector<void*>& datas,
vectorized::Block* block);
static void insert_datetime_value(int col_index, int64_t timestamp, const cctz::time_zone& ctz,
vectorized::Block* block);

static void insert_int64_value(int col_index, int64_t int_val, vectorized::Block* block);
static void insert_double_value(int col_index, double double_val, vectorized::Block* block);
Expand Down
Loading

0 comments on commit 678ba24

Please sign in to comment.