源码解读

本文基于dubbo2.7.8。

Dubbo的启动

dubbo中的服务,之所以可以暴露/引入,都是基于xml文件的配置。

以下是一段配置信息

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

<dubbo:application name="dubbo-server" />
<dubbo:registry address="N/A" timeout="10000" check="false"/>
<dubbo:protocol name="dubbo" port="20880" />
<dubbo:service interface="com.example.dubbo.server.loginService" ref="loginService" />
<bean id="oginService" class="com.example.dubbo.server.loginServiceImpl"/>

</beans>

在Spring2.5之后,Spring支持自定义的Schema扩展xml配置。

接下来将展示dubbo在获取到以上xml文件后做了哪些处理。

image-20230305224606479

http\://dubbo.apache.org/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/compat/dubbo.xsd

由此可见,dubbo的约束都存放在/compat/dubbo.xsd中。

同目录下的spring.handlers中存放着真是的配置文件

image-20230305225537991

http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler

打开DubboNamespaceHandler方法

image-20230305225824841

可以看到该类中定义了初始化方法,并在其中将dubbo的各个层级都封装到对应的BeanDefinition对象(bean定义信息)中。

上述的xml文件经过解析之后,就会得到以下代码:

<bean id="dubbo-server" class="com.alibaba.dubbo.config.ApplicationConfig"/>  
<bean id="registryConfig" class="com.alibaba.dubbo.config.RegistryConfig">
<property name="address" value="192.168.88.88:2181"/>
<property name="protocol" value="zookeeper"/>
</bean>
<bean id="dubbo" class="com.alibaba.dubbo.config.ProtocolConfig">
<property name="port" value="20880"/>
</bean>
<bean id="com.example.dubbo.server.loginService" class="com.alibaba.dubbo.config.spring.ServiceBean">
<property name="interface" value="com.example.dubbo.server.loginService"/>
<property name="ref" ref="loginService"/>
</bean>
<bean id="loginService" class="com.example.dubbo.server.loginServiceImpl" />

由此可见,服务的发送和接收都是由com.alibaba.dubbo.config.spring.ServiceBean方法完成的。

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware
  • InitializingBean:初始化方法
  • DisposableBean:销毁方法
  • ApplicationContextAware:设置上下文对象属性
  • BeanNameAware:设置BeanName属性
  • ApplicationEventPublisherAware:事件发布的相关属性(事件的发布对应着事件的监听——观察者模式)

打开DubboBootstrap方法

public class DubboBootstrap extends GenericEventListener {

发现其继承了监听器

并且有一个start方法,其中有一个exportServices方法

private void exportServices() {
//创建的每一个serviceConfig都会添加到configManager(一个map)中
//我们可以在此获取到所有的serviceConfig对象,并进行遍历处理
configManager.getServices().forEach(sc -> {
// TODO, compatible with ServiceConfig.export()
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
//使用线程池进行服务的暴露,异步暴露
if (exportAsync) {
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
sc.export();
exportedServices.add(sc);
});
asyncExportingFutures.add(future);
} else {
sc.export();
//记录所有暴露的服务
exportedServices.add(sc);
}
});
}

但是存在一个问题,在哪里调用start方法?

随意打开一个Dubbo工程,找到SimpleApplicationEventMulticaster类的doInvokeListener方法

private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event);
} catch (ClassCastException var6) {
String msg = var6.getMessage();
if (msg != null && !this.matchesClassCastMessage(msg, event.getClass().getName())) {
throw var6;
}

Log logger = LogFactory.getLog(this.getClass());
if (logger.isDebugEnabled()) {
logger.debug("Non-matching event type for listener: " + listener, var6);
}
}

}

其中调用了listener.onApplicationEvent(event)接口

public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E var1);
}

image-20230306001940715

他在Dubbo中有一个叫做OneTimeExecutionApplicationContextEventListener的实现类,其中有一个onApplicationContextEvent抽象方法。

abstract class OneTimeExecutionApplicationContextEventListener implements ApplicationListener, ApplicationContextAware {
private ApplicationContext applicationContext;

OneTimeExecutionApplicationContextEventListener() {
}
...
protected abstract void onApplicationContextEvent(ApplicationContextEvent event);
...
}

该抽象方法在DubboBootstrapApplicationListener实现类中的onContextRefreshedEvent就是调用start方法的位置。

public void onApplicationContextEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent) {
this.onContextRefreshedEvent((ContextRefreshedEvent)event);
} else if (event instanceof ContextClosedEvent) {
this.onContextClosedEvent((ContextClosedEvent)event);
}
}

