Parsing / Сбор информации, Python, Анализ данных

Выявление мошеннических сборов в Instagram

Время прочтения: 6 мин.

Изначально была выдвинута следующая гипотеза: злоумышленники часто берут фотографии из аккаунтов реальных детей, при этом изменив имя ребенка и реквизиты сбора. Первой мыслью был поиск подобных аккаунтов с дальнейшей классификацией их как подлинные, либо поддельные по каким-то признакам. Однако на практике оказалось, что такие аккаунты довольно быстро блокирует администрация по жалобам пользователей или мошенники закрывают свой аккаунт настройками приватности после появления «разоблачающих» комментариев, неудобных вопросов, и создают новый. При этом реквизиты сбора часто остаются те же самые.

Получается, что получить данные напрямую из профилей мошенников мы не можем. Пойдем другим путем. В социальных сетях существуют так называемые группы «антимошенников», где пользователи делятся случаями недобросовестных, по их мнению, сборов. Возникла идея попробовать поискать подобные посты и получить информацию из них.

Для получения содержимого постов будем использовать Python и библиотеку InstaLoader. Это инструмент, который позволяет в автоматическом режиме получать публикации, комментарии, метаданные и многое другое из Instagram, при этом некоторые функции доступны даже без авторизации. Его можно использовать как в режиме командной строки, так и в виде библиотеки Python.

Библиотека позволяет искать и получать данные множеством разных способов. Так, возможно скачать данные конкретного профиля целиком, включая фото-, видеоконтент, истории, комментарии, а также метаданные, информацию о подписчиках пользователя и его подписках, при этом для такого способа получения данных доступен «режим паузы» — можно приостановить процесс скачивания и в дальнейшем запустить его с того же места. Интересный факт: в метаданных, получаемых библиотекой, можно найти результат работы внутренних алгоритмов Instagram, которые генерируют текстовые описания фотографий («на картинке два человека, мужчина и женщина» и т.п.). Для просмотренных мною фотографий такие описания были на удивление точны.

Однако, в нашем случае мы не знаем, с каких профилей потребуются данные, поэтому воспользуемся другой опцией – поиск по хештегам. При работе с Python взаимодействие с Instagram ведется через класс Instaloader(), поэтому первым шагом мы создаем его экземпляр и при необходимости задаем параметры.

bot = instaloader.Instaloader()
bot.download_videos = False
bot.download_location = False
bot.download_pictures = False
bot.download_geotags = False
bot.download_video_thumbnails = False
bot.filename_pattern = '{shortcode}'

Как отмечалось ранее, часть функционала библиотеки доступны без авторизации, в том числе можно скачивать посты. Однако на практике Instagram блокирует клиента после определенного числа запросов, и в случае работы без авторизации этот лимит достигается гораздо быстрее, в особенности, если ваш IP-адрес уже был «засвечен» или провайдер использует NAT – запросы будут суммироваться, иногда не удается получить вообще ничего. Поэтому лучше авторизоваться сразу, но можно работать в «анонимном» режиме до получения исключения LoginRequired и залогиниться уже после него. Я бы не рекомендовал использовать данные своего личного аккаунта ввиду риска его блокировки. Для поиска по хештегам будет используем следующую функцию. Функционал библиотеки предусматривает вывод как топ-постов, так и всех подряд. Контент в постах бывает трех типов – фото, видео и «карусель» — несколько медиафайлов двух предыдущих типов. Видео мы анализировать не будем, поэтому исключаем такие посты из рассмотрения.

def GetPostsFromHashtag(hashtag_name:str, max_count:int, top_posts:bool=False):
    """
    Функция загружает заданное количество постов по хештегу
    """
        
    hashtag = instaloader.Hashtag.from_name(bot.context, hashtag_name)
    
    if top_posts:
        posts = hashtag.get_top_posts()
    else:
        posts = hashtag.get_posts()
    while max_count > 0:
        clear_output(wait=True)
        max_count -= 1
        post = next(posts)
        if post.typename != 'GraphVideo':
            bot.download_post(post, pathlib.Path(f'{media_folder}/{post.owner_username}'))
        else:
            continue   

Теперь среди загруженных выберем публикации, в тексте которых присутствуют номера платежных карт и (опционально) номера телефонов. Для этого используются регулярные выражения – номера карт состоят из 16 цифр, а телефоны – из 8, при этом опытным путем выявлено, что авторы указывают их в формате хештегов, предваряя символом #. Это тоже учтем при написании выражения.

Из выбранных постов извлекается их текст, полный URL и уникальный идентификатор поста (shortcode), который понадобится нам на следующем шаге. Эти коды легко получить, поскольку на этапе скачивания библиотека назначает их как имя папки, в которые помещается содержимое соответствующего поста (этот параметр настраиваемый, shortcode установлен по умолчанию).

Также для внутренних целей нам были нужны временные метки с фотографий, они берутся из json-файла, который скачивается библиотекой в виде архива xz (архивирование можно отключить)

Следующая функция ответственна за описанную обработку данных:

