netkeiba.comから新着のデータのみスクレイピングしてデータベースにいれてみた

2020年12月21日

netkeiba.comから新着のデータのみスクレイピングする

競馬を分析するのが少々流行っているこの頃です。調べてみるとnetkeiba.comからデータを取得してきている記事を多く見ます。それにならい私も前回このような記事を書いてみましたが、今回は改良して下記の要件を加えてみました。

1.取得した値をデータベースに格納する
2.新着のデータのみスクレイピングする
3.slackに通知する

netkeiba.comのURLについて

netkeiba.comでスクレイピングするページのURLは下記のようになっています。
「https://race.netkeiba.com/race/result.html?race_id=202001020812&rf=race_list」

今回肝になってくるのはrace_idのパラメータですが、下記の規則で成り立っています。
race_id=開催年(NNNN年開催)+競馬場コード(競馬場NN)+開催回数(1年の内の開催N回目)+日数(開催の内のN日目)+レース数(1日の内のNレース目)

今回race_idをデータベースで管理することで「2」を実現しています。
まずスクレイピングを行い取得したデータのINSERTが成功した場合、該当するURLのパラメータをデータベースに保管します。次にプログラムが動く時は、「保管したパラメータのレース数に+1したURL」からスクレイピングを開始します。このロジックをコース毎に行います。

イメージは下の通りです。※厳密には+1ではなくcontinueです

コード

コードは下の通りです。


#%%
#--データベース------------------------------------------
import mysql.connector as mariadb

#--スクレイピング----------------------------------------
from bs4 import BeautifulSoup
import requests
import time
#--Slack-------------------------------------------------
import slackweb

#--データ分析--------------------------------------------
import pandas as pd

#--日付--------------------------------------------------
from datetime import date

#--エラー------------------------------------------------
import sys
import traceback

class ScrapingController():

    def make_url(self, holding_year, race_course, holding_count, date_count, race_number):
        race_id = [holding_year, race_course, holding_count, date_count, race_number]
        race_id = [str(r) for r in race_id]
        race_id = list(map(lambda x: x.zfill(2), race_id))
        race_id = ''.join(race_id)
        url     = f'https://race.netkeiba.com/race/result.html?race_id={race_id}&rf=race_list'
        return url

    def get_horse_datas(self, url, holding_year, race_course, holding_count, date_count, race_number):
        request = requests.get(url)
        soup = BeautifulSoup(request.content, 'html.parser')
        horses = soup.find_all(class_='HorseList')
        horse_datas = []
        for horse in horses:
            horse_data = {}
            #開催年(NNNN年開催)
            #競馬場コード(競馬場NN)
            #開催回数(1年の内の開催N回目)
            #日数(開催の内のN日目)
            #レース数(1日の内のNレース目)
            horse_data['url']                   = url
            horse_data['holding_year']          = holding_year
            horse_data['race_course']           = race_course
            horse_data['holding_count']         = holding_count
            horse_data['date_count']            = date_count
            horse_data['race_number']           = race_number
            horse_data['rank']                  = horse.find(class_='Rank').get_text().replace('\n','').replace(' ','').replace(' ','')
            horse_data['horse_number']          = horse.find(class_='Txt_C').find('div').get_text().replace('\n','').replace(' ','').replace(' ','')
            horse_data['horse_name']            = horse.find(class_='Horse_Name').find('a').get_text().replace('\n','').replace(' ','').replace(' ','')
            horse_detail                        = horse.find(class_='Detail_Left').get_text().replace('\n','').replace(' ','').replace(' ','')
            horse_data['horse_sex']             = horse_detail[0] if len(horse_detail)==2 else ''
            horse_data['horse_age']             = horse_detail[1] if len(horse_detail)==2 else ''
            horse_data['jockey_weight']         = horse.find(class_='JockeyWeight').get_text().replace('\n','').replace(' ','').replace(' ','')
            horse_data['jockey_name']           = horse.find(class_='Jockey').get_text().replace('\n','').replace(' ','').replace(' ','')
            horse_data['race_time']             = horse.find(class_='RaceTime').get_text().replace('\n','').replace(' ','').replace(' ','')
            horse_data['popularity']            = horse.find(class_='OddsPeople').get_text().replace('\n','').replace(' ','').replace(' ','')
            horse_data['odds']                  = horse.find(class_='Txt_R').get_text().replace('\n','').replace(' ','').replace(' ','')
            horse_data['passage']               = horse.find(class_='PassageRate').get_text().replace('\n','').replace(' ','').replace(' ','')
            horse_data['trainer']               = horse.find(class_='Trainer').find('a').get_text().replace('\n','').replace(' ','').replace(' ','')
            horse_weight                        = horse.find(class_='Weight').get_text().replace('\n','').replace(' ','').replace(' ','')
            horse_weights                       = horse_weight.split('(')
            horse_data['horse_weight']          = horse_weights[0] if len(horse_weights)==2 else ''
            horse_data['horse_weight_changes']  = horse_weights[1].replace(')', '') if len(horse_weights)==2 else ''
            horse_datas.append(horse_data)
            time.sleep(1)
        return(horse_datas)

