Spring Cloud Eureka 源码分析 —— Client端

一. 前言

eureka的client端主要完成几件事情:

  1. 服务实例的注册
  2. 服务实例的续约
  3. 拉取server端的注册表

整个源码有几个重点类值得关注:

类名 说明
EurekaClientConfigBean 配置参数的bean,保存eureka.client.xxx的所有参数
EurekaInstanceConfigBean 配置参数的bean,保存eureka.instance.xxx的所有参数
InstanceInfo 服务实例信息的封装类
ApplicationInfoManager 封装注册到eureka的应用信息,包括实例信息,参数配置,实例状态等,并管理实例的各种操作
CloudEurekaClient 发起操作的客户端类,如register、fetch、heartbeat

二. 源码分析

1. 启动配置bean注入

EurekaClientAutoConfiguration

EurekaClientConfigBean
    @Bean
    @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
    public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
        return new EurekaClientConfigBean();
    }

EurekaClientConfigBean 对应的配置前缀是eureka.client,从配置文件读取的配置会保存到此bean中

常用配置:
参数 说明
defaultZone 默认zone的地址
transport 各种通信配置
registryFetchIntervalSeconds 每次从eureka服务端拉取服务列表的时间间隔(默认30s)
instanceInfoReplicationIntervalSeconds 复制实例变化信息到eureka服务端的时间间隔(默认30s)
initialInstanceInfoReplicationIntervalSeconds 初始时复制实例信息到eureka服务端所需的时间(默认40s)
eurekaServiceUrlPollIntervalSeconds 拉取eureka服务端地址更改的时间间隔(默认300s)
registerWithEureka 本实例是否注册到eureka服务端
EurekaInstanceConfigBean
    @Bean
    @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
    public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
            ManagementMetadataProvider managementMetadataProvider) {
        String hostname = getProperty("eureka.instance.hostname");
        boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
        String ipAddress = getProperty("eureka.instance.ip-address");
        boolean isSecurePortEnabled = Boolean.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));

        String serverContextPath = env.getProperty("server.servlet.context-path", "/");
        int serverPort = Integer.parseInt(env.getProperty("server.port", env.getProperty("port", "8080")));

        Integer managementPort = env.getProperty("management.server.port", Integer.class);
        String managementContextPath = env.getProperty("management.server.servlet.context-path");
        Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class);
        EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);

        instance.setNonSecurePort(serverPort);
        instance.setInstanceId(getDefaultInstanceId(env));
        instance.setPreferIpAddress(preferIpAddress);
        instance.setSecurePortEnabled(isSecurePortEnabled);
        if (StringUtils.hasText(ipAddress)) {
            instance.setIpAddress(ipAddress);
        }

        if (isSecurePortEnabled) {
            instance.setSecurePort(serverPort);
        }

        if (StringUtils.hasText(hostname)) {
            instance.setHostname(hostname);
        }
        String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
        String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");

        if (StringUtils.hasText(statusPageUrlPath)) {
            instance.setStatusPageUrlPath(statusPageUrlPath);
        }
        if (StringUtils.hasText(healthCheckUrlPath)) {
            instance.setHealthCheckUrlPath(healthCheckUrlPath);
        }

        ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort, serverContextPath,
                managementContextPath, managementPort);

        if (metadata != null) {
            instance.setStatusPageUrl(metadata.getStatusPageUrl());
            instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
            if (instance.isSecurePortEnabled()) {
                instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
            }
            Map<String, String> metadataMap = instance.getMetadataMap();
            metadataMap.computeIfAbsent("management.port", k -> String.valueOf(metadata.getManagementPort()));
        }
        else {
            // without the metadata the status and health check URLs will not be set
            // and the status page and health check url paths will not include the
            // context path so set them here
            if (StringUtils.hasText(managementContextPath)) {
                instance.setHealthCheckUrlPath(managementContextPath + instance.getHealthCheckUrlPath());
                instance.setStatusPageUrlPath(managementContextPath + instance.getStatusPageUrlPath());
            }
        }

        setupJmxPort(instance, jmxPort);
        return instance;
    }