private void onContextRefreshedEvent(ContextRefreshedEvent event) {
this.dubboBootstrap.start();
}

服务发布的流程

调用start方法时,我们注意到有一个exportServices()方法,该方法里面都是通过ServiceConfig来调用export方法。

进入export方法,发现主要做了类加载和属性初始化的处理。

public synchronized void export() {
if (!shouldExport()) {
return;
}

//类加载
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.initialize();
}

checkAndUpdateSubConfigs();

//初始化属性
serviceMetadata.setVersion(getVersion());
serviceMetadata.setGroup(getGroup());
serviceMetadata.setDefaultGroup(getGroup());
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());

if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
doExport();
}

exported();
}

并且有一个doExport方法。

protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
exported = true;

if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
doExportUrls();
}

其中有一个doExportUrls方法

private void doExportUrls() {
//1.获取到服务仓库,用来做类似于缓存的操作
ServiceRepository repository = ApplicationModel.getServiceRepository();
//2.注册当前的所发布的服务到服务仓库
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
//3.注册当前提供者到服务仓库中去
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);

//4.获取服务的注册与发现,因为返回的是List,所以体现出了Dubbo支持多注册的特点
//这里会将我们的实例转换成registry协议(便于管理)
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
//循环进行协议的发布,体现了dubbo支持多协议的特点
for (ProtocolConfig protocolConfig : protocols) {
//构建一个pathKey,服务名称/group/version
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
//将获得到path注册到注册仓库中
repository.registerService(pathKey, interfaceClass);
// TODO, uncomment this line once service key is unified
//设置当前服务元数据的key
serviceMetadata.setServiceKey(pathKey);
//发布当前的协议服务,到注册中心
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

其中的核心方法doExportUrlsFor1Protocol,负责将当前得到协议服务发布到注册中心中。

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//解析配置参数,获取协议名称
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}

//用map存放服务的配置信息
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);

//添加运行时参数
ServiceConfig.appendRuntimeParameters(map);
//添加应用信息、module信息、provider信息...
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ProviderConfig
// appendParameters(map, provider, Constants.DEFAULT_KEY);
AbstractConfig.appendParameters(map, provider);
AbstractConfig.appendParameters(map, protocolConfig);
AbstractConfig.appendParameters(map, this);
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();

//REMOTE表示配置信息的远程
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}

//如果有单独的配置,则对单独的配置信息做处理
if (CollectionUtils.isNotEmpty(getMethods())) {
for (MethodConfig method : getMethods()) {
AbstractConfig.appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}

}
}
} // end of methods for
}

//泛化方式
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}

//获取包装类,调用的还是接口对象的原有方法,为了可以用统一的方式获取到不同用户的不同的Dubbo接口
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}

/**
* Here the token value configured by the provider is used to assign the value to ServiceConfig#token
*/
if(ConfigUtils.isEmpty(token) && provider != null) {
token = provider.getToken();
}

if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
//init serviceMetadata attachments
serviceMetadata.getAttachments().putAll(map);

// export service
//将name, host, port等信息进行拼接成一个完整的url,通过该url就可以获取到对应的参数、协议等信息
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

// You can customize Configurator to append extra parameters
//判断
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}

//以上都是为了加下来操作的前置操作,开始服务暴露
//本地暴露、远程暴露
//判断你的配置信息,在远程还是在本地,如果没有配置默认在本地
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
//1
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

// export to local if the config is not remote (export to remote only when config is remote)
//本地暴露
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
//判断是否远程暴露
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
//注册中心不为空,循环遍历注册中心信息
//2
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
//如果在本地,就将协议改为injvm
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
//生产动态URL
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
//加入监控
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}

// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}

