//4.获取服务的注册与发现,因为返回的是List,所以体现出了Dubbo支持多注册的特点 //这里会将我们的实例转换成registry协议(便于管理) List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true); //循环进行协议的发布,体现了dubbo支持多协议的特点 for (ProtocolConfig protocolConfig : protocols) { //构建一个pathKey,服务名称/group/version StringpathKey= URL.buildKey(getContextPath(protocolConfig) .map(p -> p + "/" + path) .orElse(path), group, version); // In case user specified path, register service one more time to map it to path. //将获得到path注册到注册仓库中 repository.registerService(pathKey, interfaceClass); // TODO, uncomment this line once service key is unified //设置当前服务元数据的key serviceMetadata.setServiceKey(pathKey); //发布当前的协议服务,到注册中心 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
//添加运行时参数 ServiceConfig.appendRuntimeParameters(map); //添加应用信息、module信息、provider信息... AbstractConfig.appendParameters(map, getMetrics()); AbstractConfig.appendParameters(map, getApplication()); AbstractConfig.appendParameters(map, getModule()); // remove 'default.' prefix for configs from ProviderConfig // appendParameters(map, provider, Constants.DEFAULT_KEY); AbstractConfig.appendParameters(map, provider); AbstractConfig.appendParameters(map, protocolConfig); AbstractConfig.appendParameters(map, this); MetadataReportConfigmetadataReportConfig= getMetadataReportConfig(); //REMOTE表示配置信息的远程 if (metadataReportConfig != null && metadataReportConfig.isValid()) { map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE); } //如果有单独的配置,则对单独的配置信息做处理 if (CollectionUtils.isNotEmpty(getMethods())) { for (MethodConfig method : getMethods()) { AbstractConfig.appendParameters(map, method, method.getName()); StringretryKey= method.getName() + ".retry"; if (map.containsKey(retryKey)) { StringretryValue= map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } List<ArgumentConfig> arguments = method.getArguments(); if (CollectionUtils.isNotEmpty(arguments)) { for (ArgumentConfig argument : arguments) { // convert argument type if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getMethods(); // visit all methods if (methods.length > 0) { for (inti=0; i < methods.length; i++) { StringmethodName= methods[i].getName(); // target the method, and get its signature if (methodName.equals(method.getName())) { Class<?>[] argtypes = methods[i].getParameterTypes(); // one callback in the method if (argument.getIndex() != -1) { if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { thrownewIllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } else { // multiple callbacks in the method for (intj=0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())) { AbstractConfig.appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { thrownewIllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } } } } } } } elseif (argument.getIndex() != -1) { AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { thrownewIllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); }
} } } // end of methods for }
//泛化方式 if (ProtocolUtils.isGeneric(generic)) { map.put(GENERIC_KEY, generic); map.put(METHODS_KEY, ANY_VALUE); } else { Stringrevision= Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put(REVISION_KEY, revision); } //获取包装类,调用的还是接口对象的原有方法,为了可以用统一的方式获取到不同用户的不同的Dubbo接口 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { map.put(METHODS_KEY, StringUtils.join(newHashSet<String>(Arrays.asList(methods)), ",")); } }
/** * Here the token value configured by the provider is used to assign the value to ServiceConfig#token */ if(ConfigUtils.isEmpty(token) && provider != null) { token = provider.getToken(); }
if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(TOKEN_KEY, token); } } //init serviceMetadata attachments serviceMetadata.getAttachments().putAll(map);
// You can customize Configurator to append extra parameters //判断 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); }
//以上都是为了加下来操作的前置操作,开始服务暴露 //本地暴露、远程暴露 //判断你的配置信息,在远程还是在本地,如果没有配置默认在本地 Stringscope= url.getParameter(SCOPE_KEY); // don't export when none is configured //1 if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote) //本地暴露 if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } //判断是否远程暴露 // export to remote if the config is not local (export to local only when config is local) if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (CollectionUtils.isNotEmpty(registryURLs)) { //注册中心不为空,循环遍历注册中心信息 //2 for (URL registryURL : registryURLs) { //if protocol is only injvm ,not register //如果在本地,就将协议改为injvm if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } //生产动态URL url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); //加入监控 URLmonitorUrl= ConfigValidationUtils.loadMonitor(this, registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { if (url.getParameter(REGISTER_KEY, true)) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } else { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } }
// For providers, this is used to enable custom proxy to generate invoker Stringproxy= url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } //创建invoker //通过PROXY_FACTORY获取到invoker,并将需要导出的信息封装到invoker对象中 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvokerwrapperInvoker=newDelegateProviderMetaDataInvoker(invoker, this); //将invoker对象封装到Exporter中(用于对外暴露) //3 4 Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); exporters.add(exporter); } } else { //如果没有配置注册中心地址的话 if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvokerwrapperInvoker=newDelegateProviderMetaDataInvoker(invoker, this);
// decide if we need to delay publish booleanregister= providerUrl.getParameter(REGISTER_KEY, true); if (register) { //服务的注册 //registeredProviderUrl 服务提供者的IP、端口号 //可以获取到服务提供者的一些信息、从而能够调用到服务 register(registryUrl, registeredProviderUrl); }
... //Ensure that a new exporter instance is returned every time export returnnewDestroyableExporter<>(exporter); }
//export an stub service for dispatching event BooleanisStubSupportEvent= url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); BooleanisCallbackservice= url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { StringstubServiceMethods= url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(newIllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); }
privatevoidopenServer(URL url) { // find server. Stringkey= url.getAddress(); //client can export a service which's only for server to invoke //服务在缓存中存在,则取出,不存在则新建 booleanisServer= url.getParameter(IS_SERVER_KEY, true); if (isServer) { ProtocolServerserver= serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { //创建一个服务(也用到了url) serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override server.reset(url); } } }
private ProtocolServer createServer(URL url) { url = URLBuilder.from(url) // send readonly event when server closes, it's enabled by default .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()) // enable heartbeat by default .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)) .addParameter(CODEC_KEY, DubboCodec.NAME) .build(); Stringstr= url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
public Registry getRegistry(URL url) { //判断是否存在 if (destroyed.get()) { LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " + "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of."); return DEFAULT_NOP_REGISTRY; }
url = URLBuilder.from(url) .setPath(RegistryService.class.getName()) .addParameter(INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(EXPORT_KEY, REFER_KEY) .build(); //没有就创建 Stringkey= createRegistryCacheKey(url); // Lock the registry access process to ensure a single instance of the registry LOCK.lock(); try { Registryregistry= REGISTRIES.get(key); if (registry != null) { return registry; } //create registry by spi/ioc //创建注册中心 registry = createRegistry(url); if (registry == null) { thrownewIllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { // Release the lock LOCK.unlock(); } }
publicstatic <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { if (type == null) { thrownewIllegalArgumentException("Extension type == null"); } if (!type.isInterface()) { thrownewIllegalArgumentException("Extension type (" + type + ") is not an interface!"); } if (!withExtensionAnnotation(type)) { thrownewIllegalArgumentException("Extension type (" + type + ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!"); }