EurekaInstanceConfigBean 对应的配置前缀是eureka.instance,从配置文件读取的实例信息配置会保存到此bean中

常用配置:
参数 说明
instanceId 注册到eureka服务端实例id,默认是ip:服务名:端口
hostname 注册的主机名, 默认使用主机名注册
preferIpAddress 是否开启ip注册
ipAddress 注册的ip地址
leaseRenewalIntervalInSeconds 向eureka服务端续约的时间间隔(默认30s)
leaseExpirationDurationInSeconds 向eureka服务端设置续约到期的时间间隔(默认90s),超过此间隔服务端会下掉该服务
initialStatus 注册到eureka服务端的初始化状态,默认UP
EurekaServiceRegistry
    @Bean
    public EurekaServiceRegistry eurekaServiceRegistry() {
        return new EurekaServiceRegistry();
    }

EurekaServiceRegistry 实现了ServiceRegistry,是实例注册的具体实现类,内部通过register完成服务注册事件的发送动作。

EurekaRegistration
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
            CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager,
            @Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
        return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager).with(eurekaClient)
                    .with(healthCheckHandler).build();
    }

EurekaRegistration 是真正被注册的对象,内部封装了客户端所有的注册信息,是org.springframework.cloud.client.serviceregistry.ServiceInstance此标准的具体实现类

ApplicationInfoManager
    @Bean
    @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
    public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
        InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
        return new ApplicationInfoManager(config, instanceInfo);
    }
public InstanceInfo create(EurekaInstanceConfig config) {
        LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
                .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

        // Builder the instance information to be registered with eureka
        // server
        InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();

        String namespace = config.getNamespace();
        if (!namespace.endsWith(".")) {
            namespace = namespace + ".";
        }
        builder.setNamespace(namespace).setAppName(config.getAppname()).setInstanceId(config.getInstanceId())
                .setAppGroupName(config.getAppGroupName()).setDataCenterInfo(config.getDataCenterInfo())
                .setIPAddr(config.getIpAddress()).setHostName(config.getHostName(false))
                .setPort(config.getNonSecurePort())
                .enablePort(InstanceInfo.PortType.UNSECURE, config.isNonSecurePortEnabled())
                .setSecurePort(config.getSecurePort())
                .enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())
                .setVIPAddress(config.getVirtualHostName()).setSecureVIPAddress(config.getSecureVirtualHostName())
                .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
                .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
                .setHealthCheckUrls(config.getHealthCheckUrlPath(), config.getHealthCheckUrl(),
                        config.getSecureHealthCheckUrl())
                .setASGName(config.getASGName());

        // Start off with the STARTING state to avoid traffic
        if (!config.isInstanceEnabledOnit()) {
            InstanceInfo.InstanceStatus initialStatus = InstanceInfo.InstanceStatus.STARTING;
            if (log.isInfoEnabled()) {
                log.info("Setting initial instance status as: " + initialStatus);
            }
            builder.setStatus(initialStatus);
        }
        else {
            if (log.isInfoEnabled()) {
                log.info("Setting initial instance status as: " + InstanceInfo.InstanceStatus.UP
                        + ". This may be too early for the instance to advertise itself as available. "
                        + "You would instead want to control this via a healthcheck handler.");
            }
        }

        // Add any user-specific metadata information
        for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
            String key = mapEntry.getKey();
            String value = mapEntry.getValue();
            // only add the metadata if the value is present
            if (value != null && !value.isEmpty()) {
                builder.add(key, value);
            }
        }

        InstanceInfo instanceInfo = builder.build();
        instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
        return instanceInfo;
    }

首先创建了初始化状态的InstanceInfo,根据配置对InstanceInfo的大量属性赋值。
构建ApplicationInfoManager内部封装了注册到eureka的应用信息,包括实例信息,参数配置,实例状态等。
InstanceInfo对象也会被设置到到ApplicationInfoManager中。

