能力说明

  • 能力名称(中文):Python执行器
  • 能力名称(英文):HLPythonExecuteProcessor
  • 简称:Python执行器
  • 版本:1.0.0
  • 能力入口类名:com.hylanda.processors.HLPythonExecuteProcessor
  • 上线时间:2018-04-09 11:23:50
  • 作者:宋传宝
  • 贡献者:无
  • 编程语言:java、python
  • 权利所属公司:
  • 合作公司:无
  • 授权范围:公开
  • 能力分类:普通能力
  • 工程分类:数据工程
  • 能力级别:普通
  • 状态:启用
  • 标签:python,脚本执行

功能简介

根据能力提供的Python脚本编写规范,进行Python脚本编写,在平台中实现数据处理、数据采集等功能。 此能力执行Python脚本的技术原理为:使用eve包开启一个 Python进程的Restful服务,服务在初始化时会调用Python脚本中的init函数。 在执行期,java能力代码读取队列数据,用Http的Post请求方式请求calc函数的代码;执行期间,服务会通过hello函数,探测服务是否OK。

Python脚本临时存储在 /usr/local/nifi/pythonexecute/ 目录,此目录下必须存在一个脚本文件:settings.py,内容为: DOMAIN={'test': {}}

执行

配置参数

| 参数名称 | 参数类型 | 是否必填 | 默认值 | 参数说明 | | 能力端口 | int | true | | 能力端口 | | pip命令 | string | true | | pip命令 | | Python执行脚本 | string | false | | 待执行的python脚本 | | 执行模式 | string | true | | 队列模式、自循环模式 |

输入简介

能力端口: 指能力在启动Python的Eve Restful服务所开启的网络端口,如果同一任务节点下有多个Python执行器,需要将此端口设为不同的多个端口。

pip命令:指待执行的Python脚本依赖了外部包时,可以提前执行pip进行依赖包的安装。

Python执行脚本:指待执行的Python脚本代码,脚本代码需要按约定的规范编写,具体规范参考样例注释。

执行模式:队列模式(QueueTrigger)、自循环模式(SelfTrigger);当为队列模式时,消费之前队列中的数据; 自循环模式时,则能力不断主动触发Trigger事件,即能力不断调用能力中的 calc函数。

输入数据样例

输入参数: 能力端口:5000 pip命令:pip install BeautifulSoup4 触发模式:QueueTrigger

Python执行脚本代码样例

处理队列中Json数据的样例:

#!/usr/bin/python
# encoding=utf-8
import urllib2,socket,traceback
import os  # 此行请勿修改
from eve import Eve  # 此行请勿修改
from flask import request  # 此行请勿修改
import json  # 此行请勿修改
import time,random,Queue,thread
from bs4 import BeautifulSoup
import sys

reload(sys)
sys.setdefaultencoding('utf8')

app = Eve()  # 此行请勿修改

downUrlQue = Queue.Queue()

# 使用Request
def get_request(url):
    # 可以设置超时
    socket.setdefaulttimeout(30)
    # 可以加入参数    [无参数,使用get,以下这种方式,使用post]
    params = {"wd": "a", "b": "2"}
    # 可以加入请求头信息,以便识别
    i_headers = {"User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1) Gecko/20090624 Firefox/3.5",
                 "Accept": "text/plain"}
    req = urllib2.Request(url, headers=i_headers)

    # 创建request后,还可以进行其他添加,若是key重复,后者生效
    # request.add_header('Accept','application/json')
    # 可以指定提交方式
    # request.get_method = lambda: 'PUT'
    try:
        page = urllib2.urlopen(req)
        data = page.read()
        pageUrl = page.geturl()
        page.close()
        return data, pageUrl
    except urllib2.HTTPError, e:
        print "Error Code:", e.code
    except urllib2.URLError, e:
        print "Error Reason:", e.reason
    except:
        print traceback.format_exc()
    return  "",""

def init():
    print 'global init'

@app.route('/hello', methods=['POST'])
def hello():
    return 'ok'

@app.route('/uninit', methods=['POST'])
def uninit():
    return 'global uninit'