//创建invoker
//通过PROXY_FACTORY获取到invoker,并将需要导出的信息封装到invoker对象中
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//将invoker对象封装到Exporter中(用于对外暴露)
//3 4
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);

}
} else {
//如果没有配置注册中心地址的话
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
if (metadataService != null) {
metadataService.publishServiceDefinition(url);
}
}
}
this.urls.add(url);
}
  1. 判断是否要发布到远程服务,或者说是否需要发布服务,如果为none,那么我们就不需要发布

    1. 如果在本地,那么还需要将服务在本地暴露(暴露在JVM当中,不需要网络之间的通信)

      if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
      exportLocal(url);
      }
      ...
      //创建invoke
      private void exportLocal(URL url) {
      //创建一个新的URL,指向本地
      URL local = URLBuilder.from(url)
      .setProtocol(LOCAL_PROTOCOL)
      .setHost(LOCALHOST_VALUE)
      .setPort(0)
      .build();
      //导出方法
      Exporter<?> exporter = PROTOCOL.export(
      PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
      exporters.add(exporter);
      logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
      }
  2. 循环遍历注册中心的配置,实际上这里就是对多注册中心支持的最好体现

  3. Protocol是自适应扩展点,在该类中export方法被@Adaptive修饰着,会生成对应的动态代理类Protoco$Adaptive,这个实例就会调用export方法

    1. 通过getInvoker方法创建invoker,并进行初始化

      public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
      // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
      //为目标代理类生成Wrapper,并放到缓存;
      //后续会从缓存中进行获取,存在直接返回
      final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
      //不存在就新建一个返回
      return new AbstractProxyInvoker<T>(proxy, type, url) {
      @Override
      protected Object doInvoke(T proxy, String methodName,
      Class<?>[] parameterTypes,
      Object[] arguments) throws Throwable {
      return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
      }
      };
      }
  4. 包装类的作用

    1. Protocol中export方法的流程:三个比较核心的实现类

      1. ProtocolListenerWrapper

        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //是否注册?注册了就继续下一步
        if (UrlUtils.isRegistry(invoker.getUrl())) {
        return protocol.export(invoker);
        }
        //没有注册就开启监听
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
        Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
        .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
        }
      2. ProtocolFilterWrapper

        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //是否注册?注册了就继续下一步
        if (UrlUtils.isRegistry(invoker.getUrl())) {
        return protocol.export(invoker);
        }
        //
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
        }
      3. RegistryProtocol

        @Override
        public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //后续需要查看注册到哪个注册中心等信息,所以还是需要的
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        //消费者消费的URL
        URL providerUrl = getProviderUrl(originInvoker);


        ...
        //export invoker
        //启动netty服务,发布dubbo
        //originInvoker 是javasiste生成的代理类,核心实体域
        //providerUrl ip+地址+全限定名(配置文件中)
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        ...

        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
        //服务的注册
        //registeredProviderUrl 服务提供者的IP、端口号
        //可以获取到服务提供者的一些信息、从而能够调用到服务
        register(registryUrl, registeredProviderUrl);
        }

        ...

        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
        }

总结

在serviceConfig中,在进行本地/远程暴露的判断后,将url等信息封装到invoker对象中,再将invoker封装到Protocol中(为了能够调用自适应扩展点),并调用export方法。然后通过自适应扩展点的方法,经过Listener(监听)、Filter(拦截点)、Registry(netty的启动和服务的注册、发布)对数据进行处理。

doLocalExport—服务启动的流程

doLocalExport(originInvoker, providerUrl)方法

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);

return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();

// export service.
// dubbo会先对url进行封装
// 192.168.20.216:8080
String key = serviceKey(url);
//然后在进行二次封装
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);

//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}

}
}

//开启服务
openServer(url);
optimizeSerialization(url);

return exporter;
}

由serviceKey可以看出exporter的key就是端口号、ip和url。

protected static String serviceKey(URL url) {
int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
return serviceKey(port, url.getPath(), url.getParameter(VERSION_KEY), url.getParameter(GROUP_KEY));
}
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
//服务在缓存中存在,则取出,不存在则新建
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
//创建一个服务(也用到了url)
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}

ExchangeServer server;
try {
//service最终赋值的地方,底层用netty,管道传输...
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}

str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}

return new DubboProtocolServer(server);
}

invokerDelegate委派的类进行发布,底层使用的是netty

protocol.export自适应的扩展点

register—服务注册的流程

会将已经启动的服务发不到注册中心上去,根据不同的服务类型,发送到不同的服务(nocas、zookeeper、dubbo…)

image-20230310001919562

RegistryProtocol中的export方法

// url to registry
//注册方法
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
protected Registry getRegistry(final Invoker<?> originInvoker) {
//URL拼接
URL registryUrl = getRegistryUrl(originInvoker);
//主要方法
return registryFactory.getRegistry(registryUrl);
}
@SPI("dubbo")
public interface RegistryFactory {
//自适应拓展类,会自动生产RegistryAdaptive代理类
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}

当我们使用xml配置文件,绑定注册中心后,dubbo就会去读取注册中心的配置文件org.apache.dubbo.registry.RegistryFactory,其中有很多RegistryFactory的实现类。

