1. After each daily crawl, sync the scraped announcements/notices into the database.
2. Search the day's announcements for sci-tech innovation enterprises and, if any are found, send a reminder notification (the reminder was not written at first; the data handling is kept simple).
Fuzzy keyword search plus exclusion keywords are used to filter out irrelevant entries and find the relevant records (see the sketch below).
Check whether there are any announcements/notices about sci-tech innovation enterprises.
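Before moving to SQL, the matching rule itself can be sanity-checked in plain Python. A minimal, database-free sketch of the rule described above (the keyword samples here are illustrative, not the full lists used below):

def is_sci_tech_notice(title, keywords, excludes):
    # Hit when the title contains any include-keyword and none of the excludes.
    return any(k in title for k in keywords) and not any(e in title for e in excludes)

keywords = ["高新技术企业", "专精特新"]
excludes = ["组织申报", "取消"]
print(is_sci_tech_notice("2024年高新技术企业名单公示", keywords, excludes))    # True
print(is_sci_tech_notice("关于组织申报高新技术企业的通知", keywords, excludes))  # False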
# Check whether today's data contains any sci-tech enterprise announcements
import pymysql
import requests

from gxt_spider import get_industry
from kjt_spider import get_sci_kjt
from sdszf_spider import get_sci_sdszf
def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection
def query_today_kc_enterprises():
    keywords = [
        "科技型中小企业",
        "高新技术企业",
        "众创空间",
        "科技领军企业",
        "技术先进型服务企业",
        "技术创新示范企业",
        "专精特新",
        "科技企业",
        "瞪羚",
        "独角兽",
        "科技小巨人企业",
        "小巨人",
    ]
    not_contain_keywords = ["取消", "组织申报", "认定和复核", "申报", "补助名单", "绩效评价"]
    sql = build_sql_query(keywords, not_contain_keywords)
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql)
            results = cursor.fetchall()
            return {
                "total": len(results),
                "list": results
            }
    finally:
        connection.close()
def build_sql_query(keywords, not_contain_keywords):
    # Note: the keywords are interpolated directly into the SQL string;
    # they are trusted constants defined above, not user input.
    like_conditions = " OR ".join([f"title LIKE '%{keyword}%'" for keyword in keywords])
    not_like_conditions = " AND ".join([f"title NOT LIKE '%{keyword}%'" for keyword in not_contain_keywords])
    sql = f"""
    SELECT
        CASE type
            WHEN '1' THEN '山东省科学技术厅'
            WHEN '2' THEN '山东省工业和信息化厅'
            WHEN '3' THEN '山东省人民政府'
            ELSE '未知类型'
        END AS type_name, date, title, url
    FROM `sci_spider`
    WHERE ({like_conditions})
      AND ({not_like_conditions})
      AND DATE(create_date) = DATE(NOW())
    """
    return sql
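Because build_sql_query splices the keywords into the SQL text, it would break on keywords containing quotes and would be unsafe if the lists ever came from user input. A parameterized variant with the same WHERE logic, as a sketch (the function name is hypothetical):

def build_sql_query_params(keywords, not_contain_keywords):
    # Every keyword is bound as a %s placeholder instead of being spliced in.
    like_conditions = " OR ".join(["title LIKE %s"] * len(keywords))
    not_like_conditions = " AND ".join(["title NOT LIKE %s"] * len(not_contain_keywords))
    sql = f"""
    SELECT date, title, url FROM `sci_spider`
    WHERE ({like_conditions})
      AND ({not_like_conditions})
      AND DATE(create_date) = DATE(NOW())
    """
    params = [f"%{k}%" for k in keywords + not_contain_keywords]
    return sql, params

# usage: sql, params = build_sql_query_params(keywords, not_contain_keywords)
#        cursor.execute(sql, params)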
def mail_sender(content):
    import smtplib
    from email.mime.text import MIMEText
    from email.header import Header

    # Third-party SMTP service
    mail_host = "smtp.163.com"            # SMTP server
    mail_user = "18631839859@163.com"     # username
    mail_pass = "GENGs7dM45TJDH6y"        # authorization code
    sender = '18631839859@163.com'
    receivers = ['wonder1999@126.com']    # recipients; set to your own QQ or other mailbox
    # message = MIMEText(content, 'plain', 'utf-8')
    message = MIMEText(content, 'html', 'utf-8')
    message['From'] = Header("科技型中小企业通知", 'utf-8')
    message['To'] = Header("科技型中小企业", 'utf-8')
    subject = '科技型中小企业通知'
    message['Subject'] = Header(subject, 'utf-8')
    try:
        smtpObj = smtplib.SMTP()
        smtpObj.connect(mail_host, 25)    # 25 is the plain SMTP port
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, message.as_string())
        print("邮件发送成功")
    except smtplib.SMTPException as e:
        print(f"Error: 无法发送邮件: {e}")
def wx_web_hook(data):
    """
    Send a markdown-formatted message through a WeCom (enterprise WeChat) webhook.
    :param data: dict of notification data, expected to contain 'total' and 'list' keys
    :return: None
    """
    # Webhook address (replace the key with your own)
    webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=ef84945d-2247-4f09-ac0b-be7a6607c24e"
    # Build the markdown content
    content = f"**找到 {data['total']} 条疑似符合条件的记录:**\n"
    for row in data['list']:
        content += (
            f"- [{row['title']}]({row['url']}) "
            f"<font color=\"comment\">{row['date']}</font> "
            f"<font color=\"warning\">{row['type_name']}</font>\n"
        )
    # Build the request body
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "content": content
        }
    }
    # Send the request and handle the response
    try:
        response = requests.post(webhook_url, json=payload, timeout=10)
        response.raise_for_status()  # raise on HTTP errors
        result = response.json()
        if result.get("errcode") == 0:
            print("✅ 消息发送成功")
        else:
            print(f"❌ 消息发送失败: {result.get('errmsg')}")
    except requests.exceptions.RequestException as e:
        print(f"⚠️ 请求异常: {e}")
if __name__ == '__main__':
    get_industry(1, 2)
    get_sci_kjt(1, 1)
    get_sci_sdszf(1, 3)
    data = query_today_kc_enterprises()
    title = f"找到 {data['total']} 条疑似符合条件的记录:"
    for row in data['list']:
        print(row)
    if data['total'] > 0:
        wx_web_hook(data)
        # mail_sender('测试消息')
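None of the scripts create the `sci_spider` table, and its DDL isn't shown in this post. The following reconstruction is inferred from the columns the code reads and writes; in particular, `date` is assumed to be a text column, since the code compares dates as strings and wraps the column in DATE() in SQL. Types and lengths are guesses:

import pymysql

DDL = """
CREATE TABLE IF NOT EXISTS `sci_spider` (
    `id`          INT AUTO_INCREMENT PRIMARY KEY,
    `title`       VARCHAR(512) NOT NULL,   -- announcement title
    `url`         VARCHAR(512) NOT NULL,   -- detail-page link
    `date`        VARCHAR(10)  NOT NULL,   -- publish date as 'YYYY-MM-DD' text
    `type`        VARCHAR(1)   NOT NULL,   -- 1 = kjt, 2 = gxt, 3 = sdszf
    `create_date` DATETIME     NOT NULL    -- crawl timestamp
) CHARACTER SET utf8mb4
"""

connection = pymysql.connect(host='127.0.0.1', user='root', password='123456',
                             database='my_database_test', charset='utf8mb4')
try:
    with connection.cursor() as cursor:
        cursor.execute(DDL)
    connection.commit()
finally:
    connection.close()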
Department of Industry and Information Technology spider (gxt_spider.py)
import re
import time
import pymysql
import requests

# Database connection
def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection
def find_new_date():
    # Return the latest stored publish date for this source (type = '2'),
    # or '' when the table is empty or the query fails.
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '2' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        return ''
    finally:
        connection.close()
def get_industry(page_num, type):
    url = f'http://gxt.shandong.gov.cn/col/col15201/index.html?uid=586830&pageNum={page_num}'
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers)
            response.encoding = 'utf-8'
            response = response.text
            break
        except requests.RequestException:
            print("请求失败,尝试睡眠一会(半小时)")
            time.sleep(60 * 30)
            print("睡眠结束,继续运行...")
            continue
    # Extract dates, links, and titles from the listing page
    da = re.findall(r'<div class="bottom"> <span> (.*?) </span>', response)
    in_url = re.findall(r'target="_blank" href="(.*?)">', response)
    content = re.findall(r'<a title="(.*?)" target="_blank"', response)
    for i in range(0, len(da)):
        print(str(i + 1) + ' : ' + da[i][0:10] + ' : ' + content[i] + ' : ' + in_url[i])
    # These patterns apparently match each entry's link and title twice on
    # this site, hence the *2 consistency check
    if len(da) * 2 != len(in_url) or len(da) * 2 != len(content):
        print("数据不完整,跳过插入")
        return
    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default minimum date
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
            INSERT INTO `my_database_test`.`sci_spider`
            (`title`, `url`, `date`, `type`, `create_date`)
            VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                # Only insert entries newer than the latest stored date
                # ('YYYY-MM-DD' strings compare chronologically)
                if da[i][0:10] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], in_url[i], da[i][0:10], type))
            connection.commit()
            print(f"已成功插入 {count} 条数据")
    except Exception as e:
        print(f"插入数据失败: {e}")
        connection.rollback()
    finally:
        connection.close()

if __name__ == '__main__':
    get_industry(1, 2)
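This fetch-and-sleep loop reappears verbatim in the two spiders below, and its `while True` retries forever if the site stays down. A shared helper with a bounded retry count could factor it out; a sketch (the retry cap, timeout, and function name are my own choices, not from the original):

import time
import requests

def fetch_page(url, max_retries=5, sleep_seconds=60 * 30):
    # Return the decoded HTML, or None once all attempts fail.
    headers = {
        "User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                       "AppleWebKit/537.36 (KHTML, like Gecko) "
                       "Chrome/105.0.0.0 Safari/537.36"),
    }
    for attempt in range(1, max_retries + 1):
        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.encoding = 'utf-8'
            return response.text
        except requests.RequestException as e:
            print(f"请求失败 ({attempt}/{max_retries}): {e}")
            if attempt < max_retries:
                time.sleep(sleep_seconds)
    return None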
Department of Science and Technology spider (kjt_spider.py)
import re
import time
import pymysql
import requests

def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection
def find_new_date():
    # Return the latest stored publish date for this source (type = '1'),
    # or '' when the table is empty or the query fails.
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '1' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        return ''
    finally:
        connection.close()
def get_sci_kjt(page_num, type):
    url = f'http://kjt.shandong.gov.cn/col/col13360/index.html?uid=85651&pageNum={page_num}'
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers)
            response.encoding = 'utf-8'
            response = response.text
            break
        except requests.RequestException:
            print("请求失败,尝试睡眠一会(半小时)")
            time.sleep(60 * 30)
            print("睡眠结束,继续运行...")
            continue
    # Extract dates, links, and titles from the listing page
    da = re.findall(r'<span class="pull-right">(.*?)</span>', response)
    sci_url = re.findall(r'href="(.*?)" class="ellipsis-line-clamp">', response)
    content = re.findall(r'<s></s>(.*?)</a></li>', response)
    for i in range(0, len(da)):
        print(str(i + 1) + ' : ' + da[i][0:10] + ' : ' + content[i] + ' : ' + sci_url[i])
    if len(da) != len(sci_url) or len(da) != len(content):
        print("数据不完整,跳过插入")
        return
    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default minimum date
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
            INSERT INTO `my_database_test`.`sci_spider`
            (`title`, `url`, `date`, `type`, `create_date`)
            VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                # Only insert entries newer than the latest stored date
                if da[i] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], sci_url[i], da[i], type))
            connection.commit()
            print(f"已成功插入 {count} 条数据")
    except Exception as e:
        print(f"插入数据失败: {e}")
        connection.rollback()
    finally:
        connection.close()

if __name__ == '__main__':
    get_sci_kjt(1, 1)
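The incremental filter `da[i] > new_date` works only because both sides are zero-padded 'YYYY-MM-DD' strings, for which lexicographic order matches chronological order. A quick self-check that also catches malformed dates before they sort wrong (the helper name is mine):

import re

def assert_iso_date(s):
    # Only zero-padded YYYY-MM-DD strings compare chronologically under
    # plain string comparison; reject anything else early.
    if not re.fullmatch(r'\d{4}-\d{2}-\d{2}', s):
        raise ValueError(f"unexpected date format: {s!r}")
    return s

assert assert_iso_date('2024-09-02') > assert_iso_date('2024-08-31')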
Shandong Provincial People's Government spider (sdszf_spider.py)
import re
import time
import pymysql
import requests

def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection
def find_new_date():
    # Return the latest stored publish date for this source (type = '3'),
    # or '' when the table is empty or the query fails.
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '3' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        return ''
    finally:
        connection.close()
def get_sci_sdszf(page_num, type):
    url = f'http://www.shandong.gov.cn/col/col94237/index.html?uid=633233&pageNum={page_num}'
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers)
            response.encoding = 'utf-8'
            response = response.text
            break
        except requests.RequestException:
            print("请求失败,尝试睡眠一会(半小时)")
            time.sleep(60 * 30)
            print("睡眠结束,继续运行...")
            continue
    # Extract dates
    da = re.findall(r'<span>\s*(\d{4}-\d{2}-\d{2})\s*</span>', response)
    # Extract links
    sci_url = re.findall(r'href="(.*?)"\s+target="_blank"\s+title="', response)
    # Extract titles (the title attribute)
    content = re.findall(r'\s+target="_blank"\s+title="(.*?)"', response)
    print(len(da), len(sci_url), len(content))
    for i in range(0, len(da)):
        print(str(i + 1) + ' : ' + da[i][0:10] + ' : ' + content[i] + ' : ' + sci_url[i])
    if len(da) != len(sci_url) or len(da) != len(content):
        print("数据不完整,跳过插入")
        return
    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default minimum date
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
            INSERT INTO `my_database_test`.`sci_spider`
            (`title`, `url`, `date`, `type`, `create_date`)
            VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                # Only insert entries newer than the latest stored date
                if da[i] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], sci_url[i], da[i], type))
            connection.commit()
            print(f"已成功插入 {count} 条数据")
    except Exception as e:
        print(f"插入数据失败: {e}")
        connection.rollback()
    finally:
        connection.close()

if __name__ == '__main__':
    get_sci_sdszf(1, 3)
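Finally, step 1 assumes a daily crawl, but nothing in these scripts schedules it; on a server that would typically be a cron job. A stdlib-only alternative, as a sketch (function names are mine; `job` would wrap the checker script's main block: crawl all three sites, query, notify):

import datetime
import time

def sleep_until(hour, minute=0):
    # Sleep until the next local occurrence of hour:minute.
    now = datetime.datetime.now()
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += datetime.timedelta(days=1)
    time.sleep((target - now).total_seconds())

def run_daily(job, hour=9):
    # Call job() once per day at the given hour.
    while True:
        sleep_until(hour)
        job()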