# -*- coding: utf-8 -*-
# NOTE(review): this file is three artifacts concatenated together and then
# whitespace-mangled: (1) a RabbitMQ producer with a Flask trigger API,
# (2) a RabbitMQ -> ClickHouse consumer, (3) a shell deployment snippet.
# They should live in separate files (producer.py / consumer.py / deploy.sh);
# all original content is preserved below, reformatted and annotated.

# ======================================================================
# Section 1: producer — loads sample columns from .xls files and publishes
# JSON "packs" to a RabbitMQ topic exchange; a small Flask API starts a run.
# ======================================================================
import datetime
import json
import time
from os.path import abspath, dirname
from threading import Thread

import numpy as np  # kept from the original import list (unused here)
import pika
import xlrd
from flask import Flask, jsonify, request
from flask_cors import CORS
from flask_restful import Api, Resource, reqparse

app = Flask(__name__)
CORS(app)
api = Api(app)

BASE_DIR = "./"   # replaced with the script directory in __main__
RUNNING = False   # guards against two concurrent data_send() runs


def data_send():
    """Read producerConfig.json, load the .xls data sources, and publish
    simulated telemetry packs to the configured RabbitMQ exchange.

    Sends ``data_num`` samples per data source, batching ``pack_size``
    readings per message, pausing ``pause_time`` seconds at each index in
    ``pause_points`` and ``time_step`` seconds between samples.  Clears
    RUNNING on exit so the HTTP endpoint can start another run.
    """
    global RUNNING

    json_file = BASE_DIR + '/producerConfig.json'
    with open(json_file, 'r', encoding="utf-8") as f:
        # Load the producer configuration.
        j = json.load(f)
    exchange_name = j['exchange_name']
    test_num = j['num']
    unit_list = j['unit']
    sys_list = j['system']
    node_list = j['node']
    param_list = j['param']
    print(param_list)
    data_type_list = j['type']
    data_file_list = j['filename']
    data_num = j['data_num']
    time_step = j['time_step']
    pause_points = j['pause_points']
    pause_time = j['pause_time']
    pack_size = j['pack_size']

    # SECURITY(review): credentials are hard-coded; move to config/env.
    user_pwd = pika.PlainCredentials("sjtuadmin", "SjtuAdmin$220731")
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', port=5672,
                                  credentials=user_pwd))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange_name,
                             exchange_type='topic', durable=True)

    # Read the simulated data sources: column 0 of sheet 0 of each .xls file.
    data_source = []
    for i in range(test_num):
        # [2:] strips a leading "./" from the configured path — TODO confirm
        # all entries in producerConfig.json actually start with "./".
        workbook = xlrd.open_workbook(BASE_DIR + "/" + data_file_list[i][2:])
        sheet = workbook.sheet_by_index(0)  # sheet index is 0-based
        print(sheet.name, sheet.nrows, sheet.ncols)
        data_source.append(sheet.col_values(0))

    data_pack = {'data': []}
    for i in range(data_num):
        if i in pause_points:
            time.sleep(pause_time)
        t = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        for k in range(test_num):
            data_pack['data'].append({
                'time': t,
                'value': '%.3f' % data_source[k][i],
                'unit': unit_list[k],
                'system': sys_list[k],
                'node': node_list[k],
                'param': param_list[k],
                'type': data_type_list[k],
            })
            if len(data_pack['data']) == pack_size:
                msg = json.dumps(data_pack, ensure_ascii=False)
                print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
                try:
                    # BUGFIX: publish to the configured exchange (the one
                    # declared above), not the hard-coded 'sjtu_exchange'.
                    channel.basic_publish(exchange=exchange_name,
                                          routing_key="", body=msg)
                except Exception as e:
                    # best-effort send, but don't silently hide the error
                    print("publish failed: %s" % e)
                data_pack['data'].clear()
        time.sleep(time_step)  # inter-sample send interval

    # BUGFIX: the original reassigned data_pack to an empty dict here, which
    # silently dropped any trailing partial pack (< pack_size items).
    # Flush the remainder first, then send the original empty end-marker.
    if data_pack['data']:
        msg = json.dumps(data_pack, ensure_ascii=False)
        try:
            channel.basic_publish(exchange=exchange_name,
                                  routing_key="", body=msg)
        except Exception as e:
            print("publish failed: %s" % e)
    msg = json.dumps({'data': []}, ensure_ascii=False)
    try:
        channel.basic_publish(exchange=exchange_name, routing_key="", body=msg)
    except Exception as e:
        print("publish failed: %s" % e)
    try:
        connection.close()
    except Exception as e:
        print("close failed: %s" % e)
    RUNNING = False


@app.route('/data/api', methods=['GET'])
def email_temp_post():
    """Start one background data_send() run.

    Returns code 200 when a run was started, 201 when one is already
    in progress (RUNNING acts as a simple re-entrancy guard).
    """
    global RUNNING
    if not RUNNING:
        RUNNING = True
        x = Thread(target=data_send)
        x.daemon = True
        x.start()
        return jsonify({'code': 200, "message": "success"})
    else:
        return jsonify({'code': 201, "message": "success"})


if __name__ == '__main__':
    BASE_DIR = dirname(abspath(__file__))
    app.run(host='0.0.0.0', port=8111)


