diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go
index d9e47ad..dd7b960 100644
--- a/internal/controller/etcdcluster_controller.go
+++ b/internal/controller/etcdcluster_controller.go
@@ -113,11 +113,40 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	}
 	state.endpointsFound = clusterClient != nil && singleClients != nil
 
+	if clusterClient != nil {
+		state.endpoints = clusterClient.Endpoints()
+	}
+
+	// fetch PVCs
+	state.pvcs, err = factory.PVCs(ctx, instance, r.Client)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
 	if !state.endpointsFound {
 		if !state.stsExists {
-			// TODO: happy path for new cluster creation
-			log.Debug(ctx, "happy path for new cluster creation (not yet implemented)")
+			return r.createClusterFromScratch(ctx, &state) // TODO: needs implementing
+		}
+		// else try reconciling the sts
+		existingSts := state.statefulSet.DeepCopy()
+		desiredSts := factory.TemplateStatefulSet() // TODO: needs implementing
+		existingSts.Spec.Template.Spec = desiredSts.Spec.Template.Spec
+		err := r.patchOrCreateObject(ctx, existingSts)
+		if err != nil {
+			return ctrl.Result{}, err
+		}
+		state.statefulSet = *existingSts
+		if existingSts.Status.ReadyReplicas != *existingSts.Spec.Replicas { // TODO: this check might not be the best to check for a ready sts
+			return ctrl.Result{}, fmt.Errorf("waiting for statefulset to become ready")
+		}
+		if *existingSts.Spec.Replicas > 0 {
+			return ctrl.Result{}, fmt.Errorf("reached an impossible state (no endpoints, but active pods)")
+		}
+		if *instance.Spec.Replicas == 0 {
+			// cluster successfully scaled down to zero
+			return ctrl.Result{}, nil
 		}
+		return r.scaleUpFromZero(ctx, &state) // TODO: needs implementing
 	}
 
 	// get status of every endpoint and member list from every endpoint
diff --git a/internal/controller/observables.go b/internal/controller/observables.go
index 4d080f4..66e5cc5 100644
--- a/internal/controller/observables.go
+++ b/internal/controller/observables.go
@@ -2,12 +2,12 @@ package controller
 
 import (
 	"context"
-	// "strconv"
-	// "strings"
+	"strconv"
+	"strings"
 	"sync"
 
 	"github.com/aenix-io/etcd-operator/api/v1alpha1"
-	// "github.com/aenix-io/etcd-operator/pkg/set"
+	"github.com/aenix-io/etcd-operator/pkg/set"
 	clientv3 "go.etcd.io/etcd/client/v3"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -49,15 +49,43 @@ func (o *observables) setClusterID() {
 
 // inSplitbrain compares clusterID field with clusterIDs in etcdStatuses.
 // If more than one unique ID is reported, cluster is in splitbrain.
+// Also if members have different opinions on the list of members, this is
+// also a splitbrain.
 func (o *observables) inSplitbrain() bool {
+	return !o.clusterIDsAllEqual() || !o.memberListsAllEqual()
+}
+
+func (o *observables) clusterIDsAllEqual() bool {
+	ids := set.New[uint64]()
 	for i := range o.etcdStatuses {
 		if o.etcdStatuses[i].endpointStatus != nil {
-			if o.clusterID != o.etcdStatuses[i].endpointStatus.Header.ClusterId {
-				return true
+			ids.Add(o.etcdStatuses[i].endpointStatus.Header.ClusterId)
+		}
+	}
+	return len(ids) <= 1
+}
+
+func (o *observables) memberListsAllEqual() bool {
+	type m struct {
+		Name string
+		ID   uint64
+	}
+	memberLists := make([]set.Set[m], 0, len(o.etcdStatuses))
+	for i := range o.etcdStatuses {
+		if o.etcdStatuses[i].memberList != nil {
+			memberSet := set.New[m]()
+			for _, member := range o.etcdStatuses[i].memberList.Members {
+				memberSet.Add(m{member.Name, member.ID})
 			}
+			memberLists = append(memberLists, memberSet)
+		}
+	}
+	for i := range memberLists {
+		if !memberLists[0].Equals(memberLists[i]) {
+			return false
 		}
 	}
-	return false
+	return true
 }
 
 // fill takes a single-endpoint client and populates the fields of etcdStatus
@@ -73,15 +101,76 @@ func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) {
 	wg.Wait()
 }
 
-// TODO: make a real function
+func (o *observables) pvcMaxIndex() (max int) {
+	max = -1
+	for i := range o.pvcs {
+		tokens := strings.Split(o.pvcs[i].Name, "-")
+		index, err := strconv.Atoi(tokens[len(tokens)-1])
+		if err != nil {
+			continue
+		}
+		if index > max {
+			max = index
+		}
+	}
+	return max
+}
+
+func (o *observables) endpointMaxIndex() (max int) {
+	max = -1
+	for i := range o.endpoints {
+		tokens := strings.Split(o.endpoints[i], ":")
+		if len(tokens) < 2 {
+			continue
+		}
+		tokens = strings.Split(tokens[len(tokens)-2], "-")
+		index, err := strconv.Atoi(tokens[len(tokens)-1])
+		if err != nil {
+			continue
+		}
+		if index > max {
+			max = index
+		}
+	}
+	return max
+}
+
+// TODO: make a real function to determine the right number of replicas.
+// Hint: if ClientURL in the member list is absent, the member has not yet
+// started, but if the name field is populated, this is a member of the
+// initial cluster. If the name field is empty, this member has just been
+// added with etcdctl member add (or equivalent API call).
 // nolint:unused
-func (o *observables) desiredReplicas() int {
+func (o *observables) desiredReplicas() (max int) {
+	max = -1
 	if o.etcdStatuses != nil {
 		for i := range o.etcdStatuses {
 			if o.etcdStatuses[i].memberList != nil {
-				return len(o.etcdStatuses[i].memberList.Members)
+				for j := range o.etcdStatuses[i].memberList.Members {
+					tokens := strings.Split(o.etcdStatuses[i].memberList.Members[j].Name, "-")
+					index, err := strconv.Atoi(tokens[len(tokens)-1])
+					if err != nil {
+						continue
+					}
+					if index > max {
+						max = index
+					}
+				}
 			}
 		}
 	}
-	return 0
+	if max > -1 {
+		return max + 1
+	}
+
+	if epMax := o.endpointMaxIndex(); epMax > max {
+		max = epMax
+	}
+	if pvcMax := o.pvcMaxIndex(); pvcMax > max {
+		max = pvcMax
+	}
+	if max == -1 {
+		return int(*o.instance.Spec.Replicas)
+	}
+	return max + 1
 }
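The TODO above desiredReplicas hints at how a future implementation could classify members instead of parsing index suffixes out of names. The sketch below is not part of the diff; it only illustrates that hint using the etcd v3 API types (etcdserverpb.Member, as returned by MemberList). The classifyMember helper and the example member values are hypothetical, introduced purely for illustration.

package main

import (
	"fmt"

	"go.etcd.io/etcd/api/v3/etcdserverpb"
)

// classifyMember applies the heuristic from the TODO hint: a member without
// ClientURLs has not started yet; if its Name is set it was declared in the
// initial cluster, otherwise it was just added via "member add".
func classifyMember(m *etcdserverpb.Member) string {
	switch {
	case len(m.ClientURLs) > 0:
		return "started"
	case m.Name != "":
		return "initial-cluster member, not yet started"
	default:
		return "added via member add, not yet started"
	}
}

func main() {
	// Hypothetical member list as MemberList might return it.
	members := []*etcdserverpb.Member{
		{ID: 1, Name: "etcd-0", ClientURLs: []string{"http://etcd-0:2379"}},
		{ID: 2, Name: "etcd-1"}, // declared but not yet started
		{ID: 3},                 // just added, name not yet known
	}
	for _, m := range members {
		fmt.Printf("member %x: %s\n", m.ID, classifyMember(m))
	}
}

The sketch only mirrors the wording of the hint; whether the operator should also account for learners or unreachable members is left open by the TODO.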