this_id = 1
for profile in os.listdir(media_folder):
    profile_dir = os.path.join(media_folder, profile)
    for file in os.listdir(profile_dir):
        if pathlib.Path(file).suffix == '.txt':
            full_file_path = os.path.join(profile_dir, file)
            with open(full_file_path) as current_file:
                post_text = current_file.read()
                cards=re.findall('\d{16}', post_text)
                phones = re.findall('#8\d{10}', post_text)
            json_filename = os.path.join(profile_dir, pathlib.Path(file).stem+'.json.xz')
            if os.path.isfile(json_filename):
                with lzma.open(json_filename) as f:
                    json_file = json.load(f)
                    time = datetime.datetime.utcfromtimestamp(json_file['node']['taken_at_timestamp']).strftime('%Y-%m-%d %H:%M:%S')
            if cards and cards not in data['cards']:
                data['id'].append(this_id)
                data['post_id'].append(pathlib.Path(file).stem)
                data['post_link'].append('https://instagram.com/p/'+pathlib.Path(file).stem)
                data['full_text'].append(post_text)
                data['time'].append(time)
                if len(cards) == 1:
                    data['cards'].append(*cards)
                else:
                    data['cards'].append(tuple(cards))
                if phones:
                    data['phones'].append(phones)
                else:
                    data['phones'].append('None')
                this_id += 1

Теперь изменим параметры бота, чтобы скачать прикрепленные изображения. Мы не сделали этого сразу, чтобы запрашивать такие данные только для нужных постов и таким образом минимизировать число запросов к серверу, снизить вероятность блокировки и число таймаутов. К слову, библиотека при получении от сервера сообщения о превышении допустимого числа запросов самостоятельно определяет необходимый период ожидания, после чего возобновляет работу. Дополнительных действий от пользователя не требуется.

bot.download_pictures = True
bot.download_comments = False
bot.save_metadata = False

Получение изображений выглядит так: на предыдущем шаге в словарь записывался в том числе id поста (shortcode). Преобразуем словарь в pandas dataframe с последующим удалением дубликатов. Этот датафрейм также можно легко экспортировать в Excel.

df = pd.DataFrame(data)
df = df.drop_duplicates(subset='cards').reset_index(drop=True)

Теперь скачаем медиафайлы постов используя еще одну возможность библиотеки – получение поста по его короткому коду.

for idx, post_id in enumerate(df['post_id'], 1):
    this_post = instaloader.Post.from_shortcode(bot.context, post_id)
    bot.download_post(this_post, pathlib.Path(f'selected_media/{idx}'))

Отфильтруем текстовые файлы – они нам не нужны

for profile in os.listdir('selected_media'):
    profile_dir = os.path.join('selected_media', profile)
    for file in os.listdir(profile_dir):
        if pathlib.Path(file).suffix == '.txt':
            os.remove(os.path.join('selected_media', profile, file))

Получим список всех файлов в папке с изображениями:

def getListOfFiles(dirName):
    # create a list of file and sub directories 
    # names in the given directory 
    listOfFile = os.listdir(dirName)
    allFiles = list()
    # Iterate over all the entries
    for entry in listOfFile:
        # Create full path
        fullPath = os.path.join(dirName, entry)
        # If entry is a directory then get the list of files in this directory 
        if os.path.isdir(fullPath):
            allFiles = allFiles + getListOfFiles(fullPath)
        else:
            allFiles.append(fullPath)
                
    return sorted(allFiles, key=lambda item: int(item.split('/')[-2]))

Теперь запускаем распознавание текста. Для этого используются библиотеки Tesseract и OpenCV.

ocr_results = defaultdict(list)

for img in files:
  this_idx = int(img.split('/')[-2])
  this_img = cv2.imread(img)
  this_gray_img = cv2.cvtColor(this_img, cv2.COLOR_BGR2GRAY)
  tmp_file = 'tmp.png'
  cv2.imwrite(tmp_file, this_gray_img)
  this_txt = pytesseract.image_to_string(Image.open(tmp_file), lang='rus')
  ocr_results[this_idx].append(this_txt)
  os.remove(tmp_file)

Для облегчения преобразования в pdndas dataframe, и как следствие более логичной структуры экспортированного файла Excel списки результатов распознавания теста каждого из постов должны иметь одинаковую длину. Для этого вычислим максимальное количество изображений в посте и сделаем паддинг значениями-заглушками у всех остальных постов.

max_len = max(len(value) for value in ocr_results.values())
ocr_dict = defaultdict(lambda: [np.nan]*max_len)
for key, value in ocr_results.items():
  if not value:
    ocr_dict[key]
  else:
    for idx, text in enumerate(value):
      ocr_dict[key][idx] = text

Используя регулярные выражения, уберем спецсимволы из текстов и затем экспортируем в Excel

ocr_processed = {key: ['None' if subitem is np.nan else re.sub(r'\W+', ' ', subitem) for subitem in value] for key, value in ocr_dict.items()}
df = pd.DataFrame(ocr_processed).T
df.to_excel("insta_ocr.xlsx")

На выходе получаем сводный файл – таблицу из потенциально мошеннических постов и соответствующих им банковских карт и номеров телефонов. Аналитики банка могут отобрать эмитированные им карты, к примеру, по БИН, и проверить транзакции.

Стоит отметить, что библиотека умеет определять, какие посты уже скачивались ранее, поэтому алгоритм можно запускать по прошествии некоторого времени, чтобы выявить новые публикации.

Советуем почитать