CloudEurekaClient
        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
                EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
            // If we use the proxy of the ApplicationInfoManager we could run into a problem
            // when shutdown is called on the CloudEurekaClient where the ApplicationInfoManager bean is
            // requested but wont be allowed because we are shutting down. To avoid this
            // we use the object directly.
            ApplicationInfoManager appManager;
            if (AopUtils.isAopProxy(manager)) {
                appManager = ProxyUtils.getTargetObject(manager);
            }
            else {
                appManager = manager;
            }
            CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
                    this.context);
            cloudEurekaClient.registerHealthCheck(healthCheckHandler);
            return cloudEurekaClient;
        }

这个bean非常重要,在内部完成了几乎所有客户端操作的定义,是发起操作的客户端类。重点在构造方法中实现,下文会具体分析。

2. SmartLifecycle的启动点

eureka客户端的注册动作,通过SmartLifecycle来触发,对应的实现类是EurekaAutoServiceRegistration

public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener {

    private static final Log log = LogFactory.getLog(EurekaAutoServiceRegistration.class);

    private AtomicBoolean running = new AtomicBoolean(false);

    private int order = 0;

    private AtomicInteger port = new AtomicInteger(0);

    private ApplicationContext context;

    private EurekaServiceRegistry serviceRegistry;

    private EurekaRegistration registration;

    public EurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry serviceRegistry,
            EurekaRegistration registration) {
        this.context = context;
        this.serviceRegistry = serviceRegistry;
        this.registration = registration;
    }

    @Override
    public void start() {
        ...
        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        // getNonSecurePort() 就是项目的启动端口
        if (!this.running.get() && this.registration.getNonSecurePort() > 0) {

            this.serviceRegistry.register(this.registration);

            this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
            this.running.set(true);
        }
    }

直接看start方法,根据初始化参数,直接进入if方法。有两个关键方法,分别是register()开始注册, publishEvent 发布当前实例注册的事件。

EurekaServiceRegistry

public class EurekaServiceRegistry implements ServiceRegistry<EurekaRegistration> {

    private static final Log log = LogFactory.getLog(EurekaServiceRegistry.class);

    @Override
    public void register(EurekaRegistration reg) {
        maybeInitializeClient(reg);

        if (log.isInfoEnabled()) {
            log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
                    + " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
        }

        reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

        reg.getHealthCheckHandler()
                .ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
    }

    // 这个方法的实际作用是让ApplicationInfoManager和CloudEurekaClient被使用,来触发bean注入,这两个bean都是lazy的
    private void maybeInitializeClient(EurekaRegistration reg) {
        // force initialization of possibly scoped proxies
        reg.getApplicationInfoManager().getInfo();
        reg.getEurekaClient().getApplications();
    }

maybeInitializeClient 触发了后续两个重要的bean:ApplicationInfoManagerEurekaClient的实例化过程。
ApplicationInfoManager存放了InstanceInfo的信息和InstanceConfig的所有配置信息。
EurekaClient(CloudEurekaClient) 在构造过程中,完成了整个客户端的注册、向服务端进行数据同步,schedule任务的定义和开启。

依次介绍这两个类的创建过程。

new ApplicationInfoManager()

bean的定义在上面的AutoConfiguration中

    @Inject
    public ApplicationInfoManager(EurekaInstanceConfig config, InstanceInfo instanceInfo, OptionalArgs optionalArgs) {
        this.config = config;
        this.instanceInfo = instanceInfo;
        this.listeners = new ConcurrentHashMap<String, StatusChangeListener>();
        if (optionalArgs != null) {
            this.instanceStatusMapper = optionalArgs.getInstanceStatusMapper();
        } else {
            this.instanceStatusMapper = NO_OP_MAPPER;
        }

        // Hack to allow for getInstance() to use the DI'd ApplicationInfoManager
        instance = this;
    }

这里首先通过工厂创建了一个InstanceInfo的对象,create方法将EurekaInstanceConfig里的所有配置通过builder模式赋值到InstanceInfo中。
并在内部初始化了两个属性,listeners和instanceStatusMapper。

new CloudEurekaClient()

bean的定义在上面的AutoConfiguration中

public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
            AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) {
        super(applicationInfoManager, config, args);
        this.applicationInfoManager = applicationInfoManager;
        this.publisher = publisher;
        this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
        ReflectionUtils.makeAccessible(this.eurekaTransportField);
    }

