Skip to content

Commit

Permalink
feat: Adapt config.load (apache#2531)
Browse files Browse the repository at this point in the history
* Adapt config.load

* resolve circular dependencies

* add license

* add comment

* modify comment

* modify comment

* fix key

* modify comment
  • Loading branch information
FinalT authored Dec 5, 2023
1 parent 2794682 commit ffb8b76
Show file tree
Hide file tree
Showing 22 changed files with 168 additions and 2,724 deletions.
4 changes: 2 additions & 2 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,13 +389,13 @@ const (

// reflection service
const (
ReflectionServiceTypeName = "DubbogoServerReflectionServer"
ReflectionServiceTypeName = "ReflectionServer"
ReflectionServiceInterface = "grpc.reflection.v1alpha.ServerReflection"
)

// healthcheck service
const (
HealthCheckServiceTypeName = "DubbogoHealthServer"
HealthCheckServiceTypeName = "HealthCheckServer"
HealthCheckServiceInterface = "grpc.health.v1.Health"
)

Expand Down
27 changes: 27 additions & 0 deletions config/invoker_compat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package config

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

// NewInfoInvoker is used to resolve circular dependencies temporarily.
// source: server/compat.go func compatNewInfoInvoker(url *common.URL, info interface{}, svc common.RPCService) protocol.Invoker
var NewInfoInvoker func(url *common.URL, info interface{}, svc common.RPCService) protocol.Invoker
56 changes: 29 additions & 27 deletions config/provider_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/dubbogo/gost/log/logger"

perrors "github.com/pkg/errors"

tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
)

import (
Expand Down Expand Up @@ -121,33 +123,33 @@ func (c *ProviderConfig) Init(rc *RootConfig) error {
serviceConfig.adaptiveService = c.AdaptiveService
}

//for k, v := range rc.Protocols {
// if v.Name == tripleConstant.TRIPLE {
// // Auto create grpc based health check service.
// healthService := NewServiceConfigBuilder().
// SetProtocolIDs(k).
// SetNotRegister(true).
// SetInterface(constant.HealthCheckServiceInterface).
// Build()
// if err := healthService.Init(rc); err != nil {
// return err
// }
// c.Services[constant.HealthCheckServiceTypeName] = healthService
//
// // Auto create reflection service configure only when provider with triple service is configured.
// tripleReflectionService := NewServiceConfigBuilder().
// SetProtocolIDs(k).
// SetNotRegister(true).
// SetInterface(constant.ReflectionServiceInterface).
// Build()
// if err := tripleReflectionService.Init(rc); err != nil {
// return err
// }
// // Maybe only register once, If setting this service, break from traversing Protocols.
// c.Services[constant.ReflectionServiceTypeName] = tripleReflectionService
// break
// }
//}
for k, v := range rc.Protocols {
if v.Name == tripleConstant.TRIPLE {
// Auto create grpc based health check service.
healthService := NewServiceConfigBuilder().
SetProtocolIDs(k).
SetNotRegister(true).
SetInterface(constant.HealthCheckServiceInterface).
Build()
if err := healthService.Init(rc); err != nil {
return err
}
c.Services[constant.HealthCheckServiceTypeName] = healthService

// Auto create reflection service configure only when provider with triple service is configured.
tripleReflectionService := NewServiceConfigBuilder().
SetProtocolIDs(k).
SetNotRegister(true).
SetInterface(constant.ReflectionServiceInterface).
Build()
if err := tripleReflectionService.Init(rc); err != nil {
return err
}
// Maybe only register once, If setting this service, break from traversing Protocols.
c.Services[constant.ReflectionServiceTypeName] = tripleReflectionService
break
}
}

if err := c.check(); err != nil {
return err
Expand Down
23 changes: 21 additions & 2 deletions config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import (
var (
conServicesLock = sync.Mutex{} // used to guard conServices map.
conServices = map[string]common.RPCService{} // service name -> service
proServicesLock = sync.Mutex{} // used to guard proServices map
proServicesLock = sync.Mutex{} // used to guard proServices map and proServicesInfo map
proServices = map[string]common.RPCService{} // service name -> service
proServicesInfo = map[string]interface{}{} // service name -> service info
interfaceNameConServicesLock = sync.Mutex{} // used to guard interfaceNameConServices map
interfaceNameConServices = map[string]common.RPCService{} // interfaceName -> service
)
Expand All @@ -49,7 +50,7 @@ func SetConsumerService(service common.RPCService) {
conServices[ref] = service
}

// SetProviderService is called by init() of implement of RPCService
// SetProviderService is called by init() of implementation of RPCService
func SetProviderService(service common.RPCService) {
ref := common.GetReference(service)
proServicesLock.Lock()
Expand All @@ -60,6 +61,18 @@ func SetProviderService(service common.RPCService) {
proServices[ref] = service
}

// SetProviderServiceWithInfo is called by init() of implementation of RPCService
func SetProviderServiceWithInfo(service common.RPCService, info interface{}) {
ref := common.GetReference(service)
proServicesLock.Lock()
defer func() {
proServicesLock.Unlock()
logger.Debugf("A provider service %s was registered successfully.", ref)
}()
proServices[ref] = service
proServicesInfo[ref] = info
}

// GetConsumerService gets ConsumerService by @name
func GetConsumerService(name string) common.RPCService {
conServicesLock.Lock()
Expand All @@ -79,6 +92,12 @@ func GetProviderServiceMap() map[string]common.RPCService {
return proServices
}

func GetProviderServiceInfo(name string) interface{} {
proServicesLock.Lock()
defer proServicesLock.Unlock()
return proServicesInfo[name]
}

// GetConsumerServiceMap gets ProviderServiceMap
func GetConsumerServiceMap() map[string]common.RPCService {
return conServices
Expand Down
22 changes: 19 additions & 3 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,10 @@ func (s *ServiceConfig) Export() error {
return nil
}

var invoker protocol.Invoker
ports := getRandomPort(protocolConfigs)
nextPort := ports.Front()
proxyFactory := extension.GetProxyFactory(s.ProxyFactoryKey)

for _, proto := range protocolConfigs {
// registry the service reflect
methods, err := common.ServiceMap.Register(s.Interface, proto.Name, s.Group, s.Version, s.rpcService)
Expand Down Expand Up @@ -290,6 +291,11 @@ func (s *ServiceConfig) Export() error {
common.WithParamsValue(constant.MaxServerSendMsgSize, proto.MaxServerSendMsgSize),
common.WithParamsValue(constant.MaxServerRecvMsgSize, proto.MaxServerRecvMsgSize),
)
info := GetProviderServiceInfo(s.id)
if info != nil {
ivkURL.SetAttribute(constant.ServiceInfoKey, info)
}

if len(s.Tag) > 0 {
ivkURL.AddParam(constant.Tagkey, s.Tag)
}
Expand All @@ -311,7 +317,9 @@ func (s *ServiceConfig) Export() error {

for _, regUrl := range regUrls {
setRegistrySubURL(ivkURL, regUrl)
invoker := proxyFactory.GetInvoker(regUrl)

invoker = s.generatorInvoker(regUrl, info)

exporter := s.cacheProtocol.Export(invoker)
if exporter == nil {
return perrors.New(fmt.Sprintf("Registry protocol new exporter error, registry is {%v}, url is {%v}", regUrl, ivkURL))
Expand All @@ -329,7 +337,7 @@ func (s *ServiceConfig) Export() error {
logger.Warnf("SetMetadataServiceURL error = %s", err)
}
}
invoker := proxyFactory.GetInvoker(ivkURL)
s.generatorInvoker(ivkURL, info)
exporter := extension.GetProtocol(protocolwrapper.FILTER).Export(invoker)
if exporter == nil {
return perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error, url is {%v}", ivkURL))
Expand All @@ -342,6 +350,14 @@ func (s *ServiceConfig) Export() error {
return nil
}

func (s *ServiceConfig) generatorInvoker(regUrl *common.URL, info interface{}) protocol.Invoker {
proxyFactory := extension.GetProxyFactory(s.ProxyFactoryKey)
if info == nil {
return proxyFactory.GetInvoker(regUrl)
}
return NewInfoInvoker(regUrl, info, s.rpcService)
}

// setRegistrySubURL set registry sub url is ivkURl
func setRegistrySubURL(ivkURL *common.URL, regUrl *common.URL) {
ivkURL.AddParam(constant.RegistryKey, regUrl.GetParam(constant.RegistryKey, ""))
Expand Down
Loading

0 comments on commit ffb8b76

Please sign in to comment.