源码解读 本文基于dubbo2.7.8。
服务消费 在配置文件中,有一个ref标签配置,找到我们的服务提供者。
还可以通过@DubboReference注解来获取服务提供者的信息。
生成远程服务的代理
获取目标服务的Url
建立与注册中心的动态感知
网络连接的建立
连接的建立有两种方式:启动时创建,还是通信时创建?因为Dubbo采用长连接的方式,因此采用启动时创建的方式(Netty(nio))
而且dubbo使用的是共享连接,不需要在使用时再去判断连接是否已经创建,连接都是在启动时就创建好的。
服务通信
filter过滤
负载均衡
容错
服务消费端的操作 服务消费端的对象的注入
都是在BeanPostProcessor中进行服务消费端的对象注入。
在ReferenceAnnotationBeanPostProcessor类中的构造方法:
由此可见:DubboReference.class可以做到Reference.class相同的功能。
同样在此类中,有一个真正执行bean的方法:doGetInjectedBean
其中的ReferenceBean类和ServiceBean在同一级目录下,和ServiceBean的作用是相同的,都是将Reference注入到bean中,ReferenceBean在IOC容器中只能存在一个。
init方法中进行了初始化。主要进行了url的拼接。
/**
 * Initializes this reference once: collects the consumer-side parameters into a
 * map, creates the proxy from that map via {@code createProxy(map)}, and records
 * the proxy on the service metadata and the consumer model.
 *
 * <p>The method is {@code synchronized} and returns immediately when
 * {@code initialized} is already set, so repeated calls do not rebuild the proxy.
 *
 * @throws IllegalArgumentException if the registry IP supplied through the
 *         {@code DUBBO_IP_TO_REGISTRY} system property is rejected by
 *         {@code isInvalidLocalHost}
 */
public synchronized void init () {
    // Already initialized: nothing to do.
    if (initialized) {
        return ;
    }
    // Obtain and initialize the bootstrap instance if none has been assigned yet.
    if (bootstrap == null ) {
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.init();
    }
    // Validation steps that run before any parameter is collected.
    checkAndUpdateSubConfigs();
    checkStubAndLocal(interfaceClass);
    ConfigValidationUtils.checkMock(interfaceClass, this );
    // Parameter map that is later handed to createProxy(map).
    Map<String, String> map = new HashMap <String, String>();
    map.put(SIDE_KEY, CONSUMER_SIDE);
    ReferenceConfigBase.appendRuntimeParameters(map);
    // Revision and method names are only added for non-generic references.
    if (!ProtocolUtils.isGeneric(generic)) {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0 ) {
            map.put(REVISION_KEY, revision);
        }
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0 ) {
            // No methods found: warn and fall back to ANY_VALUE.
            logger.warn("No method found in service interface " + interfaceClass.getName());
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            // De-duplicate method names through a HashSet before joining them.
            map.put(METHODS_KEY, StringUtils.join(new HashSet <String>(Arrays.asList(methods)), COMMA_SEPARATOR));
        }
    }
    map.put(INTERFACE_KEY, interfaceName);
    // Append parameters from each config object in this fixed order; this config goes last.
    AbstractConfig.appendParameters(map, getMetrics());
    AbstractConfig.appendParameters(map, getApplication());
    AbstractConfig.appendParameters(map, getModule());
    AbstractConfig.appendParameters(map, consumer);
    AbstractConfig.appendParameters(map, this );
    MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
    if (metadataReportConfig != null && metadataReportConfig.isValid()) {
        // Only sets METADATA_KEY when the map does not already contain it.
        map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
    }
    // Per-method async info, keyed by method name; stays null when no MethodConfig exists.
    Map<String, AsyncMethodInfo> attributes = null ;
    if (CollectionUtils.isNotEmpty(getMethods())) {
        attributes = new HashMap <>();
        for (MethodConfig methodConfig : getMethods()) {
            // Method-level parameters are appended using the method name as third argument.
            AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
            // Translate "<method>.retry" = "false" into "<method>.retries" = "0";
            // the "<method>.retry" entry is removed from the map in either case.
            String retryKey = methodConfig.getName() + ".retry" ;
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false" .equals(retryValue)) {
                    map.put(methodConfig.getName() + ".retries" , "0" );
                }
            }
            AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
            if (asyncMethodInfo != null ) {
                attributes.put(methodConfig.getName(), asyncMethodInfo);
            }
        }
    }
    // Registry IP: taken from the system property, otherwise the local host address.
    String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
    if (StringUtils.isEmpty(hostToRegistry)) {
        hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
        throw new IllegalArgumentException ("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
    }
    map.put(REGISTER_IP_KEY, hostToRegistry);
    // Copy all collected parameters into the service metadata attachments.
    serviceMetadata.getAttachments().putAll(map);
    // Build the proxy from the collected parameters and publish it on the metadata.
    ref = createProxy(map);
    serviceMetadata.setTarget(ref);
    serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
    // NOTE(review): the looked-up consumerModel is dereferenced without a null check - confirm the lookup cannot return null here.
    ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
    consumerModel.setProxyObject(ref);
    consumerModel.init(attributes);
    // The flag is set only after the proxy has been created and registered on the model.
    initialized = true ;
    checkInvokerAvailable();
    dispatch(new ReferenceConfigInitializedEvent (this , invoker));
}
其中的核心方法createProxy(),用于解析map中的信息,构建一个动态代理。
动态代理类的生成 需要注意的点:
如果有多个注册中心
服务地址的发现
远程连接的建立
/**
 * Initializes this reference once: collects the consumer-side parameters into a
 * map, creates the proxy from that map via {@code createProxy(map)}, and records
 * the proxy on the service metadata and the consumer model.
 *
 * <p>The method is {@code synchronized} and returns immediately when
 * {@code initialized} is already set, so repeated calls do not rebuild the proxy.
 *
 * @throws IllegalArgumentException if the registry IP supplied through the
 *         {@code DUBBO_IP_TO_REGISTRY} system property is rejected by
 *         {@code isInvalidLocalHost}
 */