# ======================================================================
# Section 2: consumer — drains packs from RabbitMQ and bulk-inserts rows
# into the ClickHouse table sjtu_data_all.
# NOTE(review): because this file also contains Section 1, this second
# __main__ block is unreachable when run as one script (app.run blocks);
# it only works once the sections are split into separate files.
# ======================================================================
import sys  # kept from the original import list (unused here)

from clickhouse_driver import Client

CH_CLIENT = None      # ClickHouse client, created in consuming()
current_status = {}   # last seen value per node, used to dedup type-'01' rows
Insert_interval = []  # kept from the original (unused here)


def callback(ch, method, properties, body):
    """RabbitMQ delivery callback: decode one JSON pack and insert its rows.

    Rows whose type is '01' (status points) are only inserted when the
    value changed since the last delivery for that node.
    """
    data_list = json.loads(body.decode("utf-8"))['data']
    if not data_list:
        return  # empty end-marker pack from the producer
    rows = []
    for data in data_list:
        if data['param'] == "油源电流":  # runtime string — must stay Chinese
            print(data)
            if float(data['value']) > 10 or float(data['value']) < 0:
                print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        if data['type'] == '01':
            # Status-type points: skip when unchanged for this node.
            tmp_key = data['node']
            if current_status.get(tmp_key) == data['value']:
                continue
            current_status[tmp_key] = data['value']
        rows.append((
            # Trim sub-second precision so the value fits DateTime.
            datetime.datetime.strptime(data['time'][0:19],
                                       '%Y-%m-%d %H:%M:%S'),
            data['unit'], data['system'], data['node'],
            data['param'], data['type'], float(data['value'])))
    if not rows:
        # BUGFIX: the original executed "insert ... values" with zero rows
        # when every item was deduplicated, which errors in ClickHouse.
        return
    start = datetime.datetime.now()
    # SECURITY/BUGFIX: parameterized bulk insert instead of string-built SQL
    # (the original broke on quotes in data and was injection-prone).
    CH_CLIENT.execute("insert into sjtu_data_all values", rows)
    print(datetime.datetime.now() - start)


def consuming():
    """Connect to ClickHouse and RabbitMQ, then consume messages forever."""
    global CH_CLIENT
    CH_CLIENT = Client(host="localhost", port=9000, database="default",
                       user="default", password="")
    create_table_sql = (
        "create table if not exists sjtu_data_all ("
        "time DateTime, "
        "unit String, "
        "system String, "
        "node String, "
        "param String, "
        "type String, "
        "value Float64) "
        "engine=MergeTree "
        "order by (time);")
    res = CH_CLIENT.execute(create_table_sql)
    print(create_table_sql)
    print(res)

    # SECURITY(review): credentials are hard-coded; move to config/env.
    user_pwd = pika.PlainCredentials("sjtuadmin", "SjtuAdmin$220731")
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', port=5672,
                                  credentials=user_pwd))
    channel = connection.channel()
    channel.exchange_declare(exchange='sjtu_exchange',
                             exchange_type='topic', durable=True)
    queue_name = "db_queue"
    channel.queue_declare(queue=queue_name, durable=True)
    print("queuename:", queue_name)
    channel.queue_bind(exchange='sjtu_exchange', queue=queue_name,
                       routing_key="")
    # auto_ack means a message lost mid-insert is not redelivered — accepted
    # trade-off in the original; kept as-is.
    channel.basic_consume(queue=queue_name, auto_ack=True,
                          on_message_callback=callback)
    print(' [*] Waiting for logs. To exit press CTRL+C')
    channel.start_consuming()
    connection.close()
    CH_CLIENT.disconnect()


if __name__ == '__main__':
    # Reconnect loop: on any failure, wait 3 s and reconnect.
    while True:
        try:
            time.sleep(3)
            consuming()
        except Exception as e:
            print("error %s" % str(e))


# ======================================================================
# Section 3: shell deployment snippet — NOT Python; belongs in deploy.sh.
# The heredoc bodies were partially lost in the original ("<&1 >>" is the
# tail of a mangled "<<EOF ... ExecStart=... 2>&1 >>" sequence). The unit
# files below are a best-effort reconstruction from the surviving lines —
# TODO(review): confirm the ExecStart interpreter and script paths.
#
# cat > /etc/systemd/system/producer.service <<EOF
# [Unit]
# Description=sjtu data producer
# [Service]
# ExecStart=/usr/bin/python3 /opt/sjtu/producer.py 2>&1 >> /var/log/producer.log
# Environment=PYTHONUNBUFFERED=1
# Type=simple
# [Install]
# WantedBy=multi-user.target
# EOF
# systemctl daemon-reload
# systemctl enable producer
# systemctl restart producer
#
# cat > /etc/systemd/system/consumer.service <<EOF
# [Unit]
# Description=sjtu data consumer
# [Service]
# ExecStart=/usr/bin/python3 /opt/sjtu/consumer.py 2>&1 >> /var/log/consumer.log
# Environment=PYTHONUNBUFFERED=1
# Type=simple
# [Install]
# WantedBy=multi-user.target
# EOF
# systemctl daemon-reload
# systemctl enable consumer
# systemctl restart consumer