@app.route('/calc', methods=['POST'])
def calc():
    inputJson = request.data  # 能力数据流,单次输入的json字符串
    inJsonObj = json.loads(inputJson)

    # ------自定义代码处, 请填写业务代码
    subUrl = ''
    # 例如 结巴分词代码
    if not inJsonObj.has_key('url'):
        return ''
    subUrl = inJsonObj['url']
    subHtml, pageUrl = get_request(subUrl)

    dictOne = {}
    dictOne['url'] = pageUrl
    subSoup = BeautifulSoup(subHtml, "html.parser")
    orgDiv = subSoup.select_one('div[data-reactid="28"]')
    if orgDiv != None:
        dictOne['org'] = orgDiv.getText()
        dictOne['zijin'] = ''
        zijinDiv = subSoup.select_one('div[data-reactid="48"]')
        if zijinDiv != None:
            dictOne['zijin'] = zijinDiv.getText()
        areaDiv = subSoup.select_one('div[data-reactid="71"]')
        dictOne['area'] = ''
        if areaDiv != None:
            dictOne['area'] = areaDiv.getText()
        dictOne['chengli'] = ''
        chengliDiv = subSoup.select_one('div[data-reactid="58"]')
        if chengliDiv != None:
            dictOne['chengli'] = chengliDiv.getText()
        dictOne['jingying'] = ''
        jingyingDiv = subSoup.select_one('div[data-reactid="92"]')
        if jingyingDiv != None:
            dictOne['jingying'] = jingyingDiv.getText()
        dictOne['html'] = urllib2.quote(subHtml)

        putJson = json.dumps(dictOne, encoding="utf-8", ensure_ascii=False, indent=4, separators=(',', ': '))
        return putJson
    return ''

自循环下载数据样例:

#!/usr/bin/python
# encoding=utf-8

import urllib2,socket,traceback
import os  # 此行请勿修改
from eve import Eve  # 此行请勿修改
from flask import request  # 此行请勿修改
import json  # 此行请勿修改
import time,random,Queue,thread
from bs4 import BeautifulSoup
import sys

reload(sys)
sys.setdefaultencoding('utf8')

app = Eve()  # 此行请勿修改

downUrlQue = Queue.Queue()

# 使用Request
def get_request(url):
    # 可以设置超时
    socket.setdefaulttimeout(30)
    # 可以加入参数    [无参数,使用get,以下这种方式,使用post]
    params = {"wd": "a", "b": "2"}
    # 可以加入请求头信息,以便识别
    i_headers = {"User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1) Gecko/20090624 Firefox/3.5",
                 "Accept": "text/plain"}
    req = urllib2.Request(url, headers=i_headers)

    # 创建request后,还可以进行其他添加,若是key重复,后者生效
    # request.add_header('Accept','application/json')
    # 可以指定提交方式
    # request.get_method = lambda: 'PUT'
    try:
        page = urllib2.urlopen(req)
        data = page.read()
        pageUrl = page.geturl()
        page.close()
        return data, pageUrl
    except urllib2.HTTPError, e:
        print "Error Code:", e.code
    except urllib2.URLError, e:
        print "Error Reason:", e.reason
    except:
        print traceback.format_exc()
    return  "",""


#下载线程
def downThread(threadName, delay):
    try:
        for i in range(1, 1811):
            downUrl = 'https://www.dingtalk.com/qiye/'+str(i)+'.html'
            srcHtml,srcPage = get_request(downUrl)
            sleepSec = random.randrange(0, 3)
            time.sleep(sleepSec)

            if (srcHtml != None):
                # 解析网页,获得网页首页
                soup = BeautifulSoup(srcHtml, "html.parser")
                itemLinks = soup.select('a.item')

                listOrgs = []
                for item in itemLinks :
                    subUrl = item.get('href')

                    dictOne = {}
                    dictOne['org'] = item.getText()
                    dictOne['pagefrom'] = downUrl
                    dictOne['url'] = subUrl

                    listOrgs.append(dictOne)

                putJson = json.dumps(listOrgs,encoding="utf-8",ensure_ascii=False,indent=4, separators=(',', ': '))
                downUrlQue.put(putJson)
    except IOError, e:
        print "IOError:", e
    except:
        print traceback.format_exc()
    return ''



def init():
    try:
        thread.start_new_thread(downThread, ("Thread-1", 2,))
    except:
        print "Error: unable to start thread"
    print 'global init'


@app.route('/hello', methods=['POST'])
def hello():
    return 'ok'


@app.route('/uninit', methods=['POST'])
def uninit():
    return 'global uninit'


@app.route('/calc', methods=['POST'])
def calc():
    if not downUrlQue.empty():
        subJson = downUrlQue.get()
        return subJson
    return ''

# 以下代码,在能力输入框中请删除

输出简介

能力的输出为Python脚本完成后的Json字符串,也可以为Json数组,当为数组时必须以[开头。

输出数据样例

{
org: "福安市阿里巴巴烤鱼馆",
pagefrom: https://www.dingtalk.com/qiye/2.html,
url: https://www.dingtalk.com/qiye/ding1e6f2e316676cfec?token=d3bfbb8b0576d8b5be9200fe4b488999&orgName=%E7%A6%8F%E5%AE%89%E5%B8%82%E9%98%BF%E9%87%8C%E5%B7%B4%E5%B7%B4%E7%83%A4%E9%B1%BC%E9%A6%86
}

相关能力

模型序号

远程group url

初始化配置

计费说明

  • 能力计费方式:免费
  • 能力计费规则:免费
  • 关于计费的其他说明:免费

    参考指标

  • 处理性能:未知

参考环境

附加说明

版本迭代记录

2020-12-30更新记录

  1. 增加对桌面版的支持。

隐藏参数说明

results matching ""

    No results matching ""