先调用父类的super构造方法

public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
        this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
            private volatile BackupRegistry backupRegistryInstance;

            @Override
            public synchronized BackupRegistry get() {
                if (backupRegistryInstance == null) {
                    String backupRegistryClassName = config.getBackupRegistryImpl();
                    if (null != backupRegistryClassName) {
                        try {
                            backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                            logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                        } catch (InstantiationException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        } catch (IllegalAccessException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        } catch (ClassNotFoundException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        }
                    }

                    if (backupRegistryInstance == null) {
                        logger.warn("Using default backup registry implementation which does not do anything.");
                        backupRegistryInstance = new NotImplementedRegistryImpl();
                    }
                }

                return backupRegistryInstance;
            }
        }, randomizer);
    }

在构造方法的初始逻辑前,先初始化了BackupRegistry的对象,这个是当eureka所有的url都无法连接时,会按照backupRegistryImpl指定的自定义类来实现,默认使用NotImplementedRegistryImpl,内部什么都不处理。

@Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
        // 是否用自定义参数来覆盖
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
        
        // -------------   将构造方法的参数 给全局参数赋值   -----------------
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();

        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }

        this.backupRegistryProvider = backupRegistryProvider;
        this.endpointRandomizer = endpointRandomizer;
        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());

        fetchRegistryGeneration = new AtomicLong(0);

        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
        // -------------   将构造方法的参数 给全局参数赋值   -----------------

        // 如果开启了拉取注册表,就在ThresholdLevelsMetric中注册JXM,提供metric的查询
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
        
        // 如果开启了注册到eurkea,就在ThresholdLevelsMetric中注册JXM,提供metric的查询
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

        //如果既不需要拉取,又不需要注,则直接结束
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            initRegistrySize = this.getApplications().size();
            registrySize = initRegistrySize;
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, initRegistrySize);

            return;  // no need to setup up an network tasks and we are done
        }

        // 否则就完成整个client端初始化的动作。
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            // 线程池创建两个线程,分别给心跳和刷新cache使用
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            // 用于执行心跳任务的线程池
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
             
           // 用于执行刷新缓存任务的线程池
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            // 完成eurekaTransport的创建和内部属性的初始化
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);

            // 初始化默认的az和region的对应关系
            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            // 构建instanceRegionChecker 用于查询InstanceInfo对应的region,判断是否localRegion等
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        if (clientConfig.shouldFetchRegistry()) {
            try {
                // 重要方法,具体说明看fetchRegistry的分析
                boolean primaryFetchRegistryResult = fetchRegistry(false);
                // 如果拉取失败,则会输出如下异常
                if (!primaryFetchRegistryResult) {
                    logger.info("Initial registry fetch from primary servers failed");
                }
                boolean backupFetchRegistryResult = true;
                // 拉取失败则调用backup的实现类
                if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                    backupFetchRegistryResult = false;
                    logger.info("Initial registry fetch from backup servers failed");
                }
                if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                    throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
                }
            } catch (Throwable th) {
                logger.error("Fetch registry error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // call and execute the pre registration handler before all background tasks (inc registration) is started
        // 提供了一个扩展点,需要在发起注册之前,进行一些操作。
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }

        // 在初始化阶段强制注册,这里shouldEnforceRegistrationAtInit()默认值是false,如果注册失败会抛出异常
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        // 关键方法,初始化各种schedule任务,包括心跳、拉取注册表等,具体说明看initScheduledTasks的分析
        initScheduledTasks();

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        initRegistrySize = this.getApplications().size();
        registrySize = initRegistrySize;
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, initRegistrySize);
    }
fetchRegistry()
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            // Application 内部封装了从eureka服务端返回的所有注册表信息,第一次启动时这里时空的,在拉取全量或增量时更新
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                // 全量拉取注册表信息
                getAndStoreFullRegistry();
            } else {
                // 增量拉取注册表信息
                getAndUpdateDelta(applications);
            }
            // 这里的hashCode用来判断拉取跟上次相比是否发生过变更
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
                    appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;

