Skip to content
This repository has been archived by the owner on Aug 26, 2022. It is now read-only.

Commit

Permalink
update load, consul, checkeups and add logger
Browse files Browse the repository at this point in the history
  • Loading branch information
huangnauh committed May 9, 2017
1 parent 7342dad commit 9d8d9e4
Show file tree
Hide file tree
Showing 21 changed files with 1,128 additions and 281 deletions.
40 changes: 40 additions & 0 deletions nginx/app/etc/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,45 @@ _M.consul = {
},
}

_M.load_init = {
-- load_init module name for lua-resty-load
module_name = "resty.consul.load"
}


_M.logger = {

timeout = 2,

-- enable checkups heartbeat.
enable = true,

-- node info in the log message
node_type = "slardar_access",
node_host = "127.0.0.1",

config = {

-- config for lua-resty-logger-socket
flush_limit = 4096,
drop_limit = 1024 * 1024, -- 1MB
pool_size = 10,
retry_interval = 100,
max_retry_times = 3,

-- upstream name for lua-resty-checkups
ups_name = "logger",
},

cluster = {
{
servers = {
-- change these to your own log server addresses
{ host = "127.0.0.1", port = 3100 },
},
},
},
}


return _M
58 changes: 43 additions & 15 deletions nginx/app/lib/resty/checkups/api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

local cjson = require "cjson.safe"

local consistent_hash = require "resty.checkups.consistent_hash"
local round_robin = require "resty.checkups.round_robin"
local heartbeat = require "resty.checkups.heartbeat"
local dyconfig = require "resty.checkups.dyconfig"
local base = require "resty.checkups.base"
Expand All @@ -22,6 +20,10 @@ local WARN = ngx.WARN
local INFO = ngx.INFO
local worker_id = ngx.worker.id
local get_phase = ngx.get_phase
local type = type
local next = next
local pairs = pairs
local ipairs = ipairs


