import db
import re
from sqlalchemy import select, desc, except_
from sqlalchemy.orm import sessionmaker
import bs4
from urllib.request import urlopen
from urllib.parse import urljoin
import logging
from argparse import ArgumentParser


def parse_ingredient(ingredient_text):
    """Split raw ingredient text into (quantity, unit, instruction, ingredient, supplement).

    Illustrative example:
        '2 cups finely chopped fresh cilantro, stems removed'
        -> ['2', 'cups', 'finely chopped fresh', 'cilantro', 'stems removed']
    """
    units = ['teaspoon', 'tablespoon', 'gram', 'ounce', 'jar', 'cup', 'pinch',
             'container', 'slice', 'package', 'pound', 'can', 'dash', 'spear',
             'bunch', 'quart', 'cube', 'envelope', 'square', 'sprig', 'bag',
             'box', 'drop', 'fluid ounce', 'gallon', 'head', 'link', 'loaf',
             'pint', 'pod', 'sheet', 'stalk', 'whole', 'bar', 'bottle', 'bulb',
             'ear', 'fillet', 'liter', 'packet']
    instructions = ['and', 'or', 'chopped', 'diced', 'brewed', 'chilled',
                    'chunky', 'small', 'medium', 'large', 'coarse', 'cracked',
                    'crushed', 'ground', 'cooked', 'cubed', 'crumbled', 'cut',
                    'cold', 'hot', 'warm', 'day', 'old', 'drained', 'canned',
                    'dried', 'dry', 'fine', 'firm', 'fresh', 'frozen', 'grated',
                    'grilled', 'hard', 'julienned?', 'leftover', 'light', 'lite',
                    'mashed', 'melted', 'minced', 'packed', 'peeled', 'pitted',
                    'sliced', 'prepared', 'refrigerated', 'rehydrated',
                    'seedless', 'shaved', 'shredded', 'sifted', 'sieved',
                    'shucked', 'slivered', 'thick', 'thin', 'toasted',
                    'trimmed', 'unbaked', 'uncooked', 'unpeeled', 'unopened',
                    'unseasoned']
    # Quantity: digits, '.', '/', and the unicode vulgar fractions, plus an
    # optional parenthesized size such as "(8 ounce)".
    number_regex = r'((?:[\d./\u00BC-\u00BE\u2150-\u215E]*\s?(?:\(.+\))?)*)'
    ingredient_regex = r"([a-zA-Z '\-]+)"
    supplement_regex = r',?(.*)'
    # Each unit/instruction may start upper- or lower-case; units also take an
    # optional 'es'/'s' plural and instructions an optional 'ly' suffix.
    units_regex = "|".join(f'[{unit[0]}{unit[0].capitalize()}]{unit[1:]}' for unit in units)
    units_regex = f"((?:(?:{units_regex})e?s?)?)"
    instructions_regex = "|".join(f'[{inst[0]}{inst[0].capitalize()}]{inst[1:]}' for inst in instructions)
    instructions_regex = f"((?:(?:(?:{instructions_regex})(?:ly)?)| )*)"
    regex = re.compile(number_regex + units_regex + instructions_regex
                       + ingredient_regex + supplement_regex)
    m = regex.match(ingredient_text)
    logging.info(f"Parsed {ingredient_text}, found: {m}")
    if not m:
        return None
    return [text.strip() if text else None for text in m.groups()]


def reparse_ingredients(session):
    # ids of ingredients that do not yet have a parsed-parts row
    missing_ids = except_(
        select(db.RecipeIngredient.id),
        select(db.RecipeIngredientParts.id),
    ).alias('missing')
    missing = session.query(db.RecipeIngredient).\
        where(db.RecipeIngredient.id.in_(select(missing_ids.c.id))).\
        all()
    for ingredient in missing:
        parts = parse_ingredient(ingredient.text)
        if not parts:
            continue
        quantity, unit, instruction, name, supplement = parts
        session.add(db.RecipeIngredientParts(
            id=ingredient.id, quantity=quantity, unit=unit,
            instruction=instruction, ingredient=name, supplement=supplement))


def load_recipe(recipe_url):
    try:
        logging.info(f'Loading recipe: {recipe_url}')
        with urlopen(recipe_url) as f:
            # urlopen raises HTTPError for most 404s, which the except below
            # also catches; this guard covers servers that answer 404 cleanly.
            if f.getcode() == 404:
                raise Exception(f"Recipe does not exist: {recipe_url}")
            return bs4.BeautifulSoup(f.read().decode(), 'html.parser')
    except Exception as e:
        logging.warning(f"Could not download or parse recipe: {recipe_url}")
        logging.warning(e)
        return None


def parse_recipe(session, recipe, site):
    recipe_url = urljoin(site.base_url, str(recipe.identifier))
    recipe_page = load_recipe(recipe_url)
    if not recipe_page:
        return None
    name_candidates = recipe_page.find_all(class_=site.name_class)
    if len(name_candidates) == 0:
        raise Exception(f"Could not extract recipe name: {recipe_url}")
    name_div = name_candidates[0]
    recipe.name = name_div.text.strip()
    logging.info(f"Adding recipe {recipe.name} from {recipe_url}")
    session.add(recipe)
    session.flush()  # populate recipe.id before attaching ingredients
    ingred_candidates = recipe_page.find_all(class_=site.ingredient_class)
    for candidate in ingred_candidates:
        ingred = db.RecipeIngredient(text=candidate.text, recipe_id=recipe.id)
        session.add(ingred)
        session.flush()  # populate ingred.id for the parts row below
        parts = parse_ingredient(ingred.text)
        if parts:
            quantity, unit, instruction, ingredient, supplement = parts
            ingred_parts = db.RecipeIngredientParts(
                id=ingred.id, quantity=quantity, unit=unit,
                instruction=instruction, ingredient=ingredient,
                supplement=supplement)
            session.add(ingred_parts)
    logging.info(f"{len(ingred_candidates)} ingredients found. Inserting into DB")
    return recipe


parser = ArgumentParser(description="Scrape a recipe site for recipes")
parser.add_argument('site', help='name of the site')
parser.add_argument('-id', '--identifier', dest='id',
                    help='URL of a recipe (relative to the base URL of the site) '
                         'or a comma-separated list of them')
parser.add_argument('-a', '--auto', action='store', dest='n',
                    help='automatically generate identifiers '
                         '(must supply the number of recipes to scrape)')
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()

if args.verbose:
    logging.basicConfig(level=logging.INFO)
    logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

eng = db.get_engine()
S = sessionmaker(eng)
with S.begin() as sess:
    site = sess.query(db.RecipeSite).where(db.RecipeSite.name == args.site).one()
    recipe_ids = []
    starting_id = 0
    if args.id and not args.n:
        # honor the comma-separated form promised in the --identifier help text
        recipe_ids = args.id.split(',')
        logging.info(f'Retrieving recipe(s): {args.id}')
    elif args.n:
        if not args.id:
            # resume after the highest identifier already stored for this site
            last_recipe = sess.query(db.Recipe).\
                where(db.Recipe.recipe_site_id == site.id).\
                order_by(desc(db.Recipe.identifier)).\
                limit(1).\
                scalar()
            starting_id = int(last_recipe.identifier) + 1 if last_recipe else 0
        else:
            starting_id = int(args.id)
        recipe_ids = range(starting_id, starting_id + int(args.n))
        logging.info(f'Retrieving {args.n} recipes from {site.base_url} '
                     f'starting at {starting_id}')
    for recipe_id in recipe_ids:
        # one savepoint per recipe so a single failure doesn't abort the batch
        savepoint = sess.begin_nested()
        try:
            recipe = db.Recipe(identifier=recipe_id, recipe_site_id=site.id)
            parse_recipe(sess, recipe, site)
            savepoint.commit()
        except KeyboardInterrupt:
            savepoint.rollback()
            break
        except Exception as e:
            savepoint.rollback()
            logging.error(e)
            continue
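
# Example invocations (a sketch -- the script filename "scrape.py" and the site
# name "allrecipes" are assumptions; the named RecipeSite row must already
# exist in the database with base_url, name_class, and ingredient_class set):
#
#   python scrape.py allrecipes --identifier 12345 -v       # one recipe, verbose
#   python scrape.py allrecipes --identifier 12345,12346    # comma-separated list
#   python scrape.py allrecipes --auto 50                   # next 50 ids after the last stored
#   python scrape.py allrecipes --identifier 100 --auto 50  # ids 100 through 149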