最终得到的结果是一个包装类,外层是RegistryFactoryWrapper,内层是真正的ZookeeperRegistryFactory

RegistryFactoryWrapper(ZookeeperRegistryFactory)

但是我们发现,RegistryFactory并没有具体的ZookeeperRegistryFactory/NocasRegistryFactory,都是通过继承RegistryFactory的抽象实现类AbstractRegistryFactory来实现的。

public Registry getRegistry(URL url) {
//判断是否存在
if (destroyed.get()) {
LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
"Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
return DEFAULT_NOP_REGISTRY;
}

url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
//没有就创建
String key = createRegistryCacheKey(url);
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
//创建注册中心
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
protected String createRegistryCacheKey(URL url) {
return url.toServiceStringWithoutResolving();
}

image-20230310005738865

其中的createRegistry方法在各个注册中心中进行实现。

但是此处生成的register并没有被用到,而真正用到的方法是register(registryUrl, registeredProviderUrl);

private void register(URL registryUrl, URL registeredProviderUrl) {
//和上面一样,为了发布到注册中心
Registry registry = registryFactory.getRegistry(registryUrl);

registry.register(registeredProviderUrl);
}

image-20230310011226264

因为register的返回需要一个包装类,所以优先调用的是ListenerRegistryWrapper

但是其中并没有直接的注册中心相关的实现类,回过头来查看注册中心注册类的类关系:

image-20230310011805220

可见并没有直接的关系。

一般情况下,通用的处理逻辑都会放到AbstractRegistry中,当需要容错处理时,才会去FailbackRegistry中进行处理。在FailbackRegistry会执行doRegister方法,如果执行失败就行进行容错处理。

总结

  1. 生成invoke对象

  2. doLocalExport方法发布一个本地服务,实际上就是利用配置去进行一个监听

  3. 通过Registy去进行服务注册

整个服务注册的过程中,都是对URL的处理。SPI机制也贯穿了整个服务注册的流程

扩展点在源码中的应用

在ServiceConfig类中,有很多的参数都是通过扩展点的方式获取的,例如:

private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();


private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

getExtensionLoader方法

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null) {
throw new IllegalArgumentException("Extension type == null");
}
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type (" + type +
") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
}

ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}

getExtension方法

public T getExtension(String name) {
return getExtension(name, true);
}

public T getExtension(String name, boolean wrap) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
return getDefaultExtension();
}
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name, wrap);
holder.set(instance);
}
}
}
return (T) instance;
}

两个方法的实现逻辑基本一致,先去查找缓存,缓存中存在则直接使用,不存在就新建

Dubbo的扩展点

概述

扩展点分类

指定名称的扩展点

ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("name");

自适应的扩展点

ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

激活扩展点

ExtensionLoader.getExtensionLoader(Protocol.class).getExtension();

dubbo的扩展点

image-20230307222118748

org.apache.dubbo.rpc.Protocol为例:

filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
rmi=org.apache.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=org.apache.dubbo.rpc.protocol.hessian.HessianProtocol
org.apache.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=org.apache.dubbo.rpc.protocol.thrift.ThriftProtocol
native-thrift=org.apache.dubbo.rpc.protocol.nativethrift.ThriftProtocol
memcached=org.apache.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=org.apache.dubbo.rpc.protocol.redis.RedisProtocol
rest=org.apache.dubbo.rpc.protocol.rest.RestProtocol
xmlrpc=org.apache.dubbo.xml.rpc.protocol.xmlrpc.XmlRpcProtocol
grpc=org.apache.dubbo.rpc.protocol.grpc.GrpcProtocol
registry=org.apache.dubbo.registry.integration.RegistryProtocol
service-discovery-registry=org.apache.dubbo.registry.client.ServiceDiscoveryRegistryProtocol
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper

其中包含了很多协议的链接方式,当我们配置不同的协议时,dubbo就会根据协议对应的全限定的名称去查找他的处理类

自定义的扩展点

在resources目录下创建dubbo目录,创建一个org.apache.dubbo.rpc.cluster.LoadBalance文件

LoadBalance=com.example.dubbo.client.Demo

AbstractLoadBalance类的package

image-20230307221051995

dubbo默认的负载均衡机制是随机

@SPI("random")
public interface LoadBalance {
@Adaptive({"loadbalance"})
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}

我们可以将负载均衡的策略换成我们自己的。设计一个简单的扩展点

