什么是持续集成系统?
开发软件时,我们希望能够验证我们的新功能或错误修复是否安全,并如预期的工作。我们通过代码测试来做到这一点。有时,开发人员将在本地运行测试以验证其更改是否安全,但可能没有时间在其软件运行的每个系统上测试代码。此外,越来越多的测试被添加到运行所需的时间,甚至在当地不太可行。因此,建立了持续的集成系统。
连续集成(CI)系统是用于测试新代码的专用系统。在提交到代码存储库时,持续集成系统验证此提交不会中断。为此,系统必须能够获取新的更改,运行测试并报告其结果。像其他系统一样,这是无法抵抗的。意味着如果系统的任何部分发生故障,它应该能够从该点恢复并继续。
这个测试系统还应该很好地处理负载,以便我们可以在合理的时间内获得测试结果,以便在提交速度比测试运行的时间更快的情况下。我们可以通过分发和并行化测试工作来实现这一点。该项目将展示为扩展性设计的小型,裸机分布式连续集成系统。
项目限制和注释
该项目使用Git作为需要测试的代码的存储库。只使用标准的源代码管理,如果你不熟悉Git但熟悉svn或者Mercurial等其他版本控制系统(VCS),你仍然可以参考。
由于代码长度和单元测试的限制,我简化了测试,只运行位于存储库中tests的目录下的测试。
连续集成系统监视通常托管在Web服务器上的主存储库,而不是CI的文件系统本地。对于我们示例的情况,我们将使用本地存储库而不是远程存储库。
持续集成系统不需要按照固定的时间表进行运行。可以运行几个提交或一次提交。我们的例子中CI系统定期运行。这意味着如果设置为检查五秒钟内的更改,将针对五秒钟后提交的内容进行测试。它不会测试在这段时间内做出的所有提交,只有最新的一次。
该CI系统旨在定期检查存储库中的更改。在现实的CI系统中,你还可以通过托管存储库通知存储库观察器。例如,Github提供了发送通知给URL的“post-commit hooks”。遵循这个模型,存储库观察者将被托管在该URL处的Web服务器调用以响应该通知。由于这在本地进行模拟是复杂的,所以我们使用一个观察者模型,其中存储库观察者将检查更改,而不是被通知。
CI系统还有一个report,测试者将其结果报告给可供人们查看的组件,可能在网页上。为简单起见,此项目收集测试结果,并将其作为文件存储在调度程序进程本地的文件系统中。
请注意,CI系统使用的架构只是其中的一种可能性。我们的案例研究使用这种方法简化为三个主要组成部分。
介绍
连续集成系统的基本结构由三个部分组成:an observer, a test job dispatcher, and a test runner。observer观看存储库。当它注意到提交时,它会通知作业调度程序。然后,作业调度程序会找到一个测试运行器,并给出它要进行测试的提交号。
建立CI系统有许多方法。我们可以让observer,dispatcher和runner在单个机器上运行相同的过程。这种方法非常有限,因为没有负载处理,如果更多的更改添加到存储库,会积累大量的积压。这种方法也是不容有错的;如果运行的计算机出现故障或断电,则没有后备系统,因此不会运行测试。理想的系统根据要求处理尽可能多的测试工作的系统,并且将尽力补偿。
为了构建一个容错和负载的CI系统,在这个项目中,每个组件都是独立的。这使每个进程独立于其他进程,并运行每个进程的多个实例。当你有多个测试作业需要同时运行时。然后我们可以并行产生多个测试runners,让我们能够运行尽可能多的工作,并阻止积累的排队测试。
这个项目中,这些组件不仅可作为单独的进程运行,而且还可以通过套接字进行通信,这使我们在单独的联网计算机上运行每个进程。每个组件分配唯一的主机/端口地址,并且每个进程可以通过在分配的地址发布消息来与其他进程通信。
通过启用分布式架构,此设计可即时处理硬件故障。我们可以让observer在一台机器上运行,另一台机器上的测试作业dispatcher ,另一台机器上的测试runner可以通过网络进行通信。如果这些机器中的任何一台机器停机,我们可以安排一台新机器上线,这样系统就会失效。
该项目不包括自动恢复代码,这取决于你的分布式系统的架构,但现实中,CI系统在这样的分布式环境中运行,因此它们可以具有故障转移冗余。
因此,这些过程的每一个将在本地和手动启动不同的本地端口。
此项目中的文件
此项目包含以下每个组件的Python文件:repository observer(repo_observer.py
),the test job dispatcher (dispatcher.py
)和 test runner (test_runner.py
)。这三个进程中的每一个都使用套接字彼此进行通信,并且由于用于传输信息的代码由所有这些进程共享,所以包含helpers.py文件,因此每个进程导入通信函数。
还有这些进程使用的bash脚本文件。这些脚本文件用于通过简单的方式执行bash和git命令,而不是持续使用Python的操作系统级别模块(如os和子进程)。
最后,有一个测试目录,包含CI系统将运行的两个示例测试。一个通过,另一个失败。
初始设置
虽然这个CI系统在分布式系统中工作,但首先在计算机上本地运行一切,这样我们可以掌握CI系统的工作原理,而不增加运行在网络的风险。如果想在分布式环境中运行,可以在自己的机器上运行每个组件。
持续集成系统通过检测代码库中的更改来运行测试,因此起初要设置CI系统将监视的存储库。
调用test_repo
:
$ mkdir test_repo
$ cd test_repo
$ git init
这是我们的主存储库。开发人员检查代码的地方,所以我们的CI应该拉这个存储库并检查提交,然后运行测试。repository observer检查新提交的事情。
repository observer通过检查提交工作,因此在主存储库中至少提交一次。
将测试文件夹从该代码库复制到test_repo
并提交:
$ cp -r /this/directory/tests /path/to/test_repo/
$ cd /path/to/test\_repo
$ git add tests/
$ git commit -m ”add tests”
现在在主仓库中有一次提交。
repo observer component需要克隆代码,则可以检测何时进行新的提交。 让我们创建一个我们的主存储库的克隆test_repo_clone_obs
:
$ git clone /path/to/test_repo test_repo_clone_obs
test runner还需要克隆代码,则可以在给定的提交中检出存储库并运行测试。让我们创建另一个克隆我们的主存储库,并将其称为test_repo_clone_runner
:
$ git clone /path/to/test_repo test_repo_clone_runner
组件
Repository Observer(repo_observer.py
)
repository observer监视存储库,并在看到新的提交时通知调度程序。为了与所有版本控制系统配合使用,这个存储库观察器为定期检查存储库是否有新的提交,而不是依靠VCS来通知已做出更改。
observer定期轮询存储库,当看到更改时,它会告诉调度程序最新的提交ID以对其进行测试。observer通过在存储库中查找当前提交ID来检查新的提交,然后更新存储库,最后,找到最新的提交ID进行比较。observer仅针对最新的提交进行测试。这意味着如果在定期检查之间进行两次提交,观察者将只针对最新的提交运行测试。通常,CI系统检测自上次测试提交以来的所有提交,并将为每个新提交发送测试运行程序,但为简单起见,我修改了这一假设。
observer必须知道要查看哪个存储库。我们以前在/path/to/test_repo_clone_obs
创建了存储库的克隆。观察者使用此克隆来检测更改。为了让repository observer使用这个克隆,我们在调用repo_observer.py
文件时传递它。repository observer使用该克隆从主存储库中拉出。
我们必须告诉observer调度员的地址,所以observer 可以发送消息。当启动repository observer时,可以使用・–dispatcher-server・命令参数传入调度程序的服务器地址。如果不用,则默认使用localhost:8888
。
def poll():
parser = argparse.ArgumentParser()
parser.add_argument("--dispatcher-server",
help="dispatcher host:port, " \
"by default it uses localhost:8888",
default="localhost:8888",
action="store")
parser.add_argument("repo", metavar="REPO", type=str,
help="path to the repository this will observe")
args = parser.parse_args()
dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")
一旦调用repository observer文件,则启动poll()
函数。 此函数解析命令行参数,启动无限while循环。while循环用于定期检查存储库以进行更改。第一件事就是调用update_repo.sh
Bash脚本。
while True:
try:
# call the bash script that will update the repo and check
# for changes. If there's a change, it will drop a .commit_id file
# with the latest commit in the current working directory
subprocess.check_output(["./update_repo.sh", args.repo])
except subprocess.CalledProcessError as e:
raise Exception("Could not update and check repository. " +
"Reason: %s" % e.output)
update_repo.sh
文件用于识别新提交,并告知repository observer。通过提交ID,提取存储库并检查最新的提交ID 如果它们匹配,则不进行任何更改,因此存储库观察器不需要执行任何操作,但是如果提交ID有差异,则交了新的提交。这种情况下,update_repo.sh
将创建一个名为.commit_id
的文件,其中存储最新的提交ID。
update_repo.sh
的分步如下。首先,run_or_fail.sh
脚本,该文件提供了所有shell使用的run_or_fail
帮助程序。该方法给出运行命令,或者错误信息。
#!/bin/bash
source run_or_fail.sh
接下来,脚本尝试删除.commit_id
文件。由于updaterepo.sh
被repo_observer.py
无限调用,如果我们有一个新的提交,则创建.commit_id
,但是保存了已经测试过的提交。因此,我们要删除该文件,并且只有在发现新的提交时才创建新文件。
bash rm -f .commit_id
删除文件后(如果存在),它验证我们正在观察的存储库是否存在,然后将其重置为最近的提交,以防任何导致其失去同步。
run_or_fail "Repository folder not found!" pushd $1 1> /dev/null
run_or_fail "Could not reset git" git reset --hard HEAD
然后调用git log
解析输出,寻找最新的提交ID。
COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
echo "Could not call 'git log' on repository"
exit 1
fi
COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`
run_or_fail "Could not pull from repository" git pull
COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
echo "Could not call 'git log' on repository"
exit 1
fi
NEW_COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`
然后拉取存储库,获取最新的更改,获取提交ID。
# if the id changed, then write it to a file
if [ $NEW_COMMIT_ID != $COMMIT_ID ]; then
popd 1> /dev/null
echo $NEW_COMMIT_ID > .commit_id
fi
如果提交ID与以前的ID不匹配,则说明有新的提交要检查,所以脚本将最新的提交ID存储在.commit_id
文件中。
# if the id changed, then write it to a file
if [ $NEW_COMMIT_ID != $COMMIT_ID ]; then
popd 1> /dev/null
echo $NEW_COMMIT_ID > .commit_id
fi
当update_repo.sh
完成repo_observer.py
运行,repository observer检查.commit_id
文件的存在。如果存在,则有一个新的提交,我们需要通知调度程序,启动测试。repository observer通过连接并发送“状态”请求检查调度服务器的状态,以确保没有问题且准备好进行指令。
if os.path.isfile(".commit_id"):
try:
response = helpers.communicate(dispatcher_host,
int(dispatcher_port),
"status")
except socket.error as e:
raise Exception("Could not communicate with dispatcher server: %s" % e)
如果回应”OK”,则 repository observer打开.commit_id
文件,读取最新的提交ID,并使用dispatch:<commit ID>
请求将该ID发送给调度程序。然后睡眠五秒钟,重复此过程。 如果出现问题,五秒内再试一次。
if response == "OK":
commit = ""
with open(".commit_id", "r") as f:
commit = f.readline()
response = helpers.communicate(dispatcher_host,
int(dispatcher_port),
"dispatch:%s" % commit)
if response != "OK":
raise Exception("Could not dispatch the test: %s" %
response)
print "dispatched!"
else:
raise Exception("Could not dispatch the test: %s" %
response)
time.sleep(5)
repository observer永远重复此过程,直到通过KeyboardInterrupt
(Ctrl+c)终止进程,或发出kill信号。
Dispatcher (dispatcher.py
)
调度程序是用于委派测试任务的独立任务。侦听test runners和repository observer请求的端口。它允许test runners注册自己,并且当repository observer给出提交ID时,它将派生一个test runners对新的提交。它还优雅地处理test runners的任何问题,如果出现了问题,将重新将提交ID重新分配给新的test runners。
执行dispatch.py
时,调用serve
函数。首先,解析指定的调度程序的主机和端口的参数:
def serve():
parser = argparse.ArgumentParser()
parser.add_argument("--host",
help="dispatcher's host, by default it uses localhost",
default="localhost",
action="store")
parser.add_argument("--port",
help="dispatcher's port, by default it uses 8888",
default=8888,
action="store")
args = parser.parse_args()
这将启动调度服务器和另外两个线程。一个线程运行runner_checker
函数,其他线程运行redistribute
函数。
server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
print `serving on %s:%s` % (args.host, int(args.port))
...
runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
redistributor = threading.Thread(target=redistribute, args=(server,))
try:
runner_heartbeat.start()
redistributor.start()
# Activate the server; this will keep running until you
# interrupt the program with Ctrl+C or Cmd+C
server.serve_forever()
except (KeyboardInterrupt, Exception):
# if any exception occurs, kill the thread
server.dead = True
runner_heartbeat.join()
redistributor.join()
runner_checker
函数定期ping每个注册的test runner,以确保它们仍然响应。如果它们无响应了,那么该运行程序将其删除,其提交ID将被分派到下一个可用的运行程序。该函数将在pending_commits
变量中记录提交ID。
def runner_checker(server):
def manage_commit_lists(runner):
for commit, assigned_runner in server.dispatched_commits.iteritems():
if assigned_runner == runner:
del server.dispatched_commits[commit]
server.pending_commits.append(commit)
break
server.runners.remove(runner)
while not server.dead:
time.sleep(1)
for runner in server.runners:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
response = helpers.communicate(runner["host"],
int(runner["port"]),
"ping")
if response != "pong":
print "removing runner %s" % runner
manage_commit_lists(runner)
except socket.error as e:
manage_commit_lists(runner)
redistribute
函数用于调度挂在pending_commits
中的提交ID。当重新分发运行时,它会检查pending_commits
中是否有提交ID。如果有,它会使用提交ID来调用dispatch_tests
函数。
def redistribute(server):
while not server.dead:
for commit in server.pending_commits:
print "running redistribute"
print server.pending_commits
dispatch_tests(server, commit)
time.sleep(5)
dispatch_tests
函数用于从注册runner池中找到可用的test runner。如果有,它将使用提交ID发送最新消息。如果没有,则等待两秒钟并重复此过程。一旦调度,它将记录dispatched_commits
变量中测试运行程序正在测试哪个提交ID。如果提交ID在pending_commits
变量中,则删除dispatch_tests
,因为它已经被成功重新分派。
def dispatch_tests(server, commit_id):
# NOTE: usually we don't run this forever
while True:
print "trying to dispatch to runners"
for runner in server.runners:
response = helpers.communicate(runner["host"],
int(runner["port"]),
"runtest:%s" % commit_id)
if response == "OK":
print "adding id %s" % commit_id
server.dispatched_commits[commit_id] = runner
if commit_id in server.pending_commits:
server.pending_commits.remove(commit_id)
return
time.sleep(2)
调度服务器使用SocketServer
模块,它是一个非常简单的服务器,标准库的一部分。SocketServer
模块有四种基本的服务器类型:TCP
,UDP
,UnixStreamServer
和UnixDatagramServer
。我们将使用基于TCP
的套接字服务器,因此可以确保服务器之间有连续的有序数据流,因为UDP
不能确保。
SocketServer
提供的默认TCPServer
只能一次处理一个请求,因此它不能处理调度程序与一个连接进行通话的情况,比如测试运行程序,然后从存储库观察器发出一个新连接。如果发生这种情况,repository observer将不得不等待第一个连接完成并断开才能被服务。这是不理想的,因为调度服务器必须能够直接和迅速地与所有的test runners和repository observer。
为了使调度服务器处理并发连接,使用ThreadingTCPServer
自定义类,它将线程能力添加到默认SocketServer
。这意味着在调度员接收到连接请求的任何时候,它只会为该连接旋转一个新进程。调度员可以同时处理多个请求。
class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
runners = [] # Keeps track of test runner pool
dead = False # Indicate to other threads that we are no longer running
dispatched_commits = {} # Keeps track of commits we dispatched
pending_commits = [] # Keeps track of commits we have yet to dispatch
调度服务器通过为每个请求定义处理程序来工作。这是由DispatcherHandler
类定义的,继承了SocketServer
的BaseRequestHandler
。这个基类只需要我们定义句柄函数,在请求一个连接时被调用。DispatcherHandler
中定义的句柄函数是我们自定义的处理程序,它将在每个连接上被调用。查看传入的连接请求(self.request
保存请求信息),并解析正在请求的命令。
class DispatcherHandler(SocketServer.BaseRequestHandler):
"""
The RequestHandler class for our dispatcher.
This will dispatch test runners against the incoming commit
and handle their requests and test results
"""
command_re = re.compile(r"(\w+)(:.+)*")
BUF_SIZE = 1024
def handle(self):
self.data = self.request.recv(self.BUF_SIZE).strip()
command_groups = self.command_re.match(self.data)
if not command_groups:
self.request.sendall("Invalid command")
return
command = command_groups.group(1)
它处理四个命令:status
,register
,dispatch
和results
。status
检查分派器服务器是否已启动并运行。
if command == "status":
print "in status"
self.request.sendall("OK")
为了让调度员做有用的事情,它需要注册至少一个test runner。当在主机端口对调用寄存器时,将运行程序的信息存储在列表中(连接ThreadingTCPServer
对象的运行程序对象),以便稍后可以与运行程序通信。
elif command == "register":
# Add this test runner to our pool
print "register"
address = command_groups.group(2)
host, port = re.findall(r":(\w*)", address)
runner = {"host": host, "port":port}
self.server.runners.append(runner)
self.request.sendall("OK")
repository observer使用dispatch
派生test runner。此命令的格式是dispatch:<commit ID>
。调度员从消息中解析出提交ID,并将其发送给test runner。
elif command == "dispatch":
print "going to dispatch"
commit_id = command_groups.group(2)[1:]
if not self.server.runners:
self.request.sendall("No runners are registered")
else:
# The coordinator can trust us to dispatch the test
self.request.sendall("OK")
dispatch_tests(self.server, commit_id)
test runner使用results
报告测试运行的结果。此命令为:results:<commit ID>:<length of results data in bytes>:<results>
。<commit ID>
用于标识测试运行的提交ID。<length of results data in bytes>
用于确定结果数据需要多大的缓冲区。<results>
保存实际的结果输出。
elif command == "results":
print "got test results"
results = command_groups.group(2)[1:]
results = results.split(":")
commit_id = results[0]
length_msg = int(results[1])
# 3 is the number of ":" in the sent command
remaining_buffer = self.BUF_SIZE - \
(len(command) + len(commit_id) + len(results[1]) + 3)
if length_msg > remaining_buffer:
self.data += self.request.recv(length_msg - remaining_buffer).strip()
del self.server.dispatched_commits[commit_id]
if not os.path.exists("test_results"):
os.makedirs("test_results")
with open("test_results/%s" % commit_id, "w") as f:
data = self.data.split(":")[3:]
data = "\n".join(data)
f.write(data)
self.request.sendall("OK")
Test Runner (test_runner.py
)
test runner负责根据给定的提交ID运行测试并报告结果。它只与调度程序服务器通信,调度程序服务器负责为其提供运行的提交ID,并且将接收测试结果。
当调用test_runner.py
时,调用启动测试运行器服务器的服务功能,并启动一个线程来运行dispatcher_checker
函数。由于此启动过程与repo_observer.py
和dispatcher.py
中描述的过程非常相似,因此在此省略说明。
dispatcher_checker
函数每隔五秒钟ping一次调度服务器,以确保它仍然处于运行状态。这对于资源管理非常重要。如果调度员失败,则关闭test runner,因为如果没有调度员给它工作或报告,则无法进行任何有意义的工作。
def dispatcher_checker(server):
while not server.dead:
time.sleep(5)
if (time.time() - server.last_communication) > 10:
try:
response = helpers.communicate(
server.dispatcher_server["host"],
int(server.dispatcher_server["port"]),
"status")
if response != "OK":
print "Dispatcher is no longer functional"
server.shutdown()
return
except socket.error as e:
print "Can't communicate with dispatcher: %s" % e
server.shutdown()
return
test runner是一个ThreadingTCPServer
,就像调度服务器一样。它需要线程,因为调度员不仅会给它一个提交ID来运行,而且会在运行测试时定期ping运行程序,以验证它是否仍然处于起始状态。
class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
dispatcher_server = None # Holds the dispatcher server host/port information
last_communication = None # Keeps track of last communication from dispatcher
busy = False # Status flag
dead = False # Status flag
通信流从调度程序开始,请求转发程序接受提交ID以运行。如果测试运行程序准备好运行,则会对调度程序服务器作出响应,然后关闭连接。为了使测试运行器服务器都运行测试并接受来自调度程序的更多请求,它将在新线程上启动所请求的测试作业。
这意味着当调度服务器发出请求并期望响应时,它将在单独的线程上完成,而测试运行器忙于在自己的线程上运行测试。允许测试运行器服务器同时处理多个任务。代替这种线程设计,可以让调度器服务器保持与每个测试运行程序的连接,但这将增加调度程序服务器的内存需求,且容易受到网络问题的影响,例如意外丢弃的连接。
测试运行器服务器响应来自分派器的两个消息。一个是ping,调度程序服务器用来验证该运行程序是否仍然处于活动状态。
class TestHandler(SocketServer.BaseRequestHandler):
...
def handle(self):
....
if command == "ping":
print "pinged"
self.server.last_communication = time.time()
self.request.sendall("pong")
另一个是runtest
,它接收runtest:<commit ID>
的消息,用于启动给定提交的测试。当调用runtest
时,test runner检查它是否在运行测试,如果是,返回一个BUSY
响应给调度程序。如果可用,它将用OK
消息响应服务器,将其状态设置为忙,并运行其run_tests
。
elif command == "runtest":
print "got runtest command: am I busy? %s" % self.server.busy
if self.server.busy:
self.request.sendall("BUSY")
else:
self.request.sendall("OK")
print "running"
commit_id = command_groups.group(2)[1:]
self.server.busy = True
self.run_tests(commit_id,
self.server.repo_folder)
self.server.busy = False
此函数调用shell脚本test_runner_script.sh
,将存储库更新为给定的提交ID。一旦脚本返回,如果成功更新存储库,使用unittest
运行测试,并将结果收集到文件中。当测试运行完成时,test runner读取结果文件,并将其发送到调度员的结果消息中。
def run_tests(self, commit_id, repo_folder):
# update repo
output = subprocess.check_output(["./test_runner_script.sh",
repo_folder, commit_id])
print output
# run the tests
test_folder = os.path.join(repo_folder, "tests")
suite = unittest.TestLoader().discover(test_folder)
result_file = open("results", "w")
unittest.TextTestRunner(result_file).run(suite)
result_file.close()
result_file = open("results", "r")
# give the dispatcher the results
output = result_file.read()
helpers.communicate(self.server.dispatcher_server["host"],
int(self.server.dispatcher_server["port"]),
"results:%s:%s:%s" % (commit_id, len(output), output))
test_runner_script.sh
代码:
#!/bin/bash
REPO=$1
COMMIT=$2
source run_or_fail.sh
run_or_fail "Repository folder not found" pushd "$REPO" 1> /dev/null
run_or_fail "Could not clean repository" git clean -d -f -x
run_or_fail "Could not call git pull" git pull
run_or_fail "Could not update to given commit hash" git reset --hard "$COMMIT"
为了运行test_runner.py
,你必须将其指向存储库的克隆以对其进行测试。这种情况下,你可以使用先前创建的/path/to/test_repo test_repo_clone_runner
克隆。默认情况下,test_runner.py
将使用8900-9000范围内的端口在本地主机上启动自己的服务器,并尝试通过localhost:8888
连接到调度服务器。可以传递可选参数来更改这些值。--host
和--port
参数用于指定运行test runner服务器的特定地址,-dispatcher-server
参数指定调度程序的地址。
控制流程图
图2.1是该系统的概况图。该图假设所有三个文件(repo_observer.py
,dispatcher.py
和test_runner.py
)都在运行,并描述了每个进程在新提交时所执行的操作。
运行代码
我们可以在本地运行这个简单的CI系统,每个进程使用三个不同的终端shell。首先启动调度程序,在端口8888上运行:
$ python dispatcher.py
在新的shell中,我们启动test runner(所以它可以向调度程序注册):
$ python test_runner.py <path/to/test_repo_clone_runner>
测试运行器将端口分配给自己,范围为8900-9000。你可以运行尽可能多的test runners。
最后,在另一个新的shell中,我们来启动repo obserever:
$ python repo_observer.py --dispatcher-server=localhost:8888 <path/to/repo_clone_obs>
现在配置好一切,让我们开始一些测试!为此,我们需要做一个新的提交。转到主存储库并进行任意更改:
$ cd /path/to/test_repo
$ touch new_file
$ git add new_file
$ git commit -m"new file" new_file
repo_observer.py
会意识到有一个新的提交并通知调度程序。你可以在各自的shell中看到输出,以便监视。一旦调度程序接收到测试结果,将其存储在此代码库中的test_results/
文件夹中,使用提交ID作为文件名。
错误处理
该CI系统包括一些简单的错误处理。
如果你杀死了test_runner.py
进程,则dispatcher.py
将会知道这个运行程序不再可用,并且将它删除。
您也可以杀死test runner,以模拟机器崩溃或网络故障。这样,调度员将会意识到runner下降,如果在池中可用,则会分配给另一个test runner,或者等待新的test runner在池中注册。
如果你杀死了调度程序,那么repository observer就会知道它下降,并且会抛出异常。test runner还会注意到并关闭。
结论
通过分解问题,我们能够构建分布式连续集成系统的基础。随着进程通过套接字请求相互通信,能够跨多个机器分发系统,有助于使我们的系统更可靠和可扩展。
由于现在的CI系统非常简单,你可以自己扩展功能。以下是一些改进建议:
运行预提交测试
当前系统定期检查以查看新的提交是否运行,并运行最近的提交。应该改进这一点来测试每个提交。为此,你可以修改定期检查器,以便在最后一次测试和最后一次提交之间的日志中为每个提交分派测试运行。
Smarter Test Runners
如果测试运行程序检测到调度程序没有响应,它将停止运行。即使test runner在运行测试中也会发生这种情况!如果测试者等待一段时间(或永远地,如果你不在乎资源管理),调度员将重新上线,这会更好。在这种情况下,如果调度员在test runner运行测试时下降,而不是关闭它完成测试,并等待调度程序重新联机,报告结果。这将确保我们不会浪费test runner所做的任何努力,只在提交时运行一次测试。
Real Reporting
在一个真正的CI系统中,您可以将测试结果报告给记者服务,收集结果,将其发布到某个地方进行审核,并在发生故障或其他显着事件时通知有关方面的列表。您可以通过创建一个新过程来获取报告结果来扩展我们的简单CI系统,而不是调度员收集结果。这个新进程可能是一个Web服务器(或可以连接到Web服务器),可以在线发布结果,并且可以使用邮件服务器来提醒用户任何测试失败。
Test Runner Manager
现在,你必须手动启动test_runner.py
文件以启动test runner。相反,也可以创建一个test runner manager进程,该过程将评估调度员的测试请求的当前负载,并相应地缩放test runners数量。此过程收到最新的消息,并将为每个请求启动一个测试转移程序进程,在负载减少时杀死未使用的进程。
你可以让这个简单的CI系统更加强大和容错,也可以将其与其他系统(如基于Web的test report)集成。
如果你想看到持续集成系统可以实现的灵活性水平,建议你查看Jenkins,这是一个用Java编写的非常强大的开源CI系统。它为你提供了一个基本的CI系统,也可以使用插件扩展。也可以通过GitHub访问其源代码。另一个推荐的项目是Travis CI,它是用Ruby编写的,其源代码也可以通过GitHub获得。
这是一个理解CI系统如何工作以及如何自己构建的练习。你现在应该更加牢固地了解建立可靠的分布式系统所需的内容,可以使用这些知识来开发更复杂的解决方案。
引用
- Bash is used because we need to check file existence, create files, and use Git, and a shell script is the most direct and easy way to achieve this. Alternatively, there are cross-platform Python packages you can use; for example, Python’s os built-in module can be used for accessing the file system, and GitPython can be used for Git access, but they perform actions in a more roundabout way.