为什么需要熔断器
软件系统通常会远程调用运行在不同进程中的软件,这些进程可能位于网络上的不同机器上。 内存中调用与远程调用之间的一大区别是,远程调用可能会失败,或者在达到某个超时限制之前挂起,没有响应。 更糟糕的是,如果一个无响应的供应商上有很多调用者,那么你可能会耗尽关键资源,导致多个系统出现连锁故障。 Michael Nygard 在其出色的《Release It!》一书中普及了熔断器模式,以防止这种灾难性的级联故障。
熔断器背后的基本思想非常简单。 你可以将受保护的函数调用封装在熔断器对象中,由熔断器对象监控故障。 一旦故障达到一定的阈值,熔断器就会跳闸,所有对熔断器的进一步调用都会以错误返回,而受保护的调用根本不会被执行。 如果熔断器跳闸,通常还需要某种监控警报。
译者:从下图我们得知,熔断器的定位在发送端(客户端)——这是和限流机制的一大不同。
熔断器的机制
下面是一个简单的示例,使用 Ruby 如何防止超时。 我在熔断器中设置了一个块(Lambda),它是受保护的调用。
cb = CircuitBreaker.new {|arg| @supplier.func arg}
熔断器存储块,初始化各种参数(用于阈值、超时和监控),并将熔断器重置为闭合状态。
class CircuitBreaker...
attr_accessor :invocation_timeout, :failure_threshold, :monitor
def initialize &block
@circuit = block
@invocation_timeout = 0.01
@failure_threshold = 5
@monitor = acquire_monitor
reset
end
如果电路关闭,调用熔断器将继续后续的操作(原文:underlying block),如果熔断器处于打开状态,则返回错误信息。
# client code
aCircuitBreaker.call(5)
class CircuitBreaker...
def call args
case state
when :closed
begin
do_call args
rescue Timeout::Error
record_failure
raise $!
end
when :open then raise CircuitBreaker::Open
else raise "Unreachable Code"
end
end
def do_call args
result = Timeout::timeout(@invocation_timeout) do
@circuit.call args
end
reset
return result
end
每一次调用如果出现超时,我们会递增失败计数器,但只有有一次成功调用,我们就会将其(失败计数器)重置为零。
class CircuitBreaker...
def record_failure
@failure_count += 1
@monitor.alert(:open_circuit) if :open == state
end
def reset
@failure_count = 0
@monitor.alert :reset_circuit
end
我们通过将故障计数与阈值进行比较,确定断路器的状态
class CircuitBreaker...
def state
(@failure_count >= @failure_threshold) ? :open : :closed
end
这种简单的熔断器可以在熔断器处于开启状态时避免发出受保护的调用,但当熔断器恢复关闭状态时,则需要外部干预来重置熔断器。 对于建筑物中的熔断器来说,这是一种合理的方法;但对于软件熔断器来说,我们可以让熔断器自己检测底层调用是否恢复正常。 我们可以在适当的时间间隔后再次尝试发送受保护的调用,并在成功后重置断路器,从而实现这种自重置行为。
创建这种断路器意味着要添加一个尝试重置的阈值,并设置一个变量来保存上次出错的时间。
class ResetCircuitBreaker...
def initialize &block
@circuit = block
@invocation_timeout = 0.01
@failure_threshold = 5
@monitor = BreakerMonitor.new
@reset_timeout = 0.1
reset
end
def reset
@failure_count = 0
@last_failure_time = nil
@monitor.alert :reset_circuit
end
现在出现了第三种状态–半开,这意味着电路已经准备好作为试验进行真正的呼叫,看看问题是否已经解决。
class ResetCircuitBreaker...
def state
case
when (@failure_count >= @failure_threshold) &&
(Time.now - @last_failure_time) > @reset_timeout
:half_open
when (@failure_count >= @failure_threshold)
:open
else
:closed
end
end
如果要求在半开状态下呼叫,则会导致试呼叫,如果成功,则会重置断路器,如果失败,则会重新启动超时。
class ResetCircuitBreaker...
def call args
case state
when :closed, :half_open
begin
do_call args
rescue Timeout::Error
record_failure
raise $!
end
when :open
raise CircuitBreaker::Open
else
raise "Unreachable"
end
end
def record_failure
@failure_count += 1
@last_failure_time = Time.now
@monitor.alert(:open_circuit) if :open == state
end
这个例子只是一个简单的说明,实际上断路器提供了更多的功能和参数。 通常情况下,断路器会防止受保护调用可能引发的一系列错误,如网络连接故障。 并非所有错误都会导致电路跳闸,有些错误应反映正常故障,并作为常规逻辑的一部分进行处理。
如果流量很大,可能会出现许多调用都在等待初始超时的问题。 由于远程调用通常速度较慢,因此最好将每次调用放在不同的线程上,使用 future 或 promise 来处理返回的结果。 通过从线程池中抽取这些线程,可以在线程池用完时安排断路。
该示例展示了一种简单的断路器跳闸方法,即在呼叫成功后重置计数。 一个更复杂的方法可能会考虑错误发生的频率,比如一旦失败率达到 50%,就会跳闸。 你也可以为不同的错误设置不同的阈值,例如超时的阈值为 10,而连接失败的阈值为 3。
由此我们可以总结,熔断器有3
种状态:
CLOSED
:默认状态。熔断器观察到请求失败比例没有达到阈值,熔断器认为被代理服务状态良好。OPEN
:熔断器观察到请求失败比例已经达到阈值,熔断器认为被代理服务故障,打开开关,请求不再到达被代理的服务,而是快速失败。HALF OPEN
:熔断器打开后,为了能自动恢复对被代理服务的访问,会切换到半开放状态,(一段时间后)去尝试请求被代理服务以查看服务是否已经故障恢复。如果成功,会转成CLOSED
状态,否则转到OPEN
状态。
关于熔断器,更广泛的意义
我展示的例子是同步呼叫的断路器,但断路器对异步通信也很有用。 这里的一种常用技术是将所有请求放在一个队列中,供应商以自己的速度消耗队列中的请求–这是一种避免服务器过载的有用技术。 在这种情况下,当队列排满时,电路就会断开。
断路器本身有助于减少被可能失败的操作占用的资源。 你可以避免客户端等待超时,而断路则可以避免给苦苦挣扎的服务器带来负担。 我在这里讨论的是远程调用,这是断路器的一种常见情况,但它们也可用于任何需要保护系统部分免受其他部分故障影响的情况。
断路器是进行监控的重要场所。 断路器状态的任何变化都应记录在案,断路器应显示其状态的详细信息,以便进行更深入的监控。 断路器的行为往往是对环境中更深层次问题发出警告的良好来源。 运行人员应该能够跳闸或重置断路器。
断路器本身是有价值的,但使用断路器的客户需要对断路器故障做出反应。 与任何远程调用一样,您需要考虑出现故障时该如何处理。 是正在执行的操作失败,还是可以采取变通方法? 信用卡授权可以放在队列中稍后处理,无法获得某些数据可以通过显示一些足够好的陈旧数据来缓解。
AtomicInteger
AtomicInteger
是 Java 中 java.util.concurrent.atomic
包提供的一个类,用于在多线程环境下进行整数的原子操作。它是线程安全的,并通过使用底层的硬件原子操作来确保操作的原子性,因此避免了传统的锁机制,提供了更高效的性能。
主要功能
AtomicInteger
提供了一些常见的原子操作方法,例如:
- 获取与设置
get()
: 获取当前的值。set(int newValue)
: 设置为指定的新值。
- 增减操作
incrementAndGet()
: 原子地将当前值加一,并返回新的值。decrementAndGet()
: 原子地将当前值减一,并返回新的值。getAndIncrement()
: 返回当前值,并原子地将当前值加一。getAndDecrement()
: 返回当前值,并原子地将当前值减一。
- 更新操作
getAndSet(int newValue)
: 返回当前值,并将其设置为指定的新值。compareAndSet(int expect, int update)
: 如果当前值 ==expect
,则将其设置为update
,并返回true
;否则返回false
。
示例代码
下面是一个简单的示例,展示了如何使用 AtomicInteger
进行线程安全的计数操作:
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerExample {
private static final int THREADS_COUNT = 10;
private static AtomicInteger atomicInteger = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
atomicInteger.incrementAndGet();
}
});
threads[i].start();
}
for (int i = 0; i < THREADS_COUNT; i++) {
threads[i].join();
}
System.out.println("Final count is: " + atomicInteger.get());
}
}
关键点总结
- 线程安全:
AtomicInteger
通过原子操作确保了多线程环境下的线程安全。 - 无锁机制:使用硬件级别的原子操作,避免了传统的锁机制,从而提高了性能。
- 常用方法:提供了多种常用的原子操作方法,如
get()
,set(int)
,incrementAndGet()
,compareAndSet(int, int)
等。
AtomicInteger
非常适合在高并发环境下需要进行简单整数操作的场景,可以有效地减少锁的竞争和上下文切换,提高程序的并发性能。