能力说明
- 能力名称(中文):Python执行器
- 能力名称(英文):HLPythonExecuteProcessor
- 简称:Python执行器
- 版本:1.0.0
- 能力入口类名:com.hylanda.processors.HLPythonExecuteProcessor
- 上线时间:2018-04-09 11:23:50
- 作者:宋传宝
- 贡献者:无
- 编程语言:java、python
- 权利所属公司:
- 合作公司:无
- 授权范围:公开
- 能力分类:普通能力
- 工程分类:数据工程
- 能力级别:普通
- 状态:启用
- 标签:python,脚本执行
功能简介
根据能力提供的Python脚本编写规范,进行Python脚本编写,在平台中实现数据处理、数据采集等功能。 此能力执行Python脚本的技术原理为:使用eve包开启一个 Python进程的Restful服务,服务在初始化时会调用Python脚本中的init函数。 在执行期,java能力代码读取队列数据,用Http的Post请求方式请求calc函数的代码;执行期间,服务会通过hello函数,探测服务是否OK。
Python脚本临时存储在 /usr/local/nifi/pythonexecute/ 目录,此目录下必须存在一个脚本文件:settings.py,内容为: DOMAIN={'test': {}}
执行
配置参数
| 参数名称 | 参数类型 | 是否必填 | 默认值 | 参数说明 | | 能力端口 | int | true | | 能力端口 | | pip命令 | string | true | | pip命令 | | Python执行脚本 | string | false | | 待执行的python脚本 | | 执行模式 | string | true | | 队列模式、自循环模式 |
输入简介
能力端口: 指能力在启动Python的Eve Restful服务所开启的网络端口,如果同一任务节点下有多个Python执行器,需要将此端口设为不同的多个端口。
pip命令:指待执行的Python脚本依赖了外部包时,可以提前执行pip进行依赖包的安装。
Python执行脚本:指待执行的Python脚本代码,脚本代码需要按约定的规范编写,具体规范参考样例注释。
执行模式:队列模式(QueueTrigger)、自循环模式(SelfTrigger);当为队列模式时,消费之前队列中的数据; 自循环模式时,则能力不断主动触发Trigger事件,即能力不断调用能力中的 calc函数。
输入数据样例
输入参数: 能力端口:5000 pip命令:pip install BeautifulSoup4 触发模式:QueueTrigger
Python执行脚本代码样例
处理队列中Json数据的样例:
#!/usr/bin/python
# encoding=utf-8
import urllib2,socket,traceback
import os # 此行请勿修改
from eve import Eve # 此行请勿修改
from flask import request # 此行请勿修改
import json # 此行请勿修改
import time,random,Queue,thread
from bs4 import BeautifulSoup
import sys
reload(sys)
sys.setdefaultencoding('utf8')
app = Eve() # 此行请勿修改
downUrlQue = Queue.Queue()
# 使用Request
def get_request(url):
# 可以设置超时
socket.setdefaulttimeout(30)
# 可以加入参数 [无参数,使用get,以下这种方式,使用post]
params = {"wd": "a", "b": "2"}
# 可以加入请求头信息,以便识别
i_headers = {"User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1) Gecko/20090624 Firefox/3.5",
"Accept": "text/plain"}
req = urllib2.Request(url, headers=i_headers)
# 创建request后,还可以进行其他添加,若是key重复,后者生效
# request.add_header('Accept','application/json')
# 可以指定提交方式
# request.get_method = lambda: 'PUT'
try:
page = urllib2.urlopen(req)
data = page.read()
pageUrl = page.geturl()
page.close()
return data, pageUrl
except urllib2.HTTPError, e:
print "Error Code:", e.code
except urllib2.URLError, e:
print "Error Reason:", e.reason
except:
print traceback.format_exc()
return "",""
def init():
print 'global init'
@app.route('/hello', methods=['POST'])
def hello():
return 'ok'
@app.route('/uninit', methods=['POST'])
def uninit():
return 'global uninit'
@app.route('/calc', methods=['POST'])
def calc():
inputJson = request.data # 能力数据流,单次输入的json字符串
inJsonObj = json.loads(inputJson)
# ------自定义代码处, 请填写业务代码
subUrl = ''
# 例如 结巴分词代码
if not inJsonObj.has_key('url'):
return ''
subUrl = inJsonObj['url']
subHtml, pageUrl = get_request(subUrl)
dictOne = {}
dictOne['url'] = pageUrl
subSoup = BeautifulSoup(subHtml, "html.parser")
orgDiv = subSoup.select_one('div[data-reactid="28"]')
if orgDiv != None:
dictOne['org'] = orgDiv.getText()
dictOne['zijin'] = ''
zijinDiv = subSoup.select_one('div[data-reactid="48"]')
if zijinDiv != None:
dictOne['zijin'] = zijinDiv.getText()
areaDiv = subSoup.select_one('div[data-reactid="71"]')
dictOne['area'] = ''
if areaDiv != None:
dictOne['area'] = areaDiv.getText()
dictOne['chengli'] = ''
chengliDiv = subSoup.select_one('div[data-reactid="58"]')
if chengliDiv != None:
dictOne['chengli'] = chengliDiv.getText()
dictOne['jingying'] = ''
jingyingDiv = subSoup.select_one('div[data-reactid="92"]')
if jingyingDiv != None:
dictOne['jingying'] = jingyingDiv.getText()
dictOne['html'] = urllib2.quote(subHtml)
putJson = json.dumps(dictOne, encoding="utf-8", ensure_ascii=False, indent=4, separators=(',', ': '))
return putJson
return ''
自循环下载数据样例:
#!/usr/bin/python
# encoding=utf-8
import urllib2,socket,traceback
import os # 此行请勿修改
from eve import Eve # 此行请勿修改
from flask import request # 此行请勿修改
import json # 此行请勿修改
import time,random,Queue,thread
from bs4 import BeautifulSoup
import sys
reload(sys)
sys.setdefaultencoding('utf8')
app = Eve() # 此行请勿修改
downUrlQue = Queue.Queue()
# 使用Request
def get_request(url):
# 可以设置超时
socket.setdefaulttimeout(30)
# 可以加入参数 [无参数,使用get,以下这种方式,使用post]
params = {"wd": "a", "b": "2"}
# 可以加入请求头信息,以便识别
i_headers = {"User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1) Gecko/20090624 Firefox/3.5",
"Accept": "text/plain"}
req = urllib2.Request(url, headers=i_headers)
# 创建request后,还可以进行其他添加,若是key重复,后者生效
# request.add_header('Accept','application/json')
# 可以指定提交方式
# request.get_method = lambda: 'PUT'
try:
page = urllib2.urlopen(req)
data = page.read()
pageUrl = page.geturl()
page.close()
return data, pageUrl
except urllib2.HTTPError, e:
print "Error Code:", e.code
except urllib2.URLError, e:
print "Error Reason:", e.reason
except:
print traceback.format_exc()
return "",""
#下载线程
def downThread(threadName, delay):
try:
for i in range(1, 1811):
downUrl = 'https://www.dingtalk.com/qiye/'+str(i)+'.html'
srcHtml,srcPage = get_request(downUrl)
sleepSec = random.randrange(0, 3)
time.sleep(sleepSec)
if (srcHtml != None):
# 解析网页,获得网页首页
soup = BeautifulSoup(srcHtml, "html.parser")
itemLinks = soup.select('a.item')
listOrgs = []
for item in itemLinks :
subUrl = item.get('href')
dictOne = {}
dictOne['org'] = item.getText()
dictOne['pagefrom'] = downUrl
dictOne['url'] = subUrl
listOrgs.append(dictOne)
putJson = json.dumps(listOrgs,encoding="utf-8",ensure_ascii=False,indent=4, separators=(',', ': '))
downUrlQue.put(putJson)
except IOError, e:
print "IOError:", e
except:
print traceback.format_exc()
return ''
def init():
try:
thread.start_new_thread(downThread, ("Thread-1", 2,))
except:
print "Error: unable to start thread"
print 'global init'
@app.route('/hello', methods=['POST'])
def hello():
return 'ok'
@app.route('/uninit', methods=['POST'])
def uninit():
return 'global uninit'
@app.route('/calc', methods=['POST'])
def calc():
if not downUrlQue.empty():
subJson = downUrlQue.get()
return subJson
return ''
# 以下代码,在能力输入框中请删除
输出简介
能力的输出为Python脚本完成后的Json字符串,也可以为Json数组,当为数组时必须以[开头。
输出数据样例
{
org: "福安市阿里巴巴烤鱼馆",
pagefrom: https://www.dingtalk.com/qiye/2.html,
url: https://www.dingtalk.com/qiye/ding1e6f2e316676cfec?token=d3bfbb8b0576d8b5be9200fe4b488999&orgName=%E7%A6%8F%E5%AE%89%E5%B8%82%E9%98%BF%E9%87%8C%E5%B7%B4%E5%B7%B4%E7%83%A4%E9%B1%BC%E9%A6%86
}
相关能力
模型序号
无
远程group url
无
初始化配置
无
计费说明
- 能力计费方式:免费
- 能力计费规则:免费
关于计费的其他说明:免费
参考指标
处理性能:未知
参考环境
附加说明
版本迭代记录
2020-12-30更新记录
- 增加对桌面版的支持。
隐藏参数说明
无