public synchronized void init () {
    // Already initialized: nothing to do.
    if (initialized) {
        return ;
    }
    // Obtain and initialize the bootstrap instance if none has been assigned yet.
    if (bootstrap == null ) {
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.init();
    }
    // Validation steps that run before any parameter is collected.
    checkAndUpdateSubConfigs();
    checkStubAndLocal(interfaceClass);
    ConfigValidationUtils.checkMock(interfaceClass, this );
    // Parameter map that is later handed to createProxy(map).
    Map<String, String> map = new HashMap <String, String>();
    map.put(SIDE_KEY, CONSUMER_SIDE);
    ReferenceConfigBase.appendRuntimeParameters(map);
    // Revision and method names are only added for non-generic references.
    if (!ProtocolUtils.isGeneric(generic)) {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0 ) {
            map.put(REVISION_KEY, revision);
        }
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0 ) {
            // No methods found: warn and fall back to ANY_VALUE.
            logger.warn("No method found in service interface " + interfaceClass.getName());
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            // De-duplicate method names through a HashSet before joining them.
            map.put(METHODS_KEY, StringUtils.join(new HashSet <String>(Arrays.asList(methods)), COMMA_SEPARATOR));
        }
    }
    map.put(INTERFACE_KEY, interfaceName);
    // Append parameters from each config object in this fixed order; this config goes last.
    AbstractConfig.appendParameters(map, getMetrics());
    AbstractConfig.appendParameters(map, getApplication());
    AbstractConfig.appendParameters(map, getModule());
    AbstractConfig.appendParameters(map, consumer);
    AbstractConfig.appendParameters(map, this );
    MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
    if (metadataReportConfig != null && metadataReportConfig.isValid()) {
        // Only sets METADATA_KEY when the map does not already contain it.
        map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
    }
    // Per-method async info, keyed by method name; stays null when no MethodConfig exists.
    Map<String, AsyncMethodInfo> attributes = null ;
    if (CollectionUtils.isNotEmpty(getMethods())) {
        attributes = new HashMap <>();
        for (MethodConfig methodConfig : getMethods()) {
            // Method-level parameters are appended using the method name as third argument.
            AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
            // Translate "<method>.retry" = "false" into "<method>.retries" = "0";
            // the "<method>.retry" entry is removed from the map in either case.
            String retryKey = methodConfig.getName() + ".retry" ;
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false" .equals(retryValue)) {
                    map.put(methodConfig.getName() + ".retries" , "0" );
                }
            }
            AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
            if (asyncMethodInfo != null ) {
                attributes.put(methodConfig.getName(), asyncMethodInfo);
            }
        }
    }
    // Registry IP: taken from the system property, otherwise the local host address.
    String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
    if (StringUtils.isEmpty(hostToRegistry)) {
        hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
        throw new IllegalArgumentException ("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
    }
    map.put(REGISTER_IP_KEY, hostToRegistry);
    // Copy all collected parameters into the service metadata attachments.
    serviceMetadata.getAttachments().putAll(map);
    // Build the proxy from the collected parameters and publish it on the metadata.
    ref = createProxy(map);
    serviceMetadata.setTarget(ref);
    serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
    // NOTE(review): the looked-up consumerModel is dereferenced without a null check - confirm the lookup cannot return null here.
    ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
    consumerModel.setProxyObject(ref);
    consumerModel.init(attributes);
    // The flag is set only after the proxy has been created and registered on the model.
    initialized = true ;
    checkInvokerAvailable();
    dispatch(new ReferenceConfigInitializedEvent (this , invoker));
}