这个fetchRegistry方法会在启动时调用,以及schedule任务执行时,每次刷新客户端缓存时调用。主要做了几件事情。

  1. 如果是启动时,会全量拉取eureka的注册信息,调用/eureka/apps
  private void getAndStoreFullRegistry() throws Throwable {
        ...
        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        ...

这里的apps就是从eureka服务端拉取的全量注册表信息,如下图


  1. 如果是schedule任务会增量拉取,调用getAndUpdateDelta(),内部通过http请求/eureka/apps/delta,这里的delta就是从eureka服务端拉取的增量注册表信息。
   private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }
        if (delta == null) {
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    // 获取到增量列表后,依次遍历每一个application,更新到本地cache中
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } 
    ...
  1. 调用onCacheRefreshed(),刷新客户端缓存。子类CloudEurekaClient重写了onCacheRefresh,由eventListeners来处理CacheRefreshedEvent,并发出HeartbeatEvent事件。
    @Override
    protected void onCacheRefreshed() {
        super.onCacheRefreshed();

        if (this.cacheRefreshedCount != null) { // might be called during construction and
            // will be null
            long newCount = this.cacheRefreshedCount.incrementAndGet();
            log.trace("onCacheRefreshed called with count: " + newCount);
            this.publisher.publishEvent(new HeartbeatEvent(this, newCount));
        }
    }
    protected void onCacheRefreshed() {
        fireEvent(new CacheRefreshedEvent());
    }

    protected void fireEvent(final EurekaEvent event) {
        for (EurekaEventListener listener : eventListeners) {
            try {
                listener.onEvent(event);
            } catch (Exception e) {
                logger.info("Event {} throw an exception for listener {}", event, listener, e.getMessage());
            }
        }
    }
  1. 调用updateInstanceRemoteStatus将当前实例的状态更新到eureka服务端。
  private synchronized void updateInstanceRemoteStatus() {
        // Determine this instance's status for this app and set to UNKNOWN if not found
        InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
        if (instanceInfo.getAppName() != null) {
            Application app = getApplication(instanceInfo.getAppName());
            if (app != null) {
                InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
                if (remoteInstanceInfo != null) {
                    // 本次拉取的服务状态
                    currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
                }
            }
        }
        if (currentRemoteInstanceStatus == null) {
            currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
        }

        // Notify if status changed
        // 本次拉取的服务状态 和 上次拉取的服务状态不一致
        if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
            onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
            lastRemoteInstanceStatus = currentRemoteInstanceStatus;
        }
    }

  protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
        fireEvent(new StatusChangeEvent(oldStatus, newStatus));
    }

先将服务远程的当前状态跟上一次拉取的状态进行比较,如果一致,则结束,不一致则发出一个StatusChangeEvent事件。

如果成功从eureka 服务端拉取注册表成功,则fetchRegistry返回true。

initScheduledTasks()
    // 初始化所有的schedule任务
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            // 定义cacheRefreshTask,schedule每隔registryFetchIntervalSeconds来刷新客户端缓存
            cacheRefreshTask = new TimedSupervisorTask("cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread());
            scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            // 定义heartbeatTask,schedule每隔renewalIntervalInSecs来发起心跳(续约)
            heartbeatTask = new TimedSupervisorTask("heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread());
            scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            // 实例信息的复制器,内部包含schedule,同步到eureka服务端
            instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize

            // 定义了状态变更的listener,并注册到applicationInfoManager中
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            // 开启schedule任务
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
  1. 首先给cache和heartbeat创建了两个TimedSupervisorTask,定义如下,scheduler会每隔一定时间来执行task。
  2. 创建了instanceInfoReplicator,是实例信息变化的复制器,当发现出现变化时,内部通过register方法将新的信息同步到eureka服务端。
  3. 创建了statusChangeListener,当触发状态变更回调时,调用instanceInfoReplicator.onDemandUpdate(); 并将其添加到applicationInfoManager中。
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.name = name;
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;
        ...
    }
@Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();

            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);
        } finally {
            if (future != null) {
                future.cancel(true);
            }

            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

这里还需要 重点说明下run的实现
scheduler.schedule在调度任务时,只传了delay的时间,并没有传周期时间,而执行周期其实是会动态变化的,这里的动态变化是通过scheduler.schedule的递归来完成。

可以看到在try块中,delay的值会被重置,并且如果出现超时,则将newDelay可能设置成2倍的时间,再在finally中再次执行schedule,延迟的时间就是2倍的delay,这是一种动态周期的schedule实现方案

cacheRefreshTask
class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

    @VisibleForTesting
    void refreshRegistry() {
        try {
            ...
            //remoteRegionsModified  
            // 前文判断根据remoteRegions是否发生变化来决定是全量拉取还是增量拉取
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                // 这里的localRegionApps是拉取注册表信息内容的本地缓存
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }
            ...
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }
    }

