需求描述
项目需求测试过程中,需要向Nginx服务器发送一些用例请求,然后查看对应的Nginx日志,判断是否存在特征内容,来判断任务是否执行成功。为了提升效率,需要将这一过程实现自动化。
实践环境
python教程 3.6.5
代码设计与实现
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
-
-
- '''
- @CreateTime: 2021/06https://cdn.jxasp.com:9143/image/26 9:05
- @Author : shouke
- '''
-
-
- import time
- import threading
- import subprocess
- from collections import deque
-
-
- def collect_nginx_log():
- global nginx_log_queue
- global is_tasks_compete
- global task_status
-
-
- args = 'tail -0f /usr/local/openresty/nginx/logs/access.log'
- while task_status != 'req_log_got':
- with subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines = True) as proc:
- log_for_req = ''
- outs, errs = '', ''
-
- try:
- outs, errs = proc.communicate(timeout=2)
- except subprocess.TimeoutExpired:
- print('获取nginx日志超时,正在重试')
- proc.kill()
- try:
- outs, errs = proc.communicate(timeout=5)
- except subprocess.TimeoutExpired:
- print('获取nginx日志超时,再次超时,停止重试')
- break
- finally:
- for line in outs.split('\n'):
- flag = '\"client_ip\":\"10.118.0.77\"' # 特征
- if flag in line: # 查找包含特征内容的日志
- log_for_req += line
-
- if task_status == 'req_finished':
- nginx_log_queue.append(log_for_req)
- task_status = 'req_log_got'
-
-
-
- def run_tasks(task_list):
- '''
- 运行任务
- :param task_list 任务列表
- '''
-
- global nginx_log_queue
- global is_tasks_compete
- global task_status
-
-
- for task in task_list:
- thread = threading.Thread(target=collect_nginx_log,
- name="collect_nginx_log")
- thread.start()
- time.sleep(1) # 执行任务前,让收集日志线程先做好准备
-
- print('正在执行任务:%s' % task.get('name'))
-
- # 执行Nginx任务请求
- # ...
-
- task_status = 'req_finished'
- time_to_wait = 0.1
- while task_status != 'req_log_got': # 请求触发的nginx日志收集未完成
- time.sleep(time_to_wait)
- time_to_wait += 0.01
- else:# 获取到用例请求触发的nginx日志
- if nginx_log_queue:
- nginx_log = nginx_log_queue.popleft()
- task_status = 'req_ready'
- # 解析日志
- # do something here
- # ...
- else:
- print('存储请求日志的队列为空')
- # do something here
- # ...
-
-
- if __name__ == '__main__':
- nginx_log_queue = deque()
- is_tasks_compete = False # 所有任务是否执行完成
-
- task_status = 'req_ready' # req_ready,req_finished,req_log_got # 存放执行次任务任务的一些状态
- print('###########################任务开始###########################')
-
- tast_list = [{'name':'test_task', 'other':'...'}]
- run_tasks(tast_list)
-
- is_tasks_compete = True
-
- current_active_thread_num = len(threading.enumerate())
- while current_active_thread_num != 1:
- time.sleep(2)
- current_active_thread_num = len(threading.enumerate())
- print('###########################任务完成###########################')
注意:
1、上述代码为啥不一步到位,直接 tail -0f /usr/local/openresty/nginx/logs/access.log | grep "特征内容"
呢?这是因为这样做无法获取到Nginx的日志
2、实践时发现,第一次执行proc.communicate(timeout=2)
获取日志时,总是无法获取,会超时,需要二次获取,并且timeout
设置太小时(实践时尝试过设置为1秒
),也会导致第二次执行时无法获取Nginx日志。
作者:授客
本文版权归原作者所有,仅供学习参考之用,转载请注明出处:,未经作者允许请务必保留此段声明!
作者:授客