Skip to content

Commit

Permalink
yarn-operator: change to client short connection (#17)
Browse files Browse the repository at this point in the history
Signed-off-by: 佑祎 <[email protected]>
  • Loading branch information
zwzhang0107 authored Jul 19, 2023
1 parent db349ce commit 3d48afc
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 47 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- uses: golangci/golangci-lint-action@v3
with:
version: v1.47.3
args: --timeout 3m0s
args: --timeout 10m0s

unit-tests:
strategy:
Expand Down
2 changes: 1 addition & 1 deletion config/manager/yarn-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ spec:
fieldPath: metadata.namespace
- name: HADOOP_CONF_DIR
value: /etc/hadoop-conf
image: registry.cn-beijing.aliyuncs.com/koordinator-sh/yarn-operator:add-ha-client-43e76a8
image: registry.cn-beijing.aliyuncs.com/koordinator-sh/yarn-operator:fix-rpc-8843dea
imagePullPolicy: Always
name: yarn-operator
ports:
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/noderesource/resource_sync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (r *YARNResourceSyncReconciler) getYARNNodeIDWithPodAnno(node *corev1.Node)
}
return tokens[0], int32(port), nil
}
return "", 0, fmt.Errorf("node %s doesn't have %s pods, just ignored", node.Name, YarnNamespace)
return "", 0, nil
}

func (r *YARNResourceSyncReconciler) getYARNNodeID(node *corev1.Node) (string, int32, error) {
Expand Down
89 changes: 45 additions & 44 deletions pkg/yarn/client/ipc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@ import (
"bytes"
"errors"
"fmt"
"log"
"net"
"strings"
"sync"

gouuid "github.com/nu7hatch/gouuid"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"k8s.io/klog/v2"
"log"
"net"
"strings"

yarnauth "github.com/koordinator-sh/goyarn/pkg/yarn/apis/auth"
hadoop_common "github.com/koordinator-sh/goyarn/pkg/yarn/apis/proto/hadoopcommon"
Expand Down Expand Up @@ -104,13 +102,16 @@ func (c *Client) Call(rpc *hadoop_common.RequestHeaderProto, rpcRequest proto.Me
// Read & return response
err = c.readResponse(conn, &rpcCall)

// TODO keep connection alive for reuse
conn.con.Close()

return err
}

var connectionPool = struct {
sync.RWMutex
connections map[connection_id]*connection
}{connections: make(map[connection_id]*connection)}
//var connectionPool = struct {
// sync.RWMutex
// connections map[connection_id]*connection
//}{connections: make(map[connection_id]*connection)}

func findUsableTokenForService(service string) (*hadoop_common.TokenProto, bool) {
userTokens := security.GetCurrentUser().GetUserTokens()
Expand All @@ -131,52 +132,52 @@ func findUsableTokenForService(service string) (*hadoop_common.TokenProto, bool)

func getConnection(c *Client, connectionId *connection_id) (*connection, error) {
// Try to re-use an existing connection
connectionPool.RLock()
con := connectionPool.connections[*connectionId]
connectionPool.RUnlock()
//connectionPool.RLock()
//con := connectionPool.connections[*connectionId]
//connectionPool.RUnlock()

// If necessary, create a new connection and save it in the connection-pool
var err error
if con == nil {
con, err = setupConnection(c)
if err != nil {
klog.Warningf("Couldn't setup connection: ", err)
return nil, err
}

connectionPool.Lock()
connectionPool.connections[*connectionId] = con
connectionPool.Unlock()
//var err error
//if con == nil {
con, err := setupConnection(c)
if err != nil {
klog.Warningf("Couldn't setup connection: ", err)
return nil, err
}

var authProtocol yarnauth.AuthProtocol = yarnauth.AUTH_PROTOCOL_NONE
//connectionPool.Lock()
//connectionPool.connections[*connectionId] = con
//connectionPool.Unlock()

if _, found := findUsableTokenForService(c.ServerAddress); found {
log.Printf("found token for service: %s", c.ServerAddress)
authProtocol = yarnauth.AUTH_PROTOCOL_SASL
}
var authProtocol yarnauth.AuthProtocol = yarnauth.AUTH_PROTOCOL_NONE

err := writeConnectionHeader(con, authProtocol)
if err != nil {
return nil, err
}
if _, found := findUsableTokenForService(c.ServerAddress); found {
log.Printf("found token for service: %s", c.ServerAddress)
authProtocol = yarnauth.AUTH_PROTOCOL_SASL
}

if authProtocol == yarnauth.AUTH_PROTOCOL_SASL {
log.Println("attempting SASL negotiation.")
err = writeConnectionHeader(con, authProtocol)
if err != nil {
return nil, err
}

if err = negotiateSimpleTokenAuth(c, con); err != nil {
klog.Warningf("failed to complete SASL negotiation!")
return nil, err
}
if authProtocol == yarnauth.AUTH_PROTOCOL_SASL {
log.Println("attempting SASL negotiation.")

} else {
log.Println("no usable tokens. proceeding without auth.")
}

err = writeConnectionContext(c, con, connectionId, authProtocol)
if err != nil {
if err = negotiateSimpleTokenAuth(c, con); err != nil {
klog.Warningf("failed to complete SASL negotiation!")
return nil, err
}

} else {
log.Println("no usable tokens. proceeding without auth.")
}

err = writeConnectionContext(c, con, connectionId, authProtocol)
if err != nil {
return nil, err
}
//}

return con, nil
}
Expand Down

0 comments on commit 3d48afc

Please sign in to comment.