recipe-graph/src/recipe_graph/scrape.py

190 lines
7.7 KiB
Python

import sys
from recipe_graph import db
import re
from sqlalchemy import select, desc, exists, not_, except_
from sqlalchemy.orm import sessionmaker
import bs4
from urllib.request import urlopen
from urllib.parse import urljoin
import logging
from argparse import ArgumentParser
# Measurement words recognised directly after the quantity. Each entry is a
# regex fragment in singular form; the pattern also accepts an optional
# trailing "e" and/or "s".
_INGREDIENT_UNITS = (
    'teaspoon', 'tablespoon', 'gram', 'ounce', 'jar', 'cup', 'pinch',
    'container', 'slice', 'package', 'pound', 'can', 'dash', 'spear',
    'bunch', 'quart', 'cube', 'envelope', 'square', 'sprig', 'bag',
    'box', 'drop', 'fluid ounce', 'gallon', 'head', 'link', 'loaf',
    'pint', 'pod', 'sheet', 'stalk', 'whole', 'bar', 'bottle', 'bulb',
    'year', 'fillet', 'liter', 'packet', 'slices',
)

# Preparation words that may appear between the unit and the ingredient name.
# Each entry is a regex fragment; an optional "ly" suffix is also accepted.
_INGREDIENT_INSTRUCTIONS = (
    'and', 'or', 'chopped', 'diced', 'brewed', 'chilled',
    'chunky', 'small', 'medium', 'large', 'coarse', 'cracked',
    'crushed', 'ground', 'cooked', 'cubed', 'crumbled', 'cut',
    'cold', 'hot', 'warm', 'day', 'old', 'drained', 'canned',
    'dried', 'dry', 'fine', 'firm', 'fresh', 'frozen',
    'grated', 'grilled', 'hard', 'hot', 'juliened?', 'leftover',
    'light', 'lite', 'mashed', 'melted', 'minced', 'packed',
    'peeled', 'pitted', 'sliced', 'prepared', 'refrigerated',
    'rehydrated', 'seedless', 'shaved', 'shredded', 'sifted',
    'sieved', 'shucked', 'slivered', 'thick', 'sliced', 'thin',
    'toasted', 'trimmed', 'unbaked', 'uncooked', 'unpeeled',
    'unopened', 'unseasoned',
)


def _first_letter_any_case(word):
    """Return a regex fragment matching *word* with its first letter in either case."""
    return f'[{word[0]}{word[0].capitalize()}]{word[1:]}'


def _build_ingredient_pattern():
    """Compile the five-group pattern used by ``parse_ingredient``."""
    # Digits, '.', '/', and the Unicode vulgar-fraction code points, optionally
    # followed by whitespace and a parenthesised note such as "(14 ounce)".
    number_regex = r'((?:[\d\./\u00BC-\u00BE\u2150-\u215E]*\s?(?:\(.+\))?)*)'
    ingredient_regex = r"([a-zA-Z '\-]+)"
    supplement_regex = r',?(.*)'
    # NOTE(review): unit and instruction words are matched as prefixes (there
    # is no word boundary), so e.g. 'bar' also matches the start of "barbecue".
    units_regex = "|".join(_first_letter_any_case(unit)
                           for unit in _INGREDIENT_UNITS)
    units_regex = f"((?:(?:{units_regex})e?s?)?)"
    instructions_regex = "|".join(_first_letter_any_case(inst)
                                  for inst in _INGREDIENT_INSTRUCTIONS)
    instructions_regex = f"((?:(?:(?:{instructions_regex})(?:ly)?)| )*)"
    return re.compile(number_regex +
                      units_regex +
                      instructions_regex +
                      ingredient_regex +
                      supplement_regex)


# Compiled once at import time instead of on every parse_ingredient() call.
_INGREDIENT_PATTERN = _build_ingredient_pattern()