这个task其实就是调用了fetchRegistry,拉取最新的注册表,刷新本地cache。

heartbeatTask
    private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            ... //这里判断如果是404的请求,则再发起一次注册请求,同步实例信息
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

这个task就是发起了一个http请求,调用了sendHearBeat方法,发起了一次心跳续约。

InstanceInfoReplicator
   public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

     public boolean onDemandUpdate() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            if (!scheduler.isShutdown()) {
                scheduler.submit(new Runnable() {
                    @Override
                    public void run() {
                        logger.debug("Executing on-demand update of local InstanceInfo");
    
                        Future latestPeriodic = scheduledPeriodicRef.get();
                        if (latestPeriodic != null && !latestPeriodic.isDone()) {
                            logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                            latestPeriodic.cancel(false);
                        }
    
                        InstanceInfoReplicator.this.run();
                    }
                });
                return true;
            } else {
                logger.warn("Ignoring onDemand update due to stopped scheduler");
                return false;
            }
        } else {
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }

    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

InstanceInfoReplicator 是实例信息的同步器,作用是当发生变更时(状态、地址、ip、配置参数值等),将新的InstanceInfo同步给eureka服务端。
这里的三个方法:
start():在初始化时,调用start开启schedule任务,每隔一段时间来调用run()检测变更。
onDemandUpdate():在触发状态变更时,作为listener来调用,也会调用run()检测变更。
run():检测当前InstanceInfo是否跟上次相同,如果出现变更,则调用discoveryClient.register();将新的信息同步到eureka服务端。依据则是instanceInfo是否被标记为isDirty

而标记的依据在discoveryClient.refreshInstanceInfo();中实现。

/**
     * Refresh the current local instanceInfo. Note that after a valid refresh where changes are observed, the
     * isDirty flag on the instanceInfo is set to true
     */
    void refreshInstanceInfo() {
        // 判断地址、ip、实例所在的部署节点是否变更
        applicationInfoManager.refreshDataCenterInfoIfRequired();
        // 判断各种续约配置值(LeaseInfo) 是否变更
        applicationInfoManager.refreshLeaseInfoIfRequired();

        InstanceStatus status;
        try {
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            status = InstanceStatus.DOWN;
        }

        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }

先根据两个refresh方法来判断是否变更,如果变更,内部会调用instanceInfo.setIsDirty();来进行标记。

public synchronized void setIsDirty() {
        isInstanceInfoDirty = true;
        lastDirtyTimestamp = System.currentTimeMillis();
    }

至此,initScheduledTasks分析完成。内部分别启动了三个schedule,分别用于将新的注册表刷新本地cache发送心跳续约发送客户端最新的实例信息

再回到DiscoveryClient的构造方法,在initScheduledTasks执行完成后,就是注册JMX bean,设置一些全局属性。

CloudEurekaClient 在构造完成后,会根据是否配置了healthCheckHandler(默认的EurekaHealthCheckHandler实现需要配置springboot actuator),如果配置了会将健康状态同步到eureka 服务端一次。

    @Override
    public void registerHealthCheck(HealthCheckHandler healthCheckHandler) {
        if (instanceInfo == null) {
            logger.error("Cannot register a healthcheck handler when instance info is null!");
        }
        if (healthCheckHandler != null) {
            this.healthCheckHandlerRef.set(healthCheckHandler);
            // schedule an onDemand update of the instanceInfo when a new healthcheck handler is registered
            if (instanceInfoReplicator != null) {
                instanceInfoReplicator.onDemandUpdate();
            }
        }
    }

