能力说明

  • 能力名称(中文):Python执行器团队版
  • 能力名称(英文):PythonProcessor
  • 简称:Python执行器团队版
  • 版本:1.0.0
  • 能力入口类名:com.hylanda.processors.crawl.http.PythonProcessor
  • 上线时间:2024-04-16 14:00:00
  • 作者:崔运海
  • 贡献者:无
  • 编程语言:java、python
  • 合作公司:无
  • 授权范围:公开
  • 能力分类:普通能力
  • 工程分类:数据工程
  • 能力级别:普通
  • 状态:启用
  • 标签:python,脚本执行

功能简介

根据能力提供的Python脚本编写规范,进行Python脚本编写,在平台中实现数据处理、数据采集等功能。 此能力执行Python脚本的技术原理为:使用fastapi包开启一个 Python进程的api服务。 在执行期,java能力代码读取队列数据,用Http的Post请求方式请求process函数的代码。

执行

能力配置中选中的代码,如果有变更。有两种方法进行更新代码的获取:

一是手动获取,如果能力是停止状态,重新启动即可获取最新代码。如果能力是运行状态,手动停止能力再重新启动,即可获取最新代码。

二是自动获取,如果能力是运行状态,会自动进行代码变更的检查(约3-5分钟检查一次),发现有变化时自动获取最新代码。

能力执行过程中,能力上会显示查看python脚本报错信息的地址,如:http://IP:9966/show ,可直接在浏览器打开查看。 如果脚本修正错误后重新运行能力,刷新http://IP:9966/show页面就能捕捉到新的错误。

如果一个画布上有多个Python能力在使用,该页面左侧列表中也会显示多能力,点击对应能力,右侧显示其对应debug信息。

配置参数

参数名称 参数类型 是否必填 默认值 参数说明
任务内名称 文本 可自定义为任务内所需要的名称
并行任务数量 文本 true 此处理器可以同时调试的任务数
运行计划 文本 0 此项无需做修改,定时设置见“定时执行参数”
Python代码 文本 python脚本代码
定时执行参数 文本 定时设置

如果在Python脚本知识管理页面,有新增或修改操作时,能力配置下拉列表内容将在3分钟内进行更新。 如果能力执行过程中,其代码有更新,在能力正常运行情况下会在3分钟左右进行自动更新成新版代码。

输入简介

Python执行脚本:指待执行的Python脚本代码,脚本代码需要按约定的规范编写,具体规范参考样例注释。

执行模式:队列模式(QueueTrigger)、自循环模式(SelfTrigger);当为队列模式时,消费之前队列中的数据; 自循环模式时,则能力不断主动触发Trigger事件,即能力不断调用能力中的 process函数。

输入数据样例

输入参数: 触发模式:QueueTrigger

Python执行脚本代码样例

处理队列中Json数据的样例:

#!/usr/bin/python
# encoding=utf-8
import traceback

import pymysql

# 数据库配置(1111可修改)
db_config = {"host": '127.0.0.1', "port": 3306, "user": 'root',
             "password": 'hylanda', "database": 'sakila', "charset": "utf8"}
# 数据库连接,默认一定要为None
db_conn = None
db_cursor = None


# 尝试连接数据库,如果失败,则进行重连(修改)
def reconnect_db(retry=False):
    global db_conn, db_cursor
    if db_conn is None:
        db_conn = pymysql.connect(**db_config)
        db_cursor = db_conn.cursor()
        return
    if retry:
        try:
            db_conn.ping(reconnect=True)
        except Exception as e:
            print(e)


# 关闭数据库连接
def close():
    global db_conn, db_cursor
    try:
        if db_cursor:
            db_cursor.close()
            db_cursor = None
    except Exception as e:
        pass
    try:
        if db_conn:
            db_conn.close()
            db_conn = None
    except Exception as e:
        pass


# 一个flow文件的json对象(入口处理)
def process(one_flow_file_json):
    global db_conn, db_cursor
    # 连接数据库2
    reconnect_db()
    # 返回值
    ret = {}
    try:
        inJsonObj = one_flow_file_json
        sql = f"SELECT url,release_date,weixin_name FROM `zpc_weixinshuju` where url = '{inJsonObj['url']}' order by download_date desc limit 1"
        db_cursor.execute(sql)
        results = db_cursor.fetchall()

        # 将数据添加进Json串
        for row in results:
            inJsonObj['release_date'] = str(row[1])
            inJsonObj['weixin_name'] = str(row[2])

        ret = inJsonObj

    except Exception as e:
        # 如果数据库连接失败,则进行重连
        reconnect_db(retry=True)
        # 返回的结果,必须按照这个格式返回,能力会按照code自动分流。
        traceback.print_exc()
        filename = traceback.extract_tb(e.__traceback__)[-1].filename
        lineno = traceback.extract_tb(e.__traceback__)[-1].lineno
        last_err = f"Exception Type: {type(e)},Message: {str(e)},File: {filename},Line:{lineno}"
        ret["__err__"] = last_err
    finally:
        return ret

输出简介

能力的输出为Python脚本完成后的Json字符串,也可以为Json数组,当为数组时必须以[开头。

输出数据样例

{
org: "福安市阿里巴巴烤鱼馆",
pagefrom: https://www.dingtalk.com/qiye/2.html,
url: https://www.dingtalk.com/qiye/ding1e6f2e316676cfec?token=d3bfbb8b0576d8b5be9200fe4b488999&orgName=%E7%A6%8F%E5%AE%89%E5%B8%82%E9%98%BF%E9%87%8C%E5%B7%B4%E5%B7%B4%E7%83%A4%E9%B1%BC%E9%A6%86
}

相关能力

python脚本知识页面化管理

在平台“我的知识”模块---python脚本选项中,进行脚本的管理操作。

模型序号

远程group url

初始化配置

计费说明

  • 能力计费方式:免费
  • 能力计费规则:免费
  • 关于计费的其他说明:免费

    参考指标

  • 处理性能:未知

参考环境

附加说明

版本迭代记录

2024-04-16更新记录

  1. 较之前能力进行了资源占用优化和效率提升。

  2. 增加报错信息查看的页面。

  3. 能力配置优化,多脚本在能力配置中以下拉列表方式进行选择。

  4. 多Python脚本进行知识页面化管理。

隐藏参数说明

results matching ""

    No results matching ""