class DatabaseController:

    def __init__(self):
        self.mariadb_connection = mariadb.connect(host='localhost', user='root', password='', database='netkeiba')
        self.cursor = self.mariadb_connection.cursor(dictionary=True)

    def insert_horse_datas(self, horse_datas):
        for horse_data in horse_datas:
            columns = ', '.join(horse_data.keys())
            values  = ', '.join(map(lambda x: f"'{x}'", horse_data.values()))
            query = f'INSERT INTO horse_datas ({columns}) VALUES ({values})'
            self.cursor.execute(query)
            self.mariadb_connection.commit()
        return True

    def get_last_scraping_infos(self):
        query = 'SELECT id, holding_year, race_course, holding_count, date_count, race_number FROM last_scraping_infos'
        self.cursor.execute(query)
        last_scraping_infos = list(self.cursor)
        return last_scraping_infos

    def update_last_scraping_info(self, holding_year, race_course, holding_count, date_count, race_number):
        query = f'UPDATE last_scraping_infos SET holding_year = {holding_year}, holding_count = {holding_count}, date_count = {date_count}, race_number = {race_number} WHERE race_course = {race_course}'
        self.cursor.execute(query)
        self.mariadb_connection.commit()
        return True

class SlackController:

    def __init__(self):
        self.slack = slackweb.Slack(url='https://hooks.slack.com/services/xxxxxxxxxx')

    def notify_result(self, result_message):
        text = result_message
        self.slack.notify(text=text)

    def notify_error(self, url, error_section, error_message):
        text = f'============Error Occurred============\nURL:\n{url}\n\nセクション:\n{error_section}\n\nエラー内容:\n{error_message}\n===================================='
        print(text)
        self.slack.notify(text=text)

def main():
    scrapingController = ScrapingController()
    databaseController = DatabaseController()
    slackController    = SlackController()

    last_scraping_infos = databaseController.get_last_scraping_infos()
    
    next_year = date.today().year + 1

    for last_scraping_info in last_scraping_infos:
        print('#--FIRST_ROOP_OF_RACE_COURSE#-------------------------------------------------------------')
        #コース毎の最初のループのルール
            #1.コース毎に「続きからスクレイピングを開始する」為に最後に取得した情報を代入する
            #2.重複を避ける為「最後に取得した情報」の次の値からスクレイピングを開始する
        first_roop_of_race_course = True
        race_course        = last_scraping_info['race_course']   #1
        holding_year_from  = last_scraping_info['holding_year']  #1
        holding_count_from = last_scraping_info['holding_count'] #1
        date_count_from    = last_scraping_info['date_count']    #1
        race_number_from   = last_scraping_info['race_number']   #1
        for holding_year in range(holding_year_from, next_year):
            for holding_count in range(holding_count_from, 11):
                for date_count in range(date_count_from, 11):
                    print('------------------------------------------------------------------------------------------')
                    for race_number in range(race_number_from, 14):
                        if first_roop_of_race_course:
                            holding_year_from  = 1
                            holding_count_from = 1
                            date_count_from    = 1
                            race_number_from   = 1
                            first_roop_of_race_course = False
                            continue #2

                        #分岐となるパラメータを初期化する
                        horse_datas = []
                        result_insert_horse_datas = False
                        result_update_last_scraping_info = False

                        #URLを生成する
                        url = scrapingController.make_url(holding_year, race_course, holding_count, date_count, race_number)

                        #スクレイピング開始する
                        try:
                            horse_datas = scrapingController.get_horse_datas(url, holding_year, race_course, holding_count, date_count, race_number)
                        except:
                            error_section = 'Scraping Horse Datas'
                            error_message = traceback.format_exc()
                            slackController.notify_error(url, error_section, error_message)
                            sys.exit()
                        #スクレイピングしたデータをデータベースに格納する
                        if horse_datas:
                            try:
                                result_insert_horse_datas = databaseController.insert_horse_datas(horse_datas)
                            except:
                                error_section = 'Insert Horse Datas'
                                error_message = traceback.format_exc()
                                slackController.notify_error(url, error_section, error_message)
                                sys.exit()
                        #最後にスクレイピングしたデータを記録する
                        if result_insert_horse_datas:
                            try:
                                result_update_last_scraping_info =databaseController.update_last_scraping_info(holding_year, race_course, holding_count, date_count, race_number)
                            except:
                                error_section = 'Update Last Scraping Info'
                                error_message = traceback.format_exc()
                                slackController.notify_error(url, error_section, error_message)
                                sys.exit()
                        #※ターミナル確認用
                        if result_update_last_scraping_info:
                            print(f'{url}: SUCCESS - {len(horse_datas)}')
                        else:
                            print(f'{url}: SKIP    - {len(horse_datas)}')
    slackController.notify_result('更新が完了しました。')
    print('--DONE------------------------------------------------------------------------------------')

if __name__ == '__main__':
    main()

最後に

これで今後データを取得する時はURLを指定してプログラムを実行しなくて済むようになりました。これならcronに任せられますし、エラーも通知が来るので一安心です。一応2万件取得してみましたが問題なく動いてくれています。

関連のある書籍

2020年12月21日