至此,CloudEurekaClient的整个构建过程完成的所有内容也分析完成。

再回到EurekaServiceRegistry的register方法

  public void register(EurekaRegistration reg) {
        maybeInitializeClient(reg);

        if (log.isInfoEnabled()) {
            log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
                    + " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
        }

        reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

        reg.getHealthCheckHandler()
                .ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
    }

在完成整个过程的定义之后,先调用setInstanceStatus设置Instance的状态,启动成功这里就是UP。
接着,如果配置了健康检查,还会触发一个发送健康检查同步的事件。

public synchronized void setInstanceStatus(InstanceStatus status) {
        InstanceStatus next = instanceStatusMapper.map(status);
        if (next == null) {
            return;
        }

        InstanceStatus prev = instanceInfo.setStatus(next);
        if (prev != null) {
            for (StatusChangeListener listener : listeners.values()) {
                try {
                    listener.notify(new StatusChangeEvent(prev, next));
                } catch (Exception e) {
                    logger.warn("failed to notify listener: {}", listener.getId(), e);
                }
            }
        }
    }

还会发出一个StatusChangeEvent,通知listener来向eureka服务端做同步。

register至此分析完成。

再回到EurekaAutoServiceRegistration(SmartLifecycle)的start方法。

@Override
    public void start() {
        // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
        if (this.port.get() != 0) {
            if (this.registration.getNonSecurePort() == 0) {
                this.registration.setNonSecurePort(this.port.get());
            }

            if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
                this.registration.setSecurePort(this.port.get());
            }
        }

        // only initialize if nonSecurePort is greater than 0 and it isn't already running
        // because of containerPortInitializer below
        if (!this.running.get() && this.registration.getNonSecurePort() > 0) {

            this.serviceRegistry.register(this.registration);

            this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
            this.running.set(true);
        }
    }

register完成后,context发出了一个InstanceRegisteredEvent事件,这个事件不是eureka的事件,而是一个ApplicationEvent。目的是提供一个扩展点,来通知eureka实例注册完成。用户可以自定义listener来处理。

至此,整个eureka客户端的启动流程,注册原理分析完成。

3. 下线分析

在SmartLifecycle的定义中,当实例停止前,会调用stop方法,eureka也因此触发下线流程。

    @Override
    public void stop() {
        this.serviceRegistry.deregister(this.registration);
        this.running.set(false);
    }
@Override
    public void deregister(EurekaRegistration reg) {
        if (reg.getApplicationInfoManager().getInfo() != null) {

            if (log.isInfoEnabled()) {
                log.info("Unregistering application " + reg.getApplicationInfoManager().getInfo().getAppName()
                        + " with eureka with status DOWN");
            }

            reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);

            // shutdown of eureka client should happen with EurekaRegistration.close()
            // auto registration will create a bean which will be properly disposed
            // manual registrations will need to call close()
        }
    }

可以看到下线的处理流程非常简单,就是设置了ApplicationInfoManager中的instanceStatus为DOWN的状态,设置down的同时,会触发StatusChangeEvent事件的发送,由statusChangeListener来向eureka服务端发送新的下线通知。

总结

  1. eureka的客户端通过SmartLifecycle作为入口点,开启整个eureka的bean注入;
  2. 所有的核心代码都在CloudEurekaClient的父类构造方法中完成。
    a. 定义refreshCacheTask来定时拉取注册表
    b. 定义heartbeatTask来定时发送续约请求
    c. 定义instanceInfoReplicator来向eureka服务端发送实例信息
    d. 定义statusChangeListener 来接受实例变化事件,发送register请求。
  3. 每当实例信息发生变更时,就发送StatusChangeEvent,判定变更的条件包括InstanceInfo实例属性变化、InstanceConfig配置发生变更。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,980评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,422评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,130评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,553评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,408评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,326评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,720评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,373评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,678评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,722评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,486评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,335评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,738评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,283评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,692评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,893评论 2 335

推荐阅读更多精彩内容