1. After each daily crawl, sync the scraped announcements/notices into the database.

2. Search that day's announcements for sci-tech innovation enterprises and, if any are found, send a reminder notification (the reminder logic was left unwritten at first; this pass simply fleshes out the data handling).

Matching entries are found by fuzzy keyword search, with exclusion words to filter out irrelevant items.

Check whether there are any announcements/notices about sci-tech innovation enterprises.
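
All of the scripts below read and write a `sci_spider` table that the post never defines. Here is a minimal sketch of the assumed schema: the column names come from the INSERT and SELECT statements, while the exact types are my assumption. Storing `date` as a 'YYYY-MM-DD' string keeps the spiders' lexicographic date comparisons valid.

-- Assumed schema (a sketch, not from the original post)
CREATE TABLE `sci_spider` (
    `id`          INT AUTO_INCREMENT PRIMARY KEY,
    `title`       VARCHAR(512) NOT NULL,  -- announcement title
    `url`         VARCHAR(512) NOT NULL,  -- announcement link
    `date`        CHAR(10)     NOT NULL,  -- publication date, 'YYYY-MM-DD'
    `type`        CHAR(1)      NOT NULL,  -- '1' = 科技厅, '2' = 工信厅, '3' = 省政府
    `create_date` DATETIME     NOT NULL   -- crawl timestamp
);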

# Check whether today's data contains sci-tech enterprise list announcements

import pymysql
import requests
from gxt_spider import get_industry
from kjt_spider import get_sci_kjt
from sdszf_spider import get_sci_sdszf

def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def query_today_kc_enterprises():
    keywords = [
        "科技型中小企业",
        "高新技术企业",
        "众创空间",
        "科技领军企业",
        "技术先进型服务企业",
        "技术创新示范企业",
        "专精特新",
        "科技企业",
        "瞪羚",
        "独角兽",
        "科技小巨人企业",
        "小巨人",
    ]
    not_contain_keywords = ["取消", "组织申报", "认定和复核", "申报", "补助名单", "绩效评价"]
    sql = build_sql_query(keywords, not_contain_keywords)

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            cursor.execute(sql)
            results = cursor.fetchall()

            return {
                "total": len(results),
                "list": results
            }
    finally:
        connection.close()

def build_sql_query(keywords, not_contain_keywords):
    # The keyword lists are hard-coded above, not user input, which is the
    # only reason string interpolation is tolerable here.
    like_conditions = " OR ".join([f"title LIKE '%{keyword}%'" for keyword in keywords])
    not_like_conditions = " AND ".join([f"title NOT LIKE '%{not_contain_keyword}%'" for not_contain_keyword in not_contain_keywords])
    sql = f"""
        SELECT
        CASE type
            WHEN '1' THEN '山东省科学技术厅'
            WHEN '2' THEN '山东省工业和信息化厅'
            WHEN '3' THEN '山东省人民政府'
            ELSE '未知类型'
        END AS type_name, date, title, url FROM `sci_spider`
        WHERE ({like_conditions})
        AND ({not_like_conditions})
        AND DATE(create_date) = DATE(NOW())
    """
    return sql
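
If the keyword lists ever come from outside the script, a parameterized version avoids quoting problems and SQL injection. A sketch (same table and columns as above; the function name is mine):

def build_sql_query_params(keywords, not_contain_keywords):
    # Build the same query with %s placeholders; pymysql escapes the values.
    likes = " OR ".join(["title LIKE %s"] * len(keywords))
    not_likes = " AND ".join(["title NOT LIKE %s"] * len(not_contain_keywords))
    sql = f"""
        SELECT title, url, date, type FROM `sci_spider`
        WHERE ({likes}) AND ({not_likes})
        AND DATE(create_date) = DATE(NOW())
    """
    params = [f"%{k}%" for k in keywords] + [f"%{k}%" for k in not_contain_keywords]
    return sql, params

# Usage: sql, params = build_sql_query_params(...); cursor.execute(sql, params)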


def mail_sender(content):
    import smtplib
    from email.mime.text import MIMEText
    from email.header import Header
    # Third-party SMTP service
    mail_host = "smtp.163.com"  # SMTP server
    mail_user = "18631839859@163.com"  # username
    mail_pass = "GENGs7dM45TJDH6y"  # SMTP authorization code
    sender = '18631839859@163.com'
    receivers = ['wonder1999@126.com']  # recipients; can be a QQ mailbox or any other mailbox

    # message = MIMEText(content, 'plain', 'utf-8')
    message = MIMEText(content, 'html', 'utf-8')
    message['From'] = Header("科技型中小企业通知", 'utf-8')
    message['To'] = Header("科技型中小企业", 'utf-8')

    subject = '科技型中小企业通知'
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtpObj = smtplib.SMTP()
        smtpObj.connect(mail_host, 25)  # 25 is the plain SMTP port
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, message.as_string())
        print("Mail sent successfully")
    except smtplib.SMTPException:
        print("Error: failed to send mail")


def wx_web_hook(data):
    """
    Send a Markdown-formatted message through a WeCom (企业微信) webhook.
    :param data: dict of notification data, containing 'total' and 'list' keys
    :return: None
    """
    # Webhook address (replace the key with your own)
    webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=ef84945d-2247-4f09-ac0b-be7a6607c24e"

    # Build the Markdown content
    content = f"**找到 {data['total']} 条疑似符合条件的记录:**\n"
    for row in data['list']:
        content += (
            f"- [{row['title']}]({row['url']}) "
            f"<font color=\"comment\">{row['date']}</font> "
            f"<font color=\"warning\">{row['type_name']}</font>\n"
        )

    # Build the request body
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "content": content
        }
    }
    # Send the request and handle the response
    try:
        response = requests.post(webhook_url, json=payload, timeout=10)
        response.raise_for_status()  # raise on HTTP errors
        result = response.json()

        if result.get("errcode") == 0:
            print("✅ Message sent")
        else:
            print(f"❌ Message failed: {result.get('errmsg')}")

    except requests.exceptions.RequestException as e:
        print(f"⚠️ Request error: {e}")

if __name__ == '__main__':
    # Crawl the three sources first, then check today's rows
    get_industry(1, 2)
    get_sci_kjt(1, 1)
    get_sci_sdszf(1, 3)
    data = query_today_kc_enterprises()
    for row in data['list']:
        print(row)

    if data['total'] > 0:
        wx_web_hook(data)
        # mail_sender('测试消息')
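
Step 1 above says the crawl runs daily; a cron entry is the simplest way to schedule it. A sketch, assuming the script above is saved as /opt/sci_spider/check_sci.py (the path and interpreter are assumptions):

# Run the crawl-and-notify script every morning at 08:30
30 8 * * * /usr/bin/python3 /opt/sci_spider/check_sci.py >> /var/log/sci_spider.log 2>&1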

# Spider for the Shandong Department of Industry and Information Technology (gxt_spider.py)

import re
import time
import pymysql
import requests


# Database connection
def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def find_new_date():
    # Latest publication date already stored for this source (type = '2')
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '2' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        # No rows yet (or the query failed): treat as no previous crawl
        return ''
    finally:
        connection.close()
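
find_new_date effectively computes the newest stored date for this source. The same thing can be expressed as a single aggregate (a sketch; it relies on the 'YYYY-MM-DD' strings sorting chronologically):

SELECT MAX(date) AS date FROM `sci_spider` WHERE type = '2';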


def get_industry(page_num, type):
    url = f'http://gxt.shandong.gov.cn/col/col15201/index.html?uid=586830&pageNum={page_num}'

    user_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_Agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers, timeout=30)
            response.encoding = 'utf-8'
            response = response.text
            break
        except requests.RequestException:
            print("Request failed, sleeping for half an hour")
            time.sleep(60 * 30)
            print("Done sleeping, retrying...")
            continue

    # This list page matches the title/url patterns twice per entry,
    # hence the len(da) * 2 consistency check below.
    da = re.findall(r'<div class="bottom">            <span>                (.*?)            </span>', response)
    in_url = re.findall(r'target="_blank" href="(.*?)">', response)
    content = re.findall(r'<a title="(.*?)" target="_blank"', response)

    # Check completeness before indexing into the lists
    if len(da) * 2 != len(in_url) or len(da) * 2 != len(content):
        print("Incomplete data, skipping insert")
        return

    for i in range(0, len(da)):
        print(str(i + 1) + ' : ' + da[i][0:10] + ' : ' + content[i] + ' : ' + in_url[i])

    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default minimal date

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
               INSERT INTO `my_database_test`.`sci_spider` 
               (`title`, `url`, `date`, `type`, `create_date`) 
               VALUES (%s, %s, %s, %s, NOW())
               """
            count = 0
            for i in range(len(da)):
                # Only insert entries newer than the latest stored date
                if da[i][0:10] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], in_url[i], da[i][0:10], type))

        connection.commit()
        print(f"Inserted {count} rows")
    except Exception as e:
        print(f"Insert failed: {e}")
        connection.rollback()
    finally:
        connection.close()

if __name__ == '__main__':
    get_industry(1, 2)

# Spider for the Shandong Department of Science and Technology (kjt_spider.py)

import re
import time
import pymysql
import requests


def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def find_new_date():
    # Latest publication date already stored for this source (type = '1')
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '1' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        # No rows yet (or the query failed): treat as no previous crawl
        return ''
    finally:
        connection.close()


def get_sci_kjt(page_num, type):
    url = f'http://kjt.shandong.gov.cn/col/col13360/index.html?uid=85651&pageNum={page_num}'
    user_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_Agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers, timeout=30)
            response.encoding = 'utf-8'
            response = response.text
            break
        except requests.RequestException:
            print("Request failed, sleeping for half an hour")
            time.sleep(60 * 30)
            print("Done sleeping, retrying...")
            continue

    # Extract dates, links, and titles from the list page
    da = re.findall(r'<span class="pull-right">(.*?)</span>', response)
    sci_url = re.findall(r'href="(.*?)" class="ellipsis-line-clamp">', response)
    content = re.findall(r'<s></s>(.*?)</a></li>', response)

    # Check completeness before indexing into the lists
    if len(da) != len(sci_url) or len(da) != len(content):
        print("Incomplete data, skipping insert")
        return

    for i in range(0, len(da)):
        print(str(i + 1) + ' : ' + da[i][0:10] + ' : ' + content[i] + ' : ' + sci_url[i])

    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default minimal date

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
            INSERT INTO `my_database_test`.`sci_spider` 
            (`title`, `url`, `date`, `type`, `create_date`) 
            VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                # Only insert entries newer than the latest stored date
                if da[i] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], sci_url[i], da[i], type))

        connection.commit()
        print(f"Inserted {count} rows")
    except Exception as e:
        print(f"Insert failed: {e}")
        connection.rollback()
    finally:
        connection.close()


if __name__ == '__main__':
    get_sci_kjt(1, 1)

# Spider for the Shandong Provincial People's Government (sdszf_spider.py)

import re
import time
import pymysql
import requests


def connect_to_database():
    connection = pymysql.connect(
        host='127.0.0.1',
        user='root',
        password='123456',
        database='my_database_test',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    return connection


def find_new_date():
    # Latest publication date already stored for this source (type = '3')
    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT date FROM `sci_spider` WHERE type = '3' ORDER BY DATE(date) DESC LIMIT 0,1"
            cursor.execute(sql)
            results = cursor.fetchall()
            return results[0]['date']
    except Exception:
        # No rows yet (or the query failed): treat as no previous crawl
        return ''
    finally:
        connection.close()


def get_sci_sdszf(page_num, type):
    url = f'http://www.shandong.gov.cn/col/col94237/index.html?uid=633233&pageNum={page_num}'
    user_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36"
    headers = {
        "Referer": None,
        "User-Agent": user_Agent
    }
    while True:
        try:
            response = requests.get(url=url, headers=headers, timeout=30)
            response.encoding = 'utf-8'
            response = response.text
            break
        except requests.RequestException:
            print("Request failed, sleeping for half an hour")
            time.sleep(60 * 30)
            print("Done sleeping, retrying...")
            continue

    # Extract dates
    da = re.findall(r'<span>\s*(\d{4}-\d{2}-\d{2})\s*</span>', response)
    # Extract links
    sci_url = re.findall(r'href="(.*?)"\s+target="_blank"\s+title="', response)
    # Extract titles (the title attribute)
    content = re.findall(r'\s+target="_blank"\s+title="(.*?)"', response)
    print(len(da), len(sci_url), len(content))  # debug: counts of extracted fields

    # Check completeness before indexing into the lists
    if len(da) != len(sci_url) or len(da) != len(content):
        print("Incomplete data, skipping insert")
        return

    for i in range(0, len(da)):
        print(str(i + 1) + ' : ' + da[i][0:10] + ' : ' + content[i] + ' : ' + sci_url[i])

    new_date = find_new_date()
    if not new_date or new_date == '':
        new_date = '1970-01-01'  # default minimal date

    connection = connect_to_database()
    try:
        with connection.cursor() as cursor:
            sql = """
            INSERT INTO `my_database_test`.`sci_spider` 
            (`title`, `url`, `date`, `type`, `create_date`) 
            VALUES (%s, %s, %s, %s, NOW())
            """
            count = 0
            for i in range(len(da)):
                # Only insert entries newer than the latest stored date
                if da[i] > new_date:
                    count = count + 1
                    cursor.execute(sql, (content[i], sci_url[i], da[i], type))

        connection.commit()
        print(f"Inserted {count} rows")
    except Exception as e:
        print(f"Insert failed: {e}")
        connection.rollback()
    finally:
        connection.close()


if __name__ == '__main__':
    get_sci_sdszf(1, 3)