local _M = {
Expand Down Expand Up @@ -94,7 +96,7 @@ function _M.prepare_checker(config)
if phase == "init" or
phase == "init_worker" and worker_id() == 0 then
local key = dyconfig._gen_shd_key(skey)
shd_config:set(key, cjson.encode(base.upstream.checkups[skey].cluster))
shd_config:set(key, cjson.encode(base.upstream.checkups[skey]))
end
skeys[skey] = 1
else
Expand Down Expand Up @@ -209,39 +211,65 @@ function _M.select_peer(skey)
end


function _M.update_upstream(skey, upstream)
if not upstream or not next(upstream) then
return false, "can not set empty upstream"
local function gen_upstream(skey, upstream)
local ups = upstream
if upstream.cluster then
-- all upstream
if type(upstream.cluster) ~= "table" then
return nil, "cluster invalid"
end
else
-- only servers
local dyupstream, err = dyconfig.do_get_upstream(skey)
if err then
return nil, err
end

dyupstream = dyupstream or {}
dyupstream.cluster = upstream
ups = dyupstream
end

local ok, err
for level, cls in pairs(upstream) do
-- check servers
local ok
for level, cls in pairs(ups.cluster) do
if not cls or not next(cls) then
return false, "can not update empty level"
return nil, "can not update empty level"
end

local servers = cls.servers
if not servers or not next(servers) then
return false, "can not update empty servers"
return nil, "can not update empty servers"
end

for _, srv in ipairs(servers) do
ok, err = dyconfig.check_update_server_args(skey, level, srv)
local ok, err = dyconfig.check_update_server_args(skey, level, srv)
if not ok then
return false, err
return nil, err
end
end
end

local lock
return ups
end


lock, err = base.get_lock(base.SKEYS_KEY)
function _M.update_upstream(skey, upstream)
if not upstream or not next(upstream) then
return false, "can not set empty upstream"
end

local lock, err = base.get_lock(base.SKEYS_KEY)
if not lock then
log(WARN, "failed to acquire the lock: ", err)
return false, err
end

ok, err = dyconfig.do_update_upstream(skey, upstream)
local ups, err = gen_upstream(skey, upstream)
local ok = false
if not err then
ok, err = dyconfig.do_update_upstream(skey, ups)
end

base.release_lock(lock)

Expand Down
6 changes: 5 additions & 1 deletion nginx/app/lib/resty/checkups/base.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ local tab_sort = table.sort
local tab_concat = table.concat
local tab_insert = table.insert
local unpack = unpack
local tostring = tostring
local ipairs = ipairs
local pairs = pairs
local type = type

local log = ngx.log
local ERR = ngx.ERR
Expand Down Expand Up @@ -159,7 +163,7 @@ function _M.check_res(res, check_opts)

if typ == "http" and type(res) == "table"
and res.status then
local status = tonumber(res.status)
local status = tostring(res.status)
local http_opts = check_opts.http_opts
if http_opts and http_opts.statuses and
http_opts.statuses[status] == false then
Expand Down
34 changes: 28 additions & 6 deletions nginx/app/lib/resty/checkups/dyconfig.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ local WARN = ngx.WARN
local INFO = ngx.INFO

local str_format = string.format
local type = type

local _M = {
_VERSION = "0.11",
Expand Down Expand Up @@ -81,12 +82,7 @@ local function shd_config_syncer(premature)
log(INFO, "get ", skey, " from shm: ", shd_servers)
if shd_servers then
shd_servers = cjson.decode(shd_servers)
-- add new skey
if not base.upstream.checkups[skey] then
base.upstream.checkups[skey] = {}
end

base.upstream.checkups[skey].cluster = base.table_dup(shd_servers)
base.upstream.checkups[skey] = base.table_dup(shd_servers)
elseif err then
success = false
log(WARN, "failed to get from shm: ", err)
Expand Down Expand Up @@ -141,6 +137,32 @@ function _M.check_update_server_args(skey, level, server)
end


function _M.do_get_upstream(skey)
local skeys = shd_config:get(base.SKEYS_KEY)
if not skeys then
return nil, "no skeys found from shm"
end

local key = _gen_shd_key(skey)
local shd_servers, err = shd_config:get(key)
if shd_servers then
shd_servers = cjson.decode(shd_servers)
if type(shd_servers) ~= "table" then
return nil
end

return shd_servers
elseif err then
log(WARN, "failed to get from shm: ", err)
return nil, err
else
log(WARN, "upstream " .. skey .. " not found")
return nil
end
end



function _M.do_update_upstream(skey, upstream)
local skeys = shd_config:get(base.SKEYS_KEY)
if not skeys then
Expand Down
4 changes: 2 additions & 2 deletions nginx/app/lib/resty/checkups/heartbeat.lua
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ local heartbeat = {
return _M.STATUS_ERR, err
end

local status = tonumber(str_sub(status_line, from, to))
local status = str_sub(status_line, from, to)
if statuses[status] == false then
return _M.STATUS_ERR, "bad status code"
end
Expand Down Expand Up @@ -338,7 +338,7 @@ local function cluster_heartbeat(skey)
ups_protected = false
end

ups.timeout = ups.timeout or 60
ups.timeout = ups.timeout or 5

local server_count = 0
for level, cls in pairs(ups.cluster) do
Expand Down
3 changes: 2 additions & 1 deletion nginx/app/lib/resty/checkups/try.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ local max = math.max
local sqrt = math.sqrt
local floor = math.floor
local tab_insert = table.insert
local tostring = tostring

local update_time = ngx.update_time
local now = ngx.now
Expand Down Expand Up @@ -104,7 +105,7 @@ local function prepare_callbacks(skey, opts)
local try_limit = opts.try or ups.try or srvs_cnt
local retry_cb = function(res)
if is_tab(res) and res.status and is_tab(statuses) then
if statuses[res.status] ~= false then
if statuses[tostring(res.status)] ~= false then
return REQUEST_SUCCESS
end
elseif res then
Expand Down
83 changes: 83 additions & 0 deletions nginx/app/lib/resty/consul/api.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
-- Copyright (C) 2016 Libo Huang (huangnauh), UPYUN Inc.
local http = require "socket.http"
local httpipe = require "resty.httpipe"
local checkups = require "resty.checkups"

local pairs = pairs
local type = type
local str_format = string.format

local _M = {}


function _M.get_kv_blocking(cluster, key, opts)
local opts = opts or {}
-- try all the consul servers
for _, cls in pairs(cluster) do
for _, srv in pairs(cls.servers) do
local url = str_format("http://%s:%s/v1/kv/%s", srv.host, srv.port, key)
local body, code = http.request(url)
if code == 404 then
return opts.default
elseif code == 200 and body then
local decode = opts.decode
if type(decode) == "function" then
return decode(body)
else
return body
end
else
ngx.log(ngx.ERR, str_format("get config from %s failed", url))
end
end
end

return nil, "get config failed"
end


function _M.get_kv(key, opts)
local opts = opts or {}
local hp, err = httpipe:new()
if not hp then
ngx.log(ngx.ERR, "failed to new httpipe: ", err)
return
end

hp:set_timeout(5 * 1000)
local req = {
method = "GET",
path = "/v1/kv/" .. key,
}

local callback = function(host, port)
return hp:request(host, port, req)
end

local res, err = checkups.ready_ok("consul", callback)
if not res then
ngx.log(ngx.ERR, "failed to get config from consul, err:", err)
hp:close()
return
end

if res.status == 404 then
return opts.default
elseif res.status ~= 200 then
ngx.log(ngx.ERR, "failed to get config from consul: ", res.status)
hp:close()
return
end

hp:set_keepalive()
local body = res.body

if type(opts.decode) == "function" then
return opts.decode(body)
else
return body
end
end


return _M
Loading

0 comments on commit 9d8d9e4

Please sign in to comment.