#!/usr/bin/python3
'''===================================================================
    This module wraps the Splash rendering service so that it can be
    driven conveniently from Python.
    Version: 1.0
    Author:  Chen Jinqian (陈进钱)
    Date:    2023-12-18
==================================================================='''
import os
import re
import json
import codecs
import configparser
from requests_html import HTMLSession
from requests_html import HTML

config = configparser.ConfigParser()


class SBase:
    '''Base class for Splash clients.

    On construction it loads ``config/splash.json`` and
    ``scripts/main.lua`` from the directory that contains this file,
    creating both (with placeholder content) when they do not exist.
    The Lua script is treated as a template whose ``{{ $name }}``
    placeholders are filled in by :meth:`open` before it is posted to the
    Splash ``/execute`` endpoint.
    '''

    def __init__(self):
        self.__lua_script = ''
        self.config = {}

        # Every path below is resolved relative to this source file.
        self.root = os.path.dirname(os.path.abspath(__file__))

        # Load the configuration, writing a default one on first use.
        config_dir = self.root + '/config'
        os.makedirs(config_dir, exist_ok=True)

        config_path = config_dir + '/splash.json'
        if os.path.exists(config_path):
            # `with` guarantees the handle is closed even if parsing fails.
            with codecs.open(config_path, 'r', 'utf-8') as handle:
                self.config = json.loads(handle.read())
        else:
            # NOTE: this default has no 'class' section, so open() needs
            # every optional argument until one is added to the file.
            self.config['description'] = 'This is splash config file.'
            self.config['server'] = 'localhost'
            self.config['port'] = '8050'
            with codecs.open(config_path, 'w', 'utf-8') as handle:
                handle.write(json.dumps(self.config))

        # Load the main Lua script, writing a placeholder on first use.
        scripts_dir = self.root + '/scripts'
        os.makedirs(scripts_dir, exist_ok=True)

        # TODO: replace the placeholder below with a generic Lua script.
        script_path = scripts_dir + '/main.lua'
        if os.path.exists(script_path):
            with codecs.open(script_path, 'r', 'utf-8') as handle:
                self.__lua_script = handle.read()
        else:
            self.__lua_script = 'This is lua script file'
            with codecs.open(script_path, 'w', 'utf-8') as handle:
                handle.write(self.__lua_script)

    def script(self) -> str:
        '''Return the Lua script template loaded by the constructor.'''
        return self.__lua_script

    def class_name(self) -> str:
        '''Return the name of the concrete class of this instance.

        :meth:`open` uses it as the key of the per-class section in the
        configuration.
        '''
        return type(self).__name__

    def replace(self, source, param, value):
        '''Fill every ``{{ $param }}`` placeholder in *source* with *value*.

        Whitespace between the braces and ``$param`` is optional.

        Args:
            source: Text that contains the placeholders.
            param: Placeholder name, without the leading ``$``. It is
                matched literally.
            value: Replacement text. It is inserted verbatim.

        Returns:
            *source* with all matching placeholders substituted.
        '''
        pattern = r'\{\{\s*\$' + re.escape(param) + r'\s*\}\}'
        # A callable replacement is used on purpose: a plain replacement
        # string is parsed as a template, so backslashes in injected
        # JavaScript or URLs (e.g. "\d") would raise re.error or be
        # rewritten instead of being copied as-is.
        return re.sub(pattern, lambda _match: value, source)

    def set_params_for_lua(self, scripts, params):
        '''Substitute several placeholders in a Lua script template.

        Args:
            scripts: Lua source containing ``{{ $name }}`` placeholders.
            params: Mapping of placeholder name to replacement string.
                Substitutions are applied one after another in the
                mapping's iteration order.

        Returns:
            The Lua source with the placeholders filled in.
        '''
        for name in params:
            scripts = self.replace(scripts, name, params[name])
        return scripts

    def _class_default(self, key):
        '''Return config['class'][<class name>][key] for this instance.'''
        return self.config['class'][self.class_name()][key]

    def open(self, scripts_js, pages=1, url='', params=None, wait_for='',
             page_element='', headers='', annoucement_type=''):
        '''Render a page through Splash and return the HTTP response.

        The main Lua script template is filled in with the arguments
        below and posted as ``lua_source`` to
        ``http://<server>:<port>/execute``, where server and port come
        from the configuration.

        Any of *url*, *params*, *wait_for*, *page_element* and *headers*
        that is left at its default is read from the configuration entry
        ``config['class'][<class name>]`` (keys ``url``, ``param``,
        ``wait_for``, ``page_element`` and ``headers``).

        Args:
            scripts_js: Page-parsing script text substituted for the
                ``scripts_js`` placeholder.
            pages: Value for the ``pages`` placeholder; converted with
                ``str()``.
            url: Address to open. ``{{ $name }}`` placeholders in it are
                filled from *params*.
            params: Mapping of placeholder name to value for *url*.
            wait_for: Value for the ``wait_for`` placeholder (per the
                original notes: the page element to wait for).
            page_element: Value for the ``page_element`` placeholder (per
                the original notes: the pagination element to wait for).
            headers: HTTP headers passed to the POST request.
            annoucement_type: Value for the ``announcement_type``
                placeholder. The keyword keeps its historical spelling so
                existing callers continue to work.

        Returns:
            The response object returned by ``HTMLSession().post``.

        Raises:
            KeyError: If a default is needed but the configuration has no
                matching entry.
        '''
        if url == '':
            url = self._class_default('url')
        if params is None:
            params = self._class_default('param')
        for name in params:
            url = self.replace(url, name, params[name])
        if wait_for == '':
            wait_for = self._class_default('wait_for')
        if page_element == '':
            page_element = self._class_default('page_element')
        if headers == '':
            headers = self._class_default('headers')

        scripts = self.set_params_for_lua(self.script(), {
            'pages': str(pages),
            'url': url,
            'wait_for': wait_for,
            'page_element': page_element,
            # The parser script is supplied by the caller.
            'scripts_js': scripts_js,
            'announcement_type': annoucement_type,
        })

        data = json.dumps({'lua_source': scripts})
        # str.format accepts a port stored either as a string or a number.
        splash_url = 'http://{}:{}/execute'.format(
            self.config['server'], self.config['port'])
        return HTMLSession().post(splash_url, headers=headers, data=data)