sql >> Database >  >> NoSQL >> Redis

Flask by example - Een Redis-taakwachtrij implementeren

In dit deel van de zelfstudie wordt beschreven hoe u een Redis-taakwachtrij implementeert om tekstverwerking af te handelen.

Updates:

  • 02/12/2020:geüpgraded naar Python-versie 3.8.1 en de nieuwste versies van Redis, Python Redis en RQ. Zie hieronder voor details. Vermeld een bug in de laatste RQ-versie en zorg voor een oplossing. De http voor https-bug opgelost.
  • 22-03-2016:geüpgraded naar Python-versie 3.5.1 en de nieuwste versies van Redis, Python Redis en RQ. Zie hieronder voor details.
  • 22/02/2015:ondersteuning voor Python 3 toegevoegd.

Gratis bonus: Klik hier om toegang te krijgen tot een gratis Flask + Python-videozelfstudie die u stap voor stap laat zien hoe u de Flask-webapp bouwt.

Onthoud:dit is wat we aan het bouwen zijn:een Flask-app die woordfrequentieparen berekent op basis van de tekst van een bepaalde URL.

  1. Deel één:zet een lokale ontwikkelomgeving op en implementeer vervolgens zowel een staging- als een productieomgeving op Heroku.
  2. Deel twee:stel een PostgreSQL-database samen met SQLAlchemy en Alembic om migraties af te handelen.
  3. Deel drie:voeg de back-endlogica toe om het aantal woorden van een webpagina te schrapen en vervolgens te verwerken met behulp van de bibliotheken met verzoeken, BeautifulSoup en Natural Language Toolkit (NLTK).
  4. Deel vier:Implementeer een Redis-taakwachtrij om de tekstverwerking af te handelen. (huidig )
  5. Deel vijf:stel Angular in aan de front-end om continu de back-end te pollen om te zien of het verzoek is verwerkt.
  6. Deel zes:push naar de staging-server op Heroku - Redis instellen en beschrijven hoe twee processen (web en worker) op een enkele Dyno kunnen worden uitgevoerd.
  7. Deel zeven:werk de front-end bij om deze gebruiksvriendelijker te maken.
  8. Deel acht:maak een aangepaste hoekrichtlijn om een ​​frequentieverdelingsdiagram weer te geven met JavaScript en D3.

Code nodig? Pak het uit de repo.


Installatievereisten

Gebruikte tools:

  • Redis (5.0.7)
  • Python Redis (3.4.1)
  • RQ (1.2.2) - een eenvoudige bibliotheek voor het maken van een takenwachtrij

Begin met het downloaden en installeren van Redis vanaf de officiële site of via Homebrew (brew install redis ). Na installatie start u de Redis-server:

$ redis-server

Installeer vervolgens Python Redis en RQ in een nieuw terminalvenster:

$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt


De werker instellen

Laten we beginnen met het maken van een werkproces om te luisteren naar taken in de wachtrij. Maak een nieuw bestand worker.py , en voeg deze code toe:

import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Hier hebben we geluisterd naar een wachtrij genaamd default en een verbinding tot stand gebracht met de Redis-server op localhost:6379 .

Start dit op in een ander terminalvenster:

$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Nu moeten we onze app.py updaten om opdrachten naar de wachtrij te sturen...



Update app.py

Voeg de volgende importen toe aan app.py :

from rq import Queue
from rq.job import Job
from worker import conn

Werk vervolgens het configuratiegedeelte bij:

app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn) een Redis-verbinding opzetten en een wachtrij initialiseren op basis van die verbinding.

Verplaats de tekstverwerkingsfunctionaliteit uit onze indexroute en naar een nieuwe functie genaamd count_and_save_words() . Deze functie accepteert één argument, een URL, die we eraan zullen doorgeven wanneer we deze vanuit onze indexroute aanroepen.

def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # this import solves a rq bug which currently exists
        from app import count_and_save_words

        # get url that the person has entered
        url = request.form['url']
        if not url[:8].startswith(('https://', 'http://')):
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

Noteer de volgende code:

job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Opmerking: We moeten de count_and_save_words . importeren functie in onze functie index omdat het RQ-pakket momenteel een bug heeft, waardoor het geen functies in dezelfde module kan vinden.

Hier gebruikten we de wachtrij die we eerder hebben geïnitialiseerd en de enqueue_call() . hebben genoemd functie. Dit voegde een nieuwe taak toe aan de wachtrij en die taak voerde de count_and_save_words() uit functie met de URL als argument. De result_ttl=5000 lijnargument vertelt RQ hoe lang het resultaat van de taak moet worden vastgehouden - in dit geval 5000 seconden. Vervolgens hebben we de taak-ID naar de terminal uitgevoerd. Deze id is nodig om te zien of de taak is verwerkt.

Laten we daarvoor een nieuwe route opzetten...



Resultaten ophalen

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Laten we dit eens testen.

Start de server, ga naar http://localhost:5000/, gebruik de URL https://realpython.com en pak de taak-ID van de terminal. Gebruik die id vervolgens in het eindpunt '/results/' - d.w.z. http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.

Zolang er minder dan 5.000 seconden zijn verstreken voordat u de status controleert, zou u een id-nummer moeten zien, dat wordt gegenereerd wanneer we de resultaten aan de database toevoegen:

# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id

Laten we nu de route iets aanpassen om de werkelijke resultaten uit de database in JSON te retourneren:

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Zorg ervoor dat u de import toevoegt:

from flask import jsonify

Test dit nog eens uit. Als alles goed is gegaan, zou je iets vergelijkbaars moeten zien in je browser:

[
  [
    "Python", 
    315
  ], 
  [
    "intermediate", 
    167
  ], 
  [
    "python", 
    161
  ], 
  [
    "basics", 
    118
  ], 
  [
    "web-dev", 
    108
  ], 
  [
    "data-science", 
    51
  ], 
  [
    "best-practices", 
    49
  ], 
  [
    "advanced", 
    45
  ], 
  [
    "django", 
    43
  ], 
  [
    "flask", 
    41
  ]
]


Wat nu?

Gratis bonus: Klik hier om toegang te krijgen tot een gratis Flask + Python-videozelfstudie die u stap voor stap laat zien hoe u de Flask-webapp bouwt.

In deel 5 brengen we de client en de server samen door Angular aan de mix toe te voegen om een ​​poller te maken, die elke vijf seconden een verzoek stuurt naar de /results/<job_key> eindpunt dat om updates vraagt. Zodra de gegevens beschikbaar zijn, voegen we deze toe aan het DOM.

Proost!

Dit is een samenwerkingsstuk tussen Cam Linke, mede-oprichter van Startup Edmonton, en de mensen van Real Python



  1. MongoDB aan met Docker kon geen verbinding maken met server [localhost:27017] bij eerste verbinding

  2. Verbind laravel jenssegers met mongodb atlascluster

  3. Vermijd de totale limiet van 16 MB

  4. Afronding op 2 decimalen met behulp van het MongoDB-aggregatieraamwerk