package com.example.dubbo.client;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance;
import org.springframework.cglib.core.internal.Function;

import java.util.List;

//继承负载均衡扩展点
public class Demo extends AbstractLoadBalance {
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
//...
return null;
}

interface a<T> {
}

public void SPI() {
//替换扩展点
String loadBalance = "random";
//URL loadBalance = 'LoadBalance'
//loadBalance = "LoadBalance"
LoadBalance msbLoadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(loadBalance);
}
}

指定名称的扩展点

从指定文件中根据指定的名称找到自定义的拓展点,直接调用

加载流程

  1. 找到路径:在我们的META-INF/dubbo/inertnal下面

  2. 找到对应的文件,查看全限定名

random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
shortestresponse=org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLoadBalance
  1. 通过全限定名查找到对应的类

  2. 解析文件(类似properties文件的解析 key:value)

  3. 内容放进内存key(name): value(org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance)

  4. 通过key:value中的全限定名,利用反射获取到对象

自适应拓展点

会根据上下文来判断当前你返回哪个拓展点,通过类似于代理的方法调用

@Adaptive

  • 声明在我们的方法级别
    • 会动态的创建一个代理类 (javassist创建的)
  • 声明在我们的类级别
    • 直接返回被修饰的这个类
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
//修饰的是类,直接返回
return cachedAdaptiveClass;
}
//动态的创建一个代理类
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
private Class<?> createAdaptiveExtensionClass() {
//生成字节码
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}

如果我们当加载的扩展点存在自适应的类,那么我们直接返回,否则,我们会动态的创建一个字节码,然后进行返回。

激活拓展点

Filter

image-20230307234101005

Filter的作用是过滤请求链路,同时也可以拿来做扩展。

计算激活的Filter

public void testSPI(){
ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(Filter.class);
URL url = new URL("","",0);//协议、端口、IP
List<Filter> cache = extensionLoader.getActivateExtension(url, "cache");
System.out.println(cache.size());
}

运行结果

...

10

Process finished with exit code 0

表示默认激活的Filter有10个。

对方法进行调整,加入自定义的url

public void testSPI() {
ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(Filter.class);
URL url = new URL("", "", 0);//协议、端口、IP
//自定义
url = url.addParameter("cache", "cache");
List<Filter> cache = extensionLoader.getActivateExtension(url, "cache");
System.out.println(cache.size());
}

运行结果

...

11

Process finished with exit code 0

由此可见,自定义的url被注册到了Filter中

打开Filter的实现类CacheFilter

 */
@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY)
public class CacheFilter implements Filter {
private CacheFactory cacheFactory;
...
}

@Activate

作用:当@Activate注解中有value时,此时当前的Filter就会被激活。

@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY)

相当于Spring当中的@ConditionalOnBean(XXX.class)

@Activate(group = CommonConstants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {
}

invoke对象

对象实例,保存在Map集合中。

当我们进行服务注册时,会收到客户端的请求,其中包含了方法名称,接口的全路径,参数,参数类型。

根据接口的全路径做为key,找到实例方法,并进行反射调用。

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

invoker的创建时基于PROXY_FACTORYgetInvoker方法的。

查看PROXY_FACTORY方法,发现他是一个自适应扩展点,通过SPI机制,查找配置文件。发现有三个实现类。

image-20230319232643964

  • 如果我们的Url中有配置了proxy,我就按照我们Proxy参数的配置来进行查找

  • 如果没有配置,那么我们采用默认的拓展点

image-20230319232810313

默认的扩展点是javassist

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
//包装的信息,在getWrapper方法中进行拼接
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(36) < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
//当我们的服务端接受到请求之后,我们接下来调用具体的方法,用动态的方式去生成invoker
protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}

wrapper是为了帮我们生成一个invoker的代理类。而invoker本身就是一个调用器,服务本身会以我们invoker的形态存在。

服务消费

在配置文件中,有一个ref标签配置,找到我们的服务提供者。

还可以通过@DubboReference注解来获取服务提供者的信息。

  1. 生成远程服务的代理

  2. 获取目标服务的Url

    1. 建立与注册中心的动态感知
  3. 网络连接的建立

    1. 连接的建立有两种方式,启动时创建还是通信是创建?因为Dubbo采用长链接的方式,因此采用启动时创建的方式(Netty(nio))
    2. 而且dubbo是共享连接,不用使用时还去判断是否创建,都是在启动时就创建好的。
  4. 服务通信

    1. filter过滤

    2. 负载均衡

    3. 容错