/**
 * Builds the invoker for this reference and wraps it in a proxy.
 *
 * <p>Three sources of target URLs are handled: an in-JVM reference when
 * {@code shouldJvmRefer(map)} is true, the explicitly configured {@code url}
 * value when it is non-empty, and otherwise the registries returned by
 * {@code ConfigValidationUtils.loadRegistries}. With several URLs the resulting
 * invokers are joined into one through a {@code Cluster}.
 *
 * <p>Side effects visible here: {@code invoker} and {@code urls} are updated,
 * {@code MONITOR_KEY} may be added to {@code map}, and {@code REGISTER_IP_KEY}
 * is removed from {@code map} when a metadata service is available.
 *
 * @param map the reference parameters collected by {@code init()}
 * @return the proxy produced by {@code PROXY_FACTORY.getProxy}
 * @throws IllegalStateException if the registry path is taken and no URL could be collected
 */
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy (Map<String, String> map) {
    if (shouldJvmRefer(map)) {
        // Local reference: build a LOCAL_PROTOCOL URL (port 0) carrying all parameters.
        URL url = new URL (LOCAL_PROTOCOL, LOCALHOST_VALUE, 0 , interfaceClass.getName()).addParameters(map);
        invoker = REF_PROTOCOL.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        urls.clear();
        // Here "url" is not the local declared in the branch above; presumably it is the
        // configured address string of this reference - confirm against the enclosing class.
        if (url != null && url.length() > 0 ) {
            // Several addresses may be given, separated according to SEMICOLON_SPLIT_PATTERN.
            String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0 ) {
                for (String u : us) {
                    // This local URL shadows the outer "url" string for the rest of the loop body.
                    URL url = URL.valueOf(u);
                    if (StringUtils.isEmpty(url.getPath())) {
                        // No path in the address: use the interface name as path.
                        url = url.setPath(interfaceName);
                    }
                    if (UrlUtils.isRegistry(url)) {
                        // Registry address: carry the reference parameters encoded under REFER_KEY.
                        urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // Non-registry address: merge the parameters into the URL itself.
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else {
            // No explicit address: load registry URLs, unless the protocol is LOCAL_PROTOCOL.
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                checkRegistry();
                List<URL> us = ConfigValidationUtils.loadRegistries(this , false );
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this , u);
                        if (monitorUrl != null ) {
                            // Stored in map, so it is part of the REFER_KEY query string built below.
                            map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException ("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config." );
                }
            }
        }
        if (urls.size() == 1 ) {
            // Single URL: refer it directly.
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0 ));
        } else {
            // Zero or several URLs: refer each one and join the invokers through a cluster.
            List<Invoker<?>> invokers = new ArrayList <Invoker<?>>();
            URL registryURL = null ;
            for (URL url : urls) {
                invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                if (UrlUtils.isRegistry(url)) {
                    // Keeps the last registry URL seen in the list.
                    registryURL = url;
                }
            }
            if (registryURL != null ) {
                // At least one registry URL: cluster name comes from it, defaulting to ZoneAwareCluster.NAME.
                String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                invoker = Cluster.getCluster(cluster, false ).join(new StaticDirectory (registryURL, invokers));
            } else {
                // No registry URL: cluster name comes from the first invoker's URL when available,
                // otherwise Cluster.DEFAULT.
                String cluster = CollectionUtils.isNotEmpty(invokers) ? (invokers.get(0 ).getUrl() != null ? invokers.get(0 ).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT) : Cluster.DEFAULT;
                invoker = Cluster.getCluster(cluster).join(new StaticDirectory (invokers));
            }
        }
    }
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    // Publish the consumer-side service definition through the metadata service, if one is found.
    String metadata = map.get(METADATA_KEY);
    WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
    if (metadataService != null ) {
        // map.remove(...) both supplies the host and deletes REGISTER_IP_KEY from map.
        URL consumerURL = new URL (CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0 , map.get(INTERFACE_KEY), map);
        metadataService.publishServiceDefinition(consumerURL);
    }
    // Wrap the invoker in a proxy; the boolean tells the factory whether the reference is generic.
    return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
最终调用的PROXY_FACTORY.getProxy
实际执行的是JavassistProxyFactory中的getProxy
方法,其作用就是帮助我们生成reference注解所修饰的内容的一个动态代理类。
/**
 * Creates a proxy object for the given interfaces whose calls are handled by an
 * {@code InvokerInvocationHandler} wrapping {@code invoker}.
 *
 * @param invoker    the invoker passed to the invocation handler
 * @param interfaces the interfaces handed to {@code Proxy.getProxy}
 * @return the new proxy instance, cast to {@code T}
 */
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    // Same call chain as before; only the unchecked cast is split into its own statement.
    Object proxyInstance =
            Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    return (T) proxyInstance;
}
还有一个问题,getProxy方法中的invoker对象是什么?
invoker可以看作调用时所需要的一个对象工具,我们并不会给它赋予固定的含义,而是根据上下文赋予其属于该上下文环境中的含义。例如init方法中的invoker,就可以看作与通信有关的invoker。
invoker的创建:
发现refer被@Adaptive注解所修饰
/**
 * Refers a service of the given type at the given URL and returns an invoker for it.
 * The method is annotated with {@code @Adaptive}.
 *
 * @param type the service interface class
 * @param url  the URL to refer
 * @return an invoker for {@code type}
 * @throws RpcException declared by this method; the conditions are not visible here
 */
@Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
@Adaptive如果修饰在类上,那么返回的就是该类本身;如果修饰在方法上(例如这里的refer),就会生成一个动态代理类(自适应扩展类),
在调用refer(interfaceClass, urls.get(0))时,再根据传入的URL决定实际使用哪一个实现类。
RegistryProtocol实现类
/**
 * Refers a service through the registry.
 *
 * <p>When {@code type} is {@code RegistryService} the registry itself is wrapped
 * as the invoker. Otherwise the reference parameters are decoded from
 * {@code REFER_KEY}; if the group holds more than one comma-separated entry or
 * equals {@code "*"}, the mergeable cluster is used, else the cluster named by
 * {@code CLUSTER_KEY}.
 *
 * @param type the service interface class
 * @param url  the URL to refer; converted by {@code getRegistryUrl} before use
 * @return the invoker produced by {@code proxyFactory.getInvoker} or {@code doRefer}
 * @throws RpcException declared by the overridden method
 */
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    final URL registryUrl = getRegistryUrl(url);
    final Registry registry = registryFactory.getRegistry(registryUrl);

    // Referring the registry service itself: expose the registry as the target object.
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, registryUrl);
    }

    final Map<String, String> referParams =
            StringUtils.parseQueryString(registryUrl.getParameterAndDecoded(REFER_KEY));
    final String group = referParams.get(GROUP_KEY);

    // Multiple groups ("a,b") or the wildcard group select the mergeable cluster.
    final boolean mergeGroups = group != null
            && group.length() > 0
            && (COMMA_SPLIT_PATTERN.split(group).length > 1 || "*".equals(group));

    final Cluster cluster;
    if (mergeGroups) {
        cluster = Cluster.getCluster(MergeableCluster.NAME);
    } else {
        cluster = Cluster.getCluster(referParams.get(CLUSTER_KEY));
    }
    return doRefer(cluster, registry, type, registryUrl);
}
doRefer方法
/**
 * Creates a {@code RegistryDirectory} for the service, optionally registers the
 * consumer URL, subscribes the directory to the registry and joins it into a
 * cluster invoker.
 *
 * <p>When registry protocol listeners are found for {@code url}, the invoker is
 * wrapped in a {@code RegistryInvokerWrapper}, each listener's {@code onRefer}
 * is called with it, and the wrapper is returned instead of the plain invoker.
 *
 * @param cluster  the cluster used to join the directory
 * @param registry the registry to register with and subscribe to
 * @param type     the service interface class
 * @param url      the registry URL
 * @return the cluster invoker, or its wrapper when listeners exist
 */
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);

    // Copy the consumer parameters; REGISTER_IP_KEY is removed from the copy and used as host.
    Map<String, String> consumerParams = new HashMap<>(directory.getConsumerUrl().getParameters());
    String consumerHost = consumerParams.remove(REGISTER_IP_KEY);
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, consumerHost, 0, type.getName(), consumerParams);

    if (directory.isShouldRegister()) {
        directory.setRegisteredConsumerUrl(subscribeUrl);
        // Register the URL as returned by the directory's getter, not subscribeUrl directly.
        registry.register(directory.getRegisteredConsumerUrl());
    }

    // Order matters: the router chain is built before subscribing, and the join comes last.
    directory.buildRouterChain(subscribeUrl);
    directory.subscribe(toSubscribeUrl(subscribeUrl));
    Invoker<T> clusterInvoker = cluster.join(directory);

    List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
    if (!CollectionUtils.isEmpty(listeners)) {
        RegistryInvokerWrapper<T> wrapper =
                new RegistryInvokerWrapper<>(directory, cluster, clusterInvoker);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, wrapper);
        }
        return wrapper;
    }
    return clusterInvoker;
}
invoker的使用:
invoker的执行是在JavassistProxyFactory中的getProxy
方法
服务端代理生成的过程
生成动态代理
构建一个invoker代表服务消费端的调用器
服务消费启动 在doRefer中有一个订阅一系列结点数据的方法subscribe方法。
每一个注册中心都会有对应的实现类doSubscribe方法来完成实际的业务。
当zookeeper的结点信息发生变化时,就会发起通知。
消费者建立连接并调用