能力说明
- 能力名称(中文):Python执行器团队版
- 能力名称(英文):PythonProcessor
- 简称:Python执行器团队版
- 版本:1.0.0
- 能力入口类名:com.hylanda.processors.crawl.http.PythonProcessor
- 上线时间:2024-04-16 14:00:00
- 作者:崔运海
- 贡献者:无
- 编程语言:java、python
- 合作公司:无
- 授权范围:公开
- 能力分类:普通能力
- 工程分类:数据工程
- 能力级别:普通
- 状态:启用
- 标签:python,脚本执行
功能简介
根据能力提供的Python脚本编写规范,进行Python脚本编写,在平台中实现数据处理、数据采集等功能。 此能力执行Python脚本的技术原理为:使用fastapi包开启一个 Python进程的api服务。 在执行期,java能力代码读取队列数据,用Http的Post请求方式请求process函数的代码。
执行
能力配置中选中的代码,如果有变更。有两种方法进行更新代码的获取:
一是手动获取,如果能力是停止状态,重新启动即可获取最新代码。如果能力是运行状态,手动停止能力再重新启动,即可获取最新代码。
二是自动获取,如果能力是运行状态,会自动进行代码变更的检查(约3-5分钟检查一次),发现有变化时自动获取最新代码。
能力执行过程中,能力上会显示查看python脚本报错信息的地址,如:http://IP:9966/show ,可直接在浏览器打开查看。 如果脚本修正错误后重新运行能力,刷新http://IP:9966/show页面就能捕捉到新的错误。
如果一个画布上有多个Python能力在使用,该页面左侧列表中也会显示多能力,点击对应能力,右侧显示其对应debug信息。
配置参数
| 参数名称 | 参数类型 | 是否必填 | 默认值 | 参数说明 |
|---|---|---|---|---|
| 任务内名称 | 文本 | 是 | 无 | 可自定义为任务内所需要的名称 |
| 并行任务数量 | 文本 | 否 | true | 此处理器可以同时调试的任务数 |
| 运行计划 | 文本 | 否 | 0 | 此项无需做修改,定时设置见“定时执行参数” |
| Python代码 | 文本 | 是 | 无 | python脚本代码 |
| 定时执行参数 | 文本 | 否 | 无 | 定时设置 |
如果在Python脚本知识管理页面,有新增或修改操作时,能力配置下拉列表内容将在3分钟内进行更新。 如果能力执行过程中,其代码有更新,在能力正常运行情况下会在3分钟左右进行自动更新成新版代码。
输入简介
Python执行脚本:指待执行的Python脚本代码,脚本代码需要按约定的规范编写,具体规范参考样例注释。
执行模式:队列模式(QueueTrigger)、自循环模式(SelfTrigger);当为队列模式时,消费之前队列中的数据; 自循环模式时,则能力不断主动触发Trigger事件,即能力不断调用能力中的 process函数。
输入数据样例
输入参数: 触发模式:QueueTrigger
Python执行脚本代码样例
处理队列中Json数据的样例:
#!/usr/bin/python
# encoding=utf-8
import traceback
import pymysql
# 数据库配置(1111可修改)
db_config = {"host": '127.0.0.1', "port": 3306, "user": 'root',
"password": 'hylanda', "database": 'sakila', "charset": "utf8"}
# 数据库连接,默认一定要为None
db_conn = None
db_cursor = None
# 尝试连接数据库,如果失败,则进行重连(修改)
def reconnect_db(retry=False):
global db_conn, db_cursor
if db_conn is None:
db_conn = pymysql.connect(**db_config)
db_cursor = db_conn.cursor()
return
if retry:
try:
db_conn.ping(reconnect=True)
except Exception as e:
print(e)
# 关闭数据库连接
def close():
global db_conn, db_cursor
try:
if db_cursor:
db_cursor.close()
db_cursor = None
except Exception as e:
pass
try:
if db_conn:
db_conn.close()
db_conn = None
except Exception as e:
pass
# 一个flow文件的json对象(入口处理)
def process(one_flow_file_json):
global db_conn, db_cursor
# 连接数据库2
reconnect_db()
# 返回值
ret = {}
try:
inJsonObj = one_flow_file_json
sql = f"SELECT url,release_date,weixin_name FROM `zpc_weixinshuju` where url = '{inJsonObj['url']}' order by download_date desc limit 1"
db_cursor.execute(sql)
results = db_cursor.fetchall()
# 将数据添加进Json串
for row in results:
inJsonObj['release_date'] = str(row[1])
inJsonObj['weixin_name'] = str(row[2])
ret = inJsonObj
except Exception as e:
# 如果数据库连接失败,则进行重连
reconnect_db(retry=True)
# 返回的结果,必须按照这个格式返回,能力会按照code自动分流。
traceback.print_exc()
filename = traceback.extract_tb(e.__traceback__)[-1].filename
lineno = traceback.extract_tb(e.__traceback__)[-1].lineno
last_err = f"Exception Type: {type(e)},Message: {str(e)},File: {filename},Line:{lineno}"
ret["__err__"] = last_err
finally:
return ret
输出简介
能力的输出为Python脚本完成后的Json字符串,也可以为Json数组,当为数组时必须以[开头。
输出数据样例
{
org: "福安市阿里巴巴烤鱼馆",
pagefrom: https://www.dingtalk.com/qiye/2.html,
url: https://www.dingtalk.com/qiye/ding1e6f2e316676cfec?token=d3bfbb8b0576d8b5be9200fe4b488999&orgName=%E7%A6%8F%E5%AE%89%E5%B8%82%E9%98%BF%E9%87%8C%E5%B7%B4%E5%B7%B4%E7%83%A4%E9%B1%BC%E9%A6%86
}
相关能力
python脚本知识页面化管理
在平台“我的知识”模块---python脚本选项中,进行脚本的管理操作。
模型序号
无
远程group url
无
初始化配置
无
计费说明
- 能力计费方式:免费
- 能力计费规则:免费
关于计费的其他说明:免费
参考指标
处理性能:未知
参考环境
附加说明
版本迭代记录
2024-04-16更新记录
较之前能力进行了资源占用优化和效率提升。
增加报错信息查看的页面。
能力配置优化,多脚本在能力配置中以下拉列表方式进行选择。
多Python脚本进行知识页面化管理。
隐藏参数说明
无


