400 028 6601

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

如何在python项目中利用Apscheduler实现一个定时任务-创新互联

本篇文章给大家分享的是有关如何在python 项目中利用Apscheduler实现一个定时任务,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

创新互联公司长期为1000+客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为惠农企业提供专业的网站设计制作、网站制作,惠农网站改版等技术服务。拥有十余年丰富建站经验和众多成功案例,为您定制开发。

先简单介绍一下Apscheduler模块包含的四种组件:

大概了解了Apscheduler包含的几种概念,现在先来看一下一个简单的示例:

# -*- coding: utf-8 -*-

from apscheduler.schedulers.blocking import BlockingScheduler
import time


def hello():
  print(time.strftime("%c"))


if __name__ == "__main__":
  scheduler = BlockingScheduler()
  scheduler.add_job(hello, 'interval', seconds=5)
  scheduler.start()

示例的输出:

Thu Dec 3 16:01:20 2020
Thu Dec 3 16:01:25 2020
Thu Dec 3 16:01:30 2020
Thu Dec 3 16:01:35 2020
Thu Dec 3 16:01:40 2020
..........

这个简单的示例,我们用上面提到几种组件分析一下运行逻辑:

内置的三种Trigger触发器类型:

常见的Scheduler调度器:

常见的JobStore:

进阶使用

通过上面一个简单的示例了解大概的工作流程,以及各个组件在整个流程中的作用,以下的示例是Flask Web框架结合使用Apscheduler定时器,定时执行任务。

# -*- coding: utf-8 -*-

from flask import Flask, Blueprint, request
from apscheduler.executors.pool import ThreadPoolExecutor 
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.redis import RedisJobStore
import time

app = Flask(__name__)
executors = {"default": ThreadPoolExecutor(5)}
default_redis_jobstore = RedisJobStore(db=2, 
    jobs_key="apschedulers.default_jobs",
    run_times_key="apschedulers.default_run_times",
    host = '127.0.0.1',
    port = 6379
    )

scheduler = BackgroundScheduler(executors=executors)
scheduler.add_jobstore(default_redis_jobstore)
scheduler.start()

def say_hello():
  print(time.strftime("%c"))


@app.route("/get_job", methods=['GET'])
def get_job():
  if scheduler.get_job("say_hello_test"):
    return "YES"
  else:
    return "NO"

@app.route("/start_job", methods=["GET"])
def start_job():
  if not scheduler.get_job("say_hello_test"):
    scheduler.add_job(say_hello, "interval", seconds=5, id="say_hello_test")
    return "Start Scuessfully!"
  else:
    return "Started Failed"
  
@app.route("/remove_job", methods=["GET"])
def remove_job():
  if scheduler.get_job("say_hello_test"):
    scheduler.remove_job("say_hello_test")
    return "Delete Successfully!"
  else:
    return "Delete Failed"


if __name__ == "__main__":
  app.run(host="127.0.0.1", port=8787, debug=True)

以上就是如何在python 项目中利用Apscheduler实现一个定时任务,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。


分享文章:如何在python项目中利用Apscheduler实现一个定时任务-创新互联
URL分享:http://mbwzsj.com/article/ceooji.html

其他资讯

让你的专属顾问为你服务