def parse_ingredient(ingredient_text):
    """Split a free-text ingredient line into its component parts.

    Args:
        ingredient_text: One ingredient line, e.g.
            ``"1 cup chopped onions, divided"``.

    Returns:
        A five-element list ``[quantity, unit, instruction, ingredient,
        supplement]`` of stripped strings, where any part that is missing or
        consists only of whitespace is ``None``; or ``None`` when the text
        does not match the pattern at all.
    """
    m = _INGREDIENT_PATTERN.match(ingredient_text)
    logging.info(f"Parsed {ingredient_text}, found: {m}")
    if not m:
        return None
    # A group that captured only whitespace (e.g. the single space between the
    # unit and the ingredient) is reported as None, the same as an empty group.
    return [(text.strip() or None) if text else None for text in m.groups()]
def reparse_ingredients(session):
    """Parse every stored ingredient that has no parsed-parts row yet.

    Selects the ``RecipeIngredient`` rows whose id is absent from
    ``RecipeIngredientParts``, runs ``parse_ingredient`` on each row's text
    and adds a ``RecipeIngredientParts`` row (sharing the ingredient's id) to
    the session for every text that parses. Rows whose text cannot be parsed
    are skipped. Nothing is flushed or committed here.

    Args:
        session: SQLAlchemy session used for the query and the inserts.
    """
    # Ids present in RecipeIngredient but not in RecipeIngredientParts.
    unparsed_ids = except_(
        select(db.RecipeIngredient.id),
        select(db.RecipeIngredientParts.id),
    ).alias('missing')

    pending = (
        session.query(db.RecipeIngredient)
        .where(db.RecipeIngredient.id.in_(unparsed_ids))
        .all()
    )

    for row in pending:
        parsed = parse_ingredient(row.text)
        if parsed:
            quantity, unit, instruction, name, supplement = parsed
            parts_row = db.RecipeIngredientParts(
                id=row.id,
                quantity=quantity,
                unit=unit,
                instruction=instruction,
                ingredient=name,
                supplement=supplement,
            )
            session.add(parts_row)
def load_recipe(recipe_url):
    """Download a recipe page and parse it into a BeautifulSoup document.

    Args:
        recipe_url: URL of the recipe page to fetch.

    Returns:
        The parsed ``bs4.BeautifulSoup`` document, or ``None`` when the page
        could not be downloaded or parsed. Failures are logged as warnings
        rather than raised.
    """
    try:
        logging.info(f'Loading Recipe: {recipe_url}')
        with urlopen(recipe_url) as f:
            if f.getcode() == 404:
                raise Exception(f"Recipe Does not exist: {recipe_url}")
            # Pass the raw bytes to BeautifulSoup together with the charset
            # declared in the response headers (None lets it auto-detect).
            # Decoding here with the default UTF-8 codec would raise on any
            # page served in another encoding and the recipe would be skipped.
            return bs4.BeautifulSoup(
                f.read(),
                'html.parser',
                from_encoding=f.headers.get_content_charset(),
            )
    except Exception as e:
        # Best effort by design: the caller treats None as "skip this recipe".
        logging.warning(f"Could not download or parse recipe: {recipe_url}")
        logging.warning(e)
    return None
def parse_recipe(session, recipe, site):
    """Fetch one recipe page and stage the recipe and its ingredients.

    The page URL is ``site.base_url`` joined with ``recipe.identifier``. The
    recipe name is taken from the first element with class
    ``site.name_class``; every element with class ``site.ingredient_class``
    becomes a ``RecipeIngredient`` row, plus a ``RecipeIngredientParts`` row
    when its text can be parsed by ``parse_ingredient``.

    Args:
        session: SQLAlchemy session the new rows are added to and flushed on.
        recipe: ``db.Recipe`` whose ``identifier`` is already set; its
            ``name`` is filled in here.
        site: Object providing ``base_url``, ``name_class`` and
            ``ingredient_class``.

    Returns:
        ``recipe`` on success, or ``None`` when the page could not be loaded.

    Raises:
        Exception: If no element with the site's name class is found.
    """
    recipe_url = urljoin(site.base_url, str(recipe.identifier))
    page = load_recipe(recipe_url)
    if not page:
        return None

    title_nodes = page.find_all(class_=site.name_class)
    if not title_nodes:
        raise Exception(f"Could not extract recipe name: {recipe_url}")
    recipe.name = title_nodes[0].text

    logging.info(f"Adding Recipe {recipe.name} from {recipe_url}")
    session.add(recipe)
    # Flush so recipe.id is populated before the ingredient rows reference it.
    session.flush()

    ingredient_nodes = page.find_all(class_=site.ingredient_class)
    for node in ingredient_nodes:
        row = db.RecipeIngredient(text=node.text, recipe_id=recipe.id)
        session.add(row)
        # Flush so row.id is available as the key of the parts row.
        session.flush()

        parsed = parse_ingredient(row.text)
        if not parsed:
            continue
        quantity, unit, instruction, ingredient, supplement = parsed
        session.add(db.RecipeIngredientParts(
            id=row.id,
            quantity=quantity,
            unit=unit,
            instruction=instruction,
            ingredient=ingredient,
            supplement=supplement,
        ))

    logging.info(f"{len(ingredient_nodes)} ingredients found. Inserting into DB")
    return recipe
