Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Begin working on native helm repository client #910

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions assets/src/generated/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,12 @@ export type ConsoleConfiguration = {
vpnEnabled?: Maybe<Scalars['Boolean']['output']>;
};

export enum ConstraintEnforcement {
Deny = 'DENY',
DryRun = 'DRY_RUN',
Warn = 'WARN'
}

export type ConstraintRef = {
__typename?: 'ConstraintRef';
kind: Scalars['String']['output'];
Expand Down Expand Up @@ -1832,6 +1838,7 @@ export type HelmConfigAttributes = {
/** pointer to a Plural GitRepository */
repositoryId?: InputMaybe<Scalars['ID']['input']>;
set?: InputMaybe<HelmValueAttributes>;
url?: InputMaybe<Scalars['String']['input']>;
values?: InputMaybe<Scalars['String']['input']>;
valuesFiles?: InputMaybe<Array<InputMaybe<Scalars['String']['input']>>>;
version?: InputMaybe<Scalars['String']['input']>;
Expand Down Expand Up @@ -1876,6 +1883,8 @@ export type HelmSpec = {
repositoryId?: Maybe<Scalars['ID']['output']>;
/** a list of helm name/value pairs to precisely set individual values */
set?: Maybe<Array<Maybe<HelmValue>>>;
/** the helm repository url to use */
url?: Maybe<Scalars['String']['output']>;
/** a helm values file to use with this service, requires auth and so is heavy to query */
values?: Maybe<Scalars['String']['output']>;
/** a list of relative paths to values files to use for helm applies */
Expand Down Expand Up @@ -3196,6 +3205,7 @@ export type PolicyConstraint = {
__typename?: 'PolicyConstraint';
cluster?: Maybe<Cluster>;
description?: Maybe<Scalars['String']['output']>;
enforcement?: Maybe<ConstraintEnforcement>;
id: Scalars['ID']['output'];
insertedAt?: Maybe<Scalars['DateTime']['output']>;
name: Scalars['String']['output'];
Expand All @@ -3212,6 +3222,7 @@ export type PolicyConstraint = {
/** inputs to add constraint data from an OPA gatekeeper constraint CRD */
export type PolicyConstraintAttributes = {
description?: InputMaybe<Scalars['String']['input']>;
enforcement?: InputMaybe<ConstraintEnforcement>;
name: Scalars['String']['input'];
recommendation?: InputMaybe<Scalars['String']['input']>;
/** pointer to the group/name for the CR */
Expand Down
2 changes: 2 additions & 0 deletions lib/console/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ defmodule Console.Application do
{Registry, [keys: :unique, name: Console.Deployments.Git.Agent.registry()]},
{Registry, [keys: :unique, name: Console.Deployments.Pipelines.Supervisor.registry()]},
{Registry, [keys: :unique, name: Console.Deployments.Stacks.Worker.registry()]},
{Registry, [keys: :unique, name: Console.Deployments.Helm.Agent.registry()]},
{Cluster.Supervisor, [topologies, [name: Console.ClusterSupervisor]]},
Console.Deployments.Git.Supervisor,
Console.Deployments.Stacks.Supervisor,
Console.Deployments.Helm.Server,
Console.Deployments.Pipelines.Supervisor,
Console.Deployments.Helm.Supervisor,
Console.Deployments.Git.Kick,
Console.Deployments.Deprecations.Table,
Console.Deployments.Compatibilities.Table,
Expand Down
2 changes: 1 addition & 1 deletion lib/console/deployments/compatibilities/table.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule Console.Deployments.Compatibilities.Table do
end

def handle_info(:poll, %State{table: table, static: true} = state) do
table = Enum.reduce(Static.compatibilities(), table, &KeyValueSet.put(&2, &1.name, &1))
table = Enum.reduce(Static.compatibilities(), table, &KeyValueSet.put!(&2, &1.name, &1))
{:noreply, %{state | table: table}}
end

Expand Down
16 changes: 15 additions & 1 deletion lib/console/deployments/git.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ defmodule Console.Deployments.Git do
ScmWebhook,
PrAutomation,
PullRequest,
DependencyManagementService
DependencyManagementService,
HelmRepository
}

@cache Console.conf(:cache_adapter)
@ttl :timer.minutes(30)

@type repository_resp :: {:ok, GitRepository.t} | Console.error
@type helm_resp :: {:ok, HelmRepository.t} | Console.error
@type connection_resp :: {:ok, ScmConnection.t} | Console.error
@type webhook_resp :: {:ok, ScmWebhook.t} | Console.error
@type automation_resp :: {:ok, PrAutomation.t} | Console.error
Expand All @@ -30,6 +32,8 @@ defmodule Console.Deployments.Git do

def get_repository!(id), do: Repo.get!(GitRepository, id)

def get_helm_repository(url), do: Repo.get_by(HelmRepository, url: url)

def get_by_url!(url), do: Repo.get_by!(GitRepository, url: url)

def get_by_url(url), do: Repo.get_by(GitRepository, url: url)
Expand Down Expand Up @@ -327,6 +331,16 @@ defmodule Console.Deployments.Git do
end
end

@spec upsert_helm_repository(binary) :: helm_resp
def upsert_helm_repository(url) do
case Console.Repo.get_by(HelmRepository, url: url) do
%HelmRepository{} = repo -> repo
nil -> %HelmRepository{url: url}
end
|> HelmRepository.changeset()
|> Console.Repo.insert_or_update()
end

@doc """
Sets up a service to run the renovate cron given an scm connection and target repositories
"""
Expand Down
86 changes: 86 additions & 0 deletions lib/console/deployments/helm/agent.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
defmodule Console.Deployments.Helm.Agent do
use GenServer, restart: :transient
alias Console.Deployments.Git
alias Console.Deployments.Helm.{AgentCache, Discovery}
alias Console.Schema.HelmRepository
require Logger

defmodule State, do: defstruct [:repo, :cache]

@poll :timer.minutes(2)
@jitter 15

def registry(), do: __MODULE__

def fetch(pid, chart, vsn), do: GenServer.call(pid, {:fetch, chart, vsn})

def start(url) do
GenServer.start(__MODULE__, url, name: via(url))
end

def start_link([url]), do: start_link(url)
def start_link(url) do
GenServer.start_link(__MODULE__, url, name: via(url))
end

defp via(url), do: {:via, Registry, {registry(), {:helm, url}}}

def init(url) do
{:ok, repo} = Git.upsert_helm_repository(url)
schedule_pull()
:timer.send_interval(@poll, :move)
send self(), :pull
{:ok, %State{repo: repo, cache: AgentCache.new(repo)}}
end

def handle_call({:fetch, c, v}, _, %State{cache: cache} = state) do
with {:ok, l, cache} <- AgentCache.fetch(cache, c, v),
{:ok, f} <- File.open(l.file) do
{:reply, {:ok, f, l.digest}, %{state | cache: cache}}
else
err -> {:reply, err, state}
end
end

def handle_info(:pull, %State{repo: repo, cache: cache} = state) do
with {:ok, repo} <- Git.upsert_helm_repository(repo.url),
{:ok, cache} <- AgentCache.refresh(cache),
{:ok, repo} <- refresh(repo) do
schedule_pull()
{:noreply, %{state | cache: cache, repo: repo}}
else
err ->
schedule_pull()
Logger.error "Failed to resync helm repo: #{inspect(err)}"
{:noreply, state}
end
end

def handle_info({:refresh, c, v}, %State{cache: cache} = state) do
case AgentCache.write(cache, c, v) do
{:ok, _, cache} -> {:noreply, %{state | cache: cache}}
_ -> {:noreply, state}
end
end

def handle_info(:move, %State{repo: repo} = state) do
case Discovery.local?(repo.url) do
true -> {:noreply, state}
false -> {:stop, {:shutdown, :moved}, state}
end
end

def handle_info(_, state), do: {:noreply, state}

defp refresh(%HelmRepository{} = repo) do
HelmRepository.changeset(repo, %{pulled_at: Timex.now(), health: :pullable})
|> Console.Repo.update()
end

defp schedule_pull(), do: Process.send_after(self(), :pull, @poll + jitter())

defp jitter() do
:rand.uniform(@jitter)
|> :timer.seconds()
end
end
67 changes: 67 additions & 0 deletions lib/console/deployments/helm/agent_cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
defmodule Console.Deployments.Helm.AgentCache do
alias Console.Helm.Client
alias Console.Deployments.Helm.Utils
require Logger

defstruct [:repo, :index, :dir, cache: %{}]

defmodule Line do
@expiry [minutes: -10]
defstruct [:file, :chart, :vsn, :digest, :touched]

def new(file, chart, vsn, digest) do
%__MODULE__{file: file, chart: chart, vsn: vsn, digest: digest, touched: Timex.now()}
end

def touch(%__MODULE__{} = mod), do: %{mod | touched: Timex.now()}

def expire(%__MODULE__{file: f}), do: File.rm(f)

def expired?(%__MODULE__{touched: touched}) do
Timex.now()
|> Timex.shift(@expiry)
|> Timex.after?(touched)
end
end

def new(repo) do
{:ok, dir} = Briefly.create(directory: true)
%__MODULE__{repo: repo, dir: dir, cache: %{}}
end

def refresh(%__MODULE__{} = cache) do
case Client.index(cache.repo.url) do
{:ok, indx} -> {:ok, sweep(%{cache | index: indx})}
_ -> {:error, "could not fetch index"}
end
end

def fetch(%__MODULE__{index: nil} = cache, chart, vsn) do
with {:ok, cache} <- refresh(cache),
do: fetch(cache, chart, vsn)
end

def fetch(%__MODULE__{cache: lines} = cache, chart, vsn) do
case lines[{chart, vsn}] do
%Line{} = l -> {:ok, l, put_in(cache.cache[{chart, vsn}], Line.touch(l))}
nil -> write(cache, chart, vsn)
end
end

def write(%__MODULE__{} = cache, chart, vsn) do
path = Path.join(cache.dir, "#{chart}.#{vsn}.tgz")
with {:ok, url, digest} <- Client.chart(cache.index, chart, vsn),
{:ok, tmp} <- Briefly.create(),
{:ok, _} <- Client.download(url, File.stream!(tmp)),
:ok <- Utils.clean_chart(tmp, path, chart),
line <- Line.new(path, chart, vsn, digest),
do: {:ok, line, put_in(cache.cache[{chart, vsn}], line)}
end

defp sweep(%__MODULE__{cache: lines} = cache) do
{keep, expire} = Enum.split_with(lines, fn {_, l} -> !Line.expired?(l) end)
Enum.each(expire, &Line.expire/1)
Enum.each(keep, fn l -> send(self(), {:refresh, l.chart, l.vsn}) end)
%{cache | cache: Map.new(keep)}
end
end
17 changes: 9 additions & 8 deletions lib/console/deployments/helm/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Console.Deployments.Helm.Cache do
require Logger
alias Kube.HelmChart
alias Kube.HelmChart.Status
alias Console.Deployments.Tar
alias Console.Deployments.{Helm.Utils, Tar}

defstruct [:dir, :touched]

Expand Down Expand Up @@ -48,12 +48,17 @@ defmodule Console.Deployments.Helm.Cache do
end

defp build_tarball(url, cache, path, chart) do
with {:ok, f} <- Tar.from_url(url),
{:ok, contents} <- Tar.tar_stream(f),
:ok <- Tar.tarball(path, remove_prefix(contents, chart)),
with {:ok, path} <- download_to(url, path, chart),
do: open(cache, path)
end

def download_to(url, path, chart) do
with {:ok, tmp} <- Tar.from_url(url),
:ok <- File.open!(tmp) |> Utils.clean_chart(path, chart),
:ok <- File.rm(tmp),
do: {:ok, path}
end

def refresh(%__MODULE__{touched: touched} = cache) do
Logger.info "expiring helm chart cache..."
expires = Timex.now() |> Timex.shift(minutes: -30)
Expand Down Expand Up @@ -81,8 +86,4 @@ defmodule Console.Deployments.Helm.Cache do
end
end
defp reason(_), do: "downloading"

defp remove_prefix(contents, chart) do
Enum.map(contents, fn {path, content} -> {String.trim_leading(path, "#{chart}/"), content} end)
end
end
33 changes: 33 additions & 0 deletions lib/console/deployments/helm/discovery.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Console.Deployments.Helm.Discovery do
alias Console.Deployments.Helm.{Supervisor, Agent}

def agent(url) do
case maybe_rpc(url, Supervisor, :start_child, [url]) do
{:ok, pid} -> {:ok, pid}
{:error, {:already_started, pid}} -> {:ok, pid}
err -> err
end
end

def fetch(url, chart, vsn) do
with {:ok, pid} <- agent(url),
do: Agent.fetch(pid, chart, vsn)
end

defp maybe_rpc(id, module, func, args) do
me = node()
case worker_node(id) do
^me -> apply(module, func, args)
node -> :rpc.call(node, module, func, args)
end
end

def worker_node(url), do: HashRing.key_to_node(ring(), url)

def local?(url), do: worker_node(url) == node()

defp ring() do
HashRing.new()
|> HashRing.add_nodes([node() | Node.list()])
end
end
17 changes: 17 additions & 0 deletions lib/console/deployments/helm/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Console.Deployments.Helm.Supervisor do
use DynamicSupervisor
alias Console.Deployments.Helm.Agent

def start_link(init_arg \\ :ok) do
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def start_child(url) do
DynamicSupervisor.start_child(__MODULE__, {Agent, url})
end

@impl true
def init(_init_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end
end
23 changes: 23 additions & 0 deletions lib/console/deployments/helm/utils.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Console.Deployments.Helm.Utils do
alias Console.Deployments.Tar

def clean_chart(path, to, chart) when is_binary(path) do
file = File.open!(path)
try do
clean_chart(file, to, chart)
after
File.close(file)
end
end

def clean_chart(f, to, chart) do
with {:ok, contents} <- Tar.tar_stream(f),
do: Tar.tarball(to, remove_prefix(contents, chart))
end

defp remove_prefix(contents, chart) do
Enum.map(contents, fn {path, content} ->
{String.trim_leading(path, "#{chart}/"), content}
end)
end
end
11 changes: 10 additions & 1 deletion lib/console/deployments/services.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,16 @@ defmodule Console.Deployments.Services do
Git.get_repository!(id)
|> Git.Discovery.fetch(git)
end
defp tarfile(%Service{helm: %Service.Helm{chart: c, version: v}} = svc) when is_binary(c) and is_binary(v) do

defp tarfile(%Service{helm: %Service.Helm{chart: c, version: v, url: url}} = svc)
when is_binary(c) and is_binary(v) and is_binary(url) do
with {:ok, f, sha} <- Helm.Discovery.fetch(url, c, v),
{:ok, _} <- update_sha_without_revision(svc, sha),
do: {:ok, f}
end

defp tarfile(%Service{helm: %Service.Helm{chart: c, version: v}} = svc)
when is_binary(c) and is_binary(v) do
with {:ok, f, sha} <- Helm.Charts.artifact(svc),
{:ok, _} <- update_sha_without_revision(svc, sha),
do: {:ok, f}
Expand Down
Loading
Loading