Feign bean creation and the underlying request flow: a source code walkthrough

August 10, 2025

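Goals

  1. Understand how a Feign client bean is created
  2. Understand what happens when Feign sends a request

Prerequisite: familiarity with the Spring bean creation process.

The Feign version used here is spring-cloud-openfeign-core:4.1.2.

Nacos serves as the service registry.

The project must set spring.cloud.loadbalancer.nacos.enabled=true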

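Bean creation

Typically the annotation
@EnableFeignClients(basePackages = {"org.example.**.feign"})
specifies which packages are scanned for Feign clients.
After the project starts, the Feign interfaces under those packages are discovered. Feign creates its beans through a FactoryBean, so set a breakpoint directly on FeignClientFactoryBean.getObject.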

@Override
public Object getObject() {
    return getTarget();
}

/**
 * @param <T> the target type of the Feign client
 * @return a {@link Feign} client created with the specified data and the context
 * information
 */
@SuppressWarnings("unchecked")
<T> T getTarget() {
    FeignClientFactory feignClientFactory = beanFactory != null ? beanFactory.getBean(FeignClientFactory.class)
            : applicationContext.getBean(FeignClientFactory.class);
    Feign.Builder builder = feign(feignClientFactory);
    if (!StringUtils.hasText(url) && !isUrlAvailableInConfig(contextId)) { // no url configured: Feign assembles one itself

        if (LOG.isInfoEnabled()) {
            LOG.info("For '" + name + "' URL not provided. Will try picking an instance via load-balancing.");
        }
        if (!name.startsWith("http://") && !name.startsWith("https://")) {
            url = "http://" + name;
        }
        else {
            url = name;
        }
        url += cleanPath();
        // no hard-coded url, so load balancing is used; step in here to see how the client is created
        return (T) loadBalance(builder, feignClientFactory, new HardCodedTarget<>(type, name, url));
    }
    if (StringUtils.hasText(url) && !url.startsWith("http://") && !url.startsWith("https://")) {
        url = "http://" + url;
    }
    Client client = getOptional(feignClientFactory, Client.class);
    if (client != null) {
        if (client instanceof FeignBlockingLoadBalancerClient) {
            // not load balancing because we have a url,
            // but Spring Cloud LoadBalancer is on the classpath, so unwrap
            client = ((FeignBlockingLoadBalancerClient) client).getDelegate();
        }
        if (client instanceof RetryableFeignBlockingLoadBalancerClient) {
            // not load balancing because we have a url,
            // but Spring Cloud LoadBalancer is on the classpath, so unwrap
            client = ((RetryableFeignBlockingLoadBalancerClient) client).getDelegate();
        }
        builder.client(client);
    }

    applyBuildCustomizers(feignClientFactory, builder);

    Targeter targeter = get(feignClientFactory, Targeter.class);
    return targeter.target(this, builder, feignClientFactory, resolveTarget(feignClientFactory, contextId, url));
}
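
To make the URL branch above easier to follow, here is a minimal sketch using only the JDK. The class TargetUrlSketch and the method resolveBaseUrl are hypothetical names for illustration and are not part of Feign. The sketch only mirrors the scheme handling visible in the excerpt; how the path is appended when a url is configured is not shown in the excerpt and is left out.

```java
// Hypothetical helper, not part of Feign: mirrors the scheme handling in getTarget.
final class TargetUrlSketch {

    private static boolean hasScheme(String value) {
        return value.startsWith("http://") || value.startsWith("https://");
    }

    /** Returns the base URL a client would target for the given name, url and path. */
    static String resolveBaseUrl(String name, String url, String path) {
        if (url == null || url.trim().isEmpty()) {
            // No explicit url: the service name acts as the host and is resolved
            // later by the load balancer.
            String base = hasScheme(name) ? name : "http://" + name;
            return base + path;
        }
        // Explicit url: only make sure it carries a scheme (path handling omitted).
        return hasScheme(url) ? url : "http://" + url;
    }

    public static void main(String[] args) {
        System.out.println(resolveBaseUrl("database", "", "/api"));
        System.out.println(resolveBaseUrl("database", "localhost:8080", ""));
    }
}
```

The point to take away is that without a url the "host" in the resulting URL is just the service name, which is why a load balancer has to replace it with a real address before the request goes out.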

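Inside loadBalance, the actual creation happens in the final step, the call to target. Stepping into it level by level shows that the client is created as a JDK dynamic proxy.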

// FeignClientFactoryBean
protected <T> T loadBalance(Feign.Builder builder, FeignClientFactory context, HardCodedTarget<T> target) {
    Client client = getOptional(context, Client.class);
    if (client != null) {
        builder.client(client);
        applyBuildCustomizers(context, builder);
        Targeter targeter = get(context, Targeter.class);
        return targeter.target(this, builder, context, target);
    }

    // without a Client bean the method fails instead of returning
    throw new IllegalStateException(
            "No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-loadbalancer?");
}

// DefaultTargeter
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignClientFactory context,
        Target.HardCodedTarget<T> target) {
    return feign.target(target);
}

// Feign.Builder
public <T> T target(Target<T> target) {
    // build() wires up the ResponseHandler together with the encoder, decoder, logger, interceptors, and so on
    return build().newInstance(target);
}

// build() -> internalBuild()
public Feign internalBuild() {
    final ResponseHandler responseHandler =
            new ResponseHandler(logLevel, logger, decoder, errorDecoder,
                    dismiss404, closeAfterDecode, decodeVoid, responseInterceptorChain());
    MethodHandler.Factory<Object> methodHandlerFactory =
            new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors,
                    responseHandler, logger, logLevel, propagationPolicy,
                    new RequestTemplateFactoryResolver(encoder, queryMapEncoder),
                    options);
    return new ReflectiveFeign<>(contract, methodHandlerFactory, invocationHandlerFactory,
            () -> null);
}

// back to newInstance (ReflectiveFeign)
public <T> T newInstance(Target<T> target) {
    return newInstance(target, defaultContextSupplier.newContext());
}

public <T> T newInstance(Target<T> target, C requestContext) {
    TargetSpecificationVerifier.verify(target);

    Map<Method, MethodHandler> methodToHandler =
            targetToHandlersByName.apply(target, requestContext);
    // the familiar InvocationHandler: this is the JDK dynamic proxy idiom
    InvocationHandler handler = factory.create(target, methodToHandler);
    T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
            new Class<?>[] {target.type()}, handler);

    for (MethodHandler methodHandler : methodToHandler.values()) {
        if (methodHandler instanceof DefaultMethodHandler) {
            ((DefaultMethodHandler) methodHandler).bindTo(proxy);
        }
    }

    return proxy;
}
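
The core idea of newInstance can be reproduced with the JDK alone: build a map from Method to a handler and route every proxy call through it. The sketch below is a simplified illustration under that assumption. GreetingClient, ProxyDispatchSketch and the Function-based handlers are hypothetical stand-ins, not Feign types.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class ProxyDispatchSketch {

    /** Hypothetical interface standing in for a @FeignClient interface. */
    interface GreetingClient {
        String greet(String name);
    }

    /** Builds a proxy whose calls are routed through a Method -> handler map. */
    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> type, Map<Method, Function<Object[], Object>> dispatch) {
        InvocationHandler handler = (proxy, method, args) -> {
            Function<Object[], Object> methodHandler = dispatch.get(method);
            if (methodHandler == null) {
                throw new UnsupportedOperationException(method.getName());
            }
            return methodHandler.apply(args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    /** Creates a client whose single method is "implemented" by a map entry. */
    static GreetingClient demoClient() {
        Map<Method, Function<Object[], Object>> dispatch = new HashMap<>();
        try {
            Method greet = GreetingClient.class.getMethod("greet", String.class);
            // A real handler would build and send an HTTP request here.
            dispatch.put(greet, args -> "GET /greet/" + args[0]);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        return newInstance(GreetingClient.class, dispatch);
    }

    public static void main(String[] args) {
        System.out.println(demoClient().greet("feign"));
    }
}
```

The interface never gets a hand-written implementation. Every call lands in the InvocationHandler, which looks up the per-method handler, just as the dispatch map does in the Feign code above.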

The InvocationHandler is implemented by a nested class, FeignInvocationHandler.

static class FeignInvocationHandler implements InvocationHandler {

    private final Target target;
    private final Map<Method, MethodHandler> dispatch;

    FeignInvocationHandler(Target target, Map<Method, MethodHandler> dispatch) {
        this.target = checkNotNull(target, "target");
        this.dispatch = checkNotNull(dispatch, "dispatch for %s", target);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if ("equals".equals(method.getName())) {
            try {
                Object otherHandler =
                        args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
                return equals(otherHandler);
            } catch (IllegalArgumentException e) {
                return false;
            }
        } else if ("hashCode".equals(method.getName())) {
            return hashCode();
        } else if ("toString".equals(method.getName())) {
            return toString();
        } else if (!dispatch.containsKey(method)) {
            throw new UnsupportedOperationException(
                    String.format("Method \"%s\" should not be called", method.getName()));
        }

        // dispatch is a map: the key is our remote-call method, the value is the SynchronousMethodHandler
        // that actually runs when the method is called; the second part of this post expands on it
        return dispatch.get(method).invoke(args);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof FeignInvocationHandler) {
            FeignInvocationHandler other = (FeignInvocationHandler) obj;
            return target.equals(other.target);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return target.hashCode();
    }

    @Override
    public String toString() {
        return target.toString();
    }
}
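
Why does the handler special-case equals, hashCode and toString? With a JDK proxy those Object methods are also routed through invoke, so the handler has to answer them itself. Below is a small JDK-only sketch of the same pattern. The names Client, TargetHandler and proxyFor are hypothetical, and a plain String plays the role of the target.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

final class ProxyObjectMethodsSketch {

    interface Client {
        String call();
    }

    /** Compares proxies by the target they point at, similar in spirit to the handler above. */
    static final class TargetHandler implements InvocationHandler {
        private final String target;

        TargetHandler(String target) {
            this.target = target;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            switch (method.getName()) {
                case "equals":
                    try {
                        Object other = args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
                        return equals(other);
                    } catch (IllegalArgumentException e) {
                        return false; // the argument is not a proxy instance
                    }
                case "hashCode":
                    return hashCode();
                case "toString":
                    return toString();
                default:
                    return "called " + target;
            }
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof TargetHandler && target.equals(((TargetHandler) obj).target);
        }

        @Override
        public int hashCode() {
            return target.hashCode();
        }

        @Override
        public String toString() {
            return "Client(" + target + ")";
        }
    }

    static Client proxyFor(String target) {
        return (Client) Proxy.newProxyInstance(Client.class.getClassLoader(),
                new Class<?>[] {Client.class}, new TargetHandler(target));
    }

    public static void main(String[] args) {
        System.out.println(proxyFor("http://database").equals(proxyFor("http://database")));
    }
}
```

Two proxies pointing at the same target compare equal, and comparing a proxy with a non-proxy object falls into the IllegalArgumentException branch and yields false.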

Feign's invocation process

We start directly from the method entry point.

@FeignClient(value = "database", contextId = "db")
public interface DatabaseFeign {

    @GetMapping(value = Constant.FEIGN_URL_PREFIX + "/dbInfo/{id}")
    Response<DbInfo> getDbInfo(@PathVariable("id") Integer id);
}

@GetMapping("/v1")
public Response<DbInfo> v1() {
    return databaseFeign.getDbInfo(1);
}

As the first part showed, the client is a JDK dynamic proxy, so the call should run inside the InvocationHandler. Set a breakpoint directly on FeignInvocationHandler.invoke.

static class FeignInvocationHandler implements InvocationHandler {

    private final Target target;
    private final Map<Method, MethodHandler> dispatch;

    FeignInvocationHandler(Target target, Map<Method, MethodHandler> dispatch) {
        this.target = checkNotNull(target, "target");
        this.dispatch = checkNotNull(dispatch, "dispatch for %s", target);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if ("equals".equals(method.getName())) {
            try {
                Object otherHandler =
                        args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
                return equals(otherHandler);
            } catch (IllegalArgumentException e) {
                return false;
            }
        } else if ("hashCode".equals(method.getName())) {
            return hashCode();
        } else if ("toString".equals(method.getName())) {
            return toString();
        } else if (!dispatch.containsKey(method)) {
            throw new UnsupportedOperationException(
                    String.format("Method \"%s\" should not be called", method.getName()));
        }

        return dispatch.get(method).invoke(args); // a normal call always ends up here
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof FeignInvocationHandler) {
            FeignInvocationHandler other = (FeignInvocationHandler) obj;
            return target.equals(other.target);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return target.hashCode();
    }

    @Override
    public String toString() {
        return target.toString();
    }
}

dispatch is a map from each method to its SynchronousMethodHandler, whose fields all relate to the HTTP request.

final class SynchronousMethodHandler implements MethodHandler {

    private final MethodMetadata metadata;
    private final Target<?> target;
    private final Client client;
    private final Retryer retryer;
    private final List<RequestInterceptor> requestInterceptors;
    private final Logger logger;
    private final Logger.Level logLevel;
    private final RequestTemplate.Factory buildTemplateFromArgs;
    private final Options options;
    private final ExceptionPropagationPolicy propagationPolicy;
    private final ResponseHandler responseHandler;
    // ...
    @Override
    public Object invoke(Object[] argv) throws Throwable { // called from the handler above; execution continues here
        RequestTemplate template = buildTemplateFromArgs.create(argv); // fill the arguments into the URL template
        Options options = findOptions(argv); // 1. options such as timeouts; they can be passed as a method parameter
        Retryer retryer = this.retryer.clone();
        while (true) {
            try {
                return executeAndDecode(template, options); // 3.
            } catch (RetryableException e) {
                // 2. automatic retry on failure, governed by the retryer: once the retry limit is exceeded
                // it throws instead of looping again; with no retryer configured there is no retry at all
                try {
                    retryer.continueOrPropagate(e);
                } catch (RetryableException th) {
                    Throwable cause = th.getCause();
                    if (propagationPolicy == UNWRAP && cause != null) {
                        throw cause;
                    } else {
                        throw th;
                    }
                }
                if (logLevel != Logger.Level.NONE) {
                    logger.logRetry(metadata.configKey(), logLevel);
                }
                continue;
            }
        }
    }
}
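
The while(true) loop above is easier to reason about in isolation. The following JDK-only sketch keeps just the loop and a retryer with an attempt budget. RetryableFailure, MaxAttemptsRetryer and attemptsUntilSuccess are hypothetical names for illustration. Backoff sleeps and exception unwrapping are deliberately left out.

```java
import java.util.function.Supplier;

final class RetryLoopSketch {

    /** Stand-in for a retryable failure (Feign uses RetryableException). */
    static final class RetryableFailure extends RuntimeException {
        RetryableFailure(String message) {
            super(message);
        }
    }

    /** Hypothetical retry policy: rethrows once the attempt budget is used up. */
    static final class MaxAttemptsRetryer {
        private final int maxAttempts;
        private int attempt = 1;

        MaxAttemptsRetryer(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        void continueOrPropagate(RetryableFailure e) {
            if (attempt++ >= maxAttempts) {
                throw e;
            }
        }
    }

    /** Same shape as the loop in invoke: retry until success or until the retryer throws. */
    static <T> T invoke(Supplier<T> call, MaxAttemptsRetryer retryer) {
        while (true) {
            try {
                return call.get();
            } catch (RetryableFailure e) {
                retryer.continueOrPropagate(e); // throws when no attempts are left
            }
        }
    }

    /** Returns how many calls were made when the first {@code failures} calls fail. */
    static int attemptsUntilSuccess(int failures, int maxAttempts) {
        int[] calls = {0};
        invoke(() -> {
            if (++calls[0] <= failures) {
                throw new RetryableFailure("attempt " + calls[0] + " failed");
            }
            return "ok";
        }, new MaxAttemptsRetryer(maxAttempts));
        return calls[0];
    }

    public static void main(String[] args) {
        System.out.println(attemptsUntilSuccess(2, 3));
    }
}
```

With a budget of one attempt the retryer rethrows on the first failure, which is the "no retry" behaviour described for the default configuration.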

For step 1, the options can be supplied like this:

@GetMapping(value = Constant.FEIGN_URL_PREFIX + "/dbInfo/{id}")
Response<DbInfo> getDbInfo(@PathVariable("id") Integer id, Request.Options options);

For step 2, the retry rule: by default there is no retry.

Retryer NEVER_RETRY = new Retryer() {
    public void continueOrPropagate(RetryableException e) {
        throw e;
    }

    public Retryer clone() {
        return this;
    }
};

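A RetryableException can originate in two places: when an IOException occurs, and when the response headers contain a Retry-After field. Note that the excerpt below is the CompletableFuture-based variant of executeAndDecode; the synchronous version appears later in this post.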

private CompletableFuture<Object> executeAndDecode(RequestTemplate template, Options options) {
    Request request = targetRequest(template);

    if (logLevel != Logger.Level.NONE) {
        logger.logRequest(metadata.configKey(), logLevel, request);
    }

    long start = System.nanoTime();
    return client.execute(request, options, Optional.ofNullable(requestContext))
            .thenApply(response ->
            // TODO: remove in Feign 12
            ensureRequestIsSet(response, template, request))
            .exceptionally(throwable -> {
                CompletionException completionException = throwable instanceof CompletionException
                        ? (CompletionException) throwable
                        : new CompletionException(throwable);
                if (completionException.getCause() instanceof IOException) {
                    IOException ioException = (IOException) completionException.getCause();
                    if (logLevel != Logger.Level.NONE) {
                        logger.logIOException(metadata.configKey(), logLevel, ioException,
                                elapsedTime(start));
                    }

                    throw errorExecuting(request, ioException); // wrapped into a RetryableException and rethrown
                } else {
                    throw completionException;
                }
            })
            .thenCompose(response -> handleResponse(response, elapsedTime(start)));
}

This is the ErrorDecoder code:

public Exception decode(String methodKey, Response response) {
    FeignException exception = errorStatus(methodKey, response, maxBodyBytesLength,
            maxBodyCharsLength);
    // RETRY_AFTER is the HTTP header name "Retry-After"; when that header is present
    // (and can be decoded), a RetryableException is produced
    Long retryAfter = retryAfterDecoder.apply(firstOrNull(response.headers(), RETRY_AFTER));
    if (retryAfter != null) {
        return new RetryableException(
                response.status(),
                exception.getMessage(),
                response.request().httpMethod(),
                exception,
                retryAfter,
                response.request());
    }
    return exception;
}
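
To illustrate the Retry-After decision, here is a JDK-only sketch. HTTP allows the header to carry either a delay in seconds or an HTTP date, and the sketch handles both. RetryAfterSketch and retryAfterMillis are hypothetical names, and this is not Feign's own decoder, whose exact parsing rules may differ.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

final class RetryAfterSketch {

    /**
     * Returns the epoch-millis instant at which a retry may happen, or null when the
     * header is absent or cannot be parsed (meaning: do not treat the error as retryable).
     */
    static Long retryAfterMillis(String headerValue, long nowMillis) {
        if (headerValue == null) {
            return null;
        }
        String value = headerValue.trim();
        if (value.matches("\\d{1,9}")) {
            // delay-seconds form, e.g. "Retry-After: 120"
            return nowMillis + Long.parseLong(value) * 1000L;
        }
        try {
            // HTTP-date form, e.g. "Retry-After: Wed, 21 Oct 2015 07:28:00 GMT"
            return ZonedDateTime.parse(value, DateTimeFormatter.RFC_1123_DATE_TIME)
                    .toInstant().toEpochMilli();
        } catch (DateTimeParseException e) {
            return null;
        }
    }

    static boolean isRetryable(String headerValue, long nowMillis) {
        return retryAfterMillis(headerValue, nowMillis) != null;
    }

    public static void main(String[] args) {
        System.out.println(retryAfterMillis("120", System.currentTimeMillis()));
        System.out.println(isRetryable(null, System.currentTimeMillis()));
    }
}
```

A null result corresponds to the branch in decode that returns the plain FeignException, so the retry loop in invoke is never entered for that response.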

Now for step 3: how executeAndDecode executes the request.

executeAndDecode: executing the request

Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
    // mainly iterates over the RequestInterceptors and applies each one;
    // implement that interface to manipulate the request headers
    Request request = targetRequest(template);

    if (logLevel != Logger.Level.NONE) {
        logger.logRequest(metadata.configKey(), logLevel, request);
    }

    Response response;
    long start = System.nanoTime();
    try {
        response = client.execute(request, options); // the request is executed here
        // ensure the request is set. TODO: remove in Feign 12
        response = response.toBuilder()
                .request(request)
                .requestTemplate(template)
                .build();
    } catch (IOException e) {
        if (logLevel != Logger.Level.NONE) {
            logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
        }
        throw errorExecuting(request, e);
    }

    long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    return responseHandler.handleResponse(
            metadata.configKey(), response, metadata.returnType(), elapsedTime);
}
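
The targetRequest step boils down to "run every interceptor over the template before it becomes a request". A JDK-only sketch of that idea follows. Template, the Consumer-based interceptors and the header values are hypothetical stand-ins for Feign's RequestTemplate and RequestInterceptor.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class InterceptorChainSketch {

    /** Hypothetical, simplified request template: a path plus mutable headers. */
    static final class Template {
        final String path;
        final Map<String, String> headers = new LinkedHashMap<>();

        Template(String path) {
            this.path = path;
        }
    }

    /** Applies every interceptor in order, then returns the mutated template. */
    static Template targetRequest(Template template, List<Consumer<Template>> interceptors) {
        for (Consumer<Template> interceptor : interceptors) {
            interceptor.accept(template);
        }
        return template;
    }

    static Template demo() {
        // Illustrative header values only.
        Consumer<Template> auth = t -> t.headers.put("Authorization", "Bearer demo-token");
        Consumer<Template> trace = t -> t.headers.put("X-Trace-Id", "trace-123");
        return targetRequest(new Template("/dbInfo/1"), List.of(auth, trace));
    }

    public static void main(String[] args) {
        System.out.println(demo().headers);
    }
}
```

Because the interceptors run on every call, this is a natural place for cross-cutting headers such as authentication tokens or trace ids.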

How execute works

public Response execute(Request request, Request.Options options) throws IOException {
    final URI originalUri = URI.create(request.url());
    String serviceId = originalUri.getHost();
    Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
    // This creates a BlockingLoadBalancedRetryPolicy. It is short: by default GET requests may be retried,
    // other requests only if the configuration allows it. Which status codes and exceptions are retryable,
    // and the maximum number of retries, also come from the configuration.
    final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(serviceId,
            loadBalancerClient);
    RetryTemplate retryTemplate = buildRetryTemplate(serviceId, request, retryPolicy);
    return retryTemplate.execute(context -> {
        Request feignRequest = null;
        ServiceInstance retrievedServiceInstance = null;
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
                .getSupportedLifecycleProcessors(
                        loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                        RetryableRequestContext.class, ResponseData.class, ServiceInstance.class); // empty in this project
        String hint = getHint(serviceId);
        DefaultRequest<RetryableRequestContext> lbRequest = new DefaultRequest<>(
                new RetryableRequestContext(null, buildRequestData(request), hint));
        // On retries the policy will choose the server and set it in the context
        // and extract the server and update the request being made
        if (context instanceof LoadBalancedRetryContext lbContext) {
            retrievedServiceInstance = lbContext.getServiceInstance();
            if (retrievedServiceInstance == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Service instance retrieved from LoadBalancedRetryContext: was null. "
                            + "Reattempting service instance selection");
                }
                ServiceInstance previousServiceInstance = lbContext.getPreviousServiceInstance(); // no value here
                lbRequest.getContext().setPreviousServiceInstance(previousServiceInstance);
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest)); // supportedLifecycleProcessors is empty
                retrievedServiceInstance = loadBalancerClient.choose(serviceId, lbRequest); // the instance is finally obtained here
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Selected service instance: %s", retrievedServiceInstance));
                }
                lbContext.setServiceInstance(retrievedServiceInstance);
            }

            if (retrievedServiceInstance == null) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Service instance was not resolved, executing the original request");
                }
                org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
                        retrievedServiceInstance);
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RetryableRequestContext>(
                                CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
                feignRequest = request;
            }
            else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Using service instance from LoadBalancedRetryContext: %s",
                            retrievedServiceInstance));
                }
                String reconstructedUrl = loadBalancerClient.reconstructURI(retrievedServiceInstance, originalUri)
                        .toString();
                feignRequest = buildRequest(request, reconstructedUrl, retrievedServiceInstance);
            }
        }
        org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
                retrievedServiceInstance);
        LoadBalancerProperties loadBalancerProperties = loadBalancerClientFactory.getProperties(serviceId);
        Response response = LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing(delegate, options,
                feignRequest, lbRequest, lbResponse, supportedLifecycleProcessors,
                retrievedServiceInstance != null);
        int responseStatus = response.status();
        if (retryPolicy != null && retryPolicy.retryableStatusCode(responseStatus)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Retrying on status code: %d", responseStatus));
            }
            byte[] byteArray = response.body() == null ? new byte[] {}
                    : StreamUtils.copyToByteArray(response.body().asInputStream());
            response.close();
            throw new LoadBalancerResponseStatusCodeException(serviceId, response, byteArray,
                    URI.create(request.url()));
        }
        return response;
    }, new LoadBalancedRecoveryCallback<Response, Response>() {
        @Override
        protected Response createResponse(Response response, URI uri) {
            return response;
        }
    });
}

The call to loadBalancerClient.choose

// BlockingLoadBalancerClient
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
    ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId); // returns a NacosLoadBalancer here
    if (loadBalancer == null) {
        return null;
    }
    // block() makes the call synchronous: it waits for the load balancer (Nacos in this setup)
    // to yield an available instance; see choose below
    Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
    if (loadBalancerResponse == null) {
        return null;
    }
    return loadBalancerResponse.getServer();
}

// NacosLoadBalancer
public Mono<Response<ServiceInstance>> choose(Request request) {
    ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
            .getIfAvailable(NoopServiceInstanceListSupplier::new);
    return supplier.get(request).next().map(this::getInstanceResponse);
}

getInstanceResponse picks one instance out of the available nodes.

private Response<ServiceInstance> getInstanceResponse(
        List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }

    try {
        String clusterName = this.nacosDiscoveryProperties.getClusterName();

        List<ServiceInstance> instancesToChoose = serviceInstances; // all instances at this point
        if (StringUtils.isNotBlank(clusterName)) { // with clusterName configured, instances in the same cluster are preferred
            List<ServiceInstance> sameClusterInstances = serviceInstances.stream()
                    .filter(serviceInstance -> {
                        String cluster = serviceInstance.getMetadata()
                                .get("nacos.cluster");
                        return StringUtils.equals(cluster, clusterName);
                    }).collect(Collectors.toList());
            if (!CollectionUtils.isEmpty(sameClusterInstances)) {
                instancesToChoose = sameClusterInstances;
            }
        }
        else { // clusterName is blank: log a cross-cluster call warning
            log.warn(
                    "A cross-cluster call occurs,name = {}, clusterName = {}, instance = {}",
                    serviceId, clusterName, serviceInstances);
        }
        instancesToChoose = this.filterInstanceByIpType(instancesToChoose);

        // This algorithm picks an available node: only healthy instances are eligible,
        // and one of them is chosen at random according to weight
        ServiceInstance instance = NacosBalancer
                .getHostByRandomWeight3(instancesToChoose);

        return new DefaultResponse(instance);
    }
    catch (Exception e) {
        log.warn("NacosLoadBalancer error", e);
        return null;
    }
}
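
The selection logic described above (prefer the same cluster, keep only healthy instances, then pick by weight) can be sketched without Nacos. Everything in the block is a hypothetical illustration: Instance, choose and the cluster names are made up, and the real NacosBalancer implementation may differ in detail.

```java
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

final class WeightedChoiceSketch {

    /** Hypothetical instance record; the real code reads these values from instance metadata. */
    static final class Instance {
        final String host;
        final String cluster;
        final double weight;
        final boolean healthy;

        Instance(String host, String cluster, double weight, boolean healthy) {
            this.host = host;
            this.cluster = cluster;
            this.weight = weight;
            this.healthy = healthy;
        }
    }

    /** Prefers instances of the caller's cluster, then picks a healthy one at random by weight. */
    static Instance choose(List<Instance> all, String clusterName, Random random) {
        List<Instance> candidates = all;
        if (clusterName != null && !clusterName.isEmpty()) {
            List<Instance> sameCluster = all.stream()
                    .filter(i -> clusterName.equals(i.cluster))
                    .collect(Collectors.toList());
            if (!sameCluster.isEmpty()) {
                candidates = sameCluster;
            }
        }
        List<Instance> healthy = candidates.stream()
                .filter(i -> i.healthy && i.weight > 0)
                .collect(Collectors.toList());
        if (healthy.isEmpty()) {
            return null;
        }
        double total = healthy.stream().mapToDouble(i -> i.weight).sum();
        // Walk the cumulative weights until the random point is passed.
        double point = random.nextDouble() * total;
        for (Instance instance : healthy) {
            point -= instance.weight;
            if (point < 0) {
                return instance;
            }
        }
        return healthy.get(healthy.size() - 1); // guards against floating-point rounding
    }

    public static void main(String[] args) {
        List<Instance> all = List.of(
                new Instance("10.0.0.1", "sh", 1.0, true),
                new Instance("10.0.0.2", "bj", 3.0, true));
        System.out.println(choose(all, "", new Random()).host);
    }
}
```

An instance with three times the weight is picked roughly three times as often, while unhealthy or zero-weight instances never receive traffic.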

Now we step back out to where the request is sent.

public Response execute(Request request, Request.Options options) throws IOException {
    final URI originalUri = URI.create(request.url());
    String serviceId = originalUri.getHost();
    Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
    final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(serviceId,
            loadBalancerClient);
    RetryTemplate retryTemplate = buildRetryTemplate(serviceId, request, retryPolicy);
    return retryTemplate.execute(context -> {
        Request feignRequest = null;
        ServiceInstance retrievedServiceInstance = null;
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
                .getSupportedLifecycleProcessors(
                        loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                        RetryableRequestContext.class, ResponseData.class, ServiceInstance.class);
        String hint = getHint(serviceId);
        DefaultRequest<RetryableRequestContext> lbRequest = new DefaultRequest<>(
                new RetryableRequestContext(null, buildRequestData(request), hint));
        // On retries the policy will choose the server and set it in the context
        // and extract the server and update the request being made
        if (context instanceof LoadBalancedRetryContext lbContext) {
            retrievedServiceInstance = lbContext.getServiceInstance();
            if (retrievedServiceInstance == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Service instance retrieved from LoadBalancedRetryContext: was null. "
                            + "Reattempting service instance selection");
                }
                ServiceInstance previousServiceInstance = lbContext.getPreviousServiceInstance();
                lbRequest.getContext().setPreviousServiceInstance(previousServiceInstance);
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
                // we just returned from here: the instance, with its IP and port, is now known, so the request can be sent
                retrievedServiceInstance = loadBalancerClient.choose(serviceId, lbRequest);
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Selected service instance: %s", retrievedServiceInstance));
                }
                lbContext.setServiceInstance(retrievedServiceInstance);
            }

            if (retrievedServiceInstance == null) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Service instance was not resolved, executing the original request");
                }
                org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
                        retrievedServiceInstance);
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RetryableRequestContext>(
                                CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
                feignRequest = request;
            }
            else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Using service instance from LoadBalancedRetryContext: %s",
                            retrievedServiceInstance));
                }
                String reconstructedUrl = loadBalancerClient.reconstructURI(retrievedServiceInstance, originalUri)
                        .toString(); // rewrite the url so that it contains the IP and port
                feignRequest = buildRequest(request, reconstructedUrl, retrievedServiceInstance); // build the request from that url
            }
        }
        org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
                retrievedServiceInstance);
        LoadBalancerProperties loadBalancerProperties = loadBalancerClientFactory.getProperties(serviceId);
        Response response = LoadBalancerUtils.executeWithLoadBalancerLifecycleProcessing(delegate, options,
                feignRequest, lbRequest, lbResponse, supportedLifecycleProcessors,
                retrievedServiceInstance != null); // this is where the request is actually sent
        int responseStatus = response.status();
        if (retryPolicy != null && retryPolicy.retryableStatusCode(responseStatus)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Retrying on status code: %d", responseStatus));
            }
            byte[] byteArray = response.body() == null ? new byte[] {}
                    : StreamUtils.copyToByteArray(response.body().asInputStream());
            response.close();
            throw new LoadBalancerResponseStatusCodeException(serviceId, response, byteArray,
                    URI.create(request.url()));
        }
        return response;
    }, new LoadBalancedRecoveryCallback<Response, Response>() {
        @Override
        protected Response createResponse(Response response, URI uri) {
            return response;
        }
    });
}
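
What "rewriting the url" amounts to can be shown with java.net.URI alone: keep scheme, path and query, and swap the service name for a concrete host and port. ReconstructUriSketch and reconstruct are hypothetical names, and the addresses are examples. The real reconstructURI also deals with details such as secure instances that are not covered here.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class ReconstructUriSketch {

    /** Replaces the service-id host of the original URI with a concrete host and port. */
    static URI reconstruct(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://database/feign/dbInfo/1");
        // The host part of the original URI is the service id used for the lookup.
        System.out.println(original.getHost());
        System.out.println(reconstruct(original, "10.0.0.5", 8080));
    }
}
```

This also explains the first lines of execute: the service id is simply the host part of the placeholder URL that was assembled when the bean was created.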

Finally, the request is sent.

// LoadBalancerUtils
static Response executeWithLoadBalancerLifecycleProcessing(Client feignClient, Request.Options options,
        Request feignRequest, org.springframework.cloud.client.loadbalancer.Request lbRequest,
        org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse,
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors, boolean loadBalanced) throws IOException {
    supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, lbResponse));
    try {
        Response response = feignClient.execute(feignRequest, options); // send the request
        if (loadBalanced) {
            supportedLifecycleProcessors.forEach(
                    lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS,
                            lbRequest, lbResponse, buildResponseData(response))));
        }
        return response;
    }
    catch (Exception exception) {
        if (loadBalanced) {
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                    new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, lbResponse)));
        }
        throw exception;
    }
}

// Client.Default
public Response execute(Request request, Options options) throws IOException {
    HttpURLConnection connection = convertAndSend(request, options);
    return convertResponse(connection, request);
}

HttpURLConnection convertAndSend(Request request, Options options) throws IOException {
    final URL url = new URL(request.url());
    final HttpURLConnection connection = this.getConnection(url);
    //......
}

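We won't go through convertAndSend in detail. From the code so far, the request is sent with java.net (HttpURLConnection). If you want to use HttpClient instead, Feign auto-configures it; adding the dependency is all that is needed.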

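Summary and interview notes

First, Feign has to initialize its beans: @EnableFeignClients names the packages that contain the Feign interfaces, and they are scanned from there.
Feign beans are produced by FeignClientFactoryBean which, as the name suggests, implements the FactoryBean interface. Its getObject is fairly involved: it sets the request URL, wires in the load-balancing client, and so on. The Feign interface is then wrapped in a JDK dynamic proxy backed by FeignInvocationHandler, whose invoke method carries out the remote call.
At request time the call goes through the proxy, which takes care of several things. One is request options such as timeouts: these can be passed in as a method parameter and are then applied to the request. The proxy also handles retries, which are off by default.
The request itself is sent from executeAndDecode. We can implement the RequestInterceptor interface; the interceptors run when the request is sent and typically adjust the headers. Because microservices usually do not hard-code IPs, the instance address has to come from the service registry. With Nacos as the registry, all instances are fetched from Nacos, and the Nacos load balancer picks one of the healthy instances by weighted random choice. That yields the IP and port, and therefore the complete URL. With the request body encoded, the request can be sent.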