def main():
    """Command-line entry point: scrape recipes from a configured site.

    Looks up the ``RecipeSite`` named on the command line, then either
    fetches the single recipe given by ``--identifier`` or, with ``--auto N``,
    fetches ``N`` consecutive numeric identifiers. In auto mode the starting
    identifier is ``--identifier`` when given, otherwise one past the highest
    identifier already stored for the site.

    Each recipe is processed inside its own savepoint so a failure on one
    recipe is rolled back and logged without discarding the others.
    """
    parser = ArgumentParser(description="Scrape a recipe site for recipies")
    parser.add_argument('site',
                        help='Name of site')
    # NOTE(review): the help text mentions a comma separated list, but only a
    # single identifier is handled below -- confirm the intended behaviour.
    parser.add_argument('-id', '--identifier', dest='id',
                        help='url of recipe(reletive to base url of site) or commma seperated list')
    parser.add_argument('-a', '--auto', action='store', dest='n',
                        help='automaticaly generate identifier(must supply number of recipies to scrape)')
    parser.add_argument('-v', '--verbose', action='store_true')
    # parse_args() reads sys.argv[1:]; passing sys.argv itself would make the
    # program name the 'site' positional and reject the real site argument.
    args = parser.parse_args()
    if args.verbose:
        logging.basicConfig(level=logging.INFO)
        logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

    eng = db.get_engine()
    S = sessionmaker(eng)

    with S.begin() as sess:
        site = sess.query(db.RecipeSite).where(db.RecipeSite.name == args.site).one()

        recipe_ids = []
        if args.id and not args.n:
            recipe_ids.append(args.id)
            logging.info(f'Retreiving single recipe: {args.id}')
        elif args.n:
            if not args.id:
                # NOTE(review): the ordering is done by the database on the
                # identifier column; if that column is textual the "highest"
                # row is lexicographic, not numeric -- confirm the schema.
                last_recipe = sess.query(db.Recipe).\
                    where(db.Recipe.recipe_site_id == site.id).\
                    order_by(desc(db.Recipe.identifier)).\
                    limit(1).\
                    scalar()
                if last_recipe is None:
                    # No stored recipe to continue from: report a usage error
                    # instead of failing with AttributeError on None.
                    parser.error(f"no recipes stored for site {args.site!r}; "
                                 "supply --identifier as the starting id")
                starting_id = int(last_recipe.identifier) + 1
            else:
                starting_id = int(args.id)
            recipe_ids = range(starting_id, starting_id+int(args.n))
            logging.info(f'Retreving {args.n} recipes from {site.base_url} starting at {starting_id}')

        for recipe_id in recipe_ids:
            # Open the savepoint before the try so the handlers below never
            # reference an unbound name if begin_nested() itself fails.
            savepoint = sess.begin_nested()
            try:
                recipe = db.Recipe(identifier = recipe_id, recipe_site_id = site.id)
                parse_recipe(sess, recipe, site)
                savepoint.commit()
            except KeyboardInterrupt:
                # Drop the half-processed recipe but keep the completed ones.
                savepoint.rollback()
                break
            except Exception as e:
                savepoint.rollback()
                logging.error(e)
                continue


if __name__ == "__main